[Fix] [Kafka Source] kafka source use topic as table name instead of fullName #8401

Merged
Hisoka-X merged 4 commits into apache:dev on Jan 15, 2025

Conversation

Cheun99
Contributor

@Cheun99 Cheun99 commented Dec 30, 2024

Purpose of this pull request

resolve #8396

Does this PR introduce any user-facing change?

How was this patch tested?

Check list

@liunaijie liunaijie changed the title [Bug] [Kafka Source] Failed when the consume topic name like a.b.c.d [Fix] [Kafka Source] Failed when the consume topic name like a.b.c.d Dec 30, 2024
@liunaijie liunaijie changed the title [Fix] [Kafka Source] Failed when the consume topic name like a.b.c.d [Fix] [Kafka Source] kafka source use topic as table name instead of fullName Dec 30, 2024
@Cheun99 Cheun99 marked this pull request as ready for review December 30, 2024 08:04
Hisoka-X
Hisoka-X previously approved these changes Jan 2, 2025
Member

@Hisoka-X Hisoka-X left a comment

LGTM if CI passes.

Comment on lines 208 to 212
String topic = readonlyConfig.get(TOPIC).replace(".", "_");
TablePath tablePath = TablePath.of("kafka", topic);
Member

Hi, why not use TablePath.of("kafka", readonlyConfig.get(TOPIC))?

Contributor Author

https://github.com/apache/seatunnel/blob/dev/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java#L132

If the Kafka topic name contains more than 3 dots, the line of code linked above still splits the name on dots when it runs. If the split produces more than 3 elements, an IllegalArgumentException is still thrown.
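
A minimal sketch of the failure described above, assuming a parser that splits a full table name on dots and accepts at most three parts. `DotSplitSketch`, `parseFullName`, and `isRejected` are hypothetical names used only for illustration, not SeaTunnel APIs; the real splitting logic is in the linked SeaTunnel code and may differ in detail.

```java
// Hypothetical, simplified stand-in for a dot-splitting table-path parser.
// It is NOT the SeaTunnel TablePath class; all names here are illustrative.
final class DotSplitSketch {

    // Splits a full name on '.' and accepts at most three parts
    // (database, schema, table), as assumed in the lead-in.
    static String[] parseFullName(String fullName) {
        String[] parts = fullName.split("\\.");
        if (parts.length > 3) {
            throw new IllegalArgumentException(
                    "Cannot split '" + fullName + "' into database, schema and table");
        }
        return parts;
    }

    // Reports whether parseFullName would reject the given full name.
    static boolean isRejected(String fullName) {
        try {
            parseFullName(fullName);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // A topic named a.b.c.d under database "kafka" becomes "kafka.a.b.c.d".
        System.out.println(isRejected("kafka.a.b.c.d"));
        // Replacing dots with underscores keeps the full name to two parts.
        System.out.println(isRejected("kafka." + "a.b.c.d".replace(".", "_")));
    }
}
```

Under these assumptions, the underscore replacement only works around the re-splitting step; the dotted name itself is not the problem until it is flattened into a string and parsed again.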

Member

I think we should fix it. We can save the TablePath directly instead of the corresponding string in SinkFlowLifeCycle.
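
One way to picture this suggestion, as a hedged sketch rather than the actual change made in this PR: keep the identifier as a structured object and use it as the lookup key, so a dotted topic name is never split again. `StructuredKeySketch`, `PathKey`, and the writer map are hypothetical stand-ins, not SeaTunnel classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch: PathKey and the writer map are illustrative names,
// not SeaTunnel classes.
final class StructuredKeySketch {

    // A structured identifier that keeps database and table as separate fields,
    // so a dotted table name never has to be re-parsed from a string.
    static final class PathKey {
        final String database;
        final String table;

        PathKey(String database, String table) {
            this.database = database;
            this.table = table;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof PathKey)) return false;
            PathKey other = (PathKey) o;
            return Objects.equals(database, other.database)
                    && Objects.equals(table, other.table);
        }

        @Override
        public int hashCode() {
            return Objects.hash(database, table);
        }
    }

    // Looks up a per-table value by structured key; no string splitting is involved.
    static String lookup(Map<PathKey, String> writers, String database, String table) {
        return writers.get(new PathKey(database, table));
    }

    public static void main(String[] args) {
        Map<PathKey, String> writers = new HashMap<>();
        writers.put(new PathKey("kafka", "a.b.c.d"), "writer-for-a.b.c.d");
        System.out.println(lookup(writers, "kafka", "a.b.c.d"));
    }
}
```

The trade-off raised in the thread still applies: switching the key type from a string to a structured object tends to touch every place that stores or reads that key.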

Contributor Author

I tried saving the TablePath earlier, but the change reached into other files and required modifying many of them.

Member

Never mind. Please commit it, then we can review it together.

Contributor Author

@Hisoka-X Hi, please take a look.

@github-actions github-actions bot added core SeaTunnel core module Zeta api and removed e2e kafka labels Jan 9, 2025
Comment on lines 669 to 670
// String tableId =
// action.getConfig().getMultipleRowTableId();
Member

Suggested change
// String tableId =
// action.getConfig().getMultipleRowTableId();

@@ -25,5 +27,6 @@
@NoArgsConstructor
@AllArgsConstructor
public class SinkConfig implements Config {
-private String multipleRowTableId;
+// private String multipleRowTableId;
Member

Suggested change
// private String multipleRowTableId;

@Hisoka-X
Member

Hisoka-X commented Jan 9, 2025

Overall LGTM. Let's wait for the CI result.

Member

@Hisoka-X Hisoka-X left a comment

But you dropped the Kafka change and the test case.

@Cheun99
Contributor Author

Cheun99 commented Jan 9, 2025

But you dropped the Kafka change and the test case.

OK, I will delete the commented-out code you just mentioned and add a Kafka test case.

Member

@liunaijie liunaijie left a comment

LGTM

@@ -295,7 +315,7 @@ public void testSourceKafkaJsonFormatErrorHandleWaySkipToConsole(TestContainer c
DEFAULT_FORMAT,
DEFAULT_FIELD_DELIMITER,
null);
-generateTestData(serializer::serializeRow, 0, 100);
+generateTestData(serializer::serializeRow, 0, 10);
Contributor Author

When I run the test cases, this one always times out. This method takes too much time, so I reduced the size of the mock data from 100 to 10.

@Hisoka-X Hisoka-X merged commit 3d4f4bb into apache:dev Jan 15, 2025
6 checks passed
litiliu pushed a commit to litiliu/seatunnel that referenced this pull request Jan 16, 2025
Development

Successfully merging this pull request may close these issues.

[Bug] [Kafka Source] Failed when the consume topic name like a.b.c.d
3 participants