[Fix][Connector-V2] Fixed adding table comments #8514

Merged 8 commits on Jan 16, 2025

2 changes: 2 additions & 0 deletions docs/en/concept/sink-options-placeholders.md
@@ -37,6 +37,8 @@ The placeholders are mainly controlled by the following expressions:
- Used to get the table unique-key fields in the upstream catalog table
- `${field_names}`
- Used to get the table field keys in the upstream catalog table
- `${comment}`
- Used to get the table comment in the upstream catalog table
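
The `${comment}` placeholder, like the others in this list, stands in for a value taken from the upstream catalog table. A minimal sketch of that kind of substitution follows, assuming a plain map from placeholder names to values. `SinkPlaceholderSketch` and `resolve` are hypothetical names used only for illustration; this is not SeaTunnel's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper for illustration; not SeaTunnel's implementation. */
final class SinkPlaceholderSketch {
    // Matches tokens such as ${comment} or ${field_names}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([A-Za-z_]+)\\}");

    /** Replaces every known ${name} token; unknown tokens are left untouched. */
    static String resolve(String option, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(option);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = values.get(m.group(1));
            String replacement = value != null ? value : m.group();
            // quoteReplacement keeps '$' and '\' in the value from being read as group references.
            m.appendReplacement(out, Matcher.quoteReplacement(replacement));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, String> values = new LinkedHashMap<>();
        values.put("field_names", "id,name");
        values.put("comment", "order facts");
        // prints: fields=id,name; note=order facts
        System.out.println(resolve("fields=${field_names}; note=${comment}", values));
    }
}
```

Leaving unknown tokens in place is a design choice of this sketch: it makes a misspelled placeholder visible in the resolved value instead of silently dropping it.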

## Configuration

7 changes: 5 additions & 2 deletions docs/en/connector-v2/sink/Clickhouse.md
@@ -96,7 +96,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
-index_granularity = 8192;
+index_granularity = 8192
+COMMENT '${comment}';
```

If custom fields are added to the template, for example, adding an `id` field:
@@ -109,7 +110,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
-index_granularity = 8192;
+index_granularity = 8192
+COMMENT '${comment}';
```

The connector will automatically retrieve the corresponding types from the upstream source and fill in the template, removing the `id` field from the `rowtype_fields`. This method can be used to modify custom field types and attributes.
@@ -121,6 +123,7 @@ The following placeholders can be used:
- `rowtype_fields`: Retrieves all fields from the upstream schema and automatically maps them to Clickhouse field descriptions.
- `rowtype_primary_key`: Retrieves the primary key from the upstream schema (this may be a list).
- `rowtype_unique_key`: Retrieves the unique key from the upstream schema (this may be a list).
- `comment`: Retrieves the table comment from the upstream schema.
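
Because the template places `${comment}` inside a single-quoted literal, an upstream comment that itself contains a quote could break the generated DDL. The sketch below shows one way to escape the value before substitution, assuming backslash-style escapes, which ClickHouse string literals accept. `ClickhouseCommentSketch` and its methods are hypothetical, and this diff does not show whether the connector escapes comments.

```java
/** Hypothetical sketch; the diff does not show whether the connector escapes comments. */
final class ClickhouseCommentSketch {
    /** Escapes backslashes first, then single quotes, for use inside '...'. */
    static String escapeForSingleQuotedLiteral(String comment) {
        return comment.replace("\\", "\\\\").replace("'", "\\'");
    }

    /** Substitutes the escaped comment for the ${comment} token. Expects a non-null comment. */
    static String fillComment(String template, String comment) {
        return template.replace("${comment}", escapeForSingleQuotedLiteral(comment));
    }

    public static void main(String[] args) {
        String tail = "SETTINGS\n    index_granularity = 8192\nCOMMENT '${comment}';";
        // The last printed line is: COMMENT 'user\'s orders';
        System.out.println(fillComment(tail, "user's orders"));
    }
}
```

Backslashes are escaped before quotes so that the backslash introduced for each quote is not escaped a second time.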

## How to Create a Clickhouse Data Synchronization Job

3 changes: 3 additions & 0 deletions docs/en/connector-v2/sink/Doris.md
@@ -93,6 +93,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
@@ -110,6 +111,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
id,
${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
@@ -129,6 +131,7 @@ You can use the following placeholders
- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list)
- rowtype_duplicate_key: Used to get the duplicate key in the upstream schema (only for the Doris source; may be a list)
- comment: Used to get the table comment in the upstream schema

## Data Type Mapping

5 changes: 3 additions & 2 deletions docs/en/connector-v2/sink/Maxcompute.md
@@ -62,7 +62,7 @@ Default template:
```sql
CREATE TABLE IF NOT EXISTS `${table}` (
${rowtype_fields}
-);
+) COMMENT '${comment}';
```

If a custom field is filled in the template, such as adding an `id` field
@@ -72,7 +72,7 @@ CREATE TABLE IF NOT EXISTS `${table}`
(
id,
${rowtype_fields}
-);
+) COMMENT '${comment}';
```

The connector will automatically obtain the corresponding type from the upstream to complete the filling,
@@ -86,6 +86,7 @@ You can use the following placeholders
description of MaxCompute
- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list)
- comment: Used to get the table comment in the upstream schema
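
An upstream table may not carry a comment at all. The sketch below fills the default template shown in this file and renders a missing (null) comment as an empty literal. `MaxcomputeTemplateSketch` and `render` are hypothetical names, the column DDL passed in is made up for the example, and whether the connector treats a missing comment this way is an assumption not confirmed by this diff.

```java
import java.util.Objects;

/** Hypothetical sketch; not the MaxCompute connector's code. */
final class MaxcomputeTemplateSketch {
    // Default template as documented in this file.
    static final String DEFAULT_TEMPLATE =
            "CREATE TABLE IF NOT EXISTS `${table}` (\n"
                    + "${rowtype_fields}\n"
                    + ") COMMENT '${comment}';";

    /** Fills the three placeholders; a null comment becomes an empty string. */
    static String render(String table, String fieldsDdl, String comment) {
        String safeComment = Objects.toString(comment, "");
        return DEFAULT_TEMPLATE
                .replace("${table}", table)
                .replace("${rowtype_fields}", fieldsDdl)
                .replace("${comment}", safeComment);
    }

    public static void main(String[] args) {
        // Illustrative column DDL; a real job derives it from the upstream schema.
        System.out.println(render("orders", "`id` BIGINT,\n`name` STRING", null));
    }
}
```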

### schema_save_mode[Enum]

6 changes: 5 additions & 1 deletion docs/en/connector-v2/sink/StarRocks.md
@@ -67,6 +67,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (
"replication_num" = "1"
)
@@ -79,7 +80,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
(
id,
${rowtype_fields}
-) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key})
+) ENGINE = OLAP
+COMMENT '${comment}'
+DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
@@ -97,6 +100,7 @@ You can use the following placeholders
description of StarRocks
- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list)
- rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list)
- comment: Used to get the table comment in the upstream schema

### table [string]

2 changes: 2 additions & 0 deletions docs/zh/concept/sink-options-placeholders.md
@@ -37,6 +37,8 @@ SeaTunnel provides automatic replacement of sink option placeholders, which lets you use placeholders
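- Used to get the list of unique-key field names in the upstream table
- `${field_names}`
- Used to get the list of all field names in the upstream table
- `${comment}`
- Used to get the table comment in the upstream table

## Configuration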

7 changes: 5 additions & 2 deletions docs/zh/connector-v2/sink/Clickhouse.md
@@ -96,7 +96,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
-index_granularity = 8192;
+index_granularity = 8192
+COMMENT '${comment}';
```

If custom fields are filled in the template, for example adding an id field
@@ -109,7 +110,8 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
ORDER BY (${rowtype_primary_key})
PRIMARY KEY (${rowtype_primary_key})
SETTINGS
-index_granularity = 8192;
+index_granularity = 8192
+COMMENT '${comment}';
```

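The connector automatically obtains the corresponding types from the upstream to complete the filling,
@@ -122,6 +124,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
- rowtype_fields: Used to get all fields in the upstream schema, automatically mapped to Clickhouse field descriptions.
- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list).
- rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list).
- comment: Used to get the table comment in the upstream schema.

## How to create a Clickhouse synchronization job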

5 changes: 4 additions & 1 deletion docs/zh/connector-v2/sink/Doris.md
@@ -92,6 +92,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
@@ -109,6 +110,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
id,
${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
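@@ -124,8 +126,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
- database: Used to get the database in the upstream schema.
- table_name: Used to get the table name in the upstream schema.
- rowtype_fields: Used to get all fields in the upstream schema, automatically mapped to Doris field descriptions.
-- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list)
+- rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list).
- rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list).
+- comment: Used to get the table comment in the upstream schema.

## Data Type Mapping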

6 changes: 5 additions & 1 deletion docs/zh/connector-v2/sink/StarRocks.md
@@ -64,6 +64,7 @@ ${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})PROPERTIES (
"replication_num" = "1"
)
@@ -76,7 +77,9 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(
id,
${rowtype_fields}
-) ENGINE = OLAP DISTRIBUTED BY HASH (${rowtype_primary_key})
+) ENGINE = OLAP
+COMMENT '${comment}'
+DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
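@@ -92,6 +95,7 @@ The StarRocks sink automatically obtains the corresponding information from the upstream data to fill
- rowtype_fields: All field information of the upstream schema; the connector automatically maps it to the corresponding StarRocks types
- rowtype_primary_key: Primary key information of the upstream schema; the result may be a list
- rowtype_unique_key: Unique key information of the upstream schema; the result may be a list
- comment: Table comment of the upstream schema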

### table [string]

@@ -27,6 +27,7 @@ public enum SaveModePlaceHolder {
ROWTYPE_FIELDS("rowtype_fields", "fields"),
TABLE("table", "table"),
DATABASE("database", "database"),
COMMENT("comment", "comment"),
/** @deprecated instead by {@link #TABLE} todo remove this enum */
@Deprecated
TABLE_NAME("table_name", "table name");
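
The new `COMMENT` constant is consumed in a later hunk through `getPlaceHolder()`, which yields the `${comment}` token seen in the templates. A trimmed-down sketch of how such an enum could pair a key with its `${...}` form is shown below. `PlaceHolderSketch`, its field names, and the method bodies are assumptions, since the hunk shows only the constants.

```java
/** Hypothetical, trimmed-down stand-in for the enum in this hunk. */
enum PlaceHolderSketch {
    TABLE("table", "table"),
    DATABASE("database", "database"),
    COMMENT("comment", "comment");

    private final String key;     // name used inside ${...}
    private final String display; // human-readable label

    PlaceHolderSketch(String key, String display) {
        this.key = key;
        this.display = display;
    }

    /** Returns the token as it appears in a template, e.g. ${comment}. */
    String getPlaceHolder() {
        return "${" + key + "}";
    }

    String getDisplay() {
        return display;
    }

    public static void main(String[] args) {
        // Mirrors the concatenation style used for the default template.
        System.out.println("COMMENT '" + COMMENT.getPlaceHolder() + "';");
    }
}
```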

@@ -144,6 +144,7 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
tablePath.getDatabaseName(),
tablePath.getTableName(),
template,
table.getComment(),
table.getTableSchema());
}

@@ -252,6 +253,7 @@ public PreviewResult previewAction(
tablePath.getDatabaseName(),
tablePath.getTableName(),
catalogTable.get().getTableSchema(),
catalogTable.get().getComment(),
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));
} else if (actionType == ActionType.DROP_TABLE) {
return new SQLPreviewResult(

@@ -228,7 +228,11 @@ public class ClickhouseConfig {
                             + SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder()
                             + ")\n"
                             + "SETTINGS\n"
-                            + " index_granularity = 8192;")
+                            + " index_granularity = 8192"
+                            + "\n"
+                            + "COMMENT '"
+                            + SaveModePlaceHolder.COMMENT.getPlaceHolder()
+                            + "';")
             .withDescription(
                     "Create table statement template, used to create Clickhouse table");
 }

@@ -351,13 +351,18 @@ public void executeSql(String sql) {
     }
 
     public void createTable(
-            String database, String table, String template, TableSchema tableSchema) {
+            String database,
+            String table,
+            String template,
+            String comment,
+            TableSchema tableSchema) {
         String createTableSql =
                 ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
                         template,
                         database,
                         table,
                         tableSchema,
+                        comment,
                         ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
         log.debug("Create Clickhouse table sql: {}", createTableSql);
         executeSql(createTableSql);

@@ -96,6 +96,7 @@ public void test() {
.ASC)))))
.columns(columns)
.build(),
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
Assertions.assertEquals(
createTableSql,
@@ -129,6 +130,7 @@ public void test() {
"test1",
"test2",
tableSchema,
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key()));

String primaryKeyHolder = SaveModePlaceHolder.ROWTYPE_PRIMARY_KEY.getPlaceHolder();
@@ -224,6 +226,7 @@ public void testInSeq() {
"", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
.columns(columns)
.build(),
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
String expected =
"CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
@@ -249,4 +252,110 @@ public void testInSeq() {
+ " index_granularity = 8192;";
Assertions.assertEquals(result, expected);
}

@Test
public void testTableComment() {
List<Column> columns = new ArrayList<>();

columns.add(
PhysicalColumn.of("L_ORDERKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of("L_PARTKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of("L_SUPPKEY", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_LINENUMBER", BasicType.INT_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_QUANTITY", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_EXTENDEDPRICE", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_DISCOUNT", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of("L_TAX", new DecimalType(15, 2), (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_RETURNFLAG", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_LINESTATUS", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_SHIPDATE", LocalTimeType.LOCAL_DATE_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_COMMITDATE",
LocalTimeType.LOCAL_DATE_TYPE,
(Long) null,
false,
null,
""));
columns.add(
PhysicalColumn.of(
"L_RECEIPTDATE",
LocalTimeType.LOCAL_DATE_TYPE,
(Long) null,
false,
null,
""));
columns.add(
PhysicalColumn.of(
"L_SHIPINSTRUCT", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_SHIPMODE", BasicType.STRING_TYPE, (Long) null, false, null, ""));
columns.add(
PhysicalColumn.of(
"L_COMMENT", BasicType.STRING_TYPE, (Long) null, false, null, ""));

String result =
ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (\n"
+ "${rowtype_primary_key},\n"
+ "${rowtype_fields}\n"
+ ") ENGINE = MergeTree()\n"
+ "ORDER BY (${rowtype_primary_key})\n"
+ "PRIMARY KEY (${rowtype_primary_key})\n"
+ "SETTINGS\n"
+ " index_granularity = 8192\n"
+ "COMMENT '${comment}';",
"tpch",
"lineitem",
TableSchema.builder()
.primaryKey(
PrimaryKey.of(
"", Arrays.asList("L_ORDERKEY", "L_LINENUMBER")))
.columns(columns)
.build(),
"clickhouse test table",
ClickhouseConfig.SAVE_MODE_CREATE_TEMPLATE.key());
String expected =
"CREATE TABLE IF NOT EXISTS `tpch`.`lineitem` (\n"
+ "`L_ORDERKEY` Int32 ,`L_LINENUMBER` Int32 ,\n"
+ "`L_PARTKEY` Int32 ,\n"
+ "`L_SUPPKEY` Int32 ,\n"
+ "`L_QUANTITY` Decimal(15, 2) ,\n"
+ "`L_EXTENDEDPRICE` Decimal(15, 2) ,\n"
+ "`L_DISCOUNT` Decimal(15, 2) ,\n"
+ "`L_TAX` Decimal(15, 2) ,\n"
+ "`L_RETURNFLAG` String ,\n"
+ "`L_LINESTATUS` String ,\n"
+ "`L_SHIPDATE` Date ,\n"
+ "`L_COMMITDATE` Date ,\n"
+ "`L_RECEIPTDATE` Date ,\n"
+ "`L_SHIPINSTRUCT` String ,\n"
+ "`L_SHIPMODE` String ,\n"
+ "`L_COMMENT` String \n"
+ ") ENGINE = MergeTree()\n"
+ "ORDER BY (`L_ORDERKEY`,`L_LINENUMBER`)\n"
+ "PRIMARY KEY (`L_ORDERKEY`,`L_LINENUMBER`)\n"
+ "SETTINGS\n"
+ " index_granularity = 8192\n"
+ "COMMENT 'clickhouse test table';";
Assertions.assertEquals(result, expected);
}
}