Skip to content

Commit

Permalink
[rest] Supports global system tables (apache#4880)
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi authored Jan 10, 2025
1 parent 37e26f3 commit 6c54255
Show file tree
Hide file tree
Showing 9 changed files with 132 additions and 207 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
Expand All @@ -66,8 +64,6 @@
import static org.apache.paimon.catalog.CatalogUtils.validateAutoCreateClose;
import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
Expand Down Expand Up @@ -372,15 +368,7 @@ protected abstract void alterTableImpl(Identifier identifier, List<SchemaChange>
@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
if (isSystemDatabase(identifier.getDatabaseName())) {
String tableName = identifier.getTableName();
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
return new AllTableOptionsTable(fileIO, allTablePaths());
case CATALOG_OPTIONS:
return new CatalogOptionsTable(catalogOptions);
default:
throw new TableNotExistException(identifier);
}
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
Table originTable =
getDataOrFormatTable(
Expand Down Expand Up @@ -454,22 +442,6 @@ public Path newDatabasePath(String database) {
return newDatabasePath(warehouse(), database);
}

public Map<String, Map<String, Path>> allTablePaths() {
try {
Map<String, Map<String, Path>> allPaths = new HashMap<>();
for (String database : listDatabases()) {
Map<String, Path> tableMap =
allPaths.computeIfAbsent(database, d -> new HashMap<>());
for (String table : listTables(database)) {
tableMap.put(table, getTableLocation(Identifier.create(database, table)));
}
}
return allPaths;
} catch (DatabaseNotExistException e) {
throw new RuntimeException("Database is deleted while listing", e);
}
}

protected TableMeta getDataTableMeta(Identifier identifier) throws TableNotExistException {
return new TableMeta(getDataTableSchema(identifier), null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.system.AllTableOptionsTable;
import org.apache.paimon.table.system.CatalogOptionsTable;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Preconditions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand All @@ -39,6 +42,8 @@
import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
import static org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
import static org.apache.paimon.table.system.AllTableOptionsTable.ALL_TABLE_OPTIONS;
import static org.apache.paimon.table.system.CatalogOptionsTable.CATALOG_OPTIONS;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/** Utils for {@link Catalog}. */
Expand Down Expand Up @@ -121,6 +126,31 @@ public static void validateAutoCreateClose(Map<String, String> options) {
CoreOptions.AUTO_CREATE.key(), Boolean.FALSE));
}

public static Table createGlobalSystemTable(String tableName, Catalog catalog)
throws Catalog.TableNotExistException {
switch (tableName.toLowerCase()) {
case ALL_TABLE_OPTIONS:
try {
Map<Identifier, Map<String, String>> allOptions = new HashMap<>();
for (String database : catalog.listDatabases()) {
for (String name : catalog.listTables(database)) {
Identifier identifier = Identifier.create(database, name);
Table table = catalog.getTable(identifier);
allOptions.put(identifier, table.options());
}
}
return new AllTableOptionsTable(allOptions);
} catch (Catalog.DatabaseNotExistException | Catalog.TableNotExistException e) {
throw new RuntimeException("Database is deleted while listing", e);
}
case CATALOG_OPTIONS:
return new CatalogOptionsTable(Options.fromMap(catalog.options()));
default:
throw new Catalog.TableNotExistException(
Identifier.create(SYSTEM_DATABASE_NAME, tableName));
}
}

public static Table createSystemTable(Identifier identifier, Table originTable)
throws Catalog.TableNotExistException {
if (!(originTable instanceof FileStoreTable)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ public List<String> listTables(String databaseName) throws DatabaseNotExistExcep
@Override
public Table getTable(Identifier identifier) throws TableNotExistException {
if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
throw new UnsupportedOperationException("TODO support global system tables.");
return CatalogUtils.createGlobalSystemTable(identifier.getTableName(), this);
} else if (identifier.isSystemTable()) {
return getSystemTable(identifier);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@

package org.apache.paimon.table.system;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.ReadonlyTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.InnerTableRead;
Expand All @@ -45,7 +43,6 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -68,13 +65,10 @@ public class AllTableOptionsTable implements ReadonlyTable {

public static final String ALL_TABLE_OPTIONS = "all_table_options";

private final FileIO fileIO;
private final Map<String, Map<String, Path>> allTablePaths;
private final Map<Identifier, Map<String, String>> allOptions;

public AllTableOptionsTable(FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
// allTablePath is the map of <database, <table_name, properties>>
this.fileIO = fileIO;
this.allTablePaths = allTablePaths;
public AllTableOptionsTable(Map<Identifier, Map<String, String>> allOptions) {
this.allOptions = allOptions;
}

@Override
Expand Down Expand Up @@ -104,12 +98,12 @@ public InnerTableScan newScan() {

@Override
public InnerTableRead newRead() {
return new AllTableOptionsRead(fileIO);
return new AllTableOptionsRead();
}

@Override
public Table copy(Map<String, String> dynamicOptions) {
return new AllTableOptionsTable(fileIO, allTablePaths);
return new AllTableOptionsTable(allOptions);
}

private class AllTableOptionsScan extends ReadOnceTableScan {
Expand All @@ -121,18 +115,18 @@ public InnerTableScan withFilter(Predicate predicate) {

@Override
public Plan innerPlan() {
return () -> Collections.singletonList(new AllTableSplit(allTablePaths));
return () -> Collections.singletonList(new AllTableSplit(allOptions));
}
}

private static class AllTableSplit extends SingletonSplit {

private static final long serialVersionUID = 1L;

private final Map<String, Map<String, Path>> allTablePaths;
private final Map<Identifier, Map<String, String>> allOptions;

private AllTableSplit(Map<String, Map<String, Path>> allTablePaths) {
this.allTablePaths = allTablePaths;
private AllTableSplit(Map<Identifier, Map<String, String>> allOptions) {
this.allOptions = allOptions;
}

@Override
Expand All @@ -144,24 +138,19 @@ public boolean equals(Object o) {
return false;
}
AllTableSplit that = (AllTableSplit) o;
return Objects.equals(allTablePaths, that.allTablePaths);
return Objects.equals(allOptions, that.allOptions);
}

@Override
public int hashCode() {
return Objects.hash(allTablePaths);
return Objects.hash(allOptions);
}
}

private static class AllTableOptionsRead implements InnerTableRead {

private final FileIO fileIO;
private RowType readType;

public AllTableOptionsRead(FileIO fileIO) {
this.fileIO = fileIO;
}

@Override
public InnerTableRead withFilter(Predicate predicate) {
return this;
Expand All @@ -183,29 +172,12 @@ public RecordReader<InternalRow> createReader(Split split) {
if (!(split instanceof AllTableSplit)) {
throw new IllegalArgumentException("Unsupported split: " + split.getClass());
}
Map<String, Map<String, Path>> location = ((AllTableSplit) split).allTablePaths;
Iterator<InternalRow> rows = toRow(options(fileIO, location));
if (readType != null) {
rows =
Iterators.transform(
rows,
row ->
ProjectedRow.from(
readType, AggregationFieldsTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(rows);
}
}

protected static Iterator<InternalRow> toRow(
Map<String, Map<String, Map<String, String>>> option) {
List<InternalRow> rows = new ArrayList<>();
for (Map.Entry<String, Map<String, Map<String, String>>> entry0 : option.entrySet()) {
String database = entry0.getKey();
for (Map.Entry<String, Map<String, String>> entry1 : entry0.getValue().entrySet()) {
String tableName = entry1.getKey();
for (Map.Entry<String, String> entry2 : entry1.getValue().entrySet()) {
List<InternalRow> rows = new ArrayList<>();
for (Map.Entry<Identifier, Map<String, String>> entry :
((AllTableSplit) split).allOptions.entrySet()) {
String database = entry.getKey().getDatabaseName();
String tableName = entry.getKey().getTableName();
for (Map.Entry<String, String> entry2 : entry.getValue().entrySet()) {
String key = entry2.getKey();
String value = entry2.getValue();
rows.add(
Expand All @@ -216,25 +188,17 @@ protected static Iterator<InternalRow> toRow(
BinaryString.fromString(value)));
}
}
}
return rows.iterator();
}

protected static Map<String, Map<String, Map<String, String>>> options(
FileIO fileIO, Map<String, Map<String, Path>> allTablePaths) {
Map<String, Map<String, Map<String, String>>> allOptions = new HashMap<>();
for (Map.Entry<String, Map<String, Path>> entry0 : allTablePaths.entrySet()) {
Map<String, Map<String, String>> m0 =
allOptions.computeIfAbsent(entry0.getKey(), k -> new HashMap<>());
for (Map.Entry<String, Path> entry1 : entry0.getValue().entrySet()) {
Map<String, String> options =
new SchemaManager(fileIO, entry1.getValue())
.latest()
.orElseThrow(() -> new RuntimeException("Table not exists."))
.options();
m0.put(entry1.getKey(), options);
Iterator<InternalRow> iterator = rows.iterator();
if (readType != null) {
iterator =
Iterators.transform(
iterator,
row ->
ProjectedRow.from(
readType, AggregationFieldsTable.TABLE_TYPE)
.replaceRow(row));
}
return new IteratorRecordReader<>(iterator);
}
return allOptions;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -426,10 +426,6 @@ public void testGetTable() throws Exception {
() -> catalog.getTable(Identifier.create("non_existing_db", "test_table")))
.withMessage("Table non_existing_db.test_table does not exist.");

// Get all table options from system database
if (!supportGetFromSystemDatabase()) {
return;
}
Table allTableOptionsTable =
catalog.getTable(Identifier.create(SYSTEM_DATABASE_NAME, ALL_TABLE_OPTIONS));
assertThat(allTableOptionsTable).isNotNull();
Expand Down Expand Up @@ -1029,10 +1025,6 @@ public void testTableUUID() throws Exception {
.isGreaterThan(0);
}

protected boolean supportGetFromSystemDatabase() {
return true;
}

protected boolean supportsAlterDatabase() {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ public void tearDown() throws Exception {
restCatalogServer.shutdown();
}

@Override
protected boolean supportGetFromSystemDatabase() {
return false;
}

@Test
void testInitFailWhenDefineWarehouse() {
Options options = new Options();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public void before() throws Exception {
}

@Test
public void testSchemasTable() throws Exception {
public void testAllTableOptionsTable() throws Exception {
List<String> result =
read(allTableOptionsTable).stream()
.map(Objects::toString)
.collect(Collectors.toList());
result = result.stream().filter(r -> !r.contains("path")).collect(Collectors.toList());
assertThat(result)
.containsExactlyInAnyOrder(
"+I(default,T,fields.sales.aggregate-function,sum)",
Expand Down
Loading

0 comments on commit 6c54255

Please sign in to comment.