Skip to content

Commit

Permalink
support iceberg pre event listener
Browse files Browse the repository at this point in the history
  • Loading branch information
FANNG1 committed Oct 18, 2024
1 parent 6272303 commit f5beda9
Show file tree
Hide file tree
Showing 29 changed files with 676 additions and 39 deletions.
2 changes: 1 addition & 1 deletion bundles/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,4 @@

tasks.all {
enabled = false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,5 +77,6 @@ public class IcebergConstants {

public static final String GRAVITINO_METALAKE = "gravitino-metalake";

public static final String GRAVITINO_DEFAULT_CATALOG = "__gravitino_default_catalog";
public static final String ICEBERG_REST_DEFAULT_METALAKE = "gravitino";
public static final String ICEBERG_REST_DEFAULT_CATALOG = "default_catalog";
}
35 changes: 24 additions & 11 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public class GravitinoEnv {
private static final Logger LOG = LoggerFactory.getLogger(GravitinoEnv.class);

private Config config;
// Iceberg REST server use base components while Gravitino Server use full components.
private boolean manageFullComponents = true;

private EntityStore entityStore;

Expand Down Expand Up @@ -130,21 +132,30 @@ public static GravitinoEnv getInstance() {
}

/**
* Initialize the Gravitino environment.
* Initialize base components, used for Iceberg REST server.
*
* @param config The configuration object to initialize the environment.
* @param isGravitinoServer A boolean flag indicating whether the initialization is for the
* Gravitino server. If true, server-specific components will be initialized in addition to
* the base components.
*/
public void initialize(Config config, boolean isGravitinoServer) {
LOG.info("Initializing Gravitino Environment...");
public void initializeBaseComponents(Config config) {
LOG.info("Initializing Gravitino base environment...");
this.config = config;
this.manageFullComponents = false;
initBaseComponents();
if (isGravitinoServer) {
initGravitinoServerComponents();
}
LOG.info("Gravitino Environment is initialized.");
LOG.info("Gravitino base environment is initialized.");
}

/**
* Initialize all components, used for Gravitino server.
*
* @param config The configuration object to initialize the environment.
*/
public void initializeFullComponents(Config config) {
LOG.info("Initializing Gravitino full Environment...");
this.config = config;
this.manageFullComponents = true;
initBaseComponents();
initGravitinoServerComponents();
LOG.info("Gravitino full environment is initialized.");
}

/**
Expand Down Expand Up @@ -308,9 +319,11 @@ public FutureGrantManager futureGrantManager() {
}

public void start() {
auxServiceManager.serviceStart();
metricsSystem.start();
eventListenerManager.start();
if (manageFullComponents) {
auxServiceManager.serviceStart();
}
}

/** Shutdown the Gravitino environment. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import org.apache.gravitino.iceberg.service.IcebergCatalogWrapperManager;
import org.apache.gravitino.iceberg.service.IcebergExceptionMapper;
import org.apache.gravitino.iceberg.service.IcebergObjectMapperProvider;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableEventDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationDispatcher;
import org.apache.gravitino.iceberg.service.dispatcher.IcebergTableOperationExecutor;
import org.apache.gravitino.iceberg.service.metrics.IcebergMetricsManager;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.metrics.MetricsSystem;
import org.apache.gravitino.metrics.source.MetricsSource;
import org.apache.gravitino.server.web.HttpServerMetricsSource;
Expand Down Expand Up @@ -70,14 +74,20 @@ private void initServer(IcebergConfig icebergConfig) {
new HttpServerMetricsSource(MetricsSource.ICEBERG_REST_SERVER_METRIC_NAME, config, server);
metricsSystem.register(httpServerMetricsSource);

EventBus eventBus = GravitinoEnv.getInstance().eventBus();
icebergCatalogWrapperManager = new IcebergCatalogWrapperManager(icebergConfig.getAllConfig());
icebergMetricsManager = new IcebergMetricsManager(icebergConfig);
IcebergTableOperationExecutor icebergTableOperationExecutor =
new IcebergTableOperationExecutor(icebergCatalogWrapperManager);
IcebergTableEventDispatcher icebergTableEventDispatcher =
new IcebergTableEventDispatcher(icebergTableOperationExecutor, eventBus);
config.register(
new AbstractBinder() {
@Override
protected void configure() {
bind(icebergCatalogWrapperManager).to(IcebergCatalogWrapperManager.class).ranked(1);
bind(icebergMetricsManager).to(IcebergMetricsManager.class).ranked(1);
bind(icebergTableEventDispatcher).to(IcebergTableOperationDispatcher.class).ranked(1);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ public Optional<IcebergConfig> getIcebergCatalogConfig(String catalogName) {
Preconditions.checkArgument(
StringUtils.isNotBlank(catalogName), "blank catalogName is illegal");
Preconditions.checkArgument(
!IcebergConstants.GRAVITINO_DEFAULT_CATALOG.equals(catalogName),
IcebergConstants.GRAVITINO_DEFAULT_CATALOG + " is illegal in gravitino-based-provider");
!IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(catalogName),
IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG + " is illegal in gravitino-based-provider");

Catalog catalog;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void initialize(Map<String, String> properties) {
MapUtils.getPrefixMap(
properties, String.format("catalog.%s.", catalogName)))));
this.catalogConfigs.put(
IcebergConstants.GRAVITINO_DEFAULT_CATALOG, new IcebergConfig(properties));
IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG, new IcebergConfig(properties));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ public GravitinoIcebergRESTServer(Config config) {
}

private void initialize() {
gravitinoEnv.initialize(serverConfig, false);
gravitinoEnv.initializeBaseComponents(serverConfig);
icebergRESTService.serviceInit(
serverConfig.getConfigsWithPrefix(IcebergConfig.ICEBERG_CONFIG_PREFIX));
ServerAuthenticator.getInstance().initialize(serverConfig);
}

private void start() {
gravitinoEnv.start();
icebergRESTService.serviceStart();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,6 @@ private IcebergCatalogConfigProvider createIcebergCatalogConfigProvider(
}
}

private String shelling(String rawPrefix) {
if (StringUtils.isBlank(rawPrefix)) {
return rawPrefix;
} else {
// rawPrefix is a string matching ([^/]*/) which end with /
Preconditions.checkArgument(
rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
return rawPrefix.substring(0, rawPrefix.length() - 1);
}
}

private void closeIcebergCatalogWrapper(IcebergCatalogWrapper catalogWrapper) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.ext.ExceptionMapper;
import javax.ws.rs.ext.Provider;
import org.apache.gravitino.exceptions.IllegalNameIdentifierException;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
Expand Down Expand Up @@ -51,8 +52,10 @@ public class IcebergExceptionMapper implements ExceptionMapper<Exception> {
ImmutableMap.<Class<? extends Exception>, Integer>builder()
.put(IllegalArgumentException.class, 400)
.put(ValidationException.class, 400)
.put(IllegalNameIdentifierException.class, 400)
.put(NamespaceNotEmptyException.class, 400)
.put(NotAuthorizedException.class, 401)
.put(org.apache.gravitino.exceptions.ForbiddenException.class, 403)
.put(ForbiddenException.class, 403)
.put(NoSuchNamespaceException.class, 404)
.put(NoSuchTableException.class, 404)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,21 @@
*/
package org.apache.gravitino.iceberg.service;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.stream.Stream;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.responses.ErrorResponse;

public class IcebergRestUtils {
Expand Down Expand Up @@ -71,4 +80,50 @@ public static Instant calculateNewTimestamp(Instant currentTimestamp, int hours)
}
return nextHourDateTime.atZone(ZoneId.systemDefault()).toInstant();
}

public static String getCatalogName(String rawPrefix) {
String prefix = normalizePrefix(rawPrefix);
Preconditions.checkArgument(
!IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG.equals(prefix),
String.format("%s is conflict with reserved key, please replace it", prefix));
if (StringUtils.isBlank(prefix)) {
return IcebergConstants.ICEBERG_REST_DEFAULT_CATALOG;
}
return prefix;
}

public static NameIdentifier getGravitinoNameIdentifier(
String catalogName, TableIdentifier icebergIdentifier) {
// todo(fanng): use a more general way to get metalake
Stream<String> catalogNS =
Stream.concat(
Stream.of(IcebergConstants.ICEBERG_REST_DEFAULT_METALAKE, catalogName),
Arrays.stream(icebergIdentifier.namespace().levels()));
String[] catalogNSTable =
Stream.concat(catalogNS, Stream.of(icebergIdentifier.name())).toArray(String[]::new);
return NameIdentifier.of(catalogNSTable);
}

// remove the last '/' from the prefix, for example transform 'iceberg_catalog/' to
// 'iceberg_catalog'
private static String normalizePrefix(String rawPrefix) {
if (StringUtils.isBlank(rawPrefix)) {
return rawPrefix;
} else {
// rawPrefix is a string matching ([^/]*/) which end with /
Preconditions.checkArgument(
rawPrefix.endsWith("/"), String.format("rawPrefix %s format is illegal", rawPrefix));
return rawPrefix.substring(0, rawPrefix.length() - 1);
}
}

public static <T> T cloneIcebergRESTObject(Object message, Class<T> className) {
ObjectMapper icebergObjectMapper = IcebergObjectMapper.getInstance();
try {
byte[] values = icebergObjectMapper.writeValueAsBytes(message);
return icebergObjectMapper.readValue(values, className);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.iceberg.service.dispatcher;

import org.apache.gravitino.NameIdentifier;
import org.apache.gravitino.iceberg.service.IcebergRestUtils;
import org.apache.gravitino.listener.EventBus;
import org.apache.gravitino.listener.api.event.IcebergCreateTableEvent;
import org.apache.gravitino.listener.api.event.IcebergCreateTableFailureEvent;
import org.apache.gravitino.listener.api.event.IcebergCreateTablePreEvent;
import org.apache.gravitino.utils.PrincipalUtils;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;

/**
* {@code IcebergTableEventDispatcher} is a decorator for {@link IcebergTableOperationExecutor} that
* not only delegates table operations to the underlying dispatcher but also dispatches
* corresponding events to an {@link org.apache.gravitino.listener.EventBus}.
*/
public class IcebergTableEventDispatcher implements IcebergTableOperationDispatcher {

private IcebergTableOperationDispatcher icebergTableOperationDispatcher;
private EventBus eventBus;

public IcebergTableEventDispatcher(
IcebergTableOperationDispatcher icebergTableOperationDispatcher, EventBus eventBus) {
this.icebergTableOperationDispatcher = icebergTableOperationDispatcher;
this.eventBus = eventBus;
}

@Override
public LoadTableResponse createTable(
String catalogName, Namespace namespace, CreateTableRequest createTableRequest) {
TableIdentifier tableIdentifier = TableIdentifier.of(namespace, createTableRequest.name());
NameIdentifier nameIdentifier =
IcebergRestUtils.getGravitinoNameIdentifier(catalogName, tableIdentifier);
eventBus.dispatchEvent(
new IcebergCreateTablePreEvent(
PrincipalUtils.getCurrentUserName(), nameIdentifier, createTableRequest));
LoadTableResponse loadTableResponse;
try {
loadTableResponse =
icebergTableOperationDispatcher.createTable(catalogName, namespace, createTableRequest);
} catch (Exception e) {
eventBus.dispatchEvent(
new IcebergCreateTableFailureEvent(
PrincipalUtils.getCurrentUserName(), nameIdentifier, e));
throw e;
}
eventBus.dispatchEvent(
new IcebergCreateTableEvent(
PrincipalUtils.getCurrentUserName(),
nameIdentifier,
createTableRequest,
loadTableResponse));
return loadTableResponse;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.iceberg.service.dispatcher;

import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.rest.requests.CreateTableRequest;
import org.apache.iceberg.rest.responses.LoadTableResponse;

/**
* The {@code IcebergTableOperationDispatcher} interface defines the public API for managing Iceberg
* tables.
*/
public interface IcebergTableOperationDispatcher {
/**
* Creates a new Iceberg table.
*
* @param catalogName The catalog name when creating the table.
* @param namespace The namespace within which the table should be created.
* @param createTableRequest The request object containing the details for creating the table.
* @return A {@link LoadTableResponse} object containing the result of the operation.
*/
LoadTableResponse createTable(
String catalogName, Namespace namespace, CreateTableRequest createTableRequest);
}
Loading

0 comments on commit f5beda9

Please sign in to comment.