Skip to content

Commit

Permalink
GEOMESA-3390 Accumulo - load accumulo-client.properties for map/reduce (
Browse files Browse the repository at this point in the history
  • Loading branch information
elahrvivaz authored Sep 11, 2024
1 parent 2e6cbda commit f38756d
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.locationtech.geomesa.accumulo.data


import org.apache.accumulo.core.client.security.tokens.{AuthenticationToken, KerberosToken, PasswordToken}
import org.apache.accumulo.core.client.security.tokens.{KerberosToken, PasswordToken}
import org.apache.accumulo.core.client.{Accumulo, AccumuloClient}
import org.apache.accumulo.core.conf.ClientProperty
import org.apache.hadoop.security.UserGroupInformation
Expand All @@ -34,7 +34,7 @@ import org.locationtech.geomesa.utils.geotools.GeoMesaParam
import java.awt.RenderingHints
import java.io.IOException
import java.nio.charset.StandardCharsets
import java.util.Locale
import java.util.{Locale, Properties}

class AccumuloDataStoreFactory extends DataStoreFactorySpi {

Expand All @@ -43,9 +43,9 @@ class AccumuloDataStoreFactory extends DataStoreFactorySpi {
createDataStore(params)

override def createDataStore(params: java.util.Map[String, _]): AccumuloDataStore = {
val connector = AccumuloDataStoreFactory.buildAccumuloConnector(params)
val config = AccumuloDataStoreFactory.buildConfig(connector, params)
val ds = new AccumuloDataStore(connector, config)
val client = AccumuloDataStoreFactory.buildAccumuloClient(params)
val config = AccumuloDataStoreFactory.buildConfig(client, params)
val ds = new AccumuloDataStore(client, config)
GeoMesaDataStore.initRemoteVersion(ds)
ds
}
Expand Down Expand Up @@ -122,14 +122,33 @@ object AccumuloDataStoreFactory extends GeoMesaDataStoreInfo {
override def canProcess(params: java.util.Map[String, _]): Boolean =
CatalogParam.exists(params)

def buildAccumuloConnector(params: java.util.Map[String, _]): AccumuloClient = {
@deprecated("Replaced with buildAccumuloClient")
def buildAccumuloConnector(params: java.util.Map[String, _]): AccumuloClient = buildAccumuloClient(params)

/**
* Build an accumulo client
*
* @param params data store params
* @return
*/
def buildAccumuloClient(params: java.util.Map[String, _]): AccumuloClient =
Accumulo.newClient().from(AccumuloDataStoreFactory.buildAccumuloClientConfig(params)).build()

/**
* Build an Accumulo client configuration, based on passed in parameters and any accumulo-client.properties
* file on the classpath
*
* @param params data store params
* @return
*/
def buildAccumuloClientConfig(params: java.util.Map[String, _]): Properties = {
val config = AccumuloClientConfig.load()

def setRequired(param: GeoMesaParam[String], key: ClientProperty): Unit = {
def setRequired(param: GeoMesaParam[String], key: ClientProperty): String = {
param.lookupOpt(params) match {
case Some(v) => config.put(key.getKey, v)
case Some(v) => config.put(key.getKey, v); v
case None =>
if (config.get(key.getKey) == null) {
Option(config.getProperty(key.getKey)).getOrElse {
throw new IOException(s"Parameter ${param.key} is required: ${param.description}")
}
}
Expand All @@ -148,7 +167,7 @@ object AccumuloDataStoreFactory extends GeoMesaDataStoreInfo {
setRequired(ZookeepersParam, ClientProperty.INSTANCE_ZOOKEEPERS)
setOptional(ZookeeperTimeoutParam, ClientProperty.INSTANCE_ZOOKEEPERS_TIMEOUT)

val user = getRequired(UserParam, ClientProperty.AUTH_PRINCIPAL)
val user = setRequired(UserParam, ClientProperty.AUTH_PRINCIPAL)

if (PasswordParam.exists(params) && KeytabPathParam.exists(params)) {
throw new IllegalArgumentException(
Expand All @@ -164,52 +183,62 @@ object AccumuloDataStoreFactory extends GeoMesaDataStoreInfo {
ClientProperty.AUTH_TYPE.getValue(config).toLowerCase(Locale.US)
}

// build authentication token according to how we are authenticating
val auth: AuthenticationToken = if (authType == AccumuloClientConfig.PasswordAuthType) {
new PasswordToken(getRequired(PasswordParam, ClientProperty.AUTH_TOKEN).getBytes(StandardCharsets.UTF_8))
} else if (authType == AccumuloClientConfig.KerberosAuthType) {
val file = new java.io.File(getRequired(KeytabPathParam, ClientProperty.AUTH_TOKEN))
// mimic behavior from accumulo 1.9 and earlier:
// `public KerberosToken(String principal, File keytab, boolean replaceCurrentUser)`
UserGroupInformation.loginUserFromKeytab(user, file.getAbsolutePath)
new KerberosToken(user, file)
} else {
throw new IllegalArgumentException(s"Unsupported auth type: $authType")
}
val token =
if (authType == AccumuloClientConfig.PasswordAuthType) {
new PasswordToken(getRequired(PasswordParam, ClientProperty.AUTH_TOKEN).getBytes(StandardCharsets.UTF_8))
} else if (authType == AccumuloClientConfig.KerberosAuthType) {
val file = new java.io.File(getRequired(KeytabPathParam, ClientProperty.AUTH_TOKEN))
// mimic behavior from accumulo 1.9 and earlier:
// `public KerberosToken(String principal, File keytab, boolean replaceCurrentUser)`
UserGroupInformation.loginUserFromKeytab(user, file.getAbsolutePath)
new KerberosToken(user, file)
} else {
throw new IllegalArgumentException(
s"Unsupported auth type: $authType - supported values are " +
s"${AccumuloClientConfig.PasswordAuthType}, ${AccumuloClientConfig.KerberosAuthType}")
}
ClientProperty.setAuthenticationToken(config, token)

if (WriteThreadsParam.exists(params)) {
config.put(ClientProperty.BATCH_WRITER_THREADS_MAX.getKey, WriteThreadsParam.lookup(params).toString)
} else if (!config.containsKey(ClientProperty.BATCH_WRITER_THREADS_MAX.getKey)) {
} else if (ClientProperty.BATCH_WRITER_THREADS_MAX.isEmpty(config)) {
BatchWriterProperties.WRITER_THREADS.option.foreach { threads =>
config.put(ClientProperty.BATCH_WRITER_THREADS_MAX.getKey, String.valueOf(threads))
}
}
if (!config.containsKey(ClientProperty.BATCH_WRITER_MEMORY_MAX.getKey)) {
if (ClientProperty.BATCH_WRITER_MEMORY_MAX.isEmpty(config)) {
BatchWriterProperties.WRITER_MEMORY_BYTES.toBytes.foreach { memory =>
config.put(ClientProperty.BATCH_WRITER_MEMORY_MAX.getKey, String.valueOf(memory))
}
}
if (!config.containsKey(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey)) {
if (ClientProperty.BATCH_WRITER_LATENCY_MAX.isEmpty(config)) {
BatchWriterProperties.WRITER_LATENCY.toDuration.foreach { duration =>
config.put(ClientProperty.BATCH_WRITER_LATENCY_MAX.getKey, s"${duration.toMillis}ms")
}
}
if (!config.containsKey(ClientProperty.BATCH_WRITER_TIMEOUT_MAX.getKey)) {
if (ClientProperty.BATCH_WRITER_TIMEOUT_MAX.isEmpty(config)) {
BatchWriterProperties.WRITE_TIMEOUT.toDuration.foreach { duration =>
config.put(ClientProperty.BATCH_WRITER_TIMEOUT_MAX.getKey, s"${duration.toMillis}ms")
}
}

Accumulo.newClient().from(config).as(user, auth).build()
config
}

def buildConfig(connector: AccumuloClient, params: java.util.Map[String, _]): AccumuloDataStoreConfig = {
/**
* Build the data store config
*
* @param client accumulo client
* @param params data store params
* @return
*/
def buildConfig(client: AccumuloClient, params: java.util.Map[String, _]): AccumuloDataStoreConfig = {
val catalog = CatalogParam.lookup(params)

val authProvider = buildAuthsProvider(connector, params)
val authProvider = buildAuthsProvider(client, params)
val auditProvider = buildAuditProvider(params)
val auditWriter =
new AccumuloAuditWriter(connector, s"${catalog}_queries", auditProvider, AuditQueriesParam.lookup(params).booleanValue())
new AccumuloAuditWriter(client, s"${catalog}_queries", auditProvider, AuditQueriesParam.lookup(params).booleanValue())

val queries = AccumuloQueryConfig(
threads = QueryThreadsParam.lookup(params),
Expand Down Expand Up @@ -246,10 +275,10 @@ object AccumuloDataStoreFactory extends GeoMesaDataStoreInfo {
}
}

def buildAuthsProvider(connector: AccumuloClient, params: java.util.Map[String, _]): AuthorizationsProvider = {
def buildAuthsProvider(client: AccumuloClient, params: java.util.Map[String, _]): AuthorizationsProvider = {
// convert the connector authorizations into a string array - this is the maximum auths this connector can support
val securityOps = connector.securityOperations
val masterAuths = securityOps.getUserAuthorizations(connector.whoami).asScala.toSeq.map(b => new String(b))
val securityOps = client.securityOperations
val masterAuths = securityOps.getUserAuthorizations(client.whoami).asScala.toSeq.map(b => new String(b))

// get the auth params passed in as a comma-delimited string
val configuredAuths = AuthsParam.lookupOpt(params).getOrElse("").split(",").filterNot(_.isEmpty).toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,16 @@ class AttributeIndexJob extends Tool {
AttributeIndexJob.setAttributes(job.getConfiguration, attributes.toSeq)
AttributeIndexJob.setTypeName(job.getConfiguration, sft.getTypeName)

val config = new Properties()
config.put(ClientProperty.INSTANCE_NAME.getKey, parsedArgs.inInstanceId)
config.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey, parsedArgs.inZookeepers)
config.put(ClientProperty.AUTH_PRINCIPAL.getKey, parsedArgs.inUser)
config.put(ClientProperty.AUTH_TOKEN.getKey, parsedArgs.inPassword)

AccumuloOutputFormat.configure().clientProperties(config).createTables(true).store(job)
val props = AccumuloDataStoreFactory.buildAccumuloClientConfig(
java.util.Map.of(
AccumuloDataStoreParams.InstanceNameParam.key, parsedArgs.inInstanceId,
AccumuloDataStoreParams.ZookeepersParam.key, parsedArgs.inZookeepers,
AccumuloDataStoreParams.UserParam.key, parsedArgs.inUser,
AccumuloDataStoreParams.PasswordParam.key, parsedArgs.inPassword
)
)

AccumuloOutputFormat.configure().clientProperties(props).createTables(true).store(job)

val result = job.waitForCompletion(true)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.hadoop.mapreduce._
import org.geotools.api.data.Query
import org.geotools.api.feature.simple.SimpleFeature
import org.locationtech.geomesa.accumulo.AccumuloProperties.AccumuloMapperProperties
import org.locationtech.geomesa.accumulo.data.{AccumuloClientConfig, AccumuloDataStore, AccumuloDataStoreParams, AccumuloQueryPlan}
import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreFactory, AccumuloDataStoreParams, AccumuloQueryPlan}
import org.locationtech.geomesa.accumulo.jobs.AccumuloJobUtils
import org.locationtech.geomesa.accumulo.jobs.mapreduce.GeoMesaAccumuloInputFormat.{GeoMesaRecordReader, GroupedSplit}
import org.locationtech.geomesa.index.api.QueryPlan.ResultsToFeatures
Expand All @@ -34,7 +34,7 @@ import java.io._
import java.net.{URL, URLClassLoader}
import java.util.AbstractMap.SimpleImmutableEntry
import java.util.Map.Entry
import java.util.{Collections, Properties}
import java.util.{Collections, Locale}
import scala.collection.mutable.ArrayBuffer

/**
Expand Down Expand Up @@ -150,37 +150,14 @@ object GeoMesaAccumuloInputFormat extends LazyLogging {
val job = new Job(conf)
job.setInputFormatClass(classOf[GeoMesaAccumuloInputFormat])

val props = new Properties()
// set zookeeper instance
props.put(ClientProperty.INSTANCE_NAME.getKey, AccumuloDataStoreParams.InstanceNameParam.lookup(params))
props.put(ClientProperty.INSTANCE_ZOOKEEPERS.getKey, AccumuloDataStoreParams.ZookeepersParam.lookup(params))
// set connector info
val password = AccumuloDataStoreParams.PasswordParam.lookupOpt(params)
password.orElse(AccumuloDataStoreParams.KeytabPathParam.lookupOpt(params)).foreach { token =>
props.put(ClientProperty.AUTH_PRINCIPAL.getKey, AccumuloDataStoreParams.UserParam.lookup(params))
props.put(ClientProperty.AUTH_TOKEN.getKey, token)
if (password.isDefined) {
props.put(ClientProperty.AUTH_TYPE.getKey, AccumuloClientConfig.PasswordAuthType)
} else {
props.put(ClientProperty.AUTH_TYPE.getKey, AccumuloClientConfig.KerberosAuthType)
props.put(ClientProperty.SASL_ENABLED.getKey, "true")
}
val props = AccumuloDataStoreFactory.buildAccumuloClientConfig(params)
// TODO verify kerberos still works
if (ClientProperty.AUTH_TYPE.getValue(props).toLowerCase(Locale.US).contains("kerberos")) {
props.put(ClientProperty.SASL_ENABLED.getKey, "true")
// // note: for Kerberos, this will create a DelegationToken for us and add it to the Job credentials
// AbstractInputFormat.setConnectorInfo(job, user, token)
}

// TODO verify kerberos still works
// val token = AccumuloDataStoreParams.PasswordParam.lookupOpt(params) match {
// case Some(p) => new PasswordToken(p.getBytes(StandardCharsets.UTF_8))
// case None =>
// // must be using Kerberos
// val file = new java.io.File(keytabPath)
// // mimic behavior from accumulo 1.9 and earlier:
// // `public KerberosToken(String principal, File keytab, boolean replaceCurrentUser)`
// UserGroupInformation.loginUserFromKeytab(user, file.getAbsolutePath)
// new KerberosToken(user, file)
// }
// // note: for Kerberos, this will create a DelegationToken for us and add it to the Job credentials
// AbstractInputFormat.setConnectorInfo(job, user, token)

// use the query plan to set the accumulo input format options
require(plan.tables.lengthCompare(1) == 0, s"Can only query from a single table: ${plan.tables.mkString(", ")}")

Expand Down

0 comments on commit f38756d

Please sign in to comment.