Skip to content

Commit

Permalink
[Feature] [rest-api] Support Rest Api to upload file and submit task (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
fcb-xiaobo authored Jan 10, 2025
1 parent 7964c9d commit 1d9a8b3
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 1 deletion.
34 changes: 34 additions & 0 deletions docs/en/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,40 @@ sink {

------------------------------------------------------------------------------------------

### Submit A Job By Upload Config File

<details>
<summary><code>POST</code> <code><b>/submit-job/upload</b></code> <code>(Returns jobId and jobName if job submitted successfully.)</code></summary>

#### Parameters

> | name | type | data type | description |
> |----------------------|----------|-----------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |
#### Request Body
The name of the uploaded file key is config_file, and the file suffix json is parsed in json format. The conf or config file suffix is parsed in hocon format

curl Example :
```
curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"/temp/fake_to_console.conf"'
```
#### Responses

```json
{
"jobId": 733584788375666689,
"jobName": "SeaTunnel_Job"
}
```

</details>

------------------------------------------------------------------------------------------

### Batch Submit Jobs

<details>
Expand Down
33 changes: 33 additions & 0 deletions docs/zh/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,40 @@ sink {
</details>

------------------------------------------------------------------------------------------
### 提交作业来源上传配置文件

<details>
<summary><code>POST</code> <code><b>/submit-job</b></code> <code>(如果作业提交成功,返回jobId和jobName。)</code></summary>

#### 参数

> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
> |----------------------|----------|-----------------------------------|-----------------------------------|
> | jobId | optional | string | job id |
> | jobName | optional | string | job name |
> | isStartWithSavePoint | optional | string | if job is started with save point |
#### 请求体
上传文件key的名称是config_file,文件后缀json的按照json格式来解析,conf或config文件后缀按照hocon格式解析

curl Example

```
curl --location 'http://127.0.0.1:8080/submit-job/upload' --form 'config_file=@"/temp/fake_to_console.conf"'
```
#### 响应

```json
{
"jobId": 733584788375666689,
"jobName": "SeaTunnel_Job"
}
```

</details>

------------------------------------------------------------------------------------------

### 批量提交作业

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import io.restassured.response.Response;
import scala.Tuple3;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand Down Expand Up @@ -253,6 +255,34 @@ public void testStartWithSavePointWithoutJobIdV2() {
});
}

@Test
public void testRestApiSubmitJobByUploadFileV2() {
Arrays.asList(server, secondServer)
.forEach(
container -> {
Tuple3<Integer, String, Long> task = tasks.get(1);
URL resource =
this.getClass().getClassLoader().getResource("upload-file");
File fileDirect = new File(resource.getFile());
File[] files = fileDirect.listFiles();
for (File file : files) {
Response response =
given().multiPart("config_file", file)
.baseUri(
http
+ container.getHost()
+ colon
+ task._1())
.basePath(
RestConstant
.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE)
.when()
.post();
Assertions.assertEquals(200, response.getStatusCode());
}
});
}

@Test
public void testStopJob() {
AtomicInteger i = new AtomicInteger();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
parallelism = 1
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
plugin_output = "fake"
parallelism = 1
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {
}

sink {
console {
plugin_input="fake"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
{
"env": {
"job.mode": "batch"
},
"source": [
{
"plugin_name": "FakeSource",
"plugin_output": "fake",
"row.num": 100,
"schema": {
"fields": {
"name": "string",
"age": "int",
"card": "int"
}
}
}
],
"transform": [
],
"sink": [
{
"plugin_name": "Console",
"plugin_input": ["fake"]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.engine.server.rest.servlet.RunningThreadsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.StopJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobByUploadFileServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SubmitJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.SystemMonitoringServlet;
Expand All @@ -47,6 +48,7 @@
import lombok.extern.slf4j.Slf4j;

import javax.servlet.DispatcherType;
import javax.servlet.MultipartConfigElement;

import java.io.IOException;
import java.net.DatagramSocket;
Expand All @@ -70,6 +72,7 @@
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_STOP_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOBS;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_SYSTEM_MONITORING_INFORMATION;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_THREAD_DUMP;
import static org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_UPDATE_TAGS;
Expand Down Expand Up @@ -122,6 +125,9 @@ public void createJettyServer() {
ServletHolder threadDumpHolder = new ServletHolder(new ThreadDumpServlet(nodeEngine));

ServletHolder submitJobHolder = new ServletHolder(new SubmitJobServlet(nodeEngine));
ServletHolder submitJobByUploadFileHolder =
new ServletHolder(new SubmitJobByUploadFileServlet(nodeEngine));

ServletHolder submitJobsHolder = new ServletHolder(new SubmitJobsServlet(nodeEngine));
ServletHolder stopJobHolder = new ServletHolder(new StopJobServlet(nodeEngine));
ServletHolder stopJobsHolder = new ServletHolder(new StopJobsServlet(nodeEngine));
Expand All @@ -147,7 +153,10 @@ public void createJettyServer() {
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_JOB_INFO));
context.addServlet(jobInfoHolder, convertUrlToPath(REST_URL_RUNNING_JOB));
context.addServlet(threadDumpHolder, convertUrlToPath(REST_URL_THREAD_DUMP));

MultipartConfigElement multipartConfigElement = new MultipartConfigElement("");
submitJobByUploadFileHolder.getRegistration().setMultipartConfig(multipartConfigElement);
context.addServlet(
submitJobByUploadFileHolder, convertUrlToPath(REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE));
context.addServlet(submitJobHolder, convertUrlToPath(REST_URL_SUBMIT_JOB));
context.addServlet(submitJobsHolder, convertUrlToPath(REST_URL_SUBMIT_JOBS));
context.addServlet(stopJobHolder, convertUrlToPath(REST_URL_STOP_JOB));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ public class RestConstant {
public static final String REST_URL_SYSTEM_MONITORING_INFORMATION =
"/system-monitoring-information";
public static final String REST_URL_SUBMIT_JOB = "/submit-job";

public static final String REST_URL_SUBMIT_JOB_BY_UPLOAD_FILE = "/submit-job/upload";

public static final String REST_URL_SUBMIT_JOBS = "/submit-jobs";
public static final String REST_URL_STOP_JOB = "/stop-job";
public static final String REST_URL_STOP_JOBS = "/stop-jobs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ public JsonObject submitJob(Map<String, String> requestParams, byte[] requestBod
return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode());
}

public JsonObject submitJob(Map<String, String> requestParams, Config config) {
if (Boolean.parseBoolean(requestParams.get(RestConstant.IS_START_WITH_SAVE_POINT))
&& requestParams.get(RestConstant.JOB_ID) == null) {
throw new IllegalArgumentException("Please provide jobId when start with save point.");
}
SeaTunnelServer seaTunnelServer = getSeaTunnelServer(false);
return submitJobInternal(config, requestParams, seaTunnelServer, nodeEngine.getNode());
}

public JsonArray submitJobs(byte[] requestBody) {
List<Tuple2<Map<String, String>, Config>> configTuples =
RestUtil.buildConfigList(requestHandle(requestBody), false);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.rest.servlet;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigParseOptions;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigSyntax;

import org.apache.seatunnel.engine.server.rest.service.JobInfoService;

import org.apache.commons.io.IOUtils;

import com.hazelcast.spi.impl.NodeEngineImpl;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.Part;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SubmitJobByUploadFileServlet extends BaseServlet {
private final JobInfoService jobInfoService;

public SubmitJobByUploadFileServlet(NodeEngineImpl nodeEngine) {
super(nodeEngine);
this.jobInfoService = new JobInfoService(nodeEngine);
}

@Override
public void doPost(HttpServletRequest req, HttpServletResponse resp)
throws IOException, ServletException {

Part filePart = req.getPart("config_file");
String submittedFileName = filePart.getSubmittedFileName();
String content = IOUtils.toString(filePart.getInputStream(), StandardCharsets.UTF_8);
Config config;
if (submittedFileName.endsWith(".json")) {
config =
ConfigFactory.parseString(
content, ConfigParseOptions.defaults().setSyntax(ConfigSyntax.JSON));
} else {
config = ConfigFactory.parseString(content);
}
writeJson(resp, jobInfoService.submitJob(getParameterMap(req), config));
}
}

0 comments on commit 1d9a8b3

Please sign in to comment.