[Improve][Connector-file-base] Split a parquet file into multiple splits for big-file read scenarios.
wangbin committed Jan 12, 2025
1 parent 7f706c7 commit a2e28cb
Showing 3 changed files with 270 additions and 1 deletion.
@@ -91,8 +91,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
/** user can specify the row count per split */
private long rowCountPerSplitByUser = 0;

private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
private final long DEFAULT_ROW_COUNT = 100000;
private final long DEFAULT_FILE_SIZE_PER_SPLIT = 1024 * 1024 * 30;
private long fileSizePerSplitByUser = DEFAULT_FILE_SIZE_PER_SPLIT;

@Override
@@ -33,12 +33,14 @@
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.avro.Conversions;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
@@ -66,8 +68,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

@@ -82,6 +86,101 @@ public class ParquetReadStrategy extends AbstractReadStrategy {

private int[] indexes;

@Override
public Set<FileSourceSplit> getFileSourceSplits(String path) {
Set<FileSourceSplit> fileSourceSplits = new HashSet<>();
ParquetMetadata metadata;
try (ParquetFileReader reader =
hadoopFileSystemProxy.doWithHadoopAuth(
((configuration, userGroupInformation) -> {
HadoopInputFile hadoopInputFile =
HadoopInputFile.fromPath(new Path(path), configuration);
return ParquetFileReader.open(hadoopInputFile);
}))) {
metadata = reader.getFooter();
} catch (IOException e) {
String errorMsg =
String.format("Create parquet reader for this file [%s] failed", path);
throw new FileConnectorException(
CommonErrorCodeDeprecated.READER_OPERATION_FAILED, errorMsg, e);
}
if (metadata == null || CollectionUtils.isEmpty(metadata.getBlocks())) {
log.warn("cannot get meta or blocks for path:{}", path);
fileSourceSplits.add(new FileSourceSplit(path));
return fileSourceSplits;
}

long low = 0;
long high;
for (int i = 0; i < metadata.getBlocks().size(); i++) {
high = low + metadata.getBlocks().get(i).getCompressedSize();
FileSourceSplit split = new FileSourceSplit(path, low, high);
fileSourceSplits.add(split);
low = high;
}
log.info("generated {} parquet splits for file: {}", fileSourceSplits.size(), path);
return fileSourceSplits;
}
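Not part of the diff above, but for context: the byte ranges built by getFileSourceSplits come straight from the row-group (block) metadata in the parquet footer. Below is a minimal standalone sketch of the same idea using only parquet-hadoop APIs; the class name and the local file path are hypothetical placeholders, and it assumes parquet-hadoop and hadoop-common are on the classpath.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;

public class RowGroupInspector {
    public static void main(String[] args) throws IOException {
        // hypothetical local file; replace with a real parquet path
        Path path = new Path("/tmp/local_output.parquet");
        try (ParquetFileReader reader =
                ParquetFileReader.open(HadoopInputFile.fromPath(path, new Configuration()))) {
            long low = 0;
            for (BlockMetaData block : reader.getFooter().getBlocks()) {
                long high = low + block.getCompressedSize();
                // [low, high) is the byte range a FileSourceSplit would cover for this row group
                System.out.printf(
                        "startingPos=%d compressedSize=%d rows=%d splitRange=[%d, %d)%n",
                        block.getStartingPos(),
                        block.getCompressedSize(),
                        block.getRowCount(),
                        low,
                        high);
                low = high;
            }
        }
    }
}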

@Override
public void read(FileSourceSplit split, Collector<SeaTunnelRow> output)
throws IOException, FileConnectorException {
String path = split.getFilePath();
String tableId = split.getTableId();
if (split.getMinRowIndex() == null || split.getMaxRowIndex() == null) {
log.warn(
"minRowIndex or maxRowIndex is null, use fileBaseRead. fileSourceSplit:{}",
split);
read(path, tableId, output);
return;
}
if (Boolean.FALSE.equals(checkFileType(path))) {
String errorMsg =
String.format(
"This file [%s] is not a parquet file, please check the format of this file",
path);
throw new FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
}
Path filePath = new Path(path);
Map<String, String> partitionsMap = parsePartitionsByPath(path);
HadoopInputFile hadoopInputFile =
hadoopFileSystemProxy.doWithHadoopAuth(
(configuration, userGroupInformation) ->
HadoopInputFile.fromPath(filePath, configuration));
int fieldsCount = seaTunnelRowType.getTotalFields();
GenericData dataModel = new GenericData();
dataModel.addLogicalTypeConversion(new Conversions.DecimalConversion());
dataModel.addLogicalTypeConversion(new TimeConversions.DateConversion());
dataModel.addLogicalTypeConversion(new TimeConversions.LocalTimestampMillisConversion());
GenericRecord record;
try (ParquetReader<GenericData.Record> reader =
AvroParquetReader.<GenericData.Record>builder(hadoopInputFile)
.withDataModel(dataModel)
.withFileRange(split.getMinRowIndex(), split.getMaxRowIndex())
.build()) {
while ((record = reader.read()) != null) {
Object[] fields;
if (isMergePartition) {
int index = fieldsCount;
fields = new Object[fieldsCount + partitionsMap.size()];
for (String value : partitionsMap.values()) {
fields[index++] = value;
}
} else {
fields = new Object[fieldsCount];
}
for (int i = 0; i < fieldsCount; i++) {
Object data = record.get(indexes[i]);
fields[i] = resolveObject(data, seaTunnelRowType.getFieldType(i));
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
seaTunnelRow.setTableId(tableId);
output.collect(seaTunnelRow);
}
}
}
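A note on the design, not spelled out in the diff: ParquetReader.Builder#withFileRange(start, end) restricts reading to the row groups that the given byte range covers, so as long as every split boundary coincides with a row-group boundary (which getFileSourceSplits ensures by cutting at cumulative compressed block sizes), each row is read by exactly one split. The sketch below shows this in isolation by reading a single row group; the class and method names are hypothetical, and it assumes the default Avro data model is sufficient for the file's types.

import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.util.HadoopInputFile;

import java.io.IOException;

public class SingleRowGroupRead {
    public static long countRowsInFirstRowGroup(String file) throws IOException {
        Configuration conf = new Configuration();
        HadoopInputFile inputFile = HadoopInputFile.fromPath(new Path(file), conf);
        BlockMetaData firstBlock;
        try (ParquetFileReader metaReader = ParquetFileReader.open(inputFile)) {
            firstBlock = metaReader.getFooter().getBlocks().get(0);
        }
        long start = 0;                             // same convention as getFileSourceSplits
        long end = firstBlock.getCompressedSize();  // exclusive upper bound of the first split
        long rows = 0;
        try (ParquetReader<GenericRecord> reader =
                AvroParquetReader.<GenericRecord>builder(inputFile)
                        .withFileRange(start, end)
                        .build()) {
            while (reader.read() != null) {
                rows++;
            }
        }
        return rows;  // expected to match firstBlock.getRowCount()
    }
}

The testParquetGetSplits test further down exercises the same path end to end: it sorts the generated splits, reads each one, and checks that the per-split row counts sum to the row count of a whole-file read.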

@Override
public void read(String path, String tableId, Collector<SeaTunnelRow> output)
throws FileConnectorException, IOException {
@@ -27,6 +27,7 @@
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericArray;
@@ -35,6 +36,7 @@
import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.StripeInformation;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -44,6 +46,7 @@
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;

import java.io.File;
@@ -53,12 +56,166 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;

@Slf4j
public class ParquetReadStrategyTest {

public static List<StripeInformation> autoGenData(String filePath, int count)
throws IOException {
// Define the Avro schema: id (int), name (string), salary (double), skills (array<string>)

String schemaString =
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
Schema schema = new Schema.Parser().parse(schemaString);

Configuration conf = new Configuration();

Path file = new Path(filePath);
long startTs = System.currentTimeMillis();
try (ParquetWriter<GenericRecord> writer =
AvroParquetWriter.<GenericRecord>builder(file)
.withSchema(schema)
.withConf(conf)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.build()) {
GenericRecord record1 = new GenericData.Record(schema);
record1.put("id", 1);
record1.put("name", "Alice");
record1.put("salary", 50000.0);
GenericArray<Utf8> skills1 =
new GenericData.Array<>(2, schema.getField("skills").schema());
skills1.add(new Utf8("Java"));
skills1.add(new Utf8("Python"));
record1.put("skills", skills1);
writer.write(record1);

Random random = new Random();
for (int i = 1; i < count; i++) {
GenericRecord record2 = new GenericData.Record(schema);
record2.put("id", i);
record2.put("name", String.valueOf(random.nextInt()));
record2.put("salary", random.nextDouble());
GenericArray<Utf8> skills2 =
new GenericData.Array<>(2, schema.getField("skills").schema());
skills2.add(new Utf8(String.valueOf(random.nextInt())));
skills2.add(new Utf8("Python"));
record2.put("skills", skills2);
writer.write(record2);
}
}
System.out.println(
"write file success. count:"
+ count
+ ", useTime:"
+ (System.currentTimeMillis() - startTs));
return null;
}

public static void deleteFile(String filePath) {
File file = new File(filePath);
if (file.exists()) {
file.delete();
}
}

@Test
public void testParquetGetSplits() throws IOException {
String file = "d:/local_output.parquet";
deleteFile(file);
int rowCount = 10000000;
if (!new File(file).exists()) {
autoGenData(file, rowCount);
}
try {
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);

Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(file);
List<FileSourceSplit> list =
set.stream()
.sorted((o1, o2) -> (int) (o1.getMinRowIndex() - o2.getMinRowIndex()))
.collect(Collectors.toList());
for (FileSourceSplit fileSourceSplit : list) {
System.out.println(fileSourceSplit);
}
// file size: ~9 MB
Assertions.assertTrue(set.size() > 1);
Assertions.assertEquals(list.get(1).getMinRowIndex(), list.get(0).getMaxRowIndex());

SeaTunnelRowType seaTunnelRowTypeInfo =
parquetReadStrategy.getSeaTunnelRowTypeInfo(file);

long rowSize = 0;
SeaTunnelRow firstRow = null;
SeaTunnelRow lastRow = null;
for (int i = 0; i < list.size(); i++) {
// reading a single 128 MB split takes roughly 6+ seconds
FileSourceSplit split = list.get(i);
TestCollector testCollector = new TestCollector();
long l1 = System.currentTimeMillis();
parquetReadStrategy.read(split, testCollector);
long l2 = System.currentTimeMillis();
System.out.println("read split use time " + (l2 - l1));
rowSize += testCollector.rows.size();
if (i == 0) {
firstRow = testCollector.getRows().get(0);
}
if (i == list.size() - 1) {
lastRow = testCollector.getRows().get(testCollector.rows.size() - 1);
}
}
Assertions.assertEquals(rowCount, rowSize);

List<SeaTunnelRow> sl = test1(parquetReadStrategy, file, rowCount);

Assertions.assertEquals(firstRow.getField(0), sl.get(0).getField(0));
Assertions.assertEquals(lastRow.getField(1), sl.get(1).getField(1));

} finally {
deleteFile(file);
}
}

public List<SeaTunnelRow> test1(
ParquetReadStrategy parquetReadStrategy, String file, long rowCount)
throws IOException {
TestCollector testCollector1 = new TestCollector();
long l1 = System.currentTimeMillis();
parquetReadStrategy.read(file, "", testCollector1);
long l2 = System.currentTimeMillis();
System.out.println("read file use time " + (l2 - l1));
Assertions.assertEquals(rowCount, testCollector1.rows.size());
return Lists.newArrayList(
testCollector1.rows.get(0),
testCollector1.getRows().get(testCollector1.rows.size() - 1));
}

@Test
public void testParquetRead3() throws Exception {
URL resource = ParquetReadStrategyTest.class.getResource("/hive.parquet");
Assertions.assertNotNull(resource);
String path = Paths.get(resource.toURI()).toString();
ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
parquetReadStrategy.init(localConf);
SeaTunnelRowType seaTunnelRowTypeInfo = parquetReadStrategy.getSeaTunnelRowTypeInfo(path);
Assertions.assertNotNull(seaTunnelRowTypeInfo);
log.info(seaTunnelRowTypeInfo.toString());
TestCollector testCollector = new TestCollector();
Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(path);
for (FileSourceSplit split : set) {
parquetReadStrategy.read(split, testCollector);
}
Assertions.assertEquals(2, testCollector.rows.size());
}

@Test
public void testParquetRead1() throws Exception {
URL resource = ParquetReadStrategyTest.class.getResource("/timestamp_as_int64.parquet");
@@ -72,6 +229,12 @@ public void testParquetRead1() throws Exception {
log.info(seaTunnelRowTypeInfo.toString());
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, "", testCollector);
TestCollector testCollector1 = new TestCollector();
Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(path);
for (FileSourceSplit split : set) {
parquetReadStrategy.read(split, testCollector1);
}
Assertions.assertEquals(testCollector.getRows().size(), testCollector1.rows.size());
}

@Test
@@ -87,6 +250,13 @@ public void testParquetRead2() throws Exception {
log.info(seaTunnelRowTypeInfo.toString());
TestCollector testCollector = new TestCollector();
parquetReadStrategy.read(path, "", testCollector);
Assertions.assertEquals(2, testCollector.rows.size());
TestCollector testCollector1 = new TestCollector();
Set<FileSourceSplit> set = parquetReadStrategy.getFileSourceSplits(path);
for (FileSourceSplit split : set) {
parquetReadStrategy.read(split, testCollector1);
}
Assertions.assertEquals(testCollector.getRows().size(), testCollector1.rows.size());
}

@Test