Skip to content

Commit

Permalink
[flink] Fix mongo primary key field not objectId type (apache#2606)
Browse files Browse the repository at this point in the history
  • Loading branch information
MonsterChenzhuo authored Jan 2, 2024
1 parent 95d8006 commit 29931c1
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ default Map<String, String> getExtractRow(
}
JsonNode document =
mongodbConfig.getBoolean(DEFAULT_ID_GENERATION)
? objectNode.set(ID_FIELD, idNode.get(OID_FIELD))
? objectNode.set(
ID_FIELD,
idNode.get(OID_FIELD) == null ? idNode : idNode.get(OID_FIELD))
: objectNode;
switch (mode) {
case SPECIFIED:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public void testDefaultId() throws Exception {

Map<String, String> mongodbConfig = getBasicMongoDBConfig();
mongodbConfig.put("database", database);
mongodbConfig.put("collection", "defaultId");
mongodbConfig.put("collection", "defaultId1");
mongodbConfig.put("default.id.generation", "false");

MongoDBSyncTableAction action =
Expand All @@ -311,6 +311,41 @@ public void testDefaultId() throws Exception {
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);
List<String> primaryKeys = Collections.singletonList("_id");
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.STRING().notNull(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING()
},
new String[] {"_id", "name", "description", "weight"});

List<String> expectedInsert =
Arrays.asList(
"+I[{\"$oid\":\"100000000000000000000101\"}, scooter, Small 2-wheel scooter, 3.14]",
"+I[{\"$oid\":\"100000000000000000000102\"}, car battery, 12V car battery, 8.1]",
"+I[{\"$oid\":\"100000000000000000000103\"}, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]");
waitForResult(expectedInsert, table, rowType, primaryKeys);
}

@Test
@Timeout(60)
public void testPrimaryKeyNotObjectIdType() throws Exception {
writeRecordsToMongoDB("defaultId-2", database, "table/defaultid");

Map<String, String> mongodbConfig = getBasicMongoDBConfig();
mongodbConfig.put("database", database);
mongodbConfig.put("collection", "defaultId2");

MongoDBSyncTableAction action =
syncTableActionBuilder(mongodbConfig)
.withTableConfig(getBasicTableConfig())
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable(tableName);
List<String> primaryKeys = Collections.singletonList("_id");
RowType rowType =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@
// See the License for the specific language governing permissions and
// limitations under the License.

db.getCollection('defaultId').insertMany([
db.getCollection('defaultId1').insertMany([
{
"_id": "100000000000000000000101",
"_id": ObjectId("100000000000000000000101"),
"name": "scooter",
"description": "Small 2-wheel scooter",
"weight": 3.14
},
{
"_id": "100000000000000000000102",
"_id": ObjectId("100000000000000000000102"),
"name": "car battery",
"description": "12V car battery",
"weight": 8.1
},
{
"_id": "100000000000000000000103",
"_id": ObjectId("100000000000000000000103"),
"name": "12-pack drill bits",
"description": "12-pack of drill bits with sizes ranging from #40 to #3",
"weight": 0.8
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// -- this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

db.getCollection('defaultId2').insertMany([
{
"_id": "100000000000000000000101",
"name": "scooter",
"description": "Small 2-wheel scooter",
"weight": 3.14
},
{
"_id": "100000000000000000000102",
"name": "car battery",
"description": "12V car battery",
"weight": 8.1
},
{
"_id": "100000000000000000000103",
"name": "12-pack drill bits",
"description": "12-pack of drill bits with sizes ranging from #40 to #3",
"weight": 0.8
}
]);

0 comments on commit 29931c1

Please sign in to comment.