Skip to content

Commit

Permalink
[hotfix][cdc-runtime] Fix schema registry hanging in multiple parallelism
Browse files Browse the repository at this point in the history

This closes #3567.
  • Loading branch information
yuxiqian authored Aug 23, 2024
1 parent 52f2019 commit 060d203
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH

private SchemaChangeBehavior schemaChangeBehavior;

/**
* Current parallelism. Use this to verify if Schema Registry has collected enough flush success
* events from sink operators.
*/
private int currentParallelism;

public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
Expand Down Expand Up @@ -135,7 +141,9 @@ public SchemaRegistry(
public void start() throws Exception {
LOG.info("Starting SchemaRegistry for {}.", operatorName);
this.failedReasons.clear();
LOG.info("Started SchemaRegistry for {}.", operatorName);
this.currentParallelism = context.currentParallelism();
LOG.info(
"Started SchemaRegistry for {}. Parallelism: {}", operatorName, currentParallelism);
}

@Override
Expand All @@ -155,7 +163,9 @@ public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEven
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(), flushSuccessEvent.getSubtask());
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -103,8 +103,8 @@ public SchemaRegistryRequestHandler(
this.schemaDerivation = schemaDerivation;
this.schemaChangeBehavior = schemaChangeBehavior;

this.activeSinkWriters = new HashSet<>();
this.flushedSinkWriters = new HashSet<>();
this.activeSinkWriters = ConcurrentHashMap.newKeySet();
this.flushedSinkWriters = ConcurrentHashMap.newKeySet();
this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();

this.currentDerivedSchemaChangeEvents = new ArrayList<>();
Expand All @@ -122,7 +122,7 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
SchemaChangeRequest request) {
if (schemaChangeStatus.compareAndSet(RequestStatus.IDLE, RequestStatus.WAITING_FOR_FLUSH)) {
LOG.info(
"Received schema change event request {} from table {}. Start to buffer requests for others.",
"Received schema change event request {} from table {}. SchemaChangeStatus switched from IDLE to WAITING_FOR_FLUSH, other requests will be blocked.",
request.getSchemaChangeEvent(),
request.getTableId().toString());
SchemaChangeEvent event = request.getSchemaChangeEvent();
Expand All @@ -134,7 +134,11 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated.");
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was duplicated, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to duplicated request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.duplicate()));
}
schemaManager.applyOriginalSchemaChange(event);
Expand All @@ -149,7 +153,11 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.IDLE),
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored.");
"Illegal schemaChangeStatus state: should still in WAITING_FOR_FLUSH state if event was ignored, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from WAITING_FOR_FLUSH to IDLE for request {} due to ignored request.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.ignored()));
}

Expand Down Expand Up @@ -220,7 +228,11 @@ private void applySchemaChange(
}
Preconditions.checkState(
schemaChangeStatus.compareAndSet(RequestStatus.APPLYING, RequestStatus.FINISHED),
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes");
"Illegal schemaChangeStatus state: should be APPLYING before applySchemaChange finishes, not "
+ schemaChangeStatus.get());
LOG.info(
"SchemaChangeStatus switched from APPLYING to FINISHED for request {}.",
currentDerivedSchemaChangeEvents);
}

/**
Expand All @@ -239,13 +251,21 @@ public void registerSinkWriter(int sinkSubtask) {
* @param tableId the subtask in SchemaOperator and table that the FlushEvent is about
* @param sinkSubtask the sink subtask that succeeded in flushing
*/
public void flushSuccess(TableId tableId, int sinkSubtask) {
public void flushSuccess(TableId tableId, int sinkSubtask, int parallelism) {
flushedSinkWriters.add(sinkSubtask);
if (activeSinkWriters.size() < parallelism) {
LOG.info(
"Not all active sink writers have been registered. Current {}, expected {}.",
activeSinkWriters.size(),
parallelism);
return;
}
if (flushedSinkWriters.equals(activeSinkWriters)) {
Preconditions.checkState(
schemaChangeStatus.compareAndSet(
RequestStatus.WAITING_FOR_FLUSH, RequestStatus.APPLYING),
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents");
"Illegal schemaChangeStatus state: should be WAITING_FOR_FLUSH before collecting enough FlushEvents, not "
+ schemaChangeStatus);
LOG.info(
"All sink subtask have flushed for table {}. Start to apply schema change.",
tableId.toString());
Expand All @@ -259,6 +279,10 @@ public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
!schemaChangeStatus.get().equals(RequestStatus.IDLE),
"Illegal schemaChangeStatus: should not be IDLE before getting schema change request results.");
if (schemaChangeStatus.compareAndSet(RequestStatus.FINISHED, RequestStatus.IDLE)) {
LOG.info(
"SchemaChangeStatus switched from FINISHED to IDLE for request {}",
currentDerivedSchemaChangeEvents);

// This request has been finished, return it and prepare for the next request
List<SchemaChangeEvent> finishedEvents = clearCurrentSchemaChangeRequest();
return CompletableFuture.supplyAsync(
Expand Down

0 comments on commit 060d203

Please sign in to comment.