diff --git a/pyproject.toml b/pyproject.toml
index 1131cff..d7c85c1 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -24,10 +24,9 @@ classifiers = [
     "Intended Audience :: Science/Research",
     "Topic :: Scientific/Engineering",
 ]
-requires-python = ">=3.7"
+requires-python = ">=3.10"
 dependencies = [
     "backoff>=2,<3",
-    "dask[dataframe]",
     "geopandas<1",
     "h5py",
     "httpx",
diff --git a/src/blackmarble/__init__.py b/src/blackmarble/__init__.py
index 4f29a85..ea52dcb 100644
--- a/src/blackmarble/__init__.py
+++ b/src/blackmarble/__init__.py
@@ -1,7 +1,5 @@
-from pathlib import Path
 import logging
-
-# from .base import BlackMarbleDownloader  # noqa: F401
+from pathlib import Path
 
 __version__ = (Path(__file__).parent / "VERSION").read_text().strip()
 
@@ -12,4 +10,4 @@
 logger: logging.Logger = logging.getLogger(__name__)
 
-logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.WARN)
diff --git a/src/blackmarble/download.py b/src/blackmarble/download.py
index 0b47f3e..db8d850 100644
--- a/src/blackmarble/download.py
+++ b/src/blackmarble/download.py
@@ -1,22 +1,44 @@
+import asyncio
 import datetime
+import json
 from dataclasses import dataclass
 from importlib.resources import files
 from pathlib import Path
 from typing import ClassVar, List
 
 import backoff
-import dask.dataframe as dd
 import geopandas
 import httpx
+import nest_asyncio
 import pandas as pd
 from httpx import HTTPError
 from pqdm.threads import pqdm
 from pydantic import BaseModel
 from tqdm.auto import tqdm
-
 from .types import Product
 
 
+def chunks(ls, n):
+    """Yield successive n-sized chunks from list."""
+    for i in range(0, len(ls), n):
+        yield ls[i : i + n]
+
+
+@backoff.on_exception(
+    backoff.expo,
+    HTTPError,
+)
+async def get_url(client, url, params):
+    """Asynchronously GET a URL, retrying with exponential backoff on HTTP errors.
+
+    Returns
+    -------
+    httpx.Response
+        HTTP response
+    """
+    return await client.get(url, params=params)
+
+
 @dataclass
 class BlackMarbleDownloader(BaseModel):
     """A downloader to retrieve `NASA Black Marble <https://blackmarble.gsfc.nasa.gov/>`_ data.
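For orientation on the two helpers introduced above: chunks batches the requested dates so each manifest query covers at most one chunk, and get_url retries a GET with exponential backoff whenever httpx raises HTTPError. A minimal sketch of how they compose; the endpoint URL is a placeholder for illustration, not the real LAADS service:

    import asyncio
    import datetime

    import httpx

    from blackmarble.download import chunks, get_url


    async def main():
        # 400 consecutive days, batched 250 at a time, as get_manifest does
        dates = [
            datetime.date(2023, 1, 1) + datetime.timedelta(days=i) for i in range(400)
        ]
        async with httpx.AsyncClient() as client:
            for chunk in chunks(dates, 250):
                # get_url retries with exponentially growing delays on HTTPError
                response = await get_url(
                    client,
                    "https://example.com/api/v1/files",  # placeholder endpoint
                    {"dateRanges": f"{min(chunk)}..{max(chunk)}"},
                )
                print(response.status_code)


    asyncio.run(main())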
@@ -36,13 +58,15 @@ class BlackMarbleDownloader(BaseModel):
     TILES: ClassVar[geopandas.GeoDataFrame] = geopandas.read_file(
         files("blackmarble.data").joinpath("blackmarbletiles.geojson")
     )
-    URL: ClassVar[str] = "https://ladsweb.modaps.eosdis.nasa.gov/archive/allData/5000"
+    URL: ClassVar[str] = "https://ladsweb.modaps.eosdis.nasa.gov"
 
     def __init__(self, bearer: str, directory: Path):
+        nest_asyncio.apply()
         super().__init__(bearer=bearer, directory=directory)
 
-    def _retrieve_manifest(
+    async def get_manifest(
         self,
+        gdf: geopandas.GeoDataFrame,
         product_id: Product,
         date_range: datetime.date | List[datetime.date],
     ) -> pd.DataFrame:
@@ -66,20 +90,42 @@ def _retrieve_manifest(
         if isinstance(product_id, str):
             product_id = Product(product_id)
 
-        urlpaths = set()
-        for date in date_range:
-            match product_id:
-                case Product.VNP46A3:  # if VNP46A3 then first day of the month
-                    tm_yday = date.replace(day=1).timetuple().tm_yday
-                case Product.VNP46A4:  # if VNP46A4 then first day of the year
-                    tm_yday = date.replace(month=1, day=1).timetuple().tm_yday
-                case _:
-                    tm_yday = date.timetuple().tm_yday
-
-            urlpath = f"{self.URL}/{product_id.value}/{date.year}/{tm_yday}.csv"
-            urlpaths.add(urlpath)
+        # Create bounding box
+        gdf = pd.concat([gdf, gdf.bounds], axis="columns").round(2)
+        gdf["bbox"] = gdf.round(2).apply(
+            lambda row: f"x{row.minx}y{row.miny},x{row.maxx}y{row.maxy}", axis=1
+        )
 
-        return dd.read_csv(list(urlpaths)).compute()
+        async with httpx.AsyncClient(verify=False) as client:
+            tasks = []
+            for chunk in chunks(date_range, 250):
+                for _, row in gdf.iterrows():
+                    url = f"{self.URL}/api/v1/files"
+                    params = {
+                        "product": product_id.value,
+                        "collection": "5000",
+                        "dateRanges": f"{min(chunk)}..{max(chunk)}",
+                        "areaOfInterest": row["bbox"],
+                    }
+                    tasks.append(asyncio.ensure_future(get_url(client, url, params)))
+
+            responses = [
+                await f
+                for f in tqdm(
+                    asyncio.as_completed(tasks),
+                    total=len(tasks),
+                    desc="GETTING MANIFEST...",
+                )
+            ]
+
+        rs = []
+        for r in responses:
+            try:
+                rs.append(pd.DataFrame(r.json()).T)
+            except json.decoder.JSONDecodeError:
+                continue
+
+        return pd.concat(rs)
 
     @backoff.on_exception(
         backoff.expo,
         HTTPError,
     )
@@ -101,19 +147,14 @@ def _download_file(
         filename: pathlib.Path
             Filename of downloaded data file
         """
-        year = name[9:13]
-        day = name[13:16]
-        product_id = name[0:7]
-
-        url = f"{self.URL}/{product_id}/{year}/{day}/{name}"
-        headers = {"Authorization": f"Bearer {self.bearer}"}
-        filename = Path(self.directory, name)
+        url = f"{self.URL}{name}"
+        name = name.split("/")[-1]
 
-        with open(filename, "wb+") as f:
+        with open(filename := Path(self.directory, name), "wb+") as f:
             with httpx.stream(
                 "GET",
                 url,
-                headers=headers,
+                headers={"Authorization": f"Bearer {self.bearer}"},
             ) as response:
                 total = int(response.headers["Content-Length"])
                 with tqdm(
@@ -129,29 +170,11 @@
                 return filename
 
-    def _download(self, names: List[str], n_jobs: int = 16):
-        """Download (in parallel) from NASA Black Marble archive
-
-        Parameters
-        ----------
-        names: List[str]
-            List of names for which to download from the NASA Black Marble archive
-        """
-        args = [(name,) for name in names]
-
-        return pqdm(
-            args,
-            self._download_file,
-            n_jobs=n_jobs,
-            argument_type="args",
-            desc="Downloading...",
-        )
-
     def download(
         self,
         gdf: geopandas.GeoDataFrame,
         product_id: Product,
-        date_range: datetime.date | List[datetime.date],
+        date_range: List[datetime.date],
         skip_if_exists: bool = True,
     ):
         """Download (in parallel) from NASA Black Marble archive
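A note on the manifest request above: the LAADS /api/v1/files endpoint filters by an areaOfInterest string of the form x{minx}y{miny},x{maxx}y{maxy}, which get_manifest builds from each tile intersection's bounds. A standalone sketch of that construction, using a made-up one-degree square as the geometry:

    import geopandas
    import pandas as pd
    from shapely.geometry import box

    # Made-up one-degree square standing in for a dissolved AOI/tile intersection
    gdf = geopandas.GeoDataFrame(
        geometry=[box(30.0, -2.0, 31.0, -1.0)], crs="EPSG:4326"
    )

    # Same construction as get_manifest: append bounds, round, format the string
    gdf = pd.concat([gdf, gdf.bounds], axis="columns").round(2)
    gdf["bbox"] = gdf.round(2).apply(
        lambda row: f"x{row.minx}y{row.miny},x{row.maxx}y{row.maxy}", axis=1
    )

    print(gdf["bbox"].iloc[0])  # -> x30.0y-2.0,x31.0y-1.0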
@@ -164,16 +187,27 @@ def download(
         product: Product
             NASA Black Marble product ID (e.g., VNP46A1)
+        date_range: List[datetime.date]
+            Date range for which to download NASA Black Marble data.
+
         skip_if_exists: bool, default=True
-            Whether to skip downloading or extracting data if the data file for that date already exists
+            Whether to skip downloading data if the file already exists
         """
         gdf = geopandas.overlay(
             gdf.to_crs("EPSG:4326").dissolve(), self.TILES, how="intersection"
         )
-        bm_files_df = self._retrieve_manifest(product_id, date_range)
+
+        bm_files_df = asyncio.run(self.get_manifest(gdf, product_id, date_range))
         bm_files_df = bm_files_df[
             bm_files_df["name"].str.contains("|".join(gdf["TileID"]))
         ]
 
-        names = bm_files_df["name"].tolist()
+        names = bm_files_df["fileURL"].tolist()
 
-        return self._download(names)
+        args = [(name,) for name in names]
+        return pqdm(
+            args,
+            self._download_file,
+            n_jobs=16,
+            argument_type="args",
+            desc="Downloading...",
+        )
diff --git a/src/blackmarble/raster.py b/src/blackmarble/raster.py
index 0242cc7..5a9c35c 100644
--- a/src/blackmarble/raster.py
+++ b/src/blackmarble/raster.py
@@ -20,7 +20,6 @@
 from .download import BlackMarbleDownloader
 from .types import Product
 
-
 VARIABLE_DEFAULT = {
     Product.VNP46A1: "DNB_At_Sensor_Radiance_500m",
     Product.VNP46A2: "Gap_Filled_DNB_BRDF-Corrected_NTL",
@@ -56,9 +55,9 @@ def h5_to_geotiff(
     Returns
     -------
     output_path: Path
-        Path to which GeoTIFF
+        Path to the exported GeoTIFF file
     """
-    output_path = Path(output_directory, f"{output_prefix}{f.stem}").with_suffix(".tif")
+    output_path = Path(output_directory, f.name).with_suffix(".tif")
     product_id = Product(f.stem.split(".")[0])
 
     if variable is None:
@@ -277,24 +276,29 @@
     for date in tqdm(date_range, desc="COLLATING RESULTS | Processing..."):
         filenames = _pivot_paths_by_date(pathnames).get(date)
 
-        # Open each GeoTIFF file as a DataArray and store in a list
-        da = [
-            rioxarray.open_rasterio(
-                h5_to_geotiff(
-                    f,
-                    variable=variable,
-                    quality_flag_rm=quality_flag_rm,
-                    output_prefix=file_prefix,
-                    output_directory=d,
+        try:
+            # Open each GeoTIFF file as a DataArray and store in a list
+            da = [
+                rioxarray.open_rasterio(
+                    h5_to_geotiff(
+                        f,
+                        variable=variable,
+                        quality_flag_rm=quality_flag_rm,
+                        output_prefix=file_prefix,
+                        output_directory=d,
+                    ),
                 )
-            )
-            for f in filenames
-        ]
-        ds = merge_arrays(da)
-        ds = ds.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)
-        ds["time"] = pd.to_datetime(date)
+                for f in filenames
+            ]
+            ds = merge_arrays(da)
+            ds = ds.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)
+            ds["time"] = pd.to_datetime(date)
+
+            dx.append(ds.squeeze())
+        except TypeError:
+            continue
 
-        dx.append(ds.squeeze())
+    dx = filter(lambda item: item is not None, dx)
 
     # Stack the individual dates along "time" dimension
     ds = (
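Taken together, the download.py changes replace the per-day CSV manifest walk with one asynchronous query whose fileURL column feeds pqdm directly. A hedged usage sketch of the reworked entry point; the bearer token, AOI file, and date range below are placeholders:

    import datetime
    from pathlib import Path

    import geopandas

    from blackmarble.download import BlackMarbleDownloader
    from blackmarble.types import Product

    # Placeholder inputs: substitute a real NASA Earthdata bearer token and AOI
    gdf = geopandas.read_file("aoi.geojson")  # hypothetical area of interest
    date_range = [
        datetime.date(2022, 1, 1) + datetime.timedelta(days=i) for i in range(7)
    ]

    downloader = BlackMarbleDownloader(
        bearer="<EARTHDATA_TOKEN>", directory=Path("./h5")
    )
    files = downloader.download(gdf, Product.VNP46A2, date_range)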
diff --git a/src/blackmarble/types.py b/src/blackmarble/types.py
index c847dc4..00eabff 100644
--- a/src/blackmarble/types.py
+++ b/src/blackmarble/types.py
@@ -1,7 +1,6 @@
-from datetime import date
-from enum import Enum
+"""Types for blackmarblepy"""
 
-from pydantic import BaseModel, validator
+from enum import Enum
 
 
 class Product(Enum):
@@ -11,20 +10,3 @@ class Product(Enum):
     VNP46A2 = "VNP46A2"
     VNP46A3 = "VNP46A3"
     VNP46A4 = "VNP46A4"
-
-
-class DateRange(BaseModel):
-    start_date: date
-    end_date: date
-
-    @validator("start_date", "end_date", pre=True)
-    def parse_dates(cls, v):
-        if isinstance(v, str):
-            return date.fromisoformat(v)
-        return v
-
-    @validator("end_date")
-    def check_date_range(cls, v, values, **kwargs):
-        if "start_date" in values and v < values["start_date"]:
-            raise ValueError("End date cannot be before start date")
-        return v
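The deleted DateRange model used the pydantic v1 validator API, which no longer exists in pydantic 2. If equivalent validation is ever reinstated, a hypothetical pydantic v2 port (not part of this change) could look like:

    from datetime import date

    from pydantic import BaseModel, field_validator, model_validator


    class DateRange(BaseModel):
        start_date: date
        end_date: date

        @field_validator("start_date", "end_date", mode="before")
        @classmethod
        def parse_dates(cls, v):
            # Accept ISO 8601 strings as well as date objects
            if isinstance(v, str):
                return date.fromisoformat(v)
            return v

        @model_validator(mode="after")
        def check_date_range(self):
            if self.end_date < self.start_date:
                raise ValueError("End date cannot be before start date")
            return self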