From a3df3e31c124931c1ec9f2776ec0c0c9fa941752 Mon Sep 17 00:00:00 2001 From: yuxiqian <34335406+yuxiqian@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:31:50 +0800 Subject: [PATCH] fix ci: change exception check since now the exception was not thrown in main thread --- .../SchemaRegistryRequestHandler.java | 3 +- .../operators/schema/SchemaEvolveTest.java | 14 ++++-- .../operators/EventOperatorTestHarness.java | 28 +++++++++--- .../MockedOperatorCoordinatorContext.java | 44 +++++++++++++++++++ 4 files changed, 77 insertions(+), 12 deletions(-) create mode 100644 flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java index d1a8da8e48..847e343f2c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java @@ -468,7 +468,8 @@ private boolean shouldIgnoreException(Throwable throwable) { private List clearCurrentSchemaChangeRequest() { if (currentChangeException != null) { - throw new RuntimeException("Failed to apply schema change.", currentChangeException); + context.failJob( + new RuntimeException("Failed to apply schema change.", currentChangeException)); } List finishedSchemaChanges = new ArrayList<>(currentFinishedSchemaChanges); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java index 51acbd536e..e1f6a94c96 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java @@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.SchemaChangeEventType; import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily; import org.apache.flink.cdc.common.event.TableId; +import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException; import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior; import org.apache.flink.cdc.common.schema.Column; import org.apache.flink.cdc.common.schema.Schema; @@ -1039,11 +1040,16 @@ tableId, buildRecord(INT, 2, STRING, "Bob", SMALLINT, (short) 18)), new AddColumnEvent.ColumnWithPosition( Column.physicalColumn( "height", DOUBLE, "Height data"))))); - Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, addColumnEvents)) + processEvent(schemaOperator, addColumnEvents); + Assertions.assertThat(harness.isJobFailed()).isEqualTo(true); + Assertions.assertThat(harness.getJobFailureCause()) .cause() - .cause() - .isExactlyInstanceOf(RuntimeException.class) - .hasMessageContaining("Failed to apply schema change"); + .isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class) + .matches( + e -> + ((UnsupportedSchemaChangeEventException) e) + .getExceptionMessage() + .equals("Sink doesn't support such schema change event.")); harness.close(); } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java index d358054b79..d8b9770295 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java @@ -38,7 +38,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway; -import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.Output; @@ -82,6 +81,7 @@ public class EventOperatorTestHarness, E ex private final SchemaRegistry schemaRegistry; private final TestingSchemaRegistryGateway schemaRegistryGateway; private final LinkedList> outputRecords = new LinkedList<>(); + private final MockedOperatorCoordinatorContext mockedContext; public EventOperatorTestHarness(OP operator, int numOutputs) { this(operator, numOutputs, null, SchemaChangeBehavior.EVOLVE); @@ -95,11 +95,13 @@ public EventOperatorTestHarness( OP operator, int numOutputs, Duration duration, SchemaChangeBehavior behavior) { this.operator = operator; this.numOutputs = numOutputs; + this.mockedContext = + new MockedOperatorCoordinatorContext( + SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()); schemaRegistry = new SchemaRegistry( "SchemaOperator", - new MockOperatorCoordinatorContext( - SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()), + mockedContext, Executors.newFixedThreadPool(1), new CollectingMetadataApplier(duration), new ArrayList<>(), @@ -115,11 +117,13 @@ public EventOperatorTestHarness( Set enabledEventTypes) { this.operator = operator; this.numOutputs = numOutputs; + this.mockedContext = + new MockedOperatorCoordinatorContext( + SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()); schemaRegistry = new SchemaRegistry( "SchemaOperator", - new MockOperatorCoordinatorContext( - SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()), + mockedContext, Executors.newFixedThreadPool(1), new CollectingMetadataApplier(duration, enabledEventTypes), new ArrayList<>(), @@ -136,11 +140,13 @@ public EventOperatorTestHarness( Set errorsOnEventTypes) { this.operator = operator; this.numOutputs = numOutputs; + this.mockedContext = + new MockedOperatorCoordinatorContext( + SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()); schemaRegistry = new SchemaRegistry( "SchemaOperator", - new MockOperatorCoordinatorContext( - SCHEMA_OPERATOR_ID, Thread.currentThread().getContextClassLoader()), + mockedContext, Executors.newFixedThreadPool(1), new CollectingMetadataApplier( duration, enabledEventTypes, errorsOnEventTypes), @@ -200,6 +206,14 @@ public Schema getLatestEvolvedSchema(TableId tableId) throws Exception { .orElse(null); } + public boolean isJobFailed() { + return mockedContext.isJobFailed(); + } + + public Throwable getJobFailureCause() { + return mockedContext.getFailureCause(); + } + @Override public void close() throws Exception { operator.close(); diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java new file mode 100644 index 0000000000..19ab961eea --- /dev/null +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/MockedOperatorCoordinatorContext.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.runtime.testutils.operators; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.operators.coordination.MockOperatorCoordinatorContext; + +/** + * This is a mocked version of Operator coordinator context that stores failure cause for testing + * purposes only. + */ +public class MockedOperatorCoordinatorContext extends MockOperatorCoordinatorContext { + public MockedOperatorCoordinatorContext( + OperatorID operatorID, ClassLoader userCodeClassLoader) { + super(operatorID, userCodeClassLoader); + } + + private Throwable failureCause; + + @Override + public void failJob(Throwable cause) { + super.failJob(cause); + failureCause = cause; + } + + public Throwable getFailureCause() { + return failureCause; + } +}