Skip to content

Commit

Permalink
feat(MM2): allow customizing MM2 offset-syncs/checkpoints/heartbeats …
Browse files Browse the repository at this point in the history
…topic easily (#1095)

Add AutoMQIdentityReplicationPolicy
  • Loading branch information
aaron-ai authored Apr 7, 2024
1 parent 6cb92bf commit 33b00bd
Showing 1 changed file with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package org.apache.kafka.connect.mirror;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* AutoMQIdentityReplicationPolicy is a custom implementation of the ReplicationPolicy interface that allows for the
* configuration of the offset-syncs-topic, checkpoints-topic, and heartbeats-topic via environment variables.
* <p>
* See more details from KIP-690.
*/
public class AutoMQIdentityReplicationPolicy extends IdentityReplicationPolicy {
private static final Logger log = LoggerFactory.getLogger(AutoMQIdentityReplicationPolicy.class);

private static final String OFFSET_SYNC_TOPIC_ENV_KEY = "OFFSET_SYNCS_TOPIC";
private static final String CHECKPOINTS_TOPIC_ENV_KEY = "CHECKPOINTS_TOPIC";
private static final String HEARTBEATS_TOPIC_ENV_KEY = "HEARTBEATS_TOPIC";

@Override
public String offsetSyncsTopic(String clusterAlias) {
String offsetSyncsTopic = System.getenv(OFFSET_SYNC_TOPIC_ENV_KEY);
if (offsetSyncsTopic == null) {
return super.offsetSyncsTopic(clusterAlias);
}
log.info("Using offset syncs topic: {}", offsetSyncsTopic);
return offsetSyncsTopic;
}

@Override
public String checkpointsTopic(String clusterAlias) {
String checkpointsTopic = System.getenv(CHECKPOINTS_TOPIC_ENV_KEY);
if (checkpointsTopic == null) {
return super.checkpointsTopic(clusterAlias);
}
log.info("Using checkpoints topic: {}", checkpointsTopic);
return checkpointsTopic;
}

@Override
public String heartbeatsTopic() {
String heartbeatsTopic = System.getenv(HEARTBEATS_TOPIC_ENV_KEY);
if (heartbeatsTopic == null) {
return super.heartbeatsTopic();
}
log.info("Using heartbeats topic: {}", heartbeatsTopic);
return heartbeatsTopic;
}
}

0 comments on commit 33b00bd

Please sign in to comment.