Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#4993] feat(iceberg): integrate credential framework to iceberg REST server #5134

Merged
merged 18 commits into from
Oct 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ public class IcebergConstants {
public static final String ICEBERG_REST_CATALOG_CACHE_EVICTION_INTERVAL =
"catalog-cache-eviction-interval-ms";

public static final String ICEBERG_REST_CATALOG_PROVIDER = "catalog-provider";
public static final String ICEBERG_REST_CATALOG_CONFIG_PROVIDER = "catalog-config-provider";
public static final String STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME = "static-config-provider";
public static final String DYNAMIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME =
"dynamic-config-provider";

public static final String GRAVITINO_URI = "gravitino-uri";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

public class CredentialConstants {
public static final String CREDENTIAL_PROVIDER_TYPE = "credential-provider-type";

private CredentialConstants() {}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I remembered that we already defined this in Credential, we don't have to define this again here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not the same, one is the credential type in Credential , this is the credential type in catalog properties. They are happen to the same name.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't you use a different name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use credential-provider-type

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it is a catalog property, it is better to define in catalog-common, so other engines can also use it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the property already defined in catalog-common

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
*/
package org.apache.gravitino.catalog.lakehouse.iceberg.ops;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper.IcebergTableChange;
import org.apache.gravitino.rel.TableChange;
Expand Down Expand Up @@ -79,7 +81,7 @@ public class TestIcebergTableUpdate {

@BeforeEach
public void init() {
icebergCatalogWrapper = new IcebergCatalogWrapper();
icebergCatalogWrapper = new IcebergCatalogWrapper(new IcebergConfig(Collections.emptyMap()));
icebergCatalogWrapperHelper =
new IcebergCatalogWrapperHelper(icebergCatalogWrapper.getCatalog());
createNamespace(TEST_NAMESPACE_NAME);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import java.util.Map;

/**
* Helper class to generate specific credential properties for different table format and engine.
*/
public class CredentialPropertyUtils {
/**
* Transforms a specific credential into a map of Iceberg properties.
*
* @param credential the credential to be transformed into Iceberg properties
* @return a map of Iceberg properties derived from the credential
*/
public static Map<String, String> toIcebergProperties(Credential credential) {
// todo: transform specific credential to iceberg properties
return credential.toProperties();
Comment on lines +34 to +36
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This two lines code is not worthy to create a new class here, why can't you put this in iceberg package?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's like a placeholder function to support more credential properties transform logic, this function will be used in client side and Iceberg REST server side, so I place it in common module.

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CredentialProviderManager {

private static final Logger LOG = LoggerFactory.getLogger(CredentialProviderManager.class);
private Map<String, CredentialProvider> credentialProviders;

public CredentialProviderManager() {
this.credentialProviders = new ConcurrentHashMap<>();
}

public void registerCredentialProvider(
String catalogName, CredentialProvider credentialProvider) {
CredentialProvider current = credentialProviders.putIfAbsent(catalogName, credentialProvider);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support one catalog with multiple credential vendors? If so, seems this map cannot support it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in current PR, I prefer to support it later in integrate Credential vending to Gravitino server to have a better overall design.

Preconditions.checkState(
!credentialProvider.equals(current),
String.format(
"Should not register multiple times to CredentialProviderManager, catalog: %s, "
+ "credential provider: %s",
catalogName, credentialProvider.credentialType()));
LOG.info(
"Register catalog:%s credential provider:%s to CredentialProviderManager",
catalogName, credentialProvider.credentialType());
}

public void unregisterCredentialProvider(String catalogName) {
CredentialProvider credentialProvider = credentialProviders.remove(catalogName);
// Not all catalog has credential provider
if (credentialProvider != null) {
LOG.info(
"Unregister catalog:{} credential provider:{} to CredentialProviderManager",
catalogName,
credentialProvider.credentialType());
try {
credentialProvider.close();
} catch (IOException e) {
LOG.warn("Close credential provider failed", e);
}
}
}

@Nullable
public CredentialProvider getCredentialProvider(String catalogName) {
return credentialProviders.get(catalogName);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.credential;

import com.google.common.collect.ImmutableSet;
import org.apache.gravitino.utils.PrincipalUtils;

public class CredentialUtils {
public static Credential vendCredential(CredentialProvider credentialProvider, String path) {
PathBasedCredentialContext pathBasedCredentialContext =
new PathBasedCredentialContext(
PrincipalUtils.getCurrentUserName(), ImmutableSet.of(path), ImmutableSet.of());
return credentialProvider.getCredential(pathBasedCredentialContext);
}
}
1 change: 1 addition & 0 deletions iceberg/iceberg-common/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ plugins {
}

dependencies {
implementation(project(":api"))
implementation(project(":catalogs:catalog-common"))
implementation(project(":core")) {
exclude("*")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.gravitino.config.ConfigBuilder;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.config.ConfigEntry;
import org.apache.gravitino.credential.CredentialConstants;
import org.apache.gravitino.storage.OSSProperties;
import org.apache.gravitino.storage.S3Properties;

Expand Down Expand Up @@ -201,13 +202,13 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.longConf()
.createWithDefault(3600000L);

public static final ConfigEntry<String> ICEBERG_REST_CATALOG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_PROVIDER)
public static final ConfigEntry<String> ICEBERG_REST_CATALOG_CONFIG_PROVIDER =
new ConfigBuilder(IcebergConstants.ICEBERG_REST_CATALOG_CONFIG_PROVIDER)
.doc(
"Catalog provider class name, you can develop a class that implements `IcebergCatalogWrapperProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.")
"Catalog provider class name, you can develop a class that implements `IcebergCatalogConfigProvider` and add the corresponding jar file to the Iceberg REST service classpath directory.")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.createWithDefault("config-based-provider");
.createWithDefault(IcebergConstants.STATIC_ICEBERG_CATALOG_CONFIG_PROVIDER_NAME);

public static final ConfigEntry<String> GRAVITINO_URI =
new ConfigBuilder(IcebergConstants.GRAVITINO_URI)
Expand All @@ -233,6 +234,13 @@ public class IcebergConfig extends Config implements OverwriteDefaultConfig {
.toSequence()
.createWithDefault(Collections.emptyList());

public static final ConfigEntry<String> CREDENTIAL_PROVIDER_TYPE =
new ConfigBuilder(CredentialConstants.CREDENTIAL_PROVIDER_TYPE)
.doc("The credential provider type for Iceberg")
.version(ConfigConstants.VERSION_0_7_0)
.stringConf()
.create();

public String getJdbcDriver() {
return get(JDBC_DRIVER);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
package org.apache.gravitino.iceberg.common.ops;

import java.util.Map;
import java.util.Optional;
import org.apache.gravitino.iceberg.common.IcebergConfig;

/**
* IcebergCatalogWrapperProvider is an interface defining how Iceberg REST catalog server gets
* Iceberg catalogs.
* {@code IcebergCatalogConfigProvider} is an interface defining how Iceberg REST catalog server
* gets Iceberg catalog configurations.
*/
public interface IcebergCatalogWrapperProvider {
public interface IcebergCatalogConfigProvider {

/**
* @param properties The parameters for creating Provider which from configurations whose prefix
Expand All @@ -33,8 +35,8 @@ public interface IcebergCatalogWrapperProvider {
void initialize(Map<String, String> properties);

/**
* @param catalogName a param send by clients.
* @return the instance of IcebergCatalogWrapper.
* @param catalogName Iceberg catalog name.
* @return the configuration of Iceberg catalog.
*/
IcebergCatalogWrapper getIcebergTableOps(String catalogName);
Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.ImmutableSet;
import java.sql.Driver;
import java.sql.DriverManager;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -104,10 +103,6 @@ public IcebergCatalogWrapper(IcebergConfig icebergConfig) {
this.catalogPropertiesMap = icebergConfig.getIcebergCatalogProperties();
}

public IcebergCatalogWrapper() {
this(new IcebergConfig(Collections.emptyMap()));
Comment on lines -107 to -108
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can still maintain this default constructor like:

public IcebergCatalogWrapper() {
  this(xxxx, false)
}

What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's used to test only, seems useless to maintain the default constructor

}

private void validateNamespace(Optional<Namespace> namespace) {
namespace.ifPresent(
n ->
Expand Down Expand Up @@ -160,7 +155,7 @@ public LoadTableResponse registerTable(Namespace namespace, RegisterTableRequest
/**
* Reload hadoop configuration, this is useful when the hadoop configuration UserGroupInformation
* is shared by multiple threads. UserGroupInformation#authenticationMethod was first initialized
* in KerberosClient, however, when switching to iceberg-rest thead,
* in KerberosClient, however, when switching to iceberg-rest thread,
* UserGroupInformation#authenticationMethod will be reset to the default value; we need to
* reinitialize it again.
*/
Expand Down Expand Up @@ -271,7 +266,7 @@ public void close() throws Exception {
private void closeMySQLCatalogResource() {
try {
// Close thread AbandonedConnectionCleanupThread if we are using `com.mysql.cj.jdbc.Driver`,
// for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thead maybe not this one.
// for driver `com.mysql.jdbc.Driver` (deprecated), the daemon thread maybe not this one.
Class.forName("com.mysql.cj.jdbc.AbandonedConnectionCleanupThread")
.getMethod("uncheckedShutdown")
.invoke(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.exceptions.NoSuchCatalogException;
import org.apache.gravitino.iceberg.common.IcebergConfig;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapper;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogWrapperProvider;
import org.apache.gravitino.iceberg.common.ops.IcebergCatalogConfigProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,13 +40,10 @@
*
* <p>The catalogName is iceberg_catalog
*/
public class GravitinoBasedIcebergCatalogWrapperProvider
implements IcebergCatalogWrapperProvider, AutoCloseable {
public class DynamicIcebergCatalogConfigProvider
implements IcebergCatalogConfigProvider, AutoCloseable {
public static final Logger LOG =
LoggerFactory.getLogger(GravitinoBasedIcebergCatalogWrapperProvider.class);

public static final String GRAVITINO_BASE_ICEBERG_TABLE_OPS_PROVIDER_NAME =
"gravitino-based-provider";
LoggerFactory.getLogger(DynamicIcebergCatalogConfigProvider.class);

private String gravitinoMetalake;

Expand All @@ -66,22 +64,27 @@ public void initialize(Map<String, String> properties) {
}

@Override
public IcebergCatalogWrapper getIcebergTableOps(String catalogName) {
public Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName) {
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
!IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName),
IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider");

Catalog catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName);
Catalog catalog;
try {
catalog = client.loadMetalake(gravitinoMetalake).loadCatalog(catalogName);
} catch (NoSuchCatalogException e) {
return Optional.empty();
}

Preconditions.checkArgument(
"lakehouse-iceberg".equals(catalog.provider()),
String.format("%s.%s is not iceberg catalog", gravitinoMetalake, catalogName));

Map<String, String> properties =
IcebergPropertiesUtils.toIcebergCatalogProperties(catalog.properties());
return new IcebergCatalogWrapper(new IcebergConfig(properties));
return Optional.of(new IcebergConfig(properties));
}

@VisibleForTesting
Expand Down
Loading
Loading