Skip to content

Commit

Permalink
[hotfix] Run schema coordinator logic asynchronously to avoid blockin…
Browse files Browse the repository at this point in the history
…g the main thread
  • Loading branch information
yuxiqian committed Aug 27, 2024
1 parent 0df63e2 commit 7a7477f
Show file tree
Hide file tree
Showing 5 changed files with 237 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

/** Dummy classes for migration test. Called via reflection. */
Expand Down Expand Up @@ -69,6 +70,7 @@ public SchemaRegistry generateSchemaRegistry() {
return new SchemaRegistry(
"Dummy Name",
null,
Executors.newFixedThreadPool(1),
new MetadataApplier() {
@Override
public boolean acceptsSchemaEvolutionType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.ThrowingRunnable;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,6 +57,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

import static org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;

Expand Down Expand Up @@ -83,6 +86,9 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
/** The name of the operator this SchemaOperatorCoordinator is associated with. */
private final String operatorName;

/** A single-thread executor to handle async execution of the coordinator. */
private final ExecutorService coordinatorExecutor;

/**
* Tracks the subtask failed reason to throw a more meaningful exception in {@link
* #subtaskReset}.
Expand Down Expand Up @@ -113,18 +119,27 @@ public class SchemaRegistry implements OperatorCoordinator, CoordinationRequestH
public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
ExecutorService executorService,
MetadataApplier metadataApplier,
List<RouteRule> routes) {
this(operatorName, context, metadataApplier, routes, SchemaChangeBehavior.EVOLVE);
this(
operatorName,
context,
executorService,
metadataApplier,
routes,
SchemaChangeBehavior.EVOLVE);
}

public SchemaRegistry(
String operatorName,
OperatorCoordinator.Context context,
ExecutorService coordinatorExecutor,
MetadataApplier metadataApplier,
List<RouteRule> routes,
SchemaChangeBehavior schemaChangeBehavior) {
this.context = context;
this.coordinatorExecutor = coordinatorExecutor;
this.operatorName = operatorName;
this.failedReasons = new HashMap<>();
this.metadataApplier = metadataApplier;
Expand All @@ -133,7 +148,11 @@ public SchemaRegistry(
this.schemaDerivation = new SchemaDerivation(schemaManager, routes, new HashMap<>());
this.requestHandler =
new SchemaRegistryRequestHandler(
metadataApplier, schemaManager, schemaDerivation, schemaChangeBehavior);
metadataApplier,
schemaManager,
schemaDerivation,
schemaChangeBehavior,
context);
this.schemaChangeBehavior = schemaChangeBehavior;
}

Expand All @@ -153,48 +172,87 @@ public void close() throws Exception {
}

@Override
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event)
throws Exception {
try {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
runInEventLoop(
() -> {
try {
if (event instanceof FlushSuccessEvent) {
FlushSuccessEvent flushSuccessEvent = (FlushSuccessEvent) event;
LOG.info(
"Sink subtask {} succeed flushing for table {}.",
flushSuccessEvent.getSubtask(),
flushSuccessEvent.getTableId().toString());
requestHandler.flushSuccess(
flushSuccessEvent.getTableId(),
flushSuccessEvent.getSubtask(),
currentParallelism);
} else if (event instanceof SinkWriterRegisterEvent) {
requestHandler.registerSinkWriter(
((SinkWriterRegisterEvent) event).getSubtask());
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
},
"handling event %s from subTask %d",
event,
subtask);
}

@Override
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture)
throws Exception {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
// Serialize SchemaManager
int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
out.writeInt(schemaManagerSerializerVersion);
byte[] serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager);
out.writeInt(serializedSchemaManager.length);
out.write(serializedSchemaManager);
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
} catch (Throwable t) {
context.failJob(t);
throw t;
}
public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
// we generate checkpoint in an async thread to not block the JobManager's main thread, the
// coordinator state might be large if there are many schema changes and monitor many
// tables.
runInEventLoop(
() -> {
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos)) {
// Serialize SchemaManager
int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
out.writeInt(schemaManagerSerializerVersion);
byte[] serializedSchemaManager =
SchemaManager.SERIALIZER.serialize(schemaManager);
out.writeInt(serializedSchemaManager.length);
out.write(serializedSchemaManager);
// Serialize SchemaDerivation mapping
SchemaDerivation.serializeDerivationMapping(schemaDerivation, out);
resultFuture.complete(baos.toByteArray());
} catch (Throwable t) {
context.failJob(t);
throw t;
}
},
"taking checkpoint %d",
checkpointId);
}

private void runInEventLoop(
final ThrowingRunnable<Throwable> action,
final String actionName,
final Object... actionNameFormatParameters) {
coordinatorExecutor.execute(
() -> {
try {
action.run();
} catch (Throwable t) {
// if we have a JVM critical error, promote it immediately, there is a good
// chance the logging or job failing will not succeed anymore
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);

final String actionString =
String.format(actionName, actionNameFormatParameters);
LOG.error(
"Uncaught exception in the SchemaEvolutionCoordinator for {} while {}. Triggering job failover.",
operatorName,
actionString,
t);
context.failJob(t);
}
});
}

@Override
Expand All @@ -205,26 +263,34 @@ public void notifyCheckpointComplete(long checkpointId) {
@Override
public CompletableFuture<CoordinationResponse> handleCoordinationRequest(
CoordinationRequest request) {
try {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
return requestHandler.handleSchemaChangeRequest(schemaChangeRequest);
} else if (request instanceof SchemaChangeResultRequest) {
return requestHandler.getSchemaChangeResult();
} else if (request instanceof GetEvolvedSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetEvolvedSchemaRequest(((GetEvolvedSchemaRequest) request))));
} else if (request instanceof GetOriginalSchemaRequest) {
return CompletableFuture.completedFuture(
wrap(handleGetOriginalSchemaRequest((GetOriginalSchemaRequest) request)));
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
CompletableFuture<CoordinationResponse> responseFuture = new CompletableFuture<>();
runInEventLoop(
() -> {
try {
if (request instanceof SchemaChangeRequest) {
SchemaChangeRequest schemaChangeRequest = (SchemaChangeRequest) request;
requestHandler.handleSchemaChangeRequest(
schemaChangeRequest, responseFuture);
} else if (request instanceof SchemaChangeResultRequest) {
requestHandler.getSchemaChangeResult(responseFuture);
} else if (request instanceof GetEvolvedSchemaRequest) {
handleGetEvolvedSchemaRequest(
((GetEvolvedSchemaRequest) request), responseFuture);
} else if (request instanceof GetOriginalSchemaRequest) {
handleGetOriginalSchemaRequest(
(GetOriginalSchemaRequest) request, responseFuture);
} else {
throw new IllegalArgumentException(
"Unrecognized CoordinationRequest type: " + request);
}
} catch (Throwable t) {
context.failJob(t);
throw t;
}
},
"handling coordination request %s",
request);
return responseFuture;
}

@Override
Expand Down Expand Up @@ -253,7 +319,8 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
metadataApplier,
schemaManager,
schemaDerivation,
schemaManager.getBehavior());
schemaManager.getBehavior(),
context);
break;
}
case 1:
Expand All @@ -274,7 +341,8 @@ public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData
metadataApplier,
schemaManager,
schemaDerivation,
schemaChangeBehavior);
schemaChangeBehavior,
context);
break;
}
default:
Expand Down Expand Up @@ -307,46 +375,56 @@ public void executionAttemptReady(
// do nothing
}

private GetEvolvedSchemaResponse handleGetEvolvedSchemaRequest(
GetEvolvedSchemaRequest getEvolvedSchemaRequest) {
private void handleGetEvolvedSchemaRequest(
GetEvolvedSchemaRequest getEvolvedSchemaRequest,
CompletableFuture<CoordinationResponse> response) {
LOG.info("Handling evolved schema request: {}", getEvolvedSchemaRequest);
int schemaVersion = getEvolvedSchemaRequest.getSchemaVersion();
TableId tableId = getEvolvedSchemaRequest.getTableId();
if (schemaVersion == GetEvolvedSchemaRequest.LATEST_SCHEMA_VERSION) {
return new GetEvolvedSchemaResponse(
schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
response.complete(
wrap(
new GetEvolvedSchemaResponse(
schemaManager.getLatestEvolvedSchema(tableId).orElse(null))));
} else {
try {
return new GetEvolvedSchemaResponse(
schemaManager.getEvolvedSchema(tableId, schemaVersion));
response.complete(
wrap(
new GetEvolvedSchemaResponse(
schemaManager.getEvolvedSchema(tableId, schemaVersion))));
} catch (IllegalArgumentException iae) {
LOG.warn(
"Some client is requesting an non-existed evolved schema for table {} with version {}",
tableId,
schemaVersion);
return new GetEvolvedSchemaResponse(null);
response.complete(wrap(new GetEvolvedSchemaResponse(null)));
}
}
}

private GetOriginalSchemaResponse handleGetOriginalSchemaRequest(
GetOriginalSchemaRequest getOriginalSchemaRequest) {
private void handleGetOriginalSchemaRequest(
GetOriginalSchemaRequest getOriginalSchemaRequest,
CompletableFuture<CoordinationResponse> response) {
LOG.info("Handling original schema request: {}", getOriginalSchemaRequest);
int schemaVersion = getOriginalSchemaRequest.getSchemaVersion();
TableId tableId = getOriginalSchemaRequest.getTableId();
if (schemaVersion == GetOriginalSchemaRequest.LATEST_SCHEMA_VERSION) {
return new GetOriginalSchemaResponse(
schemaManager.getLatestOriginalSchema(tableId).orElse(null));
response.complete(
wrap(
new GetOriginalSchemaResponse(
schemaManager.getLatestOriginalSchema(tableId).orElse(null))));
} else {
try {
return new GetOriginalSchemaResponse(
schemaManager.getOriginalSchema(tableId, schemaVersion));
response.complete(
wrap(
new GetOriginalSchemaResponse(
schemaManager.getOriginalSchema(tableId, schemaVersion))));
} catch (IllegalArgumentException iae) {
LOG.warn(
"Some client is requesting an non-existed original schema for table {} with version {}",
tableId,
schemaVersion);
return new GetOriginalSchemaResponse(null);
response.complete(wrap(new GetOriginalSchemaResponse(null)));
}
}
}
Expand Down
Loading

0 comments on commit 7a7477f

Please sign in to comment.