Skip to content

Commit

Permalink
resolve consume topic name like a.b.c.d fail
Browse files Browse the repository at this point in the history
  • Loading branch information
Cheun99 committed Dec 30, 2024
1 parent 37612d9 commit 817c0e6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.api.table.catalog;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class TablePathTest {

@Test
void ofFullName() {
TablePath tablePath = TablePath.of("db1.db2.table1");
Assertions.assertEquals("db1", tablePath.getDatabaseName());
Assertions.assertEquals("db2", tablePath.getSchemaName());
Assertions.assertEquals("table1", tablePath.getTableName());
Assertions.assertThrows(
IllegalArgumentException.class, () -> TablePath.of("db1.db2.table1.view1"));
TablePath kafkaTable = TablePath.of("kafka", "db1.db2.table1.view1");
Assertions.assertEquals("kafka", kafkaTable.getDatabaseName());
Assertions.assertNull(kafkaTable.getSchemaName());
Assertions.assertEquals("db1.db2.table1.view1", kafkaTable.getTableName());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private ConsumerMetadata createConsumerMetadata(ReadonlyConfig readonlyConfig) {
private CatalogTable createCatalogTable(ReadonlyConfig readonlyConfig) {
Optional<Map<String, Object>> schemaOptions =
readonlyConfig.getOptional(TableSchemaOptions.SCHEMA);
TablePath tablePath = TablePath.of(readonlyConfig.get(TOPIC));
TablePath tablePath = TablePath.of("kafka", readonlyConfig.get(TOPIC));
TableSchema tableSchema;
if (schemaOptions.isPresent()) {
tableSchema = new ReadonlyConfigParser().parse(readonlyConfig);
Expand Down

0 comments on commit 817c0e6

Please sign in to comment.