From a232b7c863f94c2239dcc9d409ac10a84bc4d074 Mon Sep 17 00:00:00 2001 From: Emilio Lahr-Vivaz Date: Wed, 25 Sep 2024 12:49:44 -0400 Subject: [PATCH] GEOMESA-3383 Accumulo - Move core bulk copy logic out of CLI tools * Make tools loggers use scalalogging * Fix hard-coded scala.version in geomesa-jobs and geomesa-tools poms --- .../geomesa-accumulo-datastore/pom.xml | 4 + .../geomesa/accumulo/util/SchemaCopier.scala | 457 ++++++++++++++++++ .../ingest/AccumuloBulkCopyCommand.scala | 415 ++-------------- .../stats/AccumuloQueryAuditCommand.scala | 2 +- .../status/ArrowDescribeSchemaCommand.scala | 15 +- .../status/FsDescribeSchemaCommand.scala | 11 +- .../tools/status/FsGetPartitionsCommand.scala | 2 +- geomesa-jobs/pom.xml | 6 +- geomesa-tools/common-env/log4j.properties | 5 + geomesa-tools/pom.xml | 6 +- .../locationtech/geomesa/tools/package.scala | 16 +- .../tools/status/DescribeSchemaCommand.scala | 11 +- .../tools/status/EnvironmentCommand.scala | 8 +- .../tools/status/GetTypeNamesCommand.scala | 2 +- .../geomesa/tools/utils/DistributedCopy.scala | 36 +- .../geomesa-hadoop-utils/pom.xml | 4 + .../utils/hadoop/DistributedCopyOptions.scala | 59 +++ 17 files changed, 604 insertions(+), 455 deletions(-) create mode 100644 geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala create mode 100644 geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/DistributedCopyOptions.scala diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/pom.xml b/geomesa-accumulo/geomesa-accumulo-datastore/pom.xml index a06fef2bdbeb..c8d6713b6a36 100644 --- a/geomesa-accumulo/geomesa-accumulo-datastore/pom.xml +++ b/geomesa-accumulo/geomesa-accumulo-datastore/pom.xml @@ -110,6 +110,10 @@ org.apache.hadoop hadoop-client + + org.apache.hadoop + hadoop-distcp + diff --git a/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala new file mode 100644 index 000000000000..d7c460990108 --- /dev/null +++ b/geomesa-accumulo/geomesa-accumulo-datastore/src/main/scala/org/locationtech/geomesa/accumulo/util/SchemaCopier.scala @@ -0,0 +1,457 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. 
+ ***********************************************************************/ + +package org.locationtech.geomesa.accumulo.util + +import com.typesafe.scalalogging.StrictLogging +import org.apache.accumulo.core.client.admin.TableOperations +import org.apache.commons.io.IOUtils +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.hadoop.tools.DistCp +import org.geotools.api.data.DataStoreFinder +import org.geotools.api.feature.simple.SimpleFeatureType +import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams} +import org.locationtech.geomesa.accumulo.util.SchemaCopier.{Cluster, ClusterConfig, CopyOptions, PartitionId, PartitionName, PartitionValue} +import org.locationtech.geomesa.features.ScalaSimpleFeature +import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex +import org.locationtech.geomesa.index.conf.partition.TablePartition +import org.locationtech.geomesa.utils.concurrent.CachedThreadPool +import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes +import org.locationtech.geomesa.utils.hadoop.DistributedCopyOptions +import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose} + +import java.io.{Closeable, File, IOException} +import java.nio.charset.StandardCharsets +import java.util.Collections +import java.util.concurrent.atomic.AtomicBoolean +import scala.collection.mutable.ListBuffer +import scala.util.Try +import scala.util.control.NonFatal + +/** + * Copies a schema from one cluster (or catalog) to another, using bulk file operations + * + * @param fromCluster source cluster + * @param toCluster destination cluster + * @param typeName name of the feature type to copy + * @param exportDir hdfs or s3a path to use for source table export - the scheme and authority (e.g. 
bucket name) must match + * the destination table filesystem + * @param indices specific indices to copy, or empty to copy all indices + * @param partitions if schema is partitioned - specific partitions to copy, or empty to copy all partitions + * @param options other copy options + */ +class SchemaCopier( + fromCluster: ClusterConfig, + toCluster: ClusterConfig, + typeName: String, + exportDir: String, + indices: Seq[String], + partitions: Seq[PartitionId], + options: CopyOptions, + ) extends Runnable with Closeable with StrictLogging { + + import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType + + import scala.collection.JavaConverters._ + + private val tryFrom = Try(Cluster(fromCluster)) + private val tryTo = Try(Cluster(toCluster)) + + // note: all other class variables are lazy, so that we can instantiate an instance and then clean up connections on close() + + // the cluster we are exporting from + lazy private val from: Cluster = tryFrom.get + // the cluster we are exporting to + lazy private val to: Cluster = tryTo.get + + lazy private val exportPath: Path = { + val defaultFs = FileSystem.get(to.conf) + // makeQualified is a no-op if the path was already qualified (has a defined scheme and is not a relative path) + val path = new Path(exportDir).makeQualified(defaultFs.getUri, defaultFs.getWorkingDirectory) + if (path.toUri.getScheme == "file") { + throw new RuntimeException("Could not read defaultFS - this may be caused by a missing Hadoop core-site.xml file") + } + path + } + + lazy private val exportFs: FileSystem = exportPath.getFileSystem(to.conf) + + // sft should be shareable/the same from both datastores + lazy private val sft: SimpleFeatureType = { + val sft = from.ds.getSchema(typeName) + if (sft == null) { + throw new IllegalArgumentException(s"Schema '$typeName' does not exist in the source store") + } else { + val toSft = to.ds.getSchema(typeName) + if (toSft == null) { + throw new IllegalArgumentException(s"Schema '$typeName' does not exist in the destination store") + } else if (SimpleFeatureTypes.compare(sft, toSft) != 0) { + throw new IllegalArgumentException(s"Schema '$typeName' is not the same in the source and destination store") + } else if (SimpleFeatureTypes.compareIndexConfigs(sft, toSft) != 0) { + throw new IllegalArgumentException(s"Schema '$typeName' does not have compatible indices in the source and destination store") + } + } + sft + } + + lazy private val fromIndices = { + val all = from.ds.manager.indices(sft) + if (indices.isEmpty) { all } else { + val builder = Seq.newBuilder[GeoMesaFeatureIndex[_, _]] + indices.foreach { ident => + val filtered = all.filter(_.identifier.contains(ident)) + if (filtered.isEmpty) { + throw new IllegalArgumentException( + s"Index '$ident' does not exist in the schema. 
Available indices: ${all.map(_.identifier).mkString(", ")}") + } + logger.debug(s"Mapped identifier $ident to ${filtered.map(_.identifier).mkString(", ")}") + builder ++= filtered + } + builder.result.distinct + } + } + + // these get passed into our index method calls - for partitioned schemas, it must be a Seq[Some[_]], + // while for non-partitioned schemas it must always be Seq(None) + lazy private val fromPartitions: Seq[Option[String]] = { + if (sft.isPartitioned) { + val builder = ListBuffer.empty[String] + lazy val partitioning = TablePartition(from.ds, sft).get + lazy val sf = new ScalaSimpleFeature(sft, "") + builder ++= partitions.map { + case PartitionName(name) => name + case PartitionValue(value) => + sf.setAttribute(sft.getDtgIndex.get, value) + val partition = partitioning.partition(sf) + logger.debug(s"Generated partition $partition from value $value") + partition + } + if (builder.isEmpty) { + logger.debug("No partitions specified - loading all partitions from store") + builder ++= fromIndices.flatMap(_.getPartitions) + } + builder.result.distinct.sorted.map(Option.apply) + } else { + if (partitions.nonEmpty) { + throw new IllegalArgumentException("partitions are not applicable for a non-partitioned schema") + } + Seq(None) + } + } + + /** + * Execute the copy + */ + override def run(): Unit = run(false) + + /** + * Execute the copy + * + * @param resume resume from a previously interrupted run, vs overwrite any existing output + */ + def run(resume: Boolean): Unit = { + CachedThreadPool.executor(options.tableConcurrency) { executor => + fromPartitions.foreach { partition => + fromIndices.foreach { fromIndex => + val toIndex = to.ds.manager.index(sft, fromIndex.identifier) + val partitionLogId = s"${partition.fold(s"index")(p => s"partition $p")} ${fromIndex.identifier}" + val runnable: Runnable = () => { + try { + logger.info(s"Copying $partitionLogId") + copy(fromIndex, toIndex, partition, resume, partitionLogId) + logger.info(s"Bulk copy complete for $partitionLogId") + } catch { + // catch Throwable so NoClassDefFound still gets logged + case e: Throwable => + logger.error(s"Error copying $partitionLogId: ${e.getMessage}") + logger.debug(s"Error copying $partitionLogId", e) + } + } + executor.submit(runnable) + } + } + } + } + + /** + * Copy a single index + partition + * + * @param fromIndex from index + * @param toIndex to index + * @param partition partition name - must be Some if schema is partitioned + * @param resume use any partial results from a previous run, if present + * @param partitionLogId identifier for log messages + */ + private def copy( + fromIndex: GeoMesaFeatureIndex[_, _], + toIndex: GeoMesaFeatureIndex[_, _], + partition: Option[String], + resume: Boolean, + partitionLogId: String): Unit = { + + require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup + + val fromTable = try { fromIndex.getTableName(partition) } catch { + case NonFatal(e) => throw new RuntimeException("Could not get source table", e) + } + + val completeMarker = new Path(exportPath, s"$fromTable.complete") + if (exportFs.exists(completeMarker)) { + if (resume) { + logger.debug("Skipping already completed copy") + return + } else { + exportFs.delete(completeMarker, false) + } + } + + val tableExportPath = new Path(exportPath, fromTable) + val distcpPath = new Path(tableExportPath, "distcp.txt") + val copyToDir = new Path(tableExportPath, "files") + val cloneTable = s"${fromTable}_bc_tmp" + + logger.debug(s"Source table 
$fromTable (${from.tableOps.tableIdMap().get(fromTable)})") + logger.debug(s"Export path $tableExportPath") + + if (resume && from.tableOps.exists(cloneTable)) { + logger.debug(s"Using existing cloned table $cloneTable - ensuring table is offline") + from.tableOps.offline(cloneTable, true) + } else { + // clone the table as we have to take it offline in order to export it + // note that cloning is just a metadata op as it shares the underlying data files (until they change) + logger.debug(s"Checking for existence and deleting any existing cloned table $cloneTable") + from.ds.adapter.deleteTable(cloneTable) // no-op if table doesn't exist + logger.debug(s"Cloning $fromTable to $cloneTable") + from.tableOps.clone(fromTable, cloneTable, false, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility + logger.debug(s"Taking $cloneTable offline") + from.tableOps.offline(cloneTable, true) + } + + if (resume && exportFs.exists(distcpPath) && exportFs.getFileStatus(distcpPath).getLen > 0) { + logger.debug(s"Using existing export results $distcpPath") + } else { + if (exportFs.exists(tableExportPath)) { + logger.debug(s"Deleting existing export directory $tableExportPath") + exportFs.delete(tableExportPath, true) + } + logger.debug(s"Exporting table to $tableExportPath") + from.tableOps.exportTable(cloneTable, tableExportPath.toString) + + if (!exportFs.exists(distcpPath) || exportFs.getFileStatus(distcpPath).getLen == 0) { + throw new RuntimeException(s"Could not read table export results at $distcpPath") + } + } + + // ensures the destination table exists + logger.debug(s"Checking destination for table $fromTable") + to.ds.adapter.createTable(toIndex, partition, Seq.empty) + val toTable = try { toIndex.getTableName(partition) } catch { + case NonFatal(e) => throw new RuntimeException("Could not get destination table", e) + } + logger.debug(s"Destination table $toTable (${to.tableOps.tableIdMap().get(toTable)})") + + // create splits, do this separately in case the table already exists + val splits = new java.util.TreeSet(from.tableOps.listSplits(cloneTable)) + val existingSplits = to.tableOps.listSplits(toTable) + splits.removeAll(existingSplits) + if (!splits.isEmpty) { + if (!existingSplits.isEmpty) { + logger.warn(s"Detected split mismatch between source ($fromTable) and destination ($toTable) for $partitionLogId") + } + logger.debug(s"Adding splits to destination table $toTable") + to.tableOps.addSplits(toTable, splits) + } + + val hadCopyError = new AtomicBoolean(false) + // read the distcp.txt file produced by the table export + // consumer: (src, dest) => Unit + def distCpConsumer(threads: Int)(consumer: (Path, Path) => Unit): Unit = { + logger.debug(s"Reading $distcpPath") + WithClose(IOUtils.lineIterator(exportFs.open(distcpPath), StandardCharsets.UTF_8)) { files => + CachedThreadPool.executor(threads) { executor => + files.asScala.foreach { file => + val runnable: Runnable = () => { + val path = new Path(file) + val copy = new Path(copyToDir, path.getName) + try { + if (resume && exportFs.exists(copy) && + path.getFileSystem(from.conf).getFileStatus(path).getLen == exportFs.getFileStatus(copy).getLen) { + logger.debug(s"Using existing copy of $path at $copy") + } else { + consumer(path, copy) + } + } catch { + // catch Throwable so NoClassDefFound still gets logged + case e: Throwable => + hadCopyError.set(true) + logger.error(s"Failed to copy $path to $copy: ${e.getMessage}") + logger.debug(s"Failed to copy $path to $copy", e) + } + } + 
executor.submit(runnable) + } + } + } + } + + if (options.distCp) { + var inputPath = distcpPath + if (resume) { + logger.debug(s"Checking copy status of files in $distcpPath") + inputPath = new Path(tableExportPath, "distcp-remaining.txt") + WithClose(exportFs.create(inputPath, true)) { out => + distCpConsumer(1) { (path, _) => + logger.debug(s"Adding $path to distcp") + out.writeUTF(s"$path\n") + } + } + } + val job = new DistCp(from.conf, DistributedCopyOptions(inputPath, copyToDir)).execute() + logger.info(s"Tracking available at ${job.getStatus.getTrackingUrl}") + while (!job.isComplete) { + Thread.sleep(500) + } + if (job.isSuccessful) { + logger.info(s"Successfully copied data to $copyToDir") + } else { + hadCopyError.set(true) + logger.error(s"Job failed with state ${job.getStatus.getState} due to: ${job.getStatus.getFailureInfo}") + } + } else { + distCpConsumer(options.fileConcurrency) { (path, copy) => + logger.debug(s"Copying $path to $copy") + val fs = path.getFileSystem(from.conf) + if (!FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) { + // consolidate error handling in the catch block + throw new IOException(s"Failed to copy $path to $copy, copy returned false") + } + } + } + if (hadCopyError.get) { + throw new RuntimeException("Error copying data files") + } + + logger.debug(s"Loading rfiles from $copyToDir to $toTable") + val importDir = to.tableOps.importDirectory(copyToDir.toString).to(toTable) + try { importDir.ignoreEmptyDir(true) } catch { + case _: NoSuchMethodError => // accumulo 2.0, ignore + } + try { importDir.load() } catch { + case e: IllegalArgumentException => logger.trace("Error importing directory:", e) // should mean empty dir + } + + // create marker indicating this copy was successful + logger.debug(s"Creating completion marker $completeMarker") + exportFs.create(completeMarker).close() + + // cleanup + logger.debug(s"Deleting export path $tableExportPath") + exportFs.delete(tableExportPath, true) + logger.debug(s"Deleting clone table $cloneTable") + from.tableOps.delete(cloneTable) + } + + override def close(): Unit = { + CloseWithLogging(tryFrom.toOption) + CloseWithLogging(tryTo.toOption) + } +} + +object SchemaCopier { + + sealed trait ClusterCredentials + case class ClusterPassword(password: String) extends ClusterCredentials + case class ClusterKeytab(keytabPath: String) extends ClusterCredentials + + sealed trait PartitionId + case class PartitionName(name: String) extends PartitionId + case class PartitionValue(value: AnyRef) extends PartitionId + + /** + * Connection info for an Accumulo cluster + * + * @param instanceName instance name + * @param zookeepers zookeepers + * @param user user + * @param credentials credentials + * @param catalog catalog table containing the feature type + * @param configFiles additional hadoop *-site.xml files for configuring hdfs operations + */ + case class ClusterConfig( + instanceName: String, zookeepers: String, user: String, credentials: ClusterCredentials, catalog: String, configFiles: Seq[File]) + + /** + * Options + * + * @param tableConcurrency number of tables to copy in parallel + * @param fileConcurrency number of files to copy in parallel, per table (when not using distcp) + * @param distCp use hadoop distcp to copy files, instead of the hadoop client + */ + case class CopyOptions(tableConcurrency: Int = 1, fileConcurrency: Int = 4, distCp: Boolean = false) + + /** + * Holds state for a given Accumulo cluster + * + * @param instance instance name + * @param zk instance zookeepers + * 
@param user username + * @param credentials credentials - either Right(password) or Left(keytab) + * @param catalog catalog table + * @param configs additional hadoop configuration files + */ + private class Cluster( + instance: String, + zk: String, + user: String, + credentials: ClusterCredentials, + catalog: String, + configs: Seq[File] + ) extends Closeable { + + import scala.collection.JavaConverters._ + + private val auth = credentials match { + case ClusterPassword(password) => AccumuloDataStoreParams.PasswordParam.key -> password + case ClusterKeytab(keytabPath) => AccumuloDataStoreParams.KeytabPathParam.key -> keytabPath + } + + private val params = Map( + AccumuloDataStoreParams.InstanceNameParam.key -> instance, + AccumuloDataStoreParams.ZookeepersParam.key -> zk, + AccumuloDataStoreParams.UserParam.key -> user, + AccumuloDataStoreParams.CatalogParam.key -> catalog, + auth + ) + + val conf = new Configuration() + configs.foreach(f => conf.addResource(f.toURI.toURL)) + + val ds: AccumuloDataStore = + try { DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore] } catch { + case NonFatal(e) => throw new IOException("Unable to load datastore:", e) + } + + if (ds == null) { + val maskedParams = params.map { + case (AccumuloDataStoreParams.PasswordParam.key , _) => s"${AccumuloDataStoreParams.PasswordParam.key }=******" + case (k, v) => s"$k=$v" + } + throw new IOException(s"Unable to load datastore using provided values: ${maskedParams.mkString(", ")}") + } + + val tableOps: TableOperations = ds.connector.tableOperations() + + override def close(): Unit = ds.dispose() + } + + private object Cluster { + def apply(config: ClusterConfig): Cluster = + new Cluster(config.instanceName, config.zookeepers, config.user, config.credentials, config.catalog, config.configFiles) + } +} \ No newline at end of file diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala index 8f36d2d1c1a9..310d23b19739 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/ingest/AccumuloBulkCopyCommand.scala @@ -10,410 +10,54 @@ package org.locationtech.geomesa.accumulo.tools.ingest import com.beust.jcommander.{Parameter, ParameterException, Parameters} import com.typesafe.scalalogging.{LazyLogging, StrictLogging} -import org.apache.accumulo.core.client.admin.TableOperations -import org.apache.commons.io.IOUtils -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} -import org.geotools.api.data.DataStoreFinder -import org.geotools.api.feature.simple.SimpleFeatureType -import org.locationtech.geomesa.accumulo.data.{AccumuloDataStore, AccumuloDataStoreParams} -import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkCopyCommand.{AccumuloBulkCopyParams, BulkCopier} -import org.locationtech.geomesa.features.ScalaSimpleFeature -import org.locationtech.geomesa.index.api.GeoMesaFeatureIndex -import org.locationtech.geomesa.index.conf.partition.TablePartition -import org.locationtech.geomesa.jobs.JobResult.{JobFailure, JobSuccess} +import org.locationtech.geomesa.accumulo.tools.ingest.AccumuloBulkCopyCommand.AccumuloBulkCopyParams +import 
org.locationtech.geomesa.accumulo.util.SchemaCopier +import org.locationtech.geomesa.accumulo.util.SchemaCopier._ import org.locationtech.geomesa.tools._ import org.locationtech.geomesa.tools.utils.ParameterValidators.PositiveInteger -import org.locationtech.geomesa.tools.utils.{DistributedCopy, TerminalCallback} -import org.locationtech.geomesa.utils.concurrent.CachedThreadPool -import org.locationtech.geomesa.utils.geotools.SimpleFeatureTypes -import org.locationtech.geomesa.utils.io.{CloseWithLogging, WithClose} +import org.locationtech.geomesa.utils.io.WithClose -import java.io.{Closeable, File, IOException} -import java.nio.charset.StandardCharsets -import java.util.Collections -import java.util.concurrent.atomic.AtomicBoolean -import scala.collection.mutable.ListBuffer -import scala.util.Try -import scala.util.control.NonFatal +import java.io.File /** * Copy a partitioned table out of one feature type and into an identical feature type */ class AccumuloBulkCopyCommand extends Command with StrictLogging { - override val name = "bulk-copy" - override val params = new AccumuloBulkCopyParams() - - override def execute(): Unit = WithClose(new BulkCopier(params))(_.run()) -} - -object AccumuloBulkCopyCommand extends LazyLogging { - import scala.collection.JavaConverters._ - /** - * Encapsulates the logic for the bulk copy operation - * - * @param params params - */ - private class BulkCopier(params: AccumuloBulkCopyParams) extends Runnable with Closeable with StrictLogging { - - import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType.RichSimpleFeatureType - - import scala.collection.JavaConverters._ - - // the cluster we are exporting from - lazy private val from: Cluster = tryFrom.get - - private val tryFrom = Try { - new Cluster(params.fromInstance, params.fromZookeepers, params.fromUser, Option(params.fromPassword).toRight(params.fromKeytab), - params.fromCatalog, Option(params.fromConfigs)) - } - - // the cluster we are exporting to - lazy private val to: Cluster = tryTo.get - - private val tryTo = Try { - new Cluster(params.toInstance, params.toZookeepers, params.toUser, Option(params.toPassword).toRight(params.toKeytab), - params.toCatalog, Option(params.toConfigs)) - } - - lazy private val exportPath: Path = { - val defaultFs = FileSystem.get(to.conf) - // makeQualified is a no-op if the path was already qualified (has a defined scheme and is not a relative path) - new Path(params.exportPath).makeQualified(defaultFs.getUri, defaultFs.getWorkingDirectory) - } - - lazy private val exportFs: FileSystem = exportPath.getFileSystem(to.conf) - - // sft should be shareable/the same from both datastores - lazy private val sft: SimpleFeatureType = from.ds.getSchema(params.featureName) + override val name = "bulk-copy" + override val params = new AccumuloBulkCopyParams() - lazy private val indices = { - val all = from.ds.manager.indices(sft) - if (params.indices == null || params.indices.isEmpty) { all } else { - val builder = Seq.newBuilder[GeoMesaFeatureIndex[_, _]] - params.indices.asScala.foreach { ident => - val filtered = all.filter(_.identifier.contains(ident)) - if (filtered.isEmpty) { - throw new ParameterException( - s"Index '$ident' does not exist in the schema. 
Available indices: ${all.map(_.identifier).mkString(", ")}") - } - logger.debug(s"Mapped identifier $ident to ${filtered.map(_.identifier).mkString(", ")}") - builder ++= filtered - } - builder.result.distinct - } + override def execute(): Unit = { + val fromCluster = { + val auth = if (params.fromKeytab != null) { ClusterKeytab(params.fromKeytab) } else { ClusterPassword(params.fromPassword) } + val configs = if (params.fromConfigs == null) { Seq.empty } else { params.fromConfigs.asScala } + ClusterConfig(params.fromInstance, params.fromZookeepers, params.fromUser, auth, params.fromCatalog, configs) } - - // these get passed into our index method calls - for partitioned schemas, it must be a Seq[Some[_]], - // while for non-partitioned schemas it must always be Seq(None) - lazy private val partitions: Seq[Option[String]] = { - if (sft.isPartitioned) { - val builder = ListBuffer.empty[String] - if (params.partitions != null && !params.partitions.isEmpty) { - builder ++= params.partitions.asScala - } - if (params.partitionValues != null && !params.partitionValues.isEmpty) { - val partitioning = TablePartition(from.ds, sft).get - val sf = new ScalaSimpleFeature(sft, "") - builder ++= params.partitionValues.asScala.map { value => - sf.setAttribute(sft.getDtgIndex.get, value) - val partition = partitioning.partition(sf) - logger.debug(s"Generated partition $partition from value $value") - partition - } - } - if (builder.isEmpty) { - logger.debug("No partitions specified - loading all partitions from store") - indices.foreach { index => - builder ++= index.getPartitions - } - } - builder.result.distinct.sorted.map(Option.apply) - } else { - if ((params.partitions != null && !params.partitions.isEmpty) || - (params.partitionValues != null && !params.partitionValues.isEmpty)) { - throw new ParameterException("--partition and/or --partition-value are not applicable for a non-partitioned schema") - } - Seq(None) - } + val toCluster = { + val auth = if (params.toKeytab != null) { ClusterKeytab(params.toKeytab) } else { ClusterPassword(params.toPassword) } + val configs = if (params.toConfigs == null) { Seq.empty } else { params.toConfigs.asScala } + ClusterConfig(params.toInstance, params.toZookeepers, params.toUser, auth, params.toCatalog, configs) } - - - override def run(): Unit = { - // validate our params/setup - if (exportPath.toUri.getScheme == "file") { - throw new RuntimeException("Could not read defaultFS - this may be caused by a missing Hadoop core-site.xml file") - } - if (sft == null) { - throw new ParameterException(s"Schema '${params.featureName}' does not exist in the source store") - } else { - val toSft = to.ds.getSchema(params.featureName) - if (toSft == null) { - throw new ParameterException(s"Schema '${params.featureName}' does not exist in the destination store") - } else if (SimpleFeatureTypes.compare(sft, toSft) != 0) { - throw new ParameterException(s"Schema '${params.featureName}' is not the same in the source and destination store") - } else if (SimpleFeatureTypes.compareIndexConfigs(sft, toSft) != 0) { - throw new ParameterException( - s"Schema '${params.featureName}' does not have compatible indices in the source and destination store") - } - } - - // now execute the copy - CachedThreadPool.executor(params.tableThreads) { executor => - partitions.foreach { partition => - indices.map { fromIndex => - val toIndex = to.ds.manager.index(sft, fromIndex.identifier) - val partitionLogId = s"${partition.fold(s"index")(p => s"partition $p")} ${fromIndex.identifier}" - val 
runnable: Runnable = () => { - try { - Command.user.info(s"Copying $partitionLogId") - copy(fromIndex, toIndex, partition, partitionLogId) - Command.user.info(s"Bulk copy complete for $partitionLogId") - } catch { - // catch Throwable so NoClassDefFound still gets logged - case e: Throwable => - Command.user.error(s"Error copying $partitionLogId: ${e.getMessage}") - logger.error(s"Error copying $partitionLogId", e) - } - } - executor.submit(runnable) - } - } + val indices = if (params.indices == null) { Seq.empty } else { params.indices.asScala } + val partitions: Seq[PartitionId] = + Option(params.partitions).fold(Seq.empty[PartitionId])(_.asScala.toSeq.map(PartitionName.apply)) ++ + Option(params.partitionValues).fold(Seq.empty[PartitionId])(_.asScala.toSeq.map(PartitionValue.apply)) + val opts = CopyOptions(params.tableThreads, params.fileThreads, params.distCp) + + WithClose(new SchemaCopier(fromCluster, toCluster, params.featureName, params.exportPath, indices, partitions, opts)) { copier => + try { + copier.run(params.resume) + } catch { + case e: IllegalArgumentException => throw new ParameterException(e.getMessage) } } - /** - * Copy a single index + partition - * - * @param fromIndex from index - * @param toIndex to index - * @param partition partition name - must be Some if schema is partitioned - * @param partitionLogId identifier for log messages - */ - private def copy( - fromIndex: GeoMesaFeatureIndex[_, _], - toIndex: GeoMesaFeatureIndex[_, _], - partition: Option[String], - partitionLogId: String): Unit = { - require(sft.isPartitioned == partition.isDefined) // sanity check - this should always be true due to our setup - - val fromTable = try { fromIndex.getTableName(partition) } catch { - case NonFatal(e) => throw new RuntimeException("Could not get source table", e) - } - - val completeMarker = new Path(exportPath, s"$fromTable.complete") - if (exportFs.exists(completeMarker)) { - if (params.resume) { - logger.debug("Skipping already completed copy") - return - } else { - exportFs.delete(completeMarker, false) - } - } - - val tableExportPath = new Path(exportPath, fromTable) - val distcpPath = new Path(tableExportPath, "distcp.txt") - val copyToDir = new Path(tableExportPath, "files") - val cloneTable = s"${fromTable}_bc_tmp" - - logger.debug(s"Source table $fromTable (${from.tableOps.tableIdMap().get(fromTable)})") - logger.debug(s"Export path $tableExportPath") - - if (params.resume && from.tableOps.exists(cloneTable)) { - logger.debug(s"Using existing cloned table $cloneTable - ensuring table is offline") - from.tableOps.offline(cloneTable, true) - } else { - // clone the table as we have to take it offline in order to export it - // note that cloning is just a metadata op as it shares the underlying data files (until they change) - logger.debug(s"Checking for existence and deleting any existing cloned table $cloneTable") - from.ds.adapter.deleteTable(cloneTable) // no-op if table doesn't exist - logger.debug(s"Cloning $fromTable to $cloneTable") - from.tableOps.clone(fromTable, cloneTable, false, Collections.emptyMap(), Collections.emptySet()) // use 2.0 method for compatibility - logger.debug(s"Taking $cloneTable offline") - from.tableOps.offline(cloneTable, true) - } - - if (params.resume && exportFs.exists(distcpPath) && exportFs.getFileStatus(distcpPath).getLen > 0) { - logger.debug(s"Using existing export results $distcpPath") - } else { - if (exportFs.exists(tableExportPath)) { - logger.debug(s"Deleting existing export directory $tableExportPath") - 
exportFs.delete(tableExportPath, true) - } - logger.debug(s"Exporting table to $tableExportPath") - from.tableOps.exportTable(cloneTable, tableExportPath.toString) - - if (!exportFs.exists(distcpPath) || exportFs.getFileStatus(distcpPath).getLen == 0) { - throw new RuntimeException(s"Could not read table export results at $distcpPath") - } - } - - // ensures the destination table exists - logger.debug(s"Checking destination for table $fromTable") - to.ds.adapter.createTable(toIndex, partition, Seq.empty) - val toTable = try { toIndex.getTableName(partition) } catch { - case NonFatal(e) => throw new RuntimeException("Could not get destination table", e) - } - logger.debug(s"Destination table $toTable (${to.tableOps.tableIdMap().get(toTable)})") - - // create splits, do this separately in case the table already exists - val splits = new java.util.TreeSet(from.tableOps.listSplits(cloneTable)) - val existingSplits = to.tableOps.listSplits(toTable) - splits.removeAll(existingSplits) - if (!splits.isEmpty) { - if (!existingSplits.isEmpty) { - val warning = s"Detected split mismatch between source ($fromTable) and destination ($toTable) for $partitionLogId" - Command.user.warn(warning) - logger.warn(warning) - } - logger.debug(s"Adding splits to destination table $toTable") - to.tableOps.addSplits(toTable, splits) - } - - val hadCopyError = new AtomicBoolean(false) - // read the distcp.txt file produced by the table export - // consumer: (src, dest) => Unit - def distCpConsumer(threads: Int)(consumer: (Path, Path) => Unit): Unit = { - logger.debug(s"Reading $distcpPath") - WithClose(IOUtils.lineIterator(exportFs.open(distcpPath), StandardCharsets.UTF_8)) { files => - CachedThreadPool.executor(threads) { executor => - files.asScala.foreach { file => - val runnable: Runnable = () => { - val path = new Path(file) - val copy = new Path(copyToDir, path.getName) - try { - if (params.resume && exportFs.exists(copy) && - path.getFileSystem(from.conf).getFileStatus(path).getLen == exportFs.getFileStatus(copy).getLen) { - logger.debug(s"Using existing copy of $path at $copy") - } else { - consumer(path, copy) - } - } catch { - // catch Throwable so NoClassDefFound still gets logged - case e: Throwable => - hadCopyError.set(true) - Command.user.error(s"Failed to copy $path to $copy") - logger.error(s"Failed to copy $path to $copy", e) - } - } - executor.submit(runnable) - } - } - } - } - - if (params.distCp) { - var inputPath = distcpPath - if (params.resume) { - logger.debug(s"Checking copy status of files in $distcpPath") - inputPath = new Path(tableExportPath, "distcp-remaining.txt") - WithClose(exportFs.create(inputPath, true)) { out => - distCpConsumer(1) { (path, _) => - logger.debug(s"Adding $path to distcp") - out.writeUTF(s"$path\n") - } - } - } - new DistributedCopy(from.conf).copy(inputPath, copyToDir, TerminalCallback()) match { - case JobSuccess(message, counts) => - Command.user.info(message) - logger.debug(s"Distributed copy counters: ${counts.mkString("\n ", "\n ", "")}") - - case JobFailure(message) => - hadCopyError.set(true) - Command.user.error(message) - logger.error(message) - } - } else { - distCpConsumer(params.fileThreads) { (path, copy) => - logger.debug(s"Copying $path to $copy") - val fs = path.getFileSystem(from.conf) - if (!FileUtil.copy(fs, path, exportFs, copy, false, true, to.conf)) { - // consolidate error handling in the catch block - throw new IOException(s"Failed to copy $path to $copy, copy returned false") - } - } - } - if (hadCopyError.get) { - throw new 
RuntimeException("Error copying data files") - } - - logger.debug(s"Loading rfiles from $copyToDir to $toTable") - val importDir = to.tableOps.importDirectory(copyToDir.toString).to(toTable) - try { importDir.ignoreEmptyDir(true) } catch { - case _: NoSuchMethodError => // accumulo 2.0, ignore - } - try { importDir.load() } catch { - case e: IllegalArgumentException => logger.trace("Error importing directory:", e) // should mean empty dir - } - - // create marker indicating this copy was successful - logger.debug(s"Creating completion marker $completeMarker") - exportFs.create(completeMarker).close() - - // cleanup - logger.debug(s"Deleting export path $tableExportPath") - exportFs.delete(tableExportPath, true) - logger.debug(s"Deleting clone table $cloneTable") - from.tableOps.delete(cloneTable) - } - - override def close(): Unit = { - CloseWithLogging(tryFrom.toOption) - CloseWithLogging(tryTo.toOption) - } } +} - /** - * Holds state for a given Accumulo cluster - * - * @param instance instance name - * @param zk instance zookeepers - * @param user username - * @param credentials credentials - either Right(password) or Left(keytab) - * @param catalog catalog table - * @param configs additional hadoop configuration files - */ - private class Cluster( - instance: String, - zk: String, - user: String, - credentials: Either[String, String], - catalog: String, - configs: Option[java.util.List[File]] - ) extends Closeable { - - private val params = Map( - AccumuloDataStoreParams.InstanceNameParam.key -> instance, - AccumuloDataStoreParams.ZookeepersParam.key -> zk, - AccumuloDataStoreParams.UserParam.key -> user, - AccumuloDataStoreParams.PasswordParam.key -> credentials.right.getOrElse(null), - AccumuloDataStoreParams.KeytabPathParam.key -> credentials.left.getOrElse(null), - AccumuloDataStoreParams.CatalogParam.key -> catalog, - ) - - val conf = new Configuration() - configs.foreach { files => - files.asScala.foreach(f => conf.addResource(f.toURI.toURL)) - } - - val ds: AccumuloDataStore = - try { DataStoreFinder.getDataStore(params.asJava).asInstanceOf[AccumuloDataStore] } catch { - case NonFatal(e) => throw new ParameterException("Unable to load datastore:", e) - } - - if (ds == null) { - throw new ParameterException( - s"Unable to load datastore using provided values: ${params.map { case (k, v) => s"$k=$v" }.mkString(", ")}") - } - - val tableOps: TableOperations = ds.connector.tableOperations() - - override def close(): Unit = ds.dispose() - } +object AccumuloBulkCopyCommand extends LazyLogging { @Parameters(commandDescription = "Bulk copy RFiles to a different cluster") class AccumuloBulkCopyParams extends RequiredTypeNameParam { @@ -487,5 +131,4 @@ object AccumuloBulkCopyCommand extends LazyLogging { description = "Resume a previously interrupted run from where it left off") var resume: Boolean = false } - } diff --git a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloQueryAuditCommand.scala b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloQueryAuditCommand.scala index b0fc82645196..6f63e137ff67 100644 --- a/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloQueryAuditCommand.scala +++ b/geomesa-accumulo/geomesa-accumulo-tools/src/main/scala/org/locationtech/geomesa/accumulo/tools/stats/AccumuloQueryAuditCommand.scala @@ -73,7 +73,7 @@ class AccumuloQueryAuditCommand extends AccumuloDataStoreCommand { 
WithClose(new AccumuloAuditReader(ds)) { reader=> val writer = if ("json".equalsIgnoreCase(params.outputFormat)) { new JsonWriter() } else { new CsvWriter() } - writer.header().foreach(Command.output.info) + writer.header().foreach(h => Command.output.info(h)) dateRanges.foreach { case (start, end) => WithClose(reader.getQueryEvents(params.featureName, (start, end))) { events => val out = cql match { diff --git a/geomesa-arrow/geomesa-arrow-tools/src/main/scala/org/locationtech/geomesa/arrow/tools/status/ArrowDescribeSchemaCommand.scala b/geomesa-arrow/geomesa-arrow-tools/src/main/scala/org/locationtech/geomesa/arrow/tools/status/ArrowDescribeSchemaCommand.scala index dbf1409f0b05..74a6fd2cc68c 100644 --- a/geomesa-arrow/geomesa-arrow-tools/src/main/scala/org/locationtech/geomesa/arrow/tools/status/ArrowDescribeSchemaCommand.scala +++ b/geomesa-arrow/geomesa-arrow-tools/src/main/scala/org/locationtech/geomesa/arrow/tools/status/ArrowDescribeSchemaCommand.scala @@ -9,6 +9,7 @@ package org.locationtech.geomesa.arrow.tools.status import com.beust.jcommander.{Parameter, Parameters} +import com.typesafe.scalalogging.Logger import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.arrow.data.ArrowDataStore import org.locationtech.geomesa.arrow.tools.ArrowDataStoreCommand @@ -22,17 +23,17 @@ class ArrowDescribeSchemaCommand extends DescribeSchemaCommand[ArrowDataStore] w override protected def getSchema(ds: ArrowDataStore): SimpleFeatureType = ds.getSchema - override protected def describe(ds: ArrowDataStore, sft: SimpleFeatureType, output: String => Unit): Unit = { - super.describe(ds, sft, output) - output("") + override protected def describe(ds: ArrowDataStore, sft: SimpleFeatureType, logger: Logger): Unit = { + super.describe(ds, sft, logger) + logger.info("") val dictionaries = ds.dictionaries if (dictionaries.isEmpty) { - output("Dictionaries: none") + logger.info("Dictionaries: none") } else if (params.dictionaries) { - output("Dictionaries:") - dictionaries.foreach { case (field, dictionary) => output(s" $field: ${dictionary.iterator.mkString(", ")}") } + logger.info("Dictionaries:") + dictionaries.foreach { case (field, dictionary) => logger.info(s" $field: ${dictionary.iterator.mkString(", ")}") } } else { - output(s"Dictionaries: ${ds.dictionaries.keys.mkString(", ")}") + logger.info(s"Dictionaries: ${ds.dictionaries.keys.mkString(", ")}") } } } diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsDescribeSchemaCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsDescribeSchemaCommand.scala index 52ec6c36a637..56822e82fc1e 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsDescribeSchemaCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsDescribeSchemaCommand.scala @@ -9,6 +9,7 @@ package org.locationtech.geomesa.fs.tools.status import com.beust.jcommander.Parameters +import com.typesafe.scalalogging.Logger import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.fs.data.FileSystemDataStore import org.locationtech.geomesa.fs.tools.FsDataStoreCommand @@ -20,12 +21,12 @@ import org.locationtech.geomesa.tools.status.DescribeSchemaCommand class FsDescribeSchemaCommand extends DescribeSchemaCommand[FileSystemDataStore] with FsDataStoreCommand { override val params = new FsDescribeSchemaParams - override protected def describe(ds: 
FileSystemDataStore, sft: SimpleFeatureType, output: String => Unit): Unit = { - super.describe(ds, sft, output) + override protected def describe(ds: FileSystemDataStore, sft: SimpleFeatureType, logger: Logger): Unit = { + super.describe(ds, sft, logger) val metadata = ds.storage(sft.getTypeName).metadata - output(s"\nPartition scheme | ${metadata.scheme.pattern}") - output(s"File encoding | ${metadata.encoding}") - output(s"Leaf storage | ${metadata.leafStorage}") + logger.info(s"\nPartition scheme | ${metadata.scheme.pattern}") + logger.info(s"File encoding | ${metadata.encoding}") + logger.info(s"Leaf storage | ${metadata.leafStorage}") } } diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsGetPartitionsCommand.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsGetPartitionsCommand.scala index d4465e74d1bd..e6815e852c4b 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsGetPartitionsCommand.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/status/FsGetPartitionsCommand.scala @@ -21,7 +21,7 @@ class FsGetPartitionsCommand extends FsDataStoreCommand { override def execute(): Unit = withDataStore { ds => Command.user.info(s"Partitions for type ${params.featureName}:") - ds.storage(params.featureName).metadata.getPartitions().map(_.name).sorted.foreach(Command.output.info) + ds.storage(params.featureName).metadata.getPartitions().map(_.name).sorted.foreach(p => Command.output.info(p)) } } diff --git a/geomesa-jobs/pom.xml b/geomesa-jobs/pom.xml index caa33311e58e..77a9533d0ea3 100644 --- a/geomesa-jobs/pom.xml +++ b/geomesa-jobs/pom.xml @@ -18,15 +18,15 @@ org.locationtech.geomesa - geomesa-index-api_2.12 + geomesa-index-api_${scala.binary.version} org.locationtech.geomesa - geomesa-feature-all_2.12 + geomesa-feature-all_${scala.binary.version} org.locationtech.geomesa - geomesa-convert-all_2.12 + geomesa-convert-all_${scala.binary.version} org.apache.hadoop diff --git a/geomesa-tools/common-env/log4j.properties b/geomesa-tools/common-env/log4j.properties index fb0f40175b90..1ffd26f9358c 100644 --- a/geomesa-tools/common-env/log4j.properties +++ b/geomesa-tools/common-env/log4j.properties @@ -13,6 +13,9 @@ log4j.logger.org.locationtech.geomesa.tools.output=info,output # un-comment to enable audit logging #log4j.logger.org.locationtech.geomesa.index.audit=debug +# schema copier log config - debug to file, info to user+file +log4j.logger.org.locationtech.geomesa.accumulo.util.SchemaCopier=debug,user,file + # various libraries we want to lower the volume on log4j.logger.hsqldb.db=warn log4j.logger.org.apache.curator=warn @@ -26,12 +29,14 @@ log4j.appender.user=org.apache.log4j.ConsoleAppender log4j.appender.user.layout=org.apache.log4j.PatternLayout log4j.appender.user.layout.ConversionPattern=%-5p %m%n log4j.appender.user.Target=System.err +log4j.appender.user.Threshold=INFO # log to stdout for program output log4j.appender.output=org.apache.log4j.ConsoleAppender log4j.appender.output.layout=org.apache.log4j.PatternLayout log4j.appender.output.layout.ConversionPattern=%m%n log4j.appender.output.Target=System.out +log4j.appender.output.Threshold=INFO # file logging log4j.appender.file=org.apache.log4j.RollingFileAppender diff --git a/geomesa-tools/pom.xml b/geomesa-tools/pom.xml index aedffa4bf4e0..d7c0b6e5eaf2 100644 --- a/geomesa-tools/pom.xml +++ b/geomesa-tools/pom.xml @@ -28,6 +28,10 @@ org.locationtech.geomesa 
geomesa-utils_${scala.binary.version} + + org.locationtech.geomesa + geomesa-hadoop-utils_${scala.binary.version} + org.locationtech.geomesa geomesa-feature-exporters_${scala.binary.version} @@ -58,7 +62,7 @@ org.locationtech.geomesa - geomesa-filter_2.12 + geomesa-filter_${scala.binary.version} org.apache.avro diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/package.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/package.scala index 131c64fd699d..2858fe267a36 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/package.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/package.scala @@ -8,13 +8,13 @@ package org.locationtech.geomesa -package object tools { +import com.beust.jcommander.ParameterException +import com.typesafe.scalalogging.Logger +import org.geotools.api.data.{DataStore, DataStoreFinder} +import org.locationtech.geomesa.tools.utils.Prompt +import org.locationtech.geomesa.utils.classpath.ClassPathUtils - import com.beust.jcommander.ParameterException - import org.geotools.api.data.{DataStore, DataStoreFinder} - import org.locationtech.geomesa.tools.utils.Prompt - import org.locationtech.geomesa.utils.classpath.ClassPathUtils - import org.slf4j.{Logger, LoggerFactory} +package object tools { import java.io.File import scala.collection.JavaConverters._ @@ -42,9 +42,9 @@ package object tools { object Command { // send messages to the user - status, errors, etc - val user: Logger = LoggerFactory.getLogger("org.locationtech.geomesa.tools.user") + val user: Logger = Logger("org.locationtech.geomesa.tools.user") // send output from a command - val output: Logger = LoggerFactory.getLogger("org.locationtech.geomesa.tools.output") + val output: Logger = Logger("org.locationtech.geomesa.tools.output") /** * Exception used to indicate a failure in the command run, without printing a stack trace diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/DescribeSchemaCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/DescribeSchemaCommand.scala index 96a5581acafb..f254532f1a9a 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/DescribeSchemaCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/DescribeSchemaCommand.scala @@ -9,6 +9,7 @@ package org.locationtech.geomesa.tools.status import com.beust.jcommander.ParameterException +import com.typesafe.scalalogging.Logger import org.geotools.api.data.DataStore import org.geotools.api.feature.simple.SimpleFeatureType import org.locationtech.geomesa.index.geotools.GeoMesaDataStore @@ -36,14 +37,14 @@ trait DescribeSchemaCommand[DS <: DataStore] extends DataStoreCommand[DS] { throw new ParameterException(msg) } Command.user.info(s"Describing attributes of feature '${sft.getTypeName}'") - describe(ds, sft, Command.output.info) + describe(ds, sft, Command.output) } protected def getSchema(ds: DS): SimpleFeatureType = params match { case p: TypeNameParam => ds.getSchema(p.featureName) } - protected def describe(ds: DS, sft: SimpleFeatureType, output: String => Unit): Unit = { + protected def describe(ds: DS, sft: SimpleFeatureType, logger: Logger): Unit = { import org.locationtech.geomesa.utils.geotools.RichSimpleFeatureType._ val indices = ds match { @@ -78,12 +79,12 @@ trait DescribeSchemaCommand[DS <: DataStore] extends DataStoreCommand[DS] { val maxName = namesAndDescriptions.map(_._1.length).max val maxType = 
namesAndDescriptions.map(_._2.length).max namesAndDescriptions.foreach { case (n, t, d) => - output(s"${n.padTo(maxName, ' ')} | ${t.padTo(maxType, ' ')} $d") + logger.info(s"${n.padTo(maxName, ' ')} | ${t.padTo(maxType, ' ')} $d") } val userData = sft.getUserData if (!userData.isEmpty) { - output("\nUser data:") + logger.info("\nUser data:") val namesAndValues = userData.asScala.toSeq.map { case (k, v) => if (k == SimpleFeatureTypes.Configs.Keywords) { (SimpleFeatureTypes.Configs.Keywords, sft.getKeywords.mkString("'", "', '", "'")) @@ -93,7 +94,7 @@ trait DescribeSchemaCommand[DS <: DataStore] extends DataStoreCommand[DS] { } val maxName = namesAndValues.map(_._1.length).max namesAndValues.sortBy(_._1).foreach { case (n, v) => - output(s" ${n.padTo(maxName, ' ')} | $v") + logger.info(s" ${n.padTo(maxName, ' ')} | $v") } } } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/EnvironmentCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/EnvironmentCommand.scala index bc8f792beb0c..de6881f50a6c 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/EnvironmentCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/EnvironmentCommand.scala @@ -67,7 +67,7 @@ class EnvironmentCommand extends Command { case "spec" => (sft: SimpleFeatureType) => s"${SimpleFeatureTypes.encodeType(sft, !params.excludeUserData)}" } - filtered.sortBy(_.getTypeName).map(s => s"${s.getTypeName} = ${formatFn(s)}").foreach(Command.output.info) + filtered.sortBy(_.getTypeName).map(s => s"${s.getTypeName} = ${formatFn(s)}").foreach(m => Command.output.info(m)) } else { throw new ParameterException(s"Unknown format '${params.format}'. Valid values are 'typesafe' or 'spec'") } @@ -84,19 +84,19 @@ class EnvironmentCommand extends Command { val options = ConfigRenderOptions.defaults().setJson(false).setOriginComments(false) def render(c: Config) = c.root().render(options) val strings = filtered.map { case (cname, conf)=> s"converter-name=$cname\n${render(conf)}\n" } - strings.toArray.sorted.foreach(Command.output.info) + strings.toArray.sorted.foreach(m => Command.output.info(m)) } } def listSftsNames(): Unit = { Command.output.info("Simple Feature Types:") val all = SimpleFeatureTypeLoader.sfts - all.sortBy(_.getTypeName).map(s => s"${s.getTypeName}").foreach(Command.output.info) + all.sortBy(_.getTypeName).map(s => s"${s.getTypeName}").foreach(m => Command.output.info(m)) } def listConverterNames(): Unit = { Command.output.info("Simple Feature Type Converters:") val all = ConverterConfigLoader.confs - all.map { case (cname, conf) => s"$cname"}.toArray.sorted.foreach(Command.output.info) + all.map { case (cname, conf) => s"$cname"}.toArray.sorted.foreach(m => Command.output.info(m)) } } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/GetTypeNamesCommand.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/GetTypeNamesCommand.scala index f142b2131a36..f0c5868de906 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/GetTypeNamesCommand.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/status/GetTypeNamesCommand.scala @@ -17,6 +17,6 @@ trait GetTypeNamesCommand[DS <: DataStore] extends DataStoreCommand[DS] { override def execute(): Unit = { Command.output.info("Current feature types:") - withDataStore(_.getTypeNames.foreach(Command.output.info)) + withDataStore(_.getTypeNames.foreach(m => 
Command.output.info(m))) } } diff --git a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/DistributedCopy.scala b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/DistributedCopy.scala index 80b49e719714..f6f8a8a60821 100644 --- a/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/DistributedCopy.scala +++ b/geomesa-tools/src/main/scala/org/locationtech/geomesa/tools/utils/DistributedCopy.scala @@ -14,6 +14,7 @@ import org.apache.hadoop.tools.{DistCp, DistCpOptions} import org.locationtech.geomesa.jobs.JobResult.JobSuccess import org.locationtech.geomesa.jobs.{JobResult, StatusCallback} import org.locationtech.geomesa.tools.Command +import org.locationtech.geomesa.utils.hadoop.DistributedCopyOptions /** * Executes a hadoop distcp @@ -31,7 +32,7 @@ class DistributedCopy(conf: Configuration = new Configuration()) { * @return */ def copy(sourceFileList: Path, dest: Path, statusCallback: StatusCallback): JobResult = - copy(DistributedCopy.distCpOptions(Right(sourceFileList), dest), statusCallback) + copy(DistributedCopyOptions(sourceFileList, dest), statusCallback) /** * Execute the job @@ -42,7 +43,7 @@ class DistributedCopy(conf: Configuration = new Configuration()) { * @return */ def copy(sources: Seq[Path], dest: Path, statusCallback: StatusCallback): JobResult = - copy(DistributedCopy.distCpOptions(Left(sources), dest), statusCallback) + copy(DistributedCopyOptions(sources, dest), statusCallback) /** * Executes the job @@ -63,34 +64,3 @@ class DistributedCopy(conf: Configuration = new Configuration()) { } } } - -object DistributedCopy { - - import scala.collection.JavaConverters._ - - private def distCpOptions(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = - try { distCpOptions3(sources, dest) } catch { case _: ClassNotFoundException => distCpOptions2(sources, dest) } - - // hadoop 3 API - private def distCpOptions3(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = { - val builder = sources match { - case Right(file) => new DistCpOptions.Builder(file, dest) - case Left(dirs) => new DistCpOptions.Builder(dirs.asJava, dest) - } - builder.withAppend(false).withOverwrite(true).withBlocking(false).withCopyStrategy("dynamic").build() - } - - // hadoop 2 API - private def distCpOptions2(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = { - val opts = sources match { - case Right(file) => - classOf[DistCpOptions].getConstructor(classOf[Path], classOf[Path]).newInstance(file, dest) - case Left(dirs) => - classOf[DistCpOptions].getConstructor(classOf[java.util.List[Path]], classOf[Path]).newInstance(dirs.asJava, dest) - } - classOf[DistCpOptions].getMethod("setAppend", classOf[Boolean]).invoke(opts, java.lang.Boolean.FALSE) - classOf[DistCpOptions].getMethod("setOverwrite", classOf[Boolean]).invoke(opts, java.lang.Boolean.TRUE) - classOf[DistCpOptions].getMethod("setCopyStrategy", classOf[String]).invoke(opts, "dynamic") - opts - } -} diff --git a/geomesa-utils-parent/geomesa-hadoop-utils/pom.xml b/geomesa-utils-parent/geomesa-hadoop-utils/pom.xml index a66c664f2429..ac48c5424c7e 100644 --- a/geomesa-utils-parent/geomesa-hadoop-utils/pom.xml +++ b/geomesa-utils-parent/geomesa-hadoop-utils/pom.xml @@ -24,6 +24,10 @@ org.apache.hadoop hadoop-common + + org.apache.hadoop + hadoop-distcp + diff --git a/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/DistributedCopyOptions.scala 
b/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/DistributedCopyOptions.scala new file mode 100644 index 000000000000..120185dd67dd --- /dev/null +++ b/geomesa-utils-parent/geomesa-hadoop-utils/src/main/scala/org/locationtech/geomesa/utils/hadoop/DistributedCopyOptions.scala @@ -0,0 +1,59 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.utils.hadoop + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.tools.DistCpOptions + +object DistributedCopyOptions { + + import scala.collection.JavaConverters._ + + /** + * Create the options + * + * @param sources source files to copy + * @param dest destination + * @return + */ + def apply(sources: Seq[Path], dest: Path): DistCpOptions = + try { distCpOptions3(Left(sources), dest) } catch { case _: ClassNotFoundException => distCpOptions2(Left(sources), dest) } + + /** + * Create the options + * + * @param sourceFileList file containing list of sources to copy + * @param dest destination + * @return + */ + def apply(sourceFileList: Path, dest: Path): DistCpOptions = + try { distCpOptions3(Right(sourceFileList), dest) } catch { case _: ClassNotFoundException => distCpOptions2(Right(sourceFileList), dest) } + + // hadoop 3 API + private def distCpOptions3(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = { + val builder = sources match { + case Right(file) => new DistCpOptions.Builder(file, dest) + case Left(dirs) => new DistCpOptions.Builder(dirs.asJava, dest) + } + builder.withAppend(false).withOverwrite(true).withBlocking(false).withCopyStrategy("dynamic").build() + } + + // hadoop 2 API + private def distCpOptions2(sources: Either[Seq[Path], Path], dest: Path): DistCpOptions = { + val clas = classOf[DistCpOptions] + val opts = sources match { + case Right(file) => clas.getConstructor(classOf[Path], classOf[Path]).newInstance(file, dest) + case Left(dirs) => clas.getConstructor(classOf[java.util.List[Path]], classOf[Path]).newInstance(dirs.asJava, dest) + } + clas.getMethod("setAppend", classOf[Boolean]).invoke(opts, java.lang.Boolean.FALSE) + clas.getMethod("setOverwrite", classOf[Boolean]).invoke(opts, java.lang.Boolean.TRUE) + clas.getMethod("setCopyStrategy", classOf[String]).invoke(opts, "dynamic") + opts + } +}
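
Note (not part of the diff): since this change moves the bulk copy logic out of the CLI tools and into a reusable class, the new SchemaCopier can now be driven directly from code as well as from the bulk-copy command. Below is a minimal sketch of such a caller, assuming only the API introduced above; the object name, connection values, partition name, and paths are illustrative placeholders, not values from this patch.

import java.io.File

import org.locationtech.geomesa.accumulo.util.SchemaCopier
import org.locationtech.geomesa.accumulo.util.SchemaCopier.{ClusterConfig, ClusterPassword, CopyOptions, PartitionName}
import org.locationtech.geomesa.utils.io.WithClose

// hypothetical caller - not included in this patch
object SchemaCopierSketch {
  def main(args: Array[String]): Unit = {
    // source and destination clusters; ClusterKeytab(path) could be used instead of ClusterPassword
    val from = ClusterConfig(
      instanceName = "src-instance",
      zookeepers   = "src-zoo1:2181,src-zoo2:2181",
      user         = "geomesa",
      credentials  = ClusterPassword("***"),
      catalog      = "geomesa.catalog",
      configFiles  = Seq(new File("/etc/hadoop/conf/core-site.xml")))
    val to = from.copy(instanceName = "dst-instance", zookeepers = "dst-zoo1:2181")

    // copy all indices of one partition, staging the table export under the given path;
    // empty Seqs would copy every index and every partition
    val copier = new SchemaCopier(
      from, to, "my-feature-type", "hdfs://dst-namenode:8020/tmp/bulk-copy",
      indices    = Seq.empty,
      partitions = Seq(PartitionName("2024-09-25")),
      options    = CopyOptions(tableConcurrency = 2, fileConcurrency = 8, distCp = false))

    // SchemaCopier is Closeable - WithClose disposes both data stores once the run finishes
    WithClose(copier)(_.run(resume = false))
  }
}

The AccumuloBulkCopyCommand above performs essentially the same wiring, translating its JCommander parameters into ClusterConfig, PartitionId, and CopyOptions values before delegating to SchemaCopier.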