Flink 1.18.1 upgrade #48

Merged on Jan 23, 2025 (47 commits)

Changes from 1 commit

Commits (47)
ecf7d2c  WIP 1.18 upgrade initial commit - added dynamic PSC source and finish…  (jeffxiang, Nov 26, 2024)
034ab2c  Finish sink code changes  (jeffxiang, Nov 27, 2024)
39f067f  Finish source changes; add pause + resume API's to PscConsumer in cor…  (jeffxiang, Nov 28, 2024)
f5c45ae  Finish streaming.connectors.psc (mainly table) code changes  (jeffxiang, Dec 3, 2024)
9e01a18  Finish first round code changes for sink tests  (jeffxiang, Dec 3, 2024)
54a6c4f  Finish code changes for source tests  (jeffxiang, Dec 4, 2024)
78c7b73  WIP finish test code changes for streaming.connectors.psc top level  (jeffxiang, Dec 4, 2024)
d588c71  WIP finish test code changes for PscDynamicTableFactoryTest  (jeffxiang, Dec 4, 2024)
fdecfdc  Finish table tests refactor  (jeffxiang, Dec 5, 2024)
4004bef  Finish test refactor  (jeffxiang, Dec 5, 2024)
16d3155  Compiles  (jeffxiang, Dec 5, 2024)
153d685  All dynamic.source tests pass except DynamicPscSourceITTest specific …  (jeffxiang, Dec 10, 2024)
b50fcbf  WIP fixing basic cluster read dynamic source  (jeffxiang, Dec 11, 2024)
bf4b858  Remove extra debugging logic  (jeffxiang, Dec 11, 2024)
d325fb4  Fixed DynamicPscSourceITTest  (jeffxiang, Dec 11, 2024)
5c95746  Finish fixing dynamic.source tests  (jeffxiang, Dec 12, 2024)
d8d23e8  WIP all sink tests pass except one test checkProducerLeak()  (jeffxiang, Dec 12, 2024)
8b6cea8  Finish source tests  (jeffxiang, Dec 12, 2024)
a789999  WIP finish fixing streaming.connectors.psc internals, shuffle, table …  (jeffxiang, Dec 12, 2024)
9f95a6b  WIP fixing PscSerializerUpgradeTest  (jeffxiang, Dec 12, 2024)
bf362ae  Generated test files for migration/snapshot  (jeffxiang, Dec 12, 2024)
18ced74  Everything should pass except testBrokerFailure  (jeffxiang, Dec 12, 2024)
7406f35  Fix testIteratorFor and testTransactionalProducerWithKafkaBackendCann…  (jeffxiang, Dec 13, 2024)
e30306f  Force close pscProducer in testTransactionalProducerWithKafkaBackendC…  (jeffxiang, Dec 13, 2024)
3bd478d  Update log4j settings to prevent flood of build logs  (jeffxiang, Dec 13, 2024)
c7aa188  Add repository io.confluent for kafka-schema-registry-client; change …  (jeffxiang, Dec 13, 2024)
70d83e2  Add thread.sleep(5s) in FlinkPscProducerITCase.testFlinkPscProducerFa…  (jeffxiang, Dec 13, 2024)
30a5b22  Wrap metrics() in sychronized block to try and prevent CME  (jeffxiang, Dec 13, 2024)
7195742  Split build to parallelize  (jeffxiang, Dec 13, 2024)
4c5f16b  Rename build workflow  (jeffxiang, Dec 13, 2024)
ddc8172  Attempt to allow for Flink-Kafka checkpoint recovery  (jeffxiang, Jan 10, 2025)
defa065  Remove check to set clusterUri only once  (jeffxiang, Jan 10, 2025)
c78756d  Create PscMetadataClient config converter and related logic  (jeffxiang, Jan 10, 2025)
3b00c3d  Update PscSubscriberTest to fix after secure protocol is allowable in…  (jeffxiang, Jan 11, 2025)
7d20d6c  Add logs to debug double consumption during checkpoint recovery from …  (jeffxiang, Jan 13, 2025)
654cdf8  Add more debug logs  (jeffxiang, Jan 13, 2025)
bcc57d5  Implement versioning for PSC state serializers so that we can recover…  (jeffxiang, Jan 13, 2025)
39d5dbb  Add back PscSimpleTypeSerializerSnapshot to try and fix checkpoint re…  (jeffxiang, Jan 16, 2025)
84281db  Revert "Add back PscSimpleTypeSerializerSnapshot to try and fix check…  (jeffxiang, Jan 16, 2025)
06a836c  Update PscTopicUriPartitionSplitSerializerTest  (jeffxiang, Jan 16, 2025)
2386806  Add debug logs to investigate partition discovery  (jeffxiang, Jan 17, 2025)
f2c41e4  Remove unnecessary logs  (jeffxiang, Jan 20, 2025)
c91994f  Dedupe between PscConfigurationInternal and PscConfiguration  (jeffxiang, Jan 21, 2025)
7ae3cd0  Fix missing import  (jeffxiang, Jan 21, 2025)
e143f9c  Rearrange logs to prevent noise  (jeffxiang, Jan 21, 2025)
a46aa87  Remove noisy logs  (jeffxiang, Jan 22, 2025)
a8b4bbc  Address comments  (jeffxiang, Jan 23, 2025)
Finish first round code changes for sink tests
jeffxiang committed Dec 3, 2024
commit 9e01a18e71bf371002fc2b1fcc4f97a64ac4282e
psc-flink/pom.xml (8 additions, 0 deletions)
@@ -189,6 +189,14 @@
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
         <!-- force using the latest zkclient -->
         <dependency>
             <groupId>com.101tec</groupId>
@@ -38,11 +38,11 @@ public class ClusterMetadata implements Serializable {
     /**
      * Constructs the {@link ClusterMetadata} with the required properties.
      *
-     * @param topics the topics belonging to a cluster.
+     * @param topicUris the topics belonging to a cluster.
      * @param properties the properties to access a cluster.
      */
-    public ClusterMetadata(Set<String> topics, Properties properties) {
-        this.topicUris = topics;
+    public ClusterMetadata(Set<String> topicUris, Properties properties) {
+        this.topicUris = topicUris;
         this.properties = properties;
     }
 
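For context, constructing ClusterMetadata with the renamed parameter looks roughly like the sketch below; the topic URI value and the (empty) Properties contents are illustrative assumptions, not values taken from this PR.

import java.util.Collections;
import java.util.Properties;
import java.util.Set;

// Sketch only: ClusterMetadata pairs the topic URIs belonging to one cluster
// with the client properties needed to reach that cluster.
// The URI format shown here is an assumed example.
Properties clusterProperties = new Properties();
Set<String> topicUris =
        Collections.singleton("plaintext:/rn:kafka:env:region::cluster:example-topic");
ClusterMetadata clusterMetadata = new ClusterMetadata(topicUris, clusterProperties);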
@@ -177,16 +177,16 @@ public DynamicPscSourceBuilder<T> setProperty(String key, String value) {
         return this;
     }
 
-    // /**
-    // * Set the property for {@link CommonClientConfigs#GROUP_ID_CONFIG}. This will be applied to all
-    // * clusters.
-    // *
-    // * @param groupId the group id.
-    // * @return the builder.
-    // */
-    // public DynamicPscSourceBuilder<T> setGroupId(String groupId) {
-    //     return setProperty(CommonClientConfigs.GROUP_ID_CONFIG, groupId);
-    // }
+    /**
+     * Set the property for {@link PscConfiguration#PSC_CONSUMER_GROUP_ID}. This will be applied to all
+     * clusters.
+     *
+     * @param groupId the group id.
+     * @return the builder.
+     */
+    public DynamicPscSourceBuilder<T> setGroupId(String groupId) {
+        return setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, groupId);
+    }
 
     /**
      * Set the client id prefix. This applies {@link PscSourceOptions#CLIENT_ID_PREFIX} to all
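Usage note: with this change the two calls below are equivalent, since setGroupId(...) simply delegates to setProperty(...) with PscConfiguration.PSC_CONSUMER_GROUP_ID; the builder variable is assumed to be an already-constructed DynamicPscSourceBuilder<T>.

// Equivalent ways to set the consumer group id applied to every cluster
// the dynamic source reads from ("builder" is a DynamicPscSourceBuilder<T>):
builder.setGroupId("my-consumer-group");
builder.setProperty(PscConfiguration.PSC_CONSUMER_GROUP_ID, "my-consumer-group");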
@@ -83,7 +83,7 @@ public class DynamicPscSourceEnumerator
     private final OffsetsInitializer startingOffsetsInitializer;
     private final OffsetsInitializer stoppingOffsetInitializer;
     private final Boundedness boundedness;
-    private final StoppablePscEnumContextProxy.StoppableKafkaEnumContextProxyFactory
+    private final StoppablePscEnumContextProxy.StoppablePscEnumContextProxyFactory
             stoppablePscEnumContextProxyFactory;
 
     // options
@@ -114,7 +114,7 @@ public DynamicPscSourceEnumerator(
                 properties,
                 boundedness,
                 dynamicPscSourceEnumState,
-                StoppablePscEnumContextProxy.StoppableKafkaEnumContextProxyFactory
+                StoppablePscEnumContextProxy.StoppablePscEnumContextProxyFactory
                         .getDefaultFactory());
     }
 
@@ -128,7 +128,7 @@ public DynamicPscSourceEnumerator(
             Properties properties,
             Boundedness boundedness,
             DynamicPscSourceEnumState dynamicPscSourceEnumState,
-            StoppablePscEnumContextProxy.StoppableKafkaEnumContextProxyFactory
+            StoppablePscEnumContextProxy.StoppablePscEnumContextProxyFactory
                     stoppablePscEnumContextProxyFactory) {
         this.pscStreamSubscriber = pscStreamSubscriber;
         this.boundedness = boundedness;
@@ -459,7 +459,7 @@ public void addSplitsBack(List<DynamicPscSourceSplit> splits, int subtaskId) {
         ArrayListMultimap<String, PscTopicUriPartitionSplit> kafkaPartitionSplits =
                 ArrayListMultimap.create();
         for (DynamicPscSourceSplit split : splits) {
-            kafkaPartitionSplits.put(split.getClusterId(), split.getPscTopicUriPartitionSplit());
+            kafkaPartitionSplits.put(split.getPubSubClusterId(), split.getPscTopicUriPartitionSplit());
         }
 
         // add splits back and assign pending splits for all enumerators
@@ -290,15 +290,15 @@ public String getMessage() {
      * periodic discovery loops on demand.
      */
     @Internal
-    public interface StoppableKafkaEnumContextProxyFactory {
+    public interface StoppablePscEnumContextProxyFactory {
 
         StoppablePscEnumContextProxy create(
                 SplitEnumeratorContext<DynamicPscSourceSplit> enumContext,
                 String kafkaClusterId,
                 PscMetadataService kafkaMetadataService,
                 Runnable signalNoMoreSplitsCallback);
 
-        static StoppableKafkaEnumContextProxyFactory getDefaultFactory() {
+        static StoppablePscEnumContextProxyFactory getDefaultFactory() {
            return (enumContext,
                    kafkaClusterId,
                    kafkaMetadataService,
@@ -189,7 +189,7 @@ public void addSplits(List<DynamicPscSourceSplit> splits) {
         ArrayListMultimap<String, PscTopicUriPartitionSplit> clusterSplitsMap =
                 ArrayListMultimap.create();
         for (DynamicPscSourceSplit split : splits) {
-            clusterSplitsMap.put(split.getClusterId(), split);
+            clusterSplitsMap.put(split.getPubSubClusterId(), split);
         }
 
         Set<String> clusterIds = clusterSplitsMap.keySet();
@@ -263,15 +263,15 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
             // the data structures above
             for (DynamicPscSourceSplit split : currentSplitState) {
                 currentMetadataFromState
-                        .computeIfAbsent(split.getClusterId(), (ignore) -> new HashSet<>())
+                        .computeIfAbsent(split.getPubSubClusterId(), (ignore) -> new HashSet<>())
                         .add(split.getPscTopicUriPartitionSplit().getTopicUri());
                 // check if cluster topic exists in the metadata update
-                if (newClustersAndTopicUris.containsKey(split.getClusterId())
+                if (newClustersAndTopicUris.containsKey(split.getPubSubClusterId())
                         && newClustersAndTopicUris
-                                .get(split.getClusterId())
+                                .get(split.getPubSubClusterId())
                                 .contains(split.getPscTopicUriPartitionSplit().getTopicUri())) {
                     filteredNewClusterSplitStateMap
-                            .computeIfAbsent(split.getClusterId(), (ignore) -> new ArrayList<>())
+                            .computeIfAbsent(split.getPubSubClusterId(), (ignore) -> new ArrayList<>())
                             .add(split);
                 } else {
                     logger.info("Skipping outdated split due to metadata changes: {}", split);
@@ -346,8 +346,8 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
 
     private static boolean isSplitForActiveClusters(
             DynamicPscSourceSplit split, Map<String, Set<String>> metadata) {
-        return metadata.containsKey(split.getClusterId())
-                && metadata.get(split.getClusterId())
+        return metadata.containsKey(split.getPubSubClusterId())
+                && metadata.get(split.getPubSubClusterId())
                         .contains(split.getPscTopicUriPartitionSplit().getTopicUri());
     }
 
@@ -45,7 +45,7 @@ public String splitId() {
         return clusterId + "-" + pscTopicUriPartitionSplit.splitId();
     }
 
-    public String getClusterId() {
+    public String getPubSubClusterId() {
         return clusterId;
     }
 
@@ -52,7 +52,7 @@ public int getVersion() {
     public byte[] serialize(DynamicPscSourceSplit split) throws IOException {
         try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 DataOutputStream out = new DataOutputStream(baos)) {
-            out.writeUTF(split.getClusterId());
+            out.writeUTF(split.getPubSubClusterId());
             out.writeInt(pscTopicUriPartitionSplitSerializer.getVersion());
             out.write(pscTopicUriPartitionSplitSerializer.serialize(split.getPscTopicUriPartitionSplit()));
             out.flush();
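The layout written above is: the pub-sub cluster id as a UTF string, the nested serializer's version as an int, then the nested PscTopicUriPartitionSplit bytes. Below is a hedged sketch of a matching read side, assuming a deserialize(int, byte[]) method on pscTopicUriPartitionSplitSerializer and a (clusterId, split) constructor on DynamicPscSourceSplit; neither appears in this commit's diff.

    // Assumed mirror of serialize(): readUTF cluster id, readInt nested version,
    // then hand the remaining bytes to the nested split serializer.
    public DynamicPscSourceSplit deserialize(int version, byte[] serialized) throws IOException {
        try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
                DataInputStream in = new DataInputStream(bais)) {
            String pubSubClusterId = in.readUTF();
            int nestedVersion = in.readInt();
            byte[] nestedSplitBytes = in.readAllBytes();
            PscTopicUriPartitionSplit nestedSplit =
                    pscTopicUriPartitionSplitSerializer.deserialize(nestedVersion, nestedSplitBytes);
            return new DynamicPscSourceSplit(pubSubClusterId, nestedSplit);
        }
    }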