diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala index 52a433ecf..c3db346e0 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/OpenEOProcesses.scala @@ -424,6 +424,11 @@ class OpenEOProcesses extends Serializable { return aggregateTemporal(datacube, intervals, labels, scriptBuilder, context,true) } def aggregateTemporal(datacube:MultibandTileLayerRDD[SpaceTimeKey], intervals:java.lang.Iterable[String],labels:java.lang.Iterable[String], scriptBuilder:OpenEOProcessScriptBuilder,context: java.util.Map[String,Any], reduce:Boolean ) :MultibandTileLayerRDD[SpaceTimeKey] = { + if(reduce) { + datacube.sparkContext.setCallSite(s"aggregate_temporal $intervals") + }else{ + datacube.sparkContext.setCallSite(s"apply_neighborhood over time intervals") + } val timePeriods: Seq[Iterable[Instant]] = JavaConverters.iterableAsScalaIterableConverter(intervals).asScala.map(s => Instant.parse(s)).grouped(2).toList val labelsDates = labels.asScala.map(ZonedDateTime.parse(_)) val periodsToLabels: Seq[(Iterable[Instant], String)] = timePeriods.zip(labels.asScala) @@ -468,9 +473,9 @@ class OpenEOProcesses extends Serializable { val sc = SparkContext.getOrCreate() - sc.setCallSite("aggregate_temporal") + val allKeysRDD: RDD[(SpaceTimeKey, Null)] = sc.parallelize(allPossibleSpacetime) - sc.clearCallSite() + def mapToNewKey(tuple: (SpaceTimeKey, MultibandTile)): Seq[(SpaceTimeKey, (SpaceTimeKey,MultibandTile))] = { val instant = tuple._1.time.toInstant @@ -504,7 +509,6 @@ class OpenEOProcesses extends Serializable { } - sc.setCallSite("aggregate_temporal") val tilesByInterval: RDD[(SpaceTimeKey, MultibandTile)] = if(reduce) { if(datacube.partitioner.isDefined && datacube.partitioner.get.isInstanceOf[SpacePartitioner[SpaceTimeKey]] && datacube.partitioner.get.asInstanceOf[SpacePartitioner[SpaceTimeKey]].index.isInstanceOf[SparseSpaceOnlyPartitioner]) { @@ -1077,6 +1081,7 @@ class OpenEOProcesses extends Serializable { def rasterMaskGeneric[K: Boundable: PartitionerIndex: ClassTag,M: GetComponent[*, Bounds[K]]] (datacube: RDD[(K,MultibandTile)] with Metadata[M], mask: RDD[(K,MultibandTile)] with Metadata[M], replacement: java.lang.Double): RDD[(K,MultibandTile)] with Metadata[M] = { + datacube.sparkContext.setCallSite("mask with rastercube") DatacubeSupport.rasterMaskGeneric(datacube, mask, replacement) } @@ -1094,6 +1099,7 @@ class OpenEOProcesses extends Serializable { * @return */ def apply_kernel[K: SpatialComponent: ClassTag](datacube:MultibandTileLayerRDD[K],kernel:Tile): RDD[(K, MultibandTile)] with Metadata[TileLayerMetadata[K]] = { + datacube.sparkContext.setCallSite(s"apply_kernel") val k = new Kernel(kernel) val outputCellType = datacube.convert(datacube.metadata.cellType.union(kernel.cellType)) if (kernel.cols > 10 || kernel.rows > 10) { diff --git a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala index dabd20d2f..9175e00e6 100644 --- a/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala +++ b/openeo-geotrellis/src/main/scala/org/openeo/geotrellis/geotiff/package.scala @@ -91,6 +91,7 @@ package object geotiff { cropBounds: Option[Extent] = Option.empty[Extent], formatOptions: GTiffOptions = new GTiffOptions ): java.util.List[(String, String, Extent)] = { + rdd.sparkContext.setCallSite(s"save_result(GTiff, temporal)") val ret = saveRDDTemporalAllowAssetPerBand(rdd, path, zLevel, cropBounds, formatOptions).asScala logger.warn("Calling backwards compatibility version for saveRDDTemporalConsiderAssetPerBand") // val duplicates = ret.groupBy(_._2).filter(_._2.size > 1) @@ -195,6 +196,7 @@ package object geotiff { cropBounds: Option[Extent] = Option.empty[Extent], formatOptions: GTiffOptions = new GTiffOptions ): java.util.List[String] = { + rdd.sparkContext.setCallSite(s"save_result(GTiff, spatial, $bandCount)") val tmp = saveRDDAllowAssetPerBand(rdd, bandCount, path, zLevel, cropBounds, formatOptions).asScala logger.warn("Calling backwards compatibility version for saveRDDAllowAssetPerBand") // if (tmp.size() > 1) {