diff --git a/docs/en/concept/sink-options-placeholders.md b/docs/en/concept/sink-options-placeholders.md index 88eada299fc..e7db53c1324 100644 --- a/docs/en/concept/sink-options-placeholders.md +++ b/docs/en/concept/sink-options-placeholders.md @@ -37,6 +37,8 @@ The placeholders are mainly controlled by the following expressions: - Used to get the table unique-key fields in the upstream catalog table - `${field_names}` - Used to get the table field keys in the upstream catalog table +- `${comment}` + - Used to get the table comment in the upstream catalog table ## Configuration diff --git a/docs/en/connector-v2/sink/Clickhouse.md b/docs/en/connector-v2/sink/Clickhouse.md index 06ab4c05944..0837f762039 100644 --- a/docs/en/connector-v2/sink/Clickhouse.md +++ b/docs/en/connector-v2/sink/Clickhouse.md @@ -96,7 +96,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ORDER BY (${rowtype_primary_key}) PRIMARY KEY (${rowtype_primary_key}) SETTINGS - index_granularity = 8192; + index_granularity = 8192 +COMMENT '${comment}'; ``` If custom fields are added to the template, for example, adding an `id` field: @@ -109,7 +110,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ORDER BY (${rowtype_primary_key}) PRIMARY KEY (${rowtype_primary_key}) SETTINGS - index_granularity = 8192; + index_granularity = 8192 +COMMENT '${comment}'; ``` The connector will automatically retrieve the corresponding types from the upstream source and fill in the template, removing the `id` field from the `rowtype_fields`. This method can be used to modify custom field types and attributes. @@ -121,6 +123,7 @@ The following placeholders can be used: - `rowtype_fields`: Retrieves all fields from the upstream schema and automatically maps them to Clickhouse field descriptions. - `rowtype_primary_key`: Retrieves the primary key from the upstream schema (this may be a list). - `rowtype_unique_key`: Retrieves the unique key from the upstream schema (this may be a list). +- `comment`: Retrieves the table comment from the upstream schema. ## How to Create a Clickhouse Data Synchronization Jobs diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index bddbad7713a..dc177a3962b 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -93,6 +93,7 @@ ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP UNIQUE KEY (${rowtype_primary_key}) +COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_allocation" = "tag.location.default: 1", @@ -110,6 +111,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` id, ${rowtype_fields} ) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key}) + COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( @@ -129,6 +131,7 @@ You can use the following placeholders - rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list) - rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list) - rowtype_duplicate_key: Used to get the duplicate key in the upstream schema (only for doris source, maybe a list) +- comment: Used to get the table comment in the upstream schema ## Data Type Mapping diff --git a/docs/en/connector-v2/sink/Maxcompute.md b/docs/en/connector-v2/sink/Maxcompute.md index 7210ee5bd36..b387ce9d970 100644 --- a/docs/en/connector-v2/sink/Maxcompute.md +++ b/docs/en/connector-v2/sink/Maxcompute.md @@ -62,7 +62,7 @@ Default template: ```sql CREATE TABLE IF NOT EXISTS `${table}` ( ${rowtype_fields} -); +) COMMENT '${comment}'; ``` If a custom field is filled in the template, such as adding an `id` field @@ -72,7 +72,7 @@ CREATE TABLE IF NOT EXISTS `${table}` ( id, ${rowtype_fields} -); +) COMMENT '${comment}'; ``` The connector will automatically obtain the corresponding type from the upstream to complete the filling, @@ -86,6 +86,7 @@ You can use the following placeholders description of MaxCompute - rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list) - rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list) +- comment: Used to get the table comment in the upstream schema ### schema_save_mode[Enum] diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 5358d6030b3..e90c72d6295 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -67,6 +67,7 @@ ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP PRIMARY KEY (${rowtype_primary_key}) +COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES ( "replication_num" = "1" ) @@ -79,7 +80,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( id, ${rowtype_fields} -) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key}) +) ENGINE = OLAP + COMMENT '${comment}' + DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_num" = "1" @@ -97,6 +100,7 @@ You can use the following placeholders description of StarRocks - rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list) - rowtype_unique_key: Used to get the unique key in the upstream schema (maybe a list) +- comment: Used to get the table comment in the upstream schema ### table [string] diff --git a/docs/zh/concept/sink-options-placeholders.md b/docs/zh/concept/sink-options-placeholders.md index 2553feb549f..05542b76e08 100644 --- a/docs/zh/concept/sink-options-placeholders.md +++ b/docs/zh/concept/sink-options-placeholders.md @@ -37,6 +37,8 @@ SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占 - 用于获取上游表中的唯一键字段名称列表 - `${field_names}` - 用于获取上游表中的所有字段名称列表 +- `${comment}` + - 用于获取上游表中的表注释 ## 配置 diff --git a/docs/zh/connector-v2/sink/Clickhouse.md b/docs/zh/connector-v2/sink/Clickhouse.md index 9b36f936b05..3dc04ce962c 100644 --- a/docs/zh/connector-v2/sink/Clickhouse.md +++ b/docs/zh/connector-v2/sink/Clickhouse.md @@ -96,7 +96,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ORDER BY (${rowtype_primary_key}) PRIMARY KEY (${rowtype_primary_key}) SETTINGS - index_granularity = 8192; + index_granularity = 8192 +COMMENT '${comment}'; ``` 如果模板中填写了自定义字段,例如添加 id 字段 @@ -109,7 +110,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( ORDER BY (${rowtype_primary_key}) PRIMARY KEY (${rowtype_primary_key}) SETTINGS - index_granularity = 8192; + index_granularity = 8192 + COMMENT '${comment}'; ``` 连接器会自动从上游获取对应类型完成填充, @@ -122,6 +124,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` ( - rowtype_fields:用于获取上游schema中的所有字段,自动映射到 Clickhouse 的字段描述。 - rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。 - rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。 +- comment:用于获取上游模式中的表注释。 ## 如何创建一个clickhouse 同步任务 diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md index 9de0580f7a6..66fbe728ae5 100644 --- a/docs/zh/connector-v2/sink/Doris.md +++ b/docs/zh/connector-v2/sink/Doris.md @@ -92,6 +92,7 @@ ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP UNIQUE KEY (${rowtype_primary_key}) +COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_allocation" = "tag.location.default: 1", @@ -109,6 +110,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` id, ${rowtype_fields} ) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key}) + COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( @@ -124,8 +126,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` - database:用于获取上游schema中的数据库。 - table_name:用于获取上游schema中的表名。 - rowtype_fields:用于获取上游schema中的所有字段,自动映射到Doris的字段描述。 -- rowtype_primary_key:用于获取上游模式中的主键(可能是列表) +- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。 - rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。 +- comment:用于获取上游模式中的表注释。 ## 数据类型映射 diff --git a/docs/zh/connector-v2/sink/StarRocks.md b/docs/zh/connector-v2/sink/StarRocks.md index a2258bf5bba..3a9ac1eab35 100644 --- a/docs/zh/connector-v2/sink/StarRocks.md +++ b/docs/zh/connector-v2/sink/StarRocks.md @@ -64,6 +64,7 @@ ${rowtype_primary_key}, ${rowtype_fields} ) ENGINE=OLAP PRIMARY KEY (${rowtype_primary_key}) +COMMENT '${comment}' DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES ( "replication_num" = "1" ) @@ -76,7 +77,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` ( id, ${rowtype_fields} -) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key}) +) ENGINE = OLAP + COMMENT '${comment}' + DISTRIBUTED BY HASH (${rowtype_primary_key}) PROPERTIES ( "replication_num" = "1" @@ -92,6 +95,7 @@ StarRocks数据接收器根据上游数据自动获取相应的信息来填充 - rowtype_fields: 上游数据模式的所有字段信息,连接器会将字段信息自动映射到StarRocks对应的类型 - rowtype_primary_key: 上游数据模式的主键信息,结果可能是列表 - rowtype_unique_key: 上游数据模式的唯一键信息,结果可能是列表 +- comment: 上游数据模式的注释信息 ### table [string] diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java index 02b72faffb2..9b4e7ae4265 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SaveModePlaceHolder.java @@ -27,6 +27,7 @@ public enum SaveModePlaceHolder { ROWTYPE_FIELDS("rowtype_fields", "fields"), TABLE("table", "table"), DATABASE("database", "database"), + COMMENT("comment", "comment"), /** @deprecated instead by {@link #TABLE} todo remove this enum */ @Deprecated TABLE_NAME("table_name", "table name"); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java index 4c7bba896ef..6d4ef98ced1 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/catalog/ClickhouseCatalog.java @@ -144,6 +144,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI tablePath.getDatabaseName(), tablePath.getTableName(), template, + table.getComment(), table.getTableSchema()); } @@ -252,6 +253,7 @@ public PreviewResult previewAction( tablePath.getDatabaseName(), tablePath.getTableName(), catalogTable.get().getTableSchema(), + catalogTable.get().getComment(), ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key())); } else if (actionType == ActionType.DROP_TABLE) { return new SQLPreviewResult( diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java index 6476223e897..8ee0e837476 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java @@ -228,7 +228,11 @@ public class ClickhouseConfig { + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ")\n" + "SETTINGS\n" - + " index_granularity = 8192;") + + " index_granularity = 8192" + + "\n" + + "COMMENT '" + + SaveModePlaceHolder.COMMENT.getPlaceHolder() + + "';") .withDescription( "Create table statement template, used to create Clickhouse table"); } diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java index 275e16791b5..61243deaa45 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseProxy.java @@ -351,13 +351,18 @@ public void executeSql(String sql) { } public void createTable( - String database, String table, String template, TableSchema tableSchema) { + String database, + String table, + String template, + String comment, + TableSchema tableSchema) { String createTableSql = ClickhouseCatalogUtil.INSTANCE.getCreateTableSql( template, database, table, tableSchema, + comment, ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()); log.debug("Create Clickhouse table sql: {}", createTableSql); executeSql(createTableSql); diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java index 5728b18bcfe..d270be52a87 100644 --- a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java @@ -96,6 +96,7 @@ public void test() { .ASC))))) .columns(columns) .build(), + "clickhouse test table", ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()); Assertions.assertEquals( createTableSql, @@ -129,6 +130,7 @@ public void test() { "test1", "test2", tableSchema, + "clickhouse test table", ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key())); String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(); @@ -224,6 +226,7 @@ public void testInSeq() { "", Arrays.asList("L_ORDERKEY", "L_LINENUMBER"))) .columns(columns) .build(), + "clickhouse test table", ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()); String expected = "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n" @@ -249,4 +252,110 @@ public void testInSeq() { + " index_granularity = 8192;"; Assertions.assertEquals(result, expected); } + + @Test + public void testTableComment() { + List columns = new ArrayList<>(); + + columns.add( + PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_LINENUMBER", BasicType.INT_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_QUANTITY", new DecimalType(15, 2), (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_EXTENDEDPRICE", new DecimalType(15, 2), (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_DISCOUNT", new DecimalType(15, 2), (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of("L_TAX", new DecimalType(15, 2), (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_RETURNFLAG", BasicType.STRING_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_LINESTATUS", BasicType.STRING_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_COMMITDATE", + LocalTimeType.LOCAL_DATE_TYPE, + (Long) null, + false, + null, + "")); + columns.add( + PhysicalColumn.of( + "L_RECEIPTDATE", + LocalTimeType.LOCAL_DATE_TYPE, + (Long) null, + false, + null, + "")); + columns.add( + PhysicalColumn.of( + "L_SHIPINSTRUCT", BasicType.STRING_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_SHIPMODE", BasicType.STRING_TYPE, (Long) null, false, null, "")); + columns.add( + PhysicalColumn.of( + "L_COMMENT", BasicType.STRING_TYPE, (Long) null, false, null, "")); + + String result = + ClickhouseCatalogUtil.INSTANCE.getCreateTableSql( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n" + + "${rowtype_primary_key},\n" + + "${rowtype_fields}\n" + + ") ENGINE = MergeTree()\n" + + "ORDER BY (${rowtype_primary_key})\n" + + "PRIMARY KEY (${rowtype_primary_key})\n" + + "SETTINGS\n" + + " index_granularity = 8192\n" + + "COMMENT '${comment}';", + "tpch", + "lineitem", + TableSchema.builder() + .primaryKey( + PrimaryKey.of( + "", Arrays.asList("L_ORDERKEY", "L_LINENUMBER"))) + .columns(columns) + .build(), + "clickhouse test table", + ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()); + String expected = + "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n" + + "`L_ORDERKEY` Int32 ,`L_LINENUMBER` Int32 ,\n" + + "`L_PARTKEY` Int32 ,\n" + + "`L_SUPPKEY` Int32 ,\n" + + "`L_QUANTITY` Decimal(15, 2) ,\n" + + "`L_EXTENDEDPRICE` Decimal(15, 2) ,\n" + + "`L_DISCOUNT` Decimal(15, 2) ,\n" + + "`L_TAX` Decimal(15, 2) ,\n" + + "`L_RETURNFLAG` String ,\n" + + "`L_LINESTATUS` String ,\n" + + "`L_SHIPDATE` Date ,\n" + + "`L_COMMITDATE` Date ,\n" + + "`L_RECEIPTDATE` Date ,\n" + + "`L_SHIPINSTRUCT` String ,\n" + + "`L_SHIPMODE` String ,\n" + + "`L_COMMENT` String \n" + + ") ENGINE = MergeTree()\n" + + "ORDER BY (`L_ORDERKEY`,`L_LINENUMBER`)\n" + + "PRIMARY KEY (`L_ORDERKEY`,`L_LINENUMBER`)\n" + + "SETTINGS\n" + + " index_granularity = 8192\n" + + "COMMENT 'clickhouse test table';"; + Assertions.assertEquals(result, expected); + } } diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java index 7d10260cf5e..1fdc59579a3 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java @@ -30,6 +30,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; @@ -43,6 +44,7 @@ public String getCreateTableSql( String database, String table, TableSchema tableSchema, + String comment, String optionsKey) { String primaryKey = ""; if (tableSchema.getPrimaryKey() != null) { @@ -101,7 +103,10 @@ public String getCreateTableSql( return template.replaceAll(SaveModePlaceHolder.DATABASE.getReplacePlaceHolder(), database) .replaceAll(SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), table) .replaceAll( - SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields); + SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields) + .replaceAll( + SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(), + Objects.isNull(comment) ? "" : comment); } private String mergeColumnInTemplate( diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java index 372418d12a4..5340c953bac 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java @@ -134,6 +134,9 @@ public interface DorisSinkOptions { + " UNIQUE KEY (" + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ")\n" + + "COMMENT '" + + SaveModePlaceHolder.COMMENT.getPlaceHolder() + + "'\n" + "DISTRIBUTED BY HASH (" + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ")\n " diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java index 91ce2a51d2d..11018d5c5cc 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java @@ -37,6 +37,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; @@ -205,7 +206,10 @@ public static String getCreateTableStatement( .replaceAll( SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), tablePath.getTableName()) .replaceAll( - SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields); + SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields) + .replaceAll( + SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(), + Objects.isNull(catalogTable.getComment()) ? "" : catalogTable.getComment()); } private static String mergeColumnInTemplate( diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java index cdaa55487c6..2142c0e2e2c 100644 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java @@ -119,7 +119,7 @@ public void test() { .build(), Collections.emptyMap(), Collections.emptyList(), - ""), + "doris test comment"), DorisTypeConverterV1.INSTANCE); Assertions.assertEquals( result, @@ -153,7 +153,7 @@ public void test() { .build(), Collections.emptyMap(), Collections.emptyList(), - ""); + "doris test comment"); TablePath tablePath = TablePath.of("test1.test2"); SeaTunnelRuntimeException actualSeaTunnelRuntimeException = Assertions.assertThrows( @@ -263,7 +263,7 @@ public void testInSeq() { .build(), Collections.emptyMap(), Collections.emptyList(), - ""), + "doris test comment"), DorisTypeConverterV1.INSTANCE); String expected = "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n" @@ -328,7 +328,7 @@ public void testWithVarchar() { .build(), Collections.emptyMap(), Collections.emptyList(), - ""), + "doris test comment"), DorisTypeConverterV1.INSTANCE); Assertions.assertEquals( @@ -380,7 +380,7 @@ public void testWithThreePrimaryKeys() { .build(), Collections.emptyMap(), Collections.emptyList(), - ""), + "doris test comment"), DorisTypeConverterV1.INSTANCE); Assertions.assertEquals( @@ -394,4 +394,123 @@ public void testWithThreePrimaryKeys() { + " partitioned by `id`,`age`,`name`;", result); } + + @Test + public void testTableComment() { + List columns = new ArrayList<>(); + + columns.add( + PhysicalColumn.of( + "id", + BasicType.LONG_TYPE, + (Long) null, + true, + null, + "This is the ID column")); + columns.add( + PhysicalColumn.of( + "name", + BasicType.STRING_TYPE, + (Long) null, + true, + null, + "This is the name column")); + columns.add( + PhysicalColumn.of( + "age", + BasicType.INT_TYPE, + (Long) null, + true, + null, + "This is the age column")); + columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, "")); + columns.add( + PhysicalColumn.of( + "gender", + BasicType.BYTE_TYPE, + (Long) null, + true, + null, + "This is the gender column")); + columns.add( + PhysicalColumn.of( + "create_time", + BasicType.LONG_TYPE, + (Long) null, + true, + null, + "This is the create_time column")); + + String result = + DorisCatalogUtil.getCreateTableStatement( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n" + + "${rowtype_primary_key},\n" + + "${rowtype_fields}\n" + + ") ENGINE=OLAP\n" + + " UNIQUE KEY (${rowtype_primary_key})\n" + + "COMMENT '${comment}'\n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key})\n" + + " PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ")", + TablePath.of("test1.test2"), + CatalogTable.of( + TableIdentifier.of("test", "test1", "test2"), + TableSchema.builder() + .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age"))) + .constraintKey( + Arrays.asList( + ConstraintKey.of( + ConstraintKey.ConstraintType + .UNIQUE_KEY, + "unique_key", + Collections.singletonList( + ConstraintKey + .ConstraintKeyColumn + .of( + "name", + ConstraintKey + .ColumnSortType + .DESC))), + ConstraintKey.of( + ConstraintKey.ConstraintType + .UNIQUE_KEY, + "unique_key2", + Collections.singletonList( + ConstraintKey + .ConstraintKeyColumn + .of( + "score", + ConstraintKey + .ColumnSortType + .ASC))))) + .columns(columns) + .build(), + Collections.emptyMap(), + Collections.emptyList(), + "doris test comment"), + DorisTypeConverterV1.INSTANCE); + + Assertions.assertEquals( + result, + "CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n" + + "`id` BIGINT NULL COMMENT 'This is the ID column',`age` INT NULL COMMENT 'This is the age column',\n" + + "`name` STRING NULL COMMENT 'This is the name column',\n" + + "`score` INT NULL ,\n" + + "`gender` TINYINT NULL COMMENT 'This is the gender column',\n" + + "`create_time` BIGINT NULL COMMENT 'This is the create_time column'\n" + + ") ENGINE=OLAP\n" + + " UNIQUE KEY (`id`,`age`)\n" + + "COMMENT 'doris test comment'\n" + + "DISTRIBUTED BY HASH (`id`,`age`)\n" + + " PROPERTIES (\n" + + "\"replication_allocation\" = \"tag.location.default: 1\",\n" + + "\"in_memory\" = \"false\",\n" + + "\"storage_format\" = \"V2\",\n" + + "\"disable_auto_compaction\" = \"false\"\n" + + ")"); + } } diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java index 96d6e90f970..754e30cca01 100644 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java +++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/PreviewActionTest.java @@ -108,6 +108,7 @@ public void testDorisPreviewAction() { + "`test` STRING NULL \n" + ") ENGINE=OLAP\n" + " UNIQUE KEY (`id`)\n" + + "COMMENT 'comment'\n" + "DISTRIBUTED BY HASH (`id`)\n" + " PROPERTIES (\n" + "\"replication_allocation\" = \"tag.location.default: 1\",\n" diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java index 8097d95a653..fb87385907d 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCatalogUtil.java @@ -36,6 +36,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.function.Function; import java.util.stream.Collectors; @@ -108,7 +109,10 @@ public static String getCreateTableStatement( .replaceAll( SaveModePlaceHolder.TABLE.getReplacePlaceHolder(), tablePath.getTableName()) .replaceAll( - SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields); + SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields) + .replaceAll( + SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(), + Objects.isNull(catalogTable.getComment()) ? "" : catalogTable.getComment()); } private static String mergeColumnInTemplate( diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java index d3465bd3dba..c1e65be9491 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/main/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/config/MaxcomputeConfig.java @@ -98,7 +98,9 @@ public class MaxcomputeConfig implements Serializable { + "` (\n" + SaveModePlaceHolder.ROWTYPE_FIELDS.getPlaceHolder() + "\n" - + ");") + + ") COMMENT '" + + SaveModePlaceHolder.COMMENT.getPlaceHolder() + + "' ;") .withDescription( "Create table statement template, used to create MaxCompute table"); } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java index efdcd070e3b..5e5a3745bbd 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/MaxComputeCreateTableTest.java @@ -73,7 +73,7 @@ public void test() { + "\"in_memory\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"disable_auto_compaction\" = \"false\"\n" - + ");", + + ") COMMENT '${comment}';", TablePath.of("test1.test2"), CatalogTable.of( TableIdentifier.of("test", "test1", "test2"), @@ -109,7 +109,7 @@ public void test() { .build(), Collections.emptyMap(), Collections.emptyList(), - "")); + "comment")); Assertions.assertEquals( result, "CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n" @@ -128,6 +128,6 @@ public void test() { + "\"in_memory\" = \"false\",\n" + "\"storage_format\" = \"V2\",\n" + "\"disable_auto_compaction\" = \"false\"\n" - + ");"); + + ") COMMENT 'comment';"); } } diff --git a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java index d4ba6799ac8..78b55e9de80 100644 --- a/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java +++ b/seatunnel-connectors-v2/connector-maxcompute/src/test/java/org/apache/seatunnel/connectors/seatunnel/maxcompute/catalog/PreviewActionTest.java @@ -90,7 +90,7 @@ public void testDorisPreviewAction() { "CREATE TABLE IF NOT EXISTS `testtable` (\n" + "`id` BIGINT NOT NULL ,\n" + "`test` STRING NULL \n" - + ");", + + ") COMMENT 'comment' ;", Optional.of(CATALOG_TABLE)); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java index ae97cccfa43..cd44b7cf401 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java @@ -209,6 +209,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI tablePath.getDatabaseName(), tablePath.getTableName(), table.getTableSchema(), + table.getComment(), StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key())); } @@ -504,6 +505,7 @@ public PreviewResult previewAction( tablePath.getDatabaseName(), tablePath.getTableName(), catalogTable.get().getTableSchema(), + catalogTable.get().getComment(), StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key())); } else if (actionType == ActionType.DROP_TABLE) { return new SQLPreviewResult( diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java index bb34aaa5d14..a0467d99179 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/StarRocksSinkOptions.java @@ -71,6 +71,9 @@ public interface StarRocksSinkOptions { + " PRIMARY KEY (" + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ")\n" + + "COMMENT '" + + SaveModePlaceHolder.COMMENT.getPlaceHolder() + + "'\n" + "DISTRIBUTED BY HASH (" + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder() + ")" diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index 6057eb97af9..b2f482c2018 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -100,7 +100,7 @@ public TableSink createSink(TableSinkFactoryContext context) { catalogTable.getTableSchema(), catalogTable.getOptions(), catalogTable.getPartitionKeys(), - catalogTable.getCatalogName()); + catalogTable.getComment()); return () -> new StarRocksSink(sinkConfig, finalCatalogTable); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java index 37c06345fd8..51b2b2290bd 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/PreviewActionTest.java @@ -106,6 +106,7 @@ public void testDorisPreviewAction() { + "`test2` STRING NULL \n" + ") ENGINE=OLAP\n" + " PRIMARY KEY (`test`)\n" + + "COMMENT 'comment'\n" + "DISTRIBUTED BY HASH (`test`)PROPERTIES (\n" + " \"replication_num\" = \"1\" \n" + ")", diff --git a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java index c4d06167335..25b06f2806c 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java @@ -111,6 +111,7 @@ public void test() { .ASC))))) .columns(columns) .build(), + "test table", StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); Assertions.assertEquals( "CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n" @@ -143,7 +144,7 @@ public void test() { .build(), Collections.emptyMap(), Collections.emptyList(), - ""); + "test table"); TablePath tablePath = TablePath.of("test1.test2"); String createTemplate = StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.defaultValue(); RuntimeException actualSeaTunnelRuntimeException = @@ -155,6 +156,7 @@ public void test() { tablePath.getDatabaseName(), tablePath.getTableName(), catalogTable.getTableSchema(), + catalogTable.getComment(), StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key())); String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder(); SeaTunnelRuntimeException exceptSeaTunnelRuntimeException = @@ -255,6 +257,7 @@ public void testInSeq() { "", Arrays.asList("L_ORDERKEY", "L_LINENUMBER"))) .columns(columns) .build(), + "test table", StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); String expected = "CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n" @@ -316,6 +319,7 @@ public void testWithVarchar() { .primaryKey(PrimaryKey.of("", Arrays.asList("id", "age"))) .columns(columns) .build(), + "test table", StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); Assertions.assertEquals( @@ -363,6 +367,7 @@ public void testWithThreePrimaryKeys() { PrimaryKey.of("test", Arrays.asList("id", "age", "name"))) .columns(columns) .build(), + "test table", StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); Assertions.assertEquals( @@ -376,4 +381,49 @@ public void testWithThreePrimaryKeys() { + " partitioned by `id`,`age`,`name`;", result); } + + @Test + public void testTableComment() { + List columns = new ArrayList<>(); + + columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null, true, null, "")); + columns.add(PhysicalColumn.of("comment", BasicType.STRING_TYPE, 500, true, null, "")); + columns.add(PhysicalColumn.of("description", BasicType.STRING_TYPE, 70000, true, null, "")); + + String result = + StarRocksSaveModeUtil.INSTANCE.getCreateTableSql( + "CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n" + + "${rowtype_primary_key},\n" + + "${rowtype_fields}\n" + + ") ENGINE=OLAP\n" + + " PRIMARY KEY (${rowtype_primary_key})\n" + + "COMMENT '${comment}'\n" + + "DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (\n" + + " \"replication_num\" = \"1\" \n" + + ")\n", + "test1", + "test2", + TableSchema.builder() + .primaryKey( + PrimaryKey.of("test", Arrays.asList("id", "age", "name"))) + .columns(columns) + .build(), + "test table", + StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key()); + + Assertions.assertEquals( + "CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n" + + "`id` BIGINT NULL ,`age` INT NULL ,`name` STRING NULL ,\n" + + "`comment` VARCHAR(500) NULL ,\n" + + "`description` STRING NULL \n" + + ") ENGINE=OLAP\n" + + " PRIMARY KEY (`id`,`age`,`name`)\n" + + "COMMENT 'test table'\n" + + "DISTRIBUTED BY HASH (`id`,`age`,`name`)PROPERTIES (\n" + + " \"replication_num\" = \"1\" \n" + + ")\n", + result); + } }