Skip to content

Commit

Permalink
[Refactor][core] Unify transformFactory creation logic (apache#8574)
Browse files Browse the repository at this point in the history
  • Loading branch information
liugddx authored Feb 6, 2025
1 parent 6468a1b commit 99fa19d
Show file tree
Hide file tree
Showing 26 changed files with 225 additions and 215 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.seatunnel.api.table.factory;

import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.common.PluginIdentifier;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
Expand All @@ -37,6 +38,9 @@
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;

import org.slf4j.Logger;
Expand Down Expand Up @@ -106,7 +110,10 @@ Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>> restoreAndPrepare
if (fallback) {
source =
fallbackCreateSource.apply(
PluginIdentifier.of("seatunnel", "source", factoryId));
PluginIdentifier.of(
EngineType.SEATUNNEL.getEngine(),
PluginType.SOURCE.getType(),
factoryId));
source.prepare(options.toConfig());

} else {
Expand Down Expand Up @@ -205,7 +212,10 @@ SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> createAndPrepareSi
if (fallback) {
SeaTunnelSink sink =
fallbackCreateSink.apply(
PluginIdentifier.of("seatunnel", "sink", factoryId));
PluginIdentifier.of(
EngineType.SEATUNNEL.getEngine(),
PluginType.SINK.getType(),
factoryId));
sink.prepare(config.toConfig());
sink.setTypeInfo(catalogTable.getSeaTunnelRowType());

Expand Down Expand Up @@ -273,6 +283,23 @@ public static <T extends Factory> URL getFactoryUrl(T factory) {
return factory.getClass().getProtectionDomain().getCodeSource().getLocation();
}

public static <T extends Factory> Optional<T> discoverOptionalFactory(
ClassLoader classLoader,
Class<T> factoryClass,
String factoryIdentifier,
Function<String, T> discoverOptionalFactoryFunction) {

if (discoverOptionalFactoryFunction != null) {
T apply = discoverOptionalFactoryFunction.apply(factoryIdentifier);
if (apply != null) {
return Optional.of(apply);
} else {
return Optional.empty();
}
}
return discoverOptionalFactory(classLoader, factoryClass, factoryIdentifier);
}

public static <T extends Factory> Optional<T> discoverOptionalFactory(
ClassLoader classLoader, Class<T> factoryClass, String factoryIdentifier) {
final List<T> foundFactories = discoverFactories(classLoader, factoryClass);
Expand Down Expand Up @@ -436,4 +463,14 @@ private static <T extends Factory> boolean isFallback(
}
return false;
}

public static void ensureJobModeMatch(JobContext jobContext, SeaTunnelSource source) {
if (jobContext.getJobMode() == JobMode.BATCH
&& source.getBoundedness()
== org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
throw new UnsupportedOperationException(
String.format(
"'%s' source don't support off-line job.", source.getPluginName()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.seatunnel.core.starter.enums;
package org.apache.seatunnel.common.constants;

/** Engine type enum */
public enum EngineType {
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.seatunnel.core.starter.flink.execution;

import org.apache.seatunnel.shade.com.google.common.collect.Lists;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.common.CommonOptions;
Expand All @@ -35,9 +36,10 @@
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.PluginUtil;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
Expand All @@ -56,6 +58,7 @@

import static org.apache.seatunnel.api.common.CommonOptions.PLUGIN_NAME;
import static org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode.HANDLE_SAVE_MODE_FAILED;
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverOptionalFactory;

@SuppressWarnings({"unchecked", "rawtypes"})
@Slf4j
Expand All @@ -77,14 +80,34 @@ protected List<Optional<? extends Factory>> initializePlugins(
new SeaTunnelFactoryDiscovery(TableSinkFactory.class, ADD_URL_TO_CLASSLOADER);
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
Function<String, TableSinkFactory> discoverOptionalFactoryFunction =
pluginName ->
(TableSinkFactory)
factoryDiscovery
.createOptionalPluginInstance(
PluginIdentifier.of(
EngineType.SEATUNNEL.getEngine(),
PluginType.SINK.getType(),
pluginName))
.orElse(null);

return pluginConfigs.stream()
.map(
sinkConfig ->
PluginUtil.createSinkFactory(
factoryDiscovery,
sinkPluginDiscovery,
sinkConfig,
jarPaths))
sinkConfig -> {
jarPaths.addAll(
sinkPluginDiscovery.getPluginJarPaths(
Lists.newArrayList(
PluginIdentifier.of(
EngineType.SEATUNNEL.getEngine(),
PluginType.SINK.getType(),
sinkConfig.getString(
PLUGIN_NAME.key())))));
return discoverOptionalFactory(
classLoader,
TableSinkFactory.class,
sinkConfig.getString(PLUGIN_NAME.key()),
discoverOptionalFactoryFunction);
})
.distinct()
.collect(Collectors.toList());
}
Expand All @@ -95,7 +118,6 @@ public List<DataStreamTableInfo> execute(List<DataStreamTableInfo> upstreamDataS
SeaTunnelSinkPluginDiscovery sinkPluginDiscovery =
new SeaTunnelSinkPluginDiscovery(ADD_URL_TO_CLASSLOADER);
DataStreamTableInfo input = upstreamDataStreams.get(upstreamDataStreams.size() - 1);
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Function<PluginIdentifier, SeaTunnelSink> fallbackCreateSink =
sinkPluginDiscovery::createPluginInstance;
for (int i = 0; i < plugins.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.seatunnel.core.starter.flink;

import org.apache.seatunnel.common.constants.EngineType;
import org.apache.seatunnel.core.starter.SeaTunnel;
import org.apache.seatunnel.core.starter.enums.EngineType;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.utils.CommandLineUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
public abstract class FlinkAbstractPluginExecuteProcessor<T>
implements PluginExecuteProcessor<DataStreamTableInfo, FlinkRuntimeEnvironment> {

protected static final String ENGINE_TYPE = "seatunnel";

protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
(classLoader, url) -> {
if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
Expand All @@ -57,6 +55,7 @@ public abstract class FlinkAbstractPluginExecuteProcessor<T>
protected JobContext jobContext;
protected final List<T> plugins;
protected final Config envConfig;
protected final ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

protected FlinkAbstractPluginExecuteProcessor(
List<URL> jarPaths,
Expand Down
Loading

0 comments on commit 99fa19d

Please sign in to comment.