Skip to content

Commit

Permalink
#119: Support predefined consumers for CDC topics
Browse files Browse the repository at this point in the history
Adding an option to specify a list of consumers that
will be created alongside the CDC topic during the initial bootstrap of
the application.

A minor consideration is that TopicService/AlterTopic is not idempotent
and will throw an exception if we specify already existing consumers in
the addConsumer block. To address this behavior, we will perform a
describe operation each time a table is attempted to be created.

These changes have been tested against local YDB
(ydbplatform/local-ydb:latest version).

---------

Co-authored-by: Daniil Zulin <[email protected]>
  • Loading branch information
Eistern and Daniil Zulin authored Jan 21, 2025
1 parent 3e9fdde commit 6aaa1a3
Show file tree
Hide file tree
Showing 8 changed files with 185 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@
*/
boolean initialScan() default false;

/**
* Initial consumers of the changefeed
*/
Consumer[] consumers() default {};

enum Mode {
/**
* Only the key component of the modified row
Expand Down Expand Up @@ -71,4 +76,22 @@ enum Mode {
enum Format {
JSON
}

@interface Consumer {
String name();

Codec[] codecs() default {};

String readFrom() default "1970-01-01T00:00:00Z";

boolean important() default false;

enum Codec {
RAW,
GZIP,
LZOP,
ZSTD,
CUSTOM
}
}
}
31 changes: 30 additions & 1 deletion databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -245,13 +247,23 @@ private Changefeed changefeedFromAnnotation(@NonNull tech.ydb.yoj.databind.schem
var retentionPeriod = Duration.parse(changefeed.retentionPeriod());
Preconditions.checkArgument(!(retentionPeriod.isNegative() || retentionPeriod.isZero()),
"RetentionPeriod value defined for %s must be positive", getType());
List<Changefeed.Consumer> consumers = Arrays.stream(changefeed.consumers())
.map(consumer -> new Changefeed.Consumer(
consumer.name(),
List.of(consumer.codecs()),
Instant.parse(consumer.readFrom()),
consumer.important()
))
.toList();

return new Changefeed(
changefeed.name(),
changefeed.mode(),
changefeed.format(),
changefeed.virtualTimestamps(),
retentionPeriod,
changefeed.initialScan()
changefeed.initialScan(),
consumers
);
}

Expand Down Expand Up @@ -813,5 +825,22 @@ public static class Changefeed {
Duration retentionPeriod;

boolean initialScan;

@NonNull
List<Consumer> consumers;

@Value
public static class Consumer {
@NonNull
String name;

@NonNull
List<tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec> codecs;

@NonNull
Instant readFrom;

boolean important;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -25,6 +27,7 @@ public void testChangefeedDefaultsEntity() {
assertThat(entitySchema.getChangefeeds().get(0).getRetentionPeriod()).isEqualTo(Duration.ofHours(24));
assertThat(entitySchema.getChangefeeds().get(0).isVirtualTimestamps()).isFalse();
assertThat(entitySchema.getChangefeeds().get(0).isInitialScan()).isFalse();
assertThat(entitySchema.getChangefeeds().get(0).getConsumers()).isEmpty();
}

@Test
Expand All @@ -37,6 +40,38 @@ public void testConflictingChangefeedNameEntity() {
assertThatThrownBy(() -> schemaOf(ConflictingChangefeedNameEntity.class));
}

@Test
public void testPredefinedConsumersChangefeedEntity() {
var entitySchema = schemaOf(PredefinedConsumersChangefeedEntity.class);

Schema.Changefeed expectedChangefeed = new Schema.Changefeed(
"feed1",
Changefeed.Mode.NEW_IMAGE,
Changefeed.Format.JSON,
false,
Duration.ofHours(24),
false,
List.of(
new Schema.Changefeed.Consumer(
"consumer1",
List.of(),
Instant.EPOCH,
false
),
new Schema.Changefeed.Consumer(
"consumer2",
List.of(Changefeed.Consumer.Codec.RAW),
Instant.parse("2020-01-01T00:00:00Z"),
true
)
)
);

assertThat(entitySchema.getChangefeeds())
.singleElement()
.isEqualTo(expectedChangefeed);
}

private static <T> Schema<T> schemaOf(Class<T> entityType) {
return new TestSchema<>(entityType);
}
Expand Down Expand Up @@ -74,4 +109,19 @@ private static class ConflictingChangefeedNameEntity {
int field1;
int field2;
}

@Value
@Changefeed(name = "feed1", consumers = {
@Changefeed.Consumer(name = "consumer1"),
@Changefeed.Consumer(
name = "consumer2",
readFrom = "2020-01-01T00:00:00Z",
codecs = {Changefeed.Consumer.Codec.RAW},
important = true
)
})
private static class PredefinedConsumersChangefeedEntity {
int field1;
int field2;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,16 @@
format = JSON,
virtualTimestamps = true,
retentionPeriod = "PT1H",
initialScan = false /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
initialScan = false, /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
consumers = {
@Changefeed.Consumer(name = "test-consumer1"),
@Changefeed.Consumer(
name = "test-consumer2",
readFrom = "2025-01-21T08:01:25+00:00",
codecs = {Changefeed.Consumer.Codec.RAW},
important = true
)
}
)
@Changefeed(name = "test-changefeed2")
public class ChangefeedEntity implements Entity<ChangefeedEntity> {
Expand Down
2 changes: 2 additions & 0 deletions repository-ydb-v2/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ java_library(
"@java_contribs_stable//:tech_ydb_ydb_sdk_core",
"@java_contribs_stable//:tech_ydb_ydb_sdk_scheme",
"@java_contribs_stable//:tech_ydb_ydb_sdk_table",
"@java_contribs_stable//:tech_ydb_ydb_sdk_topic",
],
)

Expand Down Expand Up @@ -65,5 +66,6 @@ java_test_suite(
"@java_contribs_stable//:tech_ydb_ydb_sdk_core",
"@java_contribs_stable//:tech_ydb_ydb_sdk_scheme",
"@java_contribs_stable//:tech_ydb_ydb_sdk_table",
"@java_contribs_stable//:tech_ydb_ydb_sdk_topic",
],
)
10 changes: 10 additions & 0 deletions repository-ydb-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-table</artifactId>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-topic</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.code.findbugs</groupId>
<artifactId>annotations</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-scheme</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package tech.ydb.yoj.repository.ydb.client;

import com.google.common.collect.Sets;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
Expand All @@ -26,6 +27,11 @@
import tech.ydb.table.settings.PartitioningSettings;
import tech.ydb.table.settings.TtlSettings;
import tech.ydb.table.values.Type;
import tech.ydb.topic.TopicClient;
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec;
import tech.ydb.yoj.databind.schema.Schema;
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.exception.CreateTableException;
Expand All @@ -38,11 +44,14 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;
import static java.util.stream.Collectors.toSet;
import static lombok.AccessLevel.PRIVATE;
import static tech.ydb.core.StatusCode.SCHEME_ERROR;
Expand All @@ -53,12 +62,14 @@ public class YdbSchemaOperations {

private final SessionManager sessionManager;
private final SchemeClient schemeClient;
private final TopicClient topicClient;
private String tablespace;

public YdbSchemaOperations(String tablespace, @NonNull SessionManager sessionManager, GrpcTransport transport) {
this.tablespace = YdbPaths.canonicalTablespace(tablespace);
this.sessionManager = sessionManager;
this.schemeClient = SchemeClient.newClient(transport).build();
this.topicClient = TopicClient.newClient(transport).build();
}

public void setTablespace(String tablespace) {
Expand All @@ -81,7 +92,7 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
columns.forEach(c -> {
ValueProtos.Type.PrimitiveTypeId yqlType = YqlPrimitiveType.of(c).getYqlType();
int yqlTypeNumber = yqlType.getNumber();
ValueProtos.Type.PrimitiveTypeId primitiveTypeId = Stream.of(ValueProtos.Type.PrimitiveTypeId.values())
Stream.of(ValueProtos.Type.PrimitiveTypeId.values())
.filter(id -> id.getNumber() == yqlTypeNumber)
.findFirst()
.orElseThrow(() -> new CreateTableException(String.format("Can't create table '%s'%n"
Expand Down Expand Up @@ -149,6 +160,46 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
throw new CreateTableException(String.format("Can't alter table %s: %s", name, status));
}

if (changefeed.getConsumers().isEmpty()) {
continue;
}

String changeFeedTopicPath = YdbPaths.join(tablespace + name, changefeed.getName());
Result<TopicDescription> result = topicClient.describeTopic(changeFeedTopicPath).join();
if (result.getStatus().getCode() != tech.ydb.core.StatusCode.SUCCESS) {
throw new CreateTableException(String.format("Can't describe CDC topic %s: %s", changeFeedTopicPath, result.getStatus()));
}

Set<String> existingConsumerNames = result.getValue().getConsumers().stream()
.map(Consumer::getName)
.collect(toSet());

Map<String, Schema.Changefeed.Consumer> specifiedConsumers = changefeed.getConsumers().stream()
.collect(toMap(Schema.Changefeed.Consumer::getName, Function.identity()));

Set<String> addedConsumers = Sets.difference(specifiedConsumers.keySet(), existingConsumerNames);

AlterTopicSettings.Builder addConsumersRequest = AlterTopicSettings.newBuilder();
for (String addedConsumer : addedConsumers) {
Schema.Changefeed.Consumer consumer = specifiedConsumers.get(addedConsumer);
Consumer.Builder consumerConfiguration = Consumer.newBuilder()
.setName(consumer.getName())
.setImportant(consumer.isImportant())
.setReadFrom(consumer.getReadFrom());

for (Codec consumerCodec : consumer.getCodecs()) {
consumerConfiguration.addSupportedCodec(
tech.ydb.topic.description.Codec.valueOf(consumerCodec.name())
);
}

addConsumersRequest.addAddConsumer(consumerConfiguration.build());
}
status = topicClient.alterTopic(changeFeedTopicPath, addConsumersRequest.build()).join();
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
throw new CreateTableException(String.format("Can't alter CDC topic %s: %s", changeFeedTopicPath, status));
}
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc;
import tech.ydb.proto.scheme.v1.SchemeServiceGrpc;
import tech.ydb.proto.table.v1.TableServiceGrpc;
import tech.ydb.proto.topic.v1.TopicServiceGrpc;
import tech.ydb.table.Session;
import tech.ydb.table.SessionPoolStats;
import tech.ydb.table.TableClient;
Expand Down Expand Up @@ -151,6 +152,7 @@ private YdbConfig getProxyServerConfig() {
.addService(new ProxyYdbTableService(channel))
.addService(proxyDiscoveryService)
.addService(new DelegateSchemeServiceImplBase(SchemeServiceGrpc.newStub(channel)))
.addService(new DelegateTopicServiceImplBase(TopicServiceGrpc.newStub(channel)))
.build();
proxyServer.start();
Runtime.getRuntime().addShutdownHook(new Thread(proxyServer::shutdown));
Expand Down Expand Up @@ -1023,6 +1025,12 @@ private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.Sch
final SchemeServiceGrpc.SchemeServiceStub schemeServiceStub;
}

@AllArgsConstructor
private static class DelegateTopicServiceImplBase extends TopicServiceGrpc.TopicServiceImplBase {
@Delegate
final TopicServiceGrpc.TopicServiceStub topicServiceStub;
}

private static class ProxyDiscoveryService extends DiscoveryServiceGrpc.DiscoveryServiceImplBase {
@Delegate(excludes = ProxyDiscoveryService.OverriddenMethod.class)
DiscoveryServiceGrpc.DiscoveryServiceStub stub;
Expand Down

0 comments on commit 6aaa1a3

Please sign in to comment.