Refactoring getting data manifest #33

Merged 1 commit on Dec 11, 2023
3 changes: 1 addition & 2 deletions pyproject.toml
@@ -24,10 +24,9 @@ classifiers = [
"Intended Audience :: Science/Research",
"Topic :: Scientific/Engineering",
]
requires-python = ">=3.7"
requires-python = ">=3.10"
dependencies = [
"backoff>=2,<3",
"dask[dataframe]",
"geopandas<1",
"h5py",
"httpx",
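
The raised floor is likely motivated by download.py below: the `datetime.date | List[datetime.date]` union syntax in `get_manifest`'s signature requires Python 3.10 or newer (unless `from __future__ import annotations` is in effect).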
6 changes: 2 additions & 4 deletions src/blackmarble/__init__.py
@@ -1,7 +1,5 @@
from pathlib import Path
import logging

# from .base import BlackMarbleDownloader # noqa: F401
from pathlib import Path

__version__ = (Path(__file__).parent / "VERSION").read_text().strip()

@@ -12,4 +10,4 @@

logger: logging.Logger = logging.getLogger(__name__)

logger.setLevel(logging.DEBUG)
logger.setLevel(logging.WARN)
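
Lowering the package logger from DEBUG to WARN quiets routine diagnostics for library users (`logging.WARN` is an alias of `logging.WARNING`).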
132 changes: 83 additions & 49 deletions src/blackmarble/download.py
@@ -1,22 +1,44 @@
import asyncio
import datetime
import json
from dataclasses import dataclass
from importlib.resources import files
from pathlib import Path
from typing import ClassVar, List

import backoff
import dask.dataframe as dd
import geopandas
import httpx
import nest_asyncio
import pandas as pd
from httpx import HTTPError
from pqdm.threads import pqdm
from pydantic import BaseModel
from tqdm.auto import tqdm

from .types import Product


def chunks(ls, n):
    """Yield successive n-sized chunks from list."""
    for i in range(0, len(ls), n):
        yield ls[i : i + n]
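
For intuition, a sketch of how `chunks` batches a date range the way `get_manifest` below does (the dates here are hypothetical):

    # Hypothetical example: 600 daily dates split into API-sized batches of 250
    dates = [datetime.date(2023, 1, 1) + datetime.timedelta(days=i) for i in range(600)]
    batches = list(chunks(dates, 250))  # 3 batches: 250, 250, and 100 dates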


@backoff.on_exception(
    backoff.expo,
    HTTPError,
)
async def get_url(client, url, params):
    """Submit an HTTP GET request, retrying with exponential backoff on HTTPError.

    Returns
    -------
    httpx.Response
        HTTP response
    """
    return await client.get(url, params=params)
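
The decorator retries `get_url` with exponentially growing waits whenever httpx raises an `HTTPError`. A rough hand-rolled equivalent, for illustration only (`backoff` additionally applies jitter, and no retry cap is set here):

    async def get_url_with_retry(client, url, params, delay=1.0):
        # Sketch of what @backoff.on_exception(backoff.expo, HTTPError) provides
        while True:
            try:
                return await client.get(url, params=params)
            except HTTPError:
                await asyncio.sleep(delay)  # wait before retrying
                delay *= 2  # exponential growth: 1s, 2s, 4s, ...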


@dataclass
class BlackMarbleDownloader(BaseModel):
"""A downloader to retrieve `NASA Black Marble <https://blackmarble.gsfc.nasa.gov>`_ data.
Expand All @@ -36,13 +58,15 @@ class BlackMarbleDownloader(BaseModel):
    TILES: ClassVar[geopandas.GeoDataFrame] = geopandas.read_file(
        files("blackmarble.data").joinpath("blackmarbletiles.geojson")
    )
    URL: ClassVar[str] = "https://ladsweb.modaps.eosdis.nasa.gov/archive/allData/5000"
    URL: ClassVar[str] = "https://ladsweb.modaps.eosdis.nasa.gov"

    def __init__(self, bearer: str, directory: Path):
        nest_asyncio.apply()
        super().__init__(bearer=bearer, directory=directory)

    def _retrieve_manifest(
    async def get_manifest(
        self,
        gdf: geopandas.GeoDataFrame,
        product_id: Product,
        date_range: datetime.date | List[datetime.date],
    ) -> pd.DataFrame:
Expand All @@ -66,20 +90,42 @@ def _retrieve_manifest(
        if isinstance(product_id, str):
            product_id = Product(product_id)

        urlpaths = set()
        for date in date_range:
            match product_id:
                case Product.VNP46A3:  # if VNP46A3 then first day of the month
                    tm_yday = date.replace(day=1).timetuple().tm_yday
                case Product.VNP46A4:  # if VNP46A4 then first day of the year
                    tm_yday = date.replace(month=1, day=1).timetuple().tm_yday
                case _:
                    tm_yday = date.timetuple().tm_yday

            urlpath = f"{self.URL}/{product_id.value}/{date.year}/{tm_yday}.csv"
            urlpaths.add(urlpath)
        # Create bounding box
        gdf = pd.concat([gdf, gdf.bounds], axis="columns").round(2)
        gdf["bbox"] = gdf.round(2).apply(
            lambda row: f"x{row.minx}y{row.miny},x{row.maxx}y{row.maxy}", axis=1
        )

        return dd.read_csv(list(urlpaths)).compute()
        async with httpx.AsyncClient(verify=False) as client:
            tasks = []
            for chunk in chunks(date_range, 250):
                for _, row in gdf.iterrows():
                    url = f"{self.URL}/api/v1/files"
                    params = {
                        "product": product_id.value,
                        "collection": "5000",
                        "dateRanges": f"{min(chunk)}..{max(chunk)}",
                        "areaOfInterest": row["bbox"],
                    }
                    tasks.append(asyncio.ensure_future(get_url(client, url, params)))

            responses = [
                await f
                for f in tqdm(
                    asyncio.as_completed(tasks),
                    total=len(tasks),
                    desc="GETTING MANIFEST...",
                )
            ]

            rs = []
            for r in responses:
                try:
                    rs.append(pd.DataFrame(r.json()).T)
                except json.decoder.JSONDecodeError:
                    continue

        return pd.concat(rs)
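
A sketch of how this coroutine is driven from synchronous code (it mirrors the `asyncio.run` call in `download()` below; the token, directory, and dates are placeholders):

    downloader = BlackMarbleDownloader(bearer="<NASA-token>", directory=Path("/tmp/bm"))
    dates = [datetime.date(2023, 1, d) for d in range(1, 32)]
    manifest = asyncio.run(downloader.get_manifest(gdf, Product.VNP46A2, dates))
    # manifest is a pandas DataFrame; download() uses its "name" and "fileURL" columns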

    @backoff.on_exception(
        backoff.expo,
Expand All @@ -101,19 +147,14 @@ def _download_file(
        filename: pathlib.Path
            Filename of downloaded data file
        """
        year = name[9:13]
        day = name[13:16]
        product_id = name[0:7]

        url = f"{self.URL}/{product_id}/{year}/{day}/{name}"
        headers = {"Authorization": f"Bearer {self.bearer}"}
        filename = Path(self.directory, name)
        url = f"{self.URL}{name}"
        name = name.split("/")[-1]

        with open(filename, "wb+") as f:
        with open(filename := Path(self.directory, name), "wb+") as f:
            with httpx.stream(
                "GET",
                url,
                headers=headers,
                headers={"Authorization": f"Bearer {self.bearer}"},
            ) as response:
                total = int(response.headers["Content-Length"])
                with tqdm(
Expand All @@ -129,29 +170,11 @@ def _download_file(

        return filename
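
Note the change in URL construction: the manifest's `name` is now a server-relative `fileURL` rather than a bare granule filename, so the old year/day/product slicing becomes unnecessary (the path below is a hypothetical example):

    name = "/archive/allData/5000/VNP46A2/2023/001/VNP46A2.A2023001.h30v06.001.h5"
    url = f"https://ladsweb.modaps.eosdis.nasa.gov{name}"  # host + fileURL
    name = name.split("/")[-1]  # local filename: "VNP46A2.A2023001.h30v06.001.h5"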

    def _download(self, names: List[str], n_jobs: int = 16):
        """Download (in parallel) from NASA Black Marble archive

        Parameters
        ----------
        names: List[str]
            List of names for which to download from the NASA Black Marble archive
        """
        args = [(name,) for name in names]

        return pqdm(
            args,
            self._download_file,
            n_jobs=n_jobs,
            argument_type="args",
            desc="Downloading...",
        )

    def download(
        self,
        gdf: geopandas.GeoDataFrame,
        product_id: Product,
        date_range: datetime.date | List[datetime.date],
        date_range: List[datetime.date],
        skip_if_exists: bool = True,
    ):
        """Download (in parallel) from NASA Black Marble archive
Expand All @@ -164,16 +187,27 @@ def download(
        product: Product
            NASA Black Marble product ID (e.g., VNP46A1)

        date_range: List[datetime.date]
            Date range for which to download NASA Black Marble data.

        skip_if_exists: bool, default=True
            Whether to skip downloading or extracting data if the data file for that date already exists
            Whether to skip downloading data if file already exists
        """
        gdf = geopandas.overlay(
            gdf.to_crs("EPSG:4326").dissolve(), self.TILES, how="intersection"
        )
        bm_files_df = self._retrieve_manifest(product_id, date_range)

        bm_files_df = asyncio.run(self.get_manifest(gdf, product_id, date_range))
        bm_files_df = bm_files_df[
            bm_files_df["name"].str.contains("|".join(gdf["TileID"]))
        ]
        names = bm_files_df["name"].tolist()
        names = bm_files_df["fileURL"].tolist()

        return self._download(names)
        args = [(name,) for name in names]
        return pqdm(
            args,
            self._download_file,
            n_jobs=16,
            argument_type="args",
            desc="Downloading...",
        )
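
Putting the pieces together, an end-to-end usage sketch of the refactored downloader (the region file, token, and dates are placeholders):

    import datetime
    from pathlib import Path

    import geopandas

    from blackmarble.download import BlackMarbleDownloader
    from blackmarble.types import Product

    gdf = geopandas.read_file("region.geojson")  # any area of interest
    downloader = BlackMarbleDownloader(bearer="<NASA-token>", directory=Path("./data"))
    files = downloader.download(gdf, Product.VNP46A3, [datetime.date(2023, 1, 1)])
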
42 changes: 23 additions & 19 deletions src/blackmarble/raster.py
@@ -20,7 +20,6 @@
from .download import BlackMarbleDownloader
from .types import Product


VARIABLE_DEFAULT = {
    Product.VNP46A1: "DNB_At_Sensor_Radiance_500m",
    Product.VNP46A2: "Gap_Filled_DNB_BRDF-Corrected_NTL",
@@ -56,9 +55,9 @@ def h5_to_geotiff(
    Returns
    ------
    output_path: Path
        Path to which GeoTIFF
        Path to which export GeoTIFF file
    """
    output_path = Path(output_directory, f"{output_prefix}{f.stem}").with_suffix(".tif")
    output_path = Path(output_directory, f.name).with_suffix(".tif")
    product_id = Product(f.stem.split(".")[0])

    if variable is None:
@@ -277,24 +276,29 @@ def bm_raster(
        for date in tqdm(date_range, desc="COLLATING RESULTS | Processing..."):
            filenames = _pivot_paths_by_date(pathnames).get(date)

            # Open each GeoTIFF file as a DataArray and store in a list
            da = [
                rioxarray.open_rasterio(
                    h5_to_geotiff(
                        f,
                        variable=variable,
                        quality_flag_rm=quality_flag_rm,
                        output_prefix=file_prefix,
                        output_directory=d,
            try:
                # Open each GeoTIFF file as a DataArray and store in a list
                da = [
                    rioxarray.open_rasterio(
                        h5_to_geotiff(
                            f,
                            variable=variable,
                            quality_flag_rm=quality_flag_rm,
                            output_prefix=file_prefix,
                            output_directory=d,
                        ),
                    )
                )
                for f in filenames
            ]
            ds = merge_arrays(da)
            ds = ds.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)
            ds["time"] = pd.to_datetime(date)
                    for f in filenames
                ]
                ds = merge_arrays(da)
                ds = ds.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)
                ds["time"] = pd.to_datetime(date)

                dx.append(ds.squeeze())
            except TypeError:
                continue

            dx.append(ds.squeeze())
        dx = filter(lambda item: item is not None, dx)

        # Stack the individual dates along "time" dimension
        ds = (
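
The new `try`/`except TypeError` lets the collation skip dates with no usable files: `_pivot_paths_by_date(pathnames).get(date)` returns `None` when nothing was downloaded for a date, iterating over `None` raises `TypeError`, and the added `dx = filter(lambda item: item is not None, dx)` drops any remaining empty entries before the dates are stacked.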
22 changes: 2 additions & 20 deletions src/blackmarble/types.py
@@ -1,7 +1,6 @@
from datetime import date
from enum import Enum
"""Types for blackmarblepy"""

from pydantic import BaseModel, validator
from enum import Enum


class Product(Enum):
@@ -11,20 +10,3 @@ class Product(Enum):
    VNP46A2 = "VNP46A2"
    VNP46A3 = "VNP46A3"
    VNP46A4 = "VNP46A4"


class DateRange(BaseModel):
    start_date: date
    end_date: date

    @validator("start_date", "end_date", pre=True)
    def parse_dates(cls, v):
        if isinstance(v, str):
            return date.fromisoformat(v)
        return v

    @validator("end_date")
    def check_date_range(cls, v, values, **kwargs):
        if "start_date" in values and v < values["start_date"]:
            raise ValueError("End date cannot be before start date")
        return v
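
With `DateRange` removed, `Product` is the module's remaining public type; a brief usage sketch:

    product = Product("VNP46A2")  # construct from a plain string, as get_manifest does
    product.value                 # -> "VNP46A2", sent as the API's "product" parameter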