Skip to content

Commit

Permalink
Allow extended attributes for the Changefeed consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
Daniil Zulin committed Jan 21, 2025
1 parent 16a8a4c commit 171d793
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
/**
* Initial consumers of the changefeed
*/
String[] consumers() default {};
Consumer[] consumers() default {};

enum Mode {
/**
Expand Down Expand Up @@ -76,4 +76,22 @@ enum Mode {
enum Format {
JSON
}

@interface Consumer {
String name();

Codec[] codecs() default {};

String readFrom() default "1970-01-01T00:00:00Z";

boolean important() default false;

enum Codec {
RAW,
GZIP,
LZOP,
ZSTD,
CUSTOM
}
}
}
29 changes: 27 additions & 2 deletions databind/src/main/java/tech/ydb/yoj/databind/schema/Schema.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.Type;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
Expand Down Expand Up @@ -245,14 +247,23 @@ private Changefeed changefeedFromAnnotation(@NonNull tech.ydb.yoj.databind.schem
var retentionPeriod = Duration.parse(changefeed.retentionPeriod());
Preconditions.checkArgument(!(retentionPeriod.isNegative() || retentionPeriod.isZero()),
"RetentionPeriod value defined for %s must be positive", getType());
List<Changefeed.Consumer> consumers = Arrays.stream(changefeed.consumers())
.map(consumer -> new Changefeed.Consumer(
consumer.name(),
List.of(consumer.codecs()),
Instant.parse(consumer.readFrom()),
consumer.important()
))
.toList();

return new Changefeed(
changefeed.name(),
changefeed.mode(),
changefeed.format(),
changefeed.virtualTimestamps(),
retentionPeriod,
changefeed.initialScan(),
Set.of(changefeed.consumers())
consumers
);
}

Expand Down Expand Up @@ -816,6 +827,20 @@ public static class Changefeed {
boolean initialScan;

@NonNull
Set<String> consumers;
List<Consumer> consumers;

@Value
public static class Consumer {
@NonNull
String name;

@NonNull
List<tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec> codecs;

@NonNull
Instant readFrom;

boolean important;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import org.junit.Test;

import java.time.Duration;
import java.time.Instant;
import java.util.List;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand Down Expand Up @@ -42,13 +44,32 @@ public void testConflictingChangefeedNameEntity() {
public void testPredefinedConsumersChangefeedEntity() {
var entitySchema = schemaOf(PredefinedConsumersChangefeedEntity.class);

assertThat(entitySchema.getChangefeeds()).hasSize(1);
assertThat(entitySchema.getChangefeeds().get(0).getMode()).isEqualTo(Changefeed.Mode.NEW_IMAGE);
assertThat(entitySchema.getChangefeeds().get(0).getFormat()).isEqualTo(Changefeed.Format.JSON);
assertThat(entitySchema.getChangefeeds().get(0).getRetentionPeriod()).isEqualTo(Duration.ofHours(24));
assertThat(entitySchema.getChangefeeds().get(0).isVirtualTimestamps()).isFalse();
assertThat(entitySchema.getChangefeeds().get(0).isInitialScan()).isFalse();
assertThat(entitySchema.getChangefeeds().get(0).getConsumers()).containsExactlyInAnyOrder("consumer1", "consumer2");
Schema.Changefeed expectedChangefeed = new Schema.Changefeed(
"feed1",
Changefeed.Mode.NEW_IMAGE,
Changefeed.Format.JSON,
false,
Duration.ofHours(24),
false,
List.of(
new Schema.Changefeed.Consumer(
"consumer1",
List.of(),
Instant.EPOCH,
false
),
new Schema.Changefeed.Consumer(
"consumer2",
List.of(Changefeed.Consumer.Codec.RAW),
Instant.parse("2020-01-01T00:00:00Z"),
true
)
)
);

assertThat(entitySchema.getChangefeeds())
.singleElement()
.isEqualTo(expectedChangefeed);
}

private static <T> Schema<T> schemaOf(Class<T> entityType) {
Expand Down Expand Up @@ -90,7 +111,15 @@ private static class ConflictingChangefeedNameEntity {
}

@Value
@Changefeed(name = "feed1", consumers = {"consumer1", "consumer2"})
@Changefeed(name = "feed1", consumers = {
@Changefeed.Consumer(name = "consumer1"),
@Changefeed.Consumer(
name = "consumer2",
readFrom = "2020-01-01T00:00:00Z",
codecs = {Changefeed.Consumer.Codec.RAW},
important = true
)
})
private static class PredefinedConsumersChangefeedEntity {
int field1;
int field2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@
virtualTimestamps = true,
retentionPeriod = "PT1H",
initialScan = false, /* otherwise YDB is "overloaded" during YdbRepositoryIntegrationTest */
consumers = {"test-consumer1", "test-consumer2"}
consumers = {
@Changefeed.Consumer(name = "test-consumer1"),
@Changefeed.Consumer(
name = "test-consumer2",
readFrom = "2025-01-21T08:01:25+00:00",
codecs = {Changefeed.Consumer.Codec.RAW},
important = true
)
}
)
@Changefeed(name = "test-changefeed2")
public class ChangefeedEntity implements Entity<ChangefeedEntity> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import tech.ydb.topic.description.Consumer;
import tech.ydb.topic.description.TopicDescription;
import tech.ydb.topic.settings.AlterTopicSettings;
import tech.ydb.yoj.databind.schema.Changefeed.Consumer.Codec;
import tech.ydb.yoj.databind.schema.Schema;
import tech.ydb.yoj.repository.db.EntitySchema;
import tech.ydb.yoj.repository.db.exception.CreateTableException;
Expand All @@ -43,12 +44,13 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.*;
import static lombok.AccessLevel.PRIVATE;
import static tech.ydb.core.StatusCode.SCHEME_ERROR;

Expand Down Expand Up @@ -167,16 +169,30 @@ public void createTable(String name, List<EntitySchema.JavaField> columns, List<
throw new CreateTableException(String.format("Can't describe CDC topic %s: %s", changeFeedTopicPath, result.getStatus()));
}

Set<String> existingConsumers = result.getValue().getConsumers().stream()
Set<String> existingConsumerNames = result.getValue().getConsumers().stream()
.map(Consumer::getName)
.collect(toSet());
Set<String> addedConsumers = Sets.difference(changefeed.getConsumers(), existingConsumers);

Map<String, Schema.Changefeed.Consumer> specifiedConsumers = changefeed.getConsumers().stream()
.collect(toMap(Schema.Changefeed.Consumer::getName, Function.identity()));

Set<String> addedConsumers = Sets.difference(specifiedConsumers.keySet(), existingConsumerNames);

AlterTopicSettings.Builder addConsumersRequest = AlterTopicSettings.newBuilder();
for (String consumer : addedConsumers) {
addConsumersRequest.addAddConsumer(Consumer.newBuilder()
.setName(consumer)
.build());
for (String addedConsumer : addedConsumers) {
Schema.Changefeed.Consumer consumer = specifiedConsumers.get(addedConsumer);
Consumer.Builder consumerConfiguration = Consumer.newBuilder()
.setName(consumer.getName())
.setImportant(consumer.isImportant())
.setReadFrom(consumer.getReadFrom());

for (Codec consumerCodec : consumer.getCodecs()) {
consumerConfiguration.addSupportedCodec(
tech.ydb.topic.description.Codec.valueOf(consumerCodec.name())
);
}

addConsumersRequest.addAddConsumer(consumerConfiguration.build());
}
status = topicClient.alterTopic(changeFeedTopicPath, addConsumersRequest.build()).join();
if (status.getCode() != tech.ydb.core.StatusCode.SUCCESS) {
Expand Down

0 comments on commit 171d793

Please sign in to comment.