diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml index 67fc37b37f29..91278d04c235 100644 --- a/.github/workflows/bot.yml +++ b/.github/workflows/bot.yml @@ -137,4 +137,3 @@ jobs: run: | HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout) ./packaging/bundle-validation/ci_run.sh $HUDI_VERSION - diff --git a/.gitignore b/.gitignore index ff099d52067e..6c77bdab59de 100644 --- a/.gitignore +++ b/.gitignore @@ -3,7 +3,7 @@ target/ metastore_db/ .mvn/ - + # OS Files # .DS_Store diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java index 629b8115fcd6..ddc5d4144839 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/HoodieTimelineArchiver.java @@ -481,7 +481,7 @@ private Stream getCommitInstantsToArchive() throws IOException { private Stream getInstantsToArchive() throws IOException { Stream instants = Stream.concat(getCleanInstantsToArchive(), getCommitInstantsToArchive()); - if (config.isMetastoreEnabled()) { + if (config.isMetaserverEnabled()) { return Stream.empty(); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 13921fc70f40..b70b13c0833f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -28,7 +28,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.HoodieMetastoreConfig; +import org.apache.hudi.common.config.HoodieMetaserverConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.engine.EngineType; @@ -558,7 +558,7 @@ public class HoodieWriteConfig extends HoodieConfig { private FileSystemViewStorageConfig viewStorageConfig; private HoodiePayloadConfig hoodiePayloadConfig; private HoodieMetadataConfig metadataConfig; - private HoodieMetastoreConfig metastoreConfig; + private HoodieMetaserverConfig metaserverConfig; private HoodieCommonConfig commonConfig; private HoodieStorageConfig storageConfig; private EngineType engineType; @@ -951,7 +951,7 @@ protected HoodieWriteConfig(EngineType engineType, Properties props) { this.viewStorageConfig = clientSpecifiedViewStorageConfig; this.hoodiePayloadConfig = HoodiePayloadConfig.newBuilder().fromProperties(newProps).build(); this.metadataConfig = HoodieMetadataConfig.newBuilder().fromProperties(props).build(); - this.metastoreConfig = HoodieMetastoreConfig.newBuilder().fromProperties(props).build(); + this.metaserverConfig = HoodieMetaserverConfig.newBuilder().fromProperties(props).build(); this.commonConfig = HoodieCommonConfig.newBuilder().fromProperties(props).build(); this.storageConfig = HoodieStorageConfig.newBuilder().fromProperties(props).build(); } @@ -2274,8 +2274,8 @@ public HoodieStorageLayout.LayoutType getLayoutType() { /** * Metastore configs. */ - public boolean isMetastoreEnabled() { - return metastoreConfig.enableMetastore(); + public boolean isMetaserverEnabled() { + return metaserverConfig.isMetaserverEnabled(); } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetastoreConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetaserverConfig.java similarity index 56% rename from hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetastoreConfig.java rename to hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetaserverConfig.java index d17c7e3486c0..08e48da98206 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetastoreConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetaserverConfig.java @@ -29,55 +29,55 @@ @ConfigClassProperty(name = "Metastore Configs", groupName = ConfigGroups.Names.WRITE_CLIENT, description = "Configurations used by the Hudi Metastore.") -public class HoodieMetastoreConfig extends HoodieConfig { +public class HoodieMetaserverConfig extends HoodieConfig { - public static final String METASTORE_PREFIX = "hoodie.metastore"; + public static final String METASERVER_PREFIX = "hoodie.metaserver"; - public static final ConfigProperty METASTORE_ENABLE = ConfigProperty - .key(METASTORE_PREFIX + ".enable") + public static final ConfigProperty METASERVER_ENABLE = ConfigProperty + .key(METASERVER_PREFIX + ".enabled") .defaultValue(false) - .withDocumentation("Use metastore server to store hoodie table metadata"); + .withDocumentation("Enable Hudi metaserver for storing Hudi tables' metadata."); - public static final ConfigProperty METASTORE_URLS = ConfigProperty - .key(METASTORE_PREFIX + ".uris") + public static final ConfigProperty METASERVER_URLS = ConfigProperty + .key(METASERVER_PREFIX + ".uris") .defaultValue("thrift://localhost:9090") .withDocumentation("Metastore server uris"); - public static final ConfigProperty METASTORE_CONNECTION_RETRIES = ConfigProperty - .key(METASTORE_PREFIX + ".connect.retries") + public static final ConfigProperty METASERVER_CONNECTION_RETRIES = ConfigProperty + .key(METASERVER_PREFIX + ".connect.retries") .defaultValue(3) .withDocumentation("Number of retries while opening a connection to metastore"); - public static final ConfigProperty METASTORE_CONNECTION_RETRY_DELAY = ConfigProperty - .key(METASTORE_PREFIX + ".connect.retry.delay") + public static final ConfigProperty METASERVER_CONNECTION_RETRY_DELAY = ConfigProperty + .key(METASERVER_PREFIX + ".connect.retry.delay") .defaultValue(1) .withDocumentation("Number of seconds for the client to wait between consecutive connection attempts"); - public static HoodieMetastoreConfig.Builder newBuilder() { - return new HoodieMetastoreConfig.Builder(); + public static HoodieMetaserverConfig.Builder newBuilder() { + return new HoodieMetaserverConfig.Builder(); } - public boolean enableMetastore() { - return getBoolean(METASTORE_ENABLE); + public boolean isMetaserverEnabled() { + return getBoolean(METASERVER_ENABLE); } - public String getMetastoreUris() { - return getStringOrDefault(METASTORE_URLS); + public String getMetaserverUris() { + return getStringOrDefault(METASERVER_URLS); } public int getConnectionRetryLimit() { - return getIntOrDefault(METASTORE_CONNECTION_RETRIES); + return getIntOrDefault(METASERVER_CONNECTION_RETRIES); } public int getConnectionRetryDelay() { - return getIntOrDefault(METASTORE_CONNECTION_RETRY_DELAY); + return getIntOrDefault(METASERVER_CONNECTION_RETRY_DELAY); } /** - * Builder for {@link HoodieMetastoreConfig}. + * Builder for {@link HoodieMetaserverConfig}. */ public static class Builder { - private final HoodieMetastoreConfig config = new HoodieMetastoreConfig(); + private final HoodieMetaserverConfig config = new HoodieMetaserverConfig(); public Builder fromProperties(Properties props) { this.config.getProps().putAll(props); @@ -85,12 +85,12 @@ public Builder fromProperties(Properties props) { } public Builder setUris(String uris) { - config.setValue(METASTORE_URLS, uris); + config.setValue(METASERVER_URLS, uris); return this; } - public HoodieMetastoreConfig build() { - config.setDefaults(HoodieMetastoreConfig.class.getName()); + public HoodieMetaserverConfig build() { + config.setDefaults(HoodieMetaserverConfig.class.getName()); return config; } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 70e9473db32f..1ff0a589af9e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -19,7 +19,7 @@ package org.apache.hudi.common.table; import org.apache.hudi.common.config.HoodieConfig; -import org.apache.hudi.common.config.HoodieMetastoreConfig; +import org.apache.hudi.common.config.HoodieMetaserverConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; @@ -116,7 +116,7 @@ public class HoodieTableMetaClient implements Serializable { protected HoodieActiveTimeline activeTimeline; private ConsistencyGuardConfig consistencyGuardConfig = ConsistencyGuardConfig.newBuilder().build(); private FileSystemRetryConfig fileSystemRetryConfig = FileSystemRetryConfig.newBuilder().build(); - protected HoodieMetastoreConfig metastoreConfig; + protected HoodieMetaserverConfig metaserverConfig; /** * @@ -378,11 +378,11 @@ public synchronized HoodieArchivedTimeline getArchivedTimeline() { return getArchivedTimeline(StringUtils.EMPTY_STRING); } - public HoodieMetastoreConfig getMetastoreConfig() { - if (metastoreConfig == null) { - metastoreConfig = new HoodieMetastoreConfig(); + public HoodieMetaserverConfig getMetaserverConfig() { + if (metaserverConfig == null) { + metaserverConfig = new HoodieMetaserverConfig(); } - return metastoreConfig; + return metaserverConfig; } /** @@ -681,14 +681,14 @@ public void initializeBootstrapDirsIfNotExists() throws IOException { private static HoodieTableMetaClient newMetaClient(Configuration conf, String basePath, boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig consistencyGuardConfig, Option layoutVersion, String payloadClassName, String mergerStrategy, FileSystemRetryConfig fileSystemRetryConfig, Properties props) { - HoodieMetastoreConfig metastoreConfig = null == props - ? new HoodieMetastoreConfig.Builder().build() - : new HoodieMetastoreConfig.Builder().fromProperties(props).build(); - return metastoreConfig.enableMetastore() - ? (HoodieTableMetaClient) ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetastoreClient", - new Class[]{Configuration.class, ConsistencyGuardConfig.class, FileSystemRetryConfig.class, String.class, String.class, HoodieMetastoreConfig.class}, - conf, consistencyGuardConfig, fileSystemRetryConfig, - props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), props.getProperty(HoodieTableConfig.NAME.key()), metastoreConfig) + HoodieMetaserverConfig metaserverConfig = null == props + ? new HoodieMetaserverConfig.Builder().build() + : new HoodieMetaserverConfig.Builder().fromProperties(props).build(); + return metaserverConfig.isMetaserverEnabled() + ? (HoodieTableMetaClient) ReflectionUtils.loadClass("org.apache.hudi.common.table.HoodieTableMetaserverClient", + new Class[] {Configuration.class, ConsistencyGuardConfig.class, String.class, FileSystemRetryConfig.class, String.class, String.class, HoodieMetaserverConfig.class}, + conf, consistencyGuardConfig, mergerStrategy, fileSystemRetryConfig, + props.getProperty(HoodieTableConfig.DATABASE_NAME.key()), props.getProperty(HoodieTableConfig.NAME.key()), metaserverConfig) : new HoodieTableMetaClient(conf, basePath, loadActiveTimelineOnLoad, consistencyGuardConfig, layoutVersion, payloadClassName, mergerStrategy, fileSystemRetryConfig); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java index 48023d50463d..99fdfcea0d7a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/FileSystemViewManager.java @@ -20,7 +20,7 @@ import org.apache.hudi.common.config.HoodieCommonConfig; import org.apache.hudi.common.config.HoodieMetadataConfig; -import org.apache.hudi.common.config.HoodieMetastoreConfig; +import org.apache.hudi.common.config.HoodieMetaserverConfig; import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.function.SerializableSupplier; @@ -61,7 +61,7 @@ public class FileSystemViewManager { private static final Logger LOG = LogManager.getLogger(FileSystemViewManager.class); - private static final String HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS = "org.apache.hudi.common.table.view.HoodieMetastoreFileSystemView"; + private static final String HOODIE_METASERVER_FILE_SYSTEM_VIEW_CLASS = "org.apache.hudi.common.table.view.HoodieMetaserverFileSystemView"; private final SerializableConfiguration conf; // The View Storage config used to store file-system views @@ -169,10 +169,10 @@ private static HoodieTableFileSystemView createInMemoryFileSystemView(HoodieMeta return new HoodieMetadataFileSystemView(metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), metadataSupplier.get()); } - if (metaClient.getMetastoreConfig().enableMetastore()) { - return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS, - new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetastoreConfig.class}, - metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig()); + if (metaClient.getMetaserverConfig().isMetaserverEnabled()) { + return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASERVER_FILE_SYSTEM_VIEW_CLASS, + new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetaserverConfig.class}, + metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetaserverConfig()); } return new HoodieTableFileSystemView(metaClient, timeline, viewConf.isIncrementalTimelineSyncEnabled()); } @@ -193,10 +193,10 @@ public static HoodieTableFileSystemView createInMemoryFileSystemViewWithTimeline if (metadataConfig.enabled()) { return new HoodieMetadataFileSystemView(engineContext, metaClient, timeline, metadataConfig); } - if (metaClient.getMetastoreConfig().enableMetastore()) { - return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASTORE_FILE_SYSTEM_VIEW_CLASS, + if (metaClient.getMetaserverConfig().isMetaserverEnabled()) { + return (HoodieTableFileSystemView) ReflectionUtils.loadClass(HOODIE_METASERVER_FILE_SYSTEM_VIEW_CLASS, new Class[] {HoodieTableMetaClient.class, HoodieTimeline.class, HoodieMetadataConfig.class}, - metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetastoreConfig()); + metaClient, metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants(), metaClient.getMetaserverConfig()); } return new HoodieTableFileSystemView(metaClient, timeline); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java index b3e178320b82..f831af182018 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/ReflectionUtils.java @@ -26,6 +26,7 @@ import java.io.File; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.net.URISyntaxException; import java.net.URL; import java.util.ArrayList; @@ -167,4 +168,25 @@ private static List findClasses(File directory, String packageName) { public static boolean isSameClass(Comparable v, Comparable o) { return v.getClass() == o.getClass(); } + + /** + * Invoke a static method of a class. + * @param clazz + * @param methodName + * @param args + * @param parametersType + * @return the return value of the method + */ + public static Object invokeStaticMethod(String clazz, String methodName, Object[] args, Class... parametersType) { + try { + Method method = Class.forName(clazz).getMethod(methodName, parametersType); + return method.invoke(null, args); + } catch (ClassNotFoundException e) { + throw new HoodieException("Unable to find the class " + clazz, e); + } catch (NoSuchMethodException e) { + throw new HoodieException(String.format("Unable to find the method %s of the class %s ", methodName, clazz), e); + } catch (InvocationTargetException | IllegalAccessException e) { + throw new HoodieException(String.format("Unable to invoke the methond %s of the class %s ", methodName, clazz), e); + } + } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java index 5cd89c9f5f8f..ccd42fcd94ff 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/RetryHelper.java @@ -74,7 +74,7 @@ public RetryHelper tryWith(CheckedFunction func) { return this; } - public T start(CheckedFunction func) throws R { + public T start(CheckedFunction func) throws R { int retries = 0; T functionResult = null; diff --git a/hudi-platform-service/hudi-metaserver/README.md b/hudi-platform-service/hudi-metaserver/README.md new file mode 100644 index 000000000000..a7802e558912 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/README.md @@ -0,0 +1,91 @@ + + +## How to compile + +There are two ways, if there is a thrift binary in the local, please see the first one. +If docker is more convenient for you, please see the second way. + +### Compile with local Thrift binary + +Firstly, make sure `/usr/local/bin/thrift` exists, and its version is the same with the one declared in the pom.xml. + +Then compile the module with maven options `-Pthrift-gen-source`. For example, +`mvn install -pl :hudi-metaserver -DskipTests=true -Phudi-platform-service` + +### Compile with Docker + +Firstly, make sure there is a docker in the local. + +Then just compile the module as a normal one. For example, +`mvn install -pl :hudi-metaserver -DskipTests=true -Phudi-platform-service` + +Attention: Apple m1 cannot install thrift by docker successfully. The script will use homebrew to do it. So make sure homebrew exits. + + +### Source code generated by Thrift + +After packaging, the generated source code are placed in `target/generated-sources/thrift/gen-java`. +It looks like, + +```shell +├── gen-java +│ └── org +│ └── apache +│ └── hudi +│ └── metaserver +│ └── thrift +│ ├── AlreadyExistException.java +│ ├── FieldSchema.java +│ ├── ... +``` + +## How to run a job with Hudi Metaserver + +### Start Hudi Metaserver + +1. modify the `hikariPool.properties` and config the mysql address. For example, +```text +jdbcUrl=jdbc:mysql://localhost:3306 +dataSource.user=root +dataSource.password=password +``` +2. start the server + + make sure `hudi-metaserver-${project.version}.jar` is under the directory, +```shell +sh start_hudi_metaserver.sh +``` + +### Write client configurations + +```shell +hoodie.database.name=default +hoodie.table.name=test +hoodie.base.path=${path} +hoodie.metaserver.enabled=true +hoodie.metadata.enabled=false +hoodie.metaserver.uris=thrift://${serverIP}:9090 +``` + +## How to test + +### Run a unit test with Hudi Metaserver + +Add the configurations mentioned in the `Write client configurations` part, and then set `hoodie.metaserver.uris=""`. + +The metaserver runs as an embedded one with h2 database that performs like mysql running locally. diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/pom.xml b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/pom.xml new file mode 100644 index 000000000000..c4e273614867 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/pom.xml @@ -0,0 +1,95 @@ + + + + + hudi-metaserver + org.apache.hudi + 0.13.0-SNAPSHOT + + 4.0.0 + + hudi-metaserver-client + + + ${project.parent.basedir} + + + + + org.apache.hudi + hudi-common + ${project.version} + + + org.apache.hudi + hudi-client-common + ${project.version} + + + + + org.apache.hudi + hudi-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-client-common + ${project.version} + tests + test-jar + test + + + org.apache.hudi + hudi-metaserver-server + ${project.version} + test + + + + + + + org.jacoco + jacoco-maven-plugin + + + org.codehaus.mojo + exec-maven-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.rat + apache-rat-plugin + + + + \ No newline at end of file diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/HoodieTableMetaserverClient.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/HoodieTableMetaserverClient.java new file mode 100644 index 000000000000..3facdaa45fb5 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/HoodieTableMetaserverClient.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table; + +import org.apache.hudi.common.config.HoodieMetaserverConfig; +import org.apache.hudi.common.fs.ConsistencyGuardConfig; +import org.apache.hudi.common.fs.FileSystemRetryConfig; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieMetaserverBasedTimeline; +import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metaserver.client.HoodieMetaserverClient; +import org.apache.hudi.metaserver.client.HoodieMetaserverClientProxy; +import org.apache.hudi.metaserver.thrift.NoSuchObjectException; +import org.apache.hudi.metaserver.thrift.Table; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.List; +import java.util.Set; + +import static org.apache.hudi.common.util.StringUtils.nonEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + +/** + * HoodieTableMetaClient implementation for hoodie table whose metadata is stored in the hoodie metaserver. + */ +public class HoodieTableMetaserverClient extends HoodieTableMetaClient { + private static final Logger LOG = LogManager.getLogger(HoodieTableMetaserverClient.class); + + private final String databaseName; + private final String tableName; + private final Table table; + private final HoodieMetaserverClient metaserverClient; + + public HoodieTableMetaserverClient(Configuration conf, ConsistencyGuardConfig consistencyGuardConfig, + String mergerStrategy, FileSystemRetryConfig fileSystemRetryConfig, + String databaseName, String tableName, HoodieMetaserverConfig config) { + super(conf, config.getString(HoodieWriteConfig.BASE_PATH), false, consistencyGuardConfig, Option.of(TimelineLayoutVersion.CURR_LAYOUT_VERSION), + config.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME), mergerStrategy, fileSystemRetryConfig); + checkArgument(nonEmpty(databaseName), "database name is required."); + checkArgument(nonEmpty(tableName), "table name is required."); + this.databaseName = databaseName; + this.tableName = tableName; + this.metaserverConfig = config; + this.metaserverClient = HoodieMetaserverClientProxy.getProxy(config); + this.table = initOrGetTable(databaseName, tableName, config); + // TODO: transfer table parameters to table config + this.tableConfig = new HoodieTableConfig(); + tableConfig.setTableVersion(HoodieTableVersion.current()); + tableConfig.setAll(config.getProps()); + } + + private Table initOrGetTable(String db, String tb, HoodieMetaserverConfig config) { + Table table; + try { + table = metaserverClient.getTable(databaseName, tableName); + } catch (HoodieException e) { + if (e.getCause() instanceof NoSuchObjectException) { + String user = ""; + try { + user = UserGroupInformation.getCurrentUser().getShortUserName(); + } catch (IOException ioException) { + LOG.info("Failed to get the user", ioException); + } + LOG.info(String.format("Table %s.%s doesn't exist, will create it.", db, tb)); + table = new Table(); + table.setDbName(db); + table.setTableName(tb); + table.setLocation(config.getString(HoodieWriteConfig.BASE_PATH)); + table.setOwner(user); + table.setTableType(config.getString(HoodieTableConfig.TYPE.key())); + metaserverClient.createTable(table); + table = metaserverClient.getTable(databaseName, tableName); + } else { + throw e; + } + } + return table; + } + + /** + * @return Hoodie Table Type + */ + public HoodieTableType getTableType() { + return HoodieTableType.valueOf(table.getTableType()); + } + + /** + * Get the active instants as a timeline. + * + * @return Active instants timeline + */ + public synchronized HoodieActiveTimeline getActiveTimeline() { + if (activeTimeline == null) { + activeTimeline = new HoodieMetaserverBasedTimeline(this, metaserverConfig); + } + return activeTimeline; + } + + /** + * Reload ActiveTimeline. + * + * @return Active instants timeline + */ + public synchronized HoodieActiveTimeline reloadActiveTimeline() { + activeTimeline = new HoodieMetaserverBasedTimeline(this, metaserverConfig); + return activeTimeline; + } + + public List scanHoodieInstantsFromFileSystem(Set includedExtensions, + boolean applyLayoutVersionFilters) { + throw new HoodieException("Unsupport operation"); + } + + public List scanHoodieInstantsFromFileSystem(Path timelinePath, Set includedExtensions, + boolean applyLayoutVersionFilters) { + throw new HoodieException("Unsupport operation"); + } + + public void setBasePath(String basePath) { + throw new HoodieException("Unsupport operation"); + } + + public void setMetaPath(String metaPath) { + throw new HoodieException("Unsupport operation"); + } + + public void setActiveTimeline(HoodieActiveTimeline activeTimeline) { + throw new HoodieException("Unsupport operation"); + } + + public HoodieMetaserverClient getMetaserverClient() { + return metaserverClient; + } + +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/timeline/HoodieMetaserverBasedTimeline.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/timeline/HoodieMetaserverBasedTimeline.java new file mode 100644 index 000000000000..6ea327297a44 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/timeline/HoodieMetaserverBasedTimeline.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.timeline; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.config.HoodieMetaserverConfig; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metaserver.client.HoodieMetaserverClient; +import org.apache.hudi.metaserver.client.HoodieMetaserverClientProxy; + +/** + * Active timeline for hoodie table whose metadata is stored in the hoodie meta server instead of file system. + */ +public class HoodieMetaserverBasedTimeline extends HoodieActiveTimeline { + private final String databaseName; + private final String tableName; + private final HoodieMetaserverClient metaserverClient; + + public HoodieMetaserverBasedTimeline(HoodieTableMetaClient metaClient, HoodieMetaserverConfig config) { + this.metaClient = metaClient; + this.metaserverClient = HoodieMetaserverClientProxy.getProxy(config); + this.databaseName = config.getString(HoodieTableConfig.DATABASE_NAME.key()); + this.tableName = config.getString(HoodieTableConfig.NAME.key()); + this.setInstants(metaserverClient.listInstants(databaseName, tableName, 24)); + } + + @Override + protected void deleteInstantFile(HoodieInstant instant) { + metaserverClient.deleteInstant(databaseName, tableName, instant); + } + + @Override + public void transitionState(HoodieInstant fromInstant, HoodieInstant toInstant, Option data, boolean allowRedundantTransitions) { + ValidationUtils.checkArgument(fromInstant.getTimestamp().equals(toInstant.getTimestamp())); + metaserverClient.transitionInstantState(databaseName, tableName, fromInstant, toInstant, data); + } + + @Override + public void createFileInMetaPath(String filename, Option content, boolean allowOverwrite) { + FileStatus status = new FileStatus(); + status.setPath(new Path(filename)); + HoodieInstant instant = new HoodieInstant(status); + ValidationUtils.checkArgument(instant.getState().equals(HoodieInstant.State.REQUESTED)); + metaserverClient.createNewInstant(databaseName, tableName, instant, Option.empty()); + } + + @Override + protected void revertCompleteToInflight(HoodieInstant completed, HoodieInstant inflight) { + throw new HoodieException("Unsupported now"); + } + + public Option readDataFromPath(Path detailPath) { + FileStatus status = new FileStatus(); + status.setPath(detailPath); + HoodieInstant instant = new HoodieInstant(status); + return metaserverClient.getInstantMetadata(databaseName, tableName, instant); + } + + @Override + public HoodieMetaserverBasedTimeline reload() { + return new HoodieMetaserverBasedTimeline(metaClient, metaClient.getMetaserverConfig()); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/view/HoodieMetaserverFileSystemView.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/view/HoodieMetaserverFileSystemView.java new file mode 100644 index 000000000000..2ec9dbb9a912 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/common/table/view/HoodieMetaserverFileSystemView.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.table.view; + +import org.apache.hudi.common.config.HoodieMetaserverConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.metaserver.client.HoodieMetaserverClient; +import org.apache.hudi.metaserver.client.HoodieMetaserverClientProxy; + +/** + * TableFileSystemView Implementations based on in-memory storage and + * is specifically for hoodie table whose metadata is stored in the hoodie metaserver. + */ +public class HoodieMetaserverFileSystemView extends HoodieTableFileSystemView { + private String databaseName; + private String tableName; + + private HoodieMetaserverClient metaserverClient; + + public HoodieMetaserverFileSystemView(HoodieTableMetaClient metaClient, + HoodieTimeline visibleActiveTimeline, HoodieMetaserverConfig config) { + super(metaClient, visibleActiveTimeline); + this.metaserverClient = HoodieMetaserverClientProxy.getProxy(config); + this.databaseName = metaClient.getTableConfig().getDatabaseName(); + this.tableName = metaClient.getTableConfig().getTableName(); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClient.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClient.java new file mode 100644 index 000000000000..0eddc78cf40e --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClient.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.client; + +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIClass; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.metaserver.thrift.Table; + +import java.util.List; + +/** + * Hoodie meta server client, is to get/put instants, instant meta, snapshot from/to hoodie meta server. + */ +@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) +public interface HoodieMetaserverClient { + + Table getTable(String db, String tb); + + void createTable(Table table); + + List listInstants(String db, String tb, int commitNum); + + Option getInstantMetadata(String db, String tb, HoodieInstant instant); + + String createNewTimestamp(String db, String tb); + + void createNewInstant(String db, String tb, HoodieInstant instant, Option content); + + void transitionInstantState(String db, String tb, HoodieInstant fromInstant, HoodieInstant toInstant, Option content); + + void deleteInstant(String db, String tb, HoodieInstant instant); + + boolean isLocal(); + + boolean isConnected(); +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientImp.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientImp.java new file mode 100644 index 000000000000..522303717201 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientImp.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.client; + +import org.apache.hudi.common.config.HoodieMetaserverConfig; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.RetryHelper; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metaserver.thrift.Table; +import org.apache.hudi.metaserver.thrift.ThriftHoodieMetaserver; +import org.apache.hudi.metaserver.util.EntityConversions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TBinaryProtocol; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransport; +import org.apache.thrift.transport.TTransportException; + +import java.io.Serializable; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * HoodieMetaserverClientImp based on thrift. + */ +public class HoodieMetaserverClientImp implements HoodieMetaserverClient, AutoCloseable, Serializable { + + private static final Logger LOG = LogManager.getLogger(HoodieMetaserverClientImp.class); + private final HoodieMetaserverConfig config; + private final int retryLimit; + private final long retryDelayMs; + private boolean isConnected; + private boolean isLocal; + private ThriftHoodieMetaserver.Iface client; + private TTransport transport; + + public HoodieMetaserverClientImp(HoodieMetaserverConfig config) { + this.config = config; + this.retryLimit = config.getConnectionRetryLimit(); + this.retryDelayMs = config.getConnectionRetryDelay() * 1000L; + String uri = config.getMetaserverUris(); + if (isLocalEmbeddedMetaserver(uri)) { + try { + this.client = (ThriftHoodieMetaserver.Iface) ReflectionUtils.invokeStaticMethod("org.apache.hudi.metaserver.HoodieMetaserver", + "getEmbeddedMetaserver", new Object[]{}, new Class[]{}); + } catch (HoodieException e) { + throw new HoodieException("Please check the server uri has ever been set. Empty uri is used for local unit test", e); + } + this.isConnected = true; + this.isLocal = true; + } else { + URI msUri = URI.create(uri); + this.transport = new TSocket(msUri.getHost(), msUri.getPort()); + this.client = new ThriftHoodieMetaserver.Client(new TBinaryProtocol(transport)); + try { + new RetryHelper(retryDelayMs, retryLimit, retryDelayMs, TTransportException.class.getName()) + .tryWith(() -> { + transport.open(); + this.isConnected = true; + LOG.info("Connected to meta server: " + msUri); + return null; + }).start(); + } catch (TTransportException e) { + throw new HoodieException("Fail to connect to the metaserver.", e); + } + } + } + + private boolean isLocalEmbeddedMetaserver(String uri) { + return uri == null || uri.trim().isEmpty(); + } + + @Override + public Table getTable(String db, String tb) { + return exceptionWrapper(() -> this.client.getTable(db, tb)).get(); + } + + @Override + public void createTable(Table table) { + try { + this.client.createTable(table); + } catch (TException e) { + throw new HoodieException(e); + } + } + + @Override + public List listInstants(String db, String tb, int commitNum) { + return exceptionWrapper(() -> this.client.listInstants(db, tb, commitNum).stream() + .map(EntityConversions::fromTHoodieInstant) + .collect(Collectors.toList())).get(); + } + + @Override + public Option getInstantMetadata(String db, String tb, HoodieInstant instant) { + ByteBuffer byteBuffer = exceptionWrapper(() -> this.client.getInstantMetadata(db, tb, EntityConversions.toTHoodieInstant(instant))).get(); + byte[] bytes = new byte[byteBuffer.remaining()]; + byteBuffer.get(bytes); + return bytes.length > 0 ? Option.of(bytes) : Option.empty(); + } + + @Override + public String createNewTimestamp(String db, String tb) { + return exceptionWrapper(() -> this.client.createNewInstantTime(db, tb)).get(); + } + + @Override + public void createNewInstant(String db, String tb, HoodieInstant instant, Option content) { + exceptionWrapper(() -> this.client.createNewInstantWithTime(db, tb, EntityConversions.toTHoodieInstant(instant), getByteBuffer(content))).get(); + } + + @Override + public void transitionInstantState(String db, String tb, HoodieInstant fromInstant, HoodieInstant toInstant, Option content) { + exceptionWrapper(() -> this.client.transitionInstantState(db, tb, + EntityConversions.toTHoodieInstant(fromInstant), + EntityConversions.toTHoodieInstant(toInstant), + getByteBuffer(content))).get(); + } + + @Override + public void deleteInstant(String db, String tb, HoodieInstant instant) { + exceptionWrapper(() -> this.client.deleteInstant(db, tb, EntityConversions.toTHoodieInstant(instant))).get(); + } + + private ByteBuffer getByteBuffer(Option content) { + ByteBuffer byteBuffer; + if (content.isPresent()) { + byteBuffer = ByteBuffer.wrap(content.get()); + } else { + byteBuffer = ByteBuffer.allocate(0); + } + return byteBuffer; + } + + // used for test + @Override + public boolean isLocal() { + return isLocal; + } + + @Override + public boolean isConnected() { + return isConnected; + } + + @Override + public void close() { + isConnected = false; + if (transport != null && transport.isOpen()) { + transport.close(); + } + } + + interface FunctionWithTException { + R get() throws E; + } + + private Supplier exceptionWrapper(FunctionWithTException f) { + return () -> { + try { + return f.get(); + } catch (TException e) { + throw new HoodieException(e); + } + }; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientProxy.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientProxy.java new file mode 100644 index 000000000000..e3e113ead9ce --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientProxy.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.client; + +import org.apache.hudi.common.config.HoodieMetaserverConfig; +import org.apache.hudi.common.util.RetryHelper; + +import java.io.Serializable; +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; +import java.lang.reflect.UndeclaredThrowableException; + +/** + * AOP for meta server client. + */ +public class HoodieMetaserverClientProxy implements InvocationHandler, Serializable { + + private final HoodieMetaserverClient client; + private final int retryLimit; + private final long retryDelayMs; + + private HoodieMetaserverClientProxy(HoodieMetaserverConfig config) { + this.retryLimit = config.getConnectionRetryLimit(); + this.retryDelayMs = config.getConnectionRetryDelay() * 1000L; + this.client = new HoodieMetaserverClientImp(config); + } + + public static HoodieMetaserverClient getProxy(HoodieMetaserverConfig config) { + HoodieMetaserverClientProxy handler = new HoodieMetaserverClientProxy(config); + return (HoodieMetaserverClient) Proxy.newProxyInstance(HoodieMetaserverClientProxy.class.getClassLoader(), + new Class[]{HoodieMetaserverClient.class}, handler); + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + try { + return new RetryHelper(retryDelayMs, retryLimit, retryDelayMs, Exception.class.getName()) + .tryWith(() -> method.invoke(client, args)).start(); + } catch (IllegalAccessException | InvocationTargetException | UndeclaredThrowableException e) { + throw e.getCause(); + } + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/util/EntityConversions.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/util/EntityConversions.java new file mode 100644 index 000000000000..07214dd14030 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/main/java/org/apache/hudi/metaserver/util/EntityConversions.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.util; + +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.metaserver.thrift.TAction; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.TState; + +import java.util.Locale; + +/** + * Conversion helpers to convert between hoodie entity and thrift entity. + */ +public class EntityConversions { + + public static THoodieInstant toTHoodieInstant(HoodieInstant instant) { + return new THoodieInstant(instant.getTimestamp(), toTAction(instant.getAction()), toTState(instant.getState())); + } + + public static HoodieInstant fromTHoodieInstant(THoodieInstant instant) { + return new HoodieInstant(fromTState(instant.getState()), fromTAction(instant.getAction()), instant.getTimestamp()); + } + + public static TAction toTAction(String action) { + switch (action) { + case HoodieTimeline.COMMIT_ACTION: + return TAction.COMMIT; + case HoodieTimeline.DELTA_COMMIT_ACTION: + return TAction.DELTACOMMIT; + case HoodieTimeline.CLEAN_ACTION: + return TAction.CLEAN; + case HoodieTimeline.ROLLBACK_ACTION: + return TAction.ROLLBACK; + case HoodieTimeline.SAVEPOINT_ACTION: + return TAction.SAVEPOINT; + case HoodieTimeline.REPLACE_COMMIT_ACTION: + return TAction.REPLACECOMMIT; + case HoodieTimeline.COMPACTION_ACTION: + return TAction.COMPACTION; + case HoodieTimeline.RESTORE_ACTION: + return TAction.RESTORE; + default: + throw new IllegalArgumentException("Unknown action: " + action); + } + } + + public static TState toTState(HoodieInstant.State state) { + switch (state) { + case COMPLETED: + return TState.COMPLETED; + case INFLIGHT: + return TState.INFLIGHT; + case REQUESTED: + return TState.REQUESTED; + case NIL: + return TState.NIL; + default: + throw new IllegalArgumentException("Unknown state: " + state.name()); + } + } + + public static String fromTAction(TAction action) { + switch (action) { + case COMMIT: + case DELTACOMMIT: + case CLEAN: + case ROLLBACK: + case SAVEPOINT: + case REPLACECOMMIT: + case COMPACTION: + case RESTORE: + return action.name().toLowerCase(Locale.ROOT); + default: + throw new IllegalArgumentException("Unknown action: " + action); + } + } + + public static HoodieInstant.State fromTState(TState state) { + switch (state) { + case COMPLETED: + return HoodieInstant.State.COMPLETED; + case INFLIGHT: + return HoodieInstant.State.INFLIGHT; + case REQUESTED: + return HoodieInstant.State.REQUESTED; + case NIL: + return HoodieInstant.State.NIL; + default: + throw new IllegalArgumentException("Unknown state: " + state.name()); + } + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/test/java/org/apache/hudi/metaserver/client/TestHoodieMetaserverClient.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/test/java/org/apache/hudi/metaserver/client/TestHoodieMetaserverClient.java new file mode 100644 index 000000000000..7db77abeba39 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-client/src/test/java/org/apache/hudi/metaserver/client/TestHoodieMetaserverClient.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.client; + +import org.apache.hudi.common.config.HoodieMetaserverConfig; +import org.apache.hudi.metaserver.HoodieMetaserver; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests on hoodie meta server client. + */ +public class TestHoodieMetaserverClient { + + @Test + public void testLocalClient() { + HoodieMetaserverConfig config = HoodieMetaserverConfig.newBuilder().setUris("").build(); + HoodieMetaserverClient client = new HoodieMetaserverClientImp(config); + assertTrue(client.isLocal()); + assertTrue(client.isConnected()); + } + + @Test + public void testRemoteClient() { + HoodieMetaserver.startServer(); + assertNotNull(HoodieMetaserver.getMetaserverStorage()); + HoodieMetaserverConfig config = HoodieMetaserverConfig.newBuilder().build(); + HoodieMetaserverClient client = new HoodieMetaserverClientImp(config); + assertFalse(client.isLocal()); + assertTrue(client.isConnected()); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/bin/start_hudi_metaserver.sh b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/bin/start_hudi_metaserver.sh new file mode 100755 index 000000000000..f45fd5b4ef12 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/bin/start_hudi_metaserver.sh @@ -0,0 +1,26 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Usage: ./scripts/checkout_pr.sh +# +# Checkout a PR given the PR number into a local branch. PR branches are named +# using the convention "pull/", to enable pr_push_command.sh to work +# in tandem. +# +java -jar hudi-metaserver-*.jar diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/pom.xml b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/pom.xml new file mode 100644 index 000000000000..16ea7c134101 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/pom.xml @@ -0,0 +1,83 @@ + + + + + hudi-metaserver + org.apache.hudi + 0.13.0-SNAPSHOT + + 4.0.0 + + hudi-metaserver-server + jar + + + ${project.parent.basedir} + 3.4.6 + 4.0.3 + 8.0.22 + + + + + + org.mybatis + mybatis + ${mybatis.version} + + + + com.zaxxer + HikariCP + ${HikariCP.version} + + + + mysql + mysql-connector-java + ${mysql-connector-java.version} + + + + + + + org.jacoco + jacoco-maven-plugin + + + org.codehaus.mojo + exec-maven-plugin + + + org.codehaus.mojo + build-helper-maven-plugin + + + org.apache.maven.plugins + maven-jar-plugin + + + org.apache.rat + apache-rat-plugin + + + + \ No newline at end of file diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/HoodieMetaserver.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/HoodieMetaserver.java new file mode 100644 index 000000000000..cc3ce1ace996 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/HoodieMetaserver.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver; + +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.metaserver.service.HoodieMetaserverService; +import org.apache.hudi.metaserver.service.HoodieMetaserverProxyHandler; +import org.apache.hudi.metaserver.service.TableService; +import org.apache.hudi.metaserver.service.TimelineService; +import org.apache.hudi.metaserver.store.RelationalDBBasedStorage; +import org.apache.hudi.metaserver.store.MetaserverStorage; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.ThriftHoodieMetaserver; +import org.apache.hudi.metaserver.util.TServerSocketWrapper; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.thrift.server.TServer; +import org.apache.thrift.server.TThreadPoolServer; +import org.apache.thrift.transport.TServerTransport; + +import java.lang.reflect.Proxy; + +/** + * Main class of hoodie meta server. + * + * @since 0.13.0 + * @Experimental + */ +public class HoodieMetaserver { + + private static final Logger LOG = LogManager.getLogger(HoodieMetaserver.class); + + private static TServer server; + private static Thread serverThread; + private static volatile MetaserverStorage metaserverStorage; + private static HoodieMetaserverService metaserverService; + + public static void main(String[] args) { + startServer(); + } + + public static void startServer() { + try { + if (server != null) { + return; + } + metaserverStorage = new RelationalDBBasedStorage(); + // service + TableService tableService = new TableService(metaserverStorage); + TimelineService timelineService = new TimelineService(metaserverStorage); + HoodieMetaserverService hoodieMetaserverService = new HoodieMetaserverService(tableService, timelineService); + HoodieMetaserverProxyHandler proxyHandler = new HoodieMetaserverProxyHandler(hoodieMetaserverService); + + // start a thrift server + ThriftHoodieMetaserver.Iface proxy = (ThriftHoodieMetaserver.Iface) Proxy + .newProxyInstance(HoodieMetaserverProxyHandler.class.getClassLoader(), + new Class[]{ThriftHoodieMetaserver.Iface.class}, proxyHandler); + ThriftHoodieMetaserver.Processor processor = new ThriftHoodieMetaserver.Processor(proxy); + TServerTransport serverTransport = new TServerSocketWrapper(9090); + server = new TThreadPoolServer(new TThreadPoolServer.Args(serverTransport).processor(processor)); + LOG.info("Starting the server"); + serverThread = new Thread(() -> server.serve()); + serverThread.start(); + } catch (Exception e) { + LOG.error("Failed to start Metaserver.", e); + System.exit(1); + } + } + + public static ThriftHoodieMetaserver.Iface getEmbeddedMetaserver() { + if (metaserverStorage == null) { + synchronized (HoodieMetaserver.class) { + if (metaserverStorage == null) { + // TODO: add metastore factory. + metaserverStorage = new RelationalDBBasedStorage(); + try { + metaserverStorage.initStorage(); + } catch (MetaserverStorageException e) { + throw new HoodieException("Fail to init the Metaserver's storage." + e); + } + TableService tableService = new TableService(metaserverStorage); + TimelineService timelineService = new TimelineService(metaserverStorage); + metaserverService = new HoodieMetaserverService(tableService, timelineService); + } + } + } + return metaserverService; + } + + // only for test + public static MetaserverStorage getMetaserverStorage() { + return metaserverStorage; + } + + public static void stopServer() { + if (server != null) { + LOG.info("Stop the server..."); + server.stop(); + serverThread.interrupt(); + server = null; + } + if (metaserverStorage != null) { + metaserverStorage.close(); + } + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/HoodieMetaserverProxyHandler.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/HoodieMetaserverProxyHandler.java new file mode 100644 index 000000000000..5fbf2c9eb16d --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/HoodieMetaserverProxyHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.service; + +import org.apache.hudi.metaserver.thrift.MetaserverException; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.thrift.TException; + +import java.lang.reflect.InvocationHandler; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Arrays; + +/** + * AOP for meta server. + */ +public class HoodieMetaserverProxyHandler implements InvocationHandler { + private static final Logger LOG = LogManager.getLogger(HoodieMetaserverProxyHandler.class); + + private final HoodieMetaserverService metaserverService; + + public HoodieMetaserverProxyHandler(HoodieMetaserverService metaserverService) { + this.metaserverService = metaserverService; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { + Throwable err; + try { + return method.invoke(metaserverService, args); + } catch (IllegalAccessException | InvocationTargetException e) { + err = e.getCause(); + } catch (Throwable e) { + err = e; + } + if (err != null) { + LOG.error("Error in calling metaserver method=" + method.getName() + " args=" + Arrays.toString(args) + " error", err); + if (err instanceof TException) { + throw err; + } else { + throw new MetaserverException(err.getMessage()); + } + } + return null; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/HoodieMetaserverService.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/HoodieMetaserverService.java new file mode 100644 index 000000000000..06712bc1839f --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/HoodieMetaserverService.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.service; + +import org.apache.hudi.metaserver.thrift.HoodieInstantChangeResult; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.Table; +import org.apache.hudi.metaserver.thrift.ThriftHoodieMetaserver; +import org.apache.thrift.TException; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * A proxy for meta server, accepts all thrift calls and routes them to the corresponding service. + */ +public class HoodieMetaserverService implements ThriftHoodieMetaserver.Iface, Serializable { + private final TableService tableService; + private final TimelineService timelineService; + + public HoodieMetaserverService(TableService tableService, TimelineService timelineService) { + this.tableService = tableService; + this.timelineService = timelineService; + } + + @Override + public void createDatabase(String db) throws TException { + tableService.createDatabase(db); + } + + @Override + public void createTable(Table table) throws TException { + tableService.createTable(table); + } + + @Override + public Table getTable(String db, String tb) throws TException { + return tableService.getTable(db, tb); + } + + @Override + public List listInstants(String db, String tb, int num) throws TException { + return timelineService.listInstants(db, tb, num); + } + + @Override + public ByteBuffer getInstantMetadata(String db, String tb, THoodieInstant instant) throws TException { + return timelineService.getInstantMetadata(db, tb, instant); + } + + @Override + public String createNewInstantTime(String db, String tb) throws TException { + return timelineService.createNewInstantTime(db, tb); + } + + @Override + public HoodieInstantChangeResult createNewInstantWithTime(String db, String tb, THoodieInstant instant, ByteBuffer content) throws TException { + return timelineService.createNewInstantWithTime(db, tb, instant, content); + } + + @Override + public HoodieInstantChangeResult transitionInstantState(String db, String tb, THoodieInstant fromInstant, THoodieInstant toInstant, ByteBuffer metadata) throws TException { + return timelineService.transitionInstantState(db, tb, fromInstant, toInstant, metadata); + } + + @Override + public HoodieInstantChangeResult deleteInstant(String db, String tb, THoodieInstant instant) throws TException { + return timelineService.deleteInstant(db, tb, instant); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/TableService.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/TableService.java new file mode 100644 index 000000000000..78961c6b883e --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/TableService.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.service; + +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.metaserver.store.MetaserverStorage; +import org.apache.hudi.metaserver.thrift.AlreadyExistException; +import org.apache.hudi.metaserver.thrift.MetaserverException; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.NoSuchObjectException; +import org.apache.hudi.metaserver.thrift.Table; + +import java.io.Serializable; + +/** + * Handle all database / table related requests. + */ +public class TableService implements Serializable { + private MetaserverStorage store; + + public TableService(MetaserverStorage metaserverStorage) { + this.store = metaserverStorage; + } + + public void createDatabase(String db) throws AlreadyExistException, MetaserverStorageException, MetaserverException { + // todo: define the database entry in the thrift + if (databaseExists(db)) { + throw new AlreadyExistException("Database " + db + " already exists"); + } + if (!store.createDatabase(db)) { + throw new MetaserverException("Fail to create the database: " + db); + } + } + + public Table getTable(String db, String tb) throws MetaserverStorageException, NoSuchObjectException { + Table table = store.getTable(db, tb); + if (table == null) { + throw new NoSuchObjectException(db + "." + tb + " does not exist"); + } + // todo: add params + table.setTableType(HoodieTableType.COPY_ON_WRITE.toString()); + return table; + } + + public void createTable(Table table) throws MetaserverStorageException, NoSuchObjectException, AlreadyExistException, MetaserverException { + Long dbId = store.getDatabaseId(table.getDbName()); + if (dbId == null) { + createDatabase(table.getDbName()); + dbId = store.getDatabaseId(table.getDbName()); + } + if (tableExists(table.getDbName(), table.getTableName())) { + throw new AlreadyExistException(table.getDbName() + "." + table.getTableName() + " already exists"); + } + if (!store.createTable(dbId, table)) { + throw new MetaserverException("Fail to create the table: " + table); + } + // todo: add params + } + + private boolean databaseExists(String db) throws MetaserverStorageException { + return store.getDatabaseId(db) != null; + } + + private boolean tableExists(String db, String tb) throws MetaserverStorageException { + return store.getTableId(db, tb) != null; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/TimelineService.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/TimelineService.java new file mode 100644 index 000000000000..4e13db91b682 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/service/TimelineService.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.service; + +import org.apache.hudi.metaserver.store.MetaserverStorage; +import org.apache.hudi.metaserver.thrift.HoodieInstantChangeResult; +import org.apache.hudi.metaserver.thrift.MetaserverException; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.NoSuchObjectException; +import org.apache.hudi.metaserver.thrift.TAction; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.TState; +import org.apache.hudi.metaserver.util.MetaserverTableUtils; + +import org.apache.log4j.Logger; +import org.apache.thrift.TException; + +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; + +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; + +/** + * Handle all timeline / instant / instant meta related requests. + */ +public class TimelineService implements Serializable { + + private static final Logger LOG = Logger.getLogger(TimelineService.class); + private static final List ALL_ACTIONS = Arrays.asList(TAction.COMMIT, TAction.DELTACOMMIT, + TAction.CLEAN, TAction.ROLLBACK, TAction.SAVEPOINT, TAction.REPLACECOMMIT, TAction.COMPACTION, TAction.RESTORE); + private static final List PENDING_STATES = Arrays.asList(TState.REQUESTED, TState.INFLIGHT); + + private final MetaserverStorage store; + + public TimelineService(MetaserverStorage metaserverStorage) { + this.store = metaserverStorage; + } + + public List listInstants(String db, String tb, int num) throws TException { + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + List completeds = store.scanInstants(tableId, TState.COMPLETED, num); + List pendings = store.scanInstants(tableId, PENDING_STATES, -1); + completeds.addAll(pendings); + return completeds; + } + + public ByteBuffer getInstantMetadata(String db, String tb, THoodieInstant instant) throws TException { + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + return ByteBuffer.wrap(store.getInstantMetadata(tableId, instant)); + } + + public String createNewInstantTime(String db, String tb) throws MetaserverStorageException, NoSuchObjectException { + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + return store.createNewTimestamp(tableId); + } + + public HoodieInstantChangeResult createNewInstantWithTime(String db, String tb, THoodieInstant instant, ByteBuffer content) throws TException { + checkArgument(instant.getState().equals(TState.REQUESTED)); + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + HoodieInstantChangeResult result = new HoodieInstantChangeResult(); + result.setInstant(instant); + if (store.instantExists(tableId, instant)) { + result.setSuccess(true); + return result; + } + store.saveInstantMetadata(tableId, instant, content.array()); + result.setSuccess(store.createInstant(tableId, instant)); + return result; + } + + public HoodieInstantChangeResult transitionInstantState(String db, String tb, THoodieInstant fromInstant, THoodieInstant toInstant, ByteBuffer metadata) throws TException { + switch (fromInstant.getState()) { + case REQUESTED: + return transitionRequestedToInflight(db, tb, fromInstant, toInstant, metadata); + case INFLIGHT: + return transitionInflightToCompleted(db, tb, fromInstant, toInstant, metadata); + default: + throw new MetaserverException("Unsupported state " + fromInstant.getState() + " when do the state transition."); + } + } + + private HoodieInstantChangeResult transitionRequestedToInflight(String db, String tb, THoodieInstant fromInstant, THoodieInstant toInstant, ByteBuffer metadata) throws TException { + checkArgument(fromInstant.getState().equals(TState.REQUESTED)); + checkArgument(toInstant.getState().equals(TState.INFLIGHT)); + HoodieInstantChangeResult result = new HoodieInstantChangeResult(); + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + if (store.instantExists(tableId, toInstant)) { + LOG.info("Instant " + toInstant + " has been already changed to"); + result.setSuccess(true); + return result; + } + // todo: add conflict check for inflight + store.saveInstantMetadata(tableId, toInstant, metadata.array()); + result.setSuccess(store.updateInstant(tableId, fromInstant, toInstant)); + return result; + } + + private HoodieInstantChangeResult transitionInflightToCompleted(String db, String tb, THoodieInstant fromInstant, THoodieInstant toInstant, ByteBuffer metadata) throws TException { + checkArgument(fromInstant.getState().equals(TState.INFLIGHT)); + checkArgument(toInstant.getState().equals(TState.COMPLETED)); + HoodieInstantChangeResult result = new HoodieInstantChangeResult(); + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + if (store.instantExists(tableId, toInstant)) { + LOG.info("Instant " + toInstant + " has been already changed to"); + result.setSuccess(true); + return result; + } + // todo: add conflict check for completed + store.saveInstantMetadata(tableId, toInstant, metadata.array()); + // todo: sync snapshot + result.setSuccess(store.updateInstant(tableId, fromInstant, toInstant)); + return result; + } + + public HoodieInstantChangeResult deleteInstant(String db, String tb, THoodieInstant instant) throws TException { + Long tableId = MetaserverTableUtils.getTableId(store, db, tb); + HoodieInstantChangeResult result = new HoodieInstantChangeResult(); + if (store.instantExists(tableId, instant)) { + switch (instant.getState()) { + case COMPLETED: + store.deleteInstantAllMeta(tableId, instant.getTimestamp()); + break; + default: + store.deleteInstant(tableId, instant); + } + store.deleteInstant(tableId, instant); + } else { + LOG.info("Instant " + instant + " has been already deleted"); + } + result.setSuccess(true); + return result; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/MetaserverStorage.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/MetaserverStorage.java new file mode 100644 index 000000000000..4741f8169049 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/MetaserverStorage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store; + +import org.apache.hudi.ApiMaturityLevel; +import org.apache.hudi.PublicAPIClass; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.TState; +import org.apache.hudi.metaserver.thrift.Table; + +import java.util.List; + +/** + * Metadata store for meta server, stores all entities like database, table, instant and so on. + */ +@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING) +public interface MetaserverStorage extends AutoCloseable { + + void initStorage() throws MetaserverStorageException; + + // table related + boolean createDatabase(String db) throws MetaserverStorageException; + + Long getDatabaseId(String db) throws MetaserverStorageException; + + boolean createTable(Long dbId, Table table) throws MetaserverStorageException; + + Table getTable(String db, String tb) throws MetaserverStorageException; + + Long getTableId(String db, String tb) throws MetaserverStorageException; + + // timeline related + String createNewTimestamp(long tableId) throws MetaserverStorageException; + + boolean createInstant(long tableId, THoodieInstant instant) throws MetaserverStorageException; + + boolean updateInstant(long tableId, THoodieInstant fromInstant, THoodieInstant toInstant) throws MetaserverStorageException; + + boolean deleteInstant(long tableId, THoodieInstant instant) throws MetaserverStorageException; + + List scanInstants(long tableId, List states, int limit) throws MetaserverStorageException; + + List scanInstants(long tableId, TState state, int limit) throws MetaserverStorageException; + + boolean instantExists(long tableId, THoodieInstant instant) throws MetaserverStorageException; + + void saveInstantMetadata(long tableId, THoodieInstant instant, byte[] metadata) throws MetaserverStorageException; + + boolean deleteInstantMetadata(long tableId, THoodieInstant instant) throws MetaserverStorageException; + + boolean deleteInstantAllMeta(long tableId, String timestamp) throws MetaserverStorageException; + + byte[] getInstantMetadata(long tableId, THoodieInstant instant) throws MetaserverStorageException; + + void close(); + +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/RelationalDBBasedStorage.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/RelationalDBBasedStorage.java new file mode 100644 index 000000000000..0d22032430f5 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/RelationalDBBasedStorage.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store; + +import org.apache.hudi.metaserver.store.bean.InstantBean; +import org.apache.hudi.metaserver.store.bean.TableBean; +import org.apache.hudi.metaserver.store.jdbc.WrapperDao; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.TState; +import org.apache.hudi.metaserver.thrift.Table; + +import java.io.Serializable; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty; +import static org.apache.hudi.common.util.ValidationUtils.checkState; + +/** + * Metadata store based on relation database. + */ +public class RelationalDBBasedStorage implements MetaserverStorage, Serializable { + + private final WrapperDao tableDao = new WrapperDao.TableDao(); + private final WrapperDao timelineDao = new WrapperDao.TimelineDao(); + + @Override + public void initStorage() throws MetaserverStorageException { + WrapperDao dao = new WrapperDao("DDLMapper"); + dao.updateBySql("createDBs", null); + dao.updateBySql("createTables", null); + dao.updateBySql("createTableParams", null); + dao.updateBySql("createPartitions", null); + dao.updateBySql("createTableTimestamp", null); + dao.updateBySql("createInstant", null); + dao.updateBySql("createInstantMetadata", null); + dao.updateBySql("createFiles", null); + } + + @Override + public boolean createDatabase(String db) throws MetaserverStorageException { + Map params = new HashMap<>(); + params.put("databaseName", db); + return tableDao.insertBySql("insertDB", params) == 1; + } + + @Override + public Long getDatabaseId(String db) throws MetaserverStorageException { + List ids = tableDao.queryForListBySql("selectDBId", db); + validate(ids, "db " + db); + return ids.isEmpty() ? null : ids.get(0); + } + + @Override + public boolean createTable(Long dbId, Table table) throws MetaserverStorageException { + Map params = new HashMap<>(); + params.put("dbId", dbId); + TableBean tableBean = new TableBean(table); + params.put("tableBean", tableBean); + return tableDao.insertBySql("insertTable", params) == 1; + } + + @Override + public Table getTable(String db, String tb) throws MetaserverStorageException { + Map params = new HashMap<>(); + params.put("databaseName", db); + params.put("tableName", tb); + List table = tableDao.queryForListBySql("selectTable", params); + validate(table, "table " + db + "." + tb); + return table.isEmpty() ? null : table.get(0).toTable(); + } + + @Override + public Long getTableId(String db, String tb) throws MetaserverStorageException { + Map params = new HashMap<>(); + params.put("databaseName", db); + params.put("tableName", tb); + List ids = tableDao.queryForListBySql("selectTableId", params); + validate(ids, "table " + db + "." + tb); + return ids.isEmpty() ? null : ids.get(0); + } + + @Override + public String createNewTimestamp(long tableId) throws MetaserverStorageException { + // todo: support SSS + SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); + String oldTimestamp; + String newTimestamp; + boolean success; + try { + do { + oldTimestamp = getLatestTimestamp(tableId); + newTimestamp = oldTimestamp == null + ? sdf.format(new Date()) + : sdf.format(Math.max(new Date().getTime(), sdf.parse(oldTimestamp).getTime() + 1000)); + Map params = new HashMap<>(); + params.put("tableId", tableId); + params.put("oldTimestamp", oldTimestamp); + params.put("newTimestamp", newTimestamp); + if (oldTimestamp == null) { + success = timelineDao.insertBySql("insertTimestamp", params) == 1; + } else { + success = timelineDao.updateBySql("updateTimestamp", params) == 1; + } + } while (!success); + } catch (ParseException e) { + throw new MetaserverStorageException("Fail to parse the timestamp, " + e.getMessage()); + } + return newTimestamp; + } + + private String getLatestTimestamp(long tableId) throws MetaserverStorageException { + List timestamps = timelineDao.queryForListBySql("selectTimestampByTableId", tableId); + validate(timestamps, "timestamp"); + return timestamps.isEmpty() ? null : timestamps.get(0); + } + + @Override + public boolean createInstant(long tableId, THoodieInstant instant) throws MetaserverStorageException { + InstantBean instantBean = new InstantBean(tableId, instant); + Map params = new HashMap<>(); + params.put("instant", instantBean); + // todo: support heartbeat + params.put("duration", 120); + params.put("startTs", (int) (System.currentTimeMillis() / 1000L)); + return timelineDao.insertBySql("insertInstant", params) == 1; + } + + @Override + public boolean updateInstant(long tableId, THoodieInstant fromInstant, THoodieInstant toInstant) throws MetaserverStorageException { + InstantBean oldInstant = new InstantBean(tableId, fromInstant); + InstantBean newInstant = new InstantBean(tableId, toInstant); + Map params = new HashMap<>(); + params.put("oldInstant", oldInstant); + params.put("newInstant", newInstant); + return timelineDao.updateBySql("updateInstant", params) == 1; + } + + @Override + public boolean deleteInstant(long tableId, THoodieInstant instant) throws MetaserverStorageException { + Map params = new HashMap<>(); + params.put("tableId", tableId); + params.put("ts", instant.getTimestamp()); + return timelineDao.deleteBySql("deleteInstant", params) == 1; + } + + @Override + public List scanInstants(long tableId, List states, int limit) throws MetaserverStorageException { + if (isNullOrEmpty(states)) { + throw new MetaserverStorageException("State has to be specified when scan instants"); + } + Map params = new HashMap<>(); + params.put("tableId", tableId); + params.put("states", states.stream().mapToInt(TState::getValue).boxed().collect(Collectors.toList())); + params.put("limit", limit); + List instantBeans = timelineDao.queryForListBySql("selectInstantsByStates", params); + return instantBeans.stream().map(InstantBean::toTHoodieInstant).collect(Collectors.toList()); + } + + @Override + public List scanInstants(long tableId, TState state, int limit) throws MetaserverStorageException { + return scanInstants(tableId, Collections.singletonList(state), limit); + } + + @Override + public boolean instantExists(long tableId, THoodieInstant instant) throws MetaserverStorageException { + InstantBean instantBean = new InstantBean(tableId, instant); + List ids = timelineDao.queryForListBySql("selectInstantId", instantBean); + validate(ids, instantBean.toString()); + return !ids.isEmpty(); + } + + // todo: check correctness + @Override + public void saveInstantMetadata(long tableId, THoodieInstant instant, byte[] metadata) throws MetaserverStorageException { + InstantBean instantBean = new InstantBean(tableId, instant); + Map params = new HashMap<>(); + params.put("instant", instantBean); + params.put("metadata", metadata); + // todo: array bytes to longblob + timelineDao.insertBySql("insertInstantMetadata", params); + } + + @Override + public boolean deleteInstantMetadata(long tableId, THoodieInstant instant) throws MetaserverStorageException { + InstantBean instantBean = new InstantBean(tableId, instant); + return timelineDao.deleteBySql("deleteInstantMetadata", instantBean) == 1; + } + + @Override + public boolean deleteInstantAllMeta(long tableId, String timestamp) throws MetaserverStorageException { + Map params = new HashMap<>(); + params.put("tableId", tableId); + params.put("ts", timestamp); + return timelineDao.deleteBySql("deleteInstantAllMetadata", params) >= 1; + } + + @Override + public byte[] getInstantMetadata(long tableId, THoodieInstant instant) throws MetaserverStorageException { + InstantBean instantBean = new InstantBean(tableId, instant); + Map result = timelineDao.queryForObjectBySql("selectInstantMetadata", instantBean); + return result == null ? null : (byte[]) result.get("data"); + } + + @Override + public void close() { + + } + + public static void validate(List entities, String entityName) throws MetaserverStorageException { + try { + checkState(isNullOrEmpty(entities) || entities.size() == 1, "Found multiple records of " + entityName + " , expected one"); + } catch (IllegalStateException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/bean/InstantBean.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/bean/InstantBean.java new file mode 100644 index 000000000000..c404ffa284b2 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/bean/InstantBean.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store.bean; + +import org.apache.hudi.metaserver.thrift.TAction; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.TState; + +/** + * Instant entity for store. + */ +public class InstantBean { + + private Long tableId; + private String ts; + private Integer action; + private Integer state; + + public InstantBean(Long tableId, THoodieInstant instant) { + this.tableId = tableId; + this.ts = instant.getTimestamp(); + this.action = instant.getAction().getValue(); + this.state = instant.getState().getValue(); + } + + public InstantBean(String ts, Byte action, Byte state) { + this.ts = ts; + this.action = action & 0xFF; + this.state = state & 0xFF; + } + + public THoodieInstant toTHoodieInstant() { + THoodieInstant instant = new THoodieInstant(); + instant.setTimestamp(ts); + instant.setAction(TAction.findByValue(action)); + instant.setState(TState.findByValue(state)); + return instant; + } + + public Long getTableId() { + return tableId; + } + + public void setTableId(Long tableId) { + this.tableId = tableId; + } + + public String getTs() { + return ts; + } + + public void setTs(String ts) { + this.ts = ts; + } + + public Integer getAction() { + return action; + } + + public void setAction(Integer action) { + this.action = action; + } + + public Integer getState() { + return state; + } + + public void setState(Integer state) { + this.state = state; + } + + @Override + public String toString() { + return "InstantBean{" + + "tableId=" + tableId + + ", ts='" + ts + '\'' + + ", action=" + action + + ", state=" + state + + '}'; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/bean/TableBean.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/bean/TableBean.java new file mode 100644 index 000000000000..f18d127c1bc9 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/bean/TableBean.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store.bean; + +import org.apache.hudi.metaserver.thrift.Table; + +import java.sql.Timestamp; + +/** + * Table entity for store. + */ +public class TableBean { + private String dbName; + private Long tblId; + private String tableName; + private Long createTime; // ms + private String owner; + private String location; + + public TableBean(Table table) { + this.tableName = table.tableName; + this.owner = table.owner; + this.location = table.location; + } + + public TableBean(String dbName, Long tblId, String tableName, Timestamp createTime, String owner, String location) { + this.dbName = dbName; + this.tblId = tblId; + this.tableName = tableName; + this.createTime = createTime.getTime(); + this.owner = owner; + this.location = location; + } + + public Table toTable() { + Table table = new Table(); + table.setDbName(dbName); + table.setTableName(tableName); + table.setOwner(owner); + table.setLocation(location); + table.setCreateTime(createTime.longValue()); + return table; + } + + public String getDbName() { + return dbName; + } + + public void setDatabaseName(String dbName) { + this.dbName = dbName; + } + + public Long getTblId() { + return tblId; + } + + public void setTblId(Long tblId) { + this.tblId = tblId; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public Long getCreateTime() { + return createTime; + } + + public void setCreateTime(Long createTime) { + this.createTime = createTime; + } + + public void setCreateTime(Timestamp createTime) { + this.createTime = createTime.getTime(); + } + + public String getOwner() { + return owner; + } + + public void setOwner(String owner) { + this.owner = owner; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + @Override + public String toString() { + return "TableBean{" + + "dbName=" + dbName + + ", tblId=" + tblId + + ", tableName='" + tableName + '\'' + + ", createTime='" + createTime + '\'' + + ", owner='" + owner + '\'' + + ", location='" + location + '\'' + + '}'; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/BasicDao.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/BasicDao.java new file mode 100644 index 000000000000..265a14e86b34 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/BasicDao.java @@ -0,0 +1,91 @@ +package org.apache.hudi.metaserver.store.jdbc; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.ibatis.session.SqlSession; + +import java.io.Serializable; +import java.util.List; + +/** + * A basic class provides the public method for DAO. + */ +public class BasicDao implements Serializable { + public List queryForListBySql(String namespace, String sqlID, Object parameter) { + try (SqlSession session = SqlSessionFactoryUtils.openSqlSession()) { + return session.selectList(statement(namespace, sqlID), parameter); + } + } + + public T queryForObjectBySql(String namespace, String sqlID, Object parameter) { + try (SqlSession session = SqlSessionFactoryUtils.openSqlSession()) { + return session.selectOne(statement(namespace, sqlID), parameter); + } + } + + public int insertBySql(String namespace, String sqlID, Object parameter) { + try (SqlSession session = SqlSessionFactoryUtils.openSqlSession()) { + int res = session.insert(statement(namespace, sqlID), parameter); + session.commit(); + return res; + } + } + + public int deleteBySql(String namespace, String sqlID, Object parameter) { + try (SqlSession session = SqlSessionFactoryUtils.openSqlSession()) { + int res = session.delete(statement(namespace, sqlID), parameter); + session.commit(); + return res; + } + } + + public int updateBySql(String namespace, String sqlID, Object parameter) { + try (SqlSession session = SqlSessionFactoryUtils.openSqlSession()) { + int res = session.update(statement(namespace, sqlID), parameter); + session.commit(); + return res; + } + } + + public void batchOperateBySql(List batchDaoOperations) throws MetaserverStorageException { + try (SqlSession session = SqlSessionFactoryUtils.openSqlSession()) { + for (BatchDaoOperation batchDaoOperation: batchDaoOperations) { + switch (batchDaoOperation.getOperationType()) { + case BatchDaoOperation.OPERATION_TYPE_INSERT: + session.insert(statement(batchDaoOperation.getNamespace(), batchDaoOperation.getSqlID()), batchDaoOperation.getParameter()); + break; + case BatchDaoOperation.OPERATION_TYPE_UPDATE: + session.update(statement(batchDaoOperation.getNamespace(), batchDaoOperation.getSqlID()), batchDaoOperation.getParameter()); + break; + case BatchDaoOperation.OPERATION_TYPE_DELETE: + session.delete(statement(batchDaoOperation.getNamespace(), batchDaoOperation.getSqlID()), batchDaoOperation.getParameter()); + break; + default: + throw new MetaserverStorageException("Unsupported type: " + batchDaoOperation.getOperationType()); + } + } + session.commit(); + } + } + + private String statement(String namespace, String sqlID) { + return namespace + "." + sqlID; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/BatchDaoOperation.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/BatchDaoOperation.java new file mode 100644 index 000000000000..e912bfd26f97 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/BatchDaoOperation.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store.jdbc; + +public class BatchDaoOperation { + + public static final String OPERATION_TYPE_INSERT = "INSERT"; + public static final String OPERATION_TYPE_UPDATE = "UPDATE"; + public static final String OPERATION_TYPE_DELETE = "DELETE"; + + private String namespace; + private String sqlID; + private Object parameter; + private String operationType; + + public BatchDaoOperation(String namespace, String sqlID, Object parameter, String operationType) { + this.namespace = namespace; + this.sqlID = sqlID; + this.parameter = parameter; + this.operationType = operationType; + } + + public BatchDaoOperation(String sqlID, Object parameter, String operationType) { + this(null, sqlID, parameter, operationType); + } + + public String getNamespace() { + return namespace; + } + + public void setNamespace(String namespace) { + this.namespace = namespace; + } + + public String getSqlID() { + return sqlID; + } + + public void setSqlID(String sqlID) { + this.sqlID = sqlID; + } + + public Object getParameter() { + return parameter; + } + + public void setParameter(Object parameter) { + this.parameter = parameter; + } + + public String getOperationType() { + return operationType; + } + + public void setOperationType(String operationType) { + this.operationType = operationType; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/HikariDataSourceFactory.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/HikariDataSourceFactory.java new file mode 100644 index 000000000000..dd26486ae366 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/HikariDataSourceFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store.jdbc; + +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import org.apache.ibatis.datasource.unpooled.UnpooledDataSourceFactory; +import org.apache.ibatis.io.Resources; + +import java.io.IOException; +import java.util.Properties; + +/** + * JDBC datasource factory. + */ +public class HikariDataSourceFactory extends UnpooledDataSourceFactory { + private static final String PROPERTIES_PATH = "hikariPool.properties"; + + public HikariDataSourceFactory() throws IOException { + Properties properties = new Properties(); + properties.load(Resources.getResourceAsStream(PROPERTIES_PATH)); + HikariConfig config = new HikariConfig(properties); + this.dataSource = new HikariDataSource(config); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/SqlSessionFactoryUtils.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/SqlSessionFactoryUtils.java new file mode 100644 index 000000000000..ab1a0fede0d8 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/SqlSessionFactoryUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store.jdbc; + +import org.apache.ibatis.io.Resources; +import org.apache.ibatis.session.SqlSession; +import org.apache.ibatis.session.SqlSessionFactory; +import org.apache.ibatis.session.SqlSessionFactoryBuilder; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.io.InputStream; + +/** + * Utils for sql session's life cycle. + */ +public class SqlSessionFactoryUtils { + private static final Logger LOG = LogManager.getLogger(SqlSessionFactoryUtils.class); + private static final String CONFIG_PATH = "mybatis-config.xml"; + private static volatile SqlSessionFactory sqlSessionFactory; + + private SqlSessionFactoryUtils() { + + } + + private static void initSqlSessionFactory() { + if (sqlSessionFactory == null) { + synchronized (SqlSessionFactoryUtils.class) { + if (sqlSessionFactory == null) { + try (InputStream inputStream = Resources.getResourceAsStream(CONFIG_PATH)) { + sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream); + } catch (IOException e) { + LOG.error("Failed to init SQL session.", e); + } + } + } + } + } + + public static SqlSession openSqlSession() { + initSqlSessionFactory(); + return sqlSessionFactory.openSession(); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/WrapperDao.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/WrapperDao.java new file mode 100644 index 000000000000..7ea90549d53a --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/store/jdbc/WrapperDao.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store.jdbc; + +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.ibatis.exceptions.PersistenceException; + +import java.util.List; + +/** + * A wrapper class to handle exception thrown when do operations to database store. + */ +public class WrapperDao extends BasicDao { + + private final String namespace; + + public WrapperDao(String namespace) { + this.namespace = namespace; + } + + public List queryForListBySql(String sqlID, Object parameter) throws MetaserverStorageException { + try { + return queryForListBySql(namespace, sqlID, parameter); + } catch (PersistenceException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } + + public T queryForObjectBySql(String sqlID, Object parameter) throws MetaserverStorageException { + try { + return queryForObjectBySql(namespace, sqlID, parameter); + } catch (PersistenceException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } + + public int insertBySql(String sqlID, Object parameter) throws MetaserverStorageException { + try { + return insertBySql(namespace, sqlID, parameter); + } catch (PersistenceException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } + + public int deleteBySql(String sqlID, Object parameter) throws MetaserverStorageException { + try { + return deleteBySql(namespace, sqlID, parameter); + } catch (PersistenceException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } + + public int updateBySql(String sqlID, Object parameter) throws MetaserverStorageException { + try { + return updateBySql(namespace, sqlID, parameter); + } catch (PersistenceException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } + + public void batchOperateBySql(List batchDaoOperations) throws MetaserverStorageException { + try { + batchDaoOperations.forEach(x -> { + if (x.getNamespace() == null) { + x.setNamespace(namespace); + } + }); + super.batchOperateBySql(batchDaoOperations); + } catch (PersistenceException e) { + throw new MetaserverStorageException(e.getMessage()); + } + } + + public static class TableDao extends WrapperDao { + public TableDao() { + super("TableMapper"); + } + } + + public static class PartitionDao extends WrapperDao { + public PartitionDao() { + super("PartitionMapper"); + } + } + + public static class TimelineDao extends WrapperDao { + public TimelineDao() { + super("TimelineMapper"); + } + } + + public static class FileDao extends WrapperDao { + public FileDao() { + super("FileMapper"); + } + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/util/MetaserverTableUtils.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/util/MetaserverTableUtils.java new file mode 100644 index 000000000000..24b006720d2c --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/util/MetaserverTableUtils.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.util; + +import org.apache.hudi.metaserver.store.MetaserverStorage; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.NoSuchObjectException; + +public class MetaserverTableUtils { + + public static Long getTableId(MetaserverStorage store, String databaseName, String tableName) throws NoSuchObjectException, MetaserverStorageException { + Long tableId = store.getTableId(databaseName, tableName); + if (tableId == null) { + throw new NoSuchObjectException(databaseName + "." + tableName + " does not exist"); + } + return tableId; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/util/TServerSocketWrapper.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/util/TServerSocketWrapper.java new file mode 100644 index 000000000000..97b34b696396 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/java/org/apache/hudi/metaserver/util/TServerSocketWrapper.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.util; + +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.thrift.transport.TServerSocket; +import org.apache.thrift.transport.TSocket; +import org.apache.thrift.transport.TTransportException; + +import java.net.ServerSocket; + +/** + * A wrapper class of thrift server socket. + */ +public class TServerSocketWrapper extends TServerSocket { + + private static final Logger LOG = LogManager.getLogger(TServerSocketWrapper.class); + + public TServerSocketWrapper(ServerSocket serverSocket) throws TTransportException { + super(serverSocket); + } + + public TServerSocketWrapper(int port) throws TTransportException { + super(port); + } + + @Override + protected TSocket acceptImpl() throws TTransportException { + TSocket socket = super.acceptImpl(); + LOG.info("received connection from " + socket.getSocket().getInetAddress().getHostAddress() + + ":" + socket.getSocket().getPort()); + return socket; + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/hikariPool.properties b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/hikariPool.properties new file mode 100644 index 000000000000..aed55423e6b0 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/hikariPool.properties @@ -0,0 +1,20 @@ +### +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +### +jdbcUrl=jdbc:h2:mem:bms;MODE=MYSQL +#dataSource.user=root +#dataSource.password=password diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis-config.xml b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis-config.xml new file mode 100644 index 000000000000..8610f8983dda --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis-config.xml @@ -0,0 +1,45 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/DDLMapper.xml b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/DDLMapper.xml new file mode 100644 index 000000000000..1626724fd7fc --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/DDLMapper.xml @@ -0,0 +1,126 @@ + + + + + + + CREATE TABLE dbs + ( + db_id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'uuid', + desc VARCHAR(512) COMMENT 'database description', + location_uri VARCHAR(512) COMMENT 'database storage path', + name VARCHAR(512) UNIQUE COMMENT 'database name', + owner_name VARCHAR(512) COMMENT 'database owner', + owner_type VARCHAR(512) COMMENT 'database type', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'db created time', + update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time' + ) COMMENT 'databases'; + + + + CREATE TABLE tbls + ( + tbl_id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'uuid', + db_id BIGINT COMMENT 'database id', + name VARCHAR(512) COMMENT 'table name', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'table created time', + update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + owner_name VARCHAR(512) COMMENT 'table owner', + location VARCHAR(512) COMMENT 'table location', + UNIQUE KEY uniq_tb (db_id, name) + ) COMMENT 'tables'; + + + + CREATE TABLE tbl_params + ( + tbl_id BIGINT UNSIGNED COMMENT 'tbl id', + param_key VARCHAR(256) COMMENT 'param_key', + param_value VARCHAR(2048) COMMENT 'param_value', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'parameter created time', + update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + PRIMARY KEY (tbl_id, param_key) + ) COMMENT 'tbl params'; + + + + CREATE TABLE partitions + ( + part_id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'uuid', + tbl_id BIGINT COMMENT 'table id', + part_name VARCHAR(256) COMMENT 'partition path', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + is_deleted BOOL DEFAULT FALSE COMMENT 'whether the partition is deleted', + UNIQUE uniq_partition_version (tbl_id, part_name) + ) COMMENT 'partitions'; + + + + CREATE TABLE tbl_timestamp + ( + tbl_id BIGINT UNSIGNED PRIMARY KEY COMMENT 'uuid', + ts VARCHAR(17) COMMENT 'instant timestamp' + ) COMMENT 'generate the unique timestamp for a table'; + + + + CREATE TABLE instant + ( + instant_id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'uuid', + tbl_id BIGINT COMMENT 'table id', + ts VARCHAR(17) COMMENT 'instant timestamp', + action TINYINT COMMENT 'commit, deltacommit, compaction, replace etc', + state TINYINT COMMENT 'completed, requested, inflight, invalid etc', + duration INT DEFAULT 0 COMMENT 'for heartbeat (s)', + start_ts INT DEFAULT 0 COMMENT 'for heartbeat (s)', + UNIQUE KEY uniq_inst1 (tbl_id, state, ts, action), + UNIQUE KEY uniq_inst2 (tbl_id, ts) + ) COMMENT 'timeline'; + + + + CREATE TABLE instant_meta_data + ( + commit_id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'uuid', + tbl_id BIGINT COMMENT 'table id', + ts VARCHAR(17) COMMENT 'instant timestamp', + action TINYINT COMMENT 'commit, deltacommit, compaction, replace etc', + state TINYINT COMMENT 'completed, requested, inflight, invalid etc', + data LONGBLOB COMMENT 'instant metadata', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'instant meta created time', + UNIQUE KEY uniq_inst3 (tbl_id, state, ts, action) + ) COMMENT 'instant meta data'; + + + + CREATE TABLE files + ( + id BIGINT UNSIGNED PRIMARY KEY AUTO_INCREMENT COMMENT 'uuid', + tbl_id BIGINT COMMENT 'table id', + part_id BIGINT COMMENT 'partition id', + name VARCHAR(256) COMMENT 'file name', + size BIGINT COMMENT 'file size', + is_deleted BOOL COMMENT 'whether the file has been deleted', + create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP COMMENT 'create time', + update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'update time', + UNIQUE KEY uniq_name (part_id, name) + ) COMMENT 'snapshot files'; + + diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/TableMapper.xml b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/TableMapper.xml new file mode 100644 index 000000000000..a3452cd50b04 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/TableMapper.xml @@ -0,0 +1,60 @@ + + + + + + + + INSERT INTO dbs (name, desc, location_uri, owner_name, owner_type) + VALUES(#{databaseName}, null, null, null, null) + + + + + + + INSERT INTO tbls (db_id, name, owner_name, location) + VALUES(#{dbId}, #{tableBean.tableName}, #{tableBean.owner}, #{tableBean.location}) + + + + + + + diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/TimelineMapper.xml b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/TimelineMapper.xml new file mode 100644 index 000000000000..db36a47d6285 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/main/resources/mybatis/TimelineMapper.xml @@ -0,0 +1,146 @@ + + + + + + + + + INSERT IGNORE INTO tbl_timestamp + VALUES (#{tableId}, #{newTimestamp}) + + + + UPDATE tbl_timestamp + SET ts = #{newTimestamp} + WHERE + tbl_id = #{tableId} + AND ts = #{oldTimestamp} + + + + + + + INSERT INTO instant (tbl_id, action, ts, state, duration, start_ts) + VALUES (#{instant.tableId}, #{instant.action}, #{instant.ts}, #{instant.state}, #{duration}, ${startTs}) + + + + UPDATE instant + SET state = #{newInstant.state}, action = #{newInstant.action} + + , duration = #{duration} + + + , start_ts = #{startTs} + + WHERE + tbl_id = #{oldInstant.tableId} + AND action = #{oldInstant.action} + AND ts = #{oldInstant.ts} + AND state = #{oldInstant.state} + + + + DELETE + FROM + instant + WHERE + tbl_id = #{tableId} + AND ts = #{ts} + + + + + + + + + + + + INSERT IGNORE INTO instant_meta_data (tbl_id, ts, action, state, data) + VALUES (#{instant.tableId}, #{instant.ts}, #{instant.action}, #{instant.state}, #{metadata, typeHandler=org.apache.ibatis.type.BlobTypeHandler}) + + + + DELETE + FROM + instant_meta_data + WHERE + tbl_id = #{tableId} + AND ts = #{ts} + + + + DELETE + FROM + instant_meta_data + WHERE + tbl_id = #{tableId} + AND ts = #{ts} + AND state = #{state} + + + + + diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/test/java/org/apache/hudi/metaserver/TestHoodieMetaserver.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/test/java/org/apache/hudi/metaserver/TestHoodieMetaserver.java new file mode 100644 index 000000000000..3be36b605cdc --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/test/java/org/apache/hudi/metaserver/TestHoodieMetaserver.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class TestHoodieMetaserver { + @Test + public void testEmbeddedServer() { + assertNotNull(HoodieMetaserver.getEmbeddedMetaserver()); + assertNotNull(HoodieMetaserver.getMetaserverStorage()); + } +} diff --git a/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/test/java/org/apache/hudi/metaserver/store/TestRelationalDBBasedStore.java b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/test/java/org/apache/hudi/metaserver/store/TestRelationalDBBasedStore.java new file mode 100644 index 000000000000..bd8d960f43a9 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/hudi-metaserver-server/src/test/java/org/apache/hudi/metaserver/store/TestRelationalDBBasedStore.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.metaserver.store; + +import org.apache.hudi.metaserver.HoodieMetaserver; +import org.apache.hudi.metaserver.thrift.MetaserverStorageException; +import org.apache.hudi.metaserver.thrift.TAction; +import org.apache.hudi.metaserver.thrift.THoodieInstant; +import org.apache.hudi.metaserver.thrift.TState; +import org.apache.hudi.metaserver.thrift.Table; +import org.apache.thrift.TException; +import org.junit.jupiter.api.Test; + +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** + * Unit tests on metadata store base on relation database of hoodie meta server. + */ +public class TestRelationalDBBasedStore { + + private MetaserverStorage store; + private final String db = "test_db"; + private final String tb = "test_tb"; + + @Test + public void testAPIs() throws TException { + HoodieMetaserver.getEmbeddedMetaserver(); + store = HoodieMetaserver.getMetaserverStorage(); + testTableRelatedAPIs(); + testTimelineRelatedAPIs(); + } + + private void testTableRelatedAPIs() throws MetaserverStorageException { + assertTrue(store.createDatabase(db)); + Long dbId = store.getDatabaseId(db); + assertNotNull(dbId); + + Table table = new Table(); + table.setDbName(db); + table.setTableName(tb); + table.setOwner("owner"); + table.setLocation("test_db.db/test_tb"); + // check table related API + assertTrue(store.createTable(dbId, table)); + Long tableId = store.getTableId(db, tb); + assertNotNull(tableId); + Table storedTable = store.getTable(db, tb); + assertTrue(System.currentTimeMillis() - storedTable.getCreateTime() <= 1000); + table.setCreateTime(storedTable.getCreateTime()); + assertEquals(store.getTable(db, tb).toString(), table.toString()); + } + + private void testTimelineRelatedAPIs() throws MetaserverStorageException { + Long tableId = store.getTableId(db, tb); + String ts = store.createNewTimestamp(tableId); + assertTrue(Long.valueOf(store.createNewTimestamp(tableId)) > Long.valueOf(ts)); + THoodieInstant requested = new THoodieInstant(ts, TAction.COMMIT, TState.REQUESTED); + assertTrue(store.createInstant(tableId, requested)); + assertTrue(store.instantExists(tableId, requested)); + assertThrows(MetaserverStorageException.class, + () -> store.createInstant(tableId, new THoodieInstant(ts, TAction.REPLACECOMMIT, TState.REQUESTED))); + // update instant and check it + THoodieInstant inflight = new THoodieInstant(ts, TAction.COMMIT, TState.INFLIGHT); + assertTrue(store.updateInstant(tableId, requested, inflight)); + List instants = store.scanInstants(tableId, Arrays.asList(TState.REQUESTED, TState.INFLIGHT), -1); + assertEquals(1, instants.size()); + assertEquals(inflight, instants.get(0)); + // delete + assertTrue(store.deleteInstant(tableId, inflight)); + assertTrue(store.scanInstants(tableId, Arrays.asList(TState.REQUESTED, TState.INFLIGHT), -1).isEmpty()); + + // instant meta CRUD + byte[] requestedMeta = "requested".getBytes(StandardCharsets.UTF_8); + byte[] inflightMeta = "inflight".getBytes(StandardCharsets.UTF_8); + store.saveInstantMetadata(tableId, requested, requestedMeta); + store.saveInstantMetadata(tableId, inflight, inflightMeta); + assertTrue(store.deleteInstantMetadata(tableId, requested)); + assertNull(store.getInstantMetadata(tableId, requested)); + assertEquals("inflight", new String(store.getInstantMetadata(tableId, inflight))); + // delete all metadata of a timestamp + store.saveInstantMetadata(tableId, requested, requestedMeta); + assertEquals("requested", new String(store.getInstantMetadata(tableId, requested))); + assertTrue(store.deleteInstantAllMeta(tableId, ts)); + assertNull(store.getInstantMetadata(tableId, requested)); + assertNull(store.getInstantMetadata(tableId, inflight)); + } + +} diff --git a/hudi-platform-service/hudi-metaserver/pom.xml b/hudi-platform-service/hudi-metaserver/pom.xml new file mode 100644 index 000000000000..28ecdab5b3b3 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/pom.xml @@ -0,0 +1,208 @@ + + + + + + hudi-platform-service + org.apache.hudi + 0.13.0-SNAPSHOT + + 4.0.0 + + hudi-metaserver + 0.13.0-SNAPSHOT + + hudi-metaserver + pom + + + ${project.parent.basedir} + 1.4.200 + + /usr/local + docker + 0.1.11 + + + + hudi-metaserver-server + hudi-metaserver-client + + + + + org.apache.hudi + hudi-common + ${project.version} + + + + org.apache.thrift + libthrift + ${thrift.version} + + + + org.apache.logging.log4j + log4j-1.2-api + ${log4j2.version} + + + + + + com.h2database + h2 + ${h2.version} + test + + + org.junit.jupiter + junit-jupiter-api + test + + + + + + thrift-gen-source + + + + org.apache.thrift.tools + maven-thrift-plugin + ${maven-thrift-plugin.version} + + ${thrift.home}/bin/thrift + ${project.parent.basedir}/src/main/thrift + target/generated-sources/gen-java + java + + + + thrift-sources + generate-sources + + compile + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.6.0 + + true + + + + + + + m1-mac + + m1_mac + + + + mac + aarch64 + + + + + + + + + + org.codehaus.mojo + exec-maven-plugin + 1.6.0 + + + thrift-install-and-generate-source + generate-sources + + exec + + + + + ${project.parent.basedir}/src/main/thrift/bin/thrift_binary.sh + + ${thrift.install.env} + + false + + + + org.codehaus.mojo + build-helper-maven-plugin + ${build-helper-maven-plugin.version} + + + add-source + generate-sources + + add-source + + + + src/main/java + target/generated-sources/gen-java + + + + + add-test-source + generate-sources + + add-test-source + + + + src/test/java + src/main/java + target/generated-sources/gen-java + + + + + + + org.apache.maven.plugins + maven-jar-plugin + ${maven-jar-plugin.version} + + + + test-jar + + test-compile + + + + false + + + + + + diff --git a/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_binary.sh b/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_binary.sh new file mode 100755 index 000000000000..24f2282a2a8a --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_binary.sh @@ -0,0 +1,39 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Usage: ./scripts/checkout_pr.sh +# +# Checkout a PR given the PR number into a local branch. PR branches are named +# using the convention "pull/", to enable pr_push_command.sh to work +# in tandem. +# + +SUBDIR=../src/main/thrift/bin +OS_VERSION=$1 +M1_MAC='m1_mac' +DOCKER='docker' +if [ "$OS_VERSION" = "$M1_MAC" ] +then + sh $SUBDIR/thrift_in_mac_m1.sh +elif [ "$OS_VERSION" = "$DOCKER" ] +then + sh $SUBDIR/thrift_in_docker.sh +else + sh $SUBDIR/thrift_in_docker.sh +fi \ No newline at end of file diff --git a/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_in_docker.sh b/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_in_docker.sh new file mode 100755 index 000000000000..5e930ff6e37b --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_in_docker.sh @@ -0,0 +1,29 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +printf "====== INSTALL THRIFT START ======\n" +DIR_PATH=$(dirname "$PWD") +THRIFT_FILE_PATH=${DIR_PATH}/src/main/thrift +THRIFT_OUT_PATH=$PWD/target/generated-sources +THRIFT_VERSION=0.12.0 +THRIFT_IMAGE=thrift:$THRIFT_VERSION +docker pull $THRIFT_IMAGE +printf "====== INSTALL THRIFT END ======\n" +printf "====== COMPILE THRIFT SOURCE FILE START ======\n" +docker run -v "$THRIFT_FILE_PATH:/thrift" -v "$THRIFT_OUT_PATH:/output" $THRIFT_IMAGE thrift -o /output/ --gen java /thrift/hudi-metaserver.thrift +printf "====== COMPILE THRIFT SOURCE FILE END ======\n" diff --git a/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_in_mac_m1.sh b/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_in_mac_m1.sh new file mode 100644 index 000000000000..5005195ea0f5 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/src/main/thrift/bin/thrift_in_mac_m1.sh @@ -0,0 +1,33 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# +# Usage: ./scripts/checkout_pr.sh +# +# Checkout a PR given the PR number into a local branch. PR branches are named +# using the convention "pull/", to enable pr_push_command.sh to work +# in tandem. +# + +printf "====== INSTALL THRIFT START ======\n" +brew install thrift@0.12.0 +printf "====== INSTALL THRIFT END ======\n" +printf "====== COMPILE THRIFT SOURCE FILE START ======\n" +mkdir -p target/generated-sources +/usr/local/bin/thrift -o target/generated-sources --gen java ../src/main/thrift/hudi-metaserver.thrift +printf "====== COMPILE THRIFT SOURCE FILE END ======\n" \ No newline at end of file diff --git a/hudi-platform-service/hudi-metaserver/src/main/thrift/hudi-metaserver.thrift b/hudi-platform-service/hudi-metaserver/src/main/thrift/hudi-metaserver.thrift new file mode 100644 index 000000000000..a8e019e33160 --- /dev/null +++ b/hudi-platform-service/hudi-metaserver/src/main/thrift/hudi-metaserver.thrift @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + namespace java org.apache.hudi.metaserver.thrift + + // table related + struct Table { + 1: string tableName, + 2: string dbName, + 3: string owner, + 4: i64 createTime, + 5: string location, + 6: string tableType, + 7: list partitionKeys, + 8: map parameters + } + + struct FieldSchema { + 1: string name, + 2: string type, + 3: string comments + } + +// timeline related +// align with actions defined in HoodieTimeline +enum TAction { + COMMIT = 1, + DELTACOMMIT = 2, + CLEAN = 3, + ROLLBACK = 4, + SAVEPOINT = 5, + REPLACECOMMIT = 6, + COMPACTION = 7, + RESTORE = 8 +} + +// align with states defined in HoodieInstant +enum TState { + REQUESTED = 1, + INFLIGHT = 2, + COMPLETED = 3, + NIL = 4 +} + +struct THoodieInstant { + 1: string timestamp, + 2: TAction action, + 3: TState state +} + +struct HoodieInstantChangeResult { + 1: bool success, + 2: optional THoodieInstant instant, + 4: optional string msg +} + +exception MetaserverStorageException { + 1: string message +} + +exception MetaserverException { + 1: string message +} + +exception NoSuchObjectException { + 1: string message +} + +exception AlreadyExistException { + 1: string message +} + +service ThriftHoodieMetaserver { + // table related + void createDatabase(1:string db) + void createTable(1:Table table) + Table getTable(1:string db, 2:string tb) + + // timeline related + list listInstants(1:string db, 2:string tb, 3:i32 num) + binary getInstantMetadata(1:string db, 2:string tb, 3:THoodieInstant instant) + string createNewInstantTime(1:string db, 2:string tb) + HoodieInstantChangeResult createNewInstantWithTime(1:string db, 2:string tb, 3:THoodieInstant instant, 4:optional binary content) + HoodieInstantChangeResult transitionInstantState(1:string db, 2:string tb, 3: THoodieInstant fromInstant, 4: THoodieInstant toInstant, 5:optional binary metadata) + HoodieInstantChangeResult deleteInstant(1:string db, 2:string tb, 3:THoodieInstant instant) +} diff --git a/hudi-platform-service/pom.xml b/hudi-platform-service/pom.xml new file mode 100644 index 000000000000..7c0234a7c5f6 --- /dev/null +++ b/hudi-platform-service/pom.xml @@ -0,0 +1,36 @@ + + + + + hudi + org.apache.hudi + 0.13.0-SNAPSHOT + + 4.0.0 + + hudi-platform-service + pom + + + ${project.parent.basedir} + + + + hudi-metaserver + + diff --git a/packaging/hudi-flink-bundle/pom.xml b/packaging/hudi-flink-bundle/pom.xml index d66b81aea0ad..a24751c3c1d0 100644 --- a/packaging/hudi-flink-bundle/pom.xml +++ b/packaging/hudi-flink-bundle/pom.xml @@ -705,5 +705,35 @@ + + hudi-platform-service + + + org.apache.hudi + hudi-metaserver-client + ${project.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + + + + org.apache.hudi:hudi-metaserver-client + + + + + + + + + diff --git a/packaging/hudi-metaserver-server-bundle/pom.xml b/packaging/hudi-metaserver-server-bundle/pom.xml new file mode 100644 index 000000000000..37d83fa879e5 --- /dev/null +++ b/packaging/hudi-metaserver-server-bundle/pom.xml @@ -0,0 +1,85 @@ + + + + hudi + org.apache.hudi + 0.13.0-SNAPSHOT + + 4.0.0 + + hudi-metaserver-server-bundle + + + ${project.parent.basedir} + + + + + org.apache.hudi + hudi-metaserver-server + ${project.version} + + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + package + + shade + + + + + org.apache.hudi:hudi-common + org.apache.thrift:libthrift + org.apache.logging.log4j:log4j-1.2-api + org.slf4j:slf4j-api + org.slf4j:jul-to-slf4j + org.mybatis:mybatis + com.zaxxer:HikariCP + mysql:mysql-connector-java + + + ${shadeSources} + ${project.build.directory}/dependency-reduced-pom.xml + + + + org.apache.hudi.metaserver.HoodieMetaserver + + + + true + + + META-INF/LICENSE + target/classes/META-INF/LICENSE + + + + + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + META-INF/services/javax.* + + + + ${project.artifactId}-${project.version} + + + + + + + + \ No newline at end of file diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml index 55f40442ae64..eaebe0b9b27e 100644 --- a/packaging/hudi-spark-bundle/pom.xml +++ b/packaging/hudi-spark-bundle/pom.xml @@ -441,5 +441,35 @@ org.apache.hudi. + + hudi-platform-service + + + org.apache.hudi + hudi-metaserver-client + ${project.version} + + + + + + org.apache.maven.plugins + maven-shade-plugin + ${maven-shade-plugin.version} + + + + + + org.apache.hudi:hudi-metaserver-client + + + + + + + + + diff --git a/pom.xml b/pom.xml index a34734a74253..0dff6d4d863d 100644 --- a/pom.xml +++ b/pom.xml @@ -293,7 +293,7 @@ basedir=${maven.multiModuleProjectDirectory} - **\/generated-sources\/ + **\/generated-sources\/,org/apache/hudi/metaserver/thrift/* @@ -2324,6 +2324,14 @@ + + + hudi-platform-service + + hudi-platform-service + packaging/hudi-metaserver-server-bundle + +