Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AN-380 Make call cache hashing strategy configurable per filesystem and backend #7683

Merged
merged 41 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e712005
Specify filesystemTypeKey on each Path subclass
jgainerdewar Jan 24, 2025
449567c
Read per-filesystem AsyncFileHashingStrategy from config
jgainerdewar Jan 24, 2025
4a36763
Set default hashing strategies for each backend
jgainerdewar Jan 24, 2025
98beda3
Move AsyncFileHashingStrategy to core
jgainerdewar Jan 24, 2025
7ea5618
Plumb hashStrategy through io commands, use for batch commands
jgainerdewar Jan 24, 2025
2f55cd4
Rename GcsBatchCrc32cCommand -> GcsBatchHashCommand
jgainerdewar Jan 24, 2025
6854497
Rename S3BatchTagCommand -> S3BatchHashCommand
jgainerdewar Jan 24, 2025
61eae83
Rename AsyncFileHashingStrategy -> FileHashStrategy
jgainerdewar Jan 24, 2025
be3a8a3
Update non-DRS NioHashing hash logic
jgainerdewar Jan 27, 2025
0c32295
Progress on DRS
jgainerdewar Jan 28, 2025
baf8151
Switch FileHashStrategy to list approach, make DRS conform
jgainerdewar Jan 28, 2025
6b435d5
Also check in this file
jgainerdewar Jan 28, 2025
c10ae98
Eliminate special GcsCrc32c hash type
jgainerdewar Jan 30, 2025
065ff05
Lazily evaluate hashes
jgainerdewar Jan 30, 2025
df9dccc
Test fixes
jgainerdewar Jan 30, 2025
b82f2ec
Remove defunct tests
jgainerdewar Jan 30, 2025
0a101df
Better handling for hex vs b64 crc32c representations
jgainerdewar Jan 31, 2025
add5a8c
Rename test file
jgainerdewar Jan 31, 2025
de33a85
Comments
jgainerdewar Jan 31, 2025
875f9b5
FileHashStrategy tests
jgainerdewar Jan 31, 2025
16137f2
Imports
jgainerdewar Jan 31, 2025
1966d28
Tests
jgainerdewar Feb 3, 2025
d0ad499
Scalafmt
jgainerdewar Feb 3, 2025
e30b7c2
Remove a few TODOs
jgainerdewar Feb 3, 2025
05876b3
Allow users to configure hash strategy as single string
jgainerdewar Feb 3, 2025
3a9e180
Explanatory comment for unprotected get
jgainerdewar Feb 4, 2025
22cbc1f
Cleanup
jgainerdewar Feb 4, 2025
83c381d
Consistent blob id strings
jgainerdewar Feb 4, 2025
1458942
Ignore failing test
jgainerdewar Feb 4, 2025
c1c9c38
Cleanup
jgainerdewar Feb 4, 2025
ed662f5
Merge branch 'develop' into jd_AN-380_hash
jgainerdewar Feb 4, 2025
b3f04fd
Docs
jgainerdewar Feb 5, 2025
c03f985
Merge branch 'jd_AN-380_hash' of github.com:broadinstitute/cromwell i…
jgainerdewar Feb 5, 2025
57ed4b4
PR Feedback
jgainerdewar Feb 11, 2025
1f036c1
Doc feedback
jgainerdewar Feb 11, 2025
cc0aba1
Fix test, clearer naming
jgainerdewar Feb 11, 2025
8c626c9
Centralized (and correct!) logic for identity hashing
jgainerdewar Feb 12, 2025
2071ecb
Fix typo
jgainerdewar Feb 12, 2025
46eb4bd
Remove unused checksum validation
jgainerdewar Feb 12, 2025
892a5d4
AN-375 Metrics, logging for present vs. missing md5 (#7690)
aednichols Feb 14, 2025
c01fbb1
Merge branch 'develop' into jd_AN-380_hash
jgainerdewar Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package cromwell.backend.standard.callcaching

import java.util.concurrent.TimeoutException

import akka.actor.{Actor, ActorLogging, ActorRef, Timers}
import akka.event.LoggingAdapter
import com.typesafe.config.Config
import cromwell.backend.standard.StandardCachingActorHelper
import cromwell.backend.standard.callcaching.RootWorkflowFileHashCacheActor.IoHashCommandWithContext
import cromwell.backend.standard.callcaching.StandardFileHashingActor._
Expand All @@ -12,8 +12,11 @@ import cromwell.core.JobKey
import cromwell.core.callcaching._
import cromwell.core.io._
import cromwell.core.logging.JobLogging
import cromwell.core.path.Path
import net.ceedubs.ficus.Ficus._
import wom.values.WomFile

import scala.jdk.CollectionConverters._
import scala.util.{Failure, Success, Try}

/**
Expand Down Expand Up @@ -48,6 +51,10 @@ case class FileHashContext(hashKey: HashKey, file: String)
class DefaultStandardFileHashingActor(standardParams: StandardFileHashingActorParams)
extends StandardFileHashingActor(standardParams) {
override val ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

override val defaultHashingStrategies: Map[String, FileHashStrategy] = Map(
("drs", FileHashStrategy.Crc32c)
)
}

object StandardFileHashingActor {
Expand Down Expand Up @@ -80,10 +87,40 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
override lazy val serviceRegistryActor: ActorRef = standardParams.serviceRegistryActor
override lazy val configurationDescriptor: BackendConfigurationDescriptor = standardParams.configurationDescriptor

// Child classes can override to set per-filesystem defaults
val defaultHashingStrategies: Map[String, FileHashStrategy] = Map.empty

// Hashing strategy to use if none is configured.
val fallbackHashingStrategy: FileHashStrategy = FileHashStrategy.Md5
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be the list of md5 + identity?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the generic fallback across all backends and filesystems, since identity is only implemented for GCP I'm afraid that would confuse people.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense!


// Combines defaultHashingStrategies with user-provided configuration
lazy val hashingStrategies: Map[String, FileHashStrategy] = {

val configuredHashingStrategies = for {
fsConfigs <- configurationDescriptor.backendConfig.as[Option[Config]]("filesystems").toList
fsKey <- fsConfigs.entrySet.asScala.map(_.getKey)
fileHashStrategyName <- fsConfigs.as[Option[String]](s"fileSystems.${fsKey}.caching.hash-strategy")
fileHashStrategy <- FileHashStrategy(fileHashStrategyName)
_ = log.info(s"Call caching hash strategy for ${fsKey} files will be ${fileHashStrategy}")
} yield (fsKey, fileHashStrategy)

val strats = defaultHashingStrategies ++ configuredHashingStrategies
val stratsReport = strats.keys.toList.sorted.map(k => s"$k -> ${strats.get(k)}").mkString(", ")
log.info(
s"Call caching configured with per-filesystem file hashing strategies: $stratsReport. " +
s"Others will use $fallbackHashingStrategy."
)
strats
}

protected def ioCommandBuilder: IoCommandBuilder = DefaultIoCommandBuilder

// Used by ConfigBackend for synchronous hashing of local files
def customHashStrategy(fileRequest: SingleFileHashRequest): Option[Try[String]] = None

def hashStrategyForPath(p: Path): FileHashStrategy =
hashingStrategies.getOrElse(p.filesystemTypeKey, fallbackHashingStrategy)

def fileHashingReceive: Receive = {
// Hash Request
case fileRequest: SingleFileHashRequest =>
Expand Down Expand Up @@ -115,8 +152,9 @@ abstract class StandardFileHashingActor(standardParams: StandardFileHashingActor
def asyncHashing(fileRequest: SingleFileHashRequest, replyTo: ActorRef): Unit = {
val fileAsString = fileRequest.file.value
val ioHashCommandTry = for {
gcsPath <- getPath(fileAsString)
command <- ioCommandBuilder.hashCommand(gcsPath)
path <- getPath(fileAsString)
hashStrategy = hashStrategyForPath(path)
command <- ioCommandBuilder.hashCommand(path, hashStrategy)
} yield command
lazy val fileHashContext = FileHashContext(fileRequest.hashKey, fileRequest.file.value)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cromwell.core.callcaching

// File hashing strategies used by IoHashCommand, primarily when obtaining file hashes
// for call caching purposes.
sealed trait FileHashStrategy

object FileHashStrategy {
case object Crc32c extends FileHashStrategy
case object Md5 extends FileHashStrategy
case object Md5ThenIdentity extends FileHashStrategy
case object ETag extends FileHashStrategy

// TODO validate fs type here?
def apply(s: String): Option[FileHashStrategy] = s.toLowerCase() match {
case "md5" => Some(Md5)
case "crc32c" => Some(Crc32c)
case "md5+identity" => Some(Md5ThenIdentity)
case "etag" => Some(ETag)
case _ => None
}
}
5 changes: 3 additions & 2 deletions core/src/main/scala/cromwell/core/io/AsyncIo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cromwell.core.io

import akka.actor.ActorRef
import com.typesafe.config.{Config, ConfigFactory}
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoPromiseProxyActor.IoCommandWithPromise
import cromwell.core.path.BetterFileMethods.OpenOptions
import cromwell.core.path.Path
Expand Down Expand Up @@ -47,8 +48,8 @@ class AsyncIo(ioEndpoint: ActorRef, ioCommandBuilder: IoCommandBuilder) {
def sizeAsync(path: Path): Future[Long] =
asyncCommand(ioCommandBuilder.sizeCommand(path))

def hashAsync(path: Path): Future[String] =
asyncCommand(ioCommandBuilder.hashCommand(path))
def hashAsync(path: Path, hashStrategy: FileHashStrategy): Future[String] =
asyncCommand(ioCommandBuilder.hashCommand(path, hashStrategy))

def deleteAsync(path: Path, swallowIoExceptions: Boolean = false): Future[Unit] =
asyncCommand(ioCommandBuilder.deleteCommand(path, swallowIoExceptions))
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/scala/cromwell/core/io/DefaultIoCommand.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.core.io

import better.files.File.OpenOptions
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path

Expand Down Expand Up @@ -42,8 +43,9 @@ object DefaultIoCommand {
s"DefaultIoDeleteCommand file '$file' swallowIOExceptions '$swallowIOExceptions'"
}

case class DefaultIoHashCommand(override val file: Path) extends IoHashCommand(file) {
override def commandDescription: String = s"DefaultIoHashCommand file '$file'"
case class DefaultIoHashCommand(override val file: Path, override val hashStrategy: FileHashStrategy)
extends IoHashCommand(file, hashStrategy) {
override def commandDescription: String = s"DefaultIoHashCommand file '$file' hashStrategy '$hashStrategy'"
}

case class DefaultIoTouchCommand(override val file: Path) extends IoTouchCommand(file) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/cromwell/core/io/IoCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package cromwell.core.io

import java.time.OffsetDateTime
import java.util.UUID

import better.files.File.OpenOptions
import com.google.api.client.util.ExponentialBackOff
import common.util.Backoff
import common.util.StringUtil.EnhancedToStringable
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.Path
import cromwell.core.retry.SimpleExponentialBackoff
Expand Down Expand Up @@ -161,8 +161,9 @@ abstract class IoDeleteCommand(val file: Path, val swallowIOExceptions: Boolean)
/**
* Get Hash value for file
*/
abstract class IoHashCommand(val file: Path) extends SingleFileIoCommand[String] {
override def toString = s"get hash of ${file.pathAsString}"
abstract class IoHashCommand(val file: Path, val hashStrategy: FileHashStrategy)
extends SingleFileIoCommand[String] {
override def toString = s"get $hashStrategy hash of ${file.pathAsString}"
override lazy val name = "hash"
}

Expand Down
7 changes: 4 additions & 3 deletions core/src/main/scala/cromwell/core/io/IoCommandBuilder.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.core.io

import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io.DefaultIoCommand._
import cromwell.core.io.IoContentAsStringCommand.IoReadOptions
import cromwell.core.path.BetterFileMethods.OpenOptions
Expand All @@ -18,7 +19,7 @@ abstract class PartialIoCommandBuilder {
def sizeCommand: PartialFunction[Path, Try[IoSizeCommand]] = PartialFunction.empty
def deleteCommand: PartialFunction[(Path, Boolean), Try[IoDeleteCommand]] = PartialFunction.empty
def copyCommand: PartialFunction[(Path, Path), Try[IoCopyCommand]] = PartialFunction.empty
def hashCommand: PartialFunction[Path, Try[IoHashCommand]] = PartialFunction.empty
def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[IoHashCommand]] = PartialFunction.empty
def touchCommand: PartialFunction[Path, Try[IoTouchCommand]] = PartialFunction.empty
def existsCommand: PartialFunction[Path, Try[IoExistsCommand]] = PartialFunction.empty
def isDirectoryCommand: PartialFunction[Path, Try[IoIsDirectoryCommand]] = PartialFunction.empty
Expand Down Expand Up @@ -85,8 +86,8 @@ class IoCommandBuilder(partialBuilders: List[PartialIoCommandBuilder] = List.emp
def copyCommand(src: Path, dest: Path): Try[IoCopyCommand] =
buildOrDefault(_.copyCommand, (src, dest), DefaultIoCopyCommand(src, dest))

def hashCommand(file: Path): Try[IoHashCommand] =
buildOrDefault(_.hashCommand, file, DefaultIoHashCommand(file))
def hashCommand(file: Path, hashStrategy: FileHashStrategy): Try[IoHashCommand] =
buildOrDefault(_.hashCommand, (file, hashStrategy), DefaultIoHashCommand(file, hashStrategy))

def touchCommand(file: Path): Try[IoTouchCommand] =
buildOrDefault(_.touchCommand, file, DefaultIoTouchCommand(file))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ case object DefaultPathBuilder extends PathBuilder {
}

case class DefaultPath private[path] (nioPath: NioPath) extends Path {

val filesystemTypeKey = "local"

override protected def newPath(nioPath: NioPath): DefaultPath = DefaultPath(nioPath)

override def pathAsString: String = nioPath.toString
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/cromwell/core/path/PathBuilder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ trait PreResolvePathBuilder extends PathBuilder {
*/
trait Path extends PathObjectMethods with NioPathMethods with BetterFileMethods with EvenBetterPathMethods {

/**
* A string key corresponding to the filesystem this Path is associated with, ex. gcs, drs, local etc.
* This should be the string used to identify the filesystem in Cromwell's config file.
*/
val filesystemTypeKey: String
jgainerdewar marked this conversation as resolved.
Show resolved Hide resolved

/**
* A reference to the underlying nioPath, used to create new java.nio.Path's that will then be sent to newPath
* for wrapping.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class NioFlow(parallelism: Int,
}

private def hash(hash: IoHashCommand): IO[String] =
NioHashing.hash(hash.file)
NioHashing.hash(hash.file, hash.hashStrategy)

private def touch(touch: IoTouchCommand) = IO {
touch.file.touch()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cromwell.engine.io.nio
import cats.effect.IO
import cloud.nio.spi.{FileHash, HashType}
import common.util.StringUtil.EnhancedString
import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.path.Path
import cromwell.filesystems.blob.BlobPath
import cromwell.filesystems.drs.DrsPath
Expand All @@ -15,7 +16,8 @@ import scala.util.Try

object NioHashing {

def hash(file: Path): IO[String] =
// TODO update logic to respect hashStrategy
def hash(file: Path, hashStrategy: FileHashStrategy): IO[String] =
// If there is no hash accessible from the file storage system,
// we'll read the file and generate the hash ourselves if we can.
getStoredHash(file)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class IoActorProxyGcsBatchSpec

val copyCommand = GcsBatchCopyCommand.forPaths(src, dst).get
val sizeCommand = GcsBatchSizeCommand.forPath(src).get
val hashCommand = GcsBatchCrc32Command.forPath(src).get
val hashCommand = GcsBatchHashCommand.forPath(src).get
// Should return true
val isDirectoryCommand = GcsBatchIsDirectoryCommand.forPath(directory).get
// Should return false
Expand All @@ -112,7 +112,7 @@ class IoActorProxyGcsBatchSpec
fileSize shouldBe 5
}

received1 collect { case IoSuccess(_: GcsBatchCrc32Command, hash: String) =>
received1 collect { case IoSuccess(_: GcsBatchHashCommand, hash: String) =>
hash shouldBe "mnG7TA=="
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import common.assertion.CromwellTimeoutSpec
import cromwell.core.TestKitSuite
import cromwell.engine.io.IoCommandContext
import cromwell.filesystems.gcs.GcsPath
import cromwell.filesystems.gcs.batch.GcsBatchCrc32Command
import cromwell.filesystems.gcs.batch.GcsBatchHashCommand
import org.scalatest.PrivateMethodTester
import org.scalatest.flatspec.AnyFlatSpecLike
import org.scalatest.matchers.should.Matchers
Expand Down Expand Up @@ -85,7 +85,7 @@ class GcsBatchFlowSpec
projectId = "GcsBatchFlowSpec-project"
)
val gcsBatchCommandContext =
GcsBatchCommandContext(GcsBatchCrc32Command.forPath(mockGcsPath).get, TestProbe().ref, 5)
GcsBatchCommandContext(GcsBatchHashCommand.forPath(mockGcsPath).get, TestProbe().ref, 5)
val recoverCommandPrivateMethod =
PrivateMethod[PartialFunction[Throwable, Future[GcsBatchResponse[_]]]](Symbol("recoverCommand"))
val partialFuncAcceptingThrowable = gcsBatchFlow invokePrivate recoverCommandPrivateMethod(gcsBatchCommandContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ object BlobPath {
case class BlobPath private[blob] (pathString: String, endpoint: EndpointURL, container: BlobContainerName)(
private val fsm: BlobFileSystemManager
) extends Path {

val filesystemTypeKey = "blob"

override def nioPath: NioPath = findNioPath(pathString)

override protected def newPath(nioPath: NioPath): Path = BlobPath(nioPath, endpoint, container, fsm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import java.io.IOException

case class DrsPath(drsPath: CloudNioPath, requesterPaysProjectIdOption: Option[String]) extends Path {

val filesystemTypeKey = "drs"

override def nioPath: NioPath = drsPath

override protected def newPath(nioPath: NioPath): Path =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import cloud.nio.spi.CloudNioPath
import cromwell.core.path.{NioPath, Path}

case class FtpPath(ftpPath: CloudNioPath) extends Path {
val filesystemTypeKey = "ftp"
override def nioPath = ftpPath
override protected def newPath(nioPath: NioPath) = FtpPath(nioPath.asInstanceOf[CloudNioPath])
override def pathAsString = nioPath.uriAsString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,9 @@ case class GcsPath private[gcs] (nioPath: NioPath,
cloudStorage: com.google.cloud.storage.Storage,
projectId: String
) extends Path {

val filesystemTypeKey = "gcs"

lazy val objectBlobId: Try[BlobId] = Try {
val bucketName = cloudStoragePath.bucket
val objectName = cloudStoragePath.toRealPath().toString
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cromwell.filesystems.gcs.batch

import cromwell.core.callcaching.FileHashStrategy
import cromwell.core.io._
import cromwell.core.path.Path
import cromwell.filesystems.gcs.GcsPath
Expand All @@ -19,8 +20,9 @@ private case object PartialGcsBatchCommandBuilder extends PartialIoCommandBuilde
case (gcsSrc: GcsPath, gcsDest: GcsPath) => GcsBatchCopyCommand.forPaths(gcsSrc, gcsDest)
}

override def hashCommand: PartialFunction[Path, Try[GcsBatchCrc32Command]] = { case gcsPath: GcsPath =>
GcsBatchCrc32Command.forPath(gcsPath)
override def hashCommand: PartialFunction[(Path, FileHashStrategy), Try[GcsBatchHashCommand]] = {
case (gcsPath: GcsPath, s) =>
GcsBatchHashCommand.forPath(gcsPath, s)
}

override def touchCommand: PartialFunction[Path, Try[GcsBatchTouchCommand]] = { case gcsPath: GcsPath =>
Expand Down
Loading
Loading