Skip to content

Commit

Permalink
add: schemaChangeRequest timeout & fix: clarify SchemaChangeResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
yuxiqian committed Aug 22, 2024
1 parent c1c3997 commit c6e53c9
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ public class SchemaOperator extends AbstractStreamOperator<Event>
private final SchemaChangeBehavior schemaChangeBehavior;

private transient SchemaOperatorMetrics schemaOperatorMetrics;
private transient int subTaskId;

@VisibleForTesting
public SchemaOperator(List<RouteRule> routingRules) {
Expand Down Expand Up @@ -150,6 +151,7 @@ public void open() throws Exception {
schemaOperatorMetrics =
new SchemaOperatorMetrics(
getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
subTaskId = getRuntimeContext().getIndexOfThisSubtask();
}

@Override
Expand Down Expand Up @@ -234,7 +236,11 @@ public void processElement(StreamRecord<Event> streamRecord)
private void processSchemaChangeEvents(SchemaChangeEvent event)
throws InterruptedException, TimeoutException, ExecutionException {
TableId tableId = event.tableId();
LOG.info("Table {} received SchemaChangeEvent {} and start to be blocked.", tableId, event);
LOG.info(
">{} Table {} received SchemaChangeEvent {} and start to be blocked.",
subTaskId,
tableId,
event);
handleSchemaChangeEvent(tableId, event);
// Update caches
originalSchema.put(tableId, getLatestOriginalSchema(tableId));
Expand Down Expand Up @@ -400,11 +406,8 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh

// The request will block if another schema change event is being handled
SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
if (!response.getSchemaChangeEvents().isEmpty()) {
LOG.info(
"Sending the FlushEvent for table {} in subtask {}.",
tableId,
getRuntimeContext().getIndexOfThisSubtask());
if (response.isAccepted()) {
LOG.info(">{} Sending the FlushEvent for table {}.", subTaskId, tableId);
output.collect(new StreamRecord<>(new FlushEvent(tableId)));
List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
Expand All @@ -416,20 +419,36 @@ private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaCh

// Update evolved schema changes based on apply results
finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
} else if (response.isDuplicate()) {
LOG.info(
">{} Schema change event {} has been handled in another subTask already.",
subTaskId,
schemaChangeEvent);
} else if (response.isIgnored()) {
LOG.info(
">{} Schema change event {} has been ignored. No schema evolution needed.",
subTaskId,
schemaChangeEvent);
} else {
throw new IllegalStateException("Unexpected response status " + response);
}
}

private SchemaChangeResponse requestSchemaChange(
TableId tableId, SchemaChangeEvent schemaChangeEvent) {
TableId tableId, SchemaChangeEvent schemaChangeEvent)
throws InterruptedException, TimeoutException {
long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
while (true) {
SchemaChangeResponse response =
sendRequestToCoordinator(new SchemaChangeRequest(tableId, schemaChangeEvent));
if (response.isRejected()) {
try {
LOG.info("Schema Registry is busy now, waiting for next request...");
if (response.isRegistryBusy()) {
if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
LOG.info(
">{} Schema Registry is busy now, waiting for next request...",
subTaskId);
Thread.sleep(1000);
} catch (InterruptedException ignored) {
// Do nothing, enter the next loop
} else {
throw new TimeoutException("TimeOut when requesting schema change");
}
} else {
return response;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,14 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
if (schemaManager.isOriginalSchemaChangeEventRedundant(event)) {
LOG.info("Event {} has been addressed before, ignoring it.", event);
finishCurrentSchemaChangeRequest();
return CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(Collections.emptyList())));
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.DUPLICATE()));
}
schemaManager.applyOriginalSchemaChange(event);
List<SchemaChangeEvent> derivedSchemaChangeEvents =
calculateDerivedSchemaChangeEvents(request.getSchemaChangeEvent());
if (derivedSchemaChangeEvents.isEmpty()) {
finishCurrentSchemaChangeRequest();
return CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(Collections.emptyList())));
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.IGNORED()));
} else {
derivedSchemaChangeEvents.forEach(
e -> {
Expand All @@ -158,11 +156,13 @@ public CompletableFuture<CoordinationResponse> handleSchemaChangeRequest(
});
currentDerivedSchemaChangeEvents = new ArrayList<>(derivedSchemaChangeEvents);
return CompletableFuture.completedFuture(
wrap(new SchemaChangeResponse(derivedSchemaChangeEvents)));
wrap(SchemaChangeResponse.ACCEPTED(derivedSchemaChangeEvents)));
}
} else {
LOG.info("There is already a schema change request in progress. Rejected {}.", request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.REJECTED));
LOG.info(
"Schema Registry is busy processing a schema change request, could not handle request {} for now.",
request);
return CompletableFuture.completedFuture(wrap(SchemaChangeResponse.BUSY()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;

import java.util.Collections;
import java.util.List;
import java.util.Objects;

Expand All @@ -38,22 +39,44 @@ public class SchemaChangeResponse implements CoordinationResponse {
*/
private final List<SchemaChangeEvent> schemaChangeEvents;

private final boolean isRejected;
private final ResponseCode responseCode;

public static final SchemaChangeResponse REJECTED = new SchemaChangeResponse();
public static SchemaChangeResponse ACCEPTED(List<SchemaChangeEvent> schemaChangeEvents) {
return new SchemaChangeResponse(schemaChangeEvents, ResponseCode.ACCEPTED);
}

public static SchemaChangeResponse BUSY() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.BUSY);
}

public static SchemaChangeResponse DUPLICATE() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.DUPLICATE);
}

private SchemaChangeResponse() {
this.schemaChangeEvents = null;
this.isRejected = true;
public static SchemaChangeResponse IGNORED() {
return new SchemaChangeResponse(Collections.emptyList(), ResponseCode.IGNORED);
}

public SchemaChangeResponse(List<SchemaChangeEvent> schemaChangeEvents) {
private SchemaChangeResponse(
List<SchemaChangeEvent> schemaChangeEvents, ResponseCode responseCode) {
this.schemaChangeEvents = schemaChangeEvents;
this.isRejected = false;
this.responseCode = responseCode;
}

public boolean isAccepted() {
return ResponseCode.ACCEPTED.equals(responseCode);
}

public boolean isRegistryBusy() {
return ResponseCode.BUSY.equals(responseCode);
}

public boolean isDuplicate() {
return ResponseCode.DUPLICATE.equals(responseCode);
}

public boolean isRejected() {
return isRejected;
public boolean isIgnored() {
return ResponseCode.IGNORED.equals(responseCode);
}

public List<SchemaChangeEvent> getSchemaChangeEvents() {
Expand All @@ -70,11 +93,28 @@ public boolean equals(Object o) {
}
SchemaChangeResponse response = (SchemaChangeResponse) o;
return Objects.equals(schemaChangeEvents, response.schemaChangeEvents)
&& isRejected == response.isRejected;
&& responseCode == response.responseCode;
}

@Override
public int hashCode() {
return Objects.hash(schemaChangeEvents);
return Objects.hash(schemaChangeEvents, responseCode);
}

@Override
public String toString() {
return "SchemaChangeResponse{"
+ "schemaChangeEvents="
+ schemaChangeEvents
+ ", responseCode="
+ responseCode
+ '}';
}

public enum ResponseCode {
ACCEPTED,
BUSY,
DUPLICATE,
IGNORED
}
}

0 comments on commit c6e53c9

Please sign in to comment.