Skip to content

Commit

Permalink
[minor][cdc-common] Improve the java doc of translators
Browse files Browse the repository at this point in the history
This closes apache#2937.
  • Loading branch information
Leonard Xu committed Apr 2, 2024
1 parent 38f172d commit 094500e
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,52 +96,49 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

// Source
// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(pipelineDef.getSource(), env, pipelineDef.getConfig());

// Transform Schema
// Build TransformSchemaOperator for processing Schema Events
TransformTranslator transformTranslator = new TransformTranslator();
stream = transformTranslator.translateSchema(stream, pipelineDef.getTransforms());

// Schema operator
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
pipelineDef
.getConfig()
.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID));

OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

// Transform Data
// Build TransformDataOperator for processing Data Events
stream =
transformTranslator.translateData(
stream,
pipelineDef.getTransforms(),
schemaOperatorIDGenerator.generate(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));

// Route
// Build Router used to route Events
RouteTranslator routeTranslator = new RouteTranslator();
stream = routeTranslator.translate(stream, pipelineDef.getRoute());

// Create sink in advance as schema operator requires MetadataApplier
// Build DataSink in advance as schema operator requires MetadataApplier
DataSink dataSink = createDataSink(pipelineDef.getSink(), pipelineDef.getConfig());

stream =
schemaOperatorTranslator.translate(
stream, parallelism, dataSink.getMetadataApplier());

// Add partitioner
// Build Partitioner used to shuffle Events
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
stream =
partitioningTranslator.translate(
stream, parallelism, parallelism, schemaOperatorIDGenerator.generate());

// Sink
// Build Sink Operator
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
sinkTranslator.translate(
pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory;

/** Translator for building sink into the DataStream. */
/** Translator used to build {@link DataSink} for given {@link DataStream}. */
@Internal
public class DataSinkTranslator {

private static final String SINK_WRITER_PREFIX = "Sink Writer: ";
private static final String SINK_COMMITTER_PREFIX = "Sink Committer: ";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@
import org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils;
import org.apache.flink.cdc.composer.utils.FactoryDiscoveryUtils;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
* Translator for building source and generate a {@link
* org.apache.flink.streaming.api.datastream.DataStream}.
*/
/** Translator used to build {@link DataSource} which will generate a {@link DataStream}. */
@Internal
public class DataSourceTranslator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.streaming.api.datastream.DataStream;

/** Translator for building partitioning related transformations. */
/**
* Translator used to build {@link PrePartitionOperator}, {@link EventPartitioner} and {@link
* PostPartitionProcessor}, which are responsible for event partitioning.
*/
@Internal
public class PartitioningTranslator {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@

import java.util.List;

/** Translator for router. */
/** Translator used to build {@link RouteFunction}. */
public class RouteTranslator {

public DataStream<Event> translate(DataStream<Event> input, List<RouteDef> routes) {
if (routes.isEmpty()) {
return input;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

/** Translator for building {@link SchemaOperator} into DataStream. */
/** Translator used to build {@link SchemaOperator} for schema event processing. */
@Internal
public class SchemaOperatorTranslator {
private final SchemaChangeBehavior schemaChangeBehavior;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@

import java.util.List;

/** Translator for transform schema. */
/**
* Translator used to build {@link TransformSchemaOperator} and {@link TransformDataOperator} for
* event transformation.
*/
public class TransformTranslator {

public DataStream<Event> translateSchema(
Expand Down

0 comments on commit 094500e

Please sign in to comment.