From 29931c187ec06f11f1c22a314eda58d85a7ae6ba Mon Sep 17 00:00:00 2001 From: monster <60029759+MonsterChenzhuo@users.noreply.github.com> Date: Tue, 2 Jan 2024 10:21:58 +0800 Subject: [PATCH] [flink] Fix mongo primary key field not objectId type (#2606) --- .../strategy/MongoVersionStrategy.java | 4 +- .../mongodb/MongoDBSyncTableActionITCase.java | 37 ++++++++++++++++++- .../mongodb/table/defaultid/defaultId-1.js | 8 ++-- .../mongodb/table/defaultid/defaultId-2.js | 35 ++++++++++++++++++ 4 files changed, 78 insertions(+), 6 deletions(-) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java index 24d2be85e96f..3908c0e681b2 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mongodb/strategy/MongoVersionStrategy.java @@ -94,7 +94,9 @@ default Map getExtractRow( } JsonNode document = mongodbConfig.getBoolean(DEFAULT_ID_GENERATION) - ? objectNode.set(ID_FIELD, idNode.get(OID_FIELD)) + ? objectNode.set( + ID_FIELD, + idNode.get(OID_FIELD) == null ? idNode : idNode.get(OID_FIELD)) : objectNode; switch (mode) { case SPECIFIED: diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java index d43b4e925389..78ad2f35042d 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncTableActionITCase.java @@ -302,7 +302,7 @@ public void testDefaultId() throws Exception { Map mongodbConfig = getBasicMongoDBConfig(); mongodbConfig.put("database", database); - mongodbConfig.put("collection", "defaultId"); + mongodbConfig.put("collection", "defaultId1"); mongodbConfig.put("default.id.generation", "false"); MongoDBSyncTableAction action = @@ -311,6 +311,41 @@ public void testDefaultId() throws Exception { .build(); runActionWithDefaultEnv(action); + FileStoreTable table = getFileStoreTable(tableName); + List primaryKeys = Collections.singletonList("_id"); + RowType rowType = + RowType.of( + new DataType[] { + DataTypes.STRING().notNull(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING() + }, + new String[] {"_id", "name", "description", "weight"}); + + List expectedInsert = + Arrays.asList( + "+I[{\"$oid\":\"100000000000000000000101\"}, scooter, Small 2-wheel scooter, 3.14]", + "+I[{\"$oid\":\"100000000000000000000102\"}, car battery, 12V car battery, 8.1]", + "+I[{\"$oid\":\"100000000000000000000103\"}, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]"); + waitForResult(expectedInsert, table, rowType, primaryKeys); + } + + @Test + @Timeout(60) + public void testPrimaryKeyNotObjectIdType() throws Exception { + writeRecordsToMongoDB("defaultId-2", database, "table/defaultid"); + + Map mongodbConfig = getBasicMongoDBConfig(); + mongodbConfig.put("database", database); + mongodbConfig.put("collection", "defaultId2"); + + MongoDBSyncTableAction action = + syncTableActionBuilder(mongodbConfig) + .withTableConfig(getBasicTableConfig()) + .build(); + runActionWithDefaultEnv(action); + FileStoreTable table = getFileStoreTable(tableName); List primaryKeys = Collections.singletonList("_id"); RowType rowType = diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js index 12e305ec5633..2d9b87b177cf 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-1.js @@ -13,21 +13,21 @@ // See the License for the specific language governing permissions and // limitations under the License. -db.getCollection('defaultId').insertMany([ +db.getCollection('defaultId1').insertMany([ { - "_id": "100000000000000000000101", + "_id": ObjectId("100000000000000000000101"), "name": "scooter", "description": "Small 2-wheel scooter", "weight": 3.14 }, { - "_id": "100000000000000000000102", + "_id": ObjectId("100000000000000000000102"), "name": "car battery", "description": "12V car battery", "weight": 8.1 }, { - "_id": "100000000000000000000103", + "_id": ObjectId("100000000000000000000103"), "name": "12-pack drill bits", "description": "12-pack of drill bits with sizes ranging from #40 to #3", "weight": 0.8 diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js new file mode 100644 index 000000000000..c1acf8fe21eb --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mongodb/table/defaultid/defaultId-2.js @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// -- this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +db.getCollection('defaultId2').insertMany([ + { + "_id": "100000000000000000000101", + "name": "scooter", + "description": "Small 2-wheel scooter", + "weight": 3.14 + }, + { + "_id": "100000000000000000000102", + "name": "car battery", + "description": "12V car battery", + "weight": 8.1 + }, + { + "_id": "100000000000000000000103", + "name": "12-pack drill bits", + "description": "12-pack of drill bits with sizes ranging from #40 to #3", + "weight": 0.8 + } +]);