From f6bcc4d59d91e6c47f0726fa6051b6c324ae5c2e Mon Sep 17 00:00:00 2001 From: "mingbei.xu" <41814775+WilliamTan778@users.noreply.github.com> Date: Tue, 2 Jan 2024 18:01:36 +0800 Subject: [PATCH] [Feature][Connectors-v2-file-ftp] FTP source/sink add ftp connection mode (#6077) (#6099) --- docs/en/connector-v2/sink/FtpFile.md | 7 ++ docs/en/connector-v2/source/FtpFile.md | 7 ++ .../file/config/BaseFileSinkConfig.java | 5 ++ .../seatunnel/file/ftp/config/FtpConf.java | 5 ++ .../file/ftp/config/FtpConfigOptions.java | 8 ++ .../file/ftp/sink/FtpFileSinkFactory.java | 1 + .../file/ftp/source/FtpFileSourceFactory.java | 1 + .../file/ftp/system/FtpConnectionMode.java | 47 ++++++++++ .../ftp/system/SeaTunnelFTPFileSystem.java | 27 ++++++ .../e2e/connector/file/ftp/FtpFileIT.java | 5 ++ .../fake_source_to_ftp_root_path_excel.conf | 85 +++++++++++++++++++ 11 files changed, 198 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md index ab55b6e4dae..3233fc3c6d6 100644 --- a/docs/en/connector-v2/sink/FtpFile.md +++ b/docs/en/connector-v2/sink/FtpFile.md @@ -38,6 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once` | password | string | yes | - | | | path | string | yes | - | | | tmp_path | string | yes | /tmp/seatunnel | The result file will write to a tmp path first and then use `mv` to submit tmp dir to target dir. Need a FTP dir. 
| +| connection_mode | string | no | active_local | The target ftp connection mode | | custom_filename | boolean | no | false | Whether you need custom the filename | | file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true | | filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true | @@ -76,6 +77,12 @@ The target ftp password is required The target dir path is required. +### connection_mode [string] + +The target FTP connection mode. The default is active mode; the following modes are supported: + +`active_local` `passive_local` + ### custom_filename [boolean] Whether custom the filename diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md index 781d7d40bc2..ee231bb087b 100644 --- a/docs/en/connector-v2/source/FtpFile.md +++ b/docs/en/connector-v2/source/FtpFile.md @@ -44,6 +44,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you | password | string | yes | - | | path | string | yes | - | | file_format_type | string | yes | - | +| connection_mode | string | no | active_local | | delimiter/field_delimiter | string | no | \001 | | read_columns | list | no | - | | parse_partition_from_path | boolean | no | true | @@ -154,6 +155,12 @@ connector will generate data as the following: |---------------|-----|--------| | tyrantlucifer | 26 | male | +### connection_mode [string] + +The source FTP connection mode. The default is active mode; the following modes are supported: + +`active_local` `passive_local` + ### delimiter/field_delimiter [string] **delimiter** parameter will deprecate after version 2.3.5, please use **field_delimiter** instead. 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java index 112ab9fa1c6..3a6513e993d 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java @@ -28,6 +28,7 @@ import lombok.Data; import lombok.NonNull; +import java.io.File; import java.io.Serializable; import java.util.Locale; @@ -71,6 +72,10 @@ public BaseFileSinkConfig(@NonNull Config config) { } checkNotNull(path); + if (path.equals(File.separator)) { + this.path = ""; + } + if (config.hasPath(BaseSinkConfig.FILE_NAME_EXPRESSION.key()) && !StringUtils.isBlank( config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key()))) { diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java index 7ab43b6db13..9186e1d8ee9 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConf.java @@ -52,6 +52,11 @@ public static HadoopConf buildWithConfig(Config config) { "fs.ftp.user." + host, config.getString(FtpConfigOptions.FTP_USERNAME.key())); ftpOptions.put( "fs.ftp.password." 
+ host, config.getString(FtpConfigOptions.FTP_PASSWORD.key())); + if (config.hasPath(FtpConfigOptions.FTP_CONNECTION_MODE.key())) { + ftpOptions.put( + "fs.ftp.connection.mode", + config.getString(FtpConfigOptions.FTP_CONNECTION_MODE.key())); + } hadoopConf.setExtraOptions(ftpOptions); return hadoopConf; } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java index 2834b7ac2f0..1f00a56abfd 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/config/FtpConfigOptions.java @@ -20,6 +20,9 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions; +import org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode; + +import static org.apache.seatunnel.connectors.seatunnel.file.ftp.system.FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE; public class FtpConfigOptions extends BaseSourceConfigOptions { public static final Option FTP_PASSWORD = @@ -36,4 +39,9 @@ public class FtpConfigOptions extends BaseSourceConfigOptions { Options.key("host").stringType().noDefaultValue().withDescription("FTP server host"); public static final Option FTP_PORT = Options.key("port").intType().noDefaultValue().withDescription("FTP server port"); + public static final Option FTP_CONNECTION_MODE = + Options.key("connection_mode") + .enumType(FtpConnectionMode.class) + .defaultValue(ACTIVE_LOCAL_DATA_CONNECTION_MODE) + .withDescription("FTP 
server connection mode "); } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java index 2cc9a06a5f9..a3fbf886fbb 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java @@ -85,6 +85,7 @@ public OptionRule optionRule() { .optional(BaseSinkConfig.DATE_FORMAT) .optional(BaseSinkConfig.DATETIME_FORMAT) .optional(BaseSinkConfig.TIME_FORMAT) + .optional(FtpConfigOptions.FTP_CONNECTION_MODE) .build(); } } diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java index e15afac55a5..529c93a3f79 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java @@ -62,6 +62,7 @@ public OptionRule optionRule() { .optional(BaseSourceConfigOptions.TIME_FORMAT) .optional(BaseSourceConfigOptions.FILE_FILTER_PATTERN) .optional(BaseSourceConfigOptions.COMPRESS_CODEC) + .optional(FtpConfigOptions.FTP_CONNECTION_MODE) .build(); } diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java new file mode 100644 index 00000000000..068aa5974c1 --- /dev/null +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/FtpConnectionMode.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.file.ftp.system; + +/** FTP data connection modes; see Apache Commons Net (http://commons.apache.org/net/). 
*/ +public enum FtpConnectionMode { + + /** Active data connection mode, configured as {@code active_local}. */ + ACTIVE_LOCAL_DATA_CONNECTION_MODE("active_local"), + + /** Passive data connection mode, configured as {@code passive_local}. */ + PASSIVE_LOCAL_DATA_CONNECTION_MODE("passive_local"); + + private final String mode; + + FtpConnectionMode(String mode) { + this.mode = mode; + } + + public String getMode() { + return mode; + } + + public static FtpConnectionMode fromMode(String mode) { + for (FtpConnectionMode ftpConnectionModeEnum : FtpConnectionMode.values()) { + if (ftpConnectionModeEnum.getMode().equals(mode)) { + return ftpConnectionModeEnum; + } + } + throw new IllegalArgumentException("Unknown ftp connection mode: " + mode); + } +} diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java index 4b69c63416c..04ba218e455 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/system/SeaTunnelFTPFileSystem.java @@ -62,6 +62,8 @@ public class SeaTunnelFTPFileSystem extends FileSystem { public static final String FS_FTP_HOST = "fs.ftp.host"; public static final String FS_FTP_HOST_PORT = "fs.ftp.host.port"; public static final String FS_FTP_PASSWORD_PREFIX = "fs.ftp.password."; + public static final String FS_FTP_CONNECTION_MODE = "fs.ftp.connection.mode"; + public static final String E_SAME_DIRECTORY_ONLY = "only same directory renames are supported"; private URI uri; @@ -153,9 +155,34 @@ private FTPClient connect() throws IOException { + "'"); } + setFsFtpConnectionMode( + client, + conf.get( + 
FS_FTP_CONNECTION_MODE, + FtpConnectionMode.ACTIVE_LOCAL_DATA_CONNECTION_MODE.getMode())); + return client; } + /** + * Switch the client to the FTP data connection mode named by {@code mode}. + * + * @param client FTPClient to configure + * @param mode mode string, resolved via FtpConnectionMode.fromMode + */ + private void setFsFtpConnectionMode(FTPClient client, String mode) { + switch (FtpConnectionMode.fromMode(mode)) { + case ACTIVE_LOCAL_DATA_CONNECTION_MODE: + client.enterLocalActiveMode(); + break; + case PASSIVE_LOCAL_DATA_CONNECTION_MODE: + client.enterLocalPassiveMode(); + break; + default: + break; + } + } + /** * Logout and disconnect the given FTPClient. * * diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java index 15a58ebf082..2a1598bf32a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/ftp/FtpFileIT.java @@ -104,6 +104,9 @@ public void startUp() throws Exception { "/home/vsftpd/seatunnel/tmp/seatunnel/read/excel_filter/name=tyrantlucifer/hobby=coding/e2e_filter.xlsx", ftpContainer); + ContainerUtil.copyFileIntoContainers( + "/excel/e2e.xlsx", "/home/vsftpd/seatunnel/e2e.xlsx", ftpContainer); + ftpContainer.execInContainer("sh", "-c", "chmod -R 777 /home/vsftpd/seatunnel/"); ftpContainer.execInContainer("sh", "-c", "chown -R ftp:ftp /home/vsftpd/seatunnel/"); } @@ -136,6 +139,8 @@ public void testFtpFileReadAndWrite(TestContainer container) helper.execute("/parquet/fake_to_ftp_file_parquet.conf"); // test write ftp orc file helper.execute("/orc/fake_to_ftp_file_orc.conf"); + // test write ftp root path excel file + helper.execute("/excel/fake_source_to_ftp_root_path_excel.conf"); } @AfterAll diff 
--git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf new file mode 100644 index 00000000000..3e11b0a08f7 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-ftp-e2e/src/test/resources/excel/fake_source_to_ftp_root_path_excel.conf @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + parallelism = 1 + job.mode = "BATCH" + + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 1 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local +} + +source { + FakeSource { + result_table_name = "ftp" + schema = { + fields { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + c_row = { + c_map = "map<string, string>" + c_array = "array<int>" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_bytes = bytes + c_date = date + c_decimal = "decimal(38, 18)" + c_timestamp = timestamp + } + } + } + } +} + +sink { + FtpFile { + host = "ftp" + port = 21 + user = seatunnel + password = pass + path = "/" + source_table_name = "ftp" + partition_dir_expression = "${k0}=${v0}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "excel" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + } +}