diff --git a/databind/src/main/java/tech/ydb/yoj/databind/schema/Changefeed.java b/databind/src/main/java/tech/ydb/yoj/databind/schema/Changefeed.java index ef086ba2..602f2881 100644 --- a/databind/src/main/java/tech/ydb/yoj/databind/schema/Changefeed.java +++ b/databind/src/main/java/tech/ydb/yoj/databind/schema/Changefeed.java @@ -41,6 +41,11 @@ */ boolean initialScan() default false; + /** + * Initial consumers of the changefeed + */ + Consumer[] consumers() default {}; + enum Mode { /** * Only the key component of the modified row @@ -71,4 +76,22 @@ enum Mode { enum Format { JSON } + + @interface Consumer { + String name(); + + Codec[] codecs() default {}; + + String readFrom() default "1970-01-01T00:00:00Z"; + + boolean important() default false; + + enum Codec { + RAW, + GZIP, + LZOP, + ZSTD, + CUSTOM + } + } } diff --git a/databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java b/databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java index 48d47689..df97aa05 100644 --- a/databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java +++ b/databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java @@ -23,7 +23,9 @@ import java.lang.reflect.Constructor; import java.lang.reflect.Type; import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.LinkedHashMap; @@ -245,13 +247,23 @@ private Changefeed changefeedFromAnnotation(@NonNull tech.ydb.yoj.databind.schem var retentionPeriod = Duration.parse(changefeed.retentionPeriod()); Preconditions.checkArgument(!(retentionPeriod.isNegative() || retentionPeriod.isZero()), "RetentionPeriod value defined for %s must be positive", getType()); + List consumers = Arrays.stream(changefeed.consumers()) + .map(consumer -> new Changefeed.Consumer( + consumer.name(), + List.of(consumer.codecs()), + Instant.parse(consumer.readFrom()), + consumer.important() + )) + .toList(); + return new Changefeed( changefeed.name(), changefeed.mode(), changefeed.format(), changefeed.virtualTimestamps(), retentionPeriod, - changefeed.initialScan() + changefeed.initialScan(), + consumers ); } @@ -813,5 +825,22 @@ public static class Changefeed { Duration retentionPeriod; boolean initialScan; + + @NonNull + List consumers; + + @Value + public static class Consumer { + @NonNull + String name; + + @NonNull + List codecs; + + @NonNull + Instant readFrom; + + boolean important; + } } } diff --git a/databind/src/test/java/tech/ydb/yoj/databind/schema/ChangefeedSchemaTest.java b/databind/src/test/java/tech/ydb/yoj/databind/schema/ChangefeedSchemaTest.java index 664d31c7..2b3615c8 100644 --- a/databind/src/test/java/tech/ydb/yoj/databind/schema/ChangefeedSchemaTest.java +++ b/databind/src/test/java/tech/ydb/yoj/databind/schema/ChangefeedSchemaTest.java @@ -4,6 +4,8 @@ import org.junit.Test; import java.time.Duration; +import java.time.Instant; +import java.util.List; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -25,6 +27,7 @@ public void testChangefeedDefaultsEntity() { assertThat(entitySchema.getChangefeeds().get(0).getRetentionPeriod()).isEqualTo(Duration.ofHours(24)); assertThat(entitySchema.getChangefeeds().get(0).isVirtualTimestamps()).isFalse(); assertThat(entitySchema.getChangefeeds().get(0).isInitialScan()).isFalse(); + assertThat(entitySchema.getChangefeeds().get(0).getConsumers()).isEmpty(); } @Test @@ -37,6 +40,38 @@ public void testConflictingChangefeedNameEntity() { assertThatThrownBy(() -> schemaOf(ConflictingChangefeedNameEntity.class)); } + @Test + public void testPredefinedConsumersChangefeedEntity() { + var entitySchema = schemaOf(PredefinedConsumersChangefeedEntity.class); + + Schema.Changefeed expectedChangefeed = new Schema.Changefeed( + "feed1", + Changefeed.Mode.NEW_IMAGE, + Changefeed.Format.JSON, + false, + Duration.ofHours(24), + false, + List.of( + new Schema.Changefeed.Consumer( + "consumer1", + List.of(), + Instant.EPOCH, + false + ), + new Schema.Changefeed.Consumer( + "consumer2", + List.of(Changefeed.Consumer.Codec.RAW), + Instant.parse("2020-01-01T00:00:00Z"), + true + ) + ) + ); + + assertThat(entitySchema.getChangefeeds()) + .singleElement() + .isEqualTo(expectedChangefeed); + } + private static Schema schemaOf(Class entityType) { return new TestSchema<>(entityType); } @@ -74,4 +109,19 @@ private static class ConflictingChangefeedNameEntity { int field1; int field2; } + + @Value + @Changefeed(name = "feed1", consumers = { + @Changefeed.Consumer(name = "consumer1"), + @Changefeed.Consumer( + name = "consumer2", + readFrom = "2020-01-01T00:00:00Z", + codecs = {Changefeed.Consumer.Codec.RAW}, + important = true + ) + }) + private static class PredefinedConsumersChangefeedEntity { + int field1; + int field2; + } } diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/ChangefeedEntity.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/ChangefeedEntity.java index 037fd5ae..505f2fbb 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/ChangefeedEntity.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/ChangefeedEntity.java @@ -16,7 +16,16 @@ format = JSON, virtualTimestamps = true, retentionPeriod = "PT1H", - initialScan = false /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */ + initialScan = false, /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */ + consumers = { + @Changefeed.Consumer(name = "test-consumer1"), + @Changefeed.Consumer( + name = "test-consumer2", + readFrom = "2025-01-21T08:01:25+00:00", + codecs = {Changefeed.Consumer.Codec.RAW}, + important = true + ) + } ) @Changefeed(name = "test-changefeed2") public class ChangefeedEntity implements Entity { diff --git a/repository-ydb-v2/BUILD b/repository-ydb-v2/BUILD index 75cbc754..790532c5 100644 --- a/repository-ydb-v2/BUILD +++ b/repository-ydb-v2/BUILD @@ -25,6 +25,7 @@ java_library( "@java_contribs_stable//:tech_ydb_ydb_sdk_core", "@java_contribs_stable//:tech_ydb_ydb_sdk_scheme", "@java_contribs_stable//:tech_ydb_ydb_sdk_table", + "@java_contribs_stable//:tech_ydb_ydb_sdk_topic", ], ) @@ -65,5 +66,6 @@ java_test_suite( "@java_contribs_stable//:tech_ydb_ydb_sdk_core", "@java_contribs_stable//:tech_ydb_ydb_sdk_scheme", "@java_contribs_stable//:tech_ydb_ydb_sdk_table", + "@java_contribs_stable//:tech_ydb_ydb_sdk_topic", ], ) diff --git a/repository-ydb-v2/pom.xml b/repository-ydb-v2/pom.xml index d0dcdbf8..0ac7452e 100644 --- a/repository-ydb-v2/pom.xml +++ b/repository-ydb-v2/pom.xml @@ -38,6 +38,16 @@ tech.ydb ydb-sdk-table + + tech.ydb + ydb-sdk-topic + + + com.google.code.findbugs + annotations + + + tech.ydb ydb-sdk-scheme diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSchemaOperations.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSchemaOperations.java index 2b647321..267b8843 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSchemaOperations.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/client/YdbSchemaOperations.java @@ -1,5 +1,6 @@ package tech.ydb.yoj.repository.ydb.client; +import com.google.common.collect.Sets; import lombok.Getter; import lombok.NonNull; import lombok.RequiredArgsConstructor; @@ -26,6 +27,11 @@ import tech.ydb.table.settings.PartitioningSettings; import tech.ydb.table.settings.TtlSettings; import tech.ydb.table.values.Type; +import tech.ydb.topic.TopicClient; +import tech.ydb.topic.description.Consumer; +import tech.ydb.topic.description.TopicDescription; +import tech.ydb.topic.settings.AlterTopicSettings; +import tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec; import tech.ydb.yoj.databind.schema.Schema; import tech.ydb.yoj.repository.db.EntitySchema; import tech.ydb.yoj.repository.db.exception.CreateTableException; @@ -38,11 +44,14 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.Set; +import java.util.function.Function; import java.util.stream.Stream; import static com.google.common.base.Strings.isNullOrEmpty; import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toMap; import static java.util.stream.Collectors.toSet; import static lombok.AccessLevel.PRIVATE; import static tech.ydb.core.StatusCode.SCHEME_ERROR; @@ -53,12 +62,14 @@ public class YdbSchemaOperations { private final SessionManager sessionManager; private final SchemeClient schemeClient; + private final TopicClient topicClient; private String tablespace; public YdbSchemaOperations(String tablespace, @NonNull SessionManager sessionManager, GrpcTransport transport) { this.tablespace = YdbPaths.canonicalTablespace(tablespace); this.sessionManager = sessionManager; this.schemeClient = SchemeClient.newClient(transport).build(); + this.topicClient = TopicClient.newClient(transport).build(); } public void setTablespace(String tablespace) { @@ -81,7 +92,7 @@ public void createTable(String name, List columns, List< columns.forEach(c -> { ValueProtos.Type.PrimitiveTypeId yqlType = YqlPrimitiveType.of(c).getYqlType(); int yqlTypeNumber = yqlType.getNumber(); - ValueProtos.Type.PrimitiveTypeId primitiveTypeId = Stream.of(ValueProtos.Type.PrimitiveTypeId.values()) + Stream.of(ValueProtos.Type.PrimitiveTypeId.values()) .filter(id -> id.getNumber() == yqlTypeNumber) .findFirst() .orElseThrow(() -> new CreateTableException(String.format("Can't create table '%s'%n" @@ -149,6 +160,46 @@ public void createTable(String name, List columns, List< if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) { throw new CreateTableException(String.format("Can't alter table %s: %s", name, status)); } + + if (changefeed.getConsumers().isEmpty()) { + continue; + } + + String changeFeedTopicPath = YdbPaths.join(tablespace + name, changefeed.getName()); + Result result = topicClient.describeTopic(changeFeedTopicPath).join(); + if (result.getStatus().getCode() != tech.ydb.core.StatusCode.SUCCESS) { + throw new CreateTableException(String.format("Can't describe CDC topic %s: %s", changeFeedTopicPath, result.getStatus())); + } + + Set existingConsumerNames = result.getValue().getConsumers().stream() + .map(Consumer::getName) + .collect(toSet()); + + Map specifiedConsumers = changefeed.getConsumers().stream() + .collect(toMap(Schema.Changefeed.Consumer::getName, Function.identity())); + + Set addedConsumers = Sets.difference(specifiedConsumers.keySet(), existingConsumerNames); + + AlterTopicSettings.Builder addConsumersRequest = AlterTopicSettings.newBuilder(); + for (String addedConsumer : addedConsumers) { + Schema.Changefeed.Consumer consumer = specifiedConsumers.get(addedConsumer); + Consumer.Builder consumerConfiguration = Consumer.newBuilder() + .setName(consumer.getName()) + .setImportant(consumer.isImportant()) + .setReadFrom(consumer.getReadFrom()); + + for (Codec consumerCodec : consumer.getCodecs()) { + consumerConfiguration.addSupportedCodec( + tech.ydb.topic.description.Codec.valueOf(consumerCodec.name()) + ); + } + + addConsumersRequest.addAddConsumer(consumerConfiguration.build()); + } + status = topicClient.alterTopic(changeFeedTopicPath, addConsumersRequest.build()).join(); + if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) { + throw new CreateTableException(String.format("Can't alter CDC topic %s: %s", changeFeedTopicPath, status)); + } } } } finally { diff --git a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java index 6eb977a6..353f89be 100644 --- a/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java +++ b/repository-ydb-v2/src/test/java/tech/ydb/yoj/repository/ydb/YdbRepositoryIntegrationTest.java @@ -28,6 +28,7 @@ import tech.ydb.proto.discovery.v1.DiscoveryServiceGrpc; import tech.ydb.proto.scheme.v1.SchemeServiceGrpc; import tech.ydb.proto.table.v1.TableServiceGrpc; +import tech.ydb.proto.topic.v1.TopicServiceGrpc; import tech.ydb.table.Session; import tech.ydb.table.SessionPoolStats; import tech.ydb.table.TableClient; @@ -151,6 +152,7 @@ private YdbConfig getProxyServerConfig() { .addService(new ProxyYdbTableService(channel)) .addService(proxyDiscoveryService) .addService(new DelegateSchemeServiceImplBase(SchemeServiceGrpc.newStub(channel))) + .addService(new DelegateTopicServiceImplBase(TopicServiceGrpc.newStub(channel))) .build(); proxyServer.start(); Runtime.getRuntime().addShutdownHook(new Thread(proxyServer::shutdown)); @@ -1023,6 +1025,12 @@ private static class DelegateSchemeServiceImplBase extends SchemeServiceGrpc.Sch final SchemeServiceGrpc.SchemeServiceStub schemeServiceStub; } + @AllArgsConstructor + private static class DelegateTopicServiceImplBase extends TopicServiceGrpc.TopicServiceImplBase { + @Delegate + final TopicServiceGrpc.TopicServiceStub topicServiceStub; + } + private static class ProxyDiscoveryService extends DiscoveryServiceGrpc.DiscoveryServiceImplBase { @Delegate(excludes = ProxyDiscoveryService.OverriddenMethod.class) DiscoveryServiceGrpc.DiscoveryServiceStub stub;