diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 59f3852cb0d..f65255bfd77 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -38,7 +38,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
## Options
-| name | type | required | default value |
+| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| host | string | yes | - |
| port | int | yes | - |
@@ -62,6 +62,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| compress_codec | string | no | none |
| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
+| null_format | string | no | - |
| common-options | | no | - |
### host [string]
@@ -336,6 +337,13 @@ The compress codec of archive files and the details that supported as the follow
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`.
+### null_format [string]
+
+Only used when file_format_type is text.
+null_format to define which strings can be represented as null.
+
+e.g: `\N`
+
### common options
Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details.
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 161b0e63183..caaf9972a06 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -66,6 +66,7 @@ Read data from hdfs file system.
| compress_codec | string | no | none | The compress codec of files |
| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 | |
+| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
### delimiter/field_delimiter [string]
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 8923a031607..477a4d41399 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -62,6 +62,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| compress_codec | string | no | none |
| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
+| null_format | string | no | - |
| common-options | | no | - |
| tables_configs | list | no | used to define a multiple table task |
@@ -330,6 +331,13 @@ The compress codec of archive files and the details that supported as the follow
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`.
+### null_format [string]
+
+Only used when file_format_type is text.
+null_format to define which strings can be represented as null.
+
+e.g: `\N`
+
### common options
Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index ae2686b7e81..42163a9d13e 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -211,6 +211,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| xml_use_attr_format | boolean | no | - | Specifies whether to process data using the tag attribute format, only used when file_format is xml. |
| compress_codec | string | no | none | Which compress codec the files used. |
| encoding | string | no | UTF-8 |
+| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |
| file_filter_pattern | string | no | | Filter pattern, which used for filtering files. |
| common-options | config | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index 1db5d62a441..9b83a0b0501 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -72,6 +72,7 @@ It only supports hadoop version **2.9.X+**.
| compress_codec | string | no | none |
| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
+| null_format | string | no | - |
| common-options | | no | - |
### path [string]
@@ -343,6 +344,13 @@ The compress codec of archive files and the details that supported as the follow
Only used when file_format_type is json,text,csv,xml.
The encoding of the file to read. This param will be parsed by `Charset.forName(encoding)`.
+### null_format [string]
+
+Only used when file_format_type is text.
+null_format to define which strings can be represented as null.
+
+e.g: `\N`
+
### common options
Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details.
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index ba4b71cfe93..b0e69cd1e36 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -220,6 +220,7 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
| compress_codec | string | no | none | |
| archive_compress_codec | string | no | none | |
| encoding | string | no | UTF-8 | |
+| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |
| file_filter_pattern | string | no | | Filter pattern, which used for filtering files. |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index c6e4aa6c84c..f5b76ce3055 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -71,7 +71,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
## Source Options
-| Name | Type | Required | default value | Description |
+| Name | Type | Required | default value | Description |
|---------------------------|---------|----------|---------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| host | String | Yes | - | The target sftp host is required |
| port | Int | Yes | - | The target sftp port is required |
@@ -94,6 +94,7 @@ The File does not have a specific type list, and we can indicate which SeaTunnel
| compress_codec | String | No | None | The compress codec of files and the details that supported as the following shown:
- txt: `lzo` `None`
- json: `lzo` `None`
- csv: `lzo` `None`
- orc: `lzo` `snappy` `lz4` `zlib` `None`
- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `None`
Tips: excel type does Not support any compression format |
| archive_compress_codec | string | no | none |
| encoding | string | no | UTF-8 |
+| null_format | string | no | - | Only used when file_format_type is text. null_format to define which strings can be represented as null. e.g: `\N` |
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
### file_filter_pattern [string]
diff --git a/docs/zh/connector-v2/source/HdfsFile.md b/docs/zh/connector-v2/source/HdfsFile.md
index 9cd254ef808..4de3014f5c0 100644
--- a/docs/zh/connector-v2/source/HdfsFile.md
+++ b/docs/zh/connector-v2/source/HdfsFile.md
@@ -56,6 +56,7 @@
| kerberos_keytab_path | string | 否 | - | kerberos 的 keytab 路径。 |
| skip_header_row_number | long | 否 | 0 | 跳过前几行,但仅适用于 txt 和 csv。例如,设置如下:`skip_header_row_number = 2`。然后 Seatunnel 将跳过源文件中的前两行。 |
| file_filter_pattern | string | 否 | - | 过滤模式,用于过滤文件。 |
+| null_format | string | 否 | - | 定义哪些字符串可以表示为 null,但仅适用于 txt 和 csv. 例如: `\N` |
| schema | config | 否 | - | 上游数据的模式字段。 |
| sheet_name | string | 否 | - | 读取工作簿的表格,仅在文件格式为 excel 时使用。 |
| compress_codec | string | 否 | none | 文件的压缩编解码器。 |
diff --git a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
index f6908b989d2..33193cb314a 100644
--- a/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
+++ b/seatunnel-connectors-v2/connector-assert/src/main/java/org/apache/seatunnel/connectors/seatunnel/assertion/excecutor/AssertExecutor.java
@@ -285,11 +285,11 @@ private boolean compareMapValue(Map, ?> value, MapType, ?> type, Map, ?> c
private Boolean checkType(Object value, SeaTunnelDataType> fieldType) {
if (value == null) {
- if (fieldType.getSqlType() == SqlType.NULL) {
- return true;
- } else {
- return false;
- }
+ return true;
+ }
+
+ if (fieldType.getSqlType() == SqlType.NULL) {
+ return false;
}
if (fieldType.getSqlType() == SqlType.ROW) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
index de45726e3c7..dedddeacbb1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfigOptions.java
@@ -48,6 +48,12 @@ public class BaseSourceConfigOptions {
.withDescription(
"The separator between columns in a row of data. Only needed by `text` file format");
+ public static final Option NULL_FORMAT =
+ Options.key("null_format")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The string that represents a null value");
+
public static final Option ENCODING =
Options.key("encoding")
.stringType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 1a7a7398a4f..b13aad765a5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -157,10 +157,15 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) {
"When reading json/text/csv files, if user has not specified schema information, "
+ "SeaTunnel will not support column projection");
}
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
TextDeserializationSchema.Builder builder =
TextDeserializationSchema.builder()
.delimiter(TextFormatConstant.PLACEHOLDER)
- .textLineSplitor(textLineSplitor);
+ .textLineSplitor(textLineSplitor)
+ .nullFormat(
+ readonlyConfig
+ .getOptional(BaseSourceConfigOptions.NULL_FORMAT)
+ .orElse(null));
if (isMergePartition) {
deserializationSchema =
builder.seaTunnelRowType(this.seaTunnelRowTypeWithPartition).build();
@@ -175,11 +180,11 @@ public void setCatalogTable(CatalogTable catalogTable) {
SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
SeaTunnelRowType userDefinedRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), rowType);
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
Optional fieldDelimiterOptional =
- ReadonlyConfig.fromConfig(pluginConfig)
- .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
+ readonlyConfig.getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
encoding =
- ReadonlyConfig.fromConfig(pluginConfig)
+ readonlyConfig
.getOptional(BaseSourceConfigOptions.ENCODING)
.orElse(StandardCharsets.UTF_8.name());
if (fieldDelimiterOptional.isPresent()) {
@@ -198,7 +203,11 @@ public void setCatalogTable(CatalogTable catalogTable) {
TextDeserializationSchema.Builder builder =
TextDeserializationSchema.builder()
.delimiter(fieldDelimiter)
- .textLineSplitor(textLineSplitor);
+ .textLineSplitor(textLineSplitor)
+ .nullFormat(
+ readonlyConfig
+ .getOptional(BaseSourceConfigOptions.NULL_FORMAT)
+ .orElse(null));
if (isMergePartition) {
deserializationSchema =
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index 388d245047b..ab519d299da 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -72,6 +72,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 9bb4b98e053..3411e9c407b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -83,6 +83,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(FtpConfigOptions.FTP_CONNECTION_MODE)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 88e46841801..f09dba2c1f8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -69,6 +69,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
index a6c9276c76e..f195ac30714 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSourceFactory.java
@@ -71,6 +71,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 0d58e506da5..1c66cc6f8b6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -78,6 +78,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.TIME_FORMAT)
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
index e1cd0ee97ba..586718b6410 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSourceFactory.java
@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.DATE_FORMAT)
.optional(BaseSourceConfigOptions.DATETIME_FORMAT)
.optional(BaseSourceConfigOptions.TIME_FORMAT)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 6f140330cc8..9e70249be72 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -83,6 +83,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index d1107d46cf7..a3376e745e6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -87,6 +87,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index b4d1d1c63f5..8e5fd9ade81 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -76,6 +76,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN)
.optional(BaseSourceConfigOptions.COMPRESS_CODEC)
.optional(BaseSourceConfigOptions.ARCHIVE_COMPRESS_CODEC)
+ .optional(BaseSourceConfigOptions.NULL_FORMAT)
.build();
}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index eba9b5a15be..dd3d9478c13 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -46,6 +46,7 @@
import org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveTypeConvertor;
import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -63,6 +64,7 @@
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.ROW_DELIMITER;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions.NULL_FORMAT;
@Getter
public class HiveSourceConfig implements Serializable {
@@ -122,6 +124,19 @@ private ReadStrategy parseReadStrategy(
case TEXT:
// if the file format is text, we set the delim.
Map parameters = table.getSd().getSerdeInfo().getParameters();
+ if (!readonlyConfig.getOptional(NULL_FORMAT).isPresent()) {
+ String nullFormatKey = "serialization.null.format";
+ String nullFormat = table.getParameters().get(nullFormatKey);
+ if (StringUtils.isEmpty(nullFormat)) {
+ nullFormat = parameters.get(nullFormatKey);
+ }
+ if (StringUtils.isEmpty(nullFormat)) {
+ nullFormat = "\\N";
+ }
+ config =
+ config.withValue(
+ NULL_FORMAT.key(), ConfigValueFactory.fromAnyRef(nullFormat));
+ }
config =
config.withValue(
FIELD_DELIMITER.key(),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index ed055a3a303..87e3483df43 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -268,6 +268,12 @@ public class LocalFileIT extends TestSuiteBase {
"/excel/e2e.xlsx",
"/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx",
container);
+
+ ContainerUtil.copyFileIntoContainers(
+ "/text/e2e_null_format.txt",
+ "/seatunnel/read/e2e_null_format/e2e_null_format.txt",
+ container);
+
container.execInContainer("mkdir", "-p", "/tmp/fake_empty");
};
@@ -293,6 +299,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
helper.execute("/text/fake_to_local_file_with_encoding.conf");
// test read local csv file with assigning encoding
helper.execute("/text/local_file_text_to_console_with_encoding.conf");
+ helper.execute("/text/local_file_null_format_assert.conf");
// test write local json file
helper.execute("/json/fake_to_local_file_json.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt
new file mode 100644
index 00000000000..e5c0c117e57
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e_null_format.txt
@@ -0,0 +1,3 @@
+1,a,a,1
+2,a,a,1
+3,a,a,1
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf
new file mode 100644
index 00000000000..a70f83a737a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_null_format_assert.conf
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/e2e_null_format"
+ file_format_type = "text"
+ delimiter = ","
+ null_format = "a"
+ schema = {
+ fields {
+ f1 = bigint
+ f2 = bigint
+ f3 = string
+ f4 = bigint
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = f1
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = f2
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NULL
+ }
+ ]
+ },
+ {
+ field_name = f3
+ field_type = string
+ field_value = [
+ {
+ rule_type = NULL
+ }
+ ]
+ },
+ {
+ field_name = f4
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
+
diff --git a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
index 8c06a0e68c4..16c4bb93ea1 100644
--- a/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
+++ b/seatunnel-formats/seatunnel-format-text/src/main/java/org/apache/seatunnel/format/text/TextDeserializationSchema.java
@@ -61,6 +61,7 @@ public class TextDeserializationSchema implements DeserializationSchema splitsMap = splitLineBySeaTunnelRowType(content, seaTunnelRowType, 0);
Object[] objects = new Object[seaTunnelRowType.getTotalFields()];
for (int i = 0; i < objects.length; i++) {
+ String fieldValue = splitsMap.get(i);
+ if (StringUtils.isBlank(fieldValue)) {
+ continue;
+ }
+ if (StringUtils.equals(fieldValue, nullFormat)) {
+ continue;
+ }
objects[i] =
convert(
- splitsMap.get(i),
+ fieldValue,
seaTunnelRowType.getFieldType(i),
0,
seaTunnelRowType.getFieldNames()[i]);