React to MR: Cleanup code. Use @tempdir. Prune imports. #329
EmileSonneveld committed Oct 31, 2024
1 parent 65046dd commit 22327e1
Showing 4 changed files with 103 additions and 109 deletions.
@@ -104,6 +104,42 @@ package object geotiff {
     ret.map(t => (t._1, t._2, t._3)).asJava
   }
 
+  private val executorAttemptDirectoryPrefix = "executorAttemptDirectory"
+
+  private def createExecutorAttemptDirectory(parentDirectory: String): Path = {
+    createExecutorAttemptDirectory(Path.of(parentDirectory))
+  }
+
+  private def createExecutorAttemptDirectory(parentDirectory: Path): Path = {
+    // Multiple executors with the same task can run at the same time.
+    // Writing their output to the same path would create a race condition.
+    // Let's provide a unique directory for each executor:
+    val rand = new java.security.SecureRandom().nextLong()
+    val uniqueFolderName = executorAttemptDirectoryPrefix + java.lang.Long.toUnsignedString(rand)
+    val executorAttemptDirectory = Paths.get(parentDirectory + "/" + uniqueFolderName)
+    Files.createDirectories(executorAttemptDirectory)
+    executorAttemptDirectory
+  }
+
+  private def moveFromExecutorAttemptDirectory(parentDirectory: Path, absolutePath: String): Path = {
+    // Move output file to standard location. (On S3, a move is more a copy and delete):
+    val relativePath = parentDirectory.relativize(Path.of(absolutePath)).toString
+    if (!relativePath.startsWith(executorAttemptDirectoryPrefix)) throw new Exception()
+    // Remove the executorAttemptDirectory part from the path:
+    val destinationPath = parentDirectory.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
+    waitTillPathAvailable(Path.of(absolutePath))
+    Files.move(Path.of(absolutePath), destinationPath)
+    destinationPath
+  }
+
+  private def cleanUpExecutorAttemptDirectory(parentDirectory: Path): Unit = {
+    Files.list(parentDirectory).forEach { p =>
+      if (Files.isDirectory(p) && p.getFileName.toString.startsWith(executorAttemptDirectoryPrefix)) {
+        FileUtils.deleteDirectory(p.toFile)
+      }
+    }
+  }
+
   /**
    * Save temporal rdd, on the executors
    *
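Taken together, these three helpers give each executor attempt its own scratch directory, promote successful output one level up, and sweep any leftover attempt directories afterwards. A minimal, self-contained sketch of that lifecycle on a plain local filesystem (hypothetical names; commons-io's FileUtils for the recursive delete, as in the code above):

import org.apache.commons.io.FileUtils

import java.nio.file.{Files, Path}

object AttemptDirectoryLifecycleSketch {
  private val prefix = "executorAttemptDirectory"

  def main(args: Array[String]): Unit = {
    val outputDir = Files.createTempDirectory("job")

    // 1. Executor side: write into a uniquely named attempt directory.
    val rand = new java.security.SecureRandom().nextLong()
    val attemptDir = outputDir.resolve(prefix + java.lang.Long.toUnsignedString(rand))
    Files.createDirectories(attemptDir)
    val written = attemptDir.resolve("tile-0.tif")
    Files.write(written, Array[Byte](42))

    // 2. Driver side: drop the attempt-directory segment and move the file up.
    val relative = outputDir.relativize(written)
    val destination = outputDir.resolve(relative.subpath(1, relative.getNameCount))
    Files.move(written, destination)

    // 3. Driver side: delete every attempt directory that is left over
    //    (failed tasks, or duplicate attempts that lost the race).
    Files.list(outputDir).forEach { p =>
      if (Files.isDirectory(p) && p.getFileName.toString.startsWith(prefix))
        FileUtils.deleteDirectory(p.toFile)
    }

    println(s"final output: $destination")
  }
}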
@@ -176,10 +212,8 @@ package object geotiff {
       val segmentCount = bandSegmentCount * tiffBands
 
       // Each executor writes to a unique folder to avoid conflicts:
-      val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
-      val base = Paths.get(path + "/" + uniqueFolderName)
-      Files.createDirectories(base)
-      val thePath = base.resolve(filename).toString
+      val executorAttemptDirectory = createExecutorAttemptDirectory(path)
+      val thePath = executorAttemptDirectory.resolve(filename).toString
 
       // filter band tags that match bandIndices
       val fo = formatOptions.deepClone()
@@ -192,22 +226,14 @@
         tileLayout, compression, cellTypes.head, tiffBands, segmentCount, fo,
       )
       (correctedPath, timestamp, croppedExtent, bandIndices)
-    }.collect().map({
+    }.collect().map {
       case (absolutePath, timestamp, croppedExtent, bandIndices) =>
-        // Move output file to standard location. (On S3, a move is more a copy and delete):
-        val relativePath = Path.of(path).relativize(Path.of(absolutePath)).toString
-        val destinationPath = Path.of(path).resolve(relativePath.substring(relativePath.indexOf("/") + 1))
-        waitTillPathAvailable(Path.of(absolutePath))
-        Files.move(Path.of(absolutePath), destinationPath)
+        val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path), absolutePath)
         (destinationPath.toString, timestamp, croppedExtent, bandIndices)
-    }).toList.asJava
+    }.toList.asJava
 
-    // Clean up failed tasks:
-    Files.list(Path.of(path)).forEach { p =>
-      if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
-        FileUtils.deleteDirectory(p.toFile)
-      }
-    }
+    cleanUpExecutorAttemptDirectory(Path.of(path))
+
     res
   }
 
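For intuition on the relativize/resolve arithmetic inside moveFromExecutorAttemptDirectory, a worked example with hypothetical values (Unix-style separators assumed):

import java.nio.file.Path

val parent = Path.of("/jobs/42")
val absolutePath = "/jobs/42/executorAttemptDirectory123/openEO.tif" // hypothetical executor output
val relativePath = parent.relativize(Path.of(absolutePath)).toString
// relativePath == "executorAttemptDirectory123/openEO.tif"
val destinationPath = parent.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
// destinationPath == Path.of("/jobs/42/openEO.tif")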
@@ -254,12 +280,10 @@ package object geotiff {
       }
     }
     val res = rdd_per_band.groupByKey().map { case ((name, bandIndex), tiles) =>
-      val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
       val fixedPath =
         if (path.endsWith("out")) {
-          val base = path.substring(0, path.length - 3) + uniqueFolderName + "/"
-          Files.createDirectories(Path.of(base))
-          base + name
+          val executorAttemptDirectory = createExecutorAttemptDirectory(path.substring(0, path.length - 3))
+          executorAttemptDirectory + "/" + name
         }
         else {
           path
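The substring(0, path.length - 3) above relies on the convention, checked by path.endsWith("out"), that path ends in the literal "out"; stripping those three characters yields the enclosing directory, trailing slash included. A hypothetical illustration:

val path = "/jobs/42/out" // hypothetical path following the "out" convention
val beforeOut = path.substring(0, path.length - 3)
// beforeOut == "/jobs/42/" — attempt directories are created here,
// so finished files land next to (not inside) the "out" marker path.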
@@ -272,27 +296,22 @@
 
       (stitchAndWriteToTiff(tiles, fixedPath, layout, crs, extent, None, None, compression, Some(fo)),
         Collections.singletonList(bandIndex))
-    }.collect().map({
-      case (absolutePath, y) =>
+    }.collect().map {
+      case (absolutePath, bandIndices) =>
         if (path.endsWith("out")) {
-          // Move output file to standard location. (On S3, a move is more a copy and delete):
           val beforeOut = path.substring(0, path.length - "out".length)
-          val relativePath = Path.of(beforeOut).relativize(Path.of(absolutePath)).toString
-          val destinationPath = beforeOut + relativePath.substring(relativePath.indexOf("/") + 1)
-          waitTillPathAvailable(Path.of(absolutePath))
-          Files.move(Path.of(absolutePath), Path.of(destinationPath))
-          (destinationPath, y)
+          val destinationPath = moveFromExecutorAttemptDirectory(Path.of(beforeOut), absolutePath)
+          (destinationPath.toString, bandIndices)
         } else {
-          (absolutePath, y)
+          (absolutePath, bandIndices)
         }
-    }).toList.sortBy(_._1).asJava
-    // Clean up failed tasks:
-    val beforeOut = path.substring(0, path.length - "out".length)
-    Files.list(Path.of(beforeOut)).forEach { p =>
-      if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
-        FileUtils.deleteDirectory(p.toFile)
-      }
-    }
+    }.toList.sortBy(_._1).asJava
+
+    if (path.endsWith("out")) {
+      val beforeOut = path.substring(0, path.length - "out".length)
+      cleanUpExecutorAttemptDirectory(Path.of(beforeOut))
+    }
+
     res
   } else {
     val tmp = saveRDDGeneric(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala
@@ -734,28 +753,18 @@ package object geotiff {
       }.groupByKey()
         .map { case ((tileId, extent), tiles) =>
           // Each executor writes to a unique folder to avoid conflicts:
-          val uniqueFolderName = "tmp" + java.lang.Long.toUnsignedString(new java.security.SecureRandom().nextLong())
-          val base = Paths.get(Path.of(path).getParent + "/" + uniqueFolderName)
-          Files.createDirectories(base)
-          val filePath = base + "/" + newFilePath(Path.of(path).getFileName.toString, tileId)
+          val executorAttemptDirectory = createExecutorAttemptDirectory(Path.of(path).getParent)
+          val filePath = executorAttemptDirectory + "/" + newFilePath(Path.of(path).getFileName.toString, tileId)
 
           (stitchAndWriteToTiff(tiles, filePath, layout, crs, extent, croppedExtent, cropDimensions, compression), extent)
-        }.collect().map({
+        }.collect().map {
           case (absolutePath, croppedExtent) =>
-            // Move output file to standard location. (On S3, a move is more a copy and delete):
-            val relativePath = Path.of(path).getParent.relativize(Path.of(absolutePath)).toString
-            val destinationPath = Path.of(path).getParent.resolve(relativePath.substring(relativePath.indexOf("/") + 1))
-            waitTillPathAvailable(Path.of(absolutePath))
-            Files.move(Path.of(absolutePath), destinationPath)
+            val destinationPath = moveFromExecutorAttemptDirectory(Path.of(path).getParent, absolutePath)
             (destinationPath.toString, croppedExtent)
-        }).toList.asJava
+        }.toList.asJava
 
-    // Clean up failed tasks:
-    Files.list(Path.of(path).getParent).forEach { p =>
-      if (Files.isDirectory(p) && p.getFileName.toString.startsWith("tmp")) {
-        FileUtils.deleteDirectory(p.toFile)
-      }
-    }
+    cleanUpExecutorAttemptDirectory(Path.of(path).getParent)
+
     res
   }
 
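Note that executorAttemptDirectory is a java.nio.file.Path, so executorAttemptDirectory + "/" + ... compiles only through Scala 2's any2stringadd conversion, which stringifies the Path; the result is a plain String. A hypothetical illustration of what that concatenation produces:

import java.nio.file.Path

val dir: Path = Path.of("/jobs/executorAttemptDirectory123") // hypothetical attempt directory
val filePath: String = dir + "/" + "openEO_grid.tif"
// filePath == "/jobs/executorAttemptDirectory123/openEO_grid.tif"
// dir.resolve("openEO_grid.tif").toString would express the same thing more explicitly.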
@@ -1,32 +1,31 @@
 package org.openeo.geotrellis.geotiff
 
-import better.files.File.apply
-
-import java.time.LocalTime.MIDNIGHT
-import java.time.ZoneOffset.UTC
-import java.time.{LocalDate, ZonedDateTime}
 import geotrellis.proj4.{CRS, LatLng}
 import geotrellis.raster.io.geotiff.compression.DeflateCompression
 import geotrellis.spark._
 import geotrellis.spark.util.SparkUtils
 import geotrellis.vector.{Extent, ProjectedExtent}
 import org.apache.spark.SparkContext
 import org.apache.spark.storage.StorageLevel.DISK_ONLY
-import org.junit._
+import org.junit.jupiter.api.io.TempDir
+import org.junit.jupiter.api.{BeforeAll, Test}
+import org.junit.{AfterClass, Assert}
 import org.openeo.geotrellis.LayerFixtures.rgbLayerProvider
 import org.openeo.geotrellis.png.PngTest
 import org.openeo.geotrellis.tile_grid.TileGrid
 import org.openeo.geotrellis.{LayerFixtures, geotiff}
 
-import java.nio.file.{Files, Paths}
+import java.nio.file.Path
+import java.time.LocalTime.MIDNIGHT
+import java.time.ZoneOffset.UTC
 import java.time.format.DateTimeFormatter.ISO_ZONED_DATE_TIME
+import java.time.{LocalDate, ZonedDateTime}
 import scala.collection.JavaConverters._
-import scala.reflect.io.Directory
 
 object TileGridTest {
   private var sc: SparkContext = _
 
-  @BeforeClass
+  @BeforeAll
   def setupSpark(): Unit = {
     // originally geotrellis.spark.util.SparkUtils.createLocalSparkContext
     val conf = SparkUtils.createSparkConf
@@ -51,11 +50,7 @@ class TileGridTest {
   import TileGridTest._
 
   @Test
-  def testSaveStitchWithTileGrids(): Unit = {
-    val outDir = Paths.get("tmp/testSaveStitchWithTileGrids/")
-    new Directory(outDir.toFile).deepList().foreach(_.delete())
-    Files.createDirectories(outDir)
-
+  def testSaveStitchWithTileGrids(@TempDir outDir: Path): Unit = {
     val date = ZonedDateTime.of(LocalDate.of(2020, 4, 5), MIDNIGHT, UTC)
     val bbox = ProjectedExtent(Extent(1.95, 50.95, 2.05, 51.05), LatLng)
 
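For reference, a minimal sketch of the JUnit 5 @TempDir mechanism the tests now rely on: Jupiter injects a fresh directory per test invocation and deletes it recursively afterwards, which replaces the manual Paths.get("tmp/...") setup and deepList().delete() teardown removed above (hypothetical test class and names):

import org.junit.jupiter.api.Test
import org.junit.jupiter.api.io.TempDir

import java.nio.file.{Files, Path}

class TempDirSketchTest {
  @Test
  def writesIntoInjectedDirectory(@TempDir outDir: Path): Unit = {
    // outDir already exists and is unique to this test invocation.
    val file = outDir.resolve("output.tif")
    Files.write(file, Array[Byte](1, 2, 3))
    assert(Files.exists(file))
    // No cleanup needed: JUnit removes outDir after the test finishes.
  }
}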
@@ -13,14 +13,15 @@ import geotrellis.vector._
 import geotrellis.vector.io.json.GeoJson
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
 import org.junit.Assert._
-import org.junit._
+import org.junit.jupiter.api.io.TempDir
+import org.junit.jupiter.api.{BeforeAll, Test}
 import org.junit.rules.TemporaryFolder
+import org.junit.{AfterClass, Rule}
 import org.openeo.geotrellis.{LayerFixtures, OpenEOProcesses, ProjectedPolygons}
 import org.openeo.sparklisteners.GetInfoSparkListener
 import org.slf4j.{Logger, LoggerFactory}
 
 import java.nio.file.{Files, Path, Paths}
-import java.time.{LocalDate, LocalTime, ZoneOffset, ZonedDateTime}
+import java.time.{LocalTime, ZoneOffset, ZonedDateTime}
 import java.util
 import java.util.zip.Deflater._
 import scala.annotation.meta.getter
@@ -34,7 +35,7 @@ object WriteRDDToGeotiffTest{
 
   var sc: SparkContext = _
 
-  @BeforeClass
+  @BeforeAll
   def setupSpark() = {
     sc = {
       val conf = new SparkConf().setMaster("local[2]").setAppName(getClass.getSimpleName)
@@ -151,11 +152,7 @@ class WriteRDDToGeotiffTest {
   }
 
   @Test
-  def testWriteRDD_apply_neighborhood(): Unit ={
-    val outDir = Paths.get("tmp/testWriteRDD_apply_neighborhood/")
-    new Directory(outDir.toFile).deepList().foreach(_.delete())
-    Files.createDirectories(outDir)
-
+  def testWriteRDD_apply_neighborhood(@TempDir outDir: Path): Unit = {
     val layoutCols = 8
     val layoutRows = 4
 