diff --git a/chunjun-connectors/chunjun-connector-format-base/pom.xml b/chunjun-connectors/chunjun-connector-format-base/pom.xml
new file mode 100644
index 0000000000..314ff8899a
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-format-base/pom.xml
@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>chunjun-connectors</artifactId>
+        <groupId>com.dtstack.chunjun</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>chunjun-connector-format-base</artifactId>
+    <name>ChunJun : Connector : Format Base</name>
+
+    <properties>
+        <tika.version>2.8.0</tika.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-core</artifactId>
+            <version>${tika.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.tika</groupId>
+            <artifactId>tika-parsers-standard-package</artifactId>
+            <version>${tika.version}</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+</project>
diff --git a/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/common/TikaData.java b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/common/TikaData.java
new file mode 100644
index 0000000000..a0ca4012f9
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/common/TikaData.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.format.base.common;
+
+import lombok.Data;
+
+@Data
+public class TikaData {
+
+    private String[] data;
+    private boolean end;
+
+    public TikaData(String[] data, boolean end) {
+        this.data = data;
+        this.end = end;
+    }
+
+    public TikaData(String[] data) {
+        this.data = data;
+    }
+}
diff --git a/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/config/TikaReadConfig.java b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/config/TikaReadConfig.java
new file mode 100644
index 0000000000..f5d5850c12
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/config/TikaReadConfig.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.format.base.config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class TikaReadConfig implements Serializable {
+
+    public static final String ORIGINAL_FILENAME = "_ORIGINAL_FILENAME";
+
+    private static final long serialVersionUID = 9142075335239994317L;
+
+    /** whether to enable Tika extraction */
+    private boolean useExtract = false;
+
+    /** content overlap ratio between adjacent chunks, 0-100 */
+    private int overlapRatio = 0;
+
+    /** whether chunking is enabled */
+    private boolean enableChunk = false;
+
+    /** chunk size */
+    private int chunkSize = -1;
+
+    public boolean isEnableChunk() {
+        return chunkSize > 0;
+    }
+}
diff --git a/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/options/TikaOptions.java b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/options/TikaOptions.java
new file mode 100644
index 0000000000..5927411112
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/options/TikaOptions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.format.base.options;
+
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+
+public class TikaOptions {
+
+    public static final ConfigOption<Boolean> USE_EXTRACT =
+            ConfigOptions.key("tika-use-extract")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("use tika extract");
+
+    public static final ConfigOption<Integer> OVERLAP_RATIO =
+            ConfigOptions.key("tika-overlap-ratio")
+                    .intType()
+                    .defaultValue(0)
+                    .withDescription("content overlap ratio");
+
+    public static final ConfigOption<Integer> CHUNK_SIZE =
+            ConfigOptions.key("tika-chunk-size")
+                    .intType()
+                    .defaultValue(-1)
+                    .withDescription("chunk size");
+}
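Note that because `isEnableChunk()` is hand-written, Lombok does not generate it, so chunking is driven entirely by `chunkSize`; the `enableChunk` field is written by its generated setter but never read back. A minimal sketch of the intended configuration, with made-up values, mirroring what `S3DynamicTableFactory` does later in this diff:

    TikaReadConfig tikaReadConfig = new TikaReadConfig();
    tikaReadConfig.setUseExtract(true); // tika-use-extract
    tikaReadConfig.setChunkSize(1024); // tika-chunk-size, measured in characters
    tikaReadConfig.setOverlapRatio(10); // tika-overlap-ratio, percent of chunkSize
    // chunking is on purely because chunkSize > 0; setEnableChunk(...) does not affect it
    assert tikaReadConfig.isEnableChunk();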
diff --git a/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/source/TikaInputFormat.java b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/source/TikaInputFormat.java
new file mode 100644
index 0000000000..c063f25e08
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/source/TikaInputFormat.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.format.base.source;
+
+import com.dtstack.chunjun.connector.format.base.common.TikaData;
+import com.dtstack.chunjun.connector.format.base.config.TikaReadConfig;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+@Slf4j
+public class TikaInputFormat implements Closeable {
+
+    private ThreadPoolExecutor executorService;
+    private final BlockingQueue<TikaData> queue = new LinkedBlockingQueue<>(4096);
+    private TikaReadConfig tikaReadConfig;
+    private TikaData row;
+    private int fieldCount;
+
+    public TikaInputFormat(TikaReadConfig tikaReadConfig, int fieldCount) {
+        this.tikaReadConfig = tikaReadConfig;
+        this.fieldCount = fieldCount;
+    }
+
+    public void open(InputStream inputStream, String originalFilename) {
+        this.executorService =
+                new ThreadPoolExecutor(
+                        1,
+                        1,
+                        0,
+                        NANOSECONDS,
+                        new LinkedBlockingDeque<>(2),
+                        new BasicThreadFactory.Builder()
+                                .namingPattern("tika-schedule-pool-%d")
+                                .daemon(false)
+                                .build());
+        TikaReaderExecutor executor =
+                new TikaReaderExecutor(tikaReadConfig, queue, inputStream, originalFilename);
+        executorService.execute(executor);
+    }
+
+    public boolean hasNext() {
+        try {
+            row = queue.poll(3000L, TimeUnit.MILLISECONDS);
+            // no data yet, keep waiting
+            if (row == null) {
+                log.warn("Waiting for tika data from the queue");
+                return hasNext();
+            }
+            return !row.isEnd();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(
+                    "cannot get data from the queue because the current thread is interrupted.", e);
+        }
+    }
+
+    /** Pad the record so it matches the declared number of fields. */
+    public String[] nextRecord() {
+        String[] data = row.getData();
+        if (fieldCount == data.length) {
+            return data;
+        }
+        if (fieldCount < data.length) {
+            fieldCount = data.length;
+        }
+        return formatValue(data);
+    }
+
+    private String[] formatValue(String[] data) {
+        String[] record = initDataContainer(fieldCount, "");
+        // safe because fieldCount is always >= data.length at this point
+        System.arraycopy(data, 0, record, 0, data.length);
+        return record;
+    }
+
+    private String[] initDataContainer(int capacity, String defValue) {
+        String[] container = new String[capacity];
+        for (int i = 0; i < capacity; i++) {
+            container[i] = defValue;
+        }
+        return container;
+    }
+
+    @Override
+    public void close() {
+        if (executorService != null) {
+            executorService.shutdown();
+            queue.clear();
+        }
+    }
+}
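The intended consumption pattern for `TikaInputFormat` — open a stream, drain records until the end marker, then close — is roughly the following sketch. The file name and stream are placeholders; `S3InputFormat` further down does the same against S3 objects:

    TikaInputFormat format = new TikaInputFormat(tikaReadConfig, 2); // 2 declared fields
    try (InputStream in = Files.newInputStream(Paths.get("sample.docx"))) {
        format.open(in, "sample.docx"); // starts the single background parser thread
        while (format.hasNext()) { // blocks until the parser queues a chunk or the end marker
            String[] record = format.nextRecord(); // [content, metadata JSON], padded to fieldCount
            System.out.println(record[0]);
        }
    } finally {
        format.close();
    }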
diff --git a/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/source/TikaReaderExecutor.java b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/source/TikaReaderExecutor.java
new file mode 100644
index 0000000000..1e48190745
--- /dev/null
+++ b/chunjun-connectors/chunjun-connector-format-base/src/main/java/com/dtstack/chunjun/connector/format/base/source/TikaReaderExecutor.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.chunjun.connector.format.base.source;
+
+import com.dtstack.chunjun.connector.format.base.common.TikaData;
+import com.dtstack.chunjun.connector.format.base.config.TikaReadConfig;
+import com.dtstack.chunjun.util.GsonUtil;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.tika.metadata.Metadata;
+import org.apache.tika.parser.AutoDetectParser;
+import org.apache.tika.parser.ParseContext;
+import org.apache.tika.parser.Parser;
+import org.apache.tika.sax.BodyContentHandler;
+import org.apache.tika.sax.ContentHandlerDecorator;
+import org.xml.sax.ContentHandler;
+
+import java.io.BufferedInputStream;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import static com.dtstack.chunjun.connector.format.base.config.TikaReadConfig.ORIGINAL_FILENAME;
+
+public class TikaReaderExecutor implements Runnable {
+
+    private TikaReadConfig tikaReadConfig;
+    private BlockingQueue<TikaData> queue;
+    private Map<String, String> metaData = new HashMap<>();
+    private String metaDataString;
+    private String originalFilename;
+    private InputStream in;
+
+    public TikaReaderExecutor(
+            TikaReadConfig tikaReadConfig,
+            BlockingQueue<TikaData> queue,
+            InputStream in,
+            String originalFilename) {
+        this.tikaReadConfig = tikaReadConfig;
+        this.queue = queue;
+        this.in = in;
+        this.originalFilename = originalFilename;
+    }
+
+    @Override
+    public void run() {
+        // extract the document content
+        Parser parser = new AutoDetectParser();
+        // Content handler used to collect results. Tika wraps the parse result as XHTML SAX
+        // events; handling those events through a ContentHandler yields the text content and
+        // other useful information.
+        ContentHandler contentHandler = new BodyContentHandler(-1);
+        // Metadata is both input and output: the file name or a probable file type can be passed
+        // in so that Tika can use it to detect the file type and pick the matching parser. Tika
+        // also stores extra information in the Metadata, such as the modification date, author,
+        // and editing tool.
+        Metadata metadata = new Metadata();
+        // Parse context used to control the parsing process, e.g. whether to extract macros
+        // inside Office documents.
+        ParseContext context = new ParseContext();
+
+        // Chunked processing as suggested by the official Tika documentation. Testing showed
+        // that of two Word (doc) files of the same type, one may chunk correctly while the other
+        // does not; txt files could not be read in chunks either, and pdf files have not been
+        // tested yet. For now this approach is not recommended.
+        final List<String> chunks = new ArrayList<>();
+        chunks.add("");
+        ContentHandlerDecorator trunkHandler =
+                new ContentHandlerDecorator() {
+                    @Override
+                    public void characters(char[] ch, int start, int length) {
+                        String chunkContent = "";
+                        String lastChunk = chunks.get(chunks.size() - 1);
+                        String thisStr = new String(ch, start, length);
+                        if (lastChunk.length() + length > tikaReadConfig.getChunkSize()) {
+                            chunks.add(thisStr);
+                            chunkContent = thisStr;
+                        } else {
+                            String chunkString = lastChunk + thisStr;
+                            chunks.set(chunks.size() - 1, chunkString);
+                            if (StringUtils.isNotBlank(chunkString)) {
+                                chunkContent = chunkString;
+                            }
+                        }
+                        if (metaData.isEmpty()) {
+                            for (String name : metadata.names()) {
+                                metaData.put(name, metadata.get(name));
+                            }
+                            metaData.put(ORIGINAL_FILENAME, originalFilename);
+                            metaDataString = GsonUtil.GSON.toJson(metaData);
+                        }
+                        if (StringUtils.isNotBlank(chunkContent)) {
+                            try {
+                                queue.put(
+                                        new TikaData(
+                                                new String[] {chunkContent, metaDataString},
+                                                false));
+                            } catch (InterruptedException e) {
+                                throw new RuntimeException(
+                                        "adding data to the queue failed because the current thread was interrupted",
+                                        e);
+                            }
+                        }
+                    }
+                };
+
+        // InputStream in: the document to parse, passed in as a byte stream so that Tika does
+        // not take up too much memory
+        try (BufferedInputStream bufferedInputStream = new BufferedInputStream(in)) {
+            // to use the official chunk handling approach, replace contentHandler with
+            // trunkHandler
+            parser.parse(bufferedInputStream, contentHandler, metadata, context);
+            String content = contentHandler.toString();
+            for (String name : metadata.names()) {
+                metaData.put(name, metadata.get(name));
+            }
+            metaData.put(ORIGINAL_FILENAME, originalFilename);
+            metaDataString = GsonUtil.GSON.toJson(metaData);
+            if (tikaReadConfig.getChunkSize() > 0) {
+                // chunk the whole extracted content and apply the overlap-ratio handling
+                List<String> chunkList =
+                        getChunkList(
+                                content,
+                                tikaReadConfig.getChunkSize(),
+                                tikaReadConfig.getOverlapRatio());
+                for (String chunk : chunkList) {
+                    queue.put(new TikaData(new String[] {chunk, metaDataString}, false));
+                }
+            } else {
+                queue.put(new TikaData(new String[] {content, metaDataString}, false));
+            }
+            queue.put(new TikaData(null, true));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public List<String> getChunkList(String content, int chunkSize, int overlapRatio) {
+        List<String> chunks = new ArrayList<>();
+        int length = content.length();
+        int startIndex = 0;
+        int step = chunkSize;
+        int endIndex = startIndex + step;
+        int increment = step * overlapRatio / 100;
+        if (step >= length) {
+            chunks.add(content);
+        } else {
+            while (endIndex <= length) {
+                // make sure the substring does not run past the end of the original string
+                if (startIndex + step > length) {
+                    endIndex = length;
+                }
+                String substring = content.substring(startIndex, endIndex);
+                chunks.add(substring);
+                // advance the start and end positions
+                startIndex = endIndex - increment;
+                endIndex = startIndex + step;
+            }
+            if (endIndex > length && startIndex + increment < length) {
+                String substring = content.substring(startIndex, length);
+                chunks.add(substring);
+            }
+        }
+        return chunks;
+    }
+}
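To make the overlap arithmetic concrete: `getChunkList` advances the window by `chunkSize - chunkSize * overlapRatio / 100` characters, so each chunk repeats the tail of its predecessor. A hand-checked example (the null stream is acceptable here because only `getChunkList` is exercised):

    TikaReaderExecutor executor =
            new TikaReaderExecutor(
                    new TikaReadConfig(), new LinkedBlockingQueue<>(), null, "demo.txt");
    // chunkSize = 4, overlapRatio = 50 -> windows [0,4) [2,6) [4,8) [6,10)
    List<String> chunks = executor.getChunkList("abcdefghij", 4, 50);
    // chunks -> ["abcd", "cdef", "efgh", "ghij"]

Note that `overlapRatio = 100` would make the increment equal to the step, so the window would never advance; values strictly below 100 are assumed.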
diff --git a/chunjun-connectors/chunjun-connector-s3/pom.xml b/chunjun-connectors/chunjun-connector-s3/pom.xml
index 51faa129a1..8214236468 100644
--- a/chunjun-connectors/chunjun-connector-s3/pom.xml
+++ b/chunjun-connectors/chunjun-connector-s3/pom.xml
@@ -68,6 +68,11 @@
             <version>1.11-8</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.dtstack.chunjun</groupId>
+            <artifactId>chunjun-connector-format-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
index 2fe73532da..7f2ecbb338 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/config/S3Config.java
@@ -19,6 +19,7 @@
 package com.dtstack.chunjun.connector.s3.config;
 
 import com.dtstack.chunjun.config.CommonConfig;
+import com.dtstack.chunjun.connector.format.base.config.TikaReadConfig;
 
 import com.amazonaws.regions.Regions;
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -86,4 +87,21 @@ public class S3Config extends CommonConfig implements Serializable {
 
     /** suffix of the generated file names */
     private String suffix;
+
+    /** regex used to match object keys */
+    private String objectsRegex;
+
+    /** whether to use a text qualifier */
+    private boolean useTextQualifier = true;
+
+    /** whether to write each record as a separate file */
+    private boolean enableWriteSingleRecordAsFile = false;
+
+    /** keep the original filename */
+    private boolean keepOriginalFilename = false;
+
+    /** disable injecting the bucket name into the endpoint prefix */
+    private boolean disableBucketNameInEndpoint = false;
+
+    private TikaReadConfig tikaReadConfig = new TikaReadConfig();
 }
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java
index e50a2e1125..bea54bc953 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3DynamicTableSink.java
@@ -84,6 +84,6 @@ public DynamicTableSink copy() {
 
     @Override
     public String asSummaryString() {
-        return "StreamDynamicTableSink";
+        return S3DynamicTableSink.class.getName();
     }
 }
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java
index 7abd7d4c87..2ea943b4d0 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/sink/S3OutputFormat.java
@@ -27,6 +27,7 @@
 import com.dtstack.chunjun.sink.format.BaseRichOutputFormat;
 import com.dtstack.chunjun.throwable.ChunJunRuntimeException;
 import com.dtstack.chunjun.throwable.WriteRecordException;
+import com.dtstack.chunjun.util.GsonUtil;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.table.data.RowData;
@@ -34,15 +35,20 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.model.PartETag;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.StringWriter;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static com.dtstack.chunjun.connector.format.base.config.TikaReadConfig.ORIGINAL_FILENAME;
+
 /** The OutputFormat Implementation which writes data to Amazon S3. */
 @Slf4j
 public class S3OutputFormat extends BaseRichOutputFormat {
@@ -137,7 +143,8 @@ private void checkOutputDir() {
                             amazonS3,
                             s3Config.getBucket(),
                             s3Config.getObject(),
-                            s3Config.getFetchSize());
+                            s3Config.getFetchSize(),
+                            s3Config.getObjectsRegex());
         } else {
             subObjects =
                     S3Util.listObjectsByv1(
@@ -166,11 +173,17 @@ private void nextBlock() {
             sw = new StringWriter();
         }
         this.writerUtil = new WriterUtil(sw, s3Config.getFieldDelimiter());
+        if (!s3Config.isUseTextQualifier()) {
+            writerUtil.setUseTextQualifier(false);
+        }
         this.currentPartNumber = this.currentPartNumber + 1;
     }
 
     /** Create file multipart upload ID */
     private void createActionFinishedTag() {
+        if (s3Config.isEnableWriteSingleRecordAsFile()) {
+            return;
+        }
         if (!StringUtils.isNotBlank(currentUploadId)) {
             this.currentUploadId =
                     S3Util.initiateMultipartUploadAndGetId(
@@ -193,8 +206,11 @@ private void beforeWriteRecords() {
     }
 
     protected void flushDataInternal() {
+        if (sw == null) {
+            return;
+        }
         StringBuffer sb = sw.getBuffer();
-        if (sb.length() > MIN_SIZE || willClose) {
+        if (sb.length() > MIN_SIZE || willClose || s3Config.isEnableWriteSingleRecordAsFile()) {
             byte[] byteArray;
             try {
                 byteArray = sb.toString().getBytes(s3Config.getEncoding());
@@ -202,17 +218,23 @@ protected void flushDataInternal() {
                 throw new ChunJunRuntimeException(e);
             }
             log.info("Upload part size:" + byteArray.length);
-            PartETag partETag =
-                    S3Util.uploadPart(
-                            amazonS3,
-                            s3Config.getBucket(),
-                            s3Config.getObject(),
-                            this.currentUploadId,
-                            this.currentPartNumber,
-                            byteArray);
-
-            MyPartETag myPartETag = new MyPartETag(partETag);
-            myPartETags.add(myPartETag);
+
+            if (s3Config.isEnableWriteSingleRecordAsFile()) {
+                S3Util.putStringObject(
+                        amazonS3, s3Config.getBucket(), s3Config.getObject(), sb.toString());
+            } else {
+                PartETag partETag =
+                        S3Util.uploadPart(
+                                amazonS3,
+                                s3Config.getBucket(),
+                                s3Config.getObject(),
+                                this.currentUploadId,
+                                this.currentPartNumber,
+                                byteArray);
+
+                MyPartETag myPartETag = new MyPartETag(partETag);
+                myPartETags.add(myPartETag);
+            }
 
             log.debug(
                     "task-{} upload etag:[{}]",
@@ -225,6 +247,9 @@ protected void flushDataInternal() {
     }
 
     private void completeMultipartUploadFile() {
+        if (s3Config.isEnableWriteSingleRecordAsFile()) {
+            return;
+        }
         if (this.currentPartNumber > 10000) {
             throw new IllegalArgumentException("part can not bigger than 10000");
         }
@@ -282,7 +307,11 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException {
         // convert row to string
         stringRecord = (String[]) rowConverter.toExternal(rowData, stringRecord);
         try {
-            for (int i = 0; i < columnNameList.size(); ++i) {
+            int columnSize = columnNameList.size();
+            if (s3Config.isEnableWriteSingleRecordAsFile()) {
+                columnSize = 1;
+            }
+            for (int i = 0; i < columnSize; ++i) {
                 String column = stringRecord[i];
@@ -292,6 +321,25 @@ protected void writeSingleRecordInternal(RowData rowData) throws WriteRecordException {
                 writerUtil.write(column);
             }
             writerUtil.endRecord();
+
+            if (s3Config.isEnableWriteSingleRecordAsFile()) {
+                Map<String, String> metadataMap =
+                        GsonUtil.GSON.fromJson(stringRecord[1], Map.class);
+                String key = FilenameUtils.getPath(s3Config.getObject());
+                // whether to keep the original filename
+                if (s3Config.isKeepOriginalFilename()) {
+                    key += metadataMap.get(ORIGINAL_FILENAME) + getExtension();
+                } else {
+                    key +=
+                            jobId
+                                    + "_"
+                                    + taskNumber
+                                    + "_"
+                                    + UUID.randomUUID().toString()
+                                    + getExtension();
+                }
+                s3Config.setObject(key);
+            }
             flushDataInternal();
         } catch (Exception ex) {
             String msg = "RowData2string error RowData(" + rowData + ")";
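In single-record-as-file mode, only the first column is written as the file body, and the object key is rebuilt per record from the directory part of the configured object. An illustration with invented paths (`getExtension()` is the existing helper in this class):

    // s3Config.getObject() = "warehouse/docs/out.csv"
    String key = FilenameUtils.getPath(s3Config.getObject()); // "warehouse/docs/"
    // keepOriginalFilename = true:
    //   key -> "warehouse/docs/" + metadataMap.get(ORIGINAL_FILENAME) + getExtension()
    // keepOriginalFilename = false:
    //   key -> "warehouse/docs/" + jobId + "_" + taskNumber + "_" + UUID.randomUUID() + getExtension()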
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
index c450c96854..6660bb63cf 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/source/S3InputFormat.java
@@ -19,6 +19,8 @@
 package com.dtstack.chunjun.connector.s3.source;
 
 import com.dtstack.chunjun.config.RestoreConfig;
+import com.dtstack.chunjun.connector.format.base.common.TikaData;
+import com.dtstack.chunjun.connector.format.base.source.TikaInputFormat;
 import com.dtstack.chunjun.connector.s3.config.S3Config;
 import com.dtstack.chunjun.connector.s3.enums.CompressType;
 import com.dtstack.chunjun.connector.s3.util.ReaderUtil;
@@ -38,6 +40,7 @@
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.IOException;
@@ -71,6 +74,9 @@ public class S3InputFormat extends BaseRichInputFormat {
 
     private RestoreConfig restoreConf;
 
+    private transient TikaData tikaData;
+    private TikaInputFormat tikaInputFormat;
+
     @Override
     public void openInputFormat() throws IOException {
         super.openInputFormat();
@@ -137,7 +143,10 @@ protected InputSplit[] createInputSplitsInternal(int minNumSplits) {
     protected RowData nextRecordInternal(RowData rowData) throws ReadRecordException {
         String[] fields;
         try {
-            fields = readerUtil.getValues();
+            fields =
+                    s3Config.getTikaReadConfig().isUseExtract() && tikaData != null
+                            ? tikaData.getData()
+                            : readerUtil.getValues();
             rowData = rowConverter.toInternal(fields);
         } catch (IOException e) {
             throw new ChunJunRuntimeException(e);
@@ -164,9 +173,47 @@ protected void closeInternal() {
 
     @Override
     public boolean reachedEnd() throws IOException {
+        if (s3Config.getTikaReadConfig().isUseExtract()) {
+            tikaData = getTikaData();
+            return tikaData == null || tikaData.getData() == null;
+        }
         return reachedEndWithoutCheckState();
     }
 
+    public TikaData getTikaData() {
+        if (tikaInputFormat == null) {
+            nextTikaDataStream();
+        }
+        if (tikaInputFormat != null) {
+            if (!tikaInputFormat.hasNext()) {
+                tikaInputFormat.close();
+                tikaInputFormat = null;
+                return getTikaData();
+            }
+            String[] record = tikaInputFormat.nextRecord();
+            return new TikaData(record);
+        } else {
+            return null;
+        }
+    }
+
+    private void nextTikaDataStream() {
+        if (splits.hasNext()) {
+            currentObject = splits.next();
+            GetObjectRequest rangeObjectRequest =
+                    new GetObjectRequest(s3Config.getBucket(), currentObject);
+            log.info("Current read file {}", currentObject);
+            S3Object o = amazonS3.getObject(rangeObjectRequest);
+            S3ObjectInputStream s3is = o.getObjectContent();
+            tikaInputFormat =
+                    new TikaInputFormat(
+                            s3Config.getTikaReadConfig(), s3Config.getFieldNameList().size());
+            tikaInputFormat.open(s3is, FilenameUtils.getName(currentObject));
+        } else {
+            tikaInputFormat = null;
+        }
+    }
+
     public boolean reachedEndWithoutCheckState() throws IOException {
         // br is empty, indicating that a new file needs to be read
         if (readerUtil == null) {
@@ -259,7 +306,11 @@ public List<String> resolveObjects() {
         if (s3Config.isUseV2()) {
             subObjects =
                     S3Util.listObjectsKeyByPrefix(
-                            amazonS3, bucket, prefix, s3Config.getFetchSize());
+                            amazonS3,
+                            bucket,
+                            prefix,
+                            s3Config.getFetchSize(),
+                            s3Config.getObjectsRegex());
         } else {
             subObjects =
                     S3Util.listObjectsByv1(
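When Tika extraction is enabled, each row handed to `rowConverter` is the two-element array produced by `TikaReaderExecutor`, so a matching source schema would declare two string fields; `TikaInputFormat.nextRecord()` pads the array with empty strings if more fields are declared. An illustrative record (metadata keys vary by file type):

    String[] fields = {
        "extracted chunk text ...",
        "{\"Content-Type\":\"application/pdf\",\"_ORIGINAL_FILENAME\":\"report.pdf\"}"
    };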
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java
index fa05abaf7f..c3c873982a 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/S3DynamicTableFactory.java
@@ -18,6 +18,8 @@
 
 package com.dtstack.chunjun.connector.s3.table;
 
+import com.dtstack.chunjun.connector.format.base.config.TikaReadConfig;
+import com.dtstack.chunjun.connector.format.base.options.TikaOptions;
 import com.dtstack.chunjun.connector.s3.config.S3Config;
 import com.dtstack.chunjun.connector.s3.sink.S3DynamicTableSink;
 import com.dtstack.chunjun.connector.s3.source.S3DynamicTableSource;
@@ -61,6 +63,14 @@ public DynamicTableSource createDynamicTableSource(Context context) {
         s3Config.setFirstLineHeader(options.get(S3Options.IS_FIRST_LINE_HEADER));
         s3Config.setEndpoint(options.get(S3Options.ENDPOINT));
         s3Config.setCompress(options.get(S3Options.COMPRESS));
+        s3Config.setObjectsRegex(options.get(S3Options.OBJECTS_REGEX));
+        s3Config.setDisableBucketNameInEndpoint(
+                options.get(S3Options.DISABLE_BUCKET_NAME_IN_ENDPOINT));
+        TikaReadConfig tikaReadConfig = new TikaReadConfig();
+        tikaReadConfig.setUseExtract(options.get(TikaOptions.USE_EXTRACT));
+        tikaReadConfig.setOverlapRatio(options.get(TikaOptions.OVERLAP_RATIO));
+        tikaReadConfig.setChunkSize(options.get(TikaOptions.CHUNK_SIZE));
+        s3Config.setTikaReadConfig(tikaReadConfig);
 
         return new S3DynamicTableSource(context.getCatalogTable().getResolvedSchema(), s3Config);
     }
@@ -94,6 +104,14 @@ public Set<ConfigOption<?>> optionalOptions() {
         options.add(S3Options.SUFFIX);
         options.add(SinkOptions.SINK_PARALLELISM);
         options.add(S3Options.WRITE_MODE);
+        options.add(S3Options.OBJECTS_REGEX);
+        options.add(S3Options.USE_TEXT_QUALIFIER);
+        options.add(S3Options.ENABLE_WRITE_SINGLE_RECORD_AS_FILE);
+        options.add(S3Options.KEEP_ORIGINAL_FILENAME);
+        options.add(S3Options.DISABLE_BUCKET_NAME_IN_ENDPOINT);
+        options.add(TikaOptions.USE_EXTRACT);
+        options.add(TikaOptions.CHUNK_SIZE);
+        options.add(TikaOptions.OVERLAP_RATIO);
         return options;
     }
@@ -121,6 +139,12 @@ public DynamicTableSink createDynamicTableSink(Context context) {
         s3Config.setSuffix(options.get(S3Options.SUFFIX));
         s3Config.setParallelism(options.get(SinkOptions.SINK_PARALLELISM));
         s3Config.setWriteMode(options.get(S3Options.WRITE_MODE));
+        s3Config.setUseTextQualifier(options.get(S3Options.USE_TEXT_QUALIFIER));
+        s3Config.setEnableWriteSingleRecordAsFile(
+                options.get(S3Options.ENABLE_WRITE_SINGLE_RECORD_AS_FILE));
+        s3Config.setKeepOriginalFilename(options.get(S3Options.KEEP_ORIGINAL_FILENAME));
+        s3Config.setDisableBucketNameInEndpoint(
+                options.get(S3Options.DISABLE_BUCKET_NAME_IN_ENDPOINT));
 
         return new S3DynamicTableSink(context.getCatalogTable().getResolvedSchema(), s3Config);
     }
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java
index 1f6236438f..a8fff32659 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/table/options/S3Options.java
@@ -95,4 +95,31 @@ public class S3Options {
 
     public static final ConfigOption<String> WRITE_MODE =
             key("writeMode").stringType().defaultValue("overwrite").withDescription("writeMode");
+
+    public static final ConfigOption<String> OBJECTS_REGEX =
+            key("objectsRegex").stringType().noDefaultValue().withDescription("objects regex rule");
+
+    public static final ConfigOption<Boolean> USE_TEXT_QUALIFIER =
+            key("useTextQualifier")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription("use text qualifier");
+
+    public static final ConfigOption<Boolean> ENABLE_WRITE_SINGLE_RECORD_AS_FILE =
+            key("enableWriteSingleRecordAsFile")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("write each single record as a separate file");
+
+    public static final ConfigOption<Boolean> KEEP_ORIGINAL_FILENAME =
+            key("keepOriginalFilename")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("keep original filename");
+
+    public static final ConfigOption<Boolean> DISABLE_BUCKET_NAME_IN_ENDPOINT =
+            key("disableBucketNameInEndpoint")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("disable bucket name in endpoint");
 }
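Putting the new options together, a Flink SQL source that filters objects by regex and extracts document content might look like the sketch below. The `s3-x` connector identifier is an assumption based on ChunJun's naming convention for table factories; only the options added in this change are shown, and endpoint/credential options are omitted:

    TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    tableEnv.executeSql(
            "CREATE TABLE docs (\n"
                    + "  content VARCHAR,\n"
                    + "  meta VARCHAR\n"
                    + ") WITH (\n"
                    + "  'connector' = 's3-x',\n" // assumed identifier
                    + "  'objectsRegex' = '.*\\.docx$',\n"
                    + "  'tika-use-extract' = 'true',\n"
                    + "  'tika-chunk-size' = '1024',\n"
                    + "  'tika-overlap-ratio' = '10',\n"
                    + "  'disableBucketNameInEndpoint' = 'true'\n"
                    + ")");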
diff --git a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java
index a7d849a4a0..02d708f17e 100644
--- a/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java
+++ b/chunjun-connectors/chunjun-connector-s3/src/main/java/com/dtstack/chunjun/connector/s3/util/S3Util.java
@@ -30,6 +30,7 @@
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -46,12 +47,14 @@
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.regex.Pattern;
 
 @Slf4j
 public class S3Util {
@@ -75,6 +78,10 @@ public static AmazonS3 getS3Client(S3Config s3Config) {
             } else {
                 builder = builder.withRegion(clientRegion.getName());
             }
+            // keep the bucket name out of the endpoint prefix (use path-style access)
+            if (s3Config.isDisableBucketNameInEndpoint()) {
+                builder = builder.withPathStyleAccessEnabled(true);
+            }
 
             return builder.build();
         } else {
@@ -89,6 +96,11 @@ public static AmazonS3 getS3Client(S3Config s3Config) {
             }
             AmazonS3Client client = new AmazonS3Client(cred, ccfg);
             client.setEndpoint(s3Config.getEndpoint());
+            // keep the bucket name out of the endpoint prefix (use path-style access)
+            if (s3Config.isDisableBucketNameInEndpoint()) {
+                client.setS3ClientOptions(
+                        S3ClientOptions.builder().setPathStyleAccess(true).build());
+            }
             return client;
         }
     } else {
@@ -103,18 +115,29 @@ public static PutObjectResult putStringObject(
     }
 
     public static List<String> listObjectsKeyByPrefix(
-            AmazonS3 s3Client, String bucketName, String prefix, int fetchSize) {
+            AmazonS3 s3Client, String bucketName, String prefix, int fetchSize, String regex) {
         List<String> objects = new ArrayList<>(fetchSize);
         ListObjectsV2Request req =
                 new ListObjectsV2Request().withBucketName(bucketName).withMaxKeys(fetchSize);
         if (StringUtils.isNotBlank(prefix)) {
             req.setPrefix(prefix);
         }
+        // compile the regex, if one is configured
+        Pattern pattern = null;
+        if (StringUtils.isNotBlank(regex)) {
+            pattern = Pattern.compile(regex);
+        }
+
         ListObjectsV2Result result;
         do {
             result = s3Client.listObjectsV2(req);
             for (S3ObjectSummary objectSummary : result.getObjectSummaries()) {
+                // skip object keys whose file name does not match the regex
+                if (pattern != null
+                        && !pattern.matcher(FilenameUtils.getName(objectSummary.getKey())).find()) {
+                    continue;
+                }
                 objects.add(objectSummary.getKey());
             }
             String token = result.getNextContinuationToken();
diff --git a/chunjun-connectors/pom.xml b/chunjun-connectors/pom.xml
index a91f7cfe07..059f7379cc 100755
--- a/chunjun-connectors/pom.xml
+++ b/chunjun-connectors/pom.xml
@@ -110,6 +110,7 @@
         <module>chunjun-connector-nebula</module>
         <module>chunjun-connector-kingbase</module>
         <module>chunjun-connector-hudi</module>
+        <module>chunjun-connector-format-base</module>