diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md index 8e773fe6dd3..78c7c539e80 100644 --- a/docs/en/connector-v2/sink/LocalFile.md +++ b/docs/en/connector-v2/sink/LocalFile.md @@ -33,7 +33,7 @@ By default, we use 2PC commit to ensure `exactly-once` ## Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |---------------------------------------|---------|----------|--------------------------------------------|---------------------------------------------------------------------------------------------------| | path | string | yes | - | | | tmp_path | string | no | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. | diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md index f40a123a47f..f33ad3c560b 100644 --- a/docs/en/connector-v2/source/LocalFile.md +++ b/docs/en/connector-v2/source/LocalFile.md @@ -56,6 +56,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | skip_header_row_number | long | no | 0 | | schema | config | no | - | | sheet_name | string | no | - | +| excel_engine | string | no | POI | | xml_row_tag | string | no | - | | xml_use_attr_format | boolean | no | - | | file_filter_pattern | string | no | | @@ -239,6 +240,16 @@ Only need to be configured when file_format is excel. Reader the sheet of the workbook. +### excel_engine [string] + +Only need to be configured when file_format is excel. + +The following Excel reading engines are supported: +`POI` `EasyExcel` + +The default Excel reading engine is POI. POI can easily run out of memory when reading Excel files with more than 65,000 rows; in that case, switch to EasyExcel as the reading engine. + + ### xml_row_tag [string] Only need to be configured when file_format is xml. 
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java index 8c5d7c5475e..b1e0856e1fb 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateTimeUtils.java @@ -56,6 +56,18 @@ public class DateTimeUtils { FORMATTER_MAP.put( Formatter.YYYY_MM_DD_HH_MM_SS_SLASH, DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SLASH.value)); + FORMATTER_MAP.put( + Formatter.YYYY_M_D_HH_MM_SS_SLASH, + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_SS_SLASH.value)); + FORMATTER_MAP.put( + Formatter.YYYY_M_D_HH_MM_SS_ISO8601, + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_SS_ISO8601.value)); + FORMATTER_MAP.put( + Formatter.YYYY_M_D_HH_MM_SLASH, + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_SLASH.value)); + FORMATTER_MAP.put( + Formatter.YYYY_M_D_HH_MM_ISO8601, + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_ISO8601.value)); FORMATTER_MAP.put( Formatter.YYYY_MM_DD_HH_MM_SS_NO_SPLIT, DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_NO_SPLIT.value)); @@ -73,9 +85,26 @@ public class DateTimeUtils { DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SSSSSSSSS_ISO8601.value)); } + // if the datatime string length is 17, find the DateTimeFormatter from this map + public static final Map YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP = + new LinkedHashMap<>(); + + // if the datatime string length is 15, find the DateTimeFormatter from this map + public static final Map YYYY_M_D_HH_MM_15_FORMATTER_MAP = + new LinkedHashMap<>(); + + // all Pattern in this set + public static Set> + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>(); + + // all Pattern in this set + public static Set> + YYYY_M_D_HH_MM_15_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>(); + // if the datatime string length is 19, 
find the DateTimeFormatter from this map public static final Map YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP = new LinkedHashMap<>(); + public static Set> YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP_ENTRY_SET = new LinkedHashSet<>(); @@ -115,6 +144,22 @@ public class DateTimeUtils { Pattern.compile("\\d{4}/\\d{2}/\\d{2}\\s\\d{2}:\\d{2}:\\d{2}"), DateTimeFormatter.ofPattern(Formatter.YYYY_MM_DD_HH_MM_SS_SLASH.value)); + YYYY_M_D_HH_MM_15_FORMATTER_MAP.put( + Pattern.compile("\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_SLASH.value)); + + YYYY_M_D_HH_MM_15_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_ISO8601.value)); + + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP.put( + Pattern.compile("\\d{4}/\\d{1,2}/\\d{1,2}\\s\\d{2}:\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_SS_SLASH.value)); + + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP.put( + Pattern.compile("\\d{4}-\\d{1,2}-\\d{1,2}\\s\\d{2}:\\d{2}:\\d{2}"), + DateTimeFormatter.ofPattern(Formatter.YYYY_M_D_HH_MM_SS_ISO8601.value)); + YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.put( Pattern.compile("\\d{4}/\\d{2}/\\d{2}\\s\\d{2}:\\d{2}.*"), new DateTimeFormatterBuilder() @@ -159,6 +204,12 @@ public class DateTimeUtils { YYYY_MM_DD_HH_MM_SS_19_FORMATTER_MAP.entrySet()); YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP_ENTRY_SET.addAll( YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP.entrySet()); + + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP_ENTRY_SET.addAll( + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP.entrySet()); + + YYYY_M_D_HH_MM_15_FORMATTER_MAP_ENTRY_SET.addAll( + YYYY_M_D_HH_MM_15_FORMATTER_MAP.entrySet()); } /** @@ -176,6 +227,12 @@ public static DateTimeFormatter matchDateTimeFormatter(String dateTime) { return entry.getValue(); } } + for (Map.Entry entry : + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } } else if 
(dateTime.length() > 19) { for (Map.Entry entry : YYYY_MM_DD_HH_MM_SS_M19_FORMATTER_MAP_ENTRY_SET) { @@ -183,7 +240,27 @@ public static DateTimeFormatter matchDateTimeFormatter(String dateTime) { return entry.getValue(); } } + } else if (dateTime.length() == 17 || dateTime.length() == 18) { + for (Map.Entry entry : + YYYY_M_D_HH_MM_SS_17_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } + } else if (dateTime.length() == 15 || dateTime.length() == 16) { + for (Map.Entry entry : + YYYY_M_D_HH_MM_15_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } } else if (dateTime.length() == 14) { + for (Map.Entry entry : + YYYY_M_D_HH_MM_15_FORMATTER_MAP_ENTRY_SET) { + if (entry.getKey().matcher(dateTime).matches()) { + return entry.getValue(); + } + } return YYYY_MM_DD_HH_MM_SS_14_FORMATTER; } return null; @@ -247,6 +324,10 @@ public enum Formatter { YYYY_MM_DD_HH_MM_SS_SSSSSS("yyyy-MM-dd HH:mm:ss.SSSSSS"), YYYY_MM_DD_HH_MM_SS_SPOT("yyyy.MM.dd HH:mm:ss"), YYYY_MM_DD_HH_MM_SS_SLASH("yyyy/MM/dd HH:mm:ss"), + YYYY_M_D_HH_MM_SLASH("yyyy/M/d HH:mm"), + YYYY_M_D_HH_MM_ISO8601("yyyy-M-d HH:mm"), + YYYY_M_D_HH_MM_SS_SLASH("yyyy/M/d HH:mm:ss"), + YYYY_M_D_HH_MM_SS_ISO8601("yyyy-M-d HH:mm:ss"), YYYY_MM_DD_HH_MM_SS_NO_SPLIT("yyyyMMddHHmmss"), YYYY_MM_DD_HH_MM_SS_ISO8601("yyyy-MM-dd'T'HH:mm:ss"), YYYY_MM_DD_HH_MM_SS_SSS_ISO8601("yyyy-MM-dd'T'HH:mm:ss.SSS"), diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java index 659b66b9ba9..3fc4af909a3 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/DateUtils.java @@ -60,6 +60,7 @@ public class DateUtils { Pattern.compile("\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d{1,9})?Z"), 
Pattern.compile("\\d{2}:\\d{2}:\\d{2}\\+\\d{2}:\\d{2}"), Pattern.compile("\\d{2}:\\d{2}:\\d{2}(\\.\\d{1,9})?"), + Pattern.compile("\\d{4}/\\d{1,2}/\\d{1,2}") }; public static final Map DATE_FORMATTER_MAP = new HashMap(); @@ -147,6 +148,12 @@ public class DateUtils { .toFormatter()); DATE_FORMATTER_MAP.put(PATTERN_ARRAY[6], ISO_OFFSET_TIME); DATE_FORMATTER_MAP.put(PATTERN_ARRAY[7], ISO_LOCAL_TIME); + DATE_FORMATTER_MAP.put( + PATTERN_ARRAY[8], + new DateTimeFormatterBuilder() + .parseCaseInsensitive() + .append(DateTimeFormatter.ofPattern("yyyy/M/d")) + .toFormatter()); } /** @@ -184,6 +191,7 @@ public static String toString(LocalDate date, Formatter formatter) { public enum Formatter { YYYY_MM_DD("yyyy-MM-dd"), + YYYY_M_D("yyyy/M/d"), YYYY_MM_DD_SPOT("yyyy.MM.dd"), YYYY_MM_DD_SLASH("yyyy/MM/dd"); private final String value; diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java index a5c02937e02..75697c68f74 100644 --- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java +++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/TimeUtils.java @@ -21,6 +21,7 @@ import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; +import java.util.regex.Pattern; public class TimeUtils { private static final Map FORMATTER_MAP = @@ -37,6 +38,30 @@ public static LocalTime parse(String time, Formatter formatter) { return LocalTime.parse(time, FORMATTER_MAP.get(formatter)); } + public static final Pattern[] PATTERN_ARRAY = + new Pattern[] { + Pattern.compile("\\d{2}:\\d{2}:\\d{2}"), + // The millisecond separator is escaped so that only a literal '.' is accepted. + Pattern.compile("\\d{2}:\\d{2}:\\d{2}\\.\\d{3}"), + }; + + public static Formatter matchTimeFormatter(String dateTime) { + for (int j = 0; j < PATTERN_ARRAY.length; j++) { + if (PATTERN_ARRAY[j].matcher(dateTime).matches()) { + Formatter dateTimeFormatter = Time_FORMATTER_MAP.get(PATTERN_ARRAY[j]); + return 
dateTimeFormatter; + } + } + return null; + } + + public static final Map Time_FORMATTER_MAP = new HashMap(); + + static { + Time_FORMATTER_MAP.put(PATTERN_ARRAY[0], Formatter.parse(Formatter.HH_MM_SS.value)); + Time_FORMATTER_MAP.put(PATTERN_ARRAY[1], Formatter.parse(Formatter.HH_MM_SS_SSS.value)); + } + public static String toString(LocalTime time, Formatter formatter) { return time.format(FORMATTER_MAP.get(formatter)); } diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java index d4ee0462de4..1fc12eeae29 100644 --- a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateTimeUtilsTest.java @@ -67,6 +67,15 @@ public void testAutoDateTimeFormatter() { datetimeStr = "2020/10/10 10:10:10"; Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); + datetimeStr = "2020/1/1 10:10"; + Assertions.assertEquals("2020-01-01T10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2024/12/2 10:10"; + Assertions.assertEquals("2024-12-02T10:10", DateTimeUtils.parse(datetimeStr).toString()); + + datetimeStr = "2024/12/1 10:10"; + Assertions.assertEquals("2024-12-01T10:10", DateTimeUtils.parse(datetimeStr).toString()); + datetimeStr = "2020年10月10日 10时10分10秒"; Assertions.assertEquals("2020-10-10T10:10:10", DateTimeUtils.parse(datetimeStr).toString()); diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java index b7426c9d16f..7e4c3c58c90 100644 --- a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/DateUtilsTest.java @@ -66,5 +66,17 @@ public void 
testMatchDateTimeFormatter() { Assertions.assertEquals( "2020-10-10", DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + datetimeStr = "2024/1/1"; + Assertions.assertEquals( + "2024-01-01", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + datetimeStr = "2024/10/1"; + Assertions.assertEquals( + "2024-10-01", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); + datetimeStr = "2024/1/10"; + Assertions.assertEquals( + "2024-01-10", + DateUtils.parse(datetimeStr, DateUtils.matchDateFormatter(datetimeStr)).toString()); } } diff --git a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/TimeUtilsTest.java b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/TimeUtilsTest.java new file mode 100644 index 00000000000..52b5bc65f30 --- /dev/null +++ b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/TimeUtilsTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.common.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TimeUtilsTest { + @Test + public void testMatchTimeFormatter() { + String timeStr = "12:12:12"; + Assertions.assertEquals( + "12:12:12", + TimeUtils.parse(timeStr, TimeUtils.matchTimeFormatter(timeStr)).toString()); + + timeStr = "12:12:12.123"; + Assertions.assertEquals( + "12:12:12.123", + TimeUtils.parse(timeStr, TimeUtils.matchTimeFormatter(timeStr)).toString()); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml index f091e7023d9..5eb7e3d5a77 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml @@ -39,6 +39,8 @@ 3.1.4 2.1.4 2.0.0 + 4.0.3 + 0.18.4 @@ -158,6 +160,13 @@ jaxen ${jaxen.version} + + + com.alibaba + easyexcel + ${easyexcel.version} + + diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java index dedddeacbb1..c2506deaf30 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java @@ -140,6 +140,12 @@ public class BaseSourceConfigOptions { .noDefaultValue() .withDescription("To be read sheet name,only valid for excel files"); + public static final Option EXCEL_ENGINE = + Options.key("excel_engine") + .enumType(ExcelEngine.class) + .defaultValue(ExcelEngine.POI) + 
.withDescription("To switch excel read engine, e.g. POI , EasyExcel"); + public static final Option XML_ROW_TAG = Options.key("xml_row_tag") .stringType() diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ExcelEngine.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ExcelEngine.java new file mode 100644 index 00000000000..2e4d3b85f7a --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/ExcelEngine.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.config; + +import java.io.Serializable; + +public enum ExcelEngine implements Serializable { + POI("POI"), + EASY_EXCEL("EasyExcel"); + + private final String excelEngineName; + + ExcelEngine(String excelEngineName) { + this.excelEngineName = excelEngineName; + } + + public String getExcelEngineName() { + return excelEngineName; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/excel/ExcelCellUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/excel/ExcelCellUtils.java new file mode 100644 index 00000000000..6781508cd7e --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/excel/ExcelCellUtils.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.excel; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; +import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; + +import com.alibaba.excel.enums.CellDataTypeEnum; +import com.alibaba.excel.metadata.Cell; +import com.alibaba.excel.metadata.data.ReadCellData; +import lombok.SneakyThrows; + +import javax.annotation.Nullable; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.nio.charset.StandardCharsets; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.format.DateTimeFormatter; + +public class ExcelCellUtils implements Serializable { + + static final long serialVersionUID = 42L; + + private DateTimeFormatter dateFormatter; + private DateTimeFormatter dateTimeFormatter; + private DateTimeFormatter timeFormatter; + + protected Config pluginConfig; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + public ExcelCellUtils( + Config pluginConfig, + String dateFormatterPattern, + String dateTimeFormatterPattern, + String timeFormatterPattern) { + this.pluginConfig = pluginConfig; + this.dateFormatter = DateTimeFormatter.ofPattern(dateFormatterPattern); + this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormatterPattern); + 
this.timeFormatter = DateTimeFormatter.ofPattern(timeFormatterPattern); + } + + private String getCellValue(ReadCellData cellData) { + + if (cellData.getStringValue() != null) { + return cellData.getStringValue(); + } else if (cellData.getNumberValue() != null) { + return cellData.getNumberValue().toString(); + } else if (cellData.getOriginalNumberValue() != null) { + return cellData.getOriginalNumberValue().toString(); + } else if (cellData.getBooleanValue() != null) { + return cellData.getBooleanValue().toString(); + } else if (cellData.getType() == CellDataTypeEnum.EMPTY) { + return ""; + } + return null; + } + + @SneakyThrows(JsonProcessingException.class) + public Object convert(Object field, SeaTunnelDataType fieldType, @Nullable Cell cellRaw) { + if (field == null && cellRaw == null) { + return null; + } + + String fieldValue = + (field instanceof String) || cellRaw == null + ? field.toString() + : getCellValue((ReadCellData) cellRaw); + + SqlType sqlType = fieldType.getSqlType(); + + if (fieldValue == null || (fieldValue.equals("") && sqlType != SqlType.STRING)) { + return null; + } + + switch (sqlType) { + case MAP: + case ARRAY: + return objectMapper.readValue(fieldValue, fieldType.getTypeClass()); + case STRING: + if (field instanceof Double) { + String stringValue = field.toString(); + if (stringValue.endsWith(".0")) { + return stringValue.substring(0, stringValue.length() - 2); + } + return stringValue; + } + return fieldValue; + case DOUBLE: + return Double.parseDouble(fieldValue); + case BOOLEAN: + return Boolean.parseBoolean(fieldValue); + case FLOAT: + return (float) Double.parseDouble(fieldValue); + case BIGINT: + return (long) Double.parseDouble(fieldValue); + case INT: + return (int) Double.parseDouble(fieldValue); + case TINYINT: + return (byte) Double.parseDouble(fieldValue); + case SMALLINT: + return (short) Double.parseDouble(fieldValue); + case DECIMAL: + return BigDecimal.valueOf(Double.parseDouble(fieldValue)); + case DATE: + return 
parseDate(field, fieldType); + case TIME: + return parseTime(field, fieldType); + case TIMESTAMP: + return parseTimestamp(field, fieldType); + case NULL: + return null; + case BYTES: + return fieldValue.getBytes(StandardCharsets.UTF_8); + case ROW: + return parseRow(fieldValue, fieldType); + default: + throw new FileConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, + "User defined schema validation failed"); + } + } + + private Object parseDate(Object fieldValue, SeaTunnelDataType fieldType) { + if (fieldValue instanceof LocalDateTime) { + return ((LocalDateTime) fieldValue).toLocalDate(); + } + return LocalDate.parse(fieldValue.toString(), dateFormatter); + } + + private Object parseTime(Object fieldValue, SeaTunnelDataType fieldType) { + if (fieldValue instanceof LocalDateTime) { + return ((LocalDateTime) fieldValue).toLocalTime(); + } + return LocalTime.parse(fieldValue.toString(), timeFormatter); + } + + private Object parseTimestamp(Object fieldValue, SeaTunnelDataType fieldType) { + if (fieldValue instanceof LocalDateTime) { + return fieldValue; + } + return LocalDateTime.parse(fieldValue.toString(), dateTimeFormatter); + } + + private Object parseRow(String fieldValue, SeaTunnelDataType fieldType) { + String delimiter = + ReadonlyConfig.fromConfig(pluginConfig) + .get(BaseSourceConfigOptions.FIELD_DELIMITER); + String[] context = fieldValue.split(delimiter); + SeaTunnelRowType ft = (SeaTunnelRowType) fieldType; + int length = context.length; + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(length); + for (int j = 0; j < length; j++) { + seaTunnelRow.setField(j, convert(context[j], ft.getFieldType(j), null)); + } + return seaTunnelRow; + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/excel/ExcelReaderListener.java 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/excel/ExcelReaderListener.java new file mode 100644 index 00000000000..297e667b65a --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/excel/ExcelReaderListener.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.file.excel; + +import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.seatunnel.shade.com.typesafe.config.Config; + +import org.apache.seatunnel.api.source.Collector; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; + +import com.alibaba.excel.context.AnalysisContext; +import com.alibaba.excel.event.AnalysisEventListener; +import com.alibaba.excel.exception.ExcelDataConvertException; +import com.alibaba.excel.metadata.Cell; +import com.alibaba.excel.metadata.data.ReadCellData; +import lombok.extern.slf4j.Slf4j; + +import java.io.Closeable; +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +@Slf4j +public class ExcelReaderListener extends AnalysisEventListener> + implements Serializable, Closeable { + private final String tableId; + private final Collector output; + private int cellCount; + + private final ObjectMapper objectMapper = new ObjectMapper(); + + protected Config pluginConfig; + + protected SeaTunnelRowType seaTunnelRowType; + + private SeaTunnelDataType[] fieldTypes; + + private ExcelCellUtils excelCellUtils; + + Map customHeaders = new HashMap<>(); + + public ExcelReaderListener( + String tableId, + Collector output, + ExcelCellUtils excelCellUtils, + SeaTunnelRowType seaTunnelRowType) { + this.tableId = tableId; + this.output = output; + this.excelCellUtils = excelCellUtils; + this.seaTunnelRowType = seaTunnelRowType; + + fieldTypes = seaTunnelRowType.getFieldTypes(); + } + + @Override + public void invokeHead(Map> headMap, AnalysisContext context) { + for (int i = 0; i < headMap.size(); i++) { + String header = headMap.get(i).getStringValue(); + if (!"null".equals(header)) { + customHeaders.put(i, header); + } + } + } + + @Override + public void invoke(Map 
data, AnalysisContext context) { + cellCount = data.size(); + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fieldTypes.length); + Map cellMap = context.readRowHolder().getCellMap(); + int i = 0; + for (; i < fieldTypes.length; i++) { + if (cellMap.get(i) == null) { + seaTunnelRow.setField(i, null); + } else { + Object cell = excelCellUtils.convert(data.get(i), fieldTypes[i], cellMap.get(i)); + seaTunnelRow.setField(i, cell); + } + } + seaTunnelRow.setTableId(tableId); + output.collect(seaTunnelRow); + } + + @Override + public void doAfterAllAnalysed(AnalysisContext context) { + log.info("excel parsing completed"); + } + + @Override + public void onException(Exception exception, AnalysisContext context) { + log.debug("cell parsing exception :{}", exception.getMessage()); + if (exception instanceof ExcelDataConvertException) { + ExcelDataConvertException excelDataConvertException = + (ExcelDataConvertException) exception; + log.debug( + "row:{},cell:{},data:{}", + excelDataConvertException.getRowIndex(), + excelDataConvertException.getColumnIndex(), + excelDataConvertException.getCellData()); + } + } + + @Override + public void close() throws IOException {} +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java index c4a982fc90d..0a3f995f419 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java @@ -17,21 +17,20 @@ package org.apache.seatunnel.connectors.seatunnel.file.source.reader; -import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper; - -import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.Collector; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.table.type.SqlType; import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated; import org.apache.seatunnel.common.utils.DateTimeUtils; import org.apache.seatunnel.common.utils.DateUtils; import org.apache.seatunnel.common.utils.TimeUtils; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.config.ExcelEngine; import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat; +import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelCellUtils; +import org.apache.seatunnel.connectors.seatunnel.file.excel.ExcelReaderListener; import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException; import org.apache.poi.hssf.usermodel.HSSFWorkbook; @@ -47,35 +46,33 @@ import org.apache.poi.xssf.usermodel.XSSFFormulaEvaluator; import org.apache.poi.xssf.usermodel.XSSFWorkbook; +import com.alibaba.excel.EasyExcel; +import com.alibaba.excel.read.builder.ExcelReaderBuilder; +import lombok.Getter; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; import java.io.IOException; import java.io.InputStream; -import java.math.BigDecimal; -import java.nio.charset.StandardCharsets; -import java.time.LocalDate; -import java.time.LocalDateTime; -import java.time.LocalTime; -import java.time.format.DateTimeFormatter; import java.util.Map; import java.util.Objects; import java.util.stream.IntStream; -import static org.apache.seatunnel.common.utils.DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS; - 
+@Getter +@Slf4j public class ExcelReadStrategy extends AbstractReadStrategy { - private final DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD; + private String dateFormatterPattern = DateUtils.Formatter.YYYY_MM_DD.getValue(); + + private String dateTimeFormatterPattern = + DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS.getValue(); - private final DateTimeUtils.Formatter datetimeFormat = YYYY_MM_DD_HH_MM_SS; - private final TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS; + private String timeFormatterPattern = TimeUtils.Formatter.HH_MM_SS.getValue(); private int[] indexes; private int cellCount; - private final ObjectMapper objectMapper = new ObjectMapper(); - @SneakyThrows @Override public void read(String path, String tableId, Collector output) { @@ -92,70 +89,119 @@ protected void readProcess( Map partitionsMap, String currentFileName) throws IOException { - Workbook workbook; - FormulaEvaluator formulaEvaluator; - if (currentFileName.endsWith(".xls")) { - workbook = new HSSFWorkbook(inputStream); - formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); - } else if (currentFileName.endsWith(".xlsx")) { - workbook = new XSSFWorkbook(inputStream); - formulaEvaluator = new XSSFFormulaEvaluator((XSSFWorkbook) workbook); - } else { - throw new FileConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, - "Only support read excel file"); - } - DataFormatter formatter = new DataFormatter(); - Sheet sheet = - pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key()) - ? workbook.getSheet( - pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key())) - : workbook.getSheetAt(0); - cellCount = seaTunnelRowType.getTotalFields(); - cellCount = partitionsMap.isEmpty() ? 
cellCount : cellCount + partitionsMap.size(); - SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); - int rowCount = sheet.getPhysicalNumberOfRows(); - if (skipHeaderNumber > Integer.MAX_VALUE - || skipHeaderNumber < Integer.MIN_VALUE - || skipHeaderNumber > rowCount) { + + if (skipHeaderNumber > Integer.MAX_VALUE || skipHeaderNumber < Integer.MIN_VALUE) { throw new FileConnectorException( CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, "Skip the number of rows exceeds the maximum or minimum limit of Sheet"); } - IntStream.range((int) skipHeaderNumber, rowCount) - .mapToObj(sheet::getRow) - .filter(Objects::nonNull) - .forEach( - rowData -> { - int[] cellIndexes = - indexes == null - ? IntStream.range(0, cellCount).toArray() - : indexes; - int z = 0; - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(cellCount); - for (int j : cellIndexes) { - Cell cell = rowData.getCell(j); - seaTunnelRow.setField( - z++, - cell == null - ? null - : convert( - getCellValue( - cell.getCellType(), - cell, - formulaEvaluator, - formatter), - fieldTypes[z - 1])); - } - if (isMergePartition) { - int index = seaTunnelRowType.getTotalFields(); - for (String value : partitionsMap.values()) { - seaTunnelRow.setField(index++, value); + + if (pluginConfig.hasPath(BaseSourceConfigOptions.DATE_FORMAT.key())) { + dateFormatterPattern = + pluginConfig.getString(BaseSourceConfigOptions.DATE_FORMAT.key()); + } + if (pluginConfig.hasPath(BaseSourceConfigOptions.DATETIME_FORMAT.key())) { + dateTimeFormatterPattern = + pluginConfig.getString(BaseSourceConfigOptions.DATETIME_FORMAT.key()); + } + if (pluginConfig.hasPath(BaseSourceConfigOptions.TIME_FORMAT.key())) { + timeFormatterPattern = + pluginConfig.getString(BaseSourceConfigOptions.TIME_FORMAT.key()); + } + + ExcelCellUtils excelCellUtils = + new ExcelCellUtils( + pluginConfig, + dateFormatterPattern, + dateTimeFormatterPattern, + timeFormatterPattern); + + if (pluginConfig.hasPath(BaseSourceConfigOptions.EXCEL_ENGINE.key()) + && 
pluginConfig + .getString(BaseSourceConfigOptions.EXCEL_ENGINE.key()) + .equals(ExcelEngine.EASY_EXCEL.getExcelEngineName())) { + log.info("Parsing Excel with EasyExcel"); + + ExcelReaderBuilder read = + EasyExcel.read( + inputStream, + new ExcelReaderListener( + tableId, output, excelCellUtils, seaTunnelRowType)); + if (pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key())) { + read.sheet(pluginConfig.getString(BaseSourceConfigOptions.SHEET_NAME.key())) + .headRowNumber((int) skipHeaderNumber) + .doReadSync(); + } else { + read.sheet(0).headRowNumber((int) skipHeaderNumber).doReadSync(); + } + } else { + log.info("Parsing Excel with POI"); + + Workbook workbook; + FormulaEvaluator formulaEvaluator; + if (currentFileName.endsWith(".xls")) { + workbook = new HSSFWorkbook(inputStream); + formulaEvaluator = workbook.getCreationHelper().createFormulaEvaluator(); + } else if (currentFileName.endsWith(".xlsx")) { + workbook = new XSSFWorkbook(inputStream); + formulaEvaluator = new XSSFFormulaEvaluator((XSSFWorkbook) workbook); + } else { + throw new FileConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + "Only support read excel file"); + } + DataFormatter formatter = new DataFormatter(); + Sheet sheet = + pluginConfig.hasPath(BaseSourceConfigOptions.SHEET_NAME.key()) + ? workbook.getSheet( + pluginConfig.getString( + BaseSourceConfigOptions.SHEET_NAME.key())) + : workbook.getSheetAt(0); + cellCount = seaTunnelRowType.getTotalFields(); + cellCount = partitionsMap.isEmpty() ? 
cellCount : cellCount + partitionsMap.size(); + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + int rowCount = sheet.getPhysicalNumberOfRows(); + if (skipHeaderNumber > rowCount) { + throw new FileConnectorException( + CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION, + "Skip the number of rows exceeds the maximum or minimum limit of Sheet"); + } + IntStream.range((int) skipHeaderNumber, rowCount) + .mapToObj(sheet::getRow) + .filter(Objects::nonNull) + .forEach( + rowData -> { + int[] cellIndexes = + indexes == null + ? IntStream.range(0, cellCount).toArray() + : indexes; + int z = 0; + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(cellCount); + for (int j : cellIndexes) { + Cell cell = rowData.getCell(j); + seaTunnelRow.setField( + z++, + cell == null + ? null + : excelCellUtils.convert( + getCellValue( + cell.getCellType(), + cell, + formulaEvaluator, + formatter), + fieldTypes[z - 1], + null)); + } + if (isMergePartition) { + int index = seaTunnelRowType.getTotalFields(); + for (String value : partitionsMap.values()) { + seaTunnelRow.setField(index++, value); + } } - } - seaTunnelRow.setTableId(tableId); - output.collect(seaTunnelRow); - }); + seaTunnelRow.setTableId(tableId); + output.collect(seaTunnelRow); + }); + } } @Override @@ -229,86 +275,6 @@ private Object getCellValue( return null; } - @SneakyThrows - private Object convert(Object field, SeaTunnelDataType fieldType) { - if (field == null) { - return null; - } - - SqlType sqlType = fieldType.getSqlType(); - if (!(SqlType.STRING.equals(sqlType)) && "".equals(field)) { - return null; - } - switch (sqlType) { - case MAP: - case ARRAY: - return objectMapper.readValue((String) field, fieldType.getTypeClass()); - case STRING: - if (field instanceof Double) { - String stringValue = field.toString(); - if (stringValue.endsWith(".0")) { - return stringValue.substring(0, stringValue.length() - 2); - } - return stringValue; - } - return String.valueOf(field); - case DOUBLE: - return 
Double.parseDouble(field.toString()); - case BOOLEAN: - return Boolean.parseBoolean(field.toString()); - case FLOAT: - return (float) Double.parseDouble(field.toString()); - case BIGINT: - return (long) Double.parseDouble(field.toString()); - case INT: - return (int) Double.parseDouble(field.toString()); - case TINYINT: - return (byte) Double.parseDouble(field.toString()); - case SMALLINT: - return (short) Double.parseDouble(field.toString()); - case DECIMAL: - return BigDecimal.valueOf(Double.parseDouble(field.toString())); - case DATE: - if (field instanceof LocalDateTime) { - return ((LocalDateTime) field).toLocalDate(); - } - return LocalDate.parse( - (String) field, DateTimeFormatter.ofPattern(dateFormat.getValue())); - case TIME: - if (field instanceof LocalDateTime) { - return ((LocalDateTime) field).toLocalTime(); - } - return LocalTime.parse( - (String) field, DateTimeFormatter.ofPattern(timeFormat.getValue())); - case TIMESTAMP: - if (field instanceof LocalDateTime) { - return field; - } - return LocalDateTime.parse( - (String) field, DateTimeFormatter.ofPattern(datetimeFormat.getValue())); - case NULL: - return null; - case BYTES: - return field.toString().getBytes(StandardCharsets.UTF_8); - case ROW: - String delimiter = - ReadonlyConfig.fromConfig(pluginConfig) - .get(BaseSourceConfigOptions.FIELD_DELIMITER); - String[] context = field.toString().split(delimiter); - SeaTunnelRowType ft = (SeaTunnelRowType) fieldType; - int length = context.length; - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(length); - for (int j = 0; j < length; j++) { - seaTunnelRow.setField(j, convert(context[j], ft.getFieldType(j))); - } - return seaTunnelRow; - default: - throw new FileConnectorException( - CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, - "User defined schema validation failed"); - } - } - private boolean isNullOrEmpty(T[] arr) { return arr == null || arr.length == 0; } diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/ExcelReadStrategyTest.java similarity index 92% rename from seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java rename to seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/ExcelReadStrategyTest.java index f445d932d7d..91b4d7041de 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/reader/ExcelReadStrategyTest.java @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.seatunnel.connectors.seatunnel.file.writer; +package org.apache.seatunnel.connectors.seatunnel.file.reader; import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; @@ -227,6 +227,41 @@ public void testExcelReadDateString() throws IOException, URISyntaxException { } } + @Test + public void testEasyExcelRead() throws IOException, URISyntaxException { + testLargeExcelRead( + "/excel/test_read_excel_date_string.xlsx", + "/excel/test_read_excel_data_string.conf", + 1); + testLargeExcelRead("/excel/e2e.xls", "/excel/e2exls.conf", 5); + testLargeExcelRead("/excel/e2e.xlsx", "/excel/e2exls.conf", 5); + } + + private void testLargeExcelRead(String filePath, String configPath, int rowCount) + throws IOException, URISyntaxException { + URL excelFile = ExcelReadStrategyTest.class.getResource(filePath); + URL conf = ExcelReadStrategyTest.class.getResource(configPath); + + Assertions.assertNotNull(excelFile); + Assertions.assertNotNull(conf); + String excelFilePath = Paths.get(excelFile.toURI()).toString(); + String confPath = Paths.get(conf.toURI()).toString(); + Config pluginConfig = ConfigFactory.parseFile(new File(confPath)); + ExcelReadStrategy excelReadStrategy = new ExcelReadStrategy(); + LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT); + excelReadStrategy.setPluginConfig(pluginConfig); + excelReadStrategy.init(localConf); + + List fileNamesByPath = excelReadStrategy.getFileNamesByPath(excelFilePath); + CatalogTable userDefinedCatalogTable = CatalogTableUtil.buildWithConfig(pluginConfig); + excelReadStrategy.setCatalogTable(userDefinedCatalogTable); + + TestCollector testCollector = new TestCollector(); + excelReadStrategy.read(fileNamesByPath.get(0), "", testCollector); + + Assertions.assertEquals(testCollector.getRows().size(), rowCount); + } + @Test public void testExcelReadFormulaXls() throws IOException, URISyntaxException { URL excelFile = 
ExcelReadStrategyTest.class.getResource("/excel/test_read_formula.xls"); diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2e.xls b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2e.xls new file mode 100644 index 00000000000..644ce78ef56 Binary files /dev/null and b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2e.xls differ diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2e.xlsx b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2e.xlsx new file mode 100644 index 00000000000..87d363d7db3 Binary files /dev/null and b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2e.xlsx differ diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2exls.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2exls.conf new file mode 100644 index 00000000000..69f4c32e9f5 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/e2exls.conf @@ -0,0 +1,56 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +{ + sheet_name = "test" + skip_header_row_number = 1 + field_delimiter = ";" + excel_engine = "EasyExcel" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/test_read_excel_data_string.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/test_read_excel_data_string.conf new file mode 100644 index 00000000000..32905242273 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/test_read_excel_data_string.conf @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +{ + sheet_name = "Sheet1" + skip_header_row_number = 1 + date_format = "yyyy-MM-dd" + excel_engine = "EasyExcel" + schema = { + fields { + c_bytes = "tinyint" + c_short = "smallint" + c_int = "int" + c_bigint = "bigint" + c_string = "string" + c_double = "double" + c_float = "float" + c_decimal = "decimal(10, 2)" + c_boolean = "boolean" + c_map = "map" + c_array = "array" + c_date = "date" + c_datetime = "timestamp" + c_time = "time" + } + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/test_read_excel_large.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/test_read_excel_large.conf new file mode 100644 index 00000000000..b5e6404914f --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/excel/test_read_excel_large.conf @@ -0,0 +1,102 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + + +{ + plugin_output = "fake1" + delimiter = "," + ignore_first_line = false + file_format_type = "excel" + datetime_format = "yyyy-MM-dd HH:mm:ss" + sheet_name = "Sheet1" + skip_header_row_number = 1 + excel_engine = "EasyExcel" + schema { + fields { + c1 = TIMESTAMP + c2 = string + c3 = string + c4 = string + c5 = string + c6 = string + c7 = string + c8 = string + c9 = string + c10 = string + c11 = string + c12 = string + c13 = string + c14 = string + c15 = string + c16 = string + c17 = string + c18 = string + c19 = string + c20 = string + c21 = string + c22 = string + c23 = string + c24 = string + c25 = string + c26 = string + c27 = string + c28 = string + c29 = string + c30 = string + c31 = string + c32 = string + c33 = string + c34 = string + c35 = string + c36 = string + c37 = string + c38 = string + c39 = string + c40 = string + c41 = string + c42 = string + c43 = string + c44 = string + c45 = string + c46 = string + c47 = string + c48 = string + c49 = string + c50 = string + c51 = string + c52 = string + c53 = string + c54 = string + c55 = string + c56 = string + c57 = string + c58 = string + c59 = string + c60 = string + c61 = string + c62 = string + c63 = string + c64 = string + c65 = string + c66 = string + c67 = string + c68 = string + c69 = string + c70 = string + c71 = string + } + } + } \ No newline at end of file