diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index b6cc19226..98ac2c12a 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -33,12 +33,13 @@ import spire.math.Integral import spire.syntax.cfor.cfor import java.nio.channels.FileChannel -import java.nio.file.{FileAlreadyExistsException, Path, Paths} +import java.nio.file.{FileAlreadyExistsException, Files, NoSuchFileException, Path, Paths} import java.time.Duration import java.time.format.DateTimeFormatter import java.util.{ArrayList, Collections, Map, List => JList} import scala.collection.JavaConverters._ import scala.reflect._ +import scala.util.control.Breaks.{break, breakable} package object geotiff { @@ -699,7 +700,6 @@ package object geotiff { croppedExtent: Option[Extent], cropDimensions: Option[java.util.ArrayList[Int]], compression: Compression, formatOptions: Option[GTiffOptions] = None ) = { - this.logger.info("stitchAndWriteToTiff tiles.size: " + tiles.size) // Remove before release val raster: Raster[MultibandTile] = ContextSeq(tiles, layout).stitch() val re = raster.rasterExtent @@ -742,9 +742,7 @@ package object geotiff { ) { geotiff = geotiff.withOverviews(NearestNeighbor, List(4, 8, 16)) } - val res = writeGeoTiff(geotiff, filePath) - this.logger.info("stitchAndWriteToTiff writeGeoTiff done. filePath: " + filePath) // Remove before release - res + writeGeoTiff(geotiff, filePath) } def saveSamples(rdd: MultibandTileLayerRDD[SpaceTimeKey], @@ -827,7 +825,7 @@ package object geotiff { .toList.asJava } - private def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { + def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = { import java.nio.file.Files if (path.startsWith("s3:/")) { val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://") @@ -838,31 +836,48 @@ package object geotiff { uploadToS3(tempFile, correctS3Path) } else { - val tempFile = Files.createTempFile(null, ".tif") + val tempFile = getTempFile(null, ".tif") // TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy) geoTiff.write(tempFile.toString, optimizedOrder = true) - // Geotrellis writes the file piecewise and sometimes files are only partially written. - // Maybe a move operation is easier for the fusemount: - try { - Files.move(tempFile, Path.of(path)) - } catch { - case e: FileAlreadyExistsException => - logger.info("FileAlreadyExistsException. Will overwrite file: " + e.getMessage) - // The existing file could be a partial result of a previous failing Spark task. - Files.deleteIfExists(Path.of(path)) - Files.move(tempFile, Path.of(path)) - } + // TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths. + moveOverwriteWithRetries(tempFile, Path.of(path)) // Call fsync on the parent path to assure the fusemount is up-to-date. // The equivalent of Python's os.fsync - FileChannel.open(Path.of(path)).force(true) - + try { + FileChannel.open(Path.of(path)).force(true) + } catch { + case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor + } path } } + def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = { + var try_count = 1 + breakable { + while (true) { + try { + if (newPath.toFile.exists()) { + // It might be a partial result of a previous failing task. + logger.info(f"Will replace $newPath. (try $try_count)") + } + Files.deleteIfExists(newPath) + Files.move(oldPath, newPath) + break + } catch { + case e: FileAlreadyExistsException => + // Here if another executor wrote the file between the delete and the move statement. + try_count += 1 + if (try_count > 5) { + throw e + } + } + } + } + } def uploadToS3(localFile: Path, s3Path: String) = { val s3Uri = new AmazonS3URI(s3Path) diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala index 685f6781a..73fac17fd 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/package.scala @@ -128,7 +128,9 @@ package object geotrellis { } /** - * Inspired on 'Files.createTempFile' + * Inspired on 'Files.createTempFile', but does not create an empty file. + * The default permissions of 'createTempFile' are a bit too strict too: 600, which is not accessible by other users. + * This function could have default 664 for example. */ def getTempFile(prefix: String, suffix: String): Path = { val prefixNonNull = if (prefix == null) "" else suffix diff --git a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala index 1eae19c94..3144aedb5 100644 --- a/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala +++ b/openeo-geotrellis/src/test/scala/org/openeo/geotrellis/PackageTest.scala @@ -1,8 +1,12 @@ package org.openeo.geotrellis +import geotrellis.raster.io.geotiff.GeoTiff import geotrellis.raster.{ByteCellType, ByteUserDefinedNoDataCellType, FloatUserDefinedNoDataCellType, UByteCellType, UByteUserDefinedNoDataCellType} import org.junit.Assert._ import org.junit.Test +import org.openeo.geotrellis.geotiff._ + +import java.nio.file.{Files, Path} class PackageTest { @Test @@ -12,4 +16,23 @@ class PackageTest { assertEquals(FloatUserDefinedNoDataCellType(42), toSigned(FloatUserDefinedNoDataCellType(42))) assertEquals(ByteUserDefinedNoDataCellType(42), toSigned(ByteUserDefinedNoDataCellType(42))) } + + @Test + def testFileMove(): Unit = { + val refFile = Thread.currentThread().getContextClassLoader.getResource("org/openeo/geotrellis/Sentinel2FileLayerProvider_multiband_reference.tif") + val refTiff = GeoTiff.readMultiband(refFile.getPath) + val p = Path.of(f"tmp/testFileMove/") + Files.createDirectories(p) + + (1 to 20).foreach { i => + val dst = Path.of(p + f"/$i.tif") + // Limit the amount of parallel jobs to avoid getting over the max retries + (1 to 4).par.foreach { _ => + writeGeoTiff(refTiff, dst.toString) + } + assertTrue(Files.exists(dst)) + val refTiff2 = GeoTiff.readMultiband(dst.toString) + assertEquals(refTiff2.cellSize, refTiff.cellSize) + } + } }