From c6e53c9b64c11bd025333166e01f6a1c8b501200 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Thu, 22 Aug 2024 13:27:25 +0800 Subject: [PATCH] add: schemaChangeRequest timeout & fix: clarify SchemaChangeResponse --- .../operators/schema/SchemaOperator.java | 43 +++++++++---- .../SchemaRegistryRequestHandler.java | 14 ++--- .../schema/event/SchemaChangeResponse.java | 62 +++++++++++++++---- 3 files changed, 89 insertions(+), 30 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java index 272078a870..816a859ffe 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java @@ -117,6 +117,7 @@ public class SchemaOperator extends AbstractStreamOperator private final SchemaChangeBehavior schemaChangeBehavior; private transient SchemaOperatorMetrics schemaOperatorMetrics; + private transient int subTaskId; @VisibleForTesting public SchemaOperator(List routingRules) { @@ -150,6 +151,7 @@ public void open() throws Exception { schemaOperatorMetrics = new SchemaOperatorMetrics( getRuntimeContext().getMetricGroup(), schemaChangeBehavior); + subTaskId = getRuntimeContext().getIndexOfThisSubtask(); } @Override @@ -234,7 +236,11 @@ public void processElement(StreamRecord streamRecord) private void processSchemaChangeEvents(SchemaChangeEvent event) throws InterruptedException, TimeoutException, ExecutionException { TableId tableId = event.tableId(); - LOG.info("Table {} received SchemaChangeEvent {} and start to be blocked.", tableId, event); + LOG.info( + ">{} Table {} received SchemaChangeEvent {} and start to be blocked.", + subTaskId, + tableId, + event); handleSchemaChangeEvent(tableId, event); // Update caches originalSchema.put(tableId, getLatestOriginalSchema(tableId)); @@ -400,11 +406,8 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh // The request will block if another schema change event is being handled SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent); - if (!response.getSchemaChangeEvents().isEmpty()) { - LOG.info( - "Sending the FlushEvent for table {} in subtask {}.", - tableId, - getRuntimeContext().getIndexOfThisSubtask()); + if (response.isAccepted()) { + LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, tableId); output.collect(new StreamRecord<>(new FlushEvent(tableId))); List expectedSchemaChangeEvents = response.getSchemaChangeEvents(); schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size()); @@ -416,20 +419,36 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh // Update evolved schema changes based on apply results finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e))); + } else if (response.isDuplicate()) { + LOG.info( + ">{} Schema change event {} has been handled in another subTask already.", + subTaskId, + schemaChangeEvent); + } else if (response.isIgnored()) { + LOG.info( + ">{} Schema change event {} has been ignored. No schema evolution needed.", + subTaskId, + schemaChangeEvent); + } else { + throw new IllegalStateException("Unexpected response status " + response); } } private SchemaChangeResponse requestSchemaChange( - TableId tableId, SchemaChangeEvent schemaChangeEvent) { + TableId tableId, SchemaChangeEvent schemaChangeEvent) + throws InterruptedException, TimeoutException { + long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis; while (true) { SchemaChangeResponse response = sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent)); - if (response.isRejected()) { - try { - LOG.info("Schema Registry is busy now, waiting for next request..."); + if (response.isRegistryBusy()) { + if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) { + LOG.info( + ">{} Schema Registry is busy now, waiting for next request...", + subTaskId); Thread.sleep(1000); - } catch (InterruptedException ignored) { - // Do nothing, enter the next loop + } else { + throw new TimeoutException("TimeOut when requesting schema change"); } } else { return response; diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index 458d4d0324..6c89a38f6d 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -133,16 +133,14 @@ public CompletableFuture handleSchemaChangeRequest( if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) { LOG.info("Event {} has been addressed before, ignoring it.", event); finishCurrentSchemaChangeRequest(); - return CompletableFuture.completedFuture( - wrap(new SchemaChangeResponse(Collections.emptyList()))); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.DUPLICATE())); } schemaManager.applyOriginalSchemaChange(event); List derivedSchemaChangeEvents = calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent()); if (derivedSchemaChangeEvents.isEmpty()) { finishCurrentSchemaChangeRequest(); - return CompletableFuture.completedFuture( - wrap(new SchemaChangeResponse(Collections.emptyList()))); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.IGNORED())); } else { derivedSchemaChangeEvents.forEach( e -> { @@ -158,11 +156,13 @@ public CompletableFuture handleSchemaChangeRequest( }); currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents); return CompletableFuture.completedFuture( - wrap(new SchemaChangeResponse(derivedSchemaChangeEvents))); + wrap(SchemaChangeResponse.ACCEPTED(derivedSchemaChangeEvents))); } } else { - LOG.info("There is already a schema change request in progress. Rejected {}.", request); - return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.REJECTED)); + LOG.info( + "Schema Registry is busy processing a schema change request, could not handle request {} for now.", + request); + return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.BUSY())); } } diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java index 4e211b719b..97b2f75f92 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java @@ -22,6 +22,7 @@ import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry; import org.apache.flink.runtime.operators.coordination.CoordinationResponse; +import java.util.Collections; import java.util.List; import java.util.Objects; @@ -38,22 +39,44 @@ public class SchemaChangeResponse implements CoordinationResponse { */ private final List schemaChangeEvents; - private final boolean isRejected; + private final ResponseCode responseCode; - public static final SchemaChangeResponse REJECTED = new SchemaChangeResponse(); + public static SchemaChangeResponse ACCEPTED(List schemaChangeEvents) { + return new SchemaChangeResponse(schemaChangeEvents, ResponseCode.ACCEPTED); + } + + public static SchemaChangeResponse BUSY() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.BUSY); + } + + public static SchemaChangeResponse DUPLICATE() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.DUPLICATE); + } - private SchemaChangeResponse() { - this.schemaChangeEvents = null; - this.isRejected = true; + public static SchemaChangeResponse IGNORED() { + return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED); } - public SchemaChangeResponse(List schemaChangeEvents) { + private SchemaChangeResponse( + List schemaChangeEvents, ResponseCode responseCode) { this.schemaChangeEvents = schemaChangeEvents; - this.isRejected = false; + this.responseCode = responseCode; + } + + public boolean isAccepted() { + return ResponseCode.ACCEPTED.equals(responseCode); + } + + public boolean isRegistryBusy() { + return ResponseCode.BUSY.equals(responseCode); + } + + public boolean isDuplicate() { + return ResponseCode.DUPLICATE.equals(responseCode); } - public boolean isRejected() { - return isRejected; + public boolean isIgnored() { + return ResponseCode.IGNORED.equals(responseCode); } public List getSchemaChangeEvents() { @@ -70,11 +93,28 @@ public boolean equals(Object o) { } SchemaChangeResponse response = (SchemaChangeResponse) o; return Objects.equals(schemaChangeEvents, response.schemaChangeEvents) - && isRejected == response.isRejected; + && responseCode == response.responseCode; } @Override public int hashCode() { - return Objects.hash(schemaChangeEvents); + return Objects.hash(schemaChangeEvents, responseCode); + } + + @Override + public String toString() { + return "SchemaChangeResponse{" + + "schemaChangeEvents=" + + schemaChangeEvents + + ", responseCode=" + + responseCode + + '}'; + } + + public enum ResponseCode { + ACCEPTED, + BUSY, + DUPLICATE, + IGNORED } }