[Feature-#1918][s3] Add support for reading all types of documents supported by Apache Tika #1919

Merged: 1 commit, Sep 24, 2024
12 changes: 12 additions & 0 deletions chunjun-connectors/chunjun-connector-s3/pom.xml
@@ -68,6 +68,18 @@
<version>1.11-8</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-format-tika</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.dtstack.chunjun</groupId>
<artifactId>chunjun-format-excel</artifactId>
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
@@ -19,6 +19,8 @@
package com.dtstack.chunjun.connector.s3.config;

import com.dtstack.chunjun.config.CommonConfig;
import com.dtstack.chunjun.format.excel.config.ExcelFormatConfig;
import com.dtstack.chunjun.format.tika.config.TikaReadConfig;

import com.amazonaws.regions.Regions;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -86,4 +88,23 @@ public class S3Config extends CommonConfig implements Serializable {

/** Suffix of generated file names */
private String suffix;

/** Regex that object keys must match */
private String objectsRegex;

/** Whether to use a text qualifier */
private boolean useTextQualifier = true;

/** Whether to write each record as its own file */
private boolean enableWriteSingleRecordAsFile = false;

/** Whether to keep the original file name */
private boolean keepOriginalFilename = false;

/** Disable injecting the bucket name into the endpoint prefix */
private boolean disableBucketNameInEndpoint = false;

private TikaReadConfig tikaReadConfig = new TikaReadConfig();

private ExcelFormatConfig excelFormatConfig = new ExcelFormatConfig();
}
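The new options concentrate the Tika and Excel behavior in S3Config. A minimal sketch of wiring them up, assuming the Lombok-style setters implied by the getters used elsewhere in this PR (option names are from this diff; the values are illustrative):

```java
S3Config s3Config = new S3Config();
s3Config.setObjectsRegex(".*\\.(docx|pdf)$");    // only object keys matching this regex are read
s3Config.setUseTextQualifier(false);             // write bare values, no quoting
s3Config.setEnableWriteSingleRecordAsFile(true); // one output object per record
s3Config.setKeepOriginalFilename(true);          // name outputs after the source document

TikaReadConfig tikaReadConfig = new TikaReadConfig();
tikaReadConfig.setUseExtract(true);              // assumed setter matching the isUseExtract() flag
s3Config.setTikaReadConfig(tikaReadConfig);
```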
S3DynamicTableSink.java
@@ -84,6 +84,6 @@ public DynamicTableSink copy() {

@Override
public String asSummaryString() {
return "StreamDynamicTableSink";
return S3DynamicTableSink.class.getName();
}
}
S3OutputFormat.java
@@ -27,22 +27,28 @@
import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
import com.dtstack.chunjun.throwable.WriteRecordException;
import com.dtstack.chunjun.util.GsonUtil;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.data.RowData;

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PartETag;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.StringWriter;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.stream.Collectors;

import static com.dtstack.chunjun.format.tika.config.TikaReadConfig.ORIGINAL_FILENAME;

/** The OutputFormat implementation which writes data to Amazon S3. */
@Slf4j
public class S3OutputFormat extends BaseRichOutputFormat {
@@ -137,7 +143,8 @@ private void checkOutputDir() {
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
s3Config.getFetchSize());
s3Config.getFetchSize(),
s3Config.getObjectsRegex());
} else {
subObjects =
S3Util.listObjectsByv1(
@@ -166,11 +173,17 @@ private void nextBlock() {
sw = new StringWriter();
}
this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
if (!s3Config.isUseTextQualifier()) {
writerUtil.setUseTextQualifier(false);
}
this.currentPartNumber = this.currentPartNumber + 1;
}
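The useTextQualifier switch matters because WriterUtil, like most CSV writers, quotes values so that an embedded delimiter stays unambiguous; turning it off writes values bare. An illustration, assuming a comma delimiter and conventional CSV-writer behavior:

```java
WriterUtil writerUtil = new WriterUtil(sw, ",");
// qualifier on (default):  a,"b,with,commas",c   <- embedded comma stays safe
writerUtil.setUseTextQualifier(false);
// qualifier off:           a,b,with,commas,c     <- only safe when values cannot
// contain the delimiter, e.g. a whole document written as a single record
```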

/** Create file multipart upload ID */
private void createActionFinishedTag() {
if (s3Config.isEnableWriteSingleRecordAsFile()) {
return;
}
if (!StringUtils.isNotBlank(currentUploadId)) {
this.currentUploadId =
S3Util.initiateMultipartUploadAndGetId(
@@ -193,26 +206,35 @@ private void beforeWriteRecords() {
}

protected void flushDataInternal() {
if (sw == null) {
return;
}
StringBuffer sb = sw.getBuffer();
if (sb.length() > MIN_SIZE || willClose) {
if (sb.length() > MIN_SIZE || willClose || s3Config.isEnableWriteSingleRecordAsFile()) {
byte[] byteArray;
try {
byteArray = sb.toString().getBytes(s3Config.getEncoding());
} catch (UnsupportedEncodingException e) {
throw new ChunJunRuntimeException(e);
}
log.info("Upload part size:" + byteArray.length);
PartETag partETag =
S3Util.uploadPart(
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
this.currentUploadId,
this.currentPartNumber,
byteArray);

MyPartETag myPartETag = new MyPartETag(partETag);
myPartETags.add(myPartETag);

if (s3Config.isEnableWriteSingleRecordAsFile()) {
S3Util.putStringObject(
amazonS3, s3Config.getBucket(), s3Config.getObject(), sb.toString());
} else {
PartETag partETag =
S3Util.uploadPart(
amazonS3,
s3Config.getBucket(),
s3Config.getObject(),
this.currentUploadId,
this.currentPartNumber,
byteArray);

MyPartETag myPartETag = new MyPartETag(partETag);
myPartETags.add(myPartETag);
}

log.debug(
"task-{} upload etag:[{}]",
@@ -225,6 +247,9 @@ protected void flushDataInternal() {
}
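With enableWriteSingleRecordAsFile set, flushDataInternal above skips the multipart machinery and writes the buffer as one complete object through S3Util.putStringObject. That helper's body is outside this diff; a plausible sketch against the AWS SDK v1 client already in use:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.PutObjectResult;

public class S3PutSketch {
    /** Sketch, not the PR's code: upload the string as the entire body of bucket/object. */
    public static PutObjectResult putStringObject(
            AmazonS3 amazonS3, String bucket, String object, String content) {
        return amazonS3.putObject(bucket, object, content);
    }
}
```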

private void completeMultipartUploadFile() {
if (s3Config.isEnableWriteSingleRecordAsFile()) {
return;
}
if (this.currentPartNumber > 10000) {
throw new IllegalArgumentException("part number can not be bigger than 10000");
}
Expand Down Expand Up @@ -282,7 +307,11 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordExce
// convert row to string
stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
try {
for (int i = 0; i < columnNameList.size(); ++i) {
int columnSize = columnNameList.size();
if (s3Config.isEnableWriteSingleRecordAsFile()) {
columnSize = 1;
}
for (int i = 0; i < columnSize; ++i) {

String column = stringRecord[i];

@@ -292,6 +321,25 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException {
writerUtil.write(column);
}
writerUtil.endRecord();

if (s3Config.isEnableWriteSingleRecordAsFile()) {
Map<String, String> metadataMap =
GsonUtil.GSON.fromJson(stringRecord[1], Map.class);
String key = FilenameUtils.getPath(s3Config.getObject());
// whether to keep the original file name
if (s3Config.isKeepOriginalFilename()) {
key += metadataMap.get(ORIGINAL_FILENAME) + getExtension();
} else {
key +=
jobId
+ "_"
+ taskNumber
+ "_"
+ UUID.randomUUID().toString()
+ getExtension();
}
s3Config.setObject(key);
}
flushDataInternal();
} catch (Exception ex) {
String msg = "RowData2string error RowData(" + rowData + ")";
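A worked example of the two naming modes above, with illustrative values. The metadata map is parsed from stringRecord[1], which the Tika format fills; getExtension() is not shown in this diff, so the ".txt" suffix is an assumption:

```java
// schematic: metadataMap, jobId and taskNumber as in the method above
String basePath = FilenameUtils.getPath("warehouse/docs/out.csv");       // -> "warehouse/docs/"
// keepOriginalFilename = true: reuse the source document's name
String keyKeep = basePath + metadataMap.get(ORIGINAL_FILENAME) + ".txt"; // e.g. warehouse/docs/report.docx.txt
// keepOriginalFilename = false: collision-free generated name
String keyGen = basePath + jobId + "_" + taskNumber + "_" + UUID.randomUUID() + ".txt";
```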
S3DynamicTableSource.java
@@ -66,7 +66,13 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
field.setName(column.getName());
field.setType(
TypeConfig.fromString(column.getDataType().getLogicalType().asSummaryString()));
field.setIndex(i);
int index =
s3Config.getExcelFormatConfig().getColumnIndex() != null
? s3Config.getExcelFormatConfig()
.getColumnIndex()
.get(columns.indexOf(column))
: columns.indexOf(column);
field.setIndex(index);
columnList.add(field);
}
s3Config.setColumn(columnList);
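The index change above decouples a field's position in the DDL from its position in the spreadsheet: with excelFormatConfig.columnIndex set, each declared field is bound to the sheet column at the same position in that list, and declaration order is the fallback. A runnable worked example with assumed values:

```java
import java.util.Arrays;
import java.util.List;

public class ColumnIndexExample {
    public static void main(String[] args) {
        List<String> declaredFields = Arrays.asList("id", "name", "email"); // DDL order
        List<Integer> columnIndex = Arrays.asList(2, 0, 5);                 // user-supplied mapping
        for (int pos = 0; pos < declaredFields.size(); pos++) {
            int sheetColumn = (columnIndex != null) ? columnIndex.get(pos) : pos;
            System.out.printf("field %s reads sheet column %d%n", declaredFields.get(pos), sheetColumn);
        }
        // prints: id -> 2, name -> 0, email -> 5
    }
}
```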
chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
@@ -18,12 +18,17 @@

package com.dtstack.chunjun.connector.s3.source;

import com.dtstack.chunjun.config.FieldConfig;
import com.dtstack.chunjun.config.RestoreConfig;
import com.dtstack.chunjun.connector.s3.config.S3Config;
import com.dtstack.chunjun.connector.s3.enums.CompressType;
import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
import com.dtstack.chunjun.connector.s3.util.S3SimpleObject;
import com.dtstack.chunjun.connector.s3.util.S3Util;
import com.dtstack.chunjun.format.excel.common.ExcelData;
import com.dtstack.chunjun.format.excel.source.ExcelInputFormat;
import com.dtstack.chunjun.format.tika.common.TikaData;
import com.dtstack.chunjun.format.tika.source.TikaInputFormat;
import com.dtstack.chunjun.restore.FormatState;
import com.dtstack.chunjun.source.format.BaseRichInputFormat;
import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
@@ -38,6 +43,8 @@
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;

import java.io.IOException;
@@ -71,6 +78,12 @@ public class S3InputFormat extends BaseRichInputFormat {

private RestoreConfig restoreConf;

private transient TikaData tikaData;
private TikaInputFormat tikaInputFormat;

private transient ExcelData excelData;
private ExcelInputFormat excelInputFormat;

@Override
public void openInputFormat() throws IOException {
super.openInputFormat();
@@ -137,7 +150,31 @@ protected InputSplit[] createInputSplitsInternal(int minNumSplits) {
protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
String[] fields;
try {
fields = readerUtil.getValues();
if (s3Config.getTikaReadConfig().isUseExtract() && tikaData != null) {
fields = tikaData.getData();
} else if (s3Config.getExcelFormatConfig().isUseExcelFormat() && excelData != null) {
fields = excelData.getData();
} else {
fields = readerUtil.getValues();
}
// handle fields that are configured with a column index
if (s3Config.getExcelFormatConfig().getColumnIndex() != null) {
List<FieldConfig> columns = s3Config.getColumn();
String[] fieldsData = new String[columns.size()];
for (int i = 0; i < CollectionUtils.size(columns); i++) {
FieldConfig fieldConfig = columns.get(i);
if (fieldConfig.getIndex() >= fields.length) {
String errorMessage =
String.format(
"The column index is greater than the data size."
+ " The current column index is [%s], but the data size is [%s]. Data loss may occur.",
fieldConfig.getIndex(), fields.length);
throw new IllegalArgumentException(errorMessage);
}
fieldsData[i] = fields[fieldConfig.getIndex()];
}
fields = fieldsData;
}
rowData = rowConverter.toInternal(fields);
} catch (IOException e) {
throw new ChunJunRuntimeException(e);
@@ -164,9 +201,82 @@ protected void closeInternal() {

@Override
public boolean reachedEnd() throws IOException {
if (s3Config.getTikaReadConfig().isUseExtract()) {
tikaData = getTikaData();
return tikaData == null || tikaData.getData() == null;
} else if (s3Config.getExcelFormatConfig().isUseExcelFormat()) {
excelData = getExcelData();
return excelData == null || excelData.getData() == null;
}
return reachedEndWithoutCheckState();
}
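Both branches follow the same pull model: reachedEnd() prefetches the next record into tikaData or excelData and reports end-of-input when the fetch comes back empty, so nextRecordInternal() only converts what was already fetched. Schematically, the runtime drives the format like this (not ChunJun's actual runner):

```java
// schematic driver loop for the prefetching contract above
format.openInputFormat();
while (!format.reachedEnd()) {             // fetches tikaData / excelData as a side effect
    RowData row = format.nextRecord(null); // converts the prefetched record
    emit(row);                             // hypothetical downstream call
}
format.close();
```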

public ExcelData getExcelData() {
if (excelInputFormat == null) {
nextExcelDataStream();
}
if (excelInputFormat != null) {
if (!excelInputFormat.hasNext()) {
excelInputFormat.close();
excelInputFormat = null;
return getExcelData();
}
String[] record = excelInputFormat.nextRecord();
return new ExcelData(record);
} else {
return null;
}
}

private void nextExcelDataStream() {
if (splits.hasNext()) {
currentObject = splits.next();
GetObjectRequest rangeObjectRequest =
new GetObjectRequest(s3Config.getBucket(), currentObject);
log.info("Current read file {}", currentObject);
S3Object o = amazonS3.getObject(rangeObjectRequest);
S3ObjectInputStream s3is = o.getObjectContent();
excelInputFormat = new ExcelInputFormat();
excelInputFormat.open(s3is, s3Config.getExcelFormatConfig());
} else {
excelInputFormat = null;
}
}

public TikaData getTikaData() {
if (tikaInputFormat == null) {
nextTikaDataStream();
}
if (tikaInputFormat != null) {
if (!tikaInputFormat.hasNext()) {
tikaInputFormat.close();
tikaInputFormat = null;
return getTikaData();
}
String[] record = tikaInputFormat.nextRecord();
return new TikaData(record);
} else {
return null;
}
}

private void nextTikaDataStream() {
if (splits.hasNext()) {
currentObject = splits.next();
GetObjectRequest rangeObjectRequest =
new GetObjectRequest(s3Config.getBucket(), currentObject);
log.info("Current read file {}", currentObject);
S3Object o = amazonS3.getObject(rangeObjectRequest);
S3ObjectInputStream s3is = o.getObjectContent();
tikaInputFormat =
new TikaInputFormat(
s3Config.getTikaReadConfig(), s3Config.getFieldNameList().size());
tikaInputFormat.open(s3is, FilenameUtils.getName(currentObject));
} else {
tikaInputFormat = null;
}
}
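nextTikaDataStream() above shows one S3 object's full pass through the new Tika format. In isolation the contract looks like this; the constructor and method names are the ones this diff calls, while the record layout is an assumption:

```java
TikaInputFormat tika = new TikaInputFormat(s3Config.getTikaReadConfig(), fieldCount);
tika.open(s3is, FilenameUtils.getName(currentObject)); // raw stream plus original file name
while (tika.hasNext()) {
    String[] record = tika.nextRecord(); // e.g. [extracted text, metadata JSON] (assumed layout)
    // hand the record to the row converter
}
tika.close();
```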

public boolean reachedEndWithoutCheckState() throws IOException {
// br is empty, indicating that a new file needs to be read
if (readerUtil == null) {
@@ -259,7 +369,11 @@ public List<S3SimpleObject> resolveObjects() {
if (s3Config.isUseV2()) {
subObjects =
S3Util.listObjectsKeyByPrefix(
amazonS3, bucket, prefix, s3Config.getFetchSize());
amazonS3,
bucket,
prefix,
s3Config.getFetchSize(),
s3Config.getObjectsRegex());
} else {
subObjects =
S3Util.listObjectsByv1(
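Reader and writer now both thread objectsRegex into S3Util.listObjectsKeyByPrefix. The filter itself is outside this diff; a plausible sketch with the SDK v1 V2-listing API, where only the new parameter is confirmed by the PR:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class ListObjectsSketch {
    /** Hypothetical shape of the regex-aware listing helper. */
    public static List<String> listObjectsKeyByPrefix(
            AmazonS3 amazonS3, String bucket, String prefix, int fetchSize, String objectsRegex) {
        Pattern pattern =
                (objectsRegex == null || objectsRegex.isEmpty()) ? null : Pattern.compile(objectsRegex);
        List<String> keys = new ArrayList<>();
        ListObjectsV2Request request =
                new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix).withMaxKeys(fetchSize);
        ListObjectsV2Result result;
        do {
            result = amazonS3.listObjectsV2(request);
            for (S3ObjectSummary summary : result.getObjectSummaries()) {
                String key = summary.getKey();
                if (pattern == null || pattern.matcher(key).matches()) {
                    keys.add(key); // keep only keys matching objectsRegex
                }
            }
            request.setContinuationToken(result.getNextContinuationToken());
        } while (result.isTruncated());
        return keys;
    }
}
```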