diff --git a/docs/user/filesystem/index_config.rst b/docs/user/filesystem/index_config.rst index dfedf470f1d1..ae0b135055e4 100644 --- a/docs/user/filesystem/index_config.rst +++ b/docs/user/filesystem/index_config.rst @@ -250,3 +250,25 @@ Observers can be specified through the user data key ``geomesa.fs.observers``: sft.setObservers(factories) // or set directly in the user data as a comma-delimited string sft.getUserData.put("geomesa.fs.observers", factories.mkString(",")) + +Configuring Path Filters +------------------------ + +.. note:: + + Path filtering is supported for ``converter`` encoding only. + +The FSDS can filter paths within a partition for more granular control of queries. Path filtering is configured +through the user data key ``geomesa.fs.path-filter.name``. + +Currently, the only implementation is the ``dtg`` path filter, whose purpose is to parse a datetime from the given +path and compare it to the query filter to include or exclude the file from the query. The following options are +required for the ``dtg`` path filter, configured through the key ``geomesa.fs.path-filter.opts``: + +* ``attribute`` - The ``Date`` attribute in the query to compare against. +* ``pattern`` - The regular expression, with a single capturing group, to extract a datetime string from the path. +* ``format`` - The datetime formatting pattern to parse a date from the regex capture. +* ``buffer`` - The duration to buffer the bounds of the parsed datetime by within the current partition. To buffer time + across partitions, see the ``receipt-time`` partition scheme. + +Custom path filters can be loaded via SPI. diff --git a/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala b/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala index 0f9829466415..0ef139cbb716 100644 --- a/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala +++ b/geomesa-filter/src/main/scala/org/locationtech/geomesa/filter/Bounds.scala @@ -119,6 +119,7 @@ object Bounds { object Bound { private val unboundedBound = Bound[Any](None, inclusive = false) def unbounded[T]: Bound[T] = unboundedBound.asInstanceOf[Bound[T]] + def inclusive[T](value: T): Bound[T] = Bound(Option(value), inclusive = true) } private val allValues = Bounds(Bound.unbounded, Bound.unbounded) diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory new file mode 100644 index 000000000000..f10142b8bf99 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/resources/META-INF/services/org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFilteringFactory @@ -0,0 +1 @@ +org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering$DtgPathFilteringFactory diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala index 6db37de760bc..1edb486d07d7 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemReader.scala @@ -10,13 +10,14 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.StrictLogging import org.apache.commons.compress.archivers.ArchiveStreamFactory -import org.apache.hadoop.fs.{FileContext, FileSystem, Path} +import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.geotools.api.feature.simple.{SimpleFeature, SimpleFeatureType} import org.geotools.api.filter.Filter import org.locationtech.geomesa.convert.EvaluationContext import org.locationtech.geomesa.convert2.SimpleFeatureConverter import org.locationtech.geomesa.features.{ScalaSimpleFeature, TransformSimpleFeature} import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader +import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering import org.locationtech.geomesa.utils.collection.CloseableIterator import org.locationtech.geomesa.utils.hadoop.HadoopDelegate.{HadoopFileHandle, HadoopTarHandle, HadoopZipHandle} import org.locationtech.geomesa.utils.io.PathUtils @@ -28,27 +29,34 @@ class ConverterFileSystemReader( fs: FileSystem, converter: SimpleFeatureConverter, filter: Option[Filter], - transform: Option[(String, SimpleFeatureType)] + transform: Option[(String, SimpleFeatureType)], + pathFiltering: Option[PathFiltering] ) extends FileSystemPathReader with StrictLogging { import ArchiveStreamFactory.{JAR, TAR, ZIP} + private lazy val pathFilter: Option[PathFilter] = pathFiltering.flatMap(pf => filter.map(pf.apply)) + override def read(path: Path): CloseableIterator[SimpleFeature] = { - logger.debug(s"Opening file $path") - val iter = try { - val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match { - case TAR => new HadoopTarHandle(fs, path) - case ZIP | JAR => new HadoopZipHandle(fs, path) - case _ => new HadoopFileHandle(fs, path) - } - handle.open.flatMap { case (name, is) => - val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path)) - converter.process(is, converter.createEvaluationContext(params)) + if (pathFilter.forall(_.accept(path))) { + logger.debug(s"Opening file $path") + val iter = try { + val handle = PathUtils.getUncompressedExtension(path.getName).toLowerCase(Locale.US) match { + case TAR => new HadoopTarHandle(fs, path) + case ZIP | JAR => new HadoopZipHandle(fs, path) + case _ => new HadoopFileHandle(fs, path) + } + handle.open.flatMap { case (name, is) => + val params = EvaluationContext.inputFileParam(name.getOrElse(handle.path)) + converter.process(is, converter.createEvaluationContext(params)) + } + } catch { + case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty } - } catch { - case NonFatal(e) => logger.error(s"Error processing uri '$path'", e); CloseableIterator.empty + transformed(filtered(iter)) + } else { + CloseableIterator.empty } - transformed(filtered(iter)) } private def filtered(in: CloseableIterator[SimpleFeature]): CloseableIterator[SimpleFeature] = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala index 8a4503fb2b80..f507bc019519 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorage.scala @@ -19,8 +19,12 @@ import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage import org.locationtech.geomesa.fs.storage.common.AbstractFileSystemStorage.FileSystemPathReader import org.locationtech.geomesa.fs.storage.common.observer.FileSystemObserver import org.locationtech.geomesa.fs.storage.common.utils.PathCache +import org.locationtech.geomesa.fs.storage.converter.pathfilter.PathFiltering -class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, converter: SimpleFeatureConverter) +class ConverterStorage(context: FileSystemContext, + metadata: StorageMetadata, + converter: SimpleFeatureConverter, + pathFiltering: Option[PathFiltering]) extends AbstractFileSystemStorage(context, metadata, "") { // TODO close converter... @@ -36,7 +40,7 @@ class ConverterStorage(context: FileSystemContext, metadata: StorageMetadata, co override protected def createReader( filter: Option[Filter], transform: Option[(String, SimpleFeatureType)]): FileSystemPathReader = { - new ConverterFileSystemReader(context.fs, converter, filter, transform) + new ConverterFileSystemReader(context.fs, converter, filter, transform, pathFiltering) } override def getFilePaths(partition: String): Seq[StorageFilePath] = { diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala index 8b2e8433de94..13007a3533d9 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/ConverterStorageFactory.scala @@ -12,10 +12,15 @@ import com.typesafe.scalalogging.LazyLogging import org.locationtech.geomesa.convert.{ConfArgs, ConverterConfigResolver} import org.locationtech.geomesa.convert2.SimpleFeatureConverter import org.locationtech.geomesa.fs.storage.api._ -import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory.{ConverterConfigParam, ConverterNameParam} +import org.locationtech.geomesa.fs.storage.converter.ConverterStorageFactory._ +import org.locationtech.geomesa.fs.storage.converter.pathfilter.{PathFiltering, PathFilteringFactory} + +import java.util.regex.Pattern class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging { + import scala.collection.JavaConverters._ + override val encoding: String = "converter" override def apply(context: FileSystemContext, metadata: StorageMetadata): FileSystemStorage = { @@ -29,9 +34,22 @@ class ConverterStorageFactory extends FileSystemStorageFactory with LazyLogging } SimpleFeatureConverter(metadata.sft, converterConfig) } - new ConverterStorage(context, metadata, converter) - } + val pathFilteringOpts = + context.conf.getValByRegex(Pattern.quote(PathFilterOptsPrefix) + ".*").asScala.map { + case (k, v) => k.substring(PathFilterOptsPrefix.length) -> v + } + + val pathFiltering = Option(context.conf.get(PathFilterName)).flatMap { name => + val factory = PathFilteringFactory.load(NamedOptions(name, pathFilteringOpts.toMap)) + if (factory.isEmpty) { + throw new IllegalArgumentException(s"Failed to load ${classOf[PathFiltering].getName} for config '$name'") + } + factory + } + + new ConverterStorage(context, metadata, converter, pathFiltering) + } } object ConverterStorageFactory { @@ -43,4 +61,6 @@ object ConverterStorageFactory { val LeafStorageParam = "fs.options.leaf-storage" val PartitionSchemeParam = "fs.partition-scheme.name" val PartitionOptsPrefix = "fs.partition-scheme.opts." + val PathFilterName = "fs.path-filter.name" + val PathFilterOptsPrefix = "fs.path-filter.opts." } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala new file mode 100644 index 000000000000..362e1fc5d784 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFiltering.scala @@ -0,0 +1,101 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.converter.pathfilter + +import com.typesafe.scalalogging.LazyLogging +import org.apache.hadoop.fs.PathFilter +import org.geotools.api.filter.Filter +import org.locationtech.geomesa.filter.Bounds.Bound +import org.locationtech.geomesa.filter.{Bounds, FilterHelper} +import org.locationtech.geomesa.fs.storage.api.NamedOptions + +import java.time.format.DateTimeFormatter +import java.time.{ZoneOffset, ZonedDateTime} +import java.util.regex.Pattern +import scala.concurrent.duration.Duration +import scala.util.control.NonFatal + +class DtgPathFiltering(attribute: String, pattern: Pattern, format: DateTimeFormatter, buffer: Duration) + extends PathFiltering with LazyLogging { + + def apply(filter: Filter): PathFilter = { + val filterIntervals = FilterHelper.extractIntervals(filter, attribute, handleExclusiveBounds = true) + path => try { + val time = parseDtg(path.getName).toInstant + val millis = buffer.toMillis + val lower = ZonedDateTime.ofInstant(time.minusMillis(millis), ZoneOffset.UTC) + val upper = ZonedDateTime.ofInstant(time.plusMillis(millis), ZoneOffset.UTC) + val buffered = Bounds(Bound.inclusive(lower), Bound.inclusive(upper)) + val included = filterIntervals.exists(bounds => bounds.intersects(buffered)) + logger.whenDebugEnabled { + if (included) { + logger.debug(s"Including path ${path.getName} for filter $filter") + } else { + logger.debug(s"Excluding path ${path.getName} for filter $filter") + } + } + included + } catch { + case NonFatal(ex) => + logger.warn(s"Failed to evaluate filter for path '${path.getName}'", ex) + true + } + } + + private def parseDtg(name: String): ZonedDateTime = { + Option(name) + .map(pattern.matcher) + .filter(_.matches) + .filter(_.groupCount > 0) + .map(_.group(1)) + .map(ZonedDateTime.parse(_, format)) + .getOrElse { + throw new IllegalArgumentException(s"Failed to parse ${classOf[ZonedDateTime].getName} " + + s"from file name '$name' for pattern '$pattern' and format '$format'") + } + } + + override def toString: String = { + s"${this.getClass.getName}(attribute = $attribute, pattern = $pattern, format = $format, buffer = $buffer)" + } +} + +object DtgPathFiltering extends LazyLogging { + + val Name = "dtg" + + object Config { + val Attribute = "attribute" + val Pattern = "pattern" + val Format = "format" + val Buffer = "buffer" + } + + class DtgPathFilteringFactory extends PathFilteringFactory { + override def load(config: NamedOptions): Option[PathFiltering] = { + if (config.name != Name) { None } else { + val attribute = config.options.getOrElse(Config.Attribute, null) + require(attribute != null, s"$Name path filter requires a dtg attribute config '${Config.Attribute}'") + val patternConfig = config.options.getOrElse(Config.Pattern, null) + require(patternConfig != null, s"$Name path filter requires a dtg pattern config '${Config.Pattern}'") + val formatConfig = config.options.getOrElse(Config.Format, null) + require(formatConfig != null, s"$Name path filter requires a dtg format config '${Config.Format}'") + val bufferConfig = config.options.getOrElse(Config.Buffer, null) + require(bufferConfig != null, s"$Name path filter requires a buffer duration config '${Config.Buffer}'") + + val pattern = Pattern.compile(patternConfig) + val format = DateTimeFormatter.ofPattern(formatConfig).withZone(ZoneOffset.UTC) + val buffer = Duration.apply(bufferConfig) + val pathFiltering = new DtgPathFiltering(attribute, pattern, format, buffer) + logger.info(s"Loaded PathFiltering: $pathFiltering") + Some(pathFiltering) + } + } + } +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala new file mode 100644 index 000000000000..ea0bdfb25985 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFiltering.scala @@ -0,0 +1,16 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.converter.pathfilter + +import org.apache.hadoop.fs.PathFilter +import org.geotools.api.filter.Filter + +trait PathFiltering { + def apply(filter: Filter): PathFilter +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala new file mode 100644 index 000000000000..1073b07d6fed --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/main/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/PathFilteringFactory.scala @@ -0,0 +1,26 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.converter.pathfilter + +import org.locationtech.geomesa.fs.storage.api.NamedOptions + +import java.util.ServiceLoader + +trait PathFilteringFactory { + def load(config: NamedOptions): Option[PathFiltering] +} + +object PathFilteringFactory { + + import scala.collection.JavaConverters._ + + private lazy val factories = ServiceLoader.load(classOf[PathFilteringFactory]).asScala.toSeq + + def load(config: NamedOptions): Option[PathFiltering] = factories.toStream.flatMap(_.load(config)).headOption +} diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test/2023/01/18/example.tgz b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-1/2023/01/18/example.tgz similarity index 100% rename from geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test/2023/01/18/example.tgz rename to geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-1/2023/01/18/example.tgz diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv new file mode 100644 index 000000000000..f3411ac2e53c --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412110600.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +0,Harry,17,2024-12-11T01:00:00.000Z,0.0,0.0 +1,Hermione,18,2024-12-11T11:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv new file mode 100644 index 000000000000..355ab2b8c9d7 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111200.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +2,Ronald,17,2024-12-11T07:00:00.000Z,0.0,0.0 +3,Draco,18,2024-12-11T11:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv new file mode 100644 index 000000000000..6dff3517aac9 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412111800.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +4,Neville,17,2024-12-11T13:00:00.000Z,0.0,0.0 +5,Rubeus,43,2024-12-11T08:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv new file mode 100644 index 000000000000..f4109e763168 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/11/data-202412112330.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +6,Severus,52,2024-12-11T19:00:00.000Z,0.0,0.0 +7,Alfred,78,2024-12-11T23:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv new file mode 100644 index 000000000000..314fc56bb7d3 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120100.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +8,Dean,17,2024-12-11T23:50:00.000Z,0.0,0.0 +9,Minerva,57,2024-12-12T00:30:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv new file mode 100644 index 000000000000..4d1e2bd46ba7 --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/resources/example-convert-test-2/2024/12/12/data-202412120600.csv @@ -0,0 +1,3 @@ +ID,Name,Age,Dtg,Lon,Dat +10,Luna,17,2024-12-12T03:00:00.000Z,0.0,0.0 +11,Dudley,19,2024-12-12T05:00:00.000Z,0.0,0.0 \ No newline at end of file diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala index 5d6c2eb6683b..a982c8f1289f 100644 --- a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/ConverterFileSystemStorageTest.scala @@ -10,7 +10,7 @@ package org.locationtech.geomesa.fs.storage.converter import com.typesafe.scalalogging.LazyLogging import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.geotools.api.data.Query import org.geotools.filter.text.ecql.ECQL import org.junit.runner.RunWith @@ -23,40 +23,43 @@ import org.specs2.runner.JUnitRunner @RunWith(classOf[JUnitRunner]) class ConverterFileSystemStorageTest extends Specification with LazyLogging { + private val sftConfig = + """geomesa.sfts.example = { + | attributes = [ + | { name = "name", type = "String" } + | { name = "age", type = "Int" } + | { name = "dtg", type = "Date", default = true } + | { name = "geom", type = "Point", srid = 4326, default = true } + | ] + |}""".stripMargin + private val converterConfig = + """geomesa.converters.example = { + | type = "delimited-text" + | format = "CSV" + | options { + | skip-lines = 1 + | } + | id-field = "toString($fid)", + | fields = [ + | { name = "fid", transform = "$1::int" } + | { name = "name", transform = "$2::string" } + | { name = "age", transform = "$3::int" } + | { name = "dtg", transform = "datetime($4)" } + | { name = "lon", transform = "$5::double" } + | { name = "lat", transform = "$6::double" } + | { name = "geom", transform = "point($lon, $lat)" } + | ] + |}""".stripMargin + "ConverterFileSystemStorage" should { "read features in compressed tar.gz files" in { - val dir = Option(getClass.getClassLoader.getResource("example-convert-test")).map(_.toURI).orNull + val dir = Option(getClass.getClassLoader.getResource("example-convert-test-1")).map(_.toURI).orNull dir must not(beNull) val conf = new Configuration() - conf.set(ConverterStorageFactory.ConverterPathParam, "example-convert-test") - conf.set(ConverterStorageFactory.SftConfigParam, - """geomesa.sfts.example = { - | attributes = [ - | { name = "name", type = "String" } - | { name = "age", type = "Int" } - | { name = "dtg", type = "Date", default = true } - | { name = "geom", type = "Point", srid = 4326, default = true } - | ] - |}""".stripMargin) - conf.set(ConverterStorageFactory.ConverterConfigParam, - """geomesa.converters.example = { - | type = "delimited-text" - | format = "CSV" - | options { - | skip-lines = 1 - | } - | id-field = "toString($fid)", - | fields = [ - | { name = "fid", transform = "$1::int" } - | { name = "name", transform = "$2::string" } - | { name = "age", transform = "$3::int" } - | { name = "dtg", transform = "datetime($4)" } - | { name = "lon", transform = "$5::double" } - | { name = "lat", transform = "$6::double" } - | { name = "geom", transform = "point($lon, $lat)" } - | ] - |}""".stripMargin) + conf.set(ConverterStorageFactory.ConverterPathParam, "example-convert-test-1") + conf.set(ConverterStorageFactory.SftConfigParam, sftConfig) + conf.set(ConverterStorageFactory.ConverterConfigParam, converterConfig) conf.set(ConverterStorageFactory.PartitionSchemeParam, "daily") conf.set(ConverterStorageFactory.LeafStorageParam, "false") @@ -74,5 +77,45 @@ class ConverterFileSystemStorageTest extends Specification with LazyLogging { features must haveLength(6) } + + "filter file paths by dtg" in { + val dir = Option(getClass.getClassLoader.getResource("example-convert-test-2")).map(_.toURI).orNull + dir must not(beNull) + + val conf = new Configuration() + conf.set(ConverterStorageFactory.ConverterPathParam, "example-convert-test-2") + conf.set(ConverterStorageFactory.SftConfigParam, sftConfig) + conf.set(ConverterStorageFactory.ConverterConfigParam, converterConfig) + conf.set(ConverterStorageFactory.PartitionSchemeParam, "receipt-time") + conf.set(ConverterStorageFactory.PartitionOptsPrefix + "datetime-scheme", "daily") + conf.set(ConverterStorageFactory.PartitionOptsPrefix + "buffer", "10 minutes") + conf.set(ConverterStorageFactory.LeafStorageParam, "false") + conf.set(ConverterStorageFactory.PathFilterName, "dtg") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "attribute", "dtg") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "pattern", "^data-(.*)\\.csv$") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "format", "yyyyMMddHHmm") + conf.set(ConverterStorageFactory.PathFilterOptsPrefix + "buffer", "2 hours") + + val context = FileSystemContext(new Path(dir), conf) + val metadata = StorageMetadataFactory.load(context).orNull + metadata must not(beNull) + metadata must haveClass[ConverterMetadata] + val storage = FileSystemStorageFactory(context, metadata) + + val filterText = "dtg DURING 2024-12-11T10:00:00Z/2024-12-11T23:55:00Z " + + "OR dtg = 2024-12-11T10:00:00Z OR dtg = 2024-12-11T23:55:00Z" + val query = new Query(metadata.sft.getTypeName, ECQL.toFilter(filterText)) + val features = { + val iter = SelfClosingIterator(storage.getReader(query)) + // note: need to copy features in iterator as same object is re-used + iter.map(ScalaSimpleFeature.copy).toList + } + + // id 1 is excluded because of the path dtg filter even though dtg is within filter bounds + // id 5 is excluded because dtg is outside filter bounds even though included by path filter + // id 8 is included because within partition scheme buffer and path filter buffer + features must haveLength(5) + features.map(_.getID) must containTheSameElementsAs(Seq("3", "4", "6", "7", "8")) + } } } diff --git a/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala new file mode 100644 index 000000000000..77db35056d9e --- /dev/null +++ b/geomesa-fs/geomesa-fs-storage/geomesa-fs-storage-convert/src/test/scala/org/locationtech/geomesa/fs/storage/converter/pathfilter/DtgPathFilteringTest.scala @@ -0,0 +1,55 @@ +/*********************************************************************** + * Copyright (c) 2013-2024 Commonwealth Computer Research, Inc. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Apache License, Version 2.0 + * which accompanies this distribution and is available at + * http://www.opensource.org/licenses/apache2.0.php. + ***********************************************************************/ + +package org.locationtech.geomesa.fs.storage.converter.pathfilter + +import org.apache.hadoop.fs.Path +import org.geotools.filter.text.ecql.ECQL +import org.junit.runner.RunWith +import org.locationtech.geomesa.fs.storage.api.NamedOptions +import org.specs2.mutable.Specification +import org.specs2.runner.JUnitRunner + +@RunWith(classOf[JUnitRunner]) +class DtgPathFilteringTest extends Specification { + + import org.locationtech.geomesa.fs.storage.converter.pathfilter.DtgPathFiltering.Config._ + + "DtgPathFilterFactory" should { + "parse, format, buffer, and filter a dtg from a file path" in { + val attribute = "dtg" + val pattern = "^data-(.*)\\..*$" + val format = "yyyyMMddHHmm" + val buffer = "6 hours" + val config = NamedOptions(DtgPathFiltering.Name, + Map(Attribute -> attribute, Pattern -> pattern, Format -> format, Buffer -> buffer)) + + val pathFilterFactory = PathFilteringFactory.load(config) + pathFilterFactory must beSome { factory: PathFiltering => + factory must haveClass[DtgPathFiltering] + } + + val filterText = s"$attribute DURING 2024-12-10T00:00:00Z/2024-12-11T00:00:00Z " + + s"OR $attribute = 2024-12-10T00:00:00Z OR $attribute = 2024-12-11T00:00:00Z" + val filter = ECQL.toFilter(filterText) + val pathFilter = pathFilterFactory.get.apply(filter) + + val path1 = new Path("/geomesa/fs/data-202412080000.csv") + val path2 = new Path("/geomesa/fs/data-202412092200.csv") + val path3 = new Path("/geomesa/fs/data-202412110000.csv") + val path4 = new Path("/geomesa/fs/data-202412110600.csv") + val path5 = new Path("/geomesa/fs/data-202412111000.csv") + + pathFilter.accept(path1) must beFalse + pathFilter.accept(path2) must beTrue + pathFilter.accept(path3) must beTrue + pathFilter.accept(path4) must beTrue + pathFilter.accept(path5) must beFalse + } + } +} diff --git a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala index 8895273e76ca..e4a5e02fa68f 100644 --- a/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala +++ b/geomesa-fs/geomesa-fs-tools/src/main/scala/org/locationtech/geomesa/fs/tools/compact/PartitionInputFormat.scala @@ -8,7 +8,7 @@ package org.locationtech.geomesa.fs.tools.compact -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ import org.geotools.api.data.Query