Skip to content

Commit

Permalink
[feature-#1775][connector][http] http supports offline mode
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoliang committed Feb 4, 2024
1 parent 786f68a commit fd59a73
Show file tree
Hide file tree
Showing 14 changed files with 680 additions and 14 deletions.
28 changes: 28 additions & 0 deletions chunjun-connectors/chunjun-connector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,30 @@
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>

<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>2.8.0</version>
<exclusions>
<exclusion>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>net.minidev</groupId>
<artifactId>json-smart</artifactId>
<version>2.4.10</version>
</dependency>
<dependency>
<artifactId>slf4j-api</artifactId>
<groupId>org.slf4j</groupId>
<version>1.7.36</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
Expand All @@ -56,6 +80,10 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.dtstack.chunjun.util.GsonUtil;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpUriRequest;
Expand All @@ -41,9 +42,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static com.dtstack.chunjun.connector.http.common.ConstantValue.CSV_DECODE;
import static com.dtstack.chunjun.connector.http.common.ConstantValue.TEXT_DECODE;
import static com.dtstack.chunjun.connector.http.common.ConstantValue.XML_DECODE;
import static com.dtstack.chunjun.connector.http.common.ConstantValue.*;

@Slf4j
public class HttpClient {
Expand Down Expand Up @@ -134,6 +133,9 @@ public void initPosition(HttpRequestParam requestParam, String response) {
}

public void execute() {
if (restConfig.getLimitRequestTime() < restConfig.getRequestTime()) {
return;
}

if (!running) {
return;
Expand Down Expand Up @@ -182,6 +184,9 @@ public void execute() {
first = false;
requestRetryTime = 3;
requestNumber++;
// 子类和父类使用同一个对象,可以向上汇报请求次数进度,以便及时触发finish
Integer requestTime = restConfig.getRequestTime();
restConfig.setRequestTime(++requestTime);
}

public void doExecute(int retryTime) {
Expand All @@ -204,6 +209,21 @@ public void doExecute(int retryTime) {
String responseValue;
int responseStatus;
try {
Map<String, Object> requestParam = currentParam.getParam();
Map<String, Object> requestBody = currentParam.getBody();
if (StringUtils.isNotBlank(restConfig.getPageParamName())) {
Integer pagePosition =
restConfig.getStartIndex()
+ restConfig.getStep() * restConfig.getRequestTime();
if (pagePosition > restConfig.getEndIndex()) {
return;
}
if ("get".equals(restConfig.getRequestMode())) {
requestParam.put(restConfig.getPageParamName(), pagePosition);
} else {
requestBody.put(restConfig.getPageParamName(), pagePosition);
}
}

HttpUriRequest request =
HttpUtil.getRequest(
Expand All @@ -221,7 +241,8 @@ public void doExecute(int retryTime) {
return;
}

responseValue = EntityUtils.toString(httpResponse.getEntity());
// utf-8:支持中文
responseValue = EntityUtils.toString(httpResponse.getEntity(), "utf-8");
responseStatus = httpResponse.getStatusLine().getStatusCode();
} catch (Throwable e) {
// 只要本次请求中出现了异常 都会进行重试,如果重试次数达到了就真正结束任务
Expand Down Expand Up @@ -264,9 +285,16 @@ public void doExecute(int retryTime) {
}
}

responseParse.parse(responseValue, responseStatus, HttpRequestParam.copy(currentParam));
while (responseParse.hasNext()) {
processData(responseParse.next());
if (StringUtils.isBlank(responseValue)) {
reachEnd = true;
running = false;
} else {
responseParse.parse(
responseValue, responseStatus, HttpRequestParam.copy(currentParam));
while (responseParse.hasNext()) {
// 一条一条数据的增加
processData(responseParse.next());
}
}

if (-1 != restConfig.getCycles() && requestNumber >= restConfig.getCycles()) {
Expand Down Expand Up @@ -342,6 +370,8 @@ protected ResponseParse getResponseParse(AbstractRowConverter converter) {
return new XmlResponseParse(restConfig, converter);
case TEXT_DECODE:
return new TextResponseParse(restConfig, converter);
case OFFLINE_JSON_DECODE:
return new OfflineJsonResponseParse(restConfig, converter);
default:
return new JsonResponseParse(restConfig, converter);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.dtstack.chunjun.connector.http.client;

import com.dtstack.chunjun.connector.http.common.HttpRestConfig;
import com.dtstack.chunjun.converter.AbstractRowConverter;
import com.dtstack.chunjun.util.GsonUtil;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.jayway.jsonpath.JsonPath;
import org.apache.commons.lang3.StringUtils;

import java.util.Iterator;
import java.util.Map;

/** @Description 离线任务 @Author lianggao @Date 2023/6/1 下午5:50 */
public class OfflineJsonResponseParse extends ResponseParse {

private String responseValue;
private HttpRequestParam requestParam;
private final Gson gson;
private Iterator<Object> iterator;
/** true:single true:array false:single false:array */
String parserFlag;

String jsonPath;

public OfflineJsonResponseParse(HttpRestConfig config, AbstractRowConverter converter) {
super(config, converter);
this.gson = GsonUtil.GSON;
this.jsonPath = config.getJsonPath();
this.parserFlag = !StringUtils.isBlank(jsonPath) + ":" + config.getReturnedDataType();
}

@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public ResponseValue next() throws Exception {
String rowJson = gson.toJson(iterator.next());
Map<String, Object> data =
DefaultRestHandler.gson.fromJson(rowJson, GsonUtil.gsonMapTypeToken);
return new ResponseValue(converter.toInternal(data), requestParam, responseValue);
}

@Override
public void parse(String responseValue, int responseStatus, HttpRequestParam requestParam) {
this.responseValue = responseValue;
this.requestParam = requestParam;
runParseJson();
}

public void runParseJson() {
switch (parserFlag) {
case "false:array":
iterator = JsonPath.read(responseValue, "$");
break;
case "true:single":
Object read = JsonPath.read(responseValue, jsonPath);
iterator = Lists.newArrayList(read).iterator();
break;
case "true:array":
Object eval = JsonPath.read(responseValue, jsonPath);
if (eval instanceof net.minidev.json.JSONArray) {
iterator = ((net.minidev.json.JSONArray) eval).iterator();
} else {
// 如果为null 则直接报错,返回解析错误的数据
if (eval == null) {
throw new RuntimeException(
"response parsing is incorrect Please check the conf ,get response is"
+ responseValue);
}
iterator = Lists.newArrayList(eval).iterator();
}
break;
default:
Lists.newArrayList(responseValue);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ConstantValue {
public static final String TEXT_DECODE = "text";

public static final String DEFAULT_DECODE = "json";
public static final String OFFLINE_JSON_DECODE = "offline-json";

public static final String PREFIX = "${";
public static final String SUFFIX = "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,17 @@ public class HttpRestConfig extends CommonConfig {
/** 请求的超时时间 单位毫秒 */
private long timeOut = 10000;

private String returnedDataType;
private String jsonPath;

private String pageParamName;
private Integer endIndex;
private Integer startIndex;
private Integer Step;

private Integer limitRequestTime = 1;
private Integer requestTime = 0;

public String getFieldTypes() {
return fieldTypes;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ protected void openInternal(InputSplit inputSplit) {
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
ResponseValue value = myHttpClient.takeEvent();
if (null == value) {
reachEnd = httpRestConfig.getLimitRequestTime().equals(httpRestConfig.getRequestTime());
return null;
}
if (value.isNormal()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;

import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -69,7 +70,12 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(HttpOptions.DELAY);
options.add(HttpOptions.DATA_SUBJECT);
options.add(HttpOptions.CYCLES);

options.add(HttpOptions.RETURNEDDATATYPE);
options.add(HttpOptions.JSONPATH);
options.add(HttpOptions.PAGEPARAMNAME);
options.add(HttpOptions.STEP);
options.add(HttpOptions.STARTINDEX);
options.add(HttpOptions.ENDINDEX);
return options;
}

Expand Down Expand Up @@ -141,6 +147,24 @@ private HttpRestConfig getRestapiConf(ReadableConfig config) {
httpRestConfig.setRequestMode(config.get(HttpOptions.METHOD));
httpRestConfig.setDataSubject(config.get(HttpOptions.DATA_SUBJECT));
httpRestConfig.setCycles(config.get(HttpOptions.CYCLES));

httpRestConfig.setReturnedDataType(config.get(HttpOptions.RETURNEDDATATYPE));
httpRestConfig.setJsonPath(config.get(HttpOptions.JSONPATH));

if (StringUtils.isNotBlank(config.get(HttpOptions.PAGEPARAMNAME))) {
httpRestConfig.setPageParamName(config.get(HttpOptions.PAGEPARAMNAME));
httpRestConfig.setStep(config.get(HttpOptions.STEP));
httpRestConfig.setStartIndex(config.get(HttpOptions.STARTINDEX));
httpRestConfig.setEndIndex(config.get(HttpOptions.ENDINDEX));

Integer limitRequestTime =
(config.get(HttpOptions.ENDINDEX) - config.get(HttpOptions.STARTINDEX))
/ config.get(HttpOptions.STEP)
+ 1;
httpRestConfig.setLimitRequestTime(limitRequestTime);
httpRestConfig.setCycles(limitRequestTime);
}

httpRestConfig.setParam(
gson.fromJson(
config.get(HttpOptions.PARAMS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@ public String asSummaryString() {

@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.UPDATE_BEFORE)
.addContainedKind(RowKind.UPDATE_AFTER)
.addContainedKind(RowKind.DELETE)
.build();
return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,40 @@ public class HttpOptions {
.longType()
.defaultValue(1L)
.withDescription("request cycle");

public static final ConfigOption<String> RETURNEDDATATYPE =
ConfigOptions.key("returned-data-type")
.stringType()
.defaultValue("single")
.withDescription("The data structure returned is single or array");

public static final ConfigOption<String> JSONPATH =
ConfigOptions.key("json-path")
.stringType()
.defaultValue("")
.withDescription("json Path");

public static final ConfigOption<String> PAGEPARAMNAME =
ConfigOptions.key("page-param-name")
.stringType()
.defaultValue("")
.withDescription("Pagination request page name,for example: pageName");

public static final ConfigOption<Integer> STARTINDEX =
ConfigOptions.key("start-index")
.intType()
.defaultValue(1)
.withDescription("The initial page number of multiple requests");

public static final ConfigOption<Integer> ENDINDEX =
ConfigOptions.key("end-index")
.intType()
.defaultValue(1)
.withDescription("The final page number of multiple requests");

public static final ConfigOption<Integer> STEP =
ConfigOptions.key("step")
.intType()
.defaultValue(1)
.withDescription("The step size of the requested page number");
}
Loading

0 comments on commit fd59a73

Please sign in to comment.