Skip to content

Commit

Permalink
see if setting callsite from scala works better than from python
Browse files Browse the repository at this point in the history
  • Loading branch information
jdries committed Sep 19, 2024
1 parent e9540b3 commit 9782c10
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,11 @@ class OpenEOProcesses extends Serializable {
return aggregateTemporal(datacube, intervals, labels, scriptBuilder, context,true)
}
def aggregateTemporal(datacube:MultibandTileLayerRDD[SpaceTimeKey], intervals:java.lang.Iterable[String],labels:java.lang.Iterable[String], scriptBuilder:OpenEOProcessScriptBuilder,context: java.util.Map[String,Any], reduce:Boolean ) :MultibandTileLayerRDD[SpaceTimeKey] = {
if(reduce) {
datacube.sparkContext.setCallSite(s"aggregate_temporal $intervals")
}else{
datacube.sparkContext.setCallSite(s"apply_neighborhood over time intervals")
}
val timePeriods: Seq[Iterable[Instant]] = JavaConverters.iterableAsScalaIterableConverter(intervals).asScala.map(s => Instant.parse(s)).grouped(2).toList
val labelsDates = labels.asScala.map(ZonedDateTime.parse(_))
val periodsToLabels: Seq[(Iterable[Instant], String)] = timePeriods.zip(labels.asScala)
Expand Down Expand Up @@ -468,9 +473,9 @@ class OpenEOProcesses extends Serializable {


val sc = SparkContext.getOrCreate()
sc.setCallSite("aggregate_temporal")

val allKeysRDD: RDD[(SpaceTimeKey, Null)] = sc.parallelize(allPossibleSpacetime)
sc.clearCallSite()


def mapToNewKey(tuple: (SpaceTimeKey, MultibandTile)): Seq[(SpaceTimeKey, (SpaceTimeKey,MultibandTile))] = {
val instant = tuple._1.time.toInstant
Expand Down Expand Up @@ -504,7 +509,6 @@ class OpenEOProcesses extends Serializable {

}

sc.setCallSite("aggregate_temporal")
val tilesByInterval: RDD[(SpaceTimeKey, MultibandTile)] =
if(reduce) {
if(datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]] && datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[SparseSpaceOnlyPartitioner]) {
Expand Down Expand Up @@ -1077,6 +1081,7 @@ class OpenEOProcesses extends Serializable {

def rasterMaskGeneric[K: Boundable: PartitionerIndex: ClassTag,M: GetComponent[*, Bounds[K]]]
(datacube: RDD[(K,MultibandTile)] with Metadata[M], mask: RDD[(K,MultibandTile)] with Metadata[M], replacement: java.lang.Double): RDD[(K,MultibandTile)] with Metadata[M] = {
datacube.sparkContext.setCallSite("mask with rastercube")
DatacubeSupport.rasterMaskGeneric(datacube, mask, replacement)
}

Expand All @@ -1094,6 +1099,7 @@ class OpenEOProcesses extends Serializable {
* @return
*/
def apply_kernel[K: SpatialComponent: ClassTag](datacube:MultibandTileLayerRDD[K],kernel:Tile): RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]] = {
datacube.sparkContext.setCallSite(s"apply_kernel")
val k = new Kernel(kernel)
val outputCellType = datacube.convert(datacube.metadata.cellType.union(kernel.cellType))
if (kernel.cols > 10 || kernel.rows > 10) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ package object geotiff {
cropBounds: Option[Extent] = Option.empty[Extent],
formatOptions: GTiffOptions = new GTiffOptions
): java.util.List[(String, String, Extent)] = {
rdd.sparkContext.setCallSite(s"save_result(GTiff, temporal)")
val ret = saveRDDTemporalAllowAssetPerBand(rdd, path, zLevel, cropBounds, formatOptions).asScala
logger.warn("Calling backwards compatibility version for saveRDDTemporalConsiderAssetPerBand")
// val duplicates = ret.groupBy(_._2).filter(_._2.size > 1)
Expand Down Expand Up @@ -195,6 +196,7 @@ package object geotiff {
cropBounds: Option[Extent] = Option.empty[Extent],
formatOptions: GTiffOptions = new GTiffOptions
): java.util.List[String] = {
rdd.sparkContext.setCallSite(s"save_result(GTiff, spatial, $bandCount)")
val tmp = saveRDDAllowAssetPerBand(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala
logger.warn("Calling backwards compatibility version for saveRDDAllowAssetPerBand")
// if (tmp.size() > 1) {
Expand Down

0 comments on commit 9782c10

Please sign in to comment.