diff --git a/pyschism/forcing/nws/nws2/hrrr3.py b/pyschism/forcing/nws/nws2/hrrr3.py index cd0004af..0389cb3a 100644 --- a/pyschism/forcing/nws/nws2/hrrr3.py +++ b/pyschism/forcing/nws/nws2/hrrr3.py @@ -20,6 +20,7 @@ logger = logging.getLogger(__name__) + class AWSGrib2Inventory: def __init__( @@ -35,32 +36,54 @@ def __init__( self.forecast_cycle = self.start_date #nearest_cycle() + # paginator=self.s3.get_paginator('list_objects_v2') + # pages=paginator.paginate(Bucket=self.bucket, + # Prefix=f'hrrr.{self.forecast_cycle.strftime("%Y%m%d")}' + # f'/{self.product}/') + # data=[] + # for page in pages: + # for obj in page['Contents']: + # data.append(obj) + + # self.cycle=self.forecast_cycle.hour + # tz='t{:02d}z'.format(self.cycle) + # self.file_metadata = list(sorted([ + # _['Key'] for _ in data if 'wrfsfcf' in _['Key'] and tz in _['Key'] and not 'idx' in _['Key'] + # ])) + self.get_file_namelist(self.forecast_cycle) + + while (not self.file_metadata): + logger.info(f'No data for cycle t{self.forecast_cycle.hour:02d}z on {self.forecast_cycle}, try next cycle: +1!') + self.forecast_cycle += timedelta(hours=1) + self.get_file_namelist(self.forecast_cycle) + + for key in self.file_metadata[1:record*24+1]: + filename = pathlib.Path(self.tmpdir) / key + filename.parent.mkdir(parents=True, exist_ok=True) + + with open(filename, 'wb') as f: + logger.info(f'Downloading file {key}, ') + try: + self.s3.download_fileobj(self.bucket, key, f) + except: + logger.info(f'file {key} is not available') + + def get_file_namelist(self, requested_date): paginator=self.s3.get_paginator('list_objects_v2') pages=paginator.paginate(Bucket=self.bucket, - Prefix=f'hrrr.{self.forecast_cycle.strftime("%Y%m%d")}' + Prefix=f'hrrr.{requested_date.strftime("%Y%m%d")}' f'/{self.product}/') data=[] for page in pages: for obj in page['Contents']: data.append(obj) - self.cycle=self.forecast_cycle.hour + self.cycle = requested_date.hour tz='t{:02d}z'.format(self.cycle) self.file_metadata = list(sorted([ _['Key'] for _ in data if 'wrfsfcf' in _['Key'] and tz in _['Key'] and not 'idx' in _['Key'] ])) - - for key in self.file_metadata[1:record*24+1]: - filename = pathlib.Path(self.tmpdir) / key - filename.parent.mkdir(parents=True, exist_ok=True) - - with open(filename, 'wb') as f: - logger.info(f'Downloading file {key}, ') - try: - self.s3.download_fileobj(self.bucket, key, f) - except: - logger.info(f'file {key} is not available') - #return filename + return self.file_metadata @property def bucket(self): @@ -119,6 +142,7 @@ def __init__(self, start_date=None, rnday=None, pscr=None, record=2, bbox=None, pool = mp.Pool(int(npool)) pool.starmap(self.gen_sflux, [(date, record, pscr) for date in datevector]) + #self.gen_sflux(datevector[0], record, pscr) pool.close() @@ -145,31 +169,47 @@ def gen_sflux(self, date, record, pscr): #Get lon/lat lon, lat, idx_ymin, idx_ymax, idx_xmin, idx_xmax = self.modified_latlon(grbfiles[0]) + + + times = [] + for ifile, fname in enumerate(grbfiles): + logger.info(f'file {ifile} is {fname}') - for ifile, file in enumerate(grbfiles): - logger.info(f'file {ifile} is {file}') + ds=xr.open_dataset( + fname, + engine='cfgrib', + backend_kwargs=dict(filter_by_keys={'stepType': 'instant','typeOfLevel': 'surface'}) + ) + times.append(ds.valid_time.values) + ds.close() for key, value in Vars.items(): if key == 'group1': for key2, value2 in value.items(): - ds=xr.open_dataset(file, + ds=xr.open_dataset( + fname, engine='cfgrib', - backend_kwargs=dict(filter_by_keys={'paramId':int(value2[0])})) + backend_kwargs=dict(filter_by_keys={'paramId':int(value2[0])}) + ) value2[1].append(ds[key2][idx_ymin:idx_ymax+1, idx_xmin:idx_xmax+1].astype('float32')) ds.close() elif key == 'group2': - ds=xr.open_dataset(file, + ds=xr.open_dataset( + fname, engine='cfgrib', - backend_kwargs=dict(filter_by_keys={'typeOfLevel':'meanSea'})) + backend_kwargs=dict(filter_by_keys={'typeOfLevel':'meanSea'}) + ) for key2, value2 in value.items(): value2[1].append(ds[key2][idx_ymin:idx_ymax+1, idx_xmin:idx_xmax+1].astype('float32')) ds.close() else: - ds=xr.open_dataset(file, + ds=xr.open_dataset( + fname, engine='cfgrib', - backend_kwargs=dict(filter_by_keys={'stepType': 'instant','typeOfLevel': 'surface'})) + backend_kwargs=dict(filter_by_keys={'stepType': 'instant','typeOfLevel': 'surface'}) + ) for key2, value2 in value.items(): value2[1].append(ds[key2][idx_ymin:idx_ymax+1, idx_xmin:idx_xmax+1].astype('float32')) ds.close() @@ -186,10 +226,12 @@ def gen_sflux(self, date, record, pscr): 'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)), }, coords={ - 'time': np.round(np.arange(1, len(grbfiles)+1)/24, 4).astype('float32'), + 'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'), 'lon': (['ny_grid', 'nx_grid'], lon), 'lat': (['ny_grid', 'nx_grid'], lat)}) + #date_string = np.datetime_as_string(times[0], unit='m') + #bdate = datetime.strptime(date_string, '%Y-%m-%dT%H:%M') bdate = date.strftime('%Y %m %d %H').split(' ') bdate = [int(q) for q in bdate[:4]] + [0] @@ -197,7 +239,7 @@ def gen_sflux(self, date, record, pscr): 'long_name': 'Time', 'standard_name': 'time', 'base_date': bdate, - 'units': f"days since {date.year}-{date.month}-{date.day} {cycle:02d}:00 UTC" + 'units': f'days since {date.year}-{date.month}-{date.day} {cycle:02d}:00 UTC' } fout.lat.attrs = {