diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 6c89a38f6d..f9edff3a15 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -133,14 +133,14 @@ public CompletableFuture handleSchemaChangeRequest( if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { LOG.info("Event {} has been addressed before, ignoring it.", event); finishCurrentSchemaChangeRequest(); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.DUPLICATE())); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate())); } schemaManager.applyOriginalSchemaChange(event); List derivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); if (derivedSchemaChangeEvents.isEmpty()) { finishCurrentSchemaChangeRequest(); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.IGNORED())); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored())); } else { derivedSchemaChangeEvents.forEach( e -> { @@ -156,13 +156,13 @@ public CompletableFuture handleSchemaChangeRequest( }); currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); return CompletableFuture.completedFuture( - wrap(SchemaChangeResponse.ACCEPTED(derivedSchemaChangeEvents))); + wrap(SchemaChangeResponse.accepted(derivedSchemaChangeEvents))); } } else { LOG.info( "Schema Registry is busy processing a schema change request, could not handle request {} for now.", request); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.BUSY())); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.busy())); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java index 97b2f75f92..63d57139b1 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java @@ -41,19 +41,19 @@ public class SchemaChangeResponse implements CoordinationResponse { private final ResponseCode responseCode; - public static SchemaChangeResponse ACCEPTED(List schemaChangeEvents) { + public static SchemaChangeResponse accepted(List schemaChangeEvents) { return new SchemaChangeResponse(schemaChangeEvents, ResponseCode.ACCEPTED); } - public static SchemaChangeResponse BUSY() { + public static SchemaChangeResponse busy() { return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.BUSY); } - public static SchemaChangeResponse DUPLICATE() { + public static SchemaChangeResponse duplicate() { return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.DUPLICATE); } - public static SchemaChangeResponse IGNORED() { + public static SchemaChangeResponse ignored() { return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED); } @@ -111,6 +111,20 @@ public String toString() { + '}'; } + /** + * Schema Change Response status code. + * + *

- Accepted: Requested schema change request has been accepted exclusively. Any other + * schema change requests will be blocked. + * + *

- Busy: Schema registry is currently busy processing another schema change request. + * + *

- Duplicate: This schema change request has been submitted before, possibly by another + * paralleled subTask. + * + *

- Ignored: This schema change request has been assessed, but no actual evolution is + * required. Possibly caused by LENIENT mode or merging table strategies. + */ public enum ResponseCode { ACCEPTED, BUSY,