Skip to content

Commit

Permalink
Test without changing the output URI. #335
Browse files Browse the repository at this point in the history
  • Loading branch information
EmileSonneveld committed Oct 17, 2024
1 parent 244ba1e commit 059bdeb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 25 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,4 +38,5 @@ tmp*
*m2e*.prefs
*eclipse*.prefs
.cache-main
.cache-tests
.cache-tests
.flattened-pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,10 @@ package object geotiff {
.map { case (bandTags, _) => bandTags }
fo.setBandTags(newBandTags)

val correctedPath = writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs,
writeTiff(thePath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs,
tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo,
)
(correctedPath, timestamp, croppedExtent, bandIndices)
(thePath, timestamp, croppedExtent, bandIndices)
}.collect().toList.asJava

}
Expand Down Expand Up @@ -245,9 +245,8 @@ package object geotiff {
// Keep only one band tag
val newBandTags = List(formatOptions.tags.bandTags(bandIndex))
fo.setBandTags(newBandTags)

(stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)),
Collections.singletonList(bandIndex))
stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo))
(fixedPath, Collections.singletonList(bandIndex))
}.collect().toList.sortBy(_._1).asJava
} else {
val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala
Expand Down Expand Up @@ -411,8 +410,8 @@ package object geotiff {
val metadata = new STACItem()
metadata.asset(fixedPath)
metadata.write(stacItemPath)
val finalPath = writeTiff( fixedPath,tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount,formatOptions = formatOptions, overviews = overviews)
return Collections.singletonList(finalPath)
writeTiff(fixedPath, tiffs, gridBounds, croppedExtent, preprocessedRdd.metadata.crs, preprocessedRdd.metadata.tileLayout, compression, cellType, detectedBandCount, segmentCount, formatOptions = formatOptions, overviews = overviews)
return Collections.singletonList(fixedPath)
}finally {
preprocessedRdd.unpersist()
}
Expand Down Expand Up @@ -689,8 +688,8 @@ package object geotiff {
}.groupByKey()
.map { case ((name, extent), tiles) =>
val filePath = newFilePath(path, name)

(stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent)
stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression)
(filePath, extent)
}.collect()
.toList.asJava
}
Expand Down Expand Up @@ -818,28 +817,26 @@ package object geotiff {
val filename = s"${filenamePrefix.getOrElse("openEO")}_${DateTimeFormatter.ISO_DATE.format(time)}_$name.tif"
val filePath = Paths.get(path).resolve(filename).toString
val timestamp = time format DateTimeFormatter.ISO_ZONED_DATE_TIME
(stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression),
timestamp, geometry.extent)
stitchAndWriteToTiff(tiles, filePath, layout, crs, geometry, croppedExtent, cropDimensions, compression)
(filePath, timestamp, geometry.extent)
}
.collect()
.toList.asJava
}

def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): String = {
import java.nio.file.Files
if (path.startsWith("s3:/")) {
val correctS3Path = path.replaceFirst("s3:/(?!/)", "s3://")

def writeGeoTiff(geoTiff: MultibandGeoTiff, path: String): Unit = {
val tempFile = getTempFile(null, ".tif")
geoTiff.write(tempFile.toString, optimizedOrder = true)

val tempFile = Files.createTempFile(null, null)
geoTiff.write(tempFile.toString, optimizedOrder = true)
uploadToS3(tempFile, correctS3Path)
if (path.startsWith("s3:/")) {
if (!path.startsWith("s3://")) {
// Test where this happens
logger.warn("Invalid S3 path. Use s3:// instead of s3:/")
}
uploadToS3(tempFile, path)

} else {
val tempFile = getTempFile(null, ".tif")
// TODO: Try to run fsync on the file opened by GeoTrellis (without the temporary copy)
geoTiff.write(tempFile.toString, optimizedOrder = true)

// TODO: Write to unique path instead to avoid collisions between executors. Let the driver choose the paths.
moveOverwriteWithRetries(tempFile, Path.of(path))

Expand All @@ -850,9 +847,7 @@ package object geotiff {
} catch {
case _: NoSuchFileException => // Ignore. The file may already be deleted by another executor
}
path
}

}

def moveOverwriteWithRetries(oldPath: Path, newPath: Path): Unit = {
Expand Down

0 comments on commit 059bdeb

Please sign in to comment.