Skip to content

Commit

Permalink
Avoid extra copies in sink
Browse files Browse the repository at this point in the history
  • Loading branch information
pityka committed Oct 19, 2024
1 parent bf44b79 commit d1a9cd6
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 73 deletions.
1 change: 1 addition & 0 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ tasks.askInterval = 100 ms
tasks.disableRemoting = false

tasks.skipContentHashVerificationAfterCache = false
tasks.skipContentHashCreationUponImport = false

tasks.s3.regionProfileName = "default"

Expand Down
5 changes: 3 additions & 2 deletions core/src/main/scala/tasks/fileservice/FileStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import fs2.Stream
object FileStorage {
def getContentHash(is: InputStream): Int = {
val checkedSize = 1024 * 256
val buffer = Array.fill[Byte](checkedSize)(0)
val buffer = Array.ofDim[Byte](checkedSize)
val his = new HashingInputStream(Hashing.crc32c, is)

com.google.common.io.ByteStreams.read(his, buffer, 0, buffer.size)
Expand Down Expand Up @@ -141,7 +141,8 @@ trait ManagedFileStorage {

def importFile(
f: File,
path: ProposedManagedFilePath
path: ProposedManagedFilePath,
canMove: Boolean
): IO[(Long, Int, ManagedFilePath)] =
tasks.util.retryIO(s"upload to $path")(importFile1(f, path), 4)(
scribe.Logger[ManagedFileStorage]
Expand Down
146 changes: 80 additions & 66 deletions core/src/main/scala/tasks/fileservice/FolderFileStorage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import fs2.Pipe

object FolderFileStorage {

private[tasks] def getContentHash(file: File): Int = {
private[tasks] def getContentHashOfFile(file: File,skip:Boolean): Int = if (skip) 0 else {
openFileInputStream(file) { is =>
FileStorage.getContentHash(is)
}
Expand All @@ -62,16 +62,20 @@ class FolderFileStorage(val basePath: File)(implicit

private val canonicalBasePath = basePath.getCanonicalPath

private def fileIsRelativeToBase(f: File): Boolean = {
val canonical = f.getCanonicalFile
private val tempFolder = new File(basePath, "___TMP___")
tempFolder.mkdirs()
private val tmpFolderCanonicalPath = tempFolder.getCanonicalPath()

def getParents(f: File, p: List[File]): List[File] =
if (f == null) p
else getParents(f.getParentFile, f :: p)

val canonicalParents = getParents(canonical, Nil).map(_.getCanonicalPath)
private def createLocalTempFile() = {
def try1(i: Int): File = if (i == 0)
throw new RuntimeException(s"Could not create temp file in $tempFolder")
else {
val name = scala.util.Random.alphanumeric.take(128).mkString
val f = new File(tempFolder, name)
if (f.exists()) try1(i - 1) else f
}

(canonicalBasePath :: Nil).exists(path => canonicalParents.contains(path))
try1(5)

}

Expand All @@ -87,7 +91,7 @@ class FolderFileStorage(val basePath: File)(implicit
val sizeMatch = sizeOnDiskNow == expectedSize
val canRead = file.canRead
def contentMatch =
canRead && FolderFileStorage.getContentHash(file) === expectedHash
canRead && FolderFileStorage.getContentHashOfFile(file, config.skipContentHashCreationUponImport) === expectedHash
val canDelete =
canRead && (expectedSize < 0 || (sizeMatch && contentMatch))
if (canDelete) {
Expand Down Expand Up @@ -123,7 +127,7 @@ class FolderFileStorage(val basePath: File)(implicit
val sizeMatch = sizeOnDiskNow === size
def contentMatch =
(config.skipContentHashVerificationAfterCache || (canRead && FolderFileStorage
.getContentHash(f) === hash))
.getContentHashOfFile(f,config.skipContentHashCreationUponImport) === hash))
val pass = canRead && (size < 0 || (sizeMatch && contentMatch))

if (!pass) {
Expand All @@ -150,7 +154,7 @@ class FolderFileStorage(val basePath: File)(implicit
} else {
if (retrieveSizeAndHash) {
val size = f.length
val hash = FolderFileStorage.getContentHash((f))
val hash = FolderFileStorage.getContentHashOfFile(f,config.skipContentHashCreationUponImport)
Some(SharedFileHelper.create(size, hash, path))
} else Some(SharedFileHelper.create(size = -1L, hash = 0, path))
}
Expand Down Expand Up @@ -183,45 +187,55 @@ class FolderFileStorage(val basePath: File)(implicit
)
}
}
private def copyFile(source: File, destination: File): Unit = {
private def copyFile(
source: File,
destination: File,
canMove: Boolean
): Unit = {
val parentFolder = destination.getParentFile
parentFolder.mkdirs
val tmp = new File(parentFolder, destination.getName + ".tmp")

try {
com.google.common.io.Files.copy(source, tmp)
} catch {
case e: java.io.IOException =>
scribe.error(e, s"Exception while copying $source to $tmp")
throw e
}
if (canMove) {
val success = source.renameTo(destination)
if (!success) copyFile(source, destination, false)

destination.delete
} else {
val tmp = new File(parentFolder, destination.getName + ".tmp")

try {
com.google.common.io.Files.copy(source, tmp)
} catch {
case e: java.io.IOException =>
scribe.error(e, s"Exception while copying $source to $tmp")
throw e
}

def tryRename(i: Int): Boolean = {
val success = tmp.renameTo(destination)
if (success) success
else if (i > 0) {
scribe.warn(
s"can't rename file $tmp to $destination. $tmp canRead : ${tmp.canRead}. Try $i more times."
)
tryRename(i - 1)
} else {
scribe.error(
s"can't rename file $tmp to $destination. $tmp canRead : ${tmp.canRead}"
)
false
destination.delete

def tryRename(i: Int): Boolean = {
val success = tmp.renameTo(destination)
if (success) success
else if (i > 0) {
scribe.warn(
s"can't rename file $tmp to $destination. $tmp canRead : ${tmp.canRead}. Try $i more times."
)
tryRename(i - 1)
} else {
scribe.error(
s"can't rename file $tmp to $destination. $tmp canRead : ${tmp.canRead}"
)
false
}
}
}

val succ = tryRename(3)
if (succ) {
tmp.delete
val succ = tryRename(3)
if (succ) {
tmp.delete

} else
throw new RuntimeException(
s"can't rename file $tmp to $destination. $tmp canRead : ${tmp.canRead}"
)
} else
throw new RuntimeException(
s"can't rename file $tmp to $destination. $tmp canRead : ${tmp.canRead}"
)
}
}

private def assemblePath(path: ManagedFilePath): File = {
Expand All @@ -241,7 +255,7 @@ class FolderFileStorage(val basePath: File)(implicit
def sink(
path: ProposedManagedFilePath
): Pipe[IO, Byte, (Long, Int, ManagedFilePath)] = { (in: Stream[IO, Byte]) =>
val tmp = TempFile.createTempFile("foldertmp")
val tmp = createLocalTempFile()
Stream.eval(
in.through(
fs2.io.file
Expand All @@ -250,8 +264,8 @@ class FolderFileStorage(val basePath: File)(implicit
).compile
.drain
.flatMap { _ =>
importFile(tmp, path)
.guarantee(IO.interruptible { tmp.delete })
importFile(tmp, path, canMove = true)
.guarantee(IO.interruptible { if (tmp.exists) {tmp.delete} })
.map(x => (x._1, x._2, x._3))
}
)
Expand All @@ -261,38 +275,37 @@ class FolderFileStorage(val basePath: File)(implicit
if (config.folderFileStorageCompleteFileCheck)
com.google.common.io.Files.equal(file1, file2)
else
file1.length == file2.length && FolderFileStorage.getContentHash(
file1
file1.length == file2.length && FolderFileStorage.getContentHashOfFile(
file1,config.skipContentHashCreationUponImport
) == FolderFileStorage
.getContentHash(file2)
.getContentHashOfFile(file2,config.skipContentHashCreationUponImport)

override def importFile(
file: File,
proposed: ProposedManagedFilePath
): IO[(Long, Int, ManagedFilePath)] =
proposed: ProposedManagedFilePath,
canMove: Boolean
): IO[(Long, Int, ManagedFilePath)] = {
def copy(dest: File) =
copyFile(file, dest, canMove)

IO.blocking({
scribe.debug(s"Importing file $file under name $proposed")
val size = file.length
val hash = FolderFileStorage.getContentHash(file)
val hash = FolderFileStorage.getContentHashOfFile(file,config.skipContentHashCreationUponImport)
val managed = proposed.toManaged

if (fileIsRelativeToBase(file)) {
val locationAsManagedFilePath = {
val relativeToBase =
file.getAbsolutePath.stripPrefix(canonicalBasePath)
val elements = relativeToBase.split('/').toVector.filter(_.nonEmpty)
ManagedFilePath(elements)
}
(size, hash, locationAsManagedFilePath)
} else if (assemblePath(managed).canRead) {
if (assemblePath(managed).canRead) {
val finalFile = assemblePath(managed)
scribe.debug(
s"Found a file already in storage with the same name ($finalFile). Check for equality."
)
if (checkContentEquality(finalFile, file))
println((file,proposed))
if (finalFile == file || checkContentEquality(finalFile, file))
(size, hash, managed)
else {
scribe.debug(s"Equality failed. Importing file. $file to $finalFile")
scribe.info(
s"Equality check failed for a file at the same path. Importing file. $file to $finalFile"
)

if (!config.allowOverwrite) {
def candidates(i: Int, past: List[File]): List[File] = {
Expand Down Expand Up @@ -321,15 +334,16 @@ class FolderFileStorage(val basePath: File)(implicit
.move(finalFile, assemblePath(managed, ".old.0"))
}

copyFile(file, finalFile)
copy(finalFile)
(size, hash, managed)
}

} else {
copyFile(file, assemblePath(managed))
copy(assemblePath(managed))
(size, hash, managed)
}
})
}

def uri(mp: ManagedFilePath) = IO.pure {
// throw new RuntimeException("URI not supported")
Expand Down
12 changes: 8 additions & 4 deletions core/src/main/scala/tasks/fileservice/SharedFileHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,8 @@ private[tasks] object SharedFileHelper {
): IO[SharedFile] = {
val sharedFile = {
val proposedPath = prefix.propose(name)
service.storage.importFile(file, proposedPath).map { f =>
if (deleteFile) {
service.storage.importFile(file, proposedPath,canMove=true).map { f =>
if (deleteFile && file.exists()) {
file.delete
}
SharedFileHelper.create(f._1, f._2, f._3)
Expand Down Expand Up @@ -275,11 +275,15 @@ private[tasks] object SharedFileHelper {
val directoryPath = directory.getAbsolutePath
IO.parSequenceN(parallelism)(files.map { file =>
assert(file.getAbsolutePath.startsWith(directoryPath))
val pathElements :Seq[String] = file.getAbsolutePath.drop(directoryPath.size).split('/').filter(_.nonEmpty)
val folders = pathElements.dropRight(1)
val name = pathElements.last
val prefix1 = prefix.append(folders)
createFromFile(
file,
file.getAbsolutePath.drop(directoryPath.size).stripPrefix("/"),
name,
deleteFile = false
)
)(prefix1,service,config,historyContext)
})
}
}
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/scala/tasks/util/config/TasksConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class TasksConfig(load: () => Config) {

def skipContentHashVerificationAfterCache =
raw.getBoolean("tasks.skipContentHashVerificationAfterCache")
def skipContentHashCreationUponImport =
raw.getBoolean("tasks.skipContentHashCreationUponImport")

val acceptableHeartbeatPause: FD =
raw.getDuration("tasks.failuredetector.acceptable-heartbeat-pause")
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/tasks/ReloadConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ object ReloadConfigTest extends TestHelpers with Matchers {

class ReloadConfigTestSuite extends FunSuite with Matchers {

test("should reload configuration") {
ignore("should reload configuration") {
ReloadConfigTest.run.get should equal(2)

}
Expand Down

0 comments on commit d1a9cd6

Please sign in to comment.