Skip to content

Commit

Permalink
fix ci: change exception check since now the exception was not thrown…
Browse files Browse the repository at this point in the history
… in main thread
  • Loading branch information
yuxiqian committed Aug 27, 2024
1 parent 7a7477f commit a3df3e3
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,8 @@ private boolean shouldIgnoreException(Throwable throwable) {

private List<SchemaChangeEvent> clearCurrentSchemaChangeRequest() {
if (currentChangeException != null) {
throw new RuntimeException("Failed to apply schema change.", currentChangeException);
context.failJob(
new RuntimeException("Failed to apply schema change.", currentChangeException));
}
List<SchemaChangeEvent> finishedSchemaChanges =
new ArrayList<>(currentFinishedSchemaChanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
Expand Down Expand Up @@ -1039,11 +1040,16 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)),
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn(
"height", DOUBLE, "Height data")))));
Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents))
processEvent(schemaOperator, addColumnEvents);
Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
Assertions.assertThat(harness.getJobFailureCause())
.cause()
.cause()
.isExactlyInstanceOf(RuntimeException.class)
.hasMessageContaining("Failed to apply schema change");
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
.matches(
e ->
((UnsupportedSchemaChangeEventException) e)
.getExceptionMessage()
.equals("Sink doesn't support such schema change event."));
harness.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
Expand Down Expand Up @@ -82,6 +81,7 @@ public class EventOperatorTestHarness<OP extends AbstractStreamOperator<E>, E ex
private final SchemaRegistry schemaRegistry;
private final TestingSchemaRegistryGateway schemaRegistryGateway;
private final LinkedList<StreamRecord<E>> outputRecords = new LinkedList<>();
private final MockedOperatorCoordinatorContext mockedContext;

public EventOperatorTestHarness(OP operator, int numOutputs) {
this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE);
Expand All @@ -95,11 +95,13 @@ public EventOperatorTestHarness(
OP operator, int numOutputs, Duration duration, SchemaChangeBehavior behavior) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration),
new ArrayList<>(),
Expand All @@ -115,11 +117,13 @@ public EventOperatorTestHarness(
Set<SchemaChangeEventType> enabledEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(duration, enabledEventTypes),
new ArrayList<>(),
Expand All @@ -136,11 +140,13 @@ public EventOperatorTestHarness(
Set<SchemaChangeEventType> errorsOnEventTypes) {
this.operator = operator;
this.numOutputs = numOutputs;
this.mockedContext =
new MockedOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader());
schemaRegistry =
new SchemaRegistry(
"SchemaOperator",
new MockOperatorCoordinatorContext(
SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()),
mockedContext,
Executors.newFixedThreadPool(1),
new CollectingMetadataApplier(
duration, enabledEventTypes, errorsOnEventTypes),
Expand Down Expand Up @@ -200,6 +206,14 @@ public Schema getLatestEvolvedSchema(TableId tableId) throws Exception {
.orElse(null);
}

public boolean isJobFailed() {
return mockedContext.isJobFailed();
}

public Throwable getJobFailureCause() {
return mockedContext.getFailureCause();
}

@Override
public void close() throws Exception {
operator.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.cdc.runtime.testutils.operators;

import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext;

/**
* This is a mocked version of Operator coordinator context that stores failure cause for testing
* purposes only.
*/
public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext {
public MockedOperatorCoordinatorContext(
OperatorID operatorID, ClassLoader userCodeClassLoader) {
super(operatorID, userCodeClassLoader);
}

private Throwable failureCause;

@Override
public void failJob(Throwable cause) {
super.failJob(cause);
failureCause = cause;
}

public Throwable getFailureCause() {
return failureCause;
}
}

0 comments on commit a3df3e3

Please sign in to comment.