Skip to content

Commit

Permalink
GEOMESA-3383 Accumulo - Move core bulk copy logic out of CLI tools
Browse files Browse the repository at this point in the history
* Make tools loggers use scalalogging
* Fix hard-coded scala.version in geomesa-jobs and geomesa-tools poms
  • Loading branch information
elahrvivaz committed Sep 25, 2024
1 parent 73eceae commit a232b7c
Show file tree
Hide file tree
Showing 17 changed files with 604 additions and 455 deletions.
4 changes: 4 additions & 0 deletions geomesa-accumulo/geomesa-accumulo-datastore/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class AccumuloQueryAuditCommand extends AccumuloDataStoreCommand {

WithClose(new AccumuloAuditReader(ds)) { reader=>
val writer = if ("json".equalsIgnoreCase(params.outputFormat)) { new JsonWriter() } else { new CsvWriter() }
writer.header().foreach(Command.output.info)
writer.header().foreach(h => Command.output.info(h))
dateRanges.foreach { case (start, end) =>
WithClose(reader.getQueryEvents(params.featureName, (start, end))) { events =>
val out = cql match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.locationtech.geomesa.arrow.tools.status

import com.beust.jcommander.{Parameter, Parameters}
import com.typesafe.scalalogging.Logger
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.arrow.data.ArrowDataStore
import org.locationtech.geomesa.arrow.tools.ArrowDataStoreCommand
Expand All @@ -22,17 +23,17 @@ class ArrowDescribeSchemaCommand extends DescribeSchemaCommand[ArrowDataStore] w

override protected def getSchema(ds: ArrowDataStore): SimpleFeatureType = ds.getSchema

override protected def describe(ds: ArrowDataStore, sft: SimpleFeatureType, output: String => Unit): Unit = {
super.describe(ds, sft, output)
output("")
override protected def describe(ds: ArrowDataStore, sft: SimpleFeatureType, logger: Logger): Unit = {
super.describe(ds, sft, logger)
logger.info("")
val dictionaries = ds.dictionaries
if (dictionaries.isEmpty) {
output("Dictionaries: none")
logger.info("Dictionaries: none")
} else if (params.dictionaries) {
output("Dictionaries:")
dictionaries.foreach { case (field, dictionary) => output(s" $field: ${dictionary.iterator.mkString(", ")}") }
logger.info("Dictionaries:")
dictionaries.foreach { case (field, dictionary) => logger.info(s" $field: ${dictionary.iterator.mkString(", ")}") }
} else {
output(s"Dictionaries: ${ds.dictionaries.keys.mkString(", ")}")
logger.info(s"Dictionaries: ${ds.dictionaries.keys.mkString(", ")}")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.locationtech.geomesa.fs.tools.status

import com.beust.jcommander.Parameters
import com.typesafe.scalalogging.Logger
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.fs.data.FileSystemDataStore
import org.locationtech.geomesa.fs.tools.FsDataStoreCommand
Expand All @@ -20,12 +21,12 @@ import org.locationtech.geomesa.tools.status.DescribeSchemaCommand
class FsDescribeSchemaCommand extends DescribeSchemaCommand[FileSystemDataStore] with FsDataStoreCommand {
override val params = new FsDescribeSchemaParams

override protected def describe(ds: FileSystemDataStore, sft: SimpleFeatureType, output: String => Unit): Unit = {
super.describe(ds, sft, output)
override protected def describe(ds: FileSystemDataStore, sft: SimpleFeatureType, logger: Logger): Unit = {
super.describe(ds, sft, logger)
val metadata = ds.storage(sft.getTypeName).metadata
output(s"\nPartition scheme | ${metadata.scheme.pattern}")
output(s"File encoding | ${metadata.encoding}")
output(s"Leaf storage | ${metadata.leafStorage}")
logger.info(s"\nPartition scheme | ${metadata.scheme.pattern}")
logger.info(s"File encoding | ${metadata.encoding}")
logger.info(s"Leaf storage | ${metadata.leafStorage}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class FsGetPartitionsCommand extends FsDataStoreCommand {

override def execute(): Unit = withDataStore { ds =>
Command.user.info(s"Partitions for type ${params.featureName}:")
ds.storage(params.featureName).metadata.getPartitions().map(_.name).sorted.foreach(Command.output.info)
ds.storage(params.featureName).metadata.getPartitions().map(_.name).sorted.foreach(p => Command.output.info(p))
}
}

Expand Down
6 changes: 3 additions & 3 deletions geomesa-jobs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@
<dependencies>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-index-api_2.12</artifactId>
<artifactId>geomesa-index-api_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-feature-all_2.12</artifactId>
<artifactId>geomesa-feature-all_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-convert-all_2.12</artifactId>
<artifactId>geomesa-convert-all_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
Expand Down
5 changes: 5 additions & 0 deletions geomesa-tools/common-env/log4j.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ log4j.logger.org.locationtech.geomesa.tools.output=info,output
# un-comment to enable audit logging
#log4j.logger.org.locationtech.geomesa.index.audit=debug

# schema copier log config - debug to file, info to user+file
log4j.logger.org.locationtech.geomesa.accumulo.util.SchemaCopier=debug,user,file

# various libraries we want to lower the volume on
log4j.logger.hsqldb.db=warn
log4j.logger.org.apache.curator=warn
Expand All @@ -26,12 +29,14 @@ log4j.appender.user=org.apache.log4j.ConsoleAppender
log4j.appender.user.layout=org.apache.log4j.PatternLayout
log4j.appender.user.layout.ConversionPattern=%-5p %m%n
log4j.appender.user.Target=System.err
log4j.appender.user.Threshold=INFO

# log to stdout for program output
log4j.appender.output=org.apache.log4j.ConsoleAppender
log4j.appender.output.layout=org.apache.log4j.PatternLayout
log4j.appender.output.layout.ConversionPattern=%m%n
log4j.appender.output.Target=System.out
log4j.appender.output.Threshold=INFO

# file logging
log4j.appender.file=org.apache.log4j.RollingFileAppender
Expand Down
6 changes: 5 additions & 1 deletion geomesa-tools/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-utils_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-hadoop-utils_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-feature-exporters_${scala.binary.version}</artifactId>
Expand Down Expand Up @@ -58,7 +62,7 @@
</dependency>
<dependency>
<groupId>org.locationtech.geomesa</groupId>
<artifactId>geomesa-filter_2.12</artifactId>
<artifactId>geomesa-filter_${scala.binary.version}</artifactId>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@

package org.locationtech.geomesa

package object tools {
import com.beust.jcommander.ParameterException
import com.typesafe.scalalogging.Logger
import org.geotools.api.data.{DataStore, DataStoreFinder}
import org.locationtech.geomesa.tools.utils.Prompt
import org.locationtech.geomesa.utils.classpath.ClassPathUtils

import com.beust.jcommander.ParameterException
import org.geotools.api.data.{DataStore, DataStoreFinder}
import org.locationtech.geomesa.tools.utils.Prompt
import org.locationtech.geomesa.utils.classpath.ClassPathUtils
import org.slf4j.{Logger, LoggerFactory}
package object tools {

import java.io.File
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -42,9 +42,9 @@ package object tools {

object Command {
// send messages to the user - status, errors, etc
val user: Logger = LoggerFactory.getLogger("org.locationtech.geomesa.tools.user")
val user: Logger = Logger("org.locationtech.geomesa.tools.user")
// send output from a command
val output: Logger = LoggerFactory.getLogger("org.locationtech.geomesa.tools.output")
val output: Logger = Logger("org.locationtech.geomesa.tools.output")

/**
* Exception used to indicate a failure in the command run, without printing a stack trace
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.locationtech.geomesa.tools.status

import com.beust.jcommander.ParameterException
import com.typesafe.scalalogging.Logger
import org.geotools.api.data.DataStore
import org.geotools.api.feature.simple.SimpleFeatureType
import org.locationtech.geomesa.index.geotools.GeoMesaDataStore
Expand Down Expand Up @@ -36,14 +37,14 @@ trait DescribeSchemaCommand[DS <: DataStore] extends DataStoreCommand[DS] {
throw new ParameterException(msg)
}
Command.user.info(s"Describing attributes of feature '${sft.getTypeName}'")
describe(ds, sft, Command.output.info)
describe(ds, sft, Command.output)
}

protected def getSchema(ds: DS): SimpleFeatureType = params match {
case p: TypeNameParam => ds.getSchema(p.featureName)
}

protected def describe(ds: DS, sft: SimpleFeatureType, output: String => Unit): Unit = {
protected def describe(ds: DS, sft: SimpleFeatureType, logger: Logger): Unit = {
import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType._

val indices = ds match {
Expand Down Expand Up @@ -78,12 +79,12 @@ trait DescribeSchemaCommand[DS <: DataStore] extends DataStoreCommand[DS] {
val maxName = namesAndDescriptions.map(_._1.length).max
val maxType = namesAndDescriptions.map(_._2.length).max
namesAndDescriptions.foreach { case (n, t, d) =>
output(s"${n.padTo(maxName, ' ')} | ${t.padTo(maxType, ' ')} $d")
logger.info(s"${n.padTo(maxName, ' ')} | ${t.padTo(maxType, ' ')} $d")
}

val userData = sft.getUserData
if (!userData.isEmpty) {
output("\nUser data:")
logger.info("\nUser data:")
val namesAndValues = userData.asScala.toSeq.map { case (k, v) =>
if (k == SimpleFeatureTypes.Configs.Keywords) {
(SimpleFeatureTypes.Configs.Keywords, sft.getKeywords.mkString("'", "', '", "'"))
Expand All @@ -93,7 +94,7 @@ trait DescribeSchemaCommand[DS <: DataStore] extends DataStoreCommand[DS] {
}
val maxName = namesAndValues.map(_._1.length).max
namesAndValues.sortBy(_._1).foreach { case (n, v) =>
output(s" ${n.padTo(maxName, ' ')} | $v")
logger.info(s" ${n.padTo(maxName, ' ')} | $v")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class EnvironmentCommand extends Command {
case "spec" =>
(sft: SimpleFeatureType) => s"${SimpleFeatureTypes.encodeType(sft, !params.excludeUserData)}"
}
filtered.sortBy(_.getTypeName).map(s => s"${s.getTypeName} = ${formatFn(s)}").foreach(Command.output.info)
filtered.sortBy(_.getTypeName).map(s => s"${s.getTypeName} = ${formatFn(s)}").foreach(m => Command.output.info(m))
} else {
throw new ParameterException(s"Unknown format '${params.format}'. Valid values are 'typesafe' or 'spec'")
}
Expand All @@ -84,19 +84,19 @@ class EnvironmentCommand extends Command {
val options = ConfigRenderOptions.defaults().setJson(false).setOriginComments(false)
def render(c: Config) = c.root().render(options)
val strings = filtered.map { case (cname, conf)=> s"converter-name=$cname\n${render(conf)}\n" }
strings.toArray.sorted.foreach(Command.output.info)
strings.toArray.sorted.foreach(m => Command.output.info(m))
}
}

def listSftsNames(): Unit = {
Command.output.info("Simple Feature Types:")
val all = SimpleFeatureTypeLoader.sfts
all.sortBy(_.getTypeName).map(s => s"${s.getTypeName}").foreach(Command.output.info)
all.sortBy(_.getTypeName).map(s => s"${s.getTypeName}").foreach(m => Command.output.info(m))
}
def listConverterNames(): Unit = {
Command.output.info("Simple Feature Type Converters:")
val all = ConverterConfigLoader.confs
all.map { case (cname, conf) => s"$cname"}.toArray.sorted.foreach(Command.output.info)
all.map { case (cname, conf) => s"$cname"}.toArray.sorted.foreach(m => Command.output.info(m))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ trait GetTypeNamesCommand[DS <: DataStore] extends DataStoreCommand[DS] {

override def execute(): Unit = {
Command.output.info("Current feature types:")
withDataStore(_.getTypeNames.foreach(Command.output.info))
withDataStore(_.getTypeNames.foreach(m => Command.output.info(m)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import org.apache.hadoop.tools.{DistCp, DistCpOptions}
import org.locationtech.geomesa.jobs.JobResult.JobSuccess
import org.locationtech.geomesa.jobs.{JobResult, StatusCallback}
import org.locationtech.geomesa.tools.Command
import org.locationtech.geomesa.utils.hadoop.DistributedCopyOptions

/**
* Executes a hadoop distcp
Expand All @@ -31,7 +32,7 @@ class DistributedCopy(conf: Configuration = new Configuration()) {
* @return
*/
def copy(sourceFileList: Path, dest: Path, statusCallback: StatusCallback): JobResult =
copy(DistributedCopy.distCpOptions(Right(sourceFileList), dest), statusCallback)
copy(DistributedCopyOptions(sourceFileList, dest), statusCallback)

/**
* Execute the job
Expand All @@ -42,7 +43,7 @@ class DistributedCopy(conf: Configuration = new Configuration()) {
* @return
*/
def copy(sources: Seq[Path], dest: Path, statusCallback: StatusCallback): JobResult =
copy(DistributedCopy.distCpOptions(Left(sources), dest), statusCallback)
copy(DistributedCopyOptions(sources, dest), statusCallback)

/**
* Executes the job
Expand All @@ -63,34 +64,3 @@ class DistributedCopy(conf: Configuration = new Configuration()) {
}
}
}

object DistributedCopy {

import scala.collection.JavaConverters._

private def distCpOptions(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions =
try { distCpOptions3(sources, dest) } catch { case _: ClassNotFoundException => distCpOptions2(sources, dest) }

// hadoop 3 API
private def distCpOptions3(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = {
val builder = sources match {
case Right(file) => new DistCpOptions.Builder(file, dest)
case Left(dirs) => new DistCpOptions.Builder(dirs.asJava, dest)
}
builder.withAppend(false).withOverwrite(true).withBlocking(false).withCopyStrategy("dynamic").build()
}

// hadoop 2 API
private def distCpOptions2(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = {
val opts = sources match {
case Right(file) =>
classOf[DistCpOptions].getConstructor(classOf[Path], classOf[Path]).newInstance(file, dest)
case Left(dirs) =>
classOf[DistCpOptions].getConstructor(classOf[java.util.List[Path]], classOf[Path]).newInstance(dirs.asJava, dest)
}
classOf[DistCpOptions].getMethod("setAppend", classOf[Boolean]).invoke(opts, java.lang.Boolean.FALSE)
classOf[DistCpOptions].getMethod("setOverwrite", classOf[Boolean]).invoke(opts, java.lang.Boolean.TRUE)
classOf[DistCpOptions].getMethod("setCopyStrategy", classOf[String]).invoke(opts, "dynamic")
opts
}
}
4 changes: 4 additions & 0 deletions geomesa-utils-parent/geomesa-hadoop-utils/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>
</dependency>

<!-- test dependencies -->
<dependency>
Expand Down
Loading

0 comments on commit a232b7c

Please sign in to comment.