Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move file to results #335

Merged
merged 6 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import spire.math.Integral
import spire.syntax.cfor.cfor

import java.nio.channels.FileChannel
import java.nio.file.{FileAlreadyExistsException, Path, Paths}
import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths}
import java.time.Duration
import java.time.format.DateTimeFormatter
import java.util.{ArrayList, Collections, Map, List => JList}
import scala.collection.JavaConverters._
import scala.reflect._
import scala.util.control.Breaks.{break, breakable}

package object geotiff {

Expand Down Expand Up @@ -699,7 +700,6 @@ package object geotiff {
croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]],
compression: Compression, formatOptions: Option[GTiffOptions] = None
) = {
this.logger.info("stitchAndWriteToTiff tiles.size: " + tiles.size) // Remove before release
val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch()

val re = raster.rasterExtent
Expand Down Expand Up @@ -742,9 +742,7 @@ package object geotiff {
) {
geotiff = geotiff.withOverviews(NearestNeighbor, List(4, 8, 16))
}
val res = writeGeoTiff(geotiff, filePath)
this.logger.info("stitchAndWriteToTiff writeGeoTiff done. filePath: " + filePath) // Remove before release
res
writeGeoTiff(geotiff, filePath)
}

def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey],
Expand Down Expand Up @@ -827,7 +825,7 @@ package object geotiff {
.toList.asJava
}

private def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
import java.nio.file.Files
if (path.startsWith("s3:/")) {
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")
Expand All @@ -838,31 +836,48 @@ package object geotiff {
uploadToS3(tempFile, correctS3Path)

} else {
val tempFile = Files.createTempFile(null, ".tif")
val tempFile = getTempFile(null, ".tif")
// TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy)
geoTiff.write(tempFile.toString, optimizedOrder = true)

// Geotrellis writes the file piecewise and sometimes files are only partially written.
// Maybe a move operation is easier for the fusemount:
try {
Files.move(tempFile, Path.of(path))
} catch {
case e: FileAlreadyExistsException =>
logger.info("FileAlreadyExistsException. Will overwrite file: " + e.getMessage)
// The existing file could be a partial result of a previous failing Spark task.
Files.deleteIfExists(Path.of(path))
Files.move(tempFile, Path.of(path))
}
// TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths.
moveOverwriteWithRetries(tempFile, Path.of(path))

// Call fsync on the parent path to assure the fusemount is up-to-date.
// The equivalent of Python's os.fsync
FileChannel.open(Path.of(path)).force(true)

try {
FileChannel.open(Path.of(path)).force(true)
} catch {
case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor
}
path
}

}

/**
 * Moves `oldPath` to `newPath`, replacing any file that already exists at the destination.
 *
 * An existing destination is deleted before the move, as it might be a partial result of a
 * previous failing task. If the destination reappears between the delete and the move,
 * the delete-and-move sequence is retried, up to 5 attempts in total.
 *
 * @param oldPath file to move
 * @param newPath destination path; a file already present there is deleted first
 * @throws FileAlreadyExistsException when the move still fails on the last attempt
 */
def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
  val maxTries = 5

  @scala.annotation.tailrec
  def attempt(tryCount: Int): Unit = {
    val moved =
      try {
        if (newPath.toFile.exists()) {
          // It might be a partial result of a previous failing task.
          logger.info(s"Will replace $newPath. (try $tryCount)")
        }
        Files.deleteIfExists(newPath)
        Files.move(oldPath, newPath)
        true
      } catch {
        case e: FileAlreadyExistsException =>
          // Here if another executor wrote the file between the delete and the move statement.
          if (tryCount >= maxTries) throw e
          false
      }
    // The recursive call stays outside the try block so it is in tail position.
    if (!moved) attempt(tryCount + 1)
  }

  attempt(1)
}

def uploadToS3(localFile: Path, s3Path: String) = {
val s3Uri = new AmazonS3URI(s3Path)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ package object geotrellis {
}

/**
* Inspired on 'Files.createTempFile'
* Inspired by 'Files.createTempFile', but does not create an empty file.
* The default permissions of 'createTempFile' are also a bit too strict: 600, which is not accessible by other users.
* This function could default to 664, for example.
*/
def getTempFile(prefix: String, suffix: String): Path = {
val prefixNonNull = if (prefix == null) "" else suffix
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package org.openeo.geotrellis

import geotrellis.raster.io.geotiff.GeoTiff
import geotrellis.raster.{ByteCellType, ByteUserDefinedNoDataCellType, FloatUserDefinedNoDataCellType, UByteCellType, UByteUserDefinedNoDataCellType}
import org.junit.Assert._
import org.junit.Test
import org.openeo.geotrellis.geotiff._

import java.nio.file.{Files, Path}

class PackageTest {
@Test
Expand All @@ -12,4 +16,23 @@ class PackageTest {
assertEquals(FloatUserDefinedNoDataCellType(42), toSigned(FloatUserDefinedNoDataCellType(42)))
assertEquals(ByteUserDefinedNoDataCellType(42), toSigned(ByteUserDefinedNoDataCellType(42)))
}

/**
 * Writes the same reference GeoTiff to a single destination from several threads at once,
 * then checks that a file exists at that destination and reads back with the reference cell size.
 */
@Test
def testFileMove(): Unit = {
  val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif")
  val refTiff = GeoTiff.readMultiband(refFile.getPath)
  val outputDir = Path.of("tmp/testFileMove/")
  Files.createDirectories(outputDir)

  (1 to 20).foreach { i =>
    // Path.resolve joins the segments without relying on implicit Path-to-String concatenation.
    val dst = outputDir.resolve(s"$i.tif")
    // Limit the amount of parallel jobs to avoid getting over the max retries
    (1 to 4).par.foreach { _ =>
      writeGeoTiff(refTiff, dst.toString)
    }
    assertTrue(Files.exists(dst))
    val writtenTiff = GeoTiff.readMultiband(dst.toString)
    // JUnit's assertEquals takes the expected value first, then the actual one.
    assertEquals(refTiff.cellSize, writtenTiff.cellSize)
  }
}
}