[AMORO-1744] Rename name with arctic identifier in amoro-mixed-format-flink module (#2848)

* Rename arctic identifier in module amoro-mixed-format-flink

* Format codes

* fix unit tests
zhoujinsong authored May 21, 2024
1 parent e5e377d commit cb2b996
Showing 133 changed files with 1,685 additions and 1,590 deletions.
----------------------------------------------------------------------
@@ -34,7 +34,9 @@
 import java.util.Deque;
 import java.util.List;

-/** Copy from iceberg {@link ParquetWithFlinkSchemaVisitor}. see annotation "Change For Arctic" */
+/**
+ * Copy from iceberg {@link ParquetWithFlinkSchemaVisitor}. see annotation "Change For mixed-format"
+ */
 public class AdaptHiveParquetWithFlinkSchemaVisitor<T> {
   private final Deque<String> fieldNames = Lists.newLinkedList();

@@ -181,15 +183,15 @@ private static <T> List<T> visitFields(
       Type field = group.getFields().get(i);
       RowField sField = sFields.get(i);

-      // Change For Arctic
+      // Change for mixed-format table
       // Preconditions.checkArgument(field.getName().equals(AvroSchemaUtil.makeCompatibleName(sField.getName())),
       // "Structs do not match: field %s != %s", field.getName(), sField.getName());
       Preconditions.checkArgument(
           field.getName().equals(sField.getName()),
           "Structs do not match: field %s != %s",
           field.getName(),
           sField.getName());
-      // Change For Arctic
+      // Change for mixed-format table

       results.add(visitField(sField, field, visitor));
     }
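The re-annotated block above is the one real divergence from Iceberg's visitor: Iceberg compares Parquet field names against their Avro-sanitized form, while the adapted Hive path compares raw names. A small sketch of the sanitizer that the mixed-format check bypasses (outputs left unasserted, since the exact encoding depends on the Iceberg version):

import org.apache.iceberg.avro.AvroSchemaUtil;

class CompatibleNameSketch {
  public static void main(String[] args) {
    // Iceberg's check (commented out in the hunk above) would compare against
    // these sanitized names; the mixed-format visitor matches raw names instead,
    // because adapted Hive Parquet files keep their original field names.
    System.out.println(AvroSchemaUtil.makeCompatibleName("9col"));  // digit-leading name
    System.out.println(AvroSchemaUtil.makeCompatibleName("col-1")); // dash is not Avro-safe
  }
}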
----------------------------------------------------------------------
@@ -174,7 +174,7 @@ private CloseableIterable<RowData> newParquetIterable(
         Parquet.read(inputFilesDecryptor.getInputFile(task))
             .split(task.start(), task.length())
             .project(schema)
-            // Change for Arctic
+            // Change for mixed-format table
             .createReaderFunc(
                 fileSchema ->
                     AdaptHiveFlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
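For orientation, the hunk above sits in the middle of Iceberg's Parquet read-builder chain. A minimal sketch of that call path with the surrounding variables lifted into parameters; the wrapper class and method name are illustrative, and the buildReader call follows the diff's call site:

import org.apache.flink.table.data.RowData;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.parquet.Parquet;

import java.util.Map;

class ParquetReadSketch {
  // Illustrative stand-in for the diff's newParquetIterable(); the import for
  // AdaptHiveFlinkParquetReaders is omitted because its package is not visible
  // in this excerpt.
  static CloseableIterable<RowData> open(
      InputFile file, FileScanTask task, Schema schema, Map<Integer, ?> idToConstant) {
    return Parquet.read(file)
        .split(task.start(), task.length()) // read only this task's byte range
        .project(schema) // prune columns to the projected schema
        // the "Change for mixed-format table" hook: plug in the Hive-adapted reader
        .createReaderFunc(
            fileSchema ->
                AdaptHiveFlinkParquetReaders.buildReader(schema, fileSchema, idToConstant))
        .build();
  }
}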
----------------------------------------------------------------------
@@ -26,8 +26,8 @@
 import static org.apache.flink.table.descriptors.DescriptorProperties.WATERMARK_STRATEGY_EXPR;
 import static org.apache.flink.table.descriptors.Schema.SCHEMA_PROCTIME;

-import org.apache.amoro.flink.table.ArcticDynamicSource;
 import org.apache.amoro.flink.table.FlinkSource;
+import org.apache.amoro.flink.table.MixedFormatDynamicSource;
 import org.apache.amoro.table.MixedTable;
 import org.apache.amoro.table.PrimaryKeySpec;
 import org.apache.commons.collections.CollectionUtils;
@@ -116,16 +116,19 @@ public static TableSchema toSchema(
   }

   /**
-   * Add watermark info to help {@link FlinkSource} and {@link ArcticDynamicSource} distinguish the
-   * watermark field. For now, it only be used in the case of Arctic as dim-table.
+   * Add watermark info to help {@link FlinkSource} and {@link MixedFormatDynamicSource} distinguish
+   * the watermark field. For now, it only be used in the case of mixed-format table as dim-table.
    */
   public static TableSchema getPhysicalSchemaForDimTable(TableSchema tableSchema) {
     TableSchema.Builder builder = filter(tableSchema, TableColumn::isPhysical);
     tableSchema.getWatermarkSpecs().forEach(builder::watermark);
     return builder.build();
   }

-  /** filter watermark due to watermark is a virtual field for now, not in arctic physical table. */
+  /**
+   * filter watermark due to watermark is a virtual field for now, not in mixed-format physical
+   * table.
+   */
   public static TableSchema filterWatermark(TableSchema tableSchema) {
     List<WatermarkSpec> watermarkSpecs = tableSchema.getWatermarkSpecs();
     if (watermarkSpecs.isEmpty()) {
@@ -241,7 +244,7 @@ public static void addPrimaryKey(
               .orElseThrow(
                   () ->
                       new ValidationException(
-                          "Arctic primary key should be declared in table")));
+                          "Mixed-format table primary key should be declared in table")));
         });
   }
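Both renamed javadocs describe schema plumbing that is easy to demonstrate with Flink's TableSchema API. A sketch of filterWatermark's contract; SchemaUtil is a placeholder for this file's utility class, whose name is cropped out of this excerpt:

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;

class WatermarkFilterSketch {
  public static void main(String[] args) {
    // A schema as Flink DDL would declare it, watermark included.
    TableSchema declared =
        TableSchema.builder()
            .field("id", DataTypes.INT().notNull())
            .field("op_time", DataTypes.TIMESTAMP(3))
            .watermark("op_time", "op_time - INTERVAL '5' SECOND", DataTypes.TIMESTAMP(3))
            .build();

    // The watermark is virtual and has no column in the mixed-format physical
    // table, so it is stripped before schemas are compared.
    TableSchema physical = SchemaUtil.filterWatermark(declared); // placeholder class name
    System.out.println(physical.getWatermarkSpecs().isEmpty()); // expected: true
  }
}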
----------------------------------------------------------------------
@@ -40,7 +40,7 @@ public class InternalCatalogBuilder implements Serializable {
   private String metastoreUrl;
   private Map<String, String> properties = new HashMap<>(0);

-  private MixedFormatCatalog createBaseArcticCatalog() {
+  private MixedFormatCatalog createMixedFormatCatalog() {
     Preconditions.checkArgument(
         StringUtils.isNotBlank(metastoreUrl),
         "metastoreUrl can not be empty. e.g: thrift://127.0.0.1:port/catalogName");
@@ -62,7 +62,7 @@ public static InternalCatalogBuilder builder() {
   }

   public MixedFormatCatalog build() {
-    return createBaseArcticCatalog();
+    return createMixedFormatCatalog();
   }

   public InternalCatalogBuilder metastoreUrl(String metastoreUrl) {
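The builder's public chain is untouched by this rename; only the private method changes. A minimal usage sketch, with a placeholder URL in the thrift://host:port/catalogName form the precondition message asks for:

import org.apache.amoro.flink.InternalCatalogBuilder;
import org.apache.amoro.mixed.MixedFormatCatalog;

class CatalogBuilderSketch {
  static MixedFormatCatalog connect() {
    return InternalCatalogBuilder.builder()
        .metastoreUrl("thrift://127.0.0.1:1260/my_catalog") // placeholder AMS address
        .build(); // now delegates to createMixedFormatCatalog()
  }
}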
----------------------------------------------------------------------
@@ -19,7 +19,7 @@
 package org.apache.amoro.flink.catalog;

 import static org.apache.amoro.Constants.THRIFT_TABLE_SERVICE_NAME;
-import static org.apache.amoro.flink.table.descriptors.ArcticValidator.TABLE_FORMAT;
+import static org.apache.amoro.flink.table.descriptors.MixedFormatValidator.TABLE_FORMAT;

 import org.apache.amoro.AlreadyExistsException;
 import org.apache.amoro.AmoroTable;
----------------------------------------------------------------------
@@ -28,8 +28,8 @@
 import org.apache.amoro.flink.InternalCatalogBuilder;
 import org.apache.amoro.flink.catalog.factories.CatalogFactoryOptions;
 import org.apache.amoro.flink.table.DynamicTableFactory;
-import org.apache.amoro.flink.table.descriptors.ArcticValidator;
-import org.apache.amoro.flink.util.ArcticUtils;
+import org.apache.amoro.flink.table.descriptors.MixedFormatValidator;
+import org.apache.amoro.flink.util.MixedFormatUtils;
 import org.apache.amoro.mixed.MixedFormatCatalog;
 import org.apache.amoro.scan.CombinedScanTask;
 import org.apache.amoro.scan.KeyedTableScanTask;
@@ -199,24 +199,25 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
       throw new TableNotExistException(this.getName(), tablePath);
     }
     MixedTable table = internalCatalog.loadTable(tableIdentifier);
-    Schema arcticSchema = table.schema();
+    Schema mixedTableSchema = table.schema();

-    Map<String, String> arcticProperties = Maps.newHashMap(table.properties());
-    fillTableProperties(arcticProperties);
-    fillTableMetaPropertiesIfLookupLike(arcticProperties, tableIdentifier);
+    Map<String, String> mixedTableProperties = Maps.newHashMap(table.properties());
+    fillTableProperties(mixedTableProperties);
+    fillTableMetaPropertiesIfLookupLike(mixedTableProperties, tableIdentifier);

     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
     return CatalogTable.of(
-        toSchema(arcticSchema, ArcticUtils.getPrimaryKeys(table), arcticProperties).toSchema(),
+        toSchema(mixedTableSchema, MixedFormatUtils.getPrimaryKeys(table), mixedTableProperties)
+            .toSchema(),
         null,
         partitionKeys,
-        arcticProperties);
+        mixedTableProperties);
   }

   /**
    * For now, 'CREATE TABLE LIKE' would be treated as the case which users want to add watermark in
-   * temporal join, as an alternative of lookup join, and use Arctic table as build table, i.e.
-   * right table. So the properties those required in temporal join will be put automatically.
+   * temporal join, as an alternative of lookup join, and use mixed-format table as build table,
+   * i.e. right table. So the properties those required in temporal join will be put automatically.
    *
    * <p>If you don't want the properties, 'EXCLUDING ALL' is what you need. More details @see <a
    * href="https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#like">LIKE</a>
@@ -237,9 +238,9 @@ private void fillTableMetaPropertiesIfLookupLike(
     }

     properties.put(CONNECTOR.key(), DynamicTableFactory.IDENTIFIER);
-    properties.put(ArcticValidator.ARCTIC_CATALOG.key(), tableIdentifier.getCatalog());
-    properties.put(ArcticValidator.ARCTIC_TABLE.key(), tableIdentifier.getTableName());
-    properties.put(ArcticValidator.ARCTIC_DATABASE.key(), tableIdentifier.getDatabase());
+    properties.put(MixedFormatValidator.MIXED_FORMAT_CATALOG.key(), tableIdentifier.getCatalog());
+    properties.put(MixedFormatValidator.MIXED_FORMAT_TABLE.key(), tableIdentifier.getTableName());
+    properties.put(MixedFormatValidator.MIXED_FORMAT_DATABASE.key(), tableIdentifier.getDatabase());
     properties.put(CatalogFactoryOptions.METASTORE_URL.key(), catalogBuilder.getMetastoreUrl());
   }

@@ -440,9 +441,9 @@ public List<CatalogPartitionSpec> listPartitionsByFilter(
                             keyedTableScanTask.mixedEquityDeletes())
                         .flatMap(List::stream))
             .forEach(
-                arcticFileScanTask -> {
+                mixedFileScanTask -> {
                   Map<String, String> map = Maps.newHashMap();
-                  StructLike structLike = arcticFileScanTask.partition();
+                  StructLike structLike = mixedFileScanTask.partition();
                   PartitionSpec spec = table.spec();
                   for (int i = 0; i < structLike.size(); i++) {
                     map.put(
@@ -625,16 +626,18 @@ public String amsCatalogName() {
    * Check whether a list of partition values are valid based on the given list of partition keys.
    *
    * @param partitionSpec a partition spec.
-   * @param arcticPartitionSpec arcticPartitionSpec
+   * @param mixedTablePartitionSpec mixedTablePartitionSpec
    * @param tablePath tablePath
    * @throws PartitionSpecInvalidException thrown if any key in partitionSpec doesn't exist in
    *     partitionKeys.
    */
   private void checkValidPartitionSpec(
-      CatalogPartitionSpec partitionSpec, PartitionSpec arcticPartitionSpec, ObjectPath tablePath)
+      CatalogPartitionSpec partitionSpec,
+      PartitionSpec mixedTablePartitionSpec,
+      ObjectPath tablePath)
       throws PartitionSpecInvalidException {
     List<String> partitionKeys =
-        arcticPartitionSpec.fields().stream()
+        mixedTablePartitionSpec.fields().stream()
             .map(PartitionField::name)
             .collect(Collectors.toList());
     for (String key : partitionSpec.getPartitionSpec().keySet()) {
@@ -732,17 +735,17 @@ private void alterUnKeyedTable(UnkeyedTable table, CatalogBaseTable newTable) {
   }

   private CatalogTable toCatalogTable(MixedTable table, TableIdentifier tableIdentifier) {
-    Schema arcticSchema = table.schema();
+    Schema mixedTableSchema = table.schema();

-    Map<String, String> arcticProperties = Maps.newHashMap(table.properties());
-    fillTableProperties(arcticProperties);
-    fillTableMetaPropertiesIfLookupLike(arcticProperties, tableIdentifier);
+    Map<String, String> mixedTableProperties = Maps.newHashMap(table.properties());
+    fillTableProperties(mixedTableProperties);
+    fillTableMetaPropertiesIfLookupLike(mixedTableProperties, tableIdentifier);

     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
     return new CatalogTableImpl(
-        toSchema(arcticSchema, ArcticUtils.getPrimaryKeys(table), arcticProperties),
+        toSchema(mixedTableSchema, MixedFormatUtils.getPrimaryKeys(table), mixedTableProperties),
         partitionKeys,
-        arcticProperties,
+        mixedTableProperties,
         null);
   }
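The getTable/fillTableMetaPropertiesIfLookupLike pair above implements the behavior the javadoc describes: a 'CREATE TABLE ... LIKE' against this catalog is treated as declaring a temporal-join build table, and the connector plus mixed-format catalog/database/table properties are injected automatically. A hedged Flink SQL sketch of that flow, with every table and column name invented for illustration:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

class CreateTableLikeSketch {
  public static void main(String[] args) {
    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Add a watermark over a mixed-format table via LIKE; getTable() fills in
    // the connector and mixed-format catalog/database/table properties so the
    // result can serve as the right side of a temporal join.
    tEnv.executeSql(
        "CREATE TABLE dim_orders ("
            + "  WATERMARK FOR op_time AS op_time"
            + ") LIKE mixed_catalog.db.orders");
    // Use "LIKE mixed_catalog.db.orders (EXCLUDING ALL)" to skip the injected properties.
  }
}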
----------------------------------------------------------------------
@@ -29,8 +29,6 @@
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;

 import java.util.Collections;
 import java.util.HashSet;
@@ -41,8 +39,6 @@
 /** Factory for {@link MixedCatalog} */
 public class MixedCatalogFactory implements CatalogFactory {

-  private static final Logger LOG = LoggerFactory.getLogger(MixedCatalogFactory.class);
-
   @Override
   public String factoryIdentifier() {
     return CatalogFactoryOptions.LEGACY_MIXED_IDENTIFIER;
@@ -57,8 +53,8 @@ public Catalog createCatalog(Context context) {

     final String defaultDatabase = helper.getOptions().get(CatalogFactoryOptions.DEFAULT_DATABASE);
     String metastoreUrl = helper.getOptions().get(CatalogFactoryOptions.METASTORE_URL);
-    final Map<String, String> arcticCatalogProperties = getKafkaParams(context.getOptions());
-    final Map<String, String> catalogProperties = Maps.newHashMap(arcticCatalogProperties);
+    final Map<String, String> mixedCatalogProperties = getKafkaParams(context.getOptions());
+    final Map<String, String> catalogProperties = Maps.newHashMap(mixedCatalogProperties);

     Optional<String> tableFormatsOptional =
         helper.getOptions().getOptional(CatalogFactoryOptions.FLINK_TABLE_FORMATS);
----------------------------------------------------------------------
@@ -61,7 +61,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
       return proxy;
     }
     Object result = method.invoke(mixedTable, args);
-    // rewrite the properties as of the arctic table properties may be updated.
+    // rewrite the properties as of the mixed-format table properties may be updated.
     if ("refresh".equals(method.getName())) {
       rewriteProperties();
     }
@@ -70,7 +70,7 @@

   private void rewriteProperties() {
     Map<String, String> refreshedProperties = mixedTable.properties();
-    // iterate through the properties of the arctic table and update the properties of the
+    // iterate through the properties of the mixed-format table and update the properties of the
     // tablePropertiesCombined.
     for (Map.Entry<String, String> entry : refreshedProperties.entrySet()) {
       if (flinkTableProperties.containsKey(entry.getKey())) {
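The two comment fixes above live in a dynamic-proxy handler whose intent is easy to lose in a diff: forward every call to the wrapped table, and after refresh() re-merge the reloaded table properties with the Flink-side overrides. A self-contained model of that pattern, under the assumption (suggested by the containsKey guard) that Flink-side properties win on conflict; Table is a stand-in interface, not the commit's classes:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

class PropertyRewriteSketch {
  interface Table {
    Map<String, String> properties();
    void refresh();
  }

  static Table wrap(Table table, Map<String, String> flinkOverrides) {
    Map<String, String> combined = new HashMap<>(table.properties());
    combined.putAll(flinkOverrides);
    InvocationHandler handler =
        (proxy, method, args) -> {
          Object result = method.invoke(table, args); // forward everything
          if ("refresh".equals(method.getName())) {
            // Properties may have changed on refresh; rebuild the combined view.
            combined.clear();
            combined.putAll(table.properties());
            combined.putAll(flinkOverrides); // Flink-side overrides keep priority
          }
          return "properties".equals(method.getName()) ? combined : result;
        };
    return (Table)
        Proxy.newProxyInstance(
            Table.class.getClassLoader(), new Class<?>[] {Table.class}, handler);
  }
}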
----------------------------------------------------------------------
@@ -18,14 +18,14 @@
 package org.apache.amoro.flink.lookup;

-import static org.apache.amoro.flink.table.descriptors.ArcticValidator.LOOKUP_RELOADING_INTERVAL;
-import static org.apache.amoro.flink.util.ArcticUtils.loadArcticTable;
+import static org.apache.amoro.flink.table.descriptors.MixedFormatValidator.LOOKUP_RELOADING_INTERVAL;
+import static org.apache.amoro.flink.util.MixedFormatUtils.loadMixedTable;
 import static org.apache.flink.util.Preconditions.checkArgument;

 import org.apache.amoro.flink.read.MixedIncrementalLoader;
 import org.apache.amoro.flink.read.hybrid.enumerator.MergeOnReadIncrementalPlanner;
 import org.apache.amoro.flink.read.hybrid.reader.DataIteratorReaderFunction;
-import org.apache.amoro.flink.table.ArcticTableLoader;
+import org.apache.amoro.flink.table.MixedFormatTableLoader;
 import org.apache.amoro.hive.io.reader.AbstractAdaptHiveKeyedDataReader;
 import org.apache.amoro.table.MixedTable;
 import org.apache.flink.configuration.Configuration;
@@ -56,7 +56,7 @@
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;

-/** This is a basic lookup function for an arctic table. */
+/** This is a basic lookup function for an mixed-format table. */
 public class BasicLookupFunction<T> implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(BasicLookupFunction.class);
   private static final long serialVersionUID = 1671720424494168710L;
@@ -65,15 +65,15 @@ public class BasicLookupFunction<T> implements Serializable {
   private final List<String> joinKeys;
   private final Schema projectSchema;
   private final List<Expression> filters;
-  private final ArcticTableLoader loader;
+  private final MixedFormatTableLoader loader;
   private long nextLoadTime = Long.MIN_VALUE;
   private final long reloadIntervalSeconds;
   private MixedIncrementalLoader<T> incrementalLoader;
   private final Configuration config;
   private transient AtomicLong lookupLoadingTimeMs;
   private final Predicate<T> predicate;
   private final TableFactory<T> kvTableFactory;
-  private final AbstractAdaptHiveKeyedDataReader<T> flinkArcticMORDataReader;
+  private final AbstractAdaptHiveKeyedDataReader<T> flinkMORDataReader;
   private final DataIteratorReaderFunction<T> readerFunction;

   private transient ScheduledExecutorService executor;
@@ -85,15 +85,15 @@ public BasicLookupFunction(
       List<String> joinKeys,
       Schema projectSchema,
       List<Expression> filters,
-      ArcticTableLoader tableLoader,
+      MixedFormatTableLoader tableLoader,
       Configuration config,
       Predicate<T> predicate,
-      AbstractAdaptHiveKeyedDataReader<T> flinkArcticMORDataReader,
+      AbstractAdaptHiveKeyedDataReader<T> adaptHiveKeyedDataReader,
       DataIteratorReaderFunction<T> readerFunction) {
     checkArgument(
         mixedTable.isKeyedTable(),
         String.format(
-            "Only keyed arctic table support lookup join, this table [%s] is an unkeyed table.",
+            "Only keyed mixed-format table support lookup join, this table [%s] is an unkeyed table.",
             mixedTable.name()));
     Preconditions.checkNotNull(tableFactory, "kvTableFactory cannot be null");
     this.kvTableFactory = tableFactory;
@@ -104,7 +104,7 @@
     this.config = config;
     this.reloadIntervalSeconds = config.get(LOOKUP_RELOADING_INTERVAL).getSeconds();
     this.predicate = predicate;
-    this.flinkArcticMORDataReader = flinkArcticMORDataReader;
+    this.flinkMORDataReader = adaptHiveKeyedDataReader;
     this.readerFunction = readerFunction;
   }

@@ -119,15 +119,15 @@ public void open(FunctionContext context) throws IOException {
   }

   /**
-   * Initialize the arcticTable, kvTable and incrementalLoader.
+   * Initialize the mixed-format table, kvTable and incrementalLoader.
    *
    * @param context
    */
   public void init(FunctionContext context) {
     LOG.info("lookup function row data predicate: {}.", predicate);
     MetricGroup metricGroup = context.getMetricGroup().addGroup(LookupMetrics.GROUP_NAME_LOOKUP);
     if (mixedTable == null) {
-      mixedTable = loadArcticTable(loader).asKeyedTable();
+      mixedTable = loadMixedTable(loader).asKeyedTable();
     }
     mixedTable.refresh();

@@ -147,10 +147,7 @@

     this.incrementalLoader =
         new MixedIncrementalLoader<>(
-            new MergeOnReadIncrementalPlanner(loader),
-            flinkArcticMORDataReader,
-            readerFunction,
-            filters);
+            new MergeOnReadIncrementalPlanner(loader), flinkMORDataReader, readerFunction, filters);
   }

   public void start() {
@@ -160,7 +157,7 @@ public void start() {

     this.executor =
         Executors.newScheduledThreadPool(
-            1, new ExecutorThreadFactory("Arctic-lookup-scheduled-loader"));
+            1, new ExecutorThreadFactory("Mixed-format-lookup-scheduled-loader"));
     this.executor.scheduleWithFixedDelay(
         () -> {
           try {
@@ -242,7 +239,7 @@ public void close() throws Exception {
   private void checkErrorAndRethrow() {
     Throwable cause = failureThrowable.get();
     if (cause != null) {
-      throw new RuntimeException("An error occurred in ArcticLookupFunction.", cause);
+      throw new RuntimeException("An error occurred in MixedFormatLookupFunction.", cause);
     }
   }
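BasicLookupFunction's reload loop is the part worth seeing on its own: a single scheduled thread reloads the table on a fixed delay and parks any failure for the query thread, which is exactly why checkErrorAndRethrow() exists. A condensed, runnable model of that wiring, using a plain ThreadFactory in place of Flink's ExecutorThreadFactory:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ReloadLoopSketch {
  private final AtomicReference<Throwable> failureThrowable = new AtomicReference<>();
  private final ScheduledExecutorService executor =
      Executors.newScheduledThreadPool(
          1,
          runnable -> {
            Thread t = new Thread(runnable, "mixed-format-lookup-scheduled-loader");
            t.setDaemon(true); // do not block JVM shutdown
            return t;
          });

  void start(Runnable reload, long reloadIntervalSeconds) {
    executor.scheduleWithFixedDelay(
        () -> {
          try {
            reload.run(); // e.g. pull new data through MixedIncrementalLoader
          } catch (Throwable t) {
            failureThrowable.compareAndSet(null, t); // park it for the query thread
          }
        },
        0,
        reloadIntervalSeconds,
        TimeUnit.SECONDS);
  }

  // Mirrors checkErrorAndRethrow() in the hunk above.
  void checkErrorAndRethrow() {
    Throwable cause = failureThrowable.get();
    if (cause != null) {
      throw new RuntimeException("An error occurred in MixedFormatLookupFunction.", cause);
    }
  }
}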