diff --git a/environment.yml b/environment.yml index c92922f4..f7b646a6 100644 --- a/environment.yml +++ b/environment.yml @@ -34,8 +34,8 @@ dependencies: - tqdm - pip: - duckdb==1.1.0 - - h3==3.7.7 - - openeo-gfmap==0.2.0 + - h3==4.1.0 + - openeo-gfmap==0.3.0 - git+https://github.com/worldcereal/worldcereal-classification - git+https://github.com/WorldCereal/presto-worldcereal.git@croptype diff --git a/pyproject.toml b/pyproject.toml index afd4d1bf..d7c0aabf 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -37,13 +37,13 @@ dependencies = [ "cftime", "geojson", "geopandas", - "h3==3.7.7", + "h3==4.1.0", "h5netcdf>=1.1.0", "loguru>=0.7.2", "netcdf4<=1.6.4", "numpy<2.0.0", "openeo==0.31.0", - "openeo-gfmap==0.2.0", + "openeo-gfmap==0.3.0", "pyarrow", "pydantic==2.8.0", "rioxarray>=0.13.0", diff --git a/scripts/extractions/extract.py b/scripts/extractions/extract.py index 39a992bf..8d2859d0 100644 --- a/scripts/extractions/extract.py +++ b/scripts/extractions/extract.py @@ -103,7 +103,7 @@ def prepare_job_dataframe( pipeline_log.info("Preparing the job dataframe.") # Filter the input dataframe to only keep the locations to extract - input_df = input_df[input_df["extract"] == extract_value].copy() + input_df = input_df[input_df["extract"] >= extract_value].copy() # Split the locations into chunks of max_locations split_dfs = [] @@ -144,9 +144,9 @@ def prepare_job_dataframe( def setup_extraction_functions( collection: ExtractionCollection, extract_value: int, - memory: str, - python_memory: str, - max_executors: int, + memory: typing.Union[str, None], + python_memory: typing.Union[str, None], + max_executors: typing.Union[int, None], ) -> tuple[typing.Callable, typing.Callable, typing.Callable]: """Setup the datacube creation, path generation and post-job action functions for the given collection. 
Returns a tuple of three functions: @@ -158,33 +158,33 @@ def setup_extraction_functions( datacube_creation = { ExtractionCollection.PATCH_SENTINEL1: partial( create_job_patch_s1, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "1900m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.PATCH_SENTINEL2: partial( create_job_patch_s2, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "1900m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.PATCH_METEO: partial( create_job_patch_meteo, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "1000m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.PATCH_WORLDCEREAL: partial( create_job_patch_worldcereal, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "3000m", + max_executors=max_executors if max_executors is not None else 22, ), ExtractionCollection.POINT_WORLDCEREAL: partial( create_job_point_worldcereal, - executor_memory=memory, - python_memory=python_memory, - max_executors=max_executors, + executor_memory=memory if memory is not None else "1800m", + python_memory=python_memory if python_memory is not None else "3000m", + max_executors=max_executors if max_executors is not None else 22, ), } @@ -334,6 +334,102 @@ def manager_main_loop( raise e +def run_extractions( + collection: ExtractionCollection, + output_folder: Path, + input_df: Path, + max_locations_per_job: int = 500, + memory: str = "1800m", + python_memory: str = "1900m", + max_executors: int = 22, + parallel_jobs: int = 2, + restart_failed: bool = False, + extract_value: int = 1, + backend=Backend.CDSE, +) -> None: + """Main function responsible for launching point and patch extractions. + + Parameters + ---------- + collection : ExtractionCollection + The collection to extract. 
Most popular: PATCH_WORLDCEREAL, POINT_WORLDCEREAL + output_folder : Path + The folder in which to store the extracted data + input_df : Path + Path to the input dataframe containing the geometries + for which extractions need to be done + max_locations_per_job : int, optional + The maximum number of locations to extract per job, by default 500 + memory : str, optional + Memory to allocate for the executor, by default "1800m" + python_memory : str, optional + Memory to allocate for the python processes as well as OrfeoToolbox in the executors, + by default "1900m" + max_executors : int, optional + Number of executors to run, by default 22 + parallel_jobs : int, optional + The maximum number of parallel jobs to run at the same time, by default 2 + restart_failed : bool, optional + Restart the jobs that previously failed, by default False + extract_value : int, optional + All samples with an "extract" value equal to or larger than this one will be extracted, by default 1 + backend : openeo_gfmap.Backend, optional + Cloud backend on which to run the extractions, by default Backend.CDSE + + Raises + ------ + ValueError + _description_ + """ + + if not output_folder.is_dir(): + output_folder.mkdir(parents=True) + + tracking_df_path = output_folder / "job_tracking.csv" + + # Load the input dataframe and build the job dataframe + input_df = load_dataframe(input_df) + + job_df = None + if not tracking_df_path.exists(): + job_df = prepare_job_dataframe( + input_df, collection, max_locations_per_job, extract_value, backend + ) + + # Setup the extraction functions + pipeline_log.info("Setting up the extraction functions.") + datacube_fn, path_fn, post_job_fn = setup_extraction_functions( + collection, extract_value, memory, python_memory, max_executors + ) + + # Initialize and set up the job manager + pipeline_log.info("Initializing the job manager.") + + job_manager = GFMAPJobManager( + output_dir=output_folder, + output_path_generator=path_fn, + post_job_action=post_job_fn, + poll_sleep=60, + n_threads=4, + restart_failed=restart_failed, + stac_enabled=False, + ) + + job_manager.add_backend( + backend.value, + cdse_connection, + parallel_jobs=parallel_jobs, + ) + + manager_main_loop(job_manager, collection, job_df, datacube_fn, tracking_df_path) + + pipeline_log.info("Extraction completed successfully.") + send_notification( + title=f"WorldCereal Extraction {collection.value} - Completed", + message="Extractions have been completed successfully.", + ) + + if __name__ == "__main__": parser = argparse.ArgumentParser(description="Extract data from a collection") parser.add_argument( @@ -372,8 +468,8 @@ def manager_main_loop( parser.add_argument( "--parallel_jobs", type=int, - default=10, - help="The maximum number of parrallel jobs to run at the same time.", + default=2, + help="The maximum number of parallel jobs to run at the same time.", ) parser.add_argument( "--restart_failed", @@ -389,55 +485,16 @@ def manager_main_loop( args = parser.parse_args() - - # Fetches values and setups hardocded values - collection = args.collection - extract_value = args.extract_value - max_locations_per_job = args.max_locations - backend = Backend.CDSE - - if not args.output_folder.is_dir(): - raise ValueError(f"Output folder {args.output_folder} does not exist.") - - tracking_df_path = Path(args.output_folder) / "job_tracking.csv" - - # Load the input dataframe and build the job dataframe - input_df = load_dataframe(args.input_df) - - job_df = None - if not tracking_df_path.exists(): - job_df = prepare_job_dataframe( -
input_df, collection, max_locations_per_job, extract_value, backend - ) - - # Setup the extraction functions - pipeline_log.info("Setting up the extraction functions.") - datacube_fn, path_fn, post_job_fn = setup_extraction_functions( - collection, extract_value, args.memory, args.python_memory, args.max_executors - ) - - # Initialize and setups the job manager - pipeline_log.info("Initializing the job manager.") - - job_manager = GFMAPJobManager( - output_dir=args.output_folder, - output_path_generator=path_fn, - post_job_action=post_job_fn, - poll_sleep=60, - n_threads=4, - restart_failed=args.restart_failed, - stac_enabled=False, - ) - - job_manager.add_backend( - Backend.CDSE.value, - cdse_connection, + run_extractions( + collection=args.collection, + output_folder=args.output_folder, + input_df=args.input_df, + max_locations_per_job=args.max_locations, + memory=args.memory, + python_memory=args.python_memory, + max_executors=args.max_executors, parallel_jobs=args.parallel_jobs, - ) - - manager_main_loop(job_manager, collection, job_df, datacube_fn, tracking_df_path) - - pipeline_log.info("Extraction completed successfully.") - send_notification( - title=f"WorldCereal Extraction {collection.value} - Completed", - message="Extractions have been completed successfully.", + restart_failed=args.restart_failed, + extract_value=args.extract_value, + backend=Backend.CDSE, ) diff --git a/scripts/extractions/patch_extractions/extract_patch_meteo.py b/scripts/extractions/patch_extractions/extract_patch_meteo.py index ae8662c6..285fa57b 100644 --- a/scripts/extractions/patch_extractions/extract_patch_meteo.py +++ b/scripts/extractions/patch_extractions/extract_patch_meteo.py @@ -24,11 +24,11 @@ def create_job_dataframe_patch_meteo( def create_job_patch_meteo( row: pd.Series, connection: openeo.DataCube, - provider=None, - connection_provider=None, - executor_memory: str = "2G", - python_memory: str = "1G", - max_executors: int = 22, + provider, + connection_provider, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> gpd.GeoDataFrame: start_date = row.start_date end_date = row.end_date diff --git a/scripts/extractions/patch_extractions/extract_patch_s1.py b/scripts/extractions/patch_extractions/extract_patch_s1.py index b23ea5a8..69b58a97 100644 --- a/scripts/extractions/patch_extractions/extract_patch_s1.py +++ b/scripts/extractions/patch_extractions/extract_patch_s1.py @@ -122,9 +122,9 @@ def create_job_patch_s1( connection: openeo.DataCube, provider, connection_provider, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> openeo.BatchJob: """Creates an OpenEO BatchJob from the given row information. 
This job is a S1 patch of 32x32 pixels at 20m spatial resolution.""" diff --git a/scripts/extractions/patch_extractions/extract_patch_s2.py b/scripts/extractions/patch_extractions/extract_patch_s2.py index 7af3f1f2..351737bd 100644 --- a/scripts/extractions/patch_extractions/extract_patch_s2.py +++ b/scripts/extractions/patch_extractions/extract_patch_s2.py @@ -70,11 +70,11 @@ def create_job_dataframe_patch_s2( def create_job_patch_s2( row: pd.Series, connection: openeo.DataCube, - provider=None, - connection_provider=None, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + provider, + connection_provider, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> gpd.GeoDataFrame: start_date = row.start_date end_date = row.end_date diff --git a/scripts/extractions/patch_extractions/extract_patch_worldcereal.py b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py index f862d758..da409bf5 100644 --- a/scripts/extractions/patch_extractions/extract_patch_worldcereal.py +++ b/scripts/extractions/patch_extractions/extract_patch_worldcereal.py @@ -124,9 +124,9 @@ def create_job_patch_worldcereal( connection: openeo.DataCube, provider, connection_provider, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + executor_memory: str, + python_memory: str, + max_executors: int, ) -> openeo.BatchJob: """Creates an OpenEO BatchJob from the given row information.""" @@ -398,13 +398,16 @@ def post_job_action_patch_worldcereal( def generate_output_path_patch_worldcereal( - root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame + root_folder: Path, + job_index: int, + row: pd.Series, + asset_id: str, + s2_grid: gpd.GeoDataFrame, ): """Generate the output path for the extracted data, from a base path and the row information. """ - features = geojson.loads(row.geometry) - sample_id = features[geometry_index].properties.get("sample_id", None) + sample_id = asset_id.replace(".nc", "").replace("openEO_", "") s2_tile_id = row.s2_tile epsg = s2_grid[s2_grid.tile == s2_tile_id].iloc[0].epsg diff --git a/scripts/extractions/point_extractions/extract_point_worldcereal.py b/scripts/extractions/point_extractions/extract_point_worldcereal.py index ad07ec1a..ab800871 100644 --- a/scripts/extractions/point_extractions/extract_point_worldcereal.py +++ b/scripts/extractions/point_extractions/extract_point_worldcereal.py @@ -16,32 +16,44 @@ def generate_output_path_point_worldcereal( - root_folder: Path, geometry_index: int, row: pd.Series -): - """ - For point extractions, only one asset (a geoparquet file) is generated per job. - Therefore geometry_index is always 0. - It has to be included in the function signature to be compatible with the GFMapJobManager. + root_folder: Path, + geometry_index: int, + row: pd.Series, + asset_id: Optional[str] = None, +) -> Path: + """Method to generate the output path for the point extractions. + + Parameters + ---------- + root_folder : Path + root folder where the output parquet file will be saved + geometry_index : int + For point extractions, only one asset (a geoparquet file) is generated per job. + Therefore geometry_index is always 0. It has to be included in the function signature + to be compatible with the GFMapJobManager + row : pd.Series + the current job row from the GFMapJobManager + asset_id : str, optional + Needed for compatibility with GFMapJobManager but not used. 
+ + Returns + ------- + Path + output path for the point extractions parquet file """ - features = geojson.loads(row.geometry) - ref_id = features[geometry_index].properties["ref_id"] s2_tile_id = row.s2_tile + utm_zone = str(s2_tile_id[0:2]) - subfolder = root_folder / ref_id / s2_tile_id - + # Create the subfolder to store the output + subfolder = root_folder / utm_zone / s2_tile_id subfolder.mkdir(parents=True, exist_ok=True) - # Subfolder is not necessarily unique, so we create numbered folders. - if not any(subfolder.iterdir()): - real_subfolder = subfolder / "0" - else: - i = 0 - while (subfolder / str(i)).exists(): - i += 1 - real_subfolder = subfolder / str(i) + # we may have multiple output files per s2_tile_id and need + # a unique name so we use the job ID + output_file = f"WORLDCEREAL_{root_folder.name}_{row.start_date}_{row.end_date}_{s2_tile_id}_{row.id}{row.out_extension}" - return real_subfolder / f"point_extractions{row.out_extension}" + return subfolder / output_file def create_job_dataframe_point_worldcereal( @@ -52,12 +64,16 @@ def create_job_dataframe_point_worldcereal( for job in tqdm(split_jobs): min_time = job.valid_time.min() max_time = job.valid_time.max() + # 9 months before and after the valid time start_date = (min_time - pd.Timedelta(days=275)).to_pydatetime() end_date = (max_time + pd.Timedelta(days=275)).to_pydatetime() + # ensure start date is 1st day of month, end date is last day of month + start_date = start_date.replace(day=1) + end_date = end_date.replace(day=1) + pd.offsets.MonthEnd(0) + s2_tile = job.tile.iloc[0] - h3_l3_cell = job.h3_l3_cell.iloc[0] # Convert dates to string format start_date, end_date = start_date.strftime("%Y-%m-%d"), end_date.strftime( @@ -67,6 +83,12 @@ def create_job_dataframe_point_worldcereal( # Set back the valid_time in the geometry as string job["valid_time"] = job.valid_time.dt.strftime("%Y-%m-%d") + # Add other attributes we want to keep in the result + job["start_date"] = start_date + job["end_date"] = end_date + job["lat"] = job.geometry.y + job["lon"] = job.geometry.x + variables = { "backend_name": backend.value, "out_prefix": "point-extraction", @@ -74,7 +96,6 @@ def create_job_dataframe_point_worldcereal( "start_date": start_date, "end_date": end_date, "s2_tile": s2_tile, - "h3_l3_cell": h3_l3_cell, "geometry": job.to_json(), } @@ -88,9 +109,9 @@ def create_job_point_worldcereal( connection: openeo.DataCube, provider, connection_provider, - executor_memory: str = "5G", - python_memory: str = "2G", - max_executors: int = 22, + executor_memory: str, + python_memory: str, + max_executors: int, ): """Creates an OpenEO BatchJob from the given row information.""" @@ -106,6 +127,13 @@ def create_job_point_worldcereal( backend = Backend(row.backend_name) backend_context = BackendContext(backend) + # Try to get s2 tile ID to filter the collection + if "s2_tile" in row: + pipeline_log.debug(f"Extracting data for S2 tile {row.s2_tile}") + s2_tile = row.s2_tile + else: + s2_tile = None + inputs = worldcereal_preprocessed_inputs( connection=connection, backend_context=backend_context, @@ -113,6 +141,7 @@ def create_job_point_worldcereal( temporal_extent=temporal_extent, fetch_type=FetchType.POINT, validate_temporal_context=False, + s2_tile=s2_tile, ) # Finally, create a vector cube based on the Point geometries @@ -150,7 +179,8 @@ def post_job_action_point_worldcereal( gdf = gpd.read_parquet(item_asset_path) # Convert the dates to datetime format - gdf["date"] = pd.to_datetime(gdf["date"]) + gdf["timestamp"] = 
pd.to_datetime(gdf["date"]) + gdf.drop(columns=["date"], inplace=True) # Convert band dtype to uint16 (temporary fix) # TODO: remove this step when the issue is fixed on the OpenEO backend diff --git a/src/worldcereal/openeo/extract.py b/src/worldcereal/openeo/extract.py index c4d27ebb..7d34f64b 100644 --- a/src/worldcereal/openeo/extract.py +++ b/src/worldcereal/openeo/extract.py @@ -153,15 +153,26 @@ def post_job_action_patch( def generate_output_path_patch( - root_folder: Path, geometry_index: int, row: pd.Series, s2_grid: gpd.GeoDataFrame + root_folder: Path, + job_index: int, + row: pd.Series, + asset_id: str, + s2_grid: gpd.GeoDataFrame, ): """Generate the output path for the extracted data, from a base path and the row information. """ - features = geojson.loads(row.geometry) - sample_id = features[geometry_index].properties.get("sample_id", None) - if sample_id is None: - sample_id = features[geometry_index].properties["sampleID"] + # First extract the sample ID from the asset ID + sample_id = asset_id.replace(".nc", "").replace("openEO_", "") + + # Find which index in the FeatureCollection corresponds to the sample_id + features = geojson.loads(row.geometry)["features"] + sample_id_to_index = { + feature.properties.get("sample_id", None): index + for index, feature in enumerate(features) + } + geometry_index = sample_id_to_index.get(sample_id, None) + ref_id = features[geometry_index].properties["ref_id"] if "orbit_state" in row: @@ -170,10 +181,11 @@ def generate_output_path_patch( orbit_state = "" s2_tile_id = row.s2_tile - h3_l3_cell = row.h3_l3_cell + utm_zone = str(s2_tile_id[0:2]) epsg = s2_grid[s2_grid.tile == s2_tile_id].iloc[0].epsg - subfolder = root_folder / ref_id / h3_l3_cell / sample_id + subfolder = root_folder / ref_id / utm_zone / s2_tile_id / sample_id + return ( subfolder / f"{row.out_prefix}{orbit_state}_{sample_id}_{epsg}_{row.start_date}_{row.end_date}{row.out_extension}" diff --git a/src/worldcereal/openeo/preprocessing.py b/src/worldcereal/openeo/preprocessing.py index f4300da9..1b68f91b 100644 --- a/src/worldcereal/openeo/preprocessing.py +++ b/src/worldcereal/openeo/preprocessing.py @@ -253,9 +253,8 @@ def raw_datacube_DEM( cube = extractor.get_cube(connection, spatial_extent, None) cube = cube.rename_labels(dimension="bands", target=["elevation"]) - if backend_context.backend == Backend.CDSE and fetch_type == FetchType.TILE: + if backend_context.backend == Backend.CDSE: # On CDSE we can load the slope from a global slope collection - # but this currently only works for tile fetching. 
if isinstance(spatial_extent, BoundingBoxExtent): spatial_extent = dict(spatial_extent) @@ -340,6 +339,7 @@ def worldcereal_preprocessed_inputs( validate_temporal_context: bool = True, s1_orbit_state: Optional[str] = None, tile_size: Optional[int] = None, + s2_tile: Optional[str] = None, ) -> DataCube: # First validate the temporal context @@ -364,7 +364,7 @@ def worldcereal_preprocessed_inputs( "S2-L2A-B12", ], fetch_type=fetch_type, - filter_tile=None, + filter_tile=s2_tile, distance_to_cloud_flag=False if fetch_type == FetchType.POINT else True, additional_masks_flag=False, apply_mask_flag=True, diff --git a/tests/worldcerealtests/testresources/preprocess_graph.json b/tests/worldcerealtests/testresources/preprocess_graph.json index 22f739e0..c227e237 100644 --- a/tests/worldcerealtests/testresources/preprocess_graph.json +++ b/tests/worldcerealtests/testresources/preprocess_graph.json @@ -540,35 +540,6 @@ } }, "arrayelement2": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 0 - } - }, - "isnodata1": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement2" - } - } - }, - "if1": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power1" - }, - "value": { - "from_node": "isnodata1" - } - } - }, - "arrayelement3": { "process_id": "array_element", "arguments": { "data": { @@ -582,7 +553,7 @@ "arguments": { "base": 10, "x": { - "from_node": "arrayelement3" + "from_node": "arrayelement2" } } }, @@ -622,44 +593,15 @@ } } }, - "arrayelement4": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 1 - } - }, - "isnodata2": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement4" - } - } - }, - "if2": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power2" - }, - "value": { - "from_node": "isnodata2" - } - } - }, "arraycreate1": { "process_id": "array_create", "arguments": { "data": [ { - "from_node": "if1" + "from_node": "power1" }, { - "from_node": "if2" + "from_node": "power2" } ] }, @@ -705,6 +647,85 @@ } } }, + "loadstac1": { + "process_id": "load_stac", + "arguments": { + "bands": [ + "Slope" + ], + "spatial_extent": { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": { + "type": "Polygon", + "coordinates": [ + [ + [ + 44.433631, + 51.317362 + ], + [ + 44.432274, + 51.427238 + ], + [ + 44.69808, + 51.428224 + ], + [ + 44.698802, + 51.318344 + ], + [ + 44.433631, + 51.317362 + ] + ] + ] + }, + "properties": {} + } + ] + }, + "url": "https://stac.openeo.vito.be/collections/COPERNICUS30_DEM_SLOPE" + } + }, + "renamelabels4": { + "process_id": "rename_labels", + "arguments": { + "data": { + "from_node": "loadstac1" + }, + "dimension": "bands", + "target": [ + "slope" + ] + } + }, + "reducedimension1": { + "process_id": "reduce_dimension", + "arguments": { + "data": { + "from_node": "renamelabels4" + }, + "dimension": "t", + "reducer": { + "process_graph": { + "min1": { + "process_id": "min", + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "result": true + } + } + } + } + }, "loadcollection4": { "process_id": "load_collection", "arguments": { @@ -760,7 +781,7 @@ } } }, - "reducedimension1": { + "reducedimension2": { "process_id": "reduce_dimension", "arguments": { "data": { @@ -769,7 +790,7 @@ "dimension": "t", "reducer": { "process_graph": { - "min1": { + "min2": { "process_id": "min", "arguments": 
{ "data": { @@ -782,11 +803,11 @@ } } }, - "renamelabels4": { + "renamelabels5": { "process_id": "rename_labels", "arguments": { "data": { - "from_node": "reducedimension1" + "from_node": "reducedimension2" }, "dimension": "bands", "source": [ @@ -797,11 +818,11 @@ ] } }, - "renamelabels5": { + "renamelabels6": { "process_id": "rename_labels", "arguments": { "data": { - "from_node": "renamelabels4" + "from_node": "renamelabels5" }, "dimension": "bands", "target": [ @@ -809,11 +830,22 @@ ] } }, + "mergecubes2": { + "process_id": "merge_cubes", + "arguments": { + "cube1": { + "from_node": "reducedimension1" + }, + "cube2": { + "from_node": "renamelabels6" + } + } + }, "resamplecubespatial1": { "process_id": "resample_cube_spatial", "arguments": { "data": { - "from_node": "renamelabels5" + "from_node": "mergecubes2" }, "method": "bilinear", "target": { @@ -846,7 +878,7 @@ } } }, - "mergecubes2": { + "mergecubes3": { "process_id": "merge_cubes", "arguments": { "cube1": { @@ -857,7 +889,7 @@ } } }, - "loadstac1": { + "loadstac2": { "process_id": "load_stac", "arguments": { "bands": [ @@ -907,11 +939,11 @@ "url": "https://s3.waw3-1.cloudferro.com/swift/v1/agera/stac/collection.json" } }, - "renamelabels6": { + "renamelabels7": { "process_id": "rename_labels", "arguments": { "data": { - "from_node": "loadstac1" + "from_node": "loadstac2" }, "dimension": "bands", "target": [ @@ -924,7 +956,7 @@ "process_id": "resample_cube_spatial", "arguments": { "data": { - "from_node": "renamelabels6" + "from_node": "renamelabels7" }, "method": "bilinear", "target": { @@ -932,11 +964,11 @@ } } }, - "mergecubes3": { + "mergecubes4": { "process_id": "merge_cubes", "arguments": { "cube1": { - "from_node": "mergecubes2" + "from_node": "mergecubes3" }, "cube2": { "from_node": "resamplecubespatial2" diff --git a/tests/worldcerealtests/testresources/preprocess_graphwithslope.json b/tests/worldcerealtests/testresources/preprocess_graphwithslope.json index a96e0077..a167a7de 100644 --- a/tests/worldcerealtests/testresources/preprocess_graphwithslope.json +++ b/tests/worldcerealtests/testresources/preprocess_graphwithslope.json @@ -429,35 +429,6 @@ } }, "arrayelement2": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 0 - } - }, - "isnodata1": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement2" - } - } - }, - "if1": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power1" - }, - "value": { - "from_node": "isnodata1" - } - } - }, - "arrayelement3": { "process_id": "array_element", "arguments": { "data": { @@ -471,7 +442,7 @@ "arguments": { "base": 10, "x": { - "from_node": "arrayelement3" + "from_node": "arrayelement2" } } }, @@ -511,44 +482,15 @@ } } }, - "arrayelement4": { - "process_id": "array_element", - "arguments": { - "data": { - "from_parameter": "data" - }, - "index": 1 - } - }, - "isnodata2": { - "process_id": "is_nodata", - "arguments": { - "x": { - "from_node": "arrayelement4" - } - } - }, - "if2": { - "process_id": "if", - "arguments": { - "accept": 1, - "reject": { - "from_node": "power2" - }, - "value": { - "from_node": "isnodata2" - } - } - }, "arraycreate1": { "process_id": "array_create", "arguments": { "data": [ { - "from_node": "if1" + "from_node": "power1" }, { - "from_node": "if2" + "from_node": "power2" } ] },