Skip to content

Commit

Permalink
Fix primary key constraint for nullable keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jul 8, 2024
1 parent c3fa16a commit ccd0cda
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ public SqlNode visit(SqlCall call) {
* Implements a CREATE TABLE...WITH... DDL statement.
*
* N.B. the following magic:
* - field 'KEY' is treated as a PRIMARY KEY
* - non-nullable field 'KEY' is treated as a PRIMARY KEY
*/
class ConnectorImplementor implements ScriptImplementor {
private final String database;
Expand All @@ -192,7 +192,8 @@ public void implement(SqlWriter w) {
(new CompoundIdentifierImplementor(database, name)).implement(w);
SqlWriter.Frame frame1 = w.startList("(", ")");
(new RowTypeSpecImplementor(rowType)).implement(w);
if (rowType.getField("KEY", true, false) != null) {
RelDataTypeField key = rowType.getField("KEY", true, false);
if (key != null && key.getType().isNullable() == false) {
w.sep(",");
w.literal("PRIMARY KEY (KEY) NOT ENFORCED");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ public Schema create(SchemaPlus parentSchema, String name, Map<String, Object> o
.with("KEY", DataType.VARCHAR);
ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig)
.withPrefix("properties.")
.with("connector", "upsert-kafka")
.with("connector", "kafka")
.with("key.format", "csv")
.with("key.fields", "KEY")
.with("value.format", "csv")
.with("value.fields-include", "EXCEPT_KEY")
.with("topic", x -> x);
Expand Down

0 comments on commit ccd0cda

Please sign in to comment.