diff --git a/HowToKerberize.md b/HowToKerberize.md
index d1176c8cf..30908f83c 100644
--- a/HowToKerberize.md
+++ b/HowToKerberize.md
@@ -24,83 +24,64 @@ In addition, because Kerberos authentication requires a delegation-token to prox
* Zookeeper to store delegation-token (Recommended)
### Configuration
+Waggle Dance `waggle-dance-server.yml` example:
-Waggle Dance does not read Hadoop's `core-site.xml` so a general property providing Kerberos auth should be added to
-the Hive configuration file `hive-site.xml`:
-
-```
-
- hadoop.security.authentication
- KERBEROS
-
-```
-
-
-Waggle Dance also needs a keytab file to communicate with the Metastore so the following properties should be present:
```
-
- hive.metastore.sasl.enabled
- true
-
-
- hive.metastore.kerberos.principal
- hive/_HOST@YOUR_REALM.COM
-
-
- hive.metastore.kerberos.keytab.file
- /etc/hive.keytab
-
+port: 9083
+verbose: true
+#database-resolution: MANUAL
+database-resolution: PREFIXED
+yaml-storage:
+ overwrite-config-on-shutdown: false
+logging:
+ config: file:/path/to/log4j2.xml
+configuration-properties:
+ hadoop.security.authentication: KERBEROS
+ hive.metastore.sasl.enabled: true
+ hive.metastore.kerberos.principal: hive/_HOST@EXAMPLE.COM
+ hive.metastore.kerberos.keytab.file: /path/to/hive.keytab
+ hive.cluster.delegation.token.store.class: org.apache.hadoop.hive.thrift.ZooKeeperTokenStore
+ hive.cluster.delegation.token.store.zookeeper.connectString: zz1:2181,zz2:2181,zz3:2181
+ hive.cluster.delegation.token.store.zookeeper.znode: /hive/cluster/wd_delegation
+ hive.server2.authentication: KERBEROS
+ hive.server2.authentication.kerberos.principal: hive/_HOST@EXAMPLE.COM
+ hive.server2.authentication.kerberos.keytab: /path/to/hive.keytab
+ hive.server2.authentication.client.kerberos.principal: hive/_HOST@EXAMPLE.COM
+ hadoop.kerberos.keytab.login.autorenewal.enabled : true
+ hadoop.proxyuser.hive.users: '*'
+ hadoop.proxyuser.hive.hosts: '*'
```
-In addition, all metastores need to use the Zookeeper shared token:
+Waggle Dance `waggle-dance-federation.yml` example:
```
-
- hive.cluster.delegation.token.store.class
- org.apache.hadoop.hive.thrift.ZooKeeperTokenStore
-
-
- hive.cluster.delegation.token.store.zookeeper.connectString
- zk1:2181,zk2:2181,zk3:2181
-
-
- hive.cluster.delegation.token.store.zookeeper.znode
- /hive/token
-
+primary-meta-store:
+ database-prefix: ''
+ name: local
+ remote-meta-store-uris: thrift://ms1:9083
+ access-control-type: READ_AND_WRITE_AND_CREATE
+ impersonation-enabled: true
+federated-meta-stores:
+- remote-meta-store-uris: thrift://ms2:9083
+ database-prefix: dw_
+ name: remote
+ impersonation-enabled: true
+ access-control-type: READ_AND_WRITE_ON_DATABASE_WHITELIST
+ writable-database-white-list:
+ - .*
```
-If you are intending to use a Beeline client, the following properties may be valuable:
+Connect to Waggle Dance via beeline, change ` hive.metastore.uris` in Hive configuration file `hive-site.xml`:
```
- hive.server2.transport.mode
- http
-
-
- hive.server2.authentication
- KERBEROS
-
-
- hive.server2.authentication.kerberos.principal
- hive/_HOST@YOUR_REALM.COM
-
-
- hive.server2.authentication.kerberos.keytab
- /etc/hive.keytab
-
-
- hive.server2.enable.doAs
- false
+ hive.metastore.uris
+ thrift://wd:9083
```
-
### Running
Waggle Dance should be started by a privileged user with a fresh keytab.
-If Waggle Dance throws a GSS exception, you have problem with the keytab file.
-Try to perform `kdestroy` and `kinit` operations and check the keytab file ownership flags.
-
-If the Metastore throws an exception with code -127, Waggle Dance is probably using the wrong authentication policy.
-Check the values in `hive-conf.xml` and make sure that HIVE_HOME and HIVE_CONF_DIR are defined.
-
-Don't forget to restart hive services!
+Just start the service directly, no kinit operation is required.
+Because the ticket information is saved in jvm instead of being saved in a local file.
+In this way, it can automatically renew without the need for additional operations to renew local tickets.
\ No newline at end of file
diff --git a/README.md b/README.md
index 926bcf775..6983b44bb 100644
--- a/README.md
+++ b/README.md
@@ -152,35 +152,38 @@ Example:
configuration-properties:
hive.metastore.kerberos.principal: hive/clustername@HADOOP.COM
- ...
+
The table below describes all the available configuration values for Waggle Dance federations:
-| Property | Required | Description |
-|:----------------------------------------------------------|:----:|:----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `primary-meta-store` | No | Primary MetaStore config. Can be empty but it is advised to configure it. |
-| `primary-meta-store.remote-meta-store-uris` | Yes | Thrift URIs of the federated read-only metastore. |
-| `primary-meta-store.name` | Yes | Database name that uniquely identifies this metastore. Used internally. Cannot be empty. |
-| `primary-meta-store.database-prefix` | No | Prefix used to access the primary metastore and differentiate databases in it from databases in another metastore. The default prefix (i.e. if this value isn't explicitly set) is empty string. |
-| `primary-meta-store.access-control-type` | No | Sets how the client access controls should be handled. Default is `READ_ONLY` Other options `READ_AND_WRITE_AND_CREATE`, `READ_AND_WRITE_ON_DATABASE_WHITELIST` and `READ_AND_WRITE_AND_CREATE_ON_DATABASE_WHITELIST` see Access Control section below. |
-| `primary-meta-store.writable-database-white-list` | No | White-list of databases used to verify write access used in conjunction with `primary-meta-store.access-control-type`. The list of databases should be listed without any `primary-meta-store.database-prefix`. This property supports both full database names and (case-insensitive) [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html). |
-| `primary-meta-store.metastore-tunnel` | No | See metastore tunnel configuration values below. |
-| `primary-meta-store.latency` | No | Indicates the acceptable slowness of the metastore in **milliseconds** for increasing the default connection timeout. Default latency is `0` and should be changed if the metastore is particularly slow. If you get an error saying that results were omitted because the metastore was slow, consider changing the latency to a higher number. |
-| `primary-meta-store.mapped-databases` | No | List of databases to federate from the primary metastore; all other databases will be ignored. This property supports both full database names and [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html) (both being case-insensitive). By default, all databases from the metastore are federated. |
-| `primary-meta-store.mapped-tables` | No | List of mappings from databases to tables to federate from the primary metastore, similar to `mapped-databases`. By default, all tables are available. See `mapped-tables` configuration below. |
-| `primary-meta-stores.hive-metastore-filter-hook` | No | Name of the class which implements the `MetaStoreFilterHook` interface from Hive. This allows a metastore filter hook to be applied to the corresponding Hive metastore calls. Can be configured with the `configuration-properties` specified in the `waggle-dance-server.yml` configuration. They will be added in the HiveConf object that is given to the constructor of the `MetaStoreFilterHook` implementation you provide. |
-| `primary-meta-stores.database-name-mapping` | No | BiDirectional Map of database names and mapped name, where key=`` and value=``. See the [Database Name Mapping](#database-name-mapping) section. |
-| `primary-meta-stores.configuration-properties` | No | Map of the primary metastore personalized properties that will be added to the HiveConf used when creating the Thrift clients (they will be effect only on this client),the priority is higher than the properites of the same name in waggle-dance-server.yml. |
-| `federated-meta-stores` | No | Possible empty list of read only federated metastores. |
-| `federated-meta-stores[n].remote-meta-store-uris` | Yes | Thrift URIs of the federated read-only metastore. |
-| `federated-meta-stores[n].name` | Yes | Name that uniquely identifies this metastore. Used internally. Cannot be empty. |
-| `federated-meta-stores[n].database-prefix` | No | Prefix used to access this particular metastore and differentiate databases in it from databases in another metastore. Typically used if databases have the same name across metastores but federated access to them is still needed. The default prefix (i.e. if this value isn't explicitly set) is {federated-meta-stores[n].name} lowercased and postfixed with an underscore. For example if the metastore name was configured as "waggle" and no database prefix was provided but `PREFIXED` database resolution was used then the value of `database-prefix` would be "waggle_". |
-| `federated-meta-stores[n].metastore-tunnel` | No | See metastore tunnel configuration values below. |
-| `federated-meta-stores[n].latency` | No | Indicates the acceptable slowness of the metastore in **milliseconds** for increasing the default connection timeout. Default latency is `0` and should be changed if the metastore is particularly slow. If you get an error saying that results were omitted because the metastore was slow, consider changing the latency to a higher number. |
-| `federated-meta-stores[n].mapped-databases` | No | List of databases to federate from this federated metastore, all other databases will be ignored. This property supports both full database names and [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html) (both being case-insensitive). By default, all databases from the metastore are federated. |
-| `federated-meta-stores[n].mapped-tables` | No | List of mappings from databases to tables to federate from this federated metastore, similar to `mapped-databases`. By default, all tables are available. See `mapped-tables` configuration below. |
-| `federated-meta-stores[n].hive-metastore-filter-hook` | No | Name of the class which implements the `MetaStoreFilterHook` interface from Hive. This allows a metastore filter hook to be applied to the corresponding Hive metastore calls. Can be configured with the `configuration-properties` specified in the `waggle-dance-server.yml` configuration. They will be added in the HiveConf object that is given to the constructor of the `MetaStoreFilterHook` implementation you provide. |
-| `federated-meta-stores[n].database-name-mapping` | No | BiDirectional Map of database names and mapped names where key=`` and value=``. See the [Database Name Mapping](#database-name-mapping) section. |
-| `federated-meta-stores[n].writable-database-white-list` | No | White-list of databases used to verify write access used in conjunction with `federated-meta-stores[n].access-control-type`. The list of databases should be listed without a `federated-meta-stores[n].database-prefix`. This property supports both full database names and (case-insensitive) [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html). |
-| `federated-meta-stores[n].configuration-properties` | No | Map of the federate metastore personalized properties that will be added to the HiveConf used when creating the Thrift clients (they will be effect only on this client),the priority is higher than the properites of the same name in waggle-dance-server.yml. |
+| Property | Required | Description |
+|:---------------------------------------------------------|:----:|:----|
+| `primary-meta-store` | No | Primary MetaStore config. Can be empty but it is advised to configure it. |
+| `primary-meta-store.remote-meta-store-uris` | Yes | Thrift URIs of the federated read-only metastore. |
+| `primary-meta-store.name` | Yes | Database name that uniquely identifies this metastore. Used internally. Cannot be empty. |
+| `primary-meta-store.database-prefix` | No | Prefix used to access the primary metastore and differentiate databases in it from databases in another metastore. The default prefix (i.e. if this value isn't explicitly set) is empty string.|
+| `primary-meta-store.access-control-type` | No | Sets how the client access controls should be handled. Default is `READ_ONLY` Other options `READ_AND_WRITE_AND_CREATE`, `READ_AND_WRITE_ON_DATABASE_WHITELIST` and `READ_AND_WRITE_AND_CREATE_ON_DATABASE_WHITELIST` see Access Control section below. |
+| `primary-meta-store.impersonation-enabled` | No | Enable metastore end-user impersonation.|
+| `primary-meta-store.writable-database-white-list` | No | White-list of databases used to verify write access used in conjunction with `primary-meta-store.access-control-type`. The list of databases should be listed without any `primary-meta-store.database-prefix`. This property supports both full database names and (case-insensitive) [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html).|
+| `primary-meta-store.metastore-tunnel` | No | See metastore tunnel configuration values below. |
+| `primary-meta-store.latency` | No | Indicates the acceptable slowness of the metastore in **milliseconds** for increasing the default connection timeout. Default latency is `0` and should be changed if the metastore is particularly slow. If you get an error saying that results were omitted because the metastore was slow, consider changing the latency to a higher number.|
+| `primary-meta-store.mapped-databases` | No | List of databases to federate from the primary metastore; all other databases will be ignored. This property supports both full database names and [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html) (both being case-insensitive). By default, all databases from the metastore are federated. |
+| `primary-meta-store.mapped-tables` | No | List of mappings from databases to tables to federate from the primary metastore, similar to `mapped-databases`. By default, all tables are available. See `mapped-tables` configuration below. |
+| `primary-meta-stores.hive-metastore-filter-hook` | No | Name of the class which implements the `MetaStoreFilterHook` interface from Hive. This allows a metastore filter hook to be applied to the corresponding Hive metastore calls. Can be configured with the `configuration-properties` specified in the `waggle-dance-server.yml` configuration. They will be added in the HiveConf object that is given to the constructor of the `MetaStoreFilterHook` implementation you provide. |
+| `primary-meta-stores.database-name-mapping` | No | BiDirectional Map of database names and mapped name, where key=`` and value=``. See the [Database Name Mapping](#database-name-mapping) section.|
+| `primary-meta-stores.configuration-properties` | No | Map of the primary metastore personalized properties that will be added to the HiveConf used when creating the Thrift clients (they will be effect only on this client),the priority is higher than the properites of the same name in waggle-dance-server.yml. |
+| `federated-meta-stores` | No | Possible empty list of read only federated metastores. |
+| `federated-meta-stores[n].remote-meta-store-uris` | Yes | Thrift URIs of the federated read-only metastore. |
+| `federated-meta-stores[n].name` | Yes | Name that uniquely identifies this metastore. Used internally. Cannot be empty. |
+| `federated-meta-stores[n].impersonation-enabled` | No | Enable metastore end-user impersonation.|
+| `federated-meta-stores[n].database-prefix` | No | Prefix used to access this particular metastore and differentiate databases in it from databases in another metastore. Typically used if databases have the same name across metastores but federated access to them is still needed. The default prefix (i.e. if this value isn't explicitly set) is {federated-meta-stores[n].name} lowercased and postfixed with an underscore. For example if the metastore name was configured as "waggle" and no database prefix was provided but `PREFIXED` database resolution was used then the value of `database-prefix` would be "waggle_". |
+| `federated-meta-stores[n].metastore-tunnel` | No | See metastore tunnel configuration values below. |
+| `federated-meta-stores[n].latency` | No | Indicates the acceptable slowness of the metastore in **milliseconds** for increasing the default connection timeout. Default latency is `0` and should be changed if the metastore is particularly slow. If you get an error saying that results were omitted because the metastore was slow, consider changing the latency to a higher number.|
+| `federated-meta-stores[n].mapped-databases` | No | List of databases to federate from this federated metastore, all other databases will be ignored. This property supports both full database names and [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html) (both being case-insensitive). By default, all databases from the metastore are federated. |
+| `federated-meta-stores[n].mapped-tables` | No | List of mappings from databases to tables to federate from this federated metastore, similar to `mapped-databases`. By default, all tables are available. See `mapped-tables` configuration below. |
+| `federated-meta-stores[n].hive-metastore-filter-hook` | No | Name of the class which implements the `MetaStoreFilterHook` interface from Hive. This allows a metastore filter hook to be applied to the corresponding Hive metastore calls. Can be configured with the `configuration-properties` specified in the `waggle-dance-server.yml` configuration. They will be added in the HiveConf object that is given to the constructor of the `MetaStoreFilterHook` implementation you provide. |
+| `federated-meta-stores[n].database-name-mapping` | No | BiDirectional Map of database names and mapped names where key=`` and value=``. See the [Database Name Mapping](#database-name-mapping) section.|
+| `federated-meta-stores[n].writable-database-white-list` | No | White-list of databases used to verify write access used in conjunction with `federated-meta-stores[n].access-control-type`. The list of databases should be listed without a `federated-meta-stores[n].database-prefix`. This property supports both full database names and (case-insensitive) [Java RegEx patterns](https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html).|
+| `federated-meta-stores[n].configuration-properties` | No | Map of the federate metastore personalized properties that will be added to the HiveConf used when creating the Thrift clients (they will be effect only on this client),the priority is higher than the properites of the same name in waggle-dance-server.yml. |
#### Metastore tunnel
The table below describes the metastore tunnel configuration values:
diff --git a/kerberos-process.png b/kerberos-process.png
index 992c8ee10..d4974533a 100644
Binary files a/kerberos-process.png and b/kerberos-process.png differ
diff --git a/waggle-dance-api/src/main/java/com/hotels/bdp/waggledance/api/model/AbstractMetaStore.java b/waggle-dance-api/src/main/java/com/hotels/bdp/waggledance/api/model/AbstractMetaStore.java
index 24cb9fcc4..3eab7217d 100644
--- a/waggle-dance-api/src/main/java/com/hotels/bdp/waggledance/api/model/AbstractMetaStore.java
+++ b/waggle-dance-api/src/main/java/com/hotels/bdp/waggledance/api/model/AbstractMetaStore.java
@@ -60,6 +60,7 @@ public abstract class AbstractMetaStore {
private transient @JsonProperty @NotNull MetaStoreStatus status = MetaStoreStatus.UNKNOWN;
private long latency = 0;
private transient @JsonIgnore HashBiMap databaseNameBiMapping = HashBiMap.create();
+ private boolean impersonationEnabled;
private Map configurationProperties = new HashMap<>();
public AbstractMetaStore(String name, String remoteMetaStoreUris, AccessControlType accessControlType) {
@@ -222,6 +223,14 @@ public void setStatus(MetaStoreStatus status) {
this.status = status;
}
+ public boolean isImpersonationEnabled() {
+ return impersonationEnabled;
+ }
+
+ public void setImpersonationEnabled(boolean impersonationEnabled) {
+ this.impersonationEnabled = impersonationEnabled;
+ }
+
@Override
public int hashCode() {
return Objects.hashCode(name);
diff --git a/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/FederatedMetaStoreTest.java b/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/FederatedMetaStoreTest.java
index 8a6efcc9b..fc6228e75 100644
--- a/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/FederatedMetaStoreTest.java
+++ b/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/FederatedMetaStoreTest.java
@@ -72,7 +72,7 @@ public void nullDatabasePrefix() {
@Test
public void toJson() throws Exception {
- String expected = "{\"accessControlType\":\"READ_ONLY\",\"configurationProperties\":{},\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"name_\",\"federationType\":\"FEDERATED\",\"hiveMetastoreFilterHook\":null,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
+ String expected = "{\"accessControlType\":\"READ_ONLY\",\"configurationProperties\":{},\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"name_\",\"federationType\":\"FEDERATED\",\"hiveMetastoreFilterHook\":null,\"impersonationEnabled\":false,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
ObjectMapper mapper = new ObjectMapper();
// Sorting to get deterministic test behaviour
mapper.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY);
diff --git a/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/PrimaryMetaStoreTest.java b/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/PrimaryMetaStoreTest.java
index e76cfc60e..5da2d6316 100644
--- a/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/PrimaryMetaStoreTest.java
+++ b/waggle-dance-api/src/test/java/com/hotels/bdp/waggledance/api/model/PrimaryMetaStoreTest.java
@@ -89,7 +89,7 @@ public void nonEmptyDatabasePrefix() {
@Test
public void toJson() throws Exception {
- String expected = "{\"accessControlType\":\"READ_ONLY\",\"configurationProperties\":{},\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"\",\"federationType\":\"PRIMARY\",\"hiveMetastoreFilterHook\":null,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
+ String expected = "{\"accessControlType\":\"READ_ONLY\",\"configurationProperties\":{},\"connectionType\":\"DIRECT\",\"databaseNameMapping\":{},\"databasePrefix\":\"\",\"federationType\":\"PRIMARY\",\"hiveMetastoreFilterHook\":null,\"impersonationEnabled\":false,\"latency\":0,\"mappedDatabases\":null,\"mappedTables\":null,\"metastoreTunnel\":null,\"name\":\"name\",\"remoteMetaStoreUris\":\"uri\",\"status\":\"UNKNOWN\",\"writableDatabaseWhiteList\":[]}";
ObjectMapper mapper = new ObjectMapper();
// Sorting to get deterministic test behaviour
mapper.enable(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY);
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java
new file mode 100644
index 000000000..4f747aaad
--- /dev/null
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/AbstractThriftMetastoreClientManager.java
@@ -0,0 +1,161 @@
+/**
+ * Copyright (C) 2016-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hotels.bdp.waggledance.client;
+
+import java.io.Closeable;
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.conf.HiveConfUtil;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TTransport;
+
+import lombok.extern.log4j.Log4j2;
+
+import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory;
+
+@Log4j2
+public abstract class AbstractThriftMetastoreClientManager implements Closeable {
+
+ protected static final AtomicInteger CONN_COUNT = new AtomicInteger(0);
+ protected final HiveConf conf;
+ protected final HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory;
+ protected final URI[] metastoreUris;
+ protected ThriftHiveMetastore.Iface client = null;
+ protected TTransport transport = null;
+ protected boolean isConnected = false;
+ // for thrift connects
+ protected int retries = 5;
+ protected long retryDelaySeconds = 0;
+
+ protected final int connectionTimeout;
+ protected final String msUri;
+
+ AbstractThriftMetastoreClientManager(
+ HiveConf conf,
+ HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory,
+ int connectionTimeout) {
+ this.conf = conf;
+ this.hiveCompatibleThriftHiveMetastoreIfaceFactory = hiveCompatibleThriftHiveMetastoreIfaceFactory;
+ this.connectionTimeout = connectionTimeout;
+ msUri = conf.getVar(ConfVars.METASTOREURIS);
+
+ if (HiveConfUtil.isEmbeddedMetaStore(msUri)) {
+ throw new RuntimeException("You can't waggle an embedded metastore");
+ }
+
+ // get the number retries
+ retries = HiveConf.getIntVar(conf, ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
+ retryDelaySeconds = conf.getTimeVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
+
+ // user wants file store based configuration
+ if (msUri != null) {
+ String[] metastoreUrisString = msUri.split(",");
+ metastoreUris = new URI[metastoreUrisString.length];
+ try {
+ int i = 0;
+ for (String s : metastoreUrisString) {
+ URI tmpUri = new URI(s);
+ if (tmpUri.getScheme() == null) {
+ throw new IllegalArgumentException("URI: " + s + " does not have a scheme");
+ }
+ metastoreUris[i++] = tmpUri;
+ }
+ } catch (IllegalArgumentException e) {
+ throw (e);
+ } catch (Exception e) {
+ String exInfo = "Got exception: " + e.getClass().getName() + " " + e.getMessage();
+ log.error(exInfo, e);
+ throw new RuntimeException(exInfo, e);
+ }
+ } else {
+ log.error("NOT getting uris from conf");
+ throw new RuntimeException("MetaStoreURIs not found in conf file");
+ }
+ }
+
+ void open() {
+ open(null);
+ }
+
+ abstract void open(HiveUgiArgs ugiArgs);
+
+ void reconnect(HiveUgiArgs ugiArgs) {
+ close();
+ // Swap the first element of the metastoreUris[] with a random element from the rest
+ // of the array. Rationale being that this method will generally be called when the default
+ // connection has died and the default connection is likely to be the first array element.
+ promoteRandomMetaStoreURI();
+ open(ugiArgs);
+ }
+
+ public String getHiveConfValue(String key, String defaultValue) {
+ return conf.get(key, defaultValue);
+ }
+
+ public void setHiveConfValue(String key, String value) {
+ conf.set(key, value);
+ }
+
+ @Override
+ public void close() {
+ if (!isConnected) {
+ return;
+ }
+ isConnected = false;
+ try {
+ if (client != null) {
+ client.shutdown();
+ }
+ } catch (TException e) {
+ log.debug("Unable to shutdown metastore client. Will try closing transport directly.", e);
+ }
+ // Transport would have got closed via client.shutdown(), so we don't need this, but
+ // just in case, we make this call.
+ if ((transport != null) && transport.isOpen()) {
+ transport.close();
+ transport = null;
+ }
+ log.info("Closed a connection to metastore, current connections: {}", CONN_COUNT.decrementAndGet());
+ }
+
+ boolean isOpen() {
+ return (transport != null) && transport.isOpen();
+ }
+
+ protected ThriftHiveMetastore.Iface getClient() {
+ return client;
+ }
+
+ /**
+ * Swaps the first element of the metastoreUris array with a random element from the remainder of the array.
+ */
+ private void promoteRandomMetaStoreURI() {
+ if (metastoreUris.length <= 1) {
+ return;
+ }
+ Random rng = new Random();
+ int index = rng.nextInt(metastoreUris.length - 1) + 1;
+ URI tmp = metastoreUris[0];
+ metastoreUris[0] = metastoreUris[index];
+ metastoreUris[index] = tmp;
+ }
+}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java
index abb07b052..7d7bdf85c 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/CloseableThriftHiveMetastoreIfaceClientFactory.java
@@ -30,6 +30,7 @@
import com.hotels.bdp.waggledance.api.model.AbstractMetaStore;
import com.hotels.bdp.waggledance.client.tunnelling.TunnelingMetaStoreClientFactory;
import com.hotels.bdp.waggledance.conf.WaggleDanceConfiguration;
+import com.hotels.bdp.waggledance.context.CommonBeans;
import com.hotels.hcommon.hive.metastore.conf.HiveConfFactory;
import com.hotels.hcommon.hive.metastore.util.MetaStoreUriNormaliser;
@@ -70,6 +71,8 @@ private CloseableThriftHiveMetastoreIface newHiveInstance(
connectionTimeout, waggleDanceConfiguration.getConfigurationProperties());
}
properties.put(ConfVars.METASTOREURIS.varname, uris);
+ properties.put(CommonBeans.IMPERSONATION_ENABLED_KEY,
+ String.valueOf(metaStore.isImpersonationEnabled()));
HiveConfFactory confFactory = new HiveConfFactory(Collections.emptyList(), properties);
return defaultMetaStoreClientFactory
.newInstance(confFactory.newInstance(), "waggledance-" + name, DEFAULT_CLIENT_FACTORY_RECONNECTION_RETRY,
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java
index 4b6986048..fe9b5873c 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/DefaultMetaStoreClientFactory.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2023 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,17 +15,14 @@
*/
package com.hotels.bdp.waggledance.client;
-import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
-import java.lang.reflect.UndeclaredThrowableException;
import java.util.List;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.thrift.transport.TTransportException;
import lombok.extern.log4j.Log4j2;
@@ -34,7 +31,6 @@
import com.google.common.collect.Lists;
import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory;
-import com.hotels.bdp.waggledance.server.TokenWrappingHMSHandler;
import com.hotels.hcommon.hive.metastore.exception.MetastoreUnavailableException;
@@ -45,7 +41,7 @@ public class DefaultMetaStoreClientFactory implements MetaStoreClientFactory {
@Log4j2
private static class ReconnectingMetastoreClientInvocationHandler implements InvocationHandler {
- private final ThriftMetastoreClientManager base;
+ private final AbstractThriftMetastoreClientManager base;
private final String name;
private final int maxRetries;
@@ -54,7 +50,7 @@ private static class ReconnectingMetastoreClientInvocationHandler implements Inv
private ReconnectingMetastoreClientInvocationHandler(
String name,
int maxRetries,
- ThriftMetastoreClientManager base) {
+ AbstractThriftMetastoreClientManager base) {
this.name = name;
this.maxRetries = maxRetries;
this.base = base;
@@ -99,7 +95,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
}
}
- private Object doRealCall(Method method, Object[] args, int attempt) throws IllegalAccessException, Throwable {
+ private Object doRealCall(Method method, Object[] args, int attempt) throws Throwable {
do {
try {
return method.invoke(base.getClient(), args);
@@ -140,76 +136,6 @@ private void reconnectIfDisconnected() {
}
- @Log4j2
- private static class SaslMetastoreClientHander implements InvocationHandler {
-
- private final CloseableThriftHiveMetastoreIface baseHandler;
- private final ThriftMetastoreClientManager clientManager;
- private final String tokenSignature = "WAGGLEDANCETOKEN";
-
- private String delegationToken;
-
- public static CloseableThriftHiveMetastoreIface newProxyInstance(
- CloseableThriftHiveMetastoreIface baseHandler,
- ThriftMetastoreClientManager clientManager) {
- return (CloseableThriftHiveMetastoreIface) Proxy.newProxyInstance(SaslMetastoreClientHander.class.getClassLoader(),
- INTERFACES, new SaslMetastoreClientHander(baseHandler, clientManager));
- }
-
- private SaslMetastoreClientHander(
- CloseableThriftHiveMetastoreIface handler,
- ThriftMetastoreClientManager clientManager) {
- this.baseHandler = handler;
- this.clientManager = clientManager;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- try {
- switch (method.getName()) {
- case "get_delegation_token":
- try {
- clientManager.open();
- Object token = method.invoke(baseHandler, args);
- this.delegationToken = (String) token;
- clientManager.close();
- setTokenStr2Ugi(UserGroupInformation.getCurrentUser(), (String) token);
- clientManager.open();
- return token;
- } catch (IOException e) {
- throw new MetastoreUnavailableException("Couldn't setup delegation token in the ugi: ", e);
- }
- default:
- genToken();
- return method.invoke(baseHandler, args);
- }
- } catch (InvocationTargetException e) {
- throw e.getTargetException();
- } catch (UndeclaredThrowableException e) {
- throw e.getCause();
- }
- }
-
- private void genToken() throws Throwable {
- UserGroupInformation currUser = null;
- if (delegationToken == null && (currUser = UserGroupInformation.getCurrentUser())
- != UserGroupInformation.getLoginUser()) {
-
- log.info("set {} delegation token", currUser.getShortUserName());
- String token = TokenWrappingHMSHandler.getToken();
- setTokenStr2Ugi(currUser, token);
- delegationToken = token;
- clientManager.close();
- }
- }
-
- private void setTokenStr2Ugi(UserGroupInformation currUser, String token) throws IOException {
- String newTokenSignature = clientManager.generateNewTokenSignature(tokenSignature);
- SecurityUtils.setTokenStr(currUser, token, newTokenSignature);
- }
- }
-
/*
* (non-Javadoc)
* @see com.hotels.bdp.waggledance.client.MetaStoreClientFactoryI#newInstance(org.apache.hadoop.hive.conf.HiveConf,
@@ -221,27 +147,26 @@ public CloseableThriftHiveMetastoreIface newInstance(
String name,
int reconnectionRetries,
int connectionTimeout) {
- return newInstance(name, reconnectionRetries, new ThriftMetastoreClientManager(hiveConf,
- new HiveCompatibleThriftHiveMetastoreIfaceFactory(), connectionTimeout));
+ boolean useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+ HiveCompatibleThriftHiveMetastoreIfaceFactory factory = new HiveCompatibleThriftHiveMetastoreIfaceFactory();
+ AbstractThriftMetastoreClientManager base = null;
+ if (useSasl) {
+ base = new SaslThriftMetastoreClientManager(hiveConf, factory, connectionTimeout);
+ } else {
+ base = new ThriftMetastoreClientManager(hiveConf, factory, connectionTimeout);
+ }
+ return newInstance(name, reconnectionRetries, base);
}
@VisibleForTesting
CloseableThriftHiveMetastoreIface newInstance(
String name,
int reconnectionRetries,
- ThriftMetastoreClientManager base) {
+ AbstractThriftMetastoreClientManager base) {
ReconnectingMetastoreClientInvocationHandler reconnectingHandler = new ReconnectingMetastoreClientInvocationHandler(
- name, reconnectionRetries, base);
- if (base.isSaslEnabled()) {
- CloseableThriftHiveMetastoreIface ifaceReconnectingHandler = (CloseableThriftHiveMetastoreIface) Proxy
- .newProxyInstance(getClass().getClassLoader(), INTERFACES, reconnectingHandler);
- // wrapping the SaslMetastoreClientHander to handle delegation token if using sasl
- return SaslMetastoreClientHander.newProxyInstance(ifaceReconnectingHandler, base);
- } else {
- return (CloseableThriftHiveMetastoreIface) Proxy
- .newProxyInstance(getClass().getClassLoader(), INTERFACES, reconnectingHandler);
- }
-
+ name, reconnectionRetries, base);
+ return (CloseableThriftHiveMetastoreIface) Proxy.newProxyInstance(getClass().getClassLoader(),
+ INTERFACES, reconnectingHandler);
}
}
\ No newline at end of file
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java
new file mode 100644
index 000000000..d535a2883
--- /dev/null
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/SaslThriftMetastoreClientManager.java
@@ -0,0 +1,231 @@
+/**
+ * Copyright (C) 2016-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hotels.bdp.waggledance.client;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+import java.io.IOException;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+import java.time.Duration;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
+import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore.Iface;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.service.auth.KerberosSaslHelper;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+
+import lombok.extern.log4j.Log4j2;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory;
+import com.hotels.bdp.waggledance.context.CommonBeans;
+
+@Log4j2
+public class SaslThriftMetastoreClientManager extends AbstractThriftMetastoreClientManager {
+
+ private final boolean impersonationEnabled;
+ private static final Duration delegationTokenCacheTtl = Duration.ofHours(
+ 1); // The default lifetime in Hive is 7 days (metastore.cluster.delegation.token.max-lifetime)
+ private static final long delegationTokenCacheMaximumSize = 1000;
+ private static final LoadingCache delegationTokenCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(delegationTokenCacheTtl.toMillis(), MILLISECONDS)
+ .maximumSize(delegationTokenCacheMaximumSize)
+ .build(CacheLoader.from(SaslThriftMetastoreClientManager::loadDelegationToken));
+
+ SaslThriftMetastoreClientManager(HiveConf conf,
+ HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory,
+ int connectionTimeout) {
+ super(conf, hiveCompatibleThriftHiveMetastoreIfaceFactory, connectionTimeout);
+ impersonationEnabled = conf.getBoolean(CommonBeans.IMPERSONATION_ENABLED_KEY, false);
+ }
+
+ @Override
+ void open(HiveUgiArgs ugiArgs) {
+ if (isConnected) {
+ return;
+ }
+ createMetastoreClientAndOpen(null, ugiArgs);
+ if (impersonationEnabled) {
+ try {
+ String userName = UserGroupInformation.getCurrentUser().getShortUserName();
+ DelegationTokenKey key = new DelegationTokenKey(msUri, userName, client);
+ String delegationToken = delegationTokenCache.get(key);
+ close();
+ createMetastoreClientAndOpen(delegationToken, ugiArgs);
+ } catch (IOException | ExecutionException e) {
+ log.error("Couldn't create delegation token client");
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void createMetastoreClientAndOpen(String delegationToken, HiveUgiArgs ugiArgs) {
+ TException te = null;
+ boolean useSsl = conf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
+ boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
+ int clientSocketTimeout = (int) conf.getTimeVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+
+ for (int attempt = 0; !isConnected && (attempt < retries); ++attempt) {
+ for (URI store : metastoreUris) {
+ log.info("Trying to connect to metastore with URI {}", store);
+ try {
+ transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout,
+ connectionTimeout);
+ // Wrap thrift connection with SASL for secure connection.
+ try {
+ UserGroupInformation.setConfiguration(conf);
+
+ // check if we should use delegation tokens to authenticate
+ // the call below gets hold of the tokens if they are set up by hadoop
+ // this should happen on the map/reduce tasks if the client added the
+ // tokens into hadoop's credential store in the front end during job
+ // submission.
+ if (impersonationEnabled && delegationToken != null) {
+ // authenticate using delegation tokens via the "DIGEST" mechanism
+ transport = KerberosSaslHelper
+ .getTokenTransport(delegationToken,
+ store.getHost(), transport,
+ MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl));
+ } else {
+ String principalConfig = conf.getVar(ConfVars.METASTORE_KERBEROS_PRINCIPAL);
+ transport = UserGroupInformation.getLoginUser().doAs(
+ (PrivilegedExceptionAction) () -> KerberosSaslHelper.getKerberosTransport(
+ principalConfig, store.getHost(), transport,
+ MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl), false));
+ }
+ } catch (IOException | InterruptedException exception) {
+ log.error("Couldn't create client transport, URI " + store, exception);
+ throw new MetaException(exception.toString());
+ }
+
+ TProtocol protocol;
+ if (useCompactProtocol) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
+ }
+ client = hiveCompatibleThriftHiveMetastoreIfaceFactory.newInstance(
+ new ThriftHiveMetastore.Client(protocol));
+ try {
+ transport.open();
+ log
+ .info("Opened a connection to metastore '"
+ + store
+ + "', total current connections to all metastores: "
+ + CONN_COUNT.incrementAndGet());
+
+ isConnected = true;
+ if (ugiArgs != null) {
+ log.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store);
+ client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups());
+ } else {
+ log.debug("Connection opened with out #set_ugi call', on URI {}", store);
+ }
+ } catch (TException e) {
+ te = e;
+ if (log.isDebugEnabled()) {
+ log.warn("Failed to connect to the MetaStore Server, URI " + store, e);
+ } else {
+ // Don't print full exception trace if DEBUG is not on.
+ log.warn("Failed to connect to the MetaStore Server, URI {}", store);
+ }
+ }
+ } catch (MetaException e) {
+ log.error("Unable to connect to metastore with URI " + store + " in attempt " + attempt,
+ e);
+ }
+ if (isConnected) {
+ break;
+ }
+ }
+ // Wait before launching the next round of connection retries.
+ if (!isConnected && (retryDelaySeconds > 0) && ((attempt + 1) < retries)) {
+ try {
+ log.info("Waiting {} seconds before next connection attempt.", retryDelaySeconds);
+ Thread.sleep(retryDelaySeconds * 1000);
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+
+ if (!isConnected) {
+ throw new RuntimeException("Could not connect to meta store using any of the URIs ["
+ + msUri
+ + "] provided. Most recent failure: "
+ + StringUtils.stringifyException(te));
+ }
+ log.debug("Connected to metastore.");
+ }
+
+ private static class DelegationTokenKey {
+
+ String msUri;
+ String username;
+ ThriftHiveMetastore.Iface client;
+
+ public DelegationTokenKey(String msUri, String username, Iface client) {
+ this.msUri = msUri;
+ this.username = username;
+ this.client = client;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DelegationTokenKey that = (DelegationTokenKey) o;
+ return Objects.equals(msUri, that.msUri) && Objects.equals(username,
+ that.username);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(msUri, username);
+ }
+
+ }
+
+ private static String loadDelegationToken(DelegationTokenKey key) {
+ try {
+ return key.client.get_delegation_token(key.username, key.username);
+ } catch (TException e) {
+ log.error("could not get delegation token,username:{},uri: {}", key.username, key.msUri);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java
index 6ff94e8d0..6a68c689f 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/client/ThriftMetastoreClientManager.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2023 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,98 +15,32 @@
*/
package com.hotels.bdp.waggledance.client;
-import java.io.Closeable;
-import java.io.IOException;
import java.net.URI;
-import java.util.Random;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.conf.HiveConfUtil;
-import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore;
-import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.service.auth.KerberosSaslHelper;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
-import org.apache.thrift.transport.TTransport;
import lombok.extern.log4j.Log4j2;
import com.hotels.bdp.waggledance.client.compatibility.HiveCompatibleThriftHiveMetastoreIfaceFactory;
@Log4j2
-class ThriftMetastoreClientManager implements Closeable {
-
-
- private static final AtomicInteger CONN_COUNT = new AtomicInteger(0);
- private final HiveConf conf;
- private final HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory;
- private final URI[] metastoreUris;
- private ThriftHiveMetastore.Iface client = null;
- private TTransport transport = null;
- private boolean isConnected = false;
- // for thrift connects
- private int retries = 5;
- private long retryDelaySeconds = 0;
-
- private final int connectionTimeout;
- private final String msUri;
+class ThriftMetastoreClientManager extends AbstractThriftMetastoreClientManager {
ThriftMetastoreClientManager(
HiveConf conf,
HiveCompatibleThriftHiveMetastoreIfaceFactory hiveCompatibleThriftHiveMetastoreIfaceFactory,
int connectionTimeout) {
- this.conf = conf;
- this.hiveCompatibleThriftHiveMetastoreIfaceFactory = hiveCompatibleThriftHiveMetastoreIfaceFactory;
- this.connectionTimeout = connectionTimeout;
- msUri = conf.getVar(ConfVars.METASTOREURIS);
-
- if (HiveConfUtil.isEmbeddedMetaStore(msUri)) {
- throw new RuntimeException("You can't waggle an embedded metastore");
- }
-
- // get the number retries
- retries = HiveConf.getIntVar(conf, ConfVars.METASTORETHRIFTCONNECTIONRETRIES);
- retryDelaySeconds = conf.getTimeVar(ConfVars.METASTORE_CLIENT_CONNECT_RETRY_DELAY, TimeUnit.SECONDS);
-
- // user wants file store based configuration
- if (msUri != null) {
- String[] metastoreUrisString = msUri.split(",");
- metastoreUris = new URI[metastoreUrisString.length];
- try {
- int i = 0;
- for (String s : metastoreUrisString) {
- URI tmpUri = new URI(s);
- if (tmpUri.getScheme() == null) {
- throw new IllegalArgumentException("URI: " + s + " does not have a scheme");
- }
- metastoreUris[i++] = tmpUri;
- }
- } catch (IllegalArgumentException e) {
- throw (e);
- } catch (Exception e) {
- String exInfo = "Got exception: " + e.getClass().getName() + " " + e.getMessage();
- log.error(exInfo, e);
- throw new RuntimeException(exInfo, e);
- }
- } else {
- log.error("NOT getting uris from conf");
- throw new RuntimeException("MetaStoreURIs not found in conf file");
- }
- }
-
- void open() {
- open(null);
+ super(conf, hiveCompatibleThriftHiveMetastoreIfaceFactory, connectionTimeout);
}
void open(HiveUgiArgs ugiArgs) {
@@ -114,8 +48,6 @@ void open(HiveUgiArgs ugiArgs) {
return;
}
TException te = null;
- boolean useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
- boolean useSsl = conf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
boolean useFramedTransport = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
boolean useCompactProtocol = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_COMPACT_PROTOCOL);
int clientSocketTimeout = (int) conf.getTimeVar(ConfVars.METASTORE_CLIENT_SOCKET_TIMEOUT, TimeUnit.MILLISECONDS);
@@ -123,73 +55,41 @@ void open(HiveUgiArgs ugiArgs) {
for (int attempt = 0; !isConnected && (attempt < retries); ++attempt) {
for (URI store : metastoreUris) {
log.info("Trying to connect to metastore with URI {}", store);
+ transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout, connectionTimeout);
+ if (useFramedTransport) {
+ transport = new TFramedTransport(transport);
+ }
+ TProtocol protocol;
+ if (useCompactProtocol) {
+ protocol = new TCompactProtocol(transport);
+ } else {
+ protocol = new TBinaryProtocol(transport);
+ }
+ client = hiveCompatibleThriftHiveMetastoreIfaceFactory.newInstance(new ThriftHiveMetastore.Client(protocol));
try {
- transport = new TSocket(store.getHost(), store.getPort(), clientSocketTimeout, connectionTimeout);
- if (useSasl) {
- // Wrap thrift connection with SASL for secure connection.
- try {
- UserGroupInformation.setConfiguration(conf);
-
- // check if we should use delegation tokens to authenticate
- // the call below gets hold of the tokens if they are set up by hadoop
- // this should happen on the map/reduce tasks if the client added the
- // tokens into hadoop's credential store in the front end during job
- // submission.
- String tokenSig = conf.getVar(ConfVars.METASTORE_TOKEN_SIGNATURE);
- // tokenSig could be null
- String tokenStrForm = SecurityUtils.getTokenStrForm(tokenSig);
- if (tokenStrForm != null) {
- // authenticate using delegation tokens via the "DIGEST" mechanism
- transport = KerberosSaslHelper
- .getTokenTransport(tokenStrForm, store.getHost(), transport,
- MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl));
- } else {
- String principalConfig = conf.getVar(ConfVars.METASTORE_KERBEROS_PRINCIPAL);
- transport = KerberosSaslHelper
- .getKerberosTransport(principalConfig, store.getHost(), transport,
- MetaStoreUtils.getMetaStoreSaslProperties(conf, useSsl), false);
- }
- } catch (IOException ioe) {
- log.error("Couldn't create client transport, URI " + store, ioe);
- throw new MetaException(ioe.toString());
- }
- } else if (useFramedTransport) {
- transport = new TFramedTransport(transport);
+ transport.open();
+ log
+ .info("Opened a connection to metastore '"
+ + store
+ + "', total current connections to all metastores: "
+ + CONN_COUNT.incrementAndGet());
+
+ isConnected = true;
+ if (ugiArgs != null) {
+ log.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store);
+ client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups());
+ } else {
+ log.debug("Connection opened with out #set_ugi call', on URI {}", store);
}
- TProtocol protocol;
- if (useCompactProtocol) {
- protocol = new TCompactProtocol(transport);
+ } catch (TException e) {
+ te = e;
+ if (log.isDebugEnabled()) {
+ log.warn("Failed to connect to the MetaStore Server, URI " + store, e);
} else {
- protocol = new TBinaryProtocol(transport);
+ // Don't print full exception trace if DEBUG is not on.
+ log.warn("Failed to connect to the MetaStore Server, URI {}", store);
}
- client = hiveCompatibleThriftHiveMetastoreIfaceFactory.newInstance(new ThriftHiveMetastore.Client(protocol));
- try {
- transport.open();
- log
- .info("Opened a connection to metastore '"
- + store
- + "', total current connections to all metastores: "
- + CONN_COUNT.incrementAndGet());
-
- isConnected = true;
- if (ugiArgs != null) {
- log.info("calling #set_ugi for user '{}', on URI {}", ugiArgs.getUser(), store);
- client.set_ugi(ugiArgs.getUser(), ugiArgs.getGroups());
- } else {
- log.debug("Connection opened with out #set_ugi call', on URI {}", store);
- }
- } catch (TException e) {
- te = e;
- if (log.isDebugEnabled()) {
- log.warn("Failed to connect to the MetaStore Server, URI " + store, e);
- } else {
- // Don't print full exception trace if DEBUG is not on.
- log.warn("Failed to connect to the MetaStore Server, URI {}", store);
- }
}
- } catch (MetaException e) {
- log.error("Unable to connect to metastore with URI " + store + " in attempt " + attempt, e);
- }
if (isConnected) {
break;
}
@@ -212,76 +112,4 @@ void open(HiveUgiArgs ugiArgs) {
log.debug("Connected to metastore.");
}
- void reconnect(HiveUgiArgs ugiArgs) {
- close();
- // Swap the first element of the metastoreUris[] with a random element from the rest
- // of the array. Rationale being that this method will generally be called when the default
- // connection has died and the default connection is likely to be the first array element.
- promoteRandomMetaStoreURI();
- open(ugiArgs);
- }
-
- public String getHiveConfValue(String key, String defaultValue) {
- return conf.get(key, defaultValue);
- }
-
- public void setHiveConfValue(String key, String value) {
- conf.set(key, value);
- }
-
- public String generateNewTokenSignature(String defaultTokenSignature) {
- String tokenSignature = conf.get(ConfVars.METASTORE_TOKEN_SIGNATURE.varname,
- defaultTokenSignature);
- conf.set(ConfVars.METASTORE_TOKEN_SIGNATURE.varname,
- tokenSignature);
- return tokenSignature;
- }
-
- public Boolean isSaslEnabled() {
- return conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
- }
-
- @Override
- public void close() {
- if (!isConnected) {
- return;
- }
- isConnected = false;
- try {
- if (client != null) {
- client.shutdown();
- }
- } catch (TException e) {
- log.debug("Unable to shutdown metastore client. Will try closing transport directly.", e);
- }
- // Transport would have got closed via client.shutdown(), so we don't need this, but
- // just in case, we make this call.
- if ((transport != null) && transport.isOpen()) {
- transport.close();
- transport = null;
- }
- log.info("Closed a connection to metastore, current connections: {}", CONN_COUNT.decrementAndGet());
- }
-
- boolean isOpen() {
- return (transport != null) && transport.isOpen();
- }
-
- protected ThriftHiveMetastore.Iface getClient() {
- return client;
- }
-
- /**
- * Swaps the first element of the metastoreUris array with a random element from the remainder of the array.
- */
- private void promoteRandomMetaStoreURI() {
- if (metastoreUris.length <= 1) {
- return;
- }
- Random rng = new Random();
- int index = rng.nextInt(metastoreUris.length - 1) + 1;
- URI tmp = metastoreUris[0];
- metastoreUris[0] = metastoreUris[index];
- metastoreUris[index] = tmp;
- }
}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java
index 98be2ca65..83a4e58d0 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/context/CommonBeans.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2023 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,6 +33,8 @@
@org.springframework.context.annotation.Configuration
public class CommonBeans {
+ public final static String IMPERSONATION_ENABLED_KEY = "hive.metastore.thrift.impersonation.enabled";
+
@Bean
public HiveConf hiveConf(WaggleDanceConfiguration waggleDanceConfiguration) {
Map confProps = waggleDanceConfiguration.getConfigurationProperties();
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java
index a22b695bf..880f81ced 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandler.java
@@ -214,6 +214,7 @@
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanRequest;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -245,16 +246,19 @@ class FederatedHMSHandler extends FacebookBase implements CloseableIHMSHandler {
private final NotifyingFederationService notifyingFederationService;
private final WaggleDanceConfiguration waggleDanceConfiguration;
private Configuration conf;
+ private SaslServerWrapper saslServerWrapper;
FederatedHMSHandler(
MappingEventListener databaseMappingService,
NotifyingFederationService notifyingFederationService,
- WaggleDanceConfiguration waggleDanceConfiguration) {
+ WaggleDanceConfiguration waggleDanceConfiguration,
+ SaslServerWrapper saslServerWrapper) {
super("waggle-dance-handler");
this.databaseMappingService = databaseMappingService;
this.notifyingFederationService = notifyingFederationService;
this.waggleDanceConfiguration = waggleDanceConfiguration;
this.notifyingFederationService.subscribe(databaseMappingService);
+ this.saslServerWrapper= saslServerWrapper;
}
private ThriftHiveMetastore.Iface getPrimaryClient() throws TException {
@@ -1369,19 +1373,56 @@ public List set_ugi(String user_name, List group_names) throws M
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public String get_delegation_token(String token_owner, String renewer_kerberos_principal_name)
throws MetaException, TException {
- return getPrimaryClient().get_delegation_token(token_owner, renewer_kerberos_principal_name);
+ try {
+ return saslServerWrapper.getDelegationTokenManager()
+ .getDelegationToken(token_owner, renewer_kerberos_principal_name,
+ getIPAddressFromSaslServer());
+ } catch (IOException | InterruptedException e) {
+ throw new MetaException(e.getMessage());
+ }
+ }
+
+ private String getIPAddressFromSaslServer() {
+ HadoopThriftAuthBridge.Server saslServer = saslServerWrapper.getSaslServer();
+ if (saslServer != null && saslServer.getRemoteAddress() != null) {
+ return saslServer.getRemoteAddress().getHostAddress();
+ }
+ return null;
}
@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public long renew_delegation_token(String token_str_form) throws MetaException, TException {
- return getPrimaryClient().renew_delegation_token(token_str_form);
+ try {
+ return saslServerWrapper.getDelegationTokenManager()
+ .renewDelegationToken(token_str_form);
+ } catch (IOException e) {
+ throw new MetaException(e.getMessage());
+ } catch (Exception e) {
+ throw newMetaException(e);
+ }
}
@Override
@Loggable(value = Loggable.DEBUG, skipResult = true, name = INVOCATION_LOG_NAME)
public void cancel_delegation_token(String token_str_form) throws MetaException, TException {
- getPrimaryClient().cancel_delegation_token(token_str_form);
+ try {
+ saslServerWrapper.getDelegationTokenManager()
+ .cancelDelegationToken(token_str_form);
+ } catch (IOException e) {
+ throw new MetaException(e.getMessage());
+ } catch (Exception e) {
+ throw newMetaException(e);
+ }
+ }
+
+ private static MetaException newMetaException(Exception e) {
+ if (e instanceof MetaException) {
+ return (MetaException)e;
+ }
+ MetaException me = new MetaException(e.toString());
+ me.initCause(e);
+ return me;
}
@Override
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java
index d1d501ace..d46015eb7 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactory.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2023 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -38,6 +38,7 @@ public class FederatedHMSHandlerFactory {
private final MetaStoreMappingFactory metaStoreMappingFactory;
private final WaggleDanceConfiguration waggleDanceConfiguration;
private final QueryMapping queryMapping;
+ private SaslServerWrapper saslServerWrapper;
@Autowired
public FederatedHMSHandlerFactory(
@@ -45,20 +46,23 @@ public FederatedHMSHandlerFactory(
NotifyingFederationService notifyingFederationService,
MetaStoreMappingFactory metaStoreMappingFactory,
WaggleDanceConfiguration waggleDanceConfiguration,
- QueryMapping queryMapping) {
+ QueryMapping queryMapping,
+ SaslServerWrapper saslServerWrapper) {
this.hiveConf = hiveConf;
this.notifyingFederationService = notifyingFederationService;
this.metaStoreMappingFactory = metaStoreMappingFactory;
this.waggleDanceConfiguration = waggleDanceConfiguration;
this.queryMapping = queryMapping;
+ this.saslServerWrapper = saslServerWrapper;
}
public CloseableIHMSHandler create() {
MappingEventListener service = createDatabaseMappingService();
MonitoredDatabaseMappingService monitoredService = new MonitoredDatabaseMappingService(service);
- CloseableIHMSHandler baseHandler = new FederatedHMSHandler(monitoredService, notifyingFederationService,
- waggleDanceConfiguration);
+ CloseableIHMSHandler baseHandler = new FederatedHMSHandler(monitoredService,
+ notifyingFederationService,
+ waggleDanceConfiguration, saslServerWrapper);
HiveConf conf = new HiveConf(hiveConf);
baseHandler.setConf(conf);
return baseHandler;
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java
index 05c7797a3..76b3d29c0 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/MetaStoreProxyServer.java
@@ -43,18 +43,14 @@
import org.apache.hadoop.hive.metastore.TServerSocketKeepAlive;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.StringUtils;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.server.TThreadPoolServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.apache.thrift.transport.TTransportFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -87,17 +83,20 @@ public class MetaStoreProxyServer implements ApplicationRunner {
private final Lock startLock;
private final Condition startCondition;
private TServer tServer;
+ private SaslServerWrapper saslServerWrapper;
@Autowired
public MetaStoreProxyServer(
HiveConf hiveConf,
WaggleDanceConfiguration waggleDanceConfiguration,
- TProcessorFactory tProcessorFactory) {
+ TProcessorFactory tProcessorFactory,
+ SaslServerWrapper saslServerWrapper) {
this.hiveConf = hiveConf;
this.waggleDanceConfiguration = waggleDanceConfiguration;
this.tProcessorFactory = tProcessorFactory;
startLock = new ReentrantLock();
startCondition = startLock.newCondition();
+ this.saslServerWrapper = saslServerWrapper;
}
private boolean isRunning() {
@@ -162,7 +161,10 @@ private void startWaggleDance(
boolean tcpKeepAlive = hiveConf.getBoolVar(ConfVars.METASTORE_TCP_KEEP_ALIVE);
boolean useFramedTransport = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_FRAMED_TRANSPORT);
boolean useSSL = hiveConf.getBoolVar(ConfVars.HIVE_METASTORE_USE_SSL);
- boolean useSASL = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+ boolean useSasl = hiveConf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+
+ //load 'hadoop.proxyuser' configs
+ ProxyUsers.refreshSuperUserGroupsConfiguration(hiveConf);
TServerSocket serverSocket = createServerSocket(useSSL, waggleDanceConfiguration.getPort());
@@ -170,15 +172,9 @@ private void startWaggleDance(
serverSocket = new TServerSocketKeepAlive(serverSocket);
}
- HadoopThriftAuthBridge.Server saslServer = null;
-
- if(useSASL) {
- UserGroupInformation.setConfiguration(hiveConf);
- saslServer = SaslHelper.createSaslServer(hiveConf);
- }
-
- TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSASL, saslServer);
- TProcessorFactory tProcessorFactory = getTProcessorFactory(useSASL, saslServer);
+ TTransportFactory transFactory = createTTransportFactory(useFramedTransport, useSasl,
+ saslServerWrapper.getSaslServer());
+ TProcessorFactory tProcessorFactory = getTProcessorFactory(useSasl, saslServerWrapper.getSaslServer());
log.info("Starting WaggleDance Server");
TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
@@ -192,28 +188,6 @@ private void startWaggleDance(
.requestTimeoutUnit(waggleDanceConfiguration.getThriftServerRequestTimeoutUnit());
tServer = new TThreadPoolServer(args);
- if (useSASL){
- TServerEventHandler tServerEventHandler = new TServerEventHandler() {
- @Override
- public void preServe() {
- }
-
- @Override
- public ServerContext createContext(TProtocol tProtocol, TProtocol tProtocol1) {
- return null;
- }
-
- @Override
- public void deleteContext(ServerContext serverContext, TProtocol tProtocol, TProtocol tProtocol1) {
- TokenWrappingHMSHandler.removeToken();
- }
-
- @Override
- public void processContext(ServerContext serverContext, TTransport tTransport, TTransport tTransport1) {
- }
- };
- tServer.setServerEventHandler(tServerEventHandler);
- }
log.info("Started the new WaggleDance on port [{}]...", waggleDanceConfiguration.getPort());
log.info("Options.minWorkerThreads = {}", minWorkerThreads);
log.info("Options.maxWorkerThreads = {}", maxWorkerThreads);
@@ -331,5 +305,4 @@ public void waitUntilStarted(int retries, long waitDelay, TimeUnit waitDelayTime
}
}
}
-
}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java
new file mode 100644
index 000000000..ae8326e7e
--- /dev/null
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/SaslServerWrapper.java
@@ -0,0 +1,99 @@
+/**
+ * Copyright (C) 2016-2024 Expedia, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.hotels.bdp.waggledance.server;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.security.DBTokenStore;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
+import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge.Server;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.thrift.transport.TTransportException;
+import org.springframework.stereotype.Component;
+
+import lombok.Getter;
+import lombok.extern.log4j.Log4j2;
+
+import com.hotels.bdp.waggledance.util.SaslHelper;
+
+@Component
+@Log4j2
+public class SaslServerWrapper {
+
+ private MetastoreDelegationTokenManager delegationTokenManager;
+ @Getter
+ private static boolean useSasl;
+
+ private HadoopThriftAuthBridge.Server saslServer = null;
+
+ protected SaslServerWrapper(HiveConf conf)
+ throws TTransportException {
+ useSasl = conf.getBoolVar(ConfVars.METASTORE_USE_THRIFT_SASL);
+ if (!useSasl) {
+ return;
+ }
+
+ UserGroupInformation.setConfiguration(conf);
+
+ if (SaslHelper.isSASLWithKerberizedHadoop(conf)) {
+ saslServer =
+ HadoopThriftAuthBridge.getBridge().createServer(
+ conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB),
+ conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
+ conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_CLIENT_KERBEROS_PRINCIPAL));
+
+ // Start delegation token manager
+ delegationTokenManager = new MetastoreDelegationTokenManager();
+ try {
+ Object baseHandler = null;
+ String tokenStoreClass = conf.getVar(
+ HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS);
+
+ if (tokenStoreClass.equals(DBTokenStore.class.getName())) {
+ // IMetaStoreClient is needed to access token store if DBTokenStore is to be used. It
+ // will be got via Hive.get(conf).getMSC in a thread where the DelegationTokenStore
+ // is called. To avoid the cyclic reference, we pass the Hive class to DBTokenStore where
+ // it is used to get a threadLocal Hive object with a synchronized MetaStoreClient using
+ // Java reflection.
+ // Note: there will be two HS2 life-long opened MSCs, one is stored in HS2 thread local
+ // Hive object, the other is in a daemon thread spawned in DelegationTokenSecretManager
+ // to remove expired tokens.
+ baseHandler = Hive.class;
+ }
+
+ delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler,
+ HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
+ saslServer.setSecretManager(delegationTokenManager.getSecretManager());
+ } catch (IOException e) {
+ throw new TTransportException("Failed to start token manager", e);
+ }
+
+ }
+ }
+
+ public MetastoreDelegationTokenManager getDelegationTokenManager() {
+ return delegationTokenManager;
+ }
+
+ public Server getSaslServer() {
+ return saslServer;
+ }
+
+}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java
index 63b1fe2bc..2172795f0 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TSetIpAddressProcessorFactory.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2023 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -58,18 +58,10 @@ public TProcessor getProcessor(TTransport transport) {
}
CloseableIHMSHandler baseHandler = federatedHMSHandlerFactory.create();
- boolean useSASL = hiveConf.getBoolVar(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL);
- if (useSASL) {
- IHMSHandler tokenHandler = TokenWrappingHMSHandler.newProxyInstance(baseHandler, useSASL);
- IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(tokenHandler), hiveConf,
- false);
- return new TSetIpAddressProcessor<>(handler);
- } else {
- IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(baseHandler), hiveConf,
- false);
- transportMonitor.monitor(transport, baseHandler);
- return new TSetIpAddressProcessor<>(handler);
- }
+ IHMSHandler handler = newRetryingHMSHandler(ExceptionWrappingHMSHandler.newProxyInstance(baseHandler), hiveConf,
+ false);
+ transportMonitor.monitor(transport, baseHandler);
+ return new TSetIpAddressProcessor<>(handler);
} catch (MetaException | ReflectiveOperationException | RuntimeException e) {
throw new RuntimeException("Error creating TProcessor", e);
}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java
deleted file mode 100644
index 4f0f7c06f..000000000
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/server/TokenWrappingHMSHandler.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Copyright (C) 2016-2023 Expedia, Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.hotels.bdp.waggledance.server;
-
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.lang.reflect.UndeclaredThrowableException;
-
-import org.apache.hadoop.hive.metastore.IHMSHandler;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import lombok.extern.log4j.Log4j2;
-
-@Log4j2
-public class TokenWrappingHMSHandler implements InvocationHandler {
-
- private final IHMSHandler baseHandler;
- private final Boolean useSasl;
-
- private static final ThreadLocal tokens = new ThreadLocal() {
- @Override
- protected String initialValue() {
- return "";
- }
- };
-
- public static String getToken() {
- return tokens.get();
- }
-
- public static void removeToken() {
- tokens.remove();
- }
-
- public static IHMSHandler newProxyInstance(IHMSHandler baseHandler, boolean useSasl) {
- return (IHMSHandler) Proxy.newProxyInstance(TokenWrappingHMSHandler.class.getClassLoader(),
- new Class[] { IHMSHandler.class }, new TokenWrappingHMSHandler(baseHandler, useSasl));
- }
-
- public TokenWrappingHMSHandler(IHMSHandler baseHandler, boolean useSasl) {
- this.baseHandler = baseHandler;
- this.useSasl = useSasl;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- try {
- // We will get the token when proxy user call in the first time.
- // Login user must open connect in `TProcessorFactorySaslDecorator#getProcessor`
- // so we can reuse this connect to get proxy user delegation token
- if (useSasl) {
- UserGroupInformation currUser = null;
- String token = null;
- // if call get_delegation_token , will call it directly and set token to threadlocal
-
- switch (method.getName()) {
- case "get_delegation_token":
- token = (String) method.invoke(baseHandler, args);
- tokens.set(token);
- return token;
- case "close":
- tokens.remove();
- return method.invoke(baseHandler, args);
- default:
- if (tokens.get().isEmpty() && (currUser = UserGroupInformation.getCurrentUser())
- != UserGroupInformation.getLoginUser()) {
-
- String shortName = currUser.getShortUserName();
- token = baseHandler.get_delegation_token(shortName, shortName);
- log.info("get delegation token by user {}", shortName);
- tokens.set(token);
- }
- return method.invoke(baseHandler, args);
- }
- }
- return method.invoke(baseHandler, args);
- } catch (InvocationTargetException e) {
- // Need to unwrap this, so callers get the correct exception thrown by the handler.
- throw e.getCause();
- } catch (UndeclaredThrowableException e) {
- // Need to unwrap this, so callers get the correct exception thrown by the handler.
- throw e.getCause();
- }
-
- }
-
-}
diff --git a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java
index 87b07cded..1063553c8 100644
--- a/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java
+++ b/waggle-dance-core/src/main/java/com/hotels/bdp/waggledance/util/SaslHelper.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2023 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -25,10 +24,7 @@
import javax.security.sasl.Sasl;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.security.DBTokenStore;
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
-import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
-import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hive.service.auth.HiveAuthConstants;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.hive.service.auth.SaslQOP;
@@ -42,43 +38,6 @@
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class SaslHelper {
- public static HadoopThriftAuthBridge.Server createSaslServer(HiveConf conf) throws TTransportException {
- HadoopThriftAuthBridge.Server saslServer = null;
- if (SaslHelper.isSASLWithKerberizedHadoop(conf)) {
- saslServer =
- HadoopThriftAuthBridge.getBridge().createServer(
- conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_KEYTAB),
- conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL),
- conf.getVar(HiveConf.ConfVars.HIVE_SERVER2_CLIENT_KERBEROS_PRINCIPAL));
-
- // Start delegation token manager
- MetastoreDelegationTokenManager delegationTokenManager = new MetastoreDelegationTokenManager();
- try {
- Object baseHandler = null;
- String tokenStoreClass = conf.getVar(HiveConf.ConfVars.METASTORE_CLUSTER_DELEGATION_TOKEN_STORE_CLS);
-
- if (tokenStoreClass.equals(DBTokenStore.class.getName())) {
- // IMetaStoreClient is needed to access token store if DBTokenStore is to be used. It
- // will be got via Hive.get(conf).getMSC in a thread where the DelegationTokenStore
- // is called. To avoid the cyclic reference, we pass the Hive class to DBTokenStore where
- // it is used to get a threadLocal Hive object with a synchronized MetaStoreClient using
- // Java reflection.
- // Note: there will be two HS2 life-long opened MSCs, one is stored in HS2 thread local
- // Hive object, the other is in a daemon thread spawned in DelegationTokenSecretManager
- // to remove expired tokens.
- baseHandler = Hive.class;
- }
-
- delegationTokenManager.startDelegationTokenSecretManager(conf, baseHandler, HadoopThriftAuthBridge.Server.ServerMode.METASTORE);
- saslServer.setSecretManager(delegationTokenManager.getSecretManager());
- }
- catch (IOException e) {
- throw new TTransportException("Failed to start token manager", e);
- }
- }
- return saslServer;
- }
-
public static boolean isSASLWithKerberizedHadoop(HiveConf hiveconf) {
return "kerberos".equalsIgnoreCase(hiveconf.get(HADOOP_SECURITY_AUTHENTICATION, "simple"))
&& !hiveconf.getVar(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION).equalsIgnoreCase(HiveAuthConstants.AuthTypes.NOSASL.getAuthName());
diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/service/impl/YamlFederatedMetaStoreStorageTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/service/impl/YamlFederatedMetaStoreStorageTest.java
index 80faa7bfc..62d950266 100644
--- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/service/impl/YamlFederatedMetaStoreStorageTest.java
+++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/mapping/service/impl/YamlFederatedMetaStoreStorageTest.java
@@ -216,32 +216,37 @@ public void saveFederationWriteFederations() throws Exception {
storage.insert(newFederatedInstance);
storage.saveFederation();
List lines = Files.readAllLines(file.toPath(), StandardCharsets.UTF_8);
- assertThat(lines.size(), is(25));
- assertThat(lines.get(0), is("primary-meta-store:"));
- assertThat(lines.get(1), is(" access-control-type: READ_ONLY"));
- assertThat(lines.get(2), is(" database-prefix: ''"));
- assertThat(lines.get(3), is(" latency: 0"));
- assertThat(lines.get(4), is(" name: hcom_1"));
- assertThat(lines.get(5), is(" remote-meta-store-uris: thrift://localhost:19083"));
- assertThat(lines.get(6), is("federated-meta-stores:"));
- assertThat(lines.get(7), is("- access-control-type: READ_ONLY"));
- assertThat(lines.get(8), is(" configuration-properties:"));
- assertThat(lines.get(9), is(" hive.metastore.kerberos.principal: hive/_HOST@REALM"));
- assertThat(lines.get(10), is(" database-prefix: hcom_2_"));
- assertThat(lines.get(11), is(" hive-metastore-filter-hook: filter.hook.class"));
- assertThat(lines.get(12), is(" latency: 0"));
- assertThat(lines.get(13), is(" mapped-databases:"));
- assertThat(lines.get(14), is(" - db1"));
- assertThat(lines.get(15), is(" - db2"));
- assertThat(lines.get(16), is(" mapped-tables:"));
- assertThat(lines.get(17), is(" - database: db1"));
- assertThat(lines.get(18), is(" mapped-tables:"));
- assertThat(lines.get(19), is(" - tbl1"));
- assertThat(lines.get(20), is(" - database: db2"));
- assertThat(lines.get(21), is(" mapped-tables:"));
- assertThat(lines.get(22), is(" - tbl2"));
- assertThat(lines.get(23), is(" name: hcom_2"));
- assertThat(lines.get(24), is(" remote-meta-store-uris: thrift://localhost:29083"));
+ assertThat(lines.size(), is(27));
+ int i = 0;
+ while (i < lines.size()) {
+ assertThat(lines.get(i++), is("primary-meta-store:"));
+ assertThat(lines.get(i++), is(" access-control-type: READ_ONLY"));
+ assertThat(lines.get(i++), is(" database-prefix: ''"));
+ assertThat(lines.get(i++), is(" impersonation-enabled: false"));
+ assertThat(lines.get(i++), is(" latency: 0"));
+ assertThat(lines.get(i++), is(" name: hcom_1"));
+ assertThat(lines.get(i++), is(" remote-meta-store-uris: thrift://localhost:19083"));
+ assertThat(lines.get(i++), is("federated-meta-stores:"));
+ assertThat(lines.get(i++), is("- access-control-type: READ_ONLY"));
+ assertThat(lines.get(i++), is(" configuration-properties:"));
+ assertThat(lines.get(i++), is(" hive.metastore.kerberos.principal: hive/_HOST@REALM"));
+ assertThat(lines.get(i++), is(" database-prefix: hcom_2_"));
+ assertThat(lines.get(i++), is(" hive-metastore-filter-hook: filter.hook.class"));
+ assertThat(lines.get(i++), is(" impersonation-enabled: false"));
+ assertThat(lines.get(i++), is(" latency: 0"));
+ assertThat(lines.get(i++), is(" mapped-databases:"));
+ assertThat(lines.get(i++), is(" - db1"));
+ assertThat(lines.get(i++), is(" - db2"));
+ assertThat(lines.get(i++), is(" mapped-tables:"));
+ assertThat(lines.get(i++), is(" - database: db1"));
+ assertThat(lines.get(i++), is(" mapped-tables:"));
+ assertThat(lines.get(i++), is(" - tbl1"));
+ assertThat(lines.get(i++), is(" - database: db2"));
+ assertThat(lines.get(i++), is(" mapped-tables:"));
+ assertThat(lines.get(i++), is(" - tbl2"));
+ assertThat(lines.get(i++), is(" name: hcom_2"));
+ assertThat(lines.get(i++), is(" remote-meta-store-uris: thrift://localhost:29083"));
+ }
}
@Test
@@ -300,31 +305,36 @@ public void savePrimaryWriteFederations() throws Exception {
storage.insert(newFederatedInstance("hcom_2", "thrift://localhost:29083"));
storage.saveFederation();
List lines = Files.readAllLines(file.toPath(), StandardCharsets.UTF_8);
- assertThat(lines.size(), is(24));
- assertThat(lines.get(0), is("primary-meta-store:"));
- assertThat(lines.get(1), is(" access-control-type: READ_ONLY"));
- assertThat(lines.get(2), is(" configuration-properties:"));
- assertThat(lines.get(3), is(" hive.metastore.kerberos.principal: hive/_HOST@REALM"));
- assertThat(lines.get(4), is(" database-prefix: ''"));
- assertThat(lines.get(5), is(" latency: 0"));
- assertThat(lines.get(6), is(" mapped-databases:"));
- assertThat(lines.get(7), is(" - db1"));
- assertThat(lines.get(8), is(" - db2"));
- assertThat(lines.get(9), is(" mapped-tables:"));
- assertThat(lines.get(10), is(" - database: db1"));
- assertThat(lines.get(11), is(" mapped-tables:"));
- assertThat(lines.get(12), is(" - tbl1"));
- assertThat(lines.get(13), is(" - database: db2"));
- assertThat(lines.get(14), is(" mapped-tables:"));
- assertThat(lines.get(15), is(" - tbl2"));
- assertThat(lines.get(16), is(" name: hcom_1"));
- assertThat(lines.get(17), is(" remote-meta-store-uris: thrift://localhost:19083"));
- assertThat(lines.get(18), is("federated-meta-stores:"));
- assertThat(lines.get(19), is("- access-control-type: READ_ONLY"));
- assertThat(lines.get(20), is(" database-prefix: hcom_2_"));
- assertThat(lines.get(21), is(" latency: 0"));
- assertThat(lines.get(22), is(" name: hcom_2"));
- assertThat(lines.get(23), is(" remote-meta-store-uris: thrift://localhost:29083"));
+ assertThat(lines.size(), is(26));
+ int i = 0;
+ while (i < lines.size()) {
+ assertThat(lines.get(i++), is("primary-meta-store:"));
+ assertThat(lines.get(i++), is(" access-control-type: READ_ONLY"));
+ assertThat(lines.get(i++), is(" configuration-properties:"));
+ assertThat(lines.get(i++), is(" hive.metastore.kerberos.principal: hive/_HOST@REALM"));
+ assertThat(lines.get(i++), is(" database-prefix: ''"));
+ assertThat(lines.get(i++), is(" impersonation-enabled: false"));
+ assertThat(lines.get(i++), is(" latency: 0"));
+ assertThat(lines.get(i++), is(" mapped-databases:"));
+ assertThat(lines.get(i++), is(" - db1"));
+ assertThat(lines.get(i++), is(" - db2"));
+ assertThat(lines.get(i++), is(" mapped-tables:"));
+ assertThat(lines.get(i++), is(" - database: db1"));
+ assertThat(lines.get(i++), is(" mapped-tables:"));
+ assertThat(lines.get(i++), is(" - tbl1"));
+ assertThat(lines.get(i++), is(" - database: db2"));
+ assertThat(lines.get(i++), is(" mapped-tables:"));
+ assertThat(lines.get(i++), is(" - tbl2"));
+ assertThat(lines.get(i++), is(" name: hcom_1"));
+ assertThat(lines.get(i++), is(" remote-meta-store-uris: thrift://localhost:19083"));
+ assertThat(lines.get(i++), is("federated-meta-stores:"));
+ assertThat(lines.get(i++), is("- access-control-type: READ_ONLY"));
+ assertThat(lines.get(i++), is(" database-prefix: hcom_2_"));
+ assertThat(lines.get(i++), is(" impersonation-enabled: false"));
+ assertThat(lines.get(i++), is(" latency: 0"));
+ assertThat(lines.get(i++), is(" name: hcom_2"));
+ assertThat(lines.get(i++), is(" remote-meta-store-uris: thrift://localhost:29083"));
+ }
}
private PrimaryMetaStore newPrimaryInstance(String name, String remoteMetaStoreUris) {
diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java
index b1e1984ee..638779e9d 100644
--- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java
+++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerFactoryTest.java
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2016-2021 Expedia, Inc.
+ * Copyright (C) 2016-2024 Expedia, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,13 +44,14 @@ public class FederatedHMSHandlerFactoryTest {
private @Mock NotifyingFederationService notifyingFederationService;
private @Mock MetaStoreMappingFactory metaStoreMappingFactory;
private @Mock QueryMapping queryMapping;
+ private @Mock SaslServerWrapper saslServerWrapper;
private FederatedHMSHandlerFactory factory;
@Before
public void init() {
when(notifyingFederationService.getAll()).thenReturn(new ArrayList<>());
factory = new FederatedHMSHandlerFactory(hiveConf, notifyingFederationService, metaStoreMappingFactory,
- waggleDanceConfiguration, queryMapping);
+ waggleDanceConfiguration, queryMapping, saslServerWrapper);
}
@Test
@@ -64,7 +65,7 @@ public void typical() throws Exception {
public void prefixedDatabase() throws Exception {
when(waggleDanceConfiguration.getDatabaseResolution()).thenReturn(DatabaseResolution.PREFIXED);
factory = new FederatedHMSHandlerFactory(hiveConf, notifyingFederationService, metaStoreMappingFactory,
- waggleDanceConfiguration, queryMapping);
+ waggleDanceConfiguration, queryMapping, saslServerWrapper);
CloseableIHMSHandler handler = factory.create();
assertThat(handler, is(instanceOf(FederatedHMSHandler.class)));
}
@@ -72,7 +73,7 @@ public void prefixedDatabase() throws Exception {
@Test(expected = WaggleDanceException.class)
public void noMode() {
factory = new FederatedHMSHandlerFactory(hiveConf, notifyingFederationService, metaStoreMappingFactory,
- waggleDanceConfiguration, queryMapping);
+ waggleDanceConfiguration, queryMapping, saslServerWrapper);
factory.create();
}
diff --git a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java
index 0b4e9309e..f726039df 100644
--- a/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java
+++ b/waggle-dance-core/src/test/java/com/hotels/bdp/waggledance/server/FederatedHMSHandlerTest.java
@@ -28,6 +28,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
@@ -212,6 +213,7 @@
import org.apache.hadoop.hive.metastore.api.WMGetTriggersForResourePlanResponse;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanRequest;
import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
+import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
import org.apache.thrift.TException;
import org.junit.Before;
import org.junit.Test;
@@ -245,12 +247,15 @@ public class FederatedHMSHandlerTest {
private @Mock DatabaseMapping primaryMapping;
private @Mock Iface primaryClient;
private @Mock WaggleDanceConfiguration waggleDanceConfiguration;
+ private @Mock SaslServerWrapper saslServerWrapper;
+ private @Mock MetastoreDelegationTokenManager metastoreDelegationTokenManager;
private FederatedHMSHandler handler;
@Before
public void setUp() throws NoSuchObjectException {
- handler = new FederatedHMSHandler(databaseMappingService, notifyingFederationService, waggleDanceConfiguration);
+ handler = new FederatedHMSHandler(databaseMappingService, notifyingFederationService,
+ waggleDanceConfiguration, saslServerWrapper);
when(databaseMappingService.primaryDatabaseMapping()).thenReturn(primaryMapping);
when(databaseMappingService.getAvailableDatabaseMappings()).thenReturn(Collections.singletonList(primaryMapping));
when(primaryMapping.getClient()).thenReturn(primaryClient);
@@ -1519,25 +1524,29 @@ public void grant_revoke_privileges() throws TException {
}
@Test
- public void get_delegation_token() throws TException {
+ public void get_delegation_token() throws TException, IOException, InterruptedException {
String expected = "expected";
- when(primaryClient.get_delegation_token("owner", "kerberos_principal")).thenReturn(expected);
+ when(saslServerWrapper.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager);
+ when(metastoreDelegationTokenManager.getDelegationToken("owner", "kerberos_principal",
+ null)).thenReturn(expected);
String result = handler.get_delegation_token("owner", "kerberos_principal");
assertThat(result, is(expected));
}
@Test
- public void renew_delegation_token() throws TException {
+ public void renew_delegation_token() throws TException, IOException {
long expected = 10L;
- when(primaryClient.renew_delegation_token("token")).thenReturn(expected);
+ when(saslServerWrapper.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager);
+ when(metastoreDelegationTokenManager.renewDelegationToken("token")).thenReturn(expected);
long result = handler.renew_delegation_token("token");
assertThat(result, is(expected));
}
@Test
- public void cancel_delegation_token() throws TException {
+ public void cancel_delegation_token() throws TException, IOException {
+ when(saslServerWrapper.getDelegationTokenManager()).thenReturn(metastoreDelegationTokenManager);
handler.cancel_delegation_token("token");
- verify(primaryClient).cancel_delegation_token("token");
+ verify(metastoreDelegationTokenManager).cancelDelegationToken("token");
}
@Test