[core] cut StoreSinkWriteState when write unaware bucket table
wwj6591812 authored and hongli.wwj committed Aug 28, 2024
1 parent 98d2617 commit b87198c
Showing 11 changed files with 558 additions and 271 deletions.
…/org/apache/paimon/flink/sink/cdc/AbstractCdcRecordStoreWriteOperator.java (new file)
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;

/**
* A {@link PrepareCommitOperator} to write {@link CdcRecord}. Record schema may change. If current
* known schema does not fit record schema, this operator will wait for schema changes.
*/
public abstract class AbstractCdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {

private static final long serialVersionUID = 1L;

public static final ConfigOption<Duration> RETRY_SLEEP_TIME =
ConfigOptions.key("cdc.retry-sleep-time")
.durationType()
.defaultValue(Duration.ofMillis(500));

private final long retrySleepMillis;

public AbstractCdcRecordStoreWriteOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
table = table.copyWithLatestSchema();
super.initializeState(context);
initStateAndWriter(
context,
stateFilter,
getContainingTask().getEnvironment().getIOManager(),
commitUser);
}

@Override
protected boolean containLogSystem() {
return false;
}

@Override
public void processElement(StreamRecord<CdcRecord> element) throws Exception {
CdcRecord record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record, table.schema().fields());
if (optionalConverted.isPresent()) {
break;
}
Thread.sleep(retrySleepMillis);
}
write.replace(table);
}

try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
}
}
}
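
Note on the file above: initStateAndWriter, together with the table, write, commitUser, stateFilter, memoryPool, and storeSinkWriteProvider members it relies on, is not declared in this file. They presumably live in TableWriteOperator, one of the 11 changed files that is not shown on this page. Below is a minimal sketch of the assumed hook, for orientation only; the method name and override signature are taken from the diffs further down, everything else is an assumption and may differ from the real code.

package org.apache.paimon.flink.sink;

import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;

/** Hypothetical sketch; the real class also extends PrepareCommitOperator. */
public abstract class TableWriteOperator<IN> {

    protected FileStoreTable table;
    protected StoreSinkWrite.Provider storeSinkWriteProvider;
    protected transient StoreSinkWrite write;
    protected transient String commitUser;
    protected transient StoreSinkWriteState.StateValueFilter stateFilter;
    protected transient MemorySegmentPool memoryPool;

    // The hook this commit routes writer creation through: each subclass
    // decides whether to build (and later snapshot) a StoreSinkWriteState.
    protected abstract void initStateAndWriter(
            StateInitializationContext context,
            StoreSinkWriteState.StateValueFilter stateFilter,
            IOManager ioManager,
            String commitUser)
            throws Exception;
}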
…/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperator.java
@@ -21,11 +21,14 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
@@ -40,7 +43,7 @@
public class CdcDynamicBucketWriteOperator extends TableWriteOperator<Tuple2<CdcRecord, Integer>> {

private static final long serialVersionUID = 1L;

private transient StoreSinkWriteState state;
private final long retrySleepMillis;

public CdcDynamicBucketWriteOperator(
@@ -56,6 +59,35 @@ public CdcDynamicBucketWriteOperator(
public void initializeState(StateInitializationContext context) throws Exception {
table = table.copyWithLatestSchema();
super.initializeState(context);
initStateAndWriter(
context,
stateFilter,
getContainingTask().getEnvironment().getIOManager(),
commitUser);
}

@Override
protected void initStateAndWriter(
StateInitializationContext context,
StoreSinkWriteState.StateValueFilter stateFilter,
IOManager ioManager,
String commitUser)
throws Exception {
state = new StoreSinkWriteState(context, stateFilter);
write =
storeSinkWriteProvider.provide(
table,
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
memoryPool,
getMetricGroup());
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
state.snapshotState();
}

@Override
…
…/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -18,78 +18,53 @@

package org.apache.paimon.flink.sink.cdc;

import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.io.IOException;
import java.time.Duration;
import java.util.Optional;

import static org.apache.paimon.flink.sink.cdc.CdcRecordUtils.toGenericRow;
import org.apache.flink.runtime.state.StateSnapshotContext;

/**
* A {@link PrepareCommitOperator} to write {@link CdcRecord}. Record schema may change. If current
* known schema does not fit record schema, this operator will wait for schema changes.
* A {@link PrepareCommitOperator} to write {@link CdcRecord} with StoreSinkWriteState. Record
* schema may change. If current known schema does not fit record schema, this operator will wait
* for schema changes.
*/
public class CdcRecordStoreWriteOperator extends TableWriteOperator<CdcRecord> {
public class CdcRecordStoreWriteOperator extends AbstractCdcRecordStoreWriteOperator {

private static final long serialVersionUID = 1L;

public static final ConfigOption<Duration> RETRY_SLEEP_TIME =
ConfigOptions.key("cdc.retry-sleep-time")
.durationType()
.defaultValue(Duration.ofMillis(500));

private final long retrySleepMillis;
private transient StoreSinkWriteState state;

public CdcRecordStoreWriteOperator(
FileStoreTable table,
StoreSinkWrite.Provider storeSinkWriteProvider,
String initialCommitUser) {
super(table, storeSinkWriteProvider, initialCommitUser);
this.retrySleepMillis =
table.coreOptions().toConfiguration().get(RETRY_SLEEP_TIME).toMillis();
}

@Override
public void initializeState(StateInitializationContext context) throws Exception {
table = table.copyWithLatestSchema();
super.initializeState(context);
protected void initStateAndWriter(
StateInitializationContext context,
StoreSinkWriteState.StateValueFilter stateFilter,
IOManager ioManager,
String commitUser)
throws Exception {
state = new StoreSinkWriteState(context, stateFilter);
write =
storeSinkWriteProvider.provide(
table,
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
memoryPool,
getMetricGroup());
}

@Override
protected boolean containLogSystem() {
return false;
}

@Override
public void processElement(StreamRecord<CdcRecord> element) throws Exception {
CdcRecord record = element.getValue();
Optional<GenericRow> optionalConverted = toGenericRow(record, table.schema().fields());
if (!optionalConverted.isPresent()) {
while (true) {
table = table.copyWithLatestSchema();
optionalConverted = toGenericRow(record, table.schema().fields());
if (optionalConverted.isPresent()) {
break;
}
Thread.sleep(retrySleepMillis);
}
write.replace(table);
}

try {
write.write(optionalConverted.get());
} catch (Exception e) {
throw new IOException(e);
}
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
state.snapshotState();
}
}
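
Aside (not part of the diff): since RETRY_SLEEP_TIME now lives on AbstractCdcRecordStoreWriteOperator and is read from the table's core options, the schema-retry back-off applies to every subclass and can be tuned per table. A hedged sketch follows; the option key and the read path come from the constructor shown above, while the helper name and the use of FileStoreTable.copy with dynamic options are assumptions.

import org.apache.paimon.table.FileStoreTable;

import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: copy a table so its CDC writers retry schema refresh every 100 ms. */
class CdcRetryTuning {

    static FileStoreTable withFasterCdcRetry(FileStoreTable table) {
        Map<String, String> dynamicOptions = new HashMap<>();
        // Matches AbstractCdcRecordStoreWriteOperator.RETRY_SLEEP_TIME ("cdc.retry-sleep-time").
        dynamicOptions.put("cdc.retry-sleep-time", "100 ms");
        return table.copy(dynamicOptions);
    }
}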
…/org/apache/paimon/flink/sink/cdc/CdcUnawareBucketWriteOperator.java
@@ -20,13 +20,16 @@

import org.apache.paimon.flink.sink.PrepareCommitOperator;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.RowKind;

import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/** A {@link PrepareCommitOperator} to write {@link CdcRecord} to unaware-bucket mode table. */
public class CdcUnawareBucketWriteOperator extends CdcRecordStoreWriteOperator {
public class CdcUnawareBucketWriteOperator extends AbstractCdcRecordStoreWriteOperator {

public CdcUnawareBucketWriteOperator(
FileStoreTable table,
@@ -35,6 +38,24 @@ public CdcUnawareBucketWriteOperator(
super(table, storeSinkWriteProvider, initialCommitUser);
}

@Override
protected void initStateAndWriter(
StateInitializationContext context,
StoreSinkWriteState.StateValueFilter stateFilter,
IOManager ioManager,
String commitUser)
throws Exception {
StoreSinkWriteState state = new StoreSinkWriteState(context, stateFilter);
write =
storeSinkWriteProvider.provide(
table,
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
memoryPool,
getMetricGroup());
}

@Override
public void processElement(StreamRecord<CdcRecord> element) throws Exception {
// only accepts INSERT record
…
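
Reading the initStateAndWriter overrides together (an interpretation of this diff, not text from the commit): the bucketed operators keep the StoreSinkWriteState in a field and snapshot it on every checkpoint, while the unaware-bucket operator builds it as a local variable and never snapshots it; that is the "cut" in the commit title. In sketch form:

// Bucketed paths (CdcRecordStoreWriteOperator, CdcDynamicBucketWriteOperator):
//     state = new StoreSinkWriteState(context, stateFilter);  // kept as a field
//     ...
//     state.snapshotState();                                   // run on every checkpoint
//
// Unaware-bucket path (CdcUnawareBucketWriteOperator):
//     StoreSinkWriteState state = new StoreSinkWriteState(context, stateFilter);
//     // local only; never snapshotted, so no StoreSinkWriteState is persisted
//     // in Flink checkpoints for unaware-bucket tables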
… (the remaining 7 of 11 changed files are not shown)
