Skip to content

Commit

Permalink
feat(notification): support deleteRange notification (#187)
Browse files Browse the repository at this point in the history
* format license

* feat(notification): support disable notification
  • Loading branch information
mattisonchao authored Sep 26, 2024
1 parent cce083a commit a86d398
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@

/** A notification from an Oxia server indicating a change to a record associated with a key. */
public sealed interface Notification
permits Notification.KeyCreated, Notification.KeyModified, Notification.KeyDeleted {
permits Notification.KeyCreated,
Notification.KeyDeleted,
Notification.KeyModified,
Notification.KeyRangeDelete {

/**
* @return The key of the record.
Expand Down Expand Up @@ -56,4 +59,18 @@ record KeyModified(@NonNull String key, long version) implements Notification {
* @param key The key of the deleted record.
*/
record KeyDeleted(@NonNull String key) implements Notification {}

/**
* The record associated with the key range has been deleted.
*
* @param startKeyInclusive The range deletion start key. (inclusive)
* @param endKeyExclusive The range deletion end key. (exclusive)
*/
record KeyRangeDelete(@NonNull String startKeyInclusive, @NonNull String endKeyExclusive)
implements Notification {
@Override
public String key() {
return startKeyInclusive;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
/*
* Copyright © 2022-2024 StreamNative Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.streamnative.oxia.client.it;

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.metrics.SdkMeterProvider;
import io.opentelemetry.sdk.resources.Resource;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import io.opentelemetry.semconv.resource.attributes.ResourceAttributes;
import io.streamnative.oxia.client.api.AsyncOxiaClient;
import io.streamnative.oxia.client.api.Notification;
import io.streamnative.oxia.client.api.OxiaClientBuilder;
import io.streamnative.oxia.testcontainers.OxiaContainer;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.PullPolicy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
@Slf4j
public class NotificationIt {
@Container
private static final OxiaContainer oxia =
new OxiaContainer(OxiaContainer.DEFAULT_IMAGE_NAME)
.withImagePullPolicy(PullPolicy.alwaysPull())
.withShards(10)
.withLogConsumer(new Slf4jLogConsumer(log));

private static AsyncOxiaClient client;

private static Queue<Notification> notifications = new LinkedBlockingQueue<>();

private static InMemoryMetricReader metricReader;

@BeforeAll
static void beforeAll() {
Resource resource =
Resource.getDefault()
.merge(
Resource.create(
Attributes.of(ResourceAttributes.SERVICE_NAME, "logical-service-name")));

metricReader = InMemoryMetricReader.create();
SdkMeterProvider sdkMeterProvider =
SdkMeterProvider.builder().registerMetricReader(metricReader).setResource(resource).build();

OpenTelemetry openTelemetry =
OpenTelemetrySdk.builder().setMeterProvider(sdkMeterProvider).build();

client =
OxiaClientBuilder.create(oxia.getServiceAddress())
.openTelemetry(openTelemetry)
.asyncClient()
.join();
client.notifications(notifications::add);
}

@AfterAll
static void afterAll() throws Exception {
if (client != null) {
client.close();
}
}

@Test
public void testDeleteRange() {

for (int i = 0; i < 10; i++) {
client.put(i + "", (i + "").getBytes(StandardCharsets.UTF_8)).join();
}

Awaitility.await().untilAsserted(() -> Assertions.assertEquals(notifications.size(), 10));

notifications.clear();

client.deleteRange("0", "100").join();

Awaitility.await()
.untilAsserted(
() -> {
Assertions.assertEquals(notifications.size(), 10); // 10 shards
for (Notification notification : notifications) {
Assertions.assertInstanceOf(Notification.KeyRangeDelete.class, notification);
final Notification.KeyRangeDelete krd = (Notification.KeyRangeDelete) notification;
Assertions.assertEquals(krd.startKeyInclusive(), "0");
Assertions.assertEquals(krd.endKeyExclusive(), "100");
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public void onNext(NotificationBatch batch) {
case KEY_CREATED -> new KeyCreated(key, notification.getVersionId());
case KEY_MODIFIED -> new KeyModified(key, notification.getVersionId());
case KEY_DELETED -> new KeyDeleted(key);
case KEY_RANGE_DELETED -> new Notification.KeyRangeDelete(
key, notification.getKeyRangeLast());
case UNRECOGNIZED -> null;
};

Expand Down
3 changes: 3 additions & 0 deletions client/src/main/proto/io/streamnative/oxia/client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ enum NotificationType {
KEY_CREATED = 0;
KEY_MODIFIED = 1;
KEY_DELETED = 2;
KEY_RANGE_DELETED = 3;
}

message NotificationsRequest {
Expand All @@ -477,4 +478,6 @@ message NotificationBatch {
message Notification {
NotificationType type = 1;
optional int64 version_id = 2;

optional string key_range_last = 3;
}

0 comments on commit a86d398

Please sign in to comment.