Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GEOMESA-3436 NiFi - Fix FSDS path cache classpath #3258

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ import com.github.benmanes.caffeine.cache.{CacheLoader, Caffeine}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, RemoteIterator}
import org.locationtech.geomesa.utils.conf.GeoMesaSystemProperties.SystemProperty

import java.util.concurrent.TimeUnit
import java.io.FileNotFoundException
import java.util.concurrent.{CompletableFuture, Executor, TimeUnit}
import java.util.function.BiConsumer

/**
* Caches file statuses to avoid repeated file system operations. Status expires after a
Expand All @@ -32,14 +34,6 @@ object PathCache {
}
)

// cache for individual file status
private val statusCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), FileStatus]() {
override def load(key: (FileSystem, Path)): FileStatus = key._1.getFileStatus(key._2)
}
)

// cache for checking directory contents
private val listCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
Expand All @@ -49,6 +43,33 @@ object PathCache {
}
)

// cache for individual file status
private val statusCache =
Caffeine.newBuilder().expireAfterWrite(duration, TimeUnit.MILLISECONDS).build(
new CacheLoader[(FileSystem, Path), FileStatus]() {
override def load(key: (FileSystem, Path)): FileStatus = {
try { key._1.getFileStatus(key._2) } catch {
case _: FileNotFoundException => null
}
}

override def asyncLoad(key: (FileSystem, Path), executor: Executor): CompletableFuture[FileStatus] = {
super.asyncLoad(key, executor)
.whenCompleteAsync(new ListCacheRefresh(key), executor)
.asInstanceOf[CompletableFuture[FileStatus]]
}

override def asyncReload(
key: (FileSystem, Path),
oldValue: FileStatus,
executor: Executor): CompletableFuture[FileStatus] = {
super.asyncReload(key, oldValue, executor)
.whenCompleteAsync(new ListCacheRefresh(key), executor)
.asInstanceOf[CompletableFuture[FileStatus]]
}
}
)

/**
* Register a path as existing
*
Expand All @@ -57,14 +78,7 @@ object PathCache {
*/
def register(fs: FileSystem, path: Path): Unit = {
pathCache.put((fs, path), java.lang.Boolean.TRUE)
val status = statusCache.refresh((fs, path))
val parent = path.getParent
if (parent != null) {
listCache.getIfPresent((fs, parent)) match {
case null => // no-op
case list => listCache.put((fs, parent), list :+ status.get())
}
}
statusCache.refresh((fs, path)) // also triggers listCache update
}

/**
Expand All @@ -83,7 +97,7 @@ object PathCache {
}

/**
* Gets the file status for a path
* Gets the file status for a path. Path must exist.
*
* @param fs file system
* @param path path
Expand Down Expand Up @@ -116,12 +130,33 @@ object PathCache {
* @param fs file system
* @param path path
*/
def invalidate(fs: FileSystem, path: Path): Unit = Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path)))
def invalidate(fs: FileSystem, path: Path): Unit = {
Seq(pathCache, statusCache, listCache).foreach(_.invalidate((fs, path)))
if (path.getParent != null) {
listCache.invalidate((fs, path.getParent))
}
}

object RemoteIterator {
def apply[T](iter: RemoteIterator[T]): Iterator[T] = new Iterator[T] {
override def hasNext: Boolean = iter.hasNext
override def next(): T = iter.next
}
}

private class ListCacheRefresh(key: (FileSystem, Path)) extends BiConsumer[FileStatus, Throwable] {
override def accept(status: FileStatus, u: Throwable): Unit = {
if (status != null) { // could be null if load fails
val (fs, path) = key
val parent = path.getParent
if (parent != null) {
listCache.asMap().computeIfPresent((fs, parent), load(status)_)
}
}
}

// noinspection ScalaUnusedSymbol
private def load(status: FileStatus)(ignored: (FileSystem, Path), list: Stream[FileStatus]): Stream[FileStatus] =
list.filterNot(f => f.getPath == status.getPath) :+ status
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/***********************************************************************
* Copyright (c) 2013-2025 Commonwealth Computer Research, Inc.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Apache License, Version 2.0
* which accompanies this distribution and is available at
* http://www.opensource.org/licenses/apache2.0.php.
***********************************************************************/

package org.locationtech.geomesa.fs.storage.common.utils

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.junit.runner.RunWith
import org.specs2.mutable.Specification
import org.specs2.runner.JUnitRunner

import java.nio.file.Files
import scala.concurrent.duration.DurationInt

@RunWith(classOf[JUnitRunner])
class PathCacheTest extends Specification {

"PathCache" should {
"update list cache when registering a new file" >> {
val root = new Path(Files.createTempDirectory("geomesa").toFile.getPath)
val fs = FileSystem.get(root.toUri, new Configuration())
try {
val file = new Path(root, "test")
PathCache.exists(fs, file) must beFalse
PathCache.list(fs, root) must beEmpty
// create the file
fs.create(file).close()
fs.exists(file) must beTrue
// verify cache has not been updated
PathCache.exists(fs, file) must beFalse
PathCache.list(fs, root) must beEmpty
// register the file
PathCache.register(fs, file)
// verify cached values have been updated
PathCache.exists(fs, file) must beTrue
eventually(10, 100.millis)(PathCache.list(fs, root).toList must haveLength(1))
// note: it's hard to verify this is a cached value, since it doesn't cache if a file doesn't exist...
PathCache.status(fs, file) must not(beNull)
} finally {
fs.delete(root, true)
}
}
}
}
Loading