[#5019] feat: (hadoop-catalog): Add a framework to support multi-storage in a pluggable manner for fileset catalog (#5020)

### What changes were proposed in this pull request?

Add a framework to support multiple storage systems within the Hadoop catalog.

### Why are the changes needed?

Some users want Gravitino to manage file systems such as S3 or GCS.

Fix: #5019 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Existing tests.
yuqi1129 authored Oct 16, 2024
1 parent f0e7f36 commit b06b511
Showing 14 changed files with 531 additions and 68 deletions.
4 changes: 4 additions & 0 deletions .gitignore
@@ -53,3 +53,7 @@ include clients/client-python/.gitignore
**/metastore_db
**/spark-warehouse
derby.log

web/node_modules
web/dist
web/.next
2 changes: 2 additions & 0 deletions catalogs/catalog-hadoop/build.gradle.kts
@@ -36,6 +36,8 @@ dependencies {
exclude(group = "*")
}

compileOnly(libs.guava)

implementation(libs.hadoop3.common) {
exclude("com.sun.jersey")
exclude("javax.servlet", "servlet-api")
@@ -30,7 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.Entity;
@@ -44,6 +43,8 @@
import org.apache.gravitino.audit.CallerContext;
import org.apache.gravitino.audit.FilesetAuditConstants;
import org.apache.gravitino.audit.FilesetDataOperation;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
import org.apache.gravitino.connector.CatalogInfo;
import org.apache.gravitino.connector.CatalogOperations;
import org.apache.gravitino.connector.HasPropertyMetadata;
@@ -71,11 +72,9 @@
import org.slf4j.LoggerFactory;

public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas, FilesetCatalog {

private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
private static final String SLASH = "/";

private static final Logger LOG = LoggerFactory.getLogger(HadoopCatalogOperations.class);

private final EntityStore store;
@@ -90,6 +89,10 @@ public class HadoopCatalogOperations implements CatalogOperations, SupportsSchem

private CatalogInfo catalogInfo;

private final Map<String, FileSystemProvider> fileSystemProvidersMap = Maps.newHashMap();

private FileSystemProvider defaultFileSystemProvider;

HadoopCatalogOperations(EntityStore store) {
this.store = store;
}
@@ -107,7 +110,9 @@ public CatalogInfo getCatalogInfo() {
}

public Configuration getHadoopConf() {
return hadoopConf;
Configuration configuration = new Configuration();
conf.forEach((k, v) -> configuration.set(k.replace(CATALOG_BYPASS_PREFIX, ""), v));
return configuration;
}

public Map<String, String> getConf() {
@@ -119,26 +124,31 @@ public void initialize(
Map<String, String> config, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
throws RuntimeException {
this.propertiesMetadata = propertiesMetadata;
// Initialize Hadoop Configuration.
this.conf = config;
this.hadoopConf = new Configuration();
this.catalogInfo = info;
Map<String, String> bypassConfigs =
config.entrySet().stream()
.filter(e -> e.getKey().startsWith(CATALOG_BYPASS_PREFIX))
.collect(
Collectors.toMap(
e -> e.getKey().substring(CATALOG_BYPASS_PREFIX.length()),
Map.Entry::getValue));
bypassConfigs.forEach(hadoopConf::set);

this.conf = config;

String fileSystemProviders =
(String)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config, HadoopCatalogPropertiesMetadata.FILESYSTEM_PROVIDERS);
this.fileSystemProvidersMap.putAll(FileSystemUtils.getFileSystemProviders(fileSystemProviders));

String defaultFileSystemProviderName =
(String)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config, HadoopCatalogPropertiesMetadata.DEFAULT_FS_PROVIDER);
this.defaultFileSystemProvider =
FileSystemUtils.getFileSystemProviderByName(
fileSystemProvidersMap, defaultFileSystemProviderName);

String catalogLocation =
(String)
propertiesMetadata
.catalogPropertiesMetadata()
.getOrDefault(config, HadoopCatalogPropertiesMetadata.LOCATION);
conf.forEach(hadoopConf::set);

this.catalogStorageLocation =
StringUtils.isNotBlank(catalogLocation)
? Optional.of(catalogLocation).map(Path::new)
@@ -235,8 +245,9 @@ public Fileset createFileset(

try {
// formalize the path to avoid path without scheme, uri, authority, etc.
filesetPath = formalizePath(filesetPath, hadoopConf);
FileSystem fs = filesetPath.getFileSystem(hadoopConf);
filesetPath = formalizePath(filesetPath, conf);

FileSystem fs = getFileSystem(filesetPath, conf);
if (!fs.exists(filesetPath)) {
if (!fs.mkdirs(filesetPath)) {
throw new RuntimeException(
@@ -339,7 +350,7 @@ public boolean dropFileset(NameIdentifier ident) {

// For managed fileset, we should delete the related files.
if (filesetEntity.filesetType() == Fileset.Type.MANAGED) {
FileSystem fs = filesetPath.getFileSystem(hadoopConf);
FileSystem fs = getFileSystem(filesetPath, conf);
if (fs.exists(filesetPath)) {
if (!fs.delete(filesetPath, true)) {
LOG.warn("Failed to delete fileset {} location {}", ident, filesetPath);
@@ -459,7 +470,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
Path schemaPath = getSchemaPath(ident.name(), properties);
if (schemaPath != null) {
try {
FileSystem fs = schemaPath.getFileSystem(hadoopConf);
FileSystem fs = getFileSystem(schemaPath, conf);
if (!fs.exists(schemaPath)) {
if (!fs.mkdirs(schemaPath)) {
// Fail the operation when failed to create the schema path.
@@ -577,8 +588,7 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
if (schemaPath == null) {
return false;
}

FileSystem fs = schemaPath.getFileSystem(hadoopConf);
FileSystem fs = getFileSystem(schemaPath, conf);
// Nothing to delete if the schema path does not exist.
if (!fs.exists(schemaPath)) {
return false;
@@ -717,8 +727,8 @@ private Path getSchemaPath(String name, Map<String, String> properties) {
}

@VisibleForTesting
static Path formalizePath(Path path, Configuration configuration) throws IOException {
FileSystem defaultFs = FileSystem.get(configuration);
Path formalizePath(Path path, Map<String, String> configuration) throws IOException {
FileSystem defaultFs = getFileSystem(path, configuration);
return path.makeQualified(defaultFs.getUri(), defaultFs.getWorkingDirectory());
}

@@ -731,7 +741,7 @@ private boolean hasCallerContext() {
private boolean checkSingleFile(Fileset fileset) {
try {
Path locationPath = new Path(fileset.storageLocation());
return locationPath.getFileSystem(hadoopConf).getFileStatus(locationPath).isFile();
return getFileSystem(locationPath, conf).getFileStatus(locationPath).isFile();
} catch (FileNotFoundException e) {
// We should always return false here, same with the logic in `FileSystem.isFile(Path f)`.
return false;
@@ -742,4 +752,25 @@ private boolean checkSingleFile(Fileset fileset) {
fileset.name());
}
}

FileSystem getFileSystem(Path path, Map<String, String> config) throws IOException {
if (path == null) {
throw new IllegalArgumentException("Path should not be null");
}

String scheme =
path.toUri().getScheme() != null
? path.toUri().getScheme()
: defaultFileSystemProvider.scheme();

FileSystemProvider provider = fileSystemProvidersMap.get(scheme);
if (provider == null) {
throw new IllegalArgumentException(
String.format(
"Unsupported scheme: %s, path: %s, all supported schemes: %s and providers: %s",
scheme, path, fileSystemProvidersMap.keySet(), fileSystemProvidersMap.values()));
}

return provider.getFileSystem(path, config);
}
}
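
The getFileSystem helper above dispatches on the URI scheme of the path and falls back to the configured default provider when the path has no scheme. A minimal usage sketch of that behavior follows; the paths, variable names, and surrounding method are illustrative assumptions, not code from this commit.

import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

class SchemeDispatchSketch {
  // "ops" stands in for an initialized HadoopCatalogOperations; getFileSystem is
  // package-private, so this sketch assumes it runs in the same package.
  static void sketch(HadoopCatalogOperations ops, Map<String, String> conf) throws Exception {
    // "hdfs" scheme -> the provider registered under "hdfs" (builtin-hdfs).
    FileSystem hdfs = ops.getFileSystem(new Path("hdfs://namenode:8020/warehouse/fs1"), conf);

    // No scheme -> defaultFileSystemProvider decides (builtin-local unless configured otherwise).
    FileSystem local = ops.getFileSystem(new Path("/tmp/fs1"), conf);

    // A scheme with no registered provider (e.g. "s3a" without an S3 provider)
    // fails with IllegalArgumentException listing the supported schemes and providers.
  }
}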
@@ -18,10 +18,13 @@
*/
package org.apache.gravitino.catalog.hadoop;

import static org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig.KERBEROS_PROPERTY_ENTRIES;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import org.apache.gravitino.catalog.hadoop.authentication.AuthenticationConfig;
import org.apache.gravitino.catalog.hadoop.authentication.kerberos.KerberosConfig;
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
import org.apache.gravitino.catalog.hadoop.fs.LocalFileSystemProvider;
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
import org.apache.gravitino.connector.PropertyEntry;

@@ -34,6 +37,24 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
// If not, users have to specify the storage location in the Schema or Fileset level.
public static final String LOCATION = "location";

/**
* The names of the {@link FileSystemProvider}s to be added to the catalog. Besides the built-in
* FileSystemProviders such as LocalFileSystemProvider and HDFSFileSystemProvider, users can add
* their own FileSystemProvider by specifying the provider name here. The value can be found via
* {@link FileSystemProvider#name()}.
*/
public static final String FILESYSTEM_PROVIDERS = "filesystem-providers";

/**
* The default file system provider name, used to create the default file system. If not
* specified, the default file system provider will be {@link LocalFileSystemProvider#name()}:
* 'builtin-local'.
*/
public static final String DEFAULT_FS_PROVIDER = "default-filesystem-provider";

public static final String BUILTIN_LOCAL_FS_PROVIDER = "builtin-local";
public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";

private static final Map<String, PropertyEntry<?>> HADOOP_CATALOG_PROPERTY_ENTRIES =
ImmutableMap.<String, PropertyEntry<?>>builder()
.put(
@@ -44,8 +65,24 @@ public class HadoopCatalogPropertiesMetadata extends BaseCatalogPropertiesMetada
false /* immutable */,
null,
false /* hidden */))
.put(
FILESYSTEM_PROVIDERS,
PropertyEntry.stringOptionalPropertyEntry(
FILESYSTEM_PROVIDERS,
"The file system provider names, separated by comma",
false /* immutable */,
null,
false /* hidden */))
.put(
DEFAULT_FS_PROVIDER,
PropertyEntry.stringOptionalPropertyEntry(
DEFAULT_FS_PROVIDER,
"Default file system provider name",
false /* immutable */,
BUILTIN_LOCAL_FS_PROVIDER, // please see LocalFileSystemProvider#name()
false /* hidden */))
// The following two are about authentication.
.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES)
.putAll(KERBEROS_PROPERTY_ENTRIES)
.putAll(AuthenticationConfig.AUTHENTICATION_PROPERTY_ENTRIES)
.build();
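
Taken together, the two new entries let a catalog list extra file system providers and choose which one handles paths without a scheme. A hedged sketch of the catalog properties a user might set follows; the "s3" provider name and the HDFS location are placeholders for illustration, not something this patch ships.

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class HadoopCatalogPropertiesSketch {
  public static void main(String[] args) {
    // Hypothetical properties for a Hadoop (fileset) catalog.
    Map<String, String> properties =
        ImmutableMap.of(
            // Extra providers to load, identified by FileSystemProvider#name(), comma separated.
            // "s3" is a placeholder for a provider shipped outside this patch.
            "filesystem-providers", "s3",
            // Provider used for paths without a scheme; defaults to "builtin-local".
            "default-filesystem-provider", "builtin-hdfs",
            // Optional catalog-level storage location.
            "location", "hdfs://namenode:8020/user/gravitino/warehouse");
    // These properties would be supplied when creating the catalog, for example through
    // the Gravitino client or REST API.
    System.out.println(properties);
  }
}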

@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.catalog.hadoop.fs;

import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
* FileSystemProvider is an interface for providing FileSystem instances. It is used by the
* HadoopCatalog to create FileSystem instances for accessing Hadoop-compatible file systems.
*/
public interface FileSystemProvider {

/**
* Get the FileSystem instance according to the configuration map and file path.
*
* <p>Compared to the {@link FileSystem#get(Configuration)} method, this method allows the
* provider to create a FileSystem instance with a specific configuration and do further
* initialization if needed.
*
* <p>For example: 1. We can check the validity of the endpoint value for S3AFileSystem and then
* take further actions. 2. We can also change some default behavior of the FileSystem
* initialization process. 3. More...
*
* @param config The configuration for the FileSystem instance.
* @param path The path to the file system.
* @return The FileSystem instance.
* @throws IOException If the FileSystem instance cannot be created.
*/
FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
throws IOException;

/**
* Scheme of this FileSystem provider. The value is 'file' for LocalFileSystem, 'hdfs' for HDFS,
* etc.
*
* @return The scheme used by this FileSystem provider.
*/
String scheme();

/**
* Name of this FileSystem provider. The value is 'builtin-local' for LocalFileSystem,
* 'builtin-hdfs' for HDFS, etc.
*
* @return The name of this FileSystem provider.
*/
String name();
}
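
To make the contract concrete, here is a minimal provider sketch. It is not part of this commit: the class name, scheme, and name values are assumptions, and it simply copies the string map into a Hadoop Configuration before opening the file system.

package org.apache.gravitino.catalog.hadoop.fs; // hypothetical placement

import java.io.IOException;
import java.util.Map;
import javax.annotation.Nonnull;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Hypothetical example provider; not shipped with this commit. */
public class ExampleFileSystemProvider implements FileSystemProvider {

  @Override
  public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, String> config)
      throws IOException {
    // Copy the flat string map into a Hadoop Configuration before creating the FileSystem.
    Configuration configuration = new Configuration();
    config.forEach(configuration::set);
    // newInstance bypasses Hadoop's FileSystem cache, so each call honors this configuration.
    return FileSystem.newInstance(path.toUri(), configuration);
  }

  @Override
  public String scheme() {
    // The URI scheme this provider is registered under in the provider map.
    return "example";
  }

  @Override
  public String name() {
    // The value users reference in the "filesystem-providers" catalog property.
    return "example";
  }
}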