Get manifest (#33)
g4brielvs authored Dec 11, 2023
1 parent e6c665b commit 1f1bde2
Showing 5 changed files with 111 additions and 94 deletions.
3 changes: 1 addition & 2 deletions pyproject.toml
@@ -24,10 +24,9 @@ classifiers = [
     "Intended Audience :: Science/Research",
     "Topic :: Scientific/Engineering",
 ]
-requires-python = ">=3.7"
+requires-python = ">=3.10"
 dependencies = [
     "backoff>=2,<3",
-    "dask[dataframe]",
     "geopandas<1",
     "h5py",
     "httpx",
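The Python floor moves from 3.7 to 3.10, consistent with the 3.10-only syntax this commit touches (match/case in the removed manifest code, X | Y union annotations in get_manifest), and dask[dataframe] is dropped now that the manifest no longer goes through dd.read_csv. A minimal, hypothetical guard a downstream script could add — not part of the package:

    import sys

    # blackmarblepy relies on Python 3.10+ features (e.g. match/case, X | Y unions)
    if sys.version_info < (3, 10):
        raise RuntimeError("blackmarblepy requires Python >= 3.10")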
6 changes: 2 additions & 4 deletions src/blackmarble/__init__.py
@@ -1,7 +1,5 @@
-from pathlib import Path
 import logging
-
-# from .base import BlackMarbleDownloader  # noqa: F401
+from pathlib import Path
 
 __version__ = (Path(__file__).parent / "VERSION").read_text().strip()
 
@@ -12,4 +10,4 @@
 
 logger: logging.Logger = logging.getLogger(__name__)
 
-logger.setLevel(logging.DEBUG)
+logger.setLevel(logging.WARN)
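With the package logger now defaulting to WARN instead of DEBUG, downstream code that wants the old verbosity has to opt back in. A minimal sketch; the logger name follows from logging.getLogger(__name__) in blackmarble/__init__.py:

    import logging

    # Restore verbose package output (the library default is now WARN)
    logging.getLogger("blackmarble").setLevel(logging.DEBUG)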
132 changes: 83 additions & 49 deletions src/blackmarble/download.py
@@ -1,22 +1,44 @@
+import asyncio
 import datetime
+import json
 from dataclasses import dataclass
 from importlib.resources import files
 from pathlib import Path
 from typing import ClassVar, List
 
 import backoff
-import dask.dataframe as dd
 import geopandas
 import httpx
 import nest_asyncio
 import pandas as pd
 from httpx import HTTPError
 from pqdm.threads import pqdm
 from pydantic import BaseModel
 from tqdm.auto import tqdm
 
 from .types import Product
 
 
+def chunks(ls, n):
+    """Yield successive n-sized chunks from list."""
+    for i in range(0, len(ls), n):
+        yield ls[i : i + n]
+
+
+@backoff.on_exception(
+    backoff.expo,
+    HTTPError,
+)
+async def get_url(client, url, params):
+    """
+    Returns
+    -------
+    httpx.Response
+        HTTP response
+    """
+    return await client.get(url, params=params)
+
+
 @dataclass
 class BlackMarbleDownloader(BaseModel):
     """A downloader to retrieve `NASA Black Marble <https://blackmarble.gsfc.nasa.gov>`_ data.
@@ -36,13 +58,15 @@ class BlackMarbleDownloader(BaseModel):
     TILES: ClassVar[geopandas.GeoDataFrame] = geopandas.read_file(
         files("blackmarble.data").joinpath("blackmarbletiles.geojson")
     )
-    URL: ClassVar[str] = "https://ladsweb.modaps.eosdis.nasa.gov/archive/allData/5000"
+    URL: ClassVar[str] = "https://ladsweb.modaps.eosdis.nasa.gov"
 
     def __init__(self, bearer: str, directory: Path):
         nest_asyncio.apply()
         super().__init__(bearer=bearer, directory=directory)
 
-    def _retrieve_manifest(
+    async def get_manifest(
         self,
+        gdf: geopandas.GeoDataFrame,
         product_id: Product,
         date_range: datetime.date | List[datetime.date],
     ) -> pd.DataFrame:
@@ -66,20 +90,42 @@ def _retrieve_manifest(
         if isinstance(product_id, str):
             product_id = Product(product_id)
 
-        urlpaths = set()
-        for date in date_range:
-            match product_id:
-                case Product.VNP46A3:  # if VNP46A3 then first day of the month
-                    tm_yday = date.replace(day=1).timetuple().tm_yday
-                case Product.VNP46A4:  # if VNP46A4 then first day of the year
-                    tm_yday = date.replace(month=1, day=1).timetuple().tm_yday
-                case _:
-                    tm_yday = date.timetuple().tm_yday
-
-            urlpath = f"{self.URL}/{product_id.value}/{date.year}/{tm_yday}.csv"
-            urlpaths.add(urlpath)
+        # Create bounding box
+        gdf = pd.concat([gdf, gdf.bounds], axis="columns").round(2)
+        gdf["bbox"] = gdf.round(2).apply(
+            lambda row: f"x{row.minx}y{row.miny},x{row.maxx}y{row.maxy}", axis=1
+        )
 
-        return dd.read_csv(list(urlpaths)).compute()
+        async with httpx.AsyncClient(verify=False) as client:
+            tasks = []
+            for chunk in chunks(date_range, 250):
+                for _, row in gdf.iterrows():
+                    url = f"{self.URL}/api/v1/files"
+                    params = {
+                        "product": product_id.value,
+                        "collection": "5000",
+                        "dateRanges": f"{min(chunk)}..{max(chunk)}",
+                        "areaOfInterest": row["bbox"],
+                    }
+                    tasks.append(asyncio.ensure_future(get_url(client, url, params)))
+
+            responses = [
+                await f
+                for f in tqdm(
+                    asyncio.as_completed(tasks),
+                    total=len(tasks),
+                    desc="GETTING MANIFEST...",
+                )
+            ]
+
+            rs = []
+            for r in responses:
+                try:
+                    rs.append(pd.DataFrame(r.json()).T)
+                except json.decoder.JSONDecodeError:
+                    continue
+
+            return pd.concat(rs)
 
     @backoff.on_exception(
         backoff.expo,
@@ -101,19 +147,14 @@ def _download_file(
         filename: pathlib.Path
             Filename of downloaded data file
         """
-        year = name[9:13]
-        day = name[13:16]
-        product_id = name[0:7]
-
-        url = f"{self.URL}/{product_id}/{year}/{day}/{name}"
-        headers = {"Authorization": f"Bearer {self.bearer}"}
-        filename = Path(self.directory, name)
+        url = f"{self.URL}{name}"
+        name = name.split("/")[-1]
 
-        with open(filename, "wb+") as f:
+        with open(filename := Path(self.directory, name), "wb+") as f:
             with httpx.stream(
                 "GET",
                 url,
-                headers=headers,
+                headers={"Authorization": f"Bearer {self.bearer}"},
             ) as response:
                 total = int(response.headers["Content-Length"])
                 with tqdm(
@@ -129,29 +170,11 @@
 
         return filename
 
-    def _download(self, names: List[str], n_jobs: int = 16):
-        """Download (in parallel) from NASA Black Marble archive
-
-        Parameters
-        ----------
-        names: List[str]
-            List of names for which to download from the NASA Black Marble archive
-        """
-        args = [(name,) for name in names]
-
-        return pqdm(
-            args,
-            self._download_file,
-            n_jobs=n_jobs,
-            argument_type="args",
-            desc="Downloading...",
-        )
-
     def download(
         self,
         gdf: geopandas.GeoDataFrame,
         product_id: Product,
-        date_range: datetime.date | List[datetime.date],
+        date_range: List[datetime.date],
         skip_if_exists: bool = True,
     ):
         """Download (in parallel) from NASA Black Marble archive
@@ -164,16 +187,27 @@ def download(
         product: Product
             Nasa Black Marble Product Id (e.g, VNP46A1)
         date_range: List[datetime.date]
            Date range for which to download NASA Black Marble data.
         skip_if_exists: bool, default=True
-            Whether to skip downloading or extracting data if the data file for that date already exists
+            Whether to skip downloading data if file already exists
         """
         gdf = geopandas.overlay(
             gdf.to_crs("EPSG:4326").dissolve(), self.TILES, how="intersection"
         )
-        bm_files_df = self._retrieve_manifest(product_id, date_range)
 
+        bm_files_df = asyncio.run(self.get_manifest(gdf, product_id, date_range))
         bm_files_df = bm_files_df[
             bm_files_df["name"].str.contains("|".join(gdf["TileID"]))
         ]
-        names = bm_files_df["name"].tolist()
+        names = bm_files_df["fileURL"].tolist()
 
-        return self._download(names)
+        args = [(name,) for name in names]
+        return pqdm(
+            args,
+            self._download_file,
+            n_jobs=16,
+            argument_type="args",
+            desc="Downloading...",
+        )
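In short, the manifest now comes from the LAADS web API instead of per-day CSVs read through dask: get_manifest splits the date range into chunks of 250, issues one asynchronous GET per (date chunk, tile bounding box) pair against /api/v1/files, and concatenates the transposed JSON payloads into a single DataFrame whose fileURL column feeds _download_file. A standalone sketch of one such request; the product, dates, and bounding box below are placeholder values, not from the diff:

    import httpx
    import pandas as pd

    # One manifest query, mirroring the params get_manifest builds per tile.
    # The bbox string follows the diff's f"x{minx}y{miny},x{maxx}y{maxy}" format.
    params = {
        "product": "VNP46A2",
        "collection": "5000",
        "dateRanges": "2023-01-01..2023-01-31",
        "areaOfInterest": "x-77.12y38.79,x-76.91y39.05",
    }
    r = httpx.get("https://ladsweb.modaps.eosdis.nasa.gov/api/v1/files", params=params)
    manifest = pd.DataFrame(r.json()).T  # one row per file, as in get_manifest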
42 changes: 23 additions & 19 deletions src/blackmarble/raster.py
@@ -20,7 +20,6 @@
 from .download import BlackMarbleDownloader
 from .types import Product
 
-
 VARIABLE_DEFAULT = {
     Product.VNP46A1: "DNB_At_Sensor_Radiance_500m",
     Product.VNP46A2: "Gap_Filled_DNB_BRDF-Corrected_NTL",
@@ -56,9 +55,9 @@ def h5_to_geotiff(
     Returns
     ------
     output_path: Path
-        Path to which GeoTIFF
+        Path to which export GeoTIFF file
     """
-    output_path = Path(output_directory, f"{output_prefix}{f.stem}").with_suffix(".tif")
+    output_path = Path(output_directory, f.name).with_suffix(".tif")
     product_id = Product(f.stem.split(".")[0])
 
     if variable is None:
@@ -277,24 +276,29 @@ def bm_raster(
     for date in tqdm(date_range, desc="COLLATING RESULTS | Processing..."):
         filenames = _pivot_paths_by_date(pathnames).get(date)
 
-        # Open each GeoTIFF file as a DataArray and store in a list
-        da = [
-            rioxarray.open_rasterio(
-                h5_to_geotiff(
-                    f,
-                    variable=variable,
-                    quality_flag_rm=quality_flag_rm,
-                    output_prefix=file_prefix,
-                    output_directory=d,
-                ),
-            )
-            for f in filenames
-        ]
-        ds = merge_arrays(da)
-        ds = ds.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)
-        ds["time"] = pd.to_datetime(date)
+        try:
+            # Open each GeoTIFF file as a DataArray and store in a list
+            da = [
+                rioxarray.open_rasterio(
+                    h5_to_geotiff(
+                        f,
+                        variable=variable,
+                        quality_flag_rm=quality_flag_rm,
+                        output_prefix=file_prefix,
+                        output_directory=d,
+                    ),
+                )
+                for f in filenames
+            ]
+            ds = merge_arrays(da)
+            ds = ds.rio.clip(gdf.geometry.apply(mapping), gdf.crs, drop=True)
+            ds["time"] = pd.to_datetime(date)
 
-        dx.append(ds.squeeze())
+            dx.append(ds.squeeze())
+        except TypeError:
+            continue
+
+    dx = filter(lambda item: item is not None, dx)
 
     # Stack the individual dates along "time" dimension
     ds = (
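The new try/except TypeError covers the case where _pivot_paths_by_date(pathnames).get(date) returns None because nothing was downloaded for a date: iterating None in the list comprehension raises TypeError, and that date is now skipped (with any None placeholders filtered out) instead of aborting the whole collation. A toy illustration of the failure mode, with made-up names:

    paths_by_date = {"2023-01-01": ["VNP46A2.A2023001.h09v05.h5"]}

    for date in ["2023-01-01", "2023-01-02"]:
        try:
            filenames = paths_by_date.get(date)  # None for 2023-01-02
            opened = [f.upper() for f in filenames]  # stand-in for the rioxarray calls
        except TypeError:
            continue  # a date with no files no longer crashes the loop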
22 changes: 2 additions & 20 deletions src/blackmarble/types.py
@@ -1,7 +1,6 @@
-from datetime import date
-from enum import Enum
+"""Types for blackmarblepy"""
 
-from pydantic import BaseModel, validator
+from enum import Enum
 
 
 class Product(Enum):
@@ -11,20 +10,3 @@ class Product(Enum):
     VNP46A2 = "VNP46A2"
     VNP46A3 = "VNP46A3"
     VNP46A4 = "VNP46A4"
-
-
-class DateRange(BaseModel):
-    start_date: date
-    end_date: date
-
-    @validator("start_date", "end_date", pre=True)
-    def parse_dates(cls, v):
-        if isinstance(v, str):
-            return date.fromisoformat(v)
-        return v
-
-    @validator("end_date")
-    def check_date_range(cls, v, values, **kwargs):
-        if "start_date" in values and v < values["start_date"]:
-            raise ValueError("End date cannot be before start date")
-        return v
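With DateRange removed, the module's single export is the Product enum; string product ids are coerced in download.py via Product(product_id). A quick sketch of that round-trip:

    from blackmarble.types import Product

    # String ids map onto enum members, as download.py relies on
    assert Product("VNP46A4") is Product.VNP46A4
    assert Product.VNP46A4.value == "VNP46A4"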
