diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java index 9878dec364dd..b2688f8e6436 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/IOUtils.java @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import java.io.BufferedReader; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; @@ -93,6 +94,13 @@ public static void copyBytes(final InputStream in, final OutputStream out) throw // Stream input skipping // ------------------------------------------------------------------------ + /** Reads all into a bytes. */ + public static byte[] readFully(InputStream in, boolean close) throws IOException { + ByteArrayOutputStream output = new ByteArrayOutputStream(); + copyBytes(in, output, BLOCKSIZE, close); + return output.toByteArray(); + } + /** * Reads len bytes in a loop. * diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java new file mode 100644 index 000000000000..15a8df420b73 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta08Serializer.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.stats.SimpleStatsConverter; +import org.apache.paimon.types.ArrayType; +import org.apache.paimon.types.BigIntType; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.ObjectSerializer; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; +import static org.apache.paimon.utils.InternalRowUtils.toStringArrayData; +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.newBytesType; +import static org.apache.paimon.utils.SerializationUtils.newStringType; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link DataFileMeta}. */ +public class DataFileMeta08Serializer extends ObjectSerializer { + + private static final long serialVersionUID = 1L; + + public DataFileMeta08Serializer() { + super(schemaFor08()); + } + + private static RowType schemaFor08() { + List fields = new ArrayList<>(); + fields.add(new DataField(0, "_FILE_NAME", newStringType(false))); + fields.add(new DataField(1, "_FILE_SIZE", new BigIntType(false))); + fields.add(new DataField(2, "_ROW_COUNT", new BigIntType(false))); + fields.add(new DataField(3, "_MIN_KEY", newBytesType(false))); + fields.add(new DataField(4, "_MAX_KEY", newBytesType(false))); + fields.add(new DataField(5, "_KEY_STATS", SimpleStatsConverter.schema())); + fields.add(new DataField(6, "_VALUE_STATS", SimpleStatsConverter.schema())); + fields.add(new DataField(7, "_MIN_SEQUENCE_NUMBER", new BigIntType(false))); + fields.add(new DataField(8, "_MAX_SEQUENCE_NUMBER", new BigIntType(false))); + fields.add(new DataField(9, "_SCHEMA_ID", new BigIntType(false))); + fields.add(new DataField(10, "_LEVEL", new IntType(false))); + fields.add(new DataField(11, "_EXTRA_FILES", new ArrayType(false, newStringType(false)))); + fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS())); + fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true))); + fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true))); + return new RowType(fields); + } + + @Override + public InternalRow toRow(DataFileMeta meta) { + return GenericRow.of( + BinaryString.fromString(meta.fileName()), + meta.fileSize(), + meta.rowCount(), + serializeBinaryRow(meta.minKey()), + serializeBinaryRow(meta.maxKey()), + meta.keyStats().toRow(), + meta.valueStats().toRow(), + meta.minSequenceNumber(), + meta.maxSequenceNumber(), + meta.schemaId(), + meta.level(), + toStringArrayData(meta.extraFiles()), + meta.creationTime(), + meta.deleteRowCount().orElse(null), + meta.embeddedIndex()); + } + + @Override + public DataFileMeta fromRow(InternalRow row) { + return new DataFileMeta( + row.getString(0).toString(), + row.getLong(1), + row.getLong(2), + deserializeBinaryRow(row.getBinary(3)), + deserializeBinaryRow(row.getBinary(4)), + SimpleStats.fromRow(row.getRow(5, 3)), + SimpleStats.fromRow(row.getRow(6, 3)), + row.getLong(7), + row.getLong(8), + row.getLong(9), + row.getInt(10), + fromStringArrayData(row.getArray(11)), + row.getTimestamp(12, 3), + row.isNullAt(13) ? null : row.getLong(13), + row.isNullAt(14) ? null : row.getBinary(14), + null); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java index 3ae51b7aa379..c73c12ffa206 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCommittableSerializer.java @@ -33,7 +33,7 @@ /** {@link VersionedSerializer} for {@link ManifestCommittable}. */ public class ManifestCommittableSerializer implements VersionedSerializer { - private static final int CURRENT_VERSION = 2; + private static final int CURRENT_VERSION = 3; private final CommitMessageSerializer commitMessageSerializer; @@ -75,14 +75,13 @@ private void serializeOffsets(DataOutputViewStreamWrapper view, Map CURRENT_VERSION) { throw new UnsupportedOperationException( - "Expecting ManifestCommittable version to be " + "Expecting ManifestCommittableSerializer version to be smaller or equal than " + CURRENT_VERSION + ", but found " + version - + ".\nManifestCommittable is not a compatible data structure. " - + "Please restart the job afresh (do not recover from savepoint)."); + + "."); } DataInputDeserializer view = new DataInputDeserializer(serialized); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java index a7b566b32994..53a1f9455c8b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/CommitMessageSerializer.java @@ -21,6 +21,8 @@ import org.apache.paimon.data.serializer.VersionedSerializer; import org.apache.paimon.index.IndexFileMetaSerializer; import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFileMeta08Serializer; import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.DataInputDeserializer; @@ -28,10 +30,12 @@ import org.apache.paimon.io.DataOutputView; import org.apache.paimon.io.DataOutputViewStreamWrapper; import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.utils.ObjectSerializer; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; @@ -40,11 +44,13 @@ /** {@link VersionedSerializer} for {@link CommitMessage}. */ public class CommitMessageSerializer implements VersionedSerializer { - private static final int CURRENT_VERSION = 2; + private static final int CURRENT_VERSION = 3; private final DataFileMetaSerializer dataFileSerializer; private final IndexFileMetaSerializer indexEntrySerializer; + private DataFileMeta08Serializer dataFile08Serializer; + public CommitMessageSerializer() { this.dataFileSerializer = new DataFileMetaSerializer(); this.indexEntrySerializer = new IndexFileMetaSerializer(); @@ -86,34 +92,36 @@ private void serialize(CommitMessage obj, DataOutputView view) throws IOExceptio @Override public CommitMessage deserialize(int version, byte[] serialized) throws IOException { - checkVersion(version); DataInputDeserializer view = new DataInputDeserializer(serialized); - return deserialize(view); + return deserialize(version, view); } public List deserializeList(int version, DataInputView view) throws IOException { - checkVersion(version); int length = view.readInt(); List list = new ArrayList<>(length); for (int i = 0; i < length; i++) { - list.add(deserialize(view)); + list.add(deserialize(version, view)); } return list; } - private void checkVersion(int version) { - if (version != CURRENT_VERSION) { + private CommitMessage deserialize(int version, DataInputView view) throws IOException { + ObjectSerializer dataFileSerializer; + if (version == CURRENT_VERSION) { + dataFileSerializer = this.dataFileSerializer; + } else if (version <= 2) { + if (dataFile08Serializer == null) { + dataFile08Serializer = new DataFileMeta08Serializer(); + } + dataFileSerializer = dataFile08Serializer; + } else { throw new UnsupportedOperationException( - "Expecting FileCommittable version to be " + "Expecting CommitMessageSerializer version to be smaller or equal than " + CURRENT_VERSION + ", but found " + version - + ".\nFileCommittable is not a compatible data structure. " - + "Please restart the job afresh (do not recover from savepoint)."); + + "."); } - } - - private CommitMessage deserialize(DataInputView view) throws IOException { return new CommitMessageImpl( deserializeBinaryRow(view), view.readInt(), @@ -127,6 +135,8 @@ private CommitMessage deserialize(DataInputView view) throws IOException { dataFileSerializer.deserializeList(view)), new IndexIncrement( indexEntrySerializer.deserializeList(view), - indexEntrySerializer.deserializeList(view))); + version <= 2 + ? Collections.emptyList() + : indexEntrySerializer.deserializeList(view))); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java new file mode 100644 index 000000000000..13b9d1184483 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerCompatibilityTest.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.stats.SimpleStats; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.Pair; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.LocalDateTime; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; + +import static org.apache.paimon.data.BinaryArray.fromLongArray; +import static org.apache.paimon.data.BinaryRow.singleColumn; +import static org.assertj.core.api.Assertions.assertThat; + +/** Compatibility Test for {@link ManifestCommittableSerializer}. */ +public class ManifestCommittableSerializerCompatibilityTest { + + @Test + public void testProduction() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + FileSource.COMPACT); + List dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap> dvRanges = new LinkedHashMap<>(); + dvRanges.put("dv_key1", Pair.of(1, 2)); + dvRanges.put("dv_key2", Pair.of(3, 4)); + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvRanges); + List indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } + + @Test + public void testCompatibilityToVersion2() throws IOException { + SimpleStats keyStats = + new SimpleStats( + singleColumn("min_key"), + singleColumn("max_key"), + fromLongArray(new Long[] {0L})); + SimpleStats valueStats = + new SimpleStats( + singleColumn("min_value"), + singleColumn("max_value"), + fromLongArray(new Long[] {0L})); + DataFileMeta dataFile = + new DataFileMeta( + "my_file", + 1024 * 1024, + 1024, + singleColumn("min_key"), + singleColumn("max_key"), + keyStats, + valueStats, + 15, + 200, + 5, + 3, + Arrays.asList("extra1", "extra2"), + Timestamp.fromLocalDateTime(LocalDateTime.parse("2022-03-02T20:20:12")), + 11L, + new byte[] {1, 2, 4}, + null); + List dataFiles = Collections.singletonList(dataFile); + + LinkedHashMap> dvRanges = new LinkedHashMap<>(); + dvRanges.put("dv_key1", Pair.of(1, 2)); + dvRanges.put("dv_key2", Pair.of(3, 4)); + IndexFileMeta indexFile = + new IndexFileMeta("my_index_type", "my_index_file", 1024 * 100, 1002, dvRanges); + List indexFiles = Collections.singletonList(indexFile); + + CommitMessageImpl commitMessage = + new CommitMessageImpl( + singleColumn("my_partition"), + 11, + new DataIncrement(dataFiles, dataFiles, dataFiles), + new CompactIncrement(dataFiles, dataFiles, dataFiles), + new IndexIncrement(indexFiles)); + + ManifestCommittable manifestCommittable = + new ManifestCommittable( + 5, + 202020L, + Collections.singletonMap(5, 555L), + Collections.singletonList(commitMessage)); + + ManifestCommittableSerializer serializer = new ManifestCommittableSerializer(); + byte[] bytes = serializer.serialize(manifestCommittable); + ManifestCommittable deserialized = serializer.deserialize(3, bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + + byte[] v2Bytes = + IOUtils.readFully( + ManifestCommittableSerializerCompatibilityTest.class + .getClassLoader() + .getResourceAsStream("compatibility/manifest-committable-v2"), + true); + deserialized = serializer.deserialize(2, v2Bytes); + assertThat(deserialized).isEqualTo(manifestCommittable); + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java index 47a2107426d1..eb9105189b71 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CommitMessageSerializerTest.java @@ -48,7 +48,7 @@ public void test() throws IOException { CommitMessageImpl committable = new CommitMessageImpl(row(0), 1, dataIncrement, compactIncrement, indexIncrement); CommitMessageImpl newCommittable = - (CommitMessageImpl) serializer.deserialize(2, serializer.serialize(committable)); + (CommitMessageImpl) serializer.deserialize(3, serializer.serialize(committable)); assertThat(newCommittable.compactIncrement()).isEqualTo(committable.compactIncrement()); assertThat(newCommittable.newFilesIncrement()).isEqualTo(committable.newFilesIncrement()); assertThat(newCommittable.indexIncrement()).isEqualTo(committable.indexIncrement()); diff --git a/paimon-core/src/test/resources/compatibility/manifest-committable-v2 b/paimon-core/src/test/resources/compatibility/manifest-committable-v2 new file mode 100644 index 000000000000..6522f5aed3e4 Binary files /dev/null and b/paimon-core/src/test/resources/compatibility/manifest-committable-v2 differ diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index d5d99d4c234b..aa833ef008bd 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.FlinkConnectorOptions; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.flink.sink.CommittableStateManager; import org.apache.paimon.flink.sink.Committer; import org.apache.paimon.flink.sink.CommitterOperator; @@ -160,6 +159,6 @@ protected OneInputStreamOperator crea protected CommittableStateManager createCommittableStateManager() { return new RestoreAndFailCommittableStateManager<>( - () -> new VersionedSerializerWrapper<>(new WrappedManifestCommittableSerializer())); + WrappedManifestCommittableSerializer::new); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index ba7c1bb4413e..02351a0371cd 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -21,7 +21,6 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.Options; @@ -181,6 +180,6 @@ protected DataStreamSink doCommit( protected CommittableStateManager createCommittableStateManager() { return new RestoreAndFailCommittableStateManager<>( - () -> new VersionedSerializerWrapper<>(new WrappedManifestCommittableSerializer())); + WrappedManifestCommittableSerializer::new); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index 0d6f245bad8c..f36dae4a83ce 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -18,7 +18,6 @@ package org.apache.paimon.flink.sink; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestCommittableSerializer; import org.apache.paimon.table.FileStoreTable; @@ -56,7 +55,6 @@ protected Committer.Factory createCommitterFac @Override protected CommittableStateManager createCommittableStateManager() { - return new RestoreAndFailCommittableStateManager<>( - () -> new VersionedSerializerWrapper<>(new ManifestCommittableSerializer())); + return new RestoreAndFailCommittableStateManager<>(ManifestCommittableSerializer::new); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java index 32a9cd7be2f5..be795fbab6af 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RestoreAndFailCommittableStateManager.java @@ -18,13 +18,14 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.data.serializer.VersionedSerializer; +import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.utils.SerializableSupplier; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer; -import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState; @@ -48,14 +49,13 @@ public class RestoreAndFailCommittableStateManager private static final long serialVersionUID = 1L; /** The committable's serializer. */ - private final SerializableSupplier> - committableSerializer; + private final SerializableSupplier> committableSerializer; /** GlobalCommitT state of this job. Used to filter out previous successful commits. */ private ListState streamingCommitterState; public RestoreAndFailCommittableStateManager( - SerializableSupplier> committableSerializer) { + SerializableSupplier> committableSerializer) { this.committableSerializer = committableSerializer; } @@ -70,7 +70,7 @@ public void initializeState( new ListStateDescriptor<>( "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE)), - committableSerializer.get()); + new VersionedSerializerWrapper<>(committableSerializer.get())); List restored = new ArrayList<>(); streamingCommitterState.get().forEach(restored::add); streamingCommitterState.clear(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index d6d7f434bac5..024a99c3e5ef 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -19,7 +19,6 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.data.GenericRow; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestCommittableSerializer; import org.apache.paimon.table.FileStoreTable; @@ -60,9 +59,7 @@ public void testBatchWriteGeneratorTag() throws Exception { table, initialCommitUser, new RestoreAndFailCommittableStateManager<>( - () -> - new VersionedSerializerWrapper<>( - new ManifestCommittableSerializer()))); + ManifestCommittableSerializer::new)); committerOperator.open(); TableCommitImpl tableCommit = table.newCommit(initialCommitUser); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index 672ccfd39288..58a7d0dce36e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -22,7 +22,6 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.GenericRow; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.flink.utils.TestingMetricUtils; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.CompactIncrement; @@ -580,9 +579,7 @@ public void testCommitMetrics() throws Exception { table, null, new RestoreAndFailCommittableStateManager<>( - () -> - new VersionedSerializerWrapper<>( - new ManifestCommittableSerializer()))); + ManifestCommittableSerializer::new)); OneInputStreamOperatorTestHarness testHarness = createTestHarness(operator); testHarness.open(); @@ -678,9 +675,7 @@ public void testParallelism() throws Exception { table, null, new RestoreAndFailCommittableStateManager<>( - () -> - new VersionedSerializerWrapper<>( - new ManifestCommittableSerializer()))); + ManifestCommittableSerializer::new)); return createTestHarness(operator); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index 94cfb3670ff3..693e66e93b7f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -27,7 +27,6 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.flink.utils.TestingMetricUtils; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; @@ -644,9 +643,7 @@ public void testCommitMetrics() throws Exception { initialCommitUser, context -> new StoreMultiCommitter(catalogLoader, context), new RestoreAndFailCommittableStateManager<>( - () -> - new VersionedSerializerWrapper<>( - new WrappedManifestCommittableSerializer()))); + WrappedManifestCommittableSerializer::new)); return createTestHarness(operator); }