From a2e28cbf16ded53a86579f8bdbaf1c8843f5fcfe Mon Sep 17 00:00:00 2001
From: wangbin
Date: Sun, 12 Jan 2025 17:23:13 +0800
Subject: [PATCH] [Improve][Connector-file-base] Split a parquet file into
 multiple splits for the big-file read scenario.

---
 .../file/source/reader/OrcReadStrategy.java   |   2 +-
 .../source/reader/ParquetReadStrategy.java    |  99 ++++++++++
 .../file/writer/ParquetReadStrategyTest.java  | 170 ++++++++++++++++++
 3 files changed, 270 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 99f70f17514..062be3652c2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -91,8 +91,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
     /** user can specified row count per split */
     private long rowCountPerSplitByUser = 0;
 
-    private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
     private final long DEFAULT_ROW_COUNT = 100000;
+    private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
     private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 1264df9807a..78408377b1b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -33,12 +33,14 @@ import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
 import org.apache.avro.Conversions;
 import org.apache.avro.data.TimeConversions;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetReader;
@@ -66,8 +68,10 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
@@ -82,6 +86,101 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
 
     private int[] indexes;
 
+    @Override
+    public Set<FileSourceSplit> getFileSourceSplits(String path) {
+        Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
+        ParquetMetadata metadata;
+        try (ParquetFileReader reader =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        ((configuration, userGroupInformation) -> {
+                            HadoopInputFile hadoopInputFile =
+                                    HadoopInputFile.fromPath(new Path(path), configuration);
+                            return ParquetFileReader.open(hadoopInputFile);
+                        }))) {
+            metadata = reader.getFooter();
+        } catch (IOException e) {
+            String errorMsg =
+                    String.format("Create parquet reader for this file [%s] failed", path);
+            throw new FileConnectorException(
+                    CommonErrorCodeDeprecated.READER_OPERATION_FAILED, errorMsg, e);
+        }
+        if (metadata == null || CollectionUtils.isEmpty(metadata.getBlocks())) {
+            log.warn("cannot get meta or blocks for path:{}", path);
+            fileSourceSplits.add(new FileSourceSplit(path));
+            return fileSourceSplits;
+        }
+
+        long low = 0;
+        long high = low;
+        long splitCountAll = metadata.getBlocks().size();
+        for (int i = 0; i < metadata.getBlocks().size(); i++) {
+            high = low + metadata.getBlocks().get(i).getCompressedSize();
+            FileSourceSplit split = new FileSourceSplit(path, low, high);
+            fileSourceSplits.add(split);
+            low = high;
+        }
+        log.info("generate parquet split count:{} for this file:{}", splitCountAll, path);
+        return fileSourceSplits;
+    }
+
+    @Override
+    public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
+            throws IOException, FileConnectorException {
+        String path = split.getFilePath();
+        String tableId = split.getTableId();
+        if (split.getMinRowIndex() == null || split.getMaxRowIndex() == null) {
+            log.warn(
+                    "minRowIndex or maxRowIndex is null, use fileBaseRead. fileSourceSplit:{}",
+                    split);
+            read(path, tableId, output);
+            return;
+        }
+        if (Boolean.FALSE.equals(checkFileType(path))) {
+            String errorMsg =
+                    String.format(
+                            "This file [%s] is not a parquet file, please check the format of this file",
+                            path);
+            throw new FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
+        }
+        Path filePath = new Path(path);
+        Map<String, String> partitionsMap = parsePartitionsByPath(path);
+        HadoopInputFile hadoopInputFile =
+                hadoopFileSystemProxy.doWithHadoopAuth(
+                        (configuration, userGroupInformation) ->
+                                HadoopInputFile.fromPath(filePath, configuration));
+        int fieldsCount = seaTunnelRowType.getTotalFields();
+        GenericData dataModel = new GenericData();
+        dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
+        dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
+        dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
+        GenericRecord record;
+        try (ParquetReader<GenericRecord> reader =
+                AvroParquetReader.<GenericRecord>builder(hadoopInputFile)
+                        .withDataModel(dataModel)
+                        .withFileRange(split.getMinRowIndex(), split.getMaxRowIndex())
+                        .build()) {
+            while ((record = reader.read()) != null) {
+                Object[] fields;
+                if (isMergePartition) {
+                    int index = fieldsCount;
+                    fields = new Object[fieldsCount + partitionsMap.size()];
+                    for (String value : partitionsMap.values()) {
+                        fields[index++] = value;
+                    }
+                } else {
+                    fields = new Object[fieldsCount];
+                }
+                for (int i = 0; i < fieldsCount; i++) {
+                    Object data = record.get(indexes[i]);
+                    fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
+                }
+                SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
+                seaTunnelRow.setTableId(tableId);
+                output.collect(seaTunnelRow);
+            }
+        }
+    }
+
     @Override
     public void read(String path, String tableId, Collector<SeaTunnelRow> output)
             throws FileConnectorException, IOException {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 66af39f18d0..26336deded6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
+import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
@@ -35,6 +36,7 @@ import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.orc.StripeInformation;
 import org.apache.parquet.avro.AvroParquetWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -44,6 +46,7 @@ import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
+import com.google.common.collect.Lists;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
@@ -53,12 +56,166 @@ import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Random;
+import java.util.Set;
 import java.util.TimeZone;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
 
 @Slf4j
 public class ParquetReadStrategyTest {
+
+    public static List<GenericRecord> autoGenData(String filePath, int count)
+            throws IOException {
+        // Define the schema: an int id, a string name, a double salary and a string-array skills field
+        String schemaString =
+                "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+        Schema schema = new Schema.Parser().parse(schemaString);
+
+        Configuration conf = new Configuration();
+
+        Path file = new Path(filePath);
+        long startTs = System.currentTimeMillis();
+        try (ParquetWriter<GenericRecord> writer =
+                AvroParquetWriter.<GenericRecord>builder(file)
+                        .withSchema(schema)
+                        .withConf(conf)
+                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .build()) {
+            GenericRecord record1 = new GenericData.Record(schema);
+            record1.put("id", 1);
+            record1.put("name", "Alice");
+            record1.put("salary", 50000.0);
+            GenericArray<Utf8> skills1 =
+                    new GenericData.Array<>(2, schema.getField("skills").schema());
+            skills1.add(new Utf8("Java"));
+            skills1.add(new Utf8("Python"));
+            record1.put("skills", skills1);
+            writer.write(record1);
+
+            Random random = new Random();
+            for (int i = 1; i < count; i++) {
+                GenericRecord record2 = new GenericData.Record(schema);
+                record2.put("id", i);
+                record2.put("name", String.valueOf(random.nextInt()));
+                record2.put("salary", random.nextDouble());
+                GenericArray<Utf8> skills2 =
+                        new GenericData.Array<>(2, schema.getField("skills").schema());
+                skills2.add(new Utf8(String.valueOf(random.nextInt())));
+                skills2.add(new Utf8("Python"));
+                record2.put("skills", skills2);
+                writer.write(record2);
+            }
+        }
+        System.out.println(
+                "write file success. count:"
+                        + count
+                        + ", useTime:"
+                        + (System.currentTimeMillis() - startTs));
+        return null;
+    }
+
+    public static void deleteFile(String filePath) {
+        File file = new File(filePath);
+        if (file.exists()) {
+            file.delete();
+        }
+    }
+
+    @Test
+    public void testParquetGetSplits() throws IOException {
+        String file = "d:/local_output.parquet";
+        deleteFile(file);
+        int rowCount = 10000000;
+        if (!new File(file).exists()) {
+            autoGenData(file, rowCount);
+        }
+        try {
+            ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+            LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+            parquetReadStrategy.init(localConf);
+
+            Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(file);
+            List<FileSourceSplit> list =
+                    set.stream()
+                            .sorted((o1, o2) -> (int) (o1.getMinRowIndex() - o2.getMinRowIndex()))
+                            .collect(Collectors.toList());
+            for (FileSourceSplit fileSourceSplit : list) {
+                System.out.println(fileSourceSplit);
+            }
+            // the generated file is large enough to contain more than one row group
+            Assertions.assertTrue(set.size() > 1);
+            Assertions.assertEquals(list.get(1).getMinRowIndex(), list.get(0).getMaxRowIndex());
+
+            SeaTunnelRowType seaTunnelRowTypeInfo =
+                    parquetReadStrategy.getSeaTunnelRowTypeInfo(file);
+
+            long rowSize = 0;
+            SeaTunnelRow firstRow = null;
+            SeaTunnelRow lastRow = null;
+            for (int i = 0; i < list.size(); i++) {
+                // reading a single 128MB split takes a bit over 6s
+                FileSourceSplit split = list.get(i);
+                TestCollector testCollector = new TestCollector();
+                long l1 = System.currentTimeMillis();
+                parquetReadStrategy.read(split, testCollector);
+                long l2 = System.currentTimeMillis();
+                System.out.println("read split use time " + (l2 - l1));
+                rowSize += testCollector.rows.size();
+                if (i == 0) {
+                    firstRow = testCollector.getRows().get(0);
+                }
+                if (i == list.size() - 1) {
+                    lastRow = testCollector.getRows().get(testCollector.rows.size() - 1);
+                }
+            }
+            Assertions.assertEquals(rowCount, rowSize);
+
+            List<SeaTunnelRow> sl = test1(parquetReadStrategy, file, rowCount);
+
+            Assertions.assertEquals(firstRow.getField(0), sl.get(0).getField(0));
+            Assertions.assertEquals(lastRow.getField(1), sl.get(1).getField(1));
+
+        } finally {
+            deleteFile(file);
+        }
+    }
+
+    public List<SeaTunnelRow> test1(
+            ParquetReadStrategy parquetReadStrategy, String file, long rowCount)
+            throws IOException {
+        TestCollector testCollector1 = new TestCollector();
+        long l1 = System.currentTimeMillis();
+        parquetReadStrategy.read(file, "", testCollector1);
+        long l2 = System.currentTimeMillis();
+        System.out.println("read file use time " + (l2 - l1));
+        Assertions.assertEquals(rowCount, testCollector1.rows.size());
+        return Lists.newArrayList(
+                testCollector1.rows.get(0),
+                testCollector1.getRows().get(testCollector1.rows.size() - 1));
+    }
+
+    @Test
+    public void testParquetRead3() throws Exception {
+        URL resource = ParquetReadStrategyTest.class.getResource("/hive.parquet");
+        Assertions.assertNotNull(resource);
+        String path = Paths.get(resource.toURI()).toString();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        log.info(seaTunnelRowTypeInfo.toString());
+        TestCollector testCollector = new TestCollector();
+        Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(path);
+        for (FileSourceSplit split : set) {
+            parquetReadStrategy.read(split, testCollector);
+        }
+        Assertions.assertEquals(2, testCollector.rows.size());
+    }
+
     @Test
     public void testParquetRead1() throws Exception {
         URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
@@ -72,6 +229,12 @@ public void testParquetRead1() throws Exception {
         log.info(seaTunnelRowTypeInfo.toString());
         TestCollector testCollector = new TestCollector();
         parquetReadStrategy.read(path, "", testCollector);
+        TestCollector testCollector1 = new TestCollector();
+        Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(path);
+        for (FileSourceSplit split : set) {
+            parquetReadStrategy.read(split, testCollector1);
+        }
+        Assertions.assertEquals(testCollector.getRows().size(), testCollector1.rows.size());
     }
 
     @Test
@@ -87,6 +250,13 @@ public void testParquetRead2() throws Exception {
         log.info(seaTunnelRowTypeInfo.toString());
         TestCollector testCollector = new TestCollector();
         parquetReadStrategy.read(path, "", testCollector);
+        Assertions.assertEquals(2, testCollector.rows.size());
+        TestCollector testCollector1 = new TestCollector();
+        Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(path);
+        for (FileSourceSplit split : set) {
+            parquetReadStrategy.read(split, testCollector1);
+        }
+        Assertions.assertEquals(testCollector.getRows().size(), testCollector1.rows.size());
     }
 
     @Test