From b06b51153b02ba5194e3a8afedcd4077ef80ce8e Mon Sep 17 00:00:00 2001
From: Qi Yu
Date: Wed, 16 Oct 2024 18:07:21 +0800
Subject: [PATCH] [#5019] feat: (hadoop-catalog): Add a framework to support
 multi-storage in a pluggable manner for fileset catalog (#5020)

### What changes were proposed in this pull request?

Add a framework to support multiple storage systems within the Hadoop catalog.

### Why are the changes needed?

Some users want Gravitino to manage file systems like S3 or GCS.

Fix: #5019

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Existing tests.

---
 .gitignore                                    |   4 +
 catalogs/catalog-hadoop/build.gradle.kts      |   2 +
 .../hadoop/HadoopCatalogOperations.java       |  83 +++++++++-----
 .../HadoopCatalogPropertiesMetadata.java      |  41 ++++++-
 .../catalog/hadoop/fs/FileSystemProvider.java |  69 ++++++++++++
 .../catalog/hadoop/fs/FileSystemUtils.java    | 104 ++++++++++++++++++
 .../hadoop/fs/HDFSFileSystemProvider.java     |  54 +++++++++
 .../hadoop/fs/LocalFileSystemProvider.java    |  53 +++++++++
 ...itino.catalog.hadoop.fs.FileSystemProvider |  21 ++++
 .../hadoop/TestHadoopCatalogOperations.java   |  87 ++++++++++-----
 clients/filesystem-hadoop3/build.gradle.kts   |   9 ++
 .../hadoop/GravitinoVirtualFileSystem.java    |  39 ++++++-
 ...avitinoVirtualFileSystemConfiguration.java |  10 ++
 docs/hadoop-catalog.md                        |  23 ++--
 14 files changed, 531 insertions(+), 68 deletions(-)
 create mode 100644 catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
 create mode 100644 catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
 create mode 100644 catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
 create mode 100644 catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
 create mode 100644 catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider

diff --git a/.gitignore b/.gitignore
index 7889cf7a923..eae3d3c952c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -53,3 +53,7 @@ include clients/client-python/.gitignore
 **/metastore_db
 **/spark-warehouse
 derby.log
+
+web/node_modules
+web/dist
+web/.next

diff --git a/catalogs/catalog-hadoop/build.gradle.kts b/catalogs/catalog-hadoop/build.gradle.kts
index ba60a161d8f..94028934721 100644
--- a/catalogs/catalog-hadoop/build.gradle.kts
+++ b/catalogs/catalog-hadoop/build.gradle.kts
@@ -36,6 +36,8 @@ dependencies {
     exclude(group = "*")
   }
 
+  compileOnly(libs.guava)
+
   implementation(libs.hadoop3.common) {
     exclude("com.sun.jersey")
     exclude("javax.servlet", "servlet-api")

diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
index da4d0e1a18e..8515ea7d20f 100644
--- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java
@@ -30,7 +30,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Entity;
@@ -44,6 +43,8 @@
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.CatalogOperations;
 import org.apache.gravitino.connector.HasPropertyMetadata;
@@ -71,11 +72,9 @@ import org.slf4j.LoggerFactory;
 public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas, FilesetCatalog {
-
   private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
   private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
   private static final String SLASH = "/";
-
   private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogOperations.class);
 
   private final EntityStore store;
@@ -90,6 +89,10 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem
   private CatalogInfo catalogInfo;
 
+  private final Map<String, FileSystemProvider> fileSystemProvidersMap = Maps.newHashMap();
+
+  private FileSystemProvider defaultFileSystemProvider;
+
   HadoopCatalogOperations(EntityStore store) {
     this.store = store;
   }
@@ -107,7 +110,9 @@ public CatalogInfo getCatalogInfo() {
   }
 
   public Configuration getHadoopConf() {
-    return hadoopConf;
+    Configuration configuration = new Configuration();
+    conf.forEach((k, v) -> configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v));
+    return configuration;
   }
 
   public Map<String, String> getConf() {
@@ -119,26 +124,31 @@ public void initialize(
       Map<String, String> config, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
       throws RuntimeException {
     this.propertiesMetadata = propertiesMetadata;
-    // Initialize Hadoop Configuration.
-    this.conf = config;
-    this.hadoopConf = new Configuration();
     this.catalogInfo = info;
-    Map<String, String> bypassConfigs =
-        config.entrySet().stream()
-            .filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
-            .collect(
-                Collectors.toMap(
-                    e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()),
-                    Map.Entry::getValue));
-    bypassConfigs.forEach(hadoopConf::set);
+
+    this.conf = config;
+
+    String fileSystemProviders =
+        (String)
+            propertiesMetadata
+                .catalogPropertiesMetadata()
+                .getOrDefault(config, HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS);
+    this.fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+
+    String defaultFileSystemProviderName =
+        (String)
+            propertiesMetadata
+                .catalogPropertiesMetadata()
+                .getOrDefault(config, HadoopCatalogPropertiesMetadata.DEFAULT_FS_PROVIDER);
+    this.defaultFileSystemProvider =
+        FileSystemUtils.getFileSystemProviderByName(
+            fileSystemProvidersMap, defaultFileSystemProviderName);
 
     String catalogLocation =
         (String)
             propertiesMetadata
                 .catalogPropertiesMetadata()
                 .getOrDefault(config, HadoopCatalogPropertiesMetadata.LOCATION);
-    conf.forEach(hadoopConf::set);
-
     this.catalogStorageLocation =
         StringUtils.isNotBlank(catalogLocation)
             ? Optional.of(catalogLocation).map(Path::new)
@@ -235,8 +245,9 @@ public Fileset createFileset(
 
     try {
       // formalize the path to avoid path without scheme, uri, authority, etc.
-      filesetPath = formalizePath(filesetPath, hadoopConf);
-      FileSystem fs = filesetPath.getFileSystem(hadoopConf);
+      filesetPath = formalizePath(filesetPath, conf);
+
+      FileSystem fs = getFileSystem(filesetPath, conf);
       if (!fs.exists(filesetPath)) {
         if (!fs.mkdirs(filesetPath)) {
           throw new RuntimeException(
@@ -339,7 +350,7 @@ public boolean dropFileset(NameIdentifier ident) {
 
       // For managed fileset, we should delete the related files.
       if (filesetEntity.filesetType() == Fileset.Type.MANAGED) {
-        FileSystem fs = filesetPath.getFileSystem(hadoopConf);
+        FileSystem fs = getFileSystem(filesetPath, conf);
         if (fs.exists(filesetPath)) {
           if (!fs.delete(filesetPath, true)) {
             LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath);
@@ -459,7 +470,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties) {
   }
 
   @VisibleForTesting
-  static Path formalizePath(Path path, Configuration configuration) throws IOException {
-    FileSystem defaultFs = FileSystem.get(configuration);
+  Path formalizePath(Path path, Map<String, String> configuration) throws IOException {
+    FileSystem defaultFs = getFileSystem(path, configuration);
     return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
   }
 
@@ -731,7 +741,7 @@ private boolean hasCallerContext() {
   private boolean checkSingleFile(Fileset fileset) {
     try {
       Path locationPath = new Path(fileset.storageLocation());
-      return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
+      return getFileSystem(locationPath, conf).getFileStatus(locationPath).isFile();
     } catch (FileNotFoundException e) {
       // We should always return false here, same with the logic in `FileSystem.isFile(Path f)`.
       return false;
@@ -742,4 +752,25 @@ private boolean checkSingleFile(Fileset fileset) {
           fileset.name());
     }
   }
+
+  FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
+    if (path == null) {
+      throw new IllegalArgumentException("Path should not be null");
+    }
+
+    String scheme =
+        path.toUri().getScheme() != null
+            ? path.toUri().getScheme()
+            : defaultFileSystemProvider.scheme();
+
+    FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
+    if (provider == null) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Unsupported scheme: %s, path: %s, all supported schemes: %s and providers: %s",
+              scheme, path, fileSystemProvidersMap.keySet(), fileSystemProvidersMap.values()));
+    }
+
+    return provider.getFileSystem(path, config);
+  }
 }
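The `getFileSystem` dispatch above is driven entirely by catalog properties. As a point of reference, here is a minimal sketch of the properties a user might pass when creating a Hadoop catalog; the keys come from `HadoopCatalogPropertiesMetadata` below, while the values are illustrative:

```java
import com.google.common.collect.ImmutableMap;
import java.util.Map;

// "filesystem-providers" lists the provider names to load (built-ins are always added);
// "default-filesystem-provider" names the provider used for paths without a scheme.
Map<String, String> catalogProperties =
    ImmutableMap.of(
        "location", "file:///tmp/catalog",
        "filesystem-providers", "builtin-local,builtin-hdfs",
        "default-filesystem-provider", "builtin-local");
```

With these properties, a fileset location such as `/tmp/data` (no scheme) resolves through `builtin-local`, while an `hdfs://` location dispatches to the HDFS provider.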
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
index 9a68e2d5522..397e13aa4af 100644
--- a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogPropertiesMetadata.java
@@ -18,10 +18,13 @@
  */
 package org.apache.gravitino.catalog.hadoop;
 
+import static org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.KERBEROS_PROPERTY_ENTRIES;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
-import org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
 import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
 import org.apache.gravitino.connector.PropertyEntry;
 
@@ -34,6 +37,24 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
   // If not, users have to specify the storage location in the Schema or Fileset level.
   public static final String LOCATION = "location";
 
+  /**
+   * The names of the {@link FileSystemProvider}s to be added to the catalog. Apart from built-in
+   * FileSystemProviders such as LocalFileSystemProvider and HDFSFileSystemProvider, users can add
+   * their own FileSystemProvider by specifying the provider name here. The value can be found via
+   * {@link FileSystemProvider#name()}.
+   */
+  public static final String FILESYSTEM_PROVIDERS = "filesystem-providers";
+
+  /**
+   * The name of the default file system provider, used to create the file system when a path has
+   * no scheme. If not specified, the default file system provider will be {@link
+   * LocalFileSystemProvider#name()}: 'builtin-local'.
+   */
+  public static final String DEFAULT_FS_PROVIDER = "default-filesystem-provider";
+
+  public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
+  public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";
+
   private static final Map<String, PropertyEntry<?>> HADOOP_CATALOG_PROPERTY_ENTRIES =
       ImmutableMap.<String, PropertyEntry<?>>builder()
           .put(
@@ -44,8 +65,24 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
                   false /* immutable */,
                   null,
                   false /* hidden */))
+          .put(
+              FILESYSTEM_PROVIDERS,
+              PropertyEntry.stringOptionalPropertyEntry(
+                  FILESYSTEM_PROVIDERS,
+                  "The file system provider names, separated by commas",
+                  false /* immutable */,
+                  null,
+                  false /* hidden */))
+          .put(
+              DEFAULT_FS_PROVIDER,
+              PropertyEntry.stringOptionalPropertyEntry(
+                  DEFAULT_FS_PROVIDER,
+                  "Default file system provider name",
+                  false /* immutable */,
+                  BUILTIN_LOCAL_FS_PROVIDER, // please see LocalFileSystemProvider#name()
+                  false /* hidden */))
           // The following two are about authentication.
-          .putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
+          .putAll(KERBEROS_PROPERTY_ENTRIES)
           .putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
           .build();

diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
new file mode 100644
index 00000000000..5bee821e505
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemProvider.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * FileSystemProvider is an interface for providing FileSystem instances. It is used by the
+ * HadoopCatalog to create FileSystem instances for accessing Hadoop-compatible file systems.
+ */
+public interface FileSystemProvider {
+
+  /**
+   * Get the FileSystem instance according to the configuration map and file path.
+   *
+   * <p>Compared to the {@link FileSystem#get(Configuration)} method, this method allows the
+   * provider to create a FileSystem instance with a specific configuration and do further
+   * initialization if needed.
+   *
+   * <p>For example: 1. We can check the endpoint value validity for S3AFileSystem and then take
+   * further actions. 2. We can also change some default behaviors of the FileSystem
+   * initialization process. 3. More...
+   *
+   * @param path The path to the file system.
+   * @param config The configuration for the FileSystem instance.
+   * @return The FileSystem instance.
+   * @throws IOException If the FileSystem instance cannot be created.
+   */
+  FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
+      throws IOException;
+
+  /**
+   * Scheme of this FileSystem provider. The value is 'file' for LocalFileSystem, 'hdfs' for HDFS,
+   * etc.
+   *
+   * @return The scheme of this FileSystem provider.
+   */
+  String scheme();
+
+  /**
+   * Name of this FileSystem provider. The value is 'builtin-local' for LocalFileSystem,
+   * 'builtin-hdfs' for HDFS, etc.
+   *
+   * @return The name of this FileSystem provider.
+   */
+  String name();
+}
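For third-party storage, an implementation of this interface could look like the following sketch. The class, the `my-s3` provider name, and the use of hadoop-aws under the `s3a` scheme are illustrative assumptions, not part of this patch:

```java
package com.example.gravitino.fs;

import java.io.IOException;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical provider that serves s3a:// paths (requires hadoop-aws on the classpath). */
public class S3FileSystemProvider implements FileSystemProvider {

  @Override
  public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
    Configuration configuration = new Configuration();
    // Copy the flat config map into a Hadoop Configuration; a real provider could also
    // validate S3-specific settings (endpoint, credentials) here before connecting.
    config.forEach(configuration::set);
    return FileSystem.newInstance(path.toUri(), configuration);
  }

  @Override
  public String scheme() {
    return "s3a";
  }

  @Override
  public String name() {
    return "my-s3"; // the value users would list in "filesystem-providers"
  }
}
```

To be discoverable, the implementation class must also be registered in a `META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider` file, as this patch does for the two built-in providers further below.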
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
new file mode 100644
index 00000000000..3a959ff3738
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Streams;
+import java.util.Arrays;
+import java.util.Locale;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class FileSystemUtils {
+
+  private FileSystemUtils() {}
+
+  public static Map<String, FileSystemProvider> getFileSystemProviders(String fileSystemProviders) {
+    Map<String, FileSystemProvider> resultMap = Maps.newHashMap();
+    ServiceLoader<FileSystemProvider> allFileSystemProviders =
+        ServiceLoader.load(FileSystemProvider.class);
+
+    Set<String> providersInUses =
+        fileSystemProviders != null
+            ? Arrays.stream(fileSystemProviders.split(","))
+                .map(f -> f.trim().toLowerCase(Locale.ROOT))
+                .collect(Collectors.toSet())
+            : Sets.newHashSet();
+
+    // Add built-in file system providers to the use list automatically.
+    providersInUses.add(BUILTIN_LOCAL_FS_PROVIDER.toLowerCase(Locale.ROOT));
+    providersInUses.add(BUILTIN_HDFS_FS_PROVIDER.toLowerCase(Locale.ROOT));
+
+    // Only get the file system providers that are in the use list and check if the scheme is
+    // unique.
+    Streams.stream(allFileSystemProviders.iterator())
+        .filter(
+            fileSystemProvider ->
+                providersInUses.contains(fileSystemProvider.name().toLowerCase(Locale.ROOT)))
+        .forEach(
+            fileSystemProvider -> {
+              if (resultMap.containsKey(fileSystemProvider.scheme())) {
+                throw new UnsupportedOperationException(
+                    String.format(
+                        "File system provider: '%s' with scheme '%s' already exists in the used provider list. "
+                            + "Please make sure the file system provider scheme is unique.",
+                        fileSystemProvider.getClass().getName(), fileSystemProvider.scheme()));
+              }
+              resultMap.put(fileSystemProvider.scheme(), fileSystemProvider);
+            });
+
+    // If not all file system providers in providersInUses were found, throw an exception.
+    Set<String> notFoundProviders =
+        Sets.difference(
+                providersInUses,
+                resultMap.values().stream()
+                    .map(p -> p.name().toLowerCase(Locale.ROOT))
+                    .collect(Collectors.toSet()))
+            .immutableCopy();
+    if (!notFoundProviders.isEmpty()) {
+      throw new UnsupportedOperationException(
+          String.format(
+              "File system providers %s not found in the classpath. Please make sure the file system "
+                  + "provider is in the classpath.",
+              notFoundProviders));
+    }
+
+    return resultMap;
+  }
+
+  public static FileSystemProvider getFileSystemProviderByName(
+      Map<String, FileSystemProvider> fileSystemProviders, String fileSystemProviderName) {
+    return fileSystemProviders.entrySet().stream()
+        .filter(entry -> entry.getValue().name().equals(fileSystemProviderName))
+        .map(Map.Entry::getValue)
+        .findFirst()
+        .orElseThrow(
+            () ->
+                new UnsupportedOperationException(
+                    String.format(
+                        "File system provider with name '%s' not found in the file system provider list.",
+                        fileSystemProviderName)));
+  }
+}
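A short sketch of how the two helpers combine, with `my-s3` being the hypothetical provider from the earlier example:

```java
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;

// Loads the built-ins plus any extra providers named in the catalog property;
// the resulting map is keyed by scheme, e.g. "file", "hdfs", "s3a".
Map<String, FileSystemProvider> providers = FileSystemUtils.getFileSystemProviders("my-s3");

// The default provider is resolved by *name*, not by scheme.
FileSystemProvider defaultProvider =
    FileSystemUtils.getFileSystemProviderByName(providers, "builtin-local");
```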
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
new file mode 100644
index 00000000000..7c9ceebdd36
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_HDFS_FS_PROVIDER;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import java.io.IOException;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+public class HDFSFileSystemProvider implements FileSystemProvider {
+
+  @Override
+  public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
+      throws IOException {
+    Configuration configuration = new Configuration();
+    config.forEach(
+        (k, v) -> {
+          configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+        });
+    return DistributedFileSystem.newInstance(path.toUri(), configuration);
+  }
+
+  @Override
+  public String scheme() {
+    return "hdfs";
+  }
+
+  @Override
+  public String name() {
+    return BUILTIN_HDFS_FS_PROVIDER;
+  }
+}
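A direct-use sketch for the provider above. Keys carrying the catalog bypass prefix (`gravitino.bypass.`) are stripped before they reach the Hadoop `Configuration`, and `DistributedFileSystem.newInstance` is used instead of `FileSystem.get`, so instances do not go through the JVM-wide FileSystem cache. The namenode address and the replication setting are placeholders:

```java
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

FileSystemProvider hdfsProvider = new HDFSFileSystemProvider();
// "gravitino.bypass.dfs.replication" is applied as "dfs.replication" inside getFileSystem.
FileSystem fs =
    hdfsProvider.getFileSystem(
        new Path("hdfs://namenode:9000/tmp/catalog"),
        ImmutableMap.of("gravitino.bypass.dfs.replication", "2"));
```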
diff --git a/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
new file mode 100644
index 00000000000..70e44c76f6b
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/fs/LocalFileSystemProvider.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.catalog.hadoop.fs;
+
+import static org.apache.gravitino.catalog.hadoop.HadoopCatalogPropertiesMetadata.BUILTIN_LOCAL_FS_PROVIDER;
+import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class LocalFileSystemProvider implements FileSystemProvider {
+
+  @Override
+  public FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
+    Configuration configuration = new Configuration();
+    config.forEach(
+        (k, v) -> {
+          configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v);
+        });
+
+    return LocalFileSystem.newInstance(path.toUri(), configuration);
+  }
+
+  @Override
+  public String scheme() {
+    return "file";
+  }
+
+  @Override
+  public String name() {
+    return BUILTIN_LOCAL_FS_PROVIDER;
+  }
+}
diff --git a/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider b/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
new file mode 100644
index 00000000000..93a84744aa5
--- /dev/null
+++ b/catalogs/catalog-hadoop/src/main/resources/META-INF/services/org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider
+org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider
\ No newline at end of file
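Because providers are discovered through the JDK `ServiceLoader`, a quick way to verify that a registration file like the one above is visible on the classpath is the following debugging sketch:

```java
import java.util.ServiceLoader;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;

ServiceLoader<FileSystemProvider> loader = ServiceLoader.load(FileSystemProvider.class);
// With this patch in place, the output should include at least:
//   builtin-hdfs -> hdfs
//   builtin-local -> file
loader.forEach(p -> System.out.println(p.name() + " -> " + p.scheme()));
```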
diff --git a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
index d3206972680..2b89180a8d1 100644
--- a/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
+++ b/catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java
@@ -34,7 +34,6 @@
 import static org.apache.gravitino.catalog.hadoop.HadoopCatalog.CATALOG_PROPERTIES_META;
 import static org.apache.gravitino.catalog.hadoop.HadoopCatalog.FILESET_PROPERTIES_META;
 import static org.apache.gravitino.catalog.hadoop.HadoopCatalog.SCHEMA_PROPERTIES_META;
-import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.when;
 
@@ -50,10 +49,12 @@
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.Config;
 import org.apache.gravitino.EntityStore;
 import org.apache.gravitino.EntityStoreFactory;
+import org.apache.gravitino.GravitinoEnv;
 import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Namespace;
 import org.apache.gravitino.Schema;
@@ -65,6 +66,7 @@
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.PropertiesMetadata;
+import org.apache.gravitino.connector.PropertyEntry;
 import org.apache.gravitino.exceptions.GravitinoRuntimeException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
 import org.apache.gravitino.exceptions.NoSuchSchemaException;
@@ -74,6 +76,7 @@
 import org.apache.gravitino.file.FilesetChange;
 import org.apache.gravitino.storage.IdGenerator;
 import org.apache.gravitino.storage.RandomIdGenerator;
+import org.apache.gravitino.storage.relational.RelationalEntityStore;
 import org.apache.gravitino.storage.relational.service.CatalogMetaService;
 import org.apache.gravitino.storage.relational.service.MetalakeMetaService;
 import org.apache.gravitino.utils.NameIdentifierUtil;
@@ -230,18 +233,10 @@ public void testHadoopCatalogConfiguration() {
     CatalogInfo catalogInfo = randomCatalogInfo();
     ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
-    Configuration conf = ops.hadoopConf;
+    Configuration conf = ops.getHadoopConf();
     String value = conf.get("fs.defaultFS");
     Assertions.assertEquals("file:///", value);
 
-    emptyProps.put(CATALOG_BYPASS_PREFIX + "fs.defaultFS", "hdfs://localhost:9000");
-    ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
-    Configuration conf1 = ops.hadoopConf;
-    String value1 = conf1.get("fs.defaultFS");
-    Assertions.assertEquals("hdfs://localhost:9000", value1);
-
-    Assertions.assertFalse(ops.catalogStorageLocation.isPresent());
-
     emptyProps.put(HadoopCatalogPropertiesMetadata.LOCATION, "file:///tmp/catalog");
     ops.initialize(emptyProps, catalogInfo, HADOOP_PROPERTIES_METADATA);
     Assertions.assertTrue(ops.catalogStorageLocation.isPresent());
@@ -677,33 +672,68 @@ public void testAlterFilesetProperties() throws IOException {
   }
 
   @Test
-  public void testFormalizePath() throws IOException {
+  public void testFormalizePath() throws IOException, IllegalAccessException {
     String[] paths =
-        new String[] {
-          "tmp/catalog",
-          "/tmp/catalog",
-          "file:/tmp/catalog",
-          "file:///tmp/catalog",
-          "hdfs://localhost:9000/tmp/catalog",
-          "s3://bucket/tmp/catalog",
-          "gs://bucket/tmp/catalog"
-        };
+        new String[] {"tmp/catalog", "/tmp/catalog", "file:/tmp/catalog", "file:///tmp/catalog"};
     String[] expected =
         new String[] {
           "file:" + Paths.get("").toAbsolutePath() + "/tmp/catalog",
           "file:/tmp/catalog",
           "file:/tmp/catalog",
-          "file:/tmp/catalog",
-          "hdfs://localhost:9000/tmp/catalog",
-          "s3://bucket/tmp/catalog",
-          "gs://bucket/tmp/catalog"
+          "file:/tmp/catalog"
         };
 
+    HasPropertyMetadata hasPropertyMetadata =
+        new HasPropertyMetadata() {
+          @Override
+          public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationException {
+            return null;
+          }
+
+          @Override
+          public PropertiesMetadata catalogPropertiesMetadata()
+              throws UnsupportedOperationException {
+            return new PropertiesMetadata() {
+              @Override
+              public Map<String, PropertyEntry<?>> propertyEntries() {
+                return new HadoopCatalogPropertiesMetadata().propertyEntries();
+              }
+            };
+          }
+
+          @Override
+          public PropertiesMetadata schemaPropertiesMetadata()
+              throws UnsupportedOperationException {
+            return null;
+          }
+
+          @Override
+          public PropertiesMetadata filesetPropertiesMetadata()
+              throws UnsupportedOperationException {
+            return null;
+          }
+
+          @Override
+          public PropertiesMetadata topicPropertiesMetadata() throws UnsupportedOperationException {
+            return null;
+          }
+        };
 
-    for (int i = 0; i < paths.length; i++) {
-      Path actual = HadoopCatalogOperations.formalizePath(new Path(paths[i]), new Configuration());
-      Assertions.assertEquals(expected[i], actual.toString());
+    try {
+      FieldUtils.writeField(
+          GravitinoEnv.getInstance(), "entityStore", new RelationalEntityStore(), true);
+      try (HadoopCatalogOperations hadoopCatalogOperations = new HadoopCatalogOperations()) {
+        Map<String, String> map = ImmutableMap.of("default-filesystem", "file:///");
+        hadoopCatalogOperations.initialize(map, null, hasPropertyMetadata);
+        for (int i = 0; i < paths.length; i++) {
+          Path actual = hadoopCatalogOperations.formalizePath(new Path(paths[i]), map);
+          Assertions.assertEquals(expected[i], actual.toString());
+        }
+      }
+    } finally {
+      FieldUtils.writeField(GravitinoEnv.getInstance(), "entityStore", null, true);
     }
   }
 
@@ -877,8 +907,11 @@ public void testGetFileLocation() throws IOException {
     try (HadoopCatalogOperations mockOps = Mockito.mock(HadoopCatalogOperations.class)) {
       mockOps.hadoopConf = new Configuration();
       when(mockOps.loadFileset(filesetIdent)).thenReturn(mockFileset);
+      when(mockOps.getConf()).thenReturn(Maps.newHashMap());
       String subPath = "/test/test.parquet";
       when(mockOps.getFileLocation(filesetIdent, subPath)).thenCallRealMethod();
+      when(mockOps.getFileSystem(Mockito.any(), Mockito.any()))
+          .thenReturn(FileSystem.getLocal(new Configuration()));
       String fileLocation = mockOps.getFileLocation(filesetIdent, subPath);
       Assertions.assertEquals(
           String.format("%s%s", mockFileset.storageLocation(), subPath.substring(1)), fileLocation);
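The scheme dispatch also suggests a negative test. A sketch, assuming a `HadoopCatalogOperations` instance `ops` initialized as elsewhere in this test class, with `abfs` standing in for any unregistered scheme:

```java
@Test
public void testGetFileSystemWithUnsupportedScheme() {
  // No provider registers "abfs" by default, so dispatch should fail fast.
  Assertions.assertThrows(
      IllegalArgumentException.class,
      () -> ops.getFileSystem(new Path("abfs://container@account/tmp"), ImmutableMap.of()));
}
```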
diff --git a/clients/filesystem-hadoop3/build.gradle.kts b/clients/filesystem-hadoop3/build.gradle.kts
index d7905cd3b35..aefac5f28b9 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -26,6 +26,10 @@ plugins {
 dependencies {
   compileOnly(project(":clients:client-java-runtime", configuration = "shadow"))
   compileOnly(libs.hadoop3.common)
+  implementation(project(":catalogs:catalog-hadoop")) {
+    exclude(group = "*")
+  }
+
   implementation(libs.caffeine)
 
   testImplementation(project(":api"))
@@ -71,6 +75,11 @@ tasks.build {
   dependsOn("javadoc")
 }
 
+tasks.compileJava {
+  dependsOn(":catalogs:catalog-hadoop:jar")
+  dependsOn(":catalogs:catalog-hadoop:runtimeJars")
+}
+
 tasks.test {
   val skipITs = project.hasProperty("skipITs")
   if (skipITs) {

diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
index de0eb758edc..05e769667da 100644
--- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
+++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystem.java
@@ -18,6 +18,9 @@
  */
 package org.apache.gravitino.filesystem.hadoop;
 
+import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_FILESYSTEM_PROVIDERS;
+import static org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.GVFS_CONFIG_PREFIX;
+
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.github.benmanes.caffeine.cache.Scheduler;
@@ -41,6 +44,8 @@
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.audit.InternalClientType;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.client.KerberosTokenProvider;
@@ -81,6 +86,8 @@ public class GravitinoVirtualFileSystem extends FileSystem {
   private static final Pattern IDENTIFIER_PATTERN =
       Pattern.compile("^(?:gvfs://fileset)?/([^/]+)/([^/]+)/([^/]+)(?>/[^/]+)*/?$");
   private static final String SLASH = "/";
+  private final Map<String, FileSystemProvider> fileSystemProvidersMap = Maps.newHashMap();
+  private static final String GRAVITINO_BYPASS_PREFIX = "gravitino.bypass.";
 
   @Override
   public void initialize(URI name, Configuration configuration) throws IOException {
@@ -125,6 +132,10 @@ public void initialize(URI name, Configuration configuration) throws IOException
 
     initializeClient(configuration);
 
+    // Register the built-in local and HDFS FileSystemProviders, plus any configured by the user.
+    String fileSystemProviders = configuration.get(FS_FILESYSTEM_PROVIDERS);
+    fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));
+
     this.workingDirectory = new Path(name);
     this.uri = URI.create(name.getScheme() + "://" + name.getAuthority());
 
@@ -351,7 +362,6 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat
     Preconditions.checkArgument(
         filesetCatalog != null, String.format("Loaded fileset catalog: %s is null.", catalogIdent));
 
-    // set the thread local audit info
     Map<String, String> contextMap = Maps.newHashMap();
     contextMap.put(
         FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
@@ -364,7 +374,8 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat
         filesetCatalog.getFileLocation(
             NameIdentifier.of(identifier.namespace().level(2), identifier.name()), subPath);
 
-    URI uri = new Path(actualFileLocation).toUri();
+    Path filePath = new Path(actualFileLocation);
+    URI uri = filePath.toUri();
     // we cache the fs for the same scheme, so we can reuse it
     String scheme = uri.getScheme();
     Preconditions.checkArgument(
@@ -374,7 +385,14 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat
             scheme,
             str -> {
               try {
-                return FileSystem.newInstance(uri, getConf());
+                Map<String, String> maps = getConfigMap(getConf());
+                FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
+                if (provider == null) {
+                  throw new GravitinoRuntimeException(
+                      "Unsupported file system scheme: %s for %s.",
+                      scheme, GravitinoVirtualFileSystemConfiguration.GVFS_SCHEME);
+                }
+                return provider.getFileSystem(filePath, maps);
               } catch (IOException ioe) {
                 throw new GravitinoRuntimeException(
                     "Exception occurs when create new FileSystem for actual uri: %s, msg: %s",
@@ -385,6 +403,21 @@ private FilesetContextPair getFilesetContext(Path virtualPath, FilesetDataOperat
     return new FilesetContextPair(new Path(actualFileLocation), fs);
   }
 
+  private Map<String, String> getConfigMap(Configuration configuration) {
+    Map<String, String> maps = Maps.newHashMap();
+    configuration.forEach(
+        entry -> {
+          String key = entry.getKey();
+          if (key.startsWith(GRAVITINO_BYPASS_PREFIX)) {
+            maps.put(key.substring(GRAVITINO_BYPASS_PREFIX.length()), entry.getValue());
+          } else if (!key.startsWith(GVFS_CONFIG_PREFIX)) {
+            maps.put(key, entry.getValue());
+          }
+        });
+
+    return maps;
+  }
+
   private String getSubPathFromVirtualPath(NameIdentifier identifier, String virtualPathString) {
     return virtualPathString.startsWith(GravitinoVirtualFileSystemConfiguration.GVFS_FILESET_PREFIX)
         ? virtualPathString.substring(
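Putting the client side together, a GVFS client could be configured roughly as follows. The `fs.gvfs.impl` and `fs.gravitino.client.metalake` keys follow the usual GVFS setup rather than this diff, and `my-s3` is the hypothetical provider from earlier, so treat the snippet as a sketch:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

Configuration conf = new Configuration();
conf.set("fs.gvfs.impl", "org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
conf.set("fs.gravitino.server.uri", "http://localhost:8090");
conf.set("fs.gravitino.client.metalake", "my_metalake");
// Comma-separated provider names, mirroring the catalog-side property.
conf.set("fs.gvfs.filesystem.providers", "my-s3");
// Keys with the "gravitino.bypass." prefix are forwarded to the underlying FileSystem.
conf.set("gravitino.bypass.fs.s3a.endpoint", "http://localhost:9878");

FileSystem gvfs = new Path("gvfs://fileset/my_catalog/my_schema/my_fileset").getFileSystem(conf);
```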
diff --git a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index 8076c02c36a..cd1ecb92fa8 100644
--- a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++ b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -18,10 +18,13 @@
  */
 package org.apache.gravitino.filesystem.hadoop;
 
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+
 /** Configuration class for Gravitino Virtual File System. */
 class GravitinoVirtualFileSystemConfiguration {
   public static final String GVFS_FILESET_PREFIX = "gvfs://fileset";
   public static final String GVFS_SCHEME = "gvfs";
+  public static final String GVFS_CONFIG_PREFIX = "fs.gvfs.";
 
   /** The configuration key for the Gravitino server URI. */
   public static final String FS_GRAVITINO_SERVER_URI_KEY = "fs.gravitino.server.uri";
@@ -32,6 +35,13 @@ class GravitinoVirtualFileSystemConfiguration {
   /** The configuration key for the Gravitino client auth type. */
   public static final String FS_GRAVITINO_CLIENT_AUTH_TYPE_KEY = "fs.gravitino.client.authType";
 
+  /**
+   * File system provider names configuration key. The value is a comma-separated list of file
+   * system provider names as defined by the service loader. Users can add custom file systems by
+   * implementing the {@link FileSystemProvider} interface.
+   */
+  public static final String FS_FILESYSTEM_PROVIDERS = "fs.gvfs.filesystem.providers";
+
   public static final String SIMPLE_AUTH_TYPE = "simple";
   public static final String OAUTH2_AUTH_TYPE = "oauth2";
   public static final String KERBEROS_AUTH_TYPE = "kerberos";

diff --git a/docs/hadoop-catalog.md b/docs/hadoop-catalog.md
index d6706ff3e1b..d28e6d93b04 100644
--- a/docs/hadoop-catalog.md
+++ b/docs/hadoop-catalog.md
@@ -25,16 +25,19 @@ Hadoop 3. If there's any compatibility issue, please create an [issue](https://g
 
 Besides the [common catalog properties](./gravitino-server-config.md#gravitino-catalog-properties-configuration), the Hadoop catalog has the following properties:
 
-| Property Name | Description | Default Value | Required | Since Version |
-|---------------|-------------|---------------|----------|---------------|
-| `location` | The storage location managed by Hadoop catalog. | (none) | No | 0.5.0 |
-| `authentication.impersonation-enable` | Whether to enable impersonation for the Hadoop catalog. | `false` | No | 0.5.1 |
-| `authentication.type` | The type of authentication for Hadoop catalog, currently we only support `kerberos`, `simple`. | `simple` | No | 0.5.1 |
-| `authentication.kerberos.principal` | The principal of the Kerberos authentication | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
-| `authentication.kerberos.keytab-uri` | The URI of The keytab for the Kerberos authentication. | (none) | required if the value of `authentication.type` is Kerberos. | 0.5.1 |
-| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Hadoop catalog. | 60 | No | 0.5.1 |
-| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.5.1 |
-
+| Property Name | Description | Default Value | Required | Since Version |
+|---------------|-------------|---------------|----------|---------------|
+| `location` | The storage location managed by Hadoop catalog. | (none) | No | 0.5.0 |
+| `filesystem-providers` | The names (comma-separated) of the filesystem providers for the Hadoop catalog. Gravitino already supports the built-in `builtin-local` (local file system) and `builtin-hdfs` (HDFS) providers. To support more file systems, users can implement `FileSystemProvider` and add their own providers to Gravitino. | (none) | No | 0.7.0-incubating |
+| `default-filesystem-provider` | The name of the default filesystem provider of this Hadoop catalog, used when users do not specify a scheme in the URI. The default value is `builtin-local`. | `builtin-local` | No | 0.7.0-incubating |
+| `authentication.impersonation-enable` | Whether to enable impersonation for the Hadoop catalog. | `false` | No | 0.5.1 |
+| `authentication.type` | The type of authentication for Hadoop catalog, currently we only support `kerberos` and `simple`. | `simple` | No | 0.5.1 |
+| `authentication.kerberos.principal` | The principal of the Kerberos authentication. | (none) | Required if the value of `authentication.type` is Kerberos. | 0.5.1 |
+| `authentication.kerberos.keytab-uri` | The URI of the keytab for the Kerberos authentication. | (none) | Required if the value of `authentication.type` is Kerberos. | 0.5.1 |
+| `authentication.kerberos.check-interval-sec` | The check interval of Kerberos credential for Hadoop catalog. | 60 | No | 0.5.1 |
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`. | 60 | No | 0.5.1 |
+
+For more about `filesystem-providers`, please refer to `HDFSFileSystemProvider` or `LocalFileSystemProvider` in the source code. Furthermore, if the jar of a custom file system provider is not already on the classpath, you also need to place it into the `$GRAVITINO_HOME/catalogs/hadoop/libs` directory.
 
 ### Authentication for Hadoop Catalog