From e128ccc636f2d9cac3a35d5083b47fe8609dbfcb Mon Sep 17 00:00:00 2001 From: Carl-Zhou-CN <1058249259@qq.com> Date: Mon, 23 Dec 2024 10:10:40 +0800 Subject: [PATCH] [Feature][Transform-V2] Spark support transform with multi-table (#8340) --- .../execution/TransformExecuteProcessor.java | 48 ++++++++--------- .../seatunnel/e2e/transform/TestCopyIT.java | 6 --- .../e2e/transform/TestEmbeddingIT.java | 4 -- .../seatunnel/e2e/transform/TestFilterIT.java | 6 --- .../e2e/transform/TestFilterRowKindIT.java | 6 --- .../seatunnel/e2e/transform/TestLLMIT.java | 6 --- .../TestRowKindExtractorTransformIT.java | 6 --- .../seatunnel/e2e/transform/TestSplitIT.java | 6 --- .../e2e/transform/TestDynamicCompileIT.java | 6 --- .../e2e/transform/TestFieldMapperIT.java | 6 --- .../transform/TestJsonPathTransformIT.java | 6 --- .../e2e/transform/TestMetadataIT.java | 6 --- .../seatunnel/e2e/transform/TestRenameIT.java | 6 --- .../e2e/transform/TestReplaceIT.java | 6 --- .../seatunnel/e2e/transform/TestSQLIT.java | 4 -- .../spark/execution/MultiTableManager.java | 36 +++++++++++++ .../serialization/SeaTunnelRowConverter.java | 52 ++++++++++--------- 17 files changed, 84 insertions(+), 132 deletions(-) diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java index cce8b9b5584..492af1ad73d 100644 --- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java @@ -22,9 +22,9 @@ import org.apache.seatunnel.api.common.JobContext; import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.configuration.util.ConfigValidator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.transform.SeaTunnelFlatMapTransform; import org.apache.seatunnel.api.transform.SeaTunnelMapTransform; @@ -34,8 +34,7 @@ import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFactoryDiscovery; import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery; import org.apache.seatunnel.translation.spark.execution.DatasetTableInfo; -import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter; -import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; +import org.apache.seatunnel.translation.spark.execution.MultiTableManager; import org.apache.commons.collections.CollectionUtils; import org.apache.spark.api.java.function.FlatMapFunction; @@ -43,14 +42,12 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; import org.apache.spark.sql.catalyst.encoders.RowEncoder; -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.catalyst.expressions.GenericRow; import lombok.extern.slf4j.Slf4j; import java.net.URL; import java.util.ArrayList; -import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -141,7 +138,7 @@ public List execute(List upstreamDataStreams pluginOutputIdentifier, new DatasetTableInfo( inputDataset, - Collections.singletonList(transform.getProducedCatalogTable()), + transform.getProducedCatalogTables(), pluginOutputIdentifier)); } catch (Exception e) { throw new TaskExecuteException( @@ -155,55 +152,52 @@ public List execute(List upstreamDataStreams } private Dataset sparkTransform(SeaTunnelTransform transform, DatasetTableInfo tableInfo) { + MultiTableManager inputManager = + new MultiTableManager(tableInfo.getCatalogTables().toArray(new CatalogTable[0])); + MultiTableManager outputManager = + new MultiTableManager( + (CatalogTable[]) + transform.getProducedCatalogTables().toArray(new CatalogTable[0])); Dataset stream = tableInfo.getDataset(); - SeaTunnelDataType inputDataType = - tableInfo.getCatalogTables().get(0).getSeaTunnelRowType(); - SeaTunnelDataType outputDataTYpe = - transform.getProducedCatalogTable().getSeaTunnelRowType(); - StructType outputSchema = (StructType) TypeConverterUtils.parcel(outputDataTYpe); - SeaTunnelRowConverter inputRowConverter = new SeaTunnelRowConverter(inputDataType); - SeaTunnelRowConverter outputRowConverter = new SeaTunnelRowConverter(outputDataTYpe); - ExpressionEncoder encoder = RowEncoder.apply(outputSchema); - + ExpressionEncoder encoder = RowEncoder.apply(outputManager.getTableSchema()); return stream.flatMap( - new TransformMapPartitionsFunction( - transform, inputRowConverter, outputRowConverter), + new TransformMapPartitionsFunction(transform, inputManager, outputManager), encoder) .filter(Objects::nonNull); } private static class TransformMapPartitionsFunction implements FlatMapFunction { private SeaTunnelTransform transform; - private SeaTunnelRowConverter inputRowConverter; - private SeaTunnelRowConverter outputRowConverter; + private MultiTableManager inputManager; + private MultiTableManager outputManager; public TransformMapPartitionsFunction( SeaTunnelTransform transform, - SeaTunnelRowConverter inputRowConverter, - SeaTunnelRowConverter outputRowConverter) { + MultiTableManager inputManager, + MultiTableManager outputManager) { this.transform = transform; - this.inputRowConverter = inputRowConverter; - this.outputRowConverter = outputRowConverter; + this.inputManager = inputManager; + this.outputManager = outputManager; } @Override public Iterator call(Row row) throws Exception { List rows = new ArrayList<>(); - SeaTunnelRow seaTunnelRow = inputRowConverter.unpack((GenericRowWithSchema) row); + SeaTunnelRow seaTunnelRow = inputManager.reconvert((GenericRow) row); if (transform instanceof SeaTunnelFlatMapTransform) { List seaTunnelRows = ((SeaTunnelFlatMapTransform) transform).flatMap(seaTunnelRow); if (CollectionUtils.isNotEmpty(seaTunnelRows)) { for (SeaTunnelRow seaTunnelRowTransform : seaTunnelRows) { - rows.add(outputRowConverter.parcel(seaTunnelRowTransform)); + rows.add(outputManager.convert(seaTunnelRowTransform)); } } } else if (transform instanceof SeaTunnelMapTransform) { SeaTunnelRow seaTunnelRowTransform = ((SeaTunnelMapTransform) transform).map(seaTunnelRow); if (seaTunnelRowTransform != null) { - rows.add(outputRowConverter.parcel(seaTunnelRowTransform)); + rows.add(outputManager.convert(seaTunnelRowTransform)); } } return rows.iterator(); diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java index d47819c4523..9217df50fdb 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestCopyIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -35,10 +33,6 @@ public void testCopy(TestContainer container) throws IOException, InterruptedExc Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testCopyMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java index f71beac3d92..034479a3027 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestEmbeddingIT.java @@ -93,10 +93,6 @@ public void testEmbedding(TestContainer container) throws IOException, Interrupt Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testEmbeddingMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterIT.java index 8d3c458081d..8d0f8c7c274 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -35,10 +33,6 @@ public void testFilter(TestContainer container) throws IOException, InterruptedE Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testFilterMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java index b1eaede35d6..c4104f734c9 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestFilterRowKindIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -43,10 +41,6 @@ public void testFilterRowKind(TestContainer container) Assertions.assertEquals(0, execResult3.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testFilterRowKindMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestLLMIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestLLMIT.java index 79606fd1257..495e2bd714a 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestLLMIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestLLMIT.java @@ -18,9 +18,7 @@ package org.apache.seatunnel.e2e.transform; import org.apache.seatunnel.e2e.common.TestResource; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.Assertions; @@ -90,10 +88,6 @@ public void testLLMWithOpenAI(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testLLMWithOpenAIMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java index da334c73627..a82ff2fb37a 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestRowKindExtractorTransformIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -40,10 +38,6 @@ public void testRowKindExtractorTransform(TestContainer container) Assertions.assertEquals(0, execResult2.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testRowKindExtractorMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestSplitIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestSplitIT.java index 643cd20eebc..123b52fb6e0 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestSplitIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-1/src/test/java/org/apache/seatunnel/e2e/transform/TestSplitIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -35,10 +33,6 @@ public void testSplit(TestContainer container) throws IOException, InterruptedEx Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testSplitMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java index aa2effaa38e..757c38dfcfa 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestDynamicCompileIT.java @@ -19,9 +19,7 @@ import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.apache.seatunnel.e2e.common.junit.TestContainerExtension; import org.junit.jupiter.api.AfterAll; @@ -118,10 +116,6 @@ public void testDynamicSingleCompileJava(TestContainer container) Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testDynamicSingleCompileJavaMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java index dbb5b4d107e..7127a9f6f26 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestFieldMapperIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -39,10 +37,6 @@ public void testFieldMapper(TestContainer container) throws IOException, Interru Assertions.assertEquals(0, execResult1.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testFieldMapperMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java index 5a485f912e7..3bc4b0cc70a 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestJsonPathTransformIT.java @@ -16,9 +16,7 @@ */ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -33,10 +31,6 @@ public void testBasicType(TestContainer container) throws Exception { Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testBasicTypeMultiTable(TestContainer container) throws Exception { Container.ExecResult execResult = diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestMetadataIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestMetadataIT.java index e5ce7d27ff5..ccc4ef77202 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestMetadataIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestMetadataIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -29,10 +27,6 @@ public class TestMetadataIT extends TestSuiteBase { - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testMetadataMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java index 8b2e55551b1..7a0087747cd 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestRenameIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -29,10 +27,6 @@ public class TestRenameIT extends TestSuiteBase { - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testRenameMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestReplaceIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestReplaceIT.java index 7a73452207f..2c6b60f433d 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestReplaceIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestReplaceIT.java @@ -17,9 +17,7 @@ package org.apache.seatunnel.e2e.transform; -import org.apache.seatunnel.e2e.common.container.EngineType; import org.apache.seatunnel.e2e.common.container.TestContainer; -import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.TestTemplate; @@ -35,10 +33,6 @@ public void testReplace(TestContainer container) throws IOException, Interrupted Assertions.assertEquals(0, execResult.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testReplaceMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java index 800cd34e313..042fcaf15fc 100644 --- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java +++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java @@ -79,10 +79,6 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr Assertions.assertEquals(0, splitSql.getExitCode()); } - @DisabledOnContainer( - value = {}, - type = {EngineType.SPARK}, - disabledReason = "Currently SPARK do not multi table transform") @TestTemplate public void testSQLTransformMultiTable(TestContainer container) throws IOException, InterruptedException { diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/execution/MultiTableManager.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/execution/MultiTableManager.java index ab470ce3dd4..b33fdeb26c4 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/execution/MultiTableManager.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/execution/MultiTableManager.java @@ -25,9 +25,11 @@ import org.apache.seatunnel.translation.spark.serialization.InternalMultiRowCollector; import org.apache.seatunnel.translation.spark.serialization.InternalRowCollector; import org.apache.seatunnel.translation.spark.serialization.InternalRowConverter; +import org.apache.seatunnel.translation.spark.serialization.SeaTunnelRowConverter; import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.sql.types.StructType; import lombok.extern.slf4j.Slf4j; @@ -45,8 +47,11 @@ public class MultiTableManager implements Serializable { private Map rowSerializationMap; + private Map genericRowSerializationMap; private InternalRowConverter rowSerialization; + + private SeaTunnelRowConverter genericRowSerialization; private CatalogTable mergeCatalogTable; private boolean isMultiTable = false; @@ -67,8 +72,23 @@ public MultiTableManager(CatalogTable[] catalogTables) { new InternalRowConverter( mergeCatalogTable.getSeaTunnelRowType(), columnWithIndex.getIndex()))); + genericRowSerializationMap = + columnWithIndexes.stream() + .collect( + Collectors.toMap( + columnWithIndex -> + columnWithIndex + .getCatalogTable() + .getTablePath() + .toString(), + columnWithIndex -> + new SeaTunnelRowConverter( + mergeCatalogTable.getSeaTunnelRowType(), + columnWithIndex.getIndex()))); } else { rowSerialization = new InternalRowConverter(catalogTables[0].getSeaTunnelRowType()); + genericRowSerialization = + new SeaTunnelRowConverter(catalogTables[0].getSeaTunnelRowType()); } log.info("Multi-table enabled:{}", isMultiTable); log.info( @@ -88,6 +108,22 @@ public SeaTunnelRow reconvert(InternalRow record) throws IOException { return rowSerialization.reconvert(record); } + public SeaTunnelRow reconvert(GenericRow record) throws IOException { + if (isMultiTable) { + String tableId = record.getString(1); + return genericRowSerializationMap.get(tableId).reconvert(record); + } + return genericRowSerialization.reconvert(record); + } + + public GenericRow convert(SeaTunnelRow record) throws IOException { + if (isMultiTable) { + String tableId = record.getTableId(); + return genericRowSerializationMap.get(tableId).convert(record); + } + return genericRowSerialization.convert(record); + } + public StructType getTableSchema() { return (StructType) TypeConverterUtils.parcel(mergeCatalogTable.getSeaTunnelRowType()); } diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java index 33d1b49fb1c..d76aa61e465 100644 --- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java +++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/serialization/SeaTunnelRowConverter.java @@ -25,10 +25,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.translation.serialization.RowConverter; -import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils; -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; -import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.catalyst.expressions.GenericRow; import org.apache.spark.unsafe.types.UTF8String; import scala.Tuple2; @@ -45,35 +43,41 @@ import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; +import java.util.stream.IntStream; + +public class SeaTunnelRowConverter extends RowConverter { + + private final int[] indexes; -public class SeaTunnelRowConverter extends RowConverter { public SeaTunnelRowConverter(SeaTunnelDataType dataType) { super(dataType); + indexes = IntStream.range(0, ((SeaTunnelRowType) dataType).getTotalFields()).toArray(); + } + + public SeaTunnelRowConverter(SeaTunnelDataType dataType, int[] indexes) { + super(dataType); + this.indexes = indexes; } // SeaTunnelRow To GenericRow @Override - public SeaTunnelRow convert(SeaTunnelRow seaTunnelRow) throws IOException { - validate(seaTunnelRow); - GenericRowWithSchema rowWithSchema = (GenericRowWithSchema) convert(seaTunnelRow, dataType); - SeaTunnelRow newRow = new SeaTunnelRow(rowWithSchema.values()); - return newRow; + public GenericRow convert(SeaTunnelRow seaTunnelRow) throws IOException { + return parcel(seaTunnelRow); } - public GenericRowWithSchema parcel(SeaTunnelRow seaTunnelRow) { + public GenericRow parcel(SeaTunnelRow seaTunnelRow) { SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; int arity = rowType.getTotalFields(); Object[] fields = new Object[arity + 2]; fields[0] = seaTunnelRow.getRowKind().toByteValue(); fields[1] = seaTunnelRow.getTableId(); - StructType schema = (StructType) TypeConverterUtils.parcel(rowType); - for (int i = 0; i < arity; i++) { - Object fieldValue = convert(seaTunnelRow.getField(i), rowType.getFieldType(i)); + for (int i = 0; i < indexes.length; i++) { + Object fieldValue = convert(seaTunnelRow.getField(i), rowType.getFieldType(indexes[i])); if (fieldValue != null) { - fields[i + 2] = fieldValue; + fields[indexes[i] + 2] = fieldValue; } } - return new GenericRowWithSchema(fields, schema); + return new GenericRow(fields); } private Object convert(Object field, SeaTunnelDataType dataType) { @@ -121,17 +125,16 @@ private Object convert(Object field, SeaTunnelDataType dataType) { } } - private GenericRowWithSchema convertRow(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) { + private GenericRow convertRow(SeaTunnelRow seaTunnelRow, SeaTunnelRowType rowType) { int arity = rowType.getTotalFields(); Object[] values = new Object[arity]; - StructType schema = (StructType) TypeConverterUtils.convert(rowType); for (int i = 0; i < arity; i++) { Object fieldValue = convert(seaTunnelRow.getField(i), rowType.getFieldType(i)); if (fieldValue != null) { values[i] = fieldValue; } } - return new GenericRowWithSchema(values, schema); + return new GenericRow(values); } private scala.collection.immutable.HashMap convertMap( @@ -167,11 +170,11 @@ private WrappedArray.ofRef convertArray(Object[] arrayData, ArrayType a // GenericRow To SeaTunnel @Override - public SeaTunnelRow reconvert(SeaTunnelRow engineRow) throws IOException { - return (SeaTunnelRow) reconvert(engineRow, dataType); + public SeaTunnelRow reconvert(GenericRow engineRow) throws IOException { + return unpack(engineRow); } - public SeaTunnelRow unpack(GenericRowWithSchema engineRow) throws IOException { + public SeaTunnelRow unpack(GenericRow engineRow) throws IOException { SeaTunnelRowType rowType = (SeaTunnelRowType) dataType; RowKind rowKind = RowKind.fromByteValue(engineRow.getByte(0)); String tableId = engineRow.getString(1); @@ -191,9 +194,8 @@ private Object reconvert(Object field, SeaTunnelDataType dataType) { } switch (dataType.getSqlType()) { case ROW: - if (field instanceof GenericRowWithSchema) { - return createFromGenericRow( - (GenericRowWithSchema) field, (SeaTunnelRowType) dataType); + if (field instanceof GenericRow) { + return createFromGenericRow((GenericRow) field, (SeaTunnelRowType) dataType); } return reconvert((SeaTunnelRow) field, (SeaTunnelRowType) dataType); case DATE: @@ -216,7 +218,7 @@ private Object reconvert(Object field, SeaTunnelDataType dataType) { } } - private SeaTunnelRow createFromGenericRow(GenericRowWithSchema row, SeaTunnelRowType type) { + private SeaTunnelRow createFromGenericRow(GenericRow row, SeaTunnelRowType type) { Object[] fields = row.values(); Object[] newFields = new Object[fields.length]; for (int idx = 0; idx < fields.length; idx++) {