From 3ee787ebe10ff966624bf05379c86a369f67dd28 Mon Sep 17 00:00:00 2001 From: cuill Date: Thu, 11 Jan 2024 21:31:06 -0500 Subject: [PATCH] Revert "Updated gfs2 and hrrr3 class" --- .github/workflows/release.yml | 2 +- examples/Sflux/gen_sflux_gfs2.py | 24 +- examples/Sflux/gen_sflux_hrrr3.py | 30 +-- pyschism/forcing/nws/nws2/gfs2.py | 370 +++++++++----------------- pyschism/forcing/nws/nws2/hrrr3.py | 403 ++++++++++------------------- setup.py | 2 +- 6 files changed, 274 insertions(+), 557 deletions(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b320d1bb..9952a6c1 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -13,7 +13,7 @@ jobs: strategy: matrix: os: [ ubuntu-latest ] - python-version: ['3.8', '3.9', '3.x' ] + python-version: [ '3.8', '3.9', '3.x' ] steps: - name: checkout repository uses: actions/checkout@v2 diff --git a/examples/Sflux/gen_sflux_gfs2.py b/examples/Sflux/gen_sflux_gfs2.py index 4499331f..b7321f63 100644 --- a/examples/Sflux/gen_sflux_gfs2.py +++ b/examples/Sflux/gen_sflux_gfs2.py @@ -2,17 +2,14 @@ from time import time import multiprocessing as mp import logging -import pathlib import numpy as np import pandas as pd -from matplotlib.transforms import Bbox from pyschism.mesh.hgrid import Hgrid from pyschism.forcing.nws.nws2.gfs2 import GFS from pyschism.dates import nearest_cycle - logging.basicConfig( format="[%(asctime)s] %(name)s %(levelname)s: %(message)s", force=True, @@ -23,19 +20,16 @@ logging.getLogger('pyschism').setLevel(log_level) if __name__ == "__main__": + t0 = time() + startdate = datetime(2021, 10, 13) - #now = datetime.now() - #last_cycle = np.datetime64(pd.DatetimeIndex([now-timedelta(hours=2)]).floor('6H').values[0], 'h').tolist() - #start = (last_cycle - timedelta(days=1)).replace(hour=0) - start = datetime(2023, 10, 1) - rnday = 10 - #record = 5 - #outdir = path = pathlib.Path('./GFS_2023') + pscr = '/sciclone/pscr/lcui01/GFS/' + rnday = 5 + record = 1 hgrid = Hgrid.open('../../static/hgrid.gr3', crs='epsg:4326') - pscr = '/sciclone/pscr/lcui01/GFS/' - gfs = GFS(level=1, pscr=pscr, bbox=hgrid.bbox) - gfs.write(start_date=start, rnday=rnday, air=True, prc=True, rad=True) - - print(f'It took {(time()-t0)/60} mins to process {rnday} days') + + gfs = GFS(start_date=startdate, rnday=rnday, pscr=pscr, record=record, bbox=hgrid.bbox) + + print(f'It took {(time()-t0)/60} mins to process {rnday} days, {record*24} records/day') diff --git a/examples/Sflux/gen_sflux_hrrr3.py b/examples/Sflux/gen_sflux_hrrr3.py index 63557cd5..45f2fed6 100644 --- a/examples/Sflux/gen_sflux_hrrr3.py +++ b/examples/Sflux/gen_sflux_hrrr3.py @@ -1,11 +1,6 @@ from datetime import datetime, timedelta from time import time import logging -import pathlib - -import numpy as np -import pandas as pd -from matplotlib.transforms import Bbox from pyschism.mesh.hgrid import Hgrid from pyschism.forcing.nws.nws2.hrrr3 import HRRR @@ -22,24 +17,11 @@ if __name__ == "__main__": t0 = time() - - #now = datetime.now() - #last_cycle = np.datetime64(pd.DatetimeIndex([now-timedelta(hours=2)]).floor('6H').values[0], 'h').tolist() - #print(last_cycle) - - #start = (last_cycle - timedelta(days=1)).replace(hour=0) - #end = last_cycle + timedelta(days=2) - - #rndelta = end -start - #rnday = rndelta.total_seconds() / 86400. 
- start = datetime(2023, 10, 26) - rnday = 2 - hgrid = Hgrid.open('./hgrid.gr3', crs='epsg:4326') - #outdir = path = pathlib.Path('./HRRR_2017_2') - #bbox = Bbox.from_extents(-162, 60.79, -143, 69.1) - + rnday = 1 + record = 1 + startdate = nearest_cycle() + hgrid = Hgrid.open('../../static/hgrid.gr3', crs='epsg:4326') pscr='/sciclone/pscr/lcui01/HRRR/' - hrrr = HRRR(level=2, region='alaska', pscr=pscr, bbox=bbox) - hrrr.write(start_date=start, rnday=rnday, air=True, prc=True, rad=True) - print(f'It took {(time()-t0)/60} mins to process {rnday} days!') + hrrr = HRRR(start_date=startdate, rnday=rnday, pscr=pscr, record=record, bbox=hgrid.bbox) + print(f'It took {(time()-t0)/60} mins to process {rnday} days, {record*24} files/day') diff --git a/pyschism/forcing/nws/nws2/gfs2.py b/pyschism/forcing/nws/nws2/gfs2.py index e5123741..5267fcd1 100644 --- a/pyschism/forcing/nws/nws2/gfs2.py +++ b/pyschism/forcing/nws/nws2/gfs2.py @@ -1,4 +1,3 @@ -import os from datetime import datetime, timedelta import logging import pathlib @@ -28,94 +27,44 @@ def __init__( product='atmos', ): """ - Download GFS data from AWS. - This dataset GFS V16.3.0 starts on Feb 26, 2021. + This will download the GFS data. """ self.start_date = nearest_cycle() if start_date is None else start_date + self.record = record self.pscr = pscr self.product = product - self.forecast_cycle = self.start_date + self.cycle=self.forecast_cycle.hour + + paginator=self.s3.get_paginator('list_objects_v2') + pages=paginator.paginate(Bucket=self.bucket, + Prefix=f'gfs.{self.forecast_cycle.strftime("%Y%m%d")}' + f'/{self.cycle:02d}/{self.product}/') - timevector = np.arange( - self.start_date, - self.start_date + timedelta(hours=record*24+1), - np.timedelta64(1, 'h') - ).astype(datetime) + data=[] + for page in pages: + for obj in page['Contents']: + data.append(obj) - file_metadata = self.get_file_namelist(timevector) + file_metadata = list(sorted([ + _['Key'] for _ in data if '0p25' in _['Key'] and 'pgrb2' in _['Key'] and not 'goessim' in _['Key'] and not 'anl' in _['Key'] and not 'idx' in _['Key']])) - for dt in timevector: - - outfile_name = f"gfs.{self.start_date.strftime('%Y%m%d')}/gfs.pgrb2.0p25.{dt.strftime('%Y%m%d%H')}.grib2" - filename = pathlib.Path(self.tmpdir) / outfile_name + for key in file_metadata[1:self.record*24+1]: + key2 = key + '.grib2' + filename = pathlib.Path(self.tmpdir) / key2 filename.parent.mkdir(parents=True, exist_ok=True) with open(filename, 'wb') as f: - while(file_metadata[dt]): - try: - key = file_metadata[dt].pop(0) - logger.info(f"Downloading file {key} for {dt}") - self.s3.download_fileobj(self.bucket, key, f) - logger.info("Success!") - break - except: - if not file_metadata[dt]: - logger.info(f'No file for {dt}') - if os.path.exists(filename): - os.remove(filename) - else: - logger.info(f'file {key} is not available, try next file') - continue - - def get_file_namelist(self, requested_dates): - - file_metadata = {} - hours = (datetime.utcnow() - self.start_date).days * 24 + (datetime.utcnow() - self.start_date).seconds // 3600 - n_cycles = hours // 6 if hours < 25 else 4 - cycle_index = (int(self.start_date.hour) - 1) // 6 - dt = requested_dates[0] - for it, dt in enumerate(requested_dates[:n_cycles*6+1]): - levels = 3 - i = 0 - fhour = int(dt.hour) - cycle_index = (fhour - 1) // 6 - date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d') - - while (levels): - - cycle = self.fcst_cycles[cycle_index - i] - fhour2 = fhour + i * 6 if cycle_index == 0 else fhour - 
cycle_index * 6 + i * 6 - - file_metadata.setdefault(dt, []).append(f"gfs.{date2}/{cycle}/{self.product}/gfs.t{cycle}z.pgrb2.0p25.f{fhour2:03d}") - levels -= 1 - i += 1 - - if it < 25: - date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d') - for it, dt in enumerate(requested_dates[n_cycles*6+1:]): - levels = 3 - i = 0 - hours = (dt - self.forecast_cycle).days * 24 + (dt - self.forecast_cycle).seconds // 3600 - - while (levels): - #starting from the last cycle - cycle = self.fcst_cycles[cycle_index - i] - fhour2 = (hours - (n_cycles - 1) * 6) + i * 6 - file_metadata.setdefault(dt, []).append(f"gfs.{date2}/{cycle}/{self.product}/gfs.t{cycle}z.pgrb2.0p25.f{fhour2:03d}") - levels -= 1 - i += 1 - - return file_metadata + logger.info(f'Downloading file {key}, ') + try: + self.s3.download_fileobj(self.bucket, key, f) + except: + logger.info(f'file {key} is not available') @property def bucket(self): return 'noaa-gfs-bdp-pds' - @property - def fcst_cycles(self): - return ['00', '06', '12', '18'] - @property def s3(self): try: @@ -133,63 +82,42 @@ def tmpdir(self): @property def files(self): - grbfiles=glob.glob(f'{self.tmpdir}/gfs.{self.forecast_cycle.strftime("%Y%m%d")}/gfs.pgrb2.0p25.*.grib2') + grbfiles=glob.glob(f'{self.tmpdir}/gfs.{self.forecast_cycle.strftime("%Y%m%d")}/{self.cycle:02d}/{self.product}/gfs.t{self.cycle:02d}z.pgrb2.0p25.f*.grib2') grbfiles.sort() return grbfiles class GFS: - def __init__(self, level=1, bbox=None, pscr=None, outdir=None): - self.level = level - self.bbox = bbox - self.pscr = pscr - self.outdir = outdir - self.record = 1 - - def write(self, start_date, rnday, air: bool=True, prc: bool=True, rad: bool=True): - + def __init__(self, start_date=None, rnday=None, pscr=None, record=1, bbox=None, outdir=None): start_date = nearest_cycle() if start_date is None else start_date + logger.info(f'start_date is {start_date}') + self.bbox = bbox - if (start_date + timedelta(days=rnday)) > datetime.utcnow(): - logger.info(f'End date is beyond the current time, set rnday to 1 day and record to 5 days') - rnday = 1 - self.record = 5 #days - - end_date = start_date + timedelta(hours=rnday * self.record + 1) - logger.info(f'start time is {start_date}, end time is {end_date}') + if outdir is None: + self.path = pathlib.Path(start_date.strftime("%Y%m%d")) + self.path.mkdir(parents=True, exist_ok=True) + else: + self.path = outdir - if self.outdir is None: - #self.outdir = pathlib.Path(start_date.strftime("%Y%m%d")) - self.outdir = pathlib.Path("sflux") - self.outdir.mkdir(parents=True, exist_ok=True) + end_date = start_date + timedelta(days=rnday) - datevector = np.arange( - start_date, - start_date + timedelta(days=rnday), - np.timedelta64(1, 'D'), - dtype='datetime64', - ) + datevector = np.arange(start_date, end_date, np.timedelta64(1, 'D'), + dtype='datetime64') datevector = pd.to_datetime(datevector) npool = len(datevector) if len(datevector) < mp.cpu_count()/2 else mp.cpu_count()/2 logger.info(f'npool is {npool}') pool = mp.Pool(int(npool)) - pool.starmap(self.gen_sflux, [(istack+1, date, air, prc, rad) for istack, date in enumerate(datevector)]) + pool.starmap(self.gen_sflux, [(date, record, pscr) for date in datevector]) + pool.close() - #self.gen_sflux(1, datevector[0], air, prc, rad) - - def gen_sflux( - self, - istack, - date, - air: bool = True, - prc: bool = True, - rad: bool = True, - ): - - inventory = AWSGrib2Inventory(date, self.record, self.pscr) + + def gen_sflux(self, date, record, pscr): + + inventory = 
AWSGrib2Inventory(date, record, pscr) grbfiles = inventory.files - #cycle = date.hour + cycle = date.hour + prate = [] dlwrf = [] @@ -211,7 +139,6 @@ def gen_sflux( #Get lon/lat lon, lat, idx_ymin, idx_ymax, idx_xmin, idx_xmax = self.modified_latlon(grbfiles[0]) - times = [] for ifile, file in enumerate(grbfiles): logger.info(f'file {ifile} is {file}') @@ -232,7 +159,6 @@ def gen_sflux( for key2, value2 in value.items(): tmp = ds[key2][idx_ymin:idx_ymax+1, idx_xmin:idx_xmax+1].astype('float32') value2[1].append(tmp[::-1,:]) - times.append(ds.valid_time.values) ds.close() elif key == 'group3': @@ -253,154 +179,90 @@ def gen_sflux( value2[1].append(tmp[::-1, :]) ds.close() - #write to netcdf - bdate = date.strftime('%Y %m %d %H').split(' ') - bdate = [int(q) for q in bdate[:4]] + [0] - - if air: - ds = xr.Dataset({ - 'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)), + fout = xr.Dataset({'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)), 'spfh': (['time', 'ny_grid', 'nx_grid'], np.array(spfh)), 'uwind': (['time', 'ny_grid', 'nx_grid'], np.array(uwind)), 'vwind': (['time', 'ny_grid', 'nx_grid'], np.array(vwind)), 'prmsl': (['time', 'ny_grid', 'nx_grid'], np.array(prmsl)), - }, - coords={ - 'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'), - 'lon': (['ny_grid', 'nx_grid'], lon), - 'lat': (['ny_grid', 'nx_grid'], lat) - }) - - ds.time.attrs = { - 'long_name': 'Time', - 'standard_name': 'time', - 'base_date': bdate, - 'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC", - } - - ds.lat.attrs = { - 'units': 'degrees_north', - 'long_name': 'Latitude', - 'standard_name': 'latitude', - } - - ds.lon.attrs = { - 'units': 'degrees_east', - 'long_name': 'Longitude', - 'standard_name': 'longitude', - } - - ds.uwind.attrs={ - 'units': 'm/s', - 'long_name': '10m_above_ground/UGRD', - 'standard_name':'eastward_wind' - } - - ds.vwind.attrs={ - 'units': 'm/s', - 'long_name': '10m_above_ground/VGRD', - 'standard_name':'northward_wind' - } - - ds.spfh.attrs={ - 'units': 'kg kg-1', - 'long_name': '2m_above_ground/Specific Humidity', - 'standard_name':'specific_humidity' - } - - ds.prmsl.attrs = { - 'units': 'Pa', - 'long_name': 'Pressure reduced to MSL', - 'standard_name': 'air_pressure_at_sea_level' - } - - ds.stmp.attrs={ - 'units': 'K', - 'long_name': '2m_above_ground/Temperature', - } - - ds.to_netcdf(f'{self.outdir}/sflux_air_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time') - ds.close() - - if prc: - ds = xr.Dataset({ 'prate': (['time', 'ny_grid', 'nx_grid'], np.array(prate)), - }, - coords={ - 'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'), - 'lon': (['ny_grid', 'nx_grid'], lon), - 'lat': (['ny_grid', 'nx_grid'], lat) - }) - - ds.time.attrs = { - 'long_name': 'Time', - 'standard_name': 'time', - 'base_date': bdate, - 'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC" - } - - ds.lat.attrs = { - 'units': 'degrees_north', - 'long_name': 'Latitude', - 'standard_name': 'latitude', - } - - ds.lon.attrs = { - 'units': 'degrees_east', - 'long_name': 'Longitude', - 'standard_name': 'longitude', - } - - ds.prate.attrs={ - 'units': 'kg m-2 s-1', - 'long_name': 'Precipitation rate' - } - - ds.to_netcdf(f'{self.outdir}/sflux_prc_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time') - ds.close() - - if rad: - ds = xr.Dataset({ 'dlwrf': (['time', 'ny_grid', 'nx_grid'], np.array(dlwrf)), 'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)), - 
},
+        },
         coords={
-            'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+            'time': np.round(np.arange(1, len(grbfiles)+1)/24, 4).astype('float32'),
             'lon': (['ny_grid', 'nx_grid'], lon),
             'lat': (['ny_grid', 'nx_grid'], lat)
-            })
+        })
+
+        bdate = date.strftime('%Y %m %d %H').split(' ')
+        bdate = [int(q) for q in bdate[:4]] + [0]
+
+        fout.time.attrs = {
+            'long_name': 'Time',
+            'standard_name': 'time',
+            'base_date': bdate,
+            'units': f"days since {date.year}-{date.month}-{date.day} {cycle:02d}:00 UTC"
+        }
+
+        fout.lat.attrs = {
+            'units': 'degrees_north',
+            'long_name': 'Latitude',
+            'standard_name': 'latitude',
+        }
+
+        fout.lon.attrs = {
+            'units': 'degrees_east',
+            'long_name': 'Longitude',
+            'standard_name': 'longitude',
+        }
+
+        fout.uwind.attrs={
+            'units': 'm/s',
+            'long_name': '10m_above_ground/UGRD',
+            'standard_name':'eastward_wind'
+        }
+
+        fout.vwind.attrs={
+            'units': 'm/s',
+            'long_name': '10m_above_ground/VGRD',
+            'standard_name':'northward_wind'
+        }
+
+        fout.spfh.attrs={
+            'units': 'kg kg-1',
+            'long_name': '2m_above_ground/Specific Humidity',
+            'standard_name':'specific_humidity'
+        }
+
+        fout.prmsl.attrs = {
+            'units': 'Pa',
+            'long_name': 'Pressure reduced to MSL',
+            'standard_name': 'air_pressure_at_sea_level'
+        }
+
+        fout.stmp.attrs={
+            'units': 'K',
+            'long_name': '2m_above_ground/Temperature',
+        }
+
+        fout.prate.attrs={
+            'units': 'kg m-2 s-1',
+            'long_name': 'Precipitation rate'
+        }
+
+        fout.dlwrf.attrs = {
+            'units': 'W m-2',
+            'long_name': 'Downward long-wave radiation flux'
+        }
+
+        fout.dswrf.attrs = {
+            'units': 'W m-2',
+            'long_name': 'Downward short-wave radiation flux'
+        }
+
+        fout.to_netcdf(self.path / f'gfs_{date.strftime("%Y%m%d")}{cycle:02d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
+
     def modified_latlon(self, grbfile):
         xmin, xmax, ymin, ymax = self.bbox.xmin, self.bbox.xmax, self.bbox.ymin, self.bbox.ymax
diff --git a/pyschism/forcing/nws/nws2/hrrr3.py b/pyschism/forcing/nws/nws2/hrrr3.py
index bcbd12e4..0389cb3a 100644
--- a/pyschism/forcing/nws/nws2/hrrr3.py
+++ b/pyschism/forcing/nws/nws2/hrrr3.py
@@ -26,7 +26,7 @@ class AWSGrib2Inventory:
     def __init__(
         self,
         start_date: datetime = None,
-        record = 1,
+        record = 2,
         pscr = None, #tmpdir to save grib files
         product='conus',
     ):
@@ -36,90 +36,64 @@ def __init__(
         self.forecast_cycle = self.start_date #nearest_cycle()
 
-        timevector = np.arange(
-            self.start_date,
-            self.start_date + timedelta(hours=record*24+1),
-            np.timedelta64(1, 'h')
-        ).astype(datetime)
-
-        file_metadata = self.get_file_namelist(timevector)
-
-        for dt in timevector:
-
-            outfile_name = f"hrrr.{self.start_date.strftime('%Y%m%d')}/hrrr_wrfsfcf{dt.strftime('%Y%m%d%H')}.grib2"
-            filename = pathlib.Path(self.tmpdir) / outfile_name
+        # paginator=self.s3.get_paginator('list_objects_v2')
+        # 
pages=paginator.paginate(Bucket=self.bucket, + # Prefix=f'hrrr.{self.forecast_cycle.strftime("%Y%m%d")}' + # f'/{self.product}/') + # data=[] + # for page in pages: + # for obj in page['Contents']: + # data.append(obj) + + # self.cycle=self.forecast_cycle.hour + # tz='t{:02d}z'.format(self.cycle) + # self.file_metadata = list(sorted([ + # _['Key'] for _ in data if 'wrfsfcf' in _['Key'] and tz in _['Key'] and not 'idx' in _['Key'] + # ])) + self.get_file_namelist(self.forecast_cycle) + + while (not self.file_metadata): + logger.info(f'No data for cycle t{self.forecast_cycle.hour:02d}z on {self.forecast_cycle}, try next cycle: +1!') + self.forecast_cycle += timedelta(hours=1) + self.get_file_namelist(self.forecast_cycle) + + for key in self.file_metadata[1:record*24+1]: + filename = pathlib.Path(self.tmpdir) / key filename.parent.mkdir(parents=True, exist_ok=True) with open(filename, 'wb') as f: - while(file_metadata[dt]): - try: - key = file_metadata[dt].pop(0) - logger.info(f"Downloading file {key} for {dt}") - self.s3.download_fileobj(self.bucket, key, f) - logger.info("Success!") - break - except: - if not file_metadata[dt]: - logger.info(f'No file for {dt}') - if os.path.exists(filename): - os.remove(filename) - else: - logger.info(f'file {key} is not available, try next file') - continue - - def get_file_namelist(self, requested_dates): - - file_metadata = {} - file_metadata = {} - hours = (datetime.utcnow() - self.start_date).days * 24 + (datetime.utcnow() - self.start_date).seconds // 3600 - n_cycles = hours // 6 if hours < 25 else 4 - cycle_index = (int(self.start_date.hour) - 1) // 6 - dt = requested_dates[0] - for it, dt in enumerate(requested_dates[:n_cycles*6+1]): - levels = 3 - i = 0 - fhour = int(dt.hour) - cycle_index = (fhour - 1) // 6 - date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d') - - while (levels): - - cycle = self.fcst_cycles[cycle_index - i] - fhour2 = fhour + i * 6 if cycle_index == 0 else fhour - cycle_index * 6 + i * 6 - if self.product == 'conus': - file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.grib2") - elif self.product == 'alaska': - file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.ak.grib2") - levels -= 1 - i += 1 - - if it < 25: - date2 = (dt - timedelta(days=1)).strftime('%Y%m%d') if dt.hour == 0 else dt.strftime('%Y%m%d') - for it, dt in enumerate(requested_dates[n_cycles*6+1:]): - levels = 3 - i = 0 - hours = (dt - self.forecast_cycle).days * 24 + (dt - self.forecast_cycle).seconds // 3600 - - while (levels): - cycle = self.fcst_cycles[cycle_index - i] - fhour2 = ( hours - (n_cycles - 1) * 6) + i * 6 - if self.product == 'conus': - file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.grib2") - elif self.product == 'alaska': - file_metadata.setdefault(dt, []).append(f"hrrr.{date2}/{self.product}/hrrr.t{cycle}z.wrfsfcf{fhour2:02d}.ak.grib2") - - levels -= 1 - i += 1 - - return file_metadata + logger.info(f'Downloading file {key}, ') + try: + self.s3.download_fileobj(self.bucket, key, f) + except: + logger.info(f'file {key} is not available') + + def get_file_namelist(self, requested_date): + paginator=self.s3.get_paginator('list_objects_v2') + pages=paginator.paginate(Bucket=self.bucket, + Prefix=f'hrrr.{requested_date.strftime("%Y%m%d")}' + f'/{self.product}/') + data=[] + for page in pages: + for obj in page['Contents']: + data.append(obj) + + 
self.cycle = requested_date.hour + tz='t{:02d}z'.format(self.cycle) + self.file_metadata = list(sorted([ + _['Key'] for _ in data if 'wrfsfcf' in _['Key'] and tz in _['Key'] and not 'idx' in _['Key'] + ])) + return self.file_metadata @property def bucket(self): return 'noaa-hrrr-bdp-pds' @property - def fcst_cycles(self): - return ['00', '06', '12', '18'] + def output_interval(self) -> timedelta: + return { + 'conus': timedelta(hours=1) + }[self.product] @property def s3(self): @@ -138,44 +112,28 @@ def tmpdir(self): @property def files(self): - grbfiles=glob.glob(f'{self.tmpdir}/hrrr.{self.forecast_cycle.strftime("%Y%m%d")}/hrrr_wrfsfcf*.grib2') + grbfiles=glob.glob(f'{self.tmpdir}/hrrr.{self.forecast_cycle.strftime("%Y%m%d")}/conus/hrrr.t{self.cycle:02d}z.wrfsfcf*.grib2') grbfiles.sort() return grbfiles class HRRR: - def __init__(self, level=2, region='conus', bbox=None, pscr=None, outdir=None): + def __init__(self, start_date=None, rnday=None, pscr=None, record=2, bbox=None, outdir=None): - self.level = level - self.region = region + start_date = nearest_cycle() if start_date is None else start_date self.bbox = bbox - self.pscr = pscr - self.outdir = outdir - self.record = 1 - def write(self, start_date, rnday, air: bool=True, prc: bool=True, rad: bool=True): + if outdir is None: + self.path = pathlib.Path(start_date.strftime("%Y%m%d")) + self.path.mkdir(parents=True, exist_ok=True) + else: + self.path = outdir - start_date = nearest_cycle() if start_date is None else start_date - - if (start_date + timedelta(days=rnday)) > datetime.utcnow(): - logger.info(f"End date is beyond current time, set rnday to one day and record to 2 days") - rnday = 1 - self.record = 2 #days - - end_date = start_date + timedelta(hours=rnday * self.record + 1) - logger.info(f'start time is {start_date}, end time is {end_date}') - - if self.outdir is None: - #self.outdir = pathlib.Path(start_date.strftime("%Y%m%d")) - self.outdir = pathlib.Path("sflux") - self.outdir.mkdir(parents=True, exist_ok=True) + end_date = start_date + timedelta(days=rnday) + logger.info(f'start_date is {start_date}, end_date is {end_date}') - datevector = np.arange( - start_date, - start_date + timedelta(days=rnday), - np.timedelta64(1, 'D'), - dtype='datetime64', - ) + datevector = np.arange(start_date, end_date, np.timedelta64(1, 'D'), + dtype='datetime64') datevector = pd.to_datetime(datevector) #Use all CPUs may cause memory issue @@ -183,23 +141,16 @@ def write(self, start_date, rnday, air: bool=True, prc: bool=True, rad: bool=Tru logger.info(f'npool is {npool}') pool = mp.Pool(int(npool)) - pool.starmap(self.gen_sflux, [(istack+1, date, air, prc, rad) for istack, date in enumerate(datevector)]) + pool.starmap(self.gen_sflux, [(date, record, pscr) for date in datevector]) #self.gen_sflux(datevector[0], record, pscr) pool.close() - def gen_sflux( - self, - istack, - date, - air: bool = True, - prc: bool = True, - rad: bool = True, - ): - - inventory = AWSGrib2Inventory(date, self.record, self.pscr, self.region) + def gen_sflux(self, date, record, pscr): + + inventory = AWSGrib2Inventory(date, record, pscr) grbfiles = inventory.files - #cycle = date.hour + cycle = date.hour stmp = [] spfh = [] @@ -218,6 +169,7 @@ def gen_sflux( #Get lon/lat lon, lat, idx_ymin, idx_ymax, idx_xmin, idx_xmax = self.modified_latlon(grbfiles[0]) + times = [] for ifile, fname in enumerate(grbfiles): @@ -263,162 +215,89 @@ def gen_sflux( ds.close() #write netcdf + fout = xr.Dataset({ + 'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)), + 'spfh': 
(['time', 'ny_grid', 'nx_grid'], np.array(spfh)),
+            'uwind': (['time', 'ny_grid', 'nx_grid'], np.array(uwind)),
+            'vwind': (['time', 'ny_grid', 'nx_grid'], np.array(vwind)),
+            'prmsl': (['time', 'ny_grid', 'nx_grid'], np.array(prmsl)),
+            'prate': (['time', 'ny_grid', 'nx_grid'], np.array(prate)),
+            'dlwrf': (['time', 'ny_grid', 'nx_grid'], np.array(dlwrf)),
+            'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)),
+            },
+            coords={
+                'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
+                'lon': (['ny_grid', 'nx_grid'], lon),
+                'lat': (['ny_grid', 'nx_grid'], lat)})
+
+        #date_string = np.datetime_as_string(times[0], unit='m')
+        #bdate = datetime.strptime(date_string, '%Y-%m-%dT%H:%M')
         bdate = date.strftime('%Y %m %d %H').split(' ')
         bdate = [int(q) for q in bdate[:4]] + [0]
 
-        if air:
-            ds = xr.Dataset(
-                {
-                    'stmp': (['time', 'ny_grid', 'nx_grid'], np.array(stmp)),
-                    'spfh': (['time', 'ny_grid', 'nx_grid'], np.array(spfh)),
-                    'uwind': (['time', 'ny_grid', 'nx_grid'], np.array(uwind)),
-                    'vwind': (['time', 'ny_grid', 'nx_grid'], np.array(vwind)),
-                    'prmsl': (['time', 'ny_grid', 'nx_grid'], np.array(prmsl)),
-                },
-                coords = {
-                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
-                    'lon': (['ny_grid', 'nx_grid'], lon),
-                    'lat': (['ny_grid', 'nx_grid'], lat)
-                }
-            )
+        fout.time.attrs = {
+            'long_name': 'Time',
+            'standard_name': 'time',
+            'base_date': bdate,
+            'units': f'days since {date.year}-{date.month}-{date.day} {cycle:02d}:00 UTC'
+        }
+
+        fout.lat.attrs = {
+            'units': 'degrees_north',
+            'long_name': 'Latitude',
+            'standard_name': 'latitude',
+        }
 
-            ds.time.attrs = {
-                'long_name': 'Time',
-                'standard_name': 'time',
-                'base_date': bdate,
-                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
-            }
-
-            ds.lat.attrs = {
-                'units': 'degrees_north',
-                'long_name': 'Latitude',
-                'standard_name': 'latitude',
-            }
-
-            ds.lon.attrs = {
-                'units': 'degrees_east',
-                'long_name': 'Longitude',
-                'standard_name': 'longitude',
-            }
-
-            ds.uwind.attrs={
-                'units': 'm/s',
-                'long_name': '10m_above_ground/UGRD',
-                'standard_name':'eastward_wind'
-            }
+        fout.lon.attrs = {
+            'units': 'degrees_east',
+            'long_name': 'Longitude',
+            'standard_name': 'longitude',
+        }
+
+        fout.uwind.attrs={
+            'units': 'm/s',
+            'long_name': '10m_above_ground/UGRD',
+            'standard_name':'eastward_wind'
+        }
 
-            ds.vwind.attrs={
-                'units': 'm/s',
-                'long_name': '10m_above_ground/UGRD',
-                'standard_name':'northward_wind'
-            }
-
-            ds.spfh.attrs={
-                'units': 'kg kg-1',
-                'long_name': '2m_above_ground/Specific Humidity',
-                'standard_name':'specific_humidity'
-            }
-
-            ds.prmsl.attrs = {
-                'units': 'Pa',
-                'long_name': 'Pressure reduced to MSL',
-                'standard_name': 'air_pressure_at_sea_level'
-            }
-
-            ds.stmp.attrs={
-                'units': 'K',
-                'long_name': '2m_above_ground/Temperature',
-            }
-
-            #write to file
-            ds.to_netcdf(self.outdir / f'sflux_air_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
-            ds.close()
+        fout.vwind.attrs={
+            'units': 'm/s',
+            'long_name': '10m_above_ground/VGRD',
+            'standard_name':'northward_wind'
+        }
 
-        if prc:
-            ds = xr.Dataset(
-                {
-                    'prate': (['time', 'ny_grid', 'nx_grid'], np.array(prate)),
-                },
-                coords = {
-                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
-                    'lon': (['ny_grid', 'nx_grid'], lon),
-                    'lat': (['ny_grid', 'nx_grid'], lat)
-                }
-            )
+        fout.spfh.attrs={
+            'units': 'kg kg-1',
+            'long_name': '2m_above_ground/Specific Humidity',
+            
'standard_name':'specific_humidity'
+        }
 
-            ds.time.attrs = {
-                'long_name': 'Time',
-                'standard_name': 'time',
-                'base_date': bdate,
-                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
-            }
-
-            ds.lat.attrs = {
-                'units': 'degrees_north',
-                'long_name': 'Latitude',
-                'standard_name': 'latitude',
-            }
-
-            ds.lon.attrs = {
-                'units': 'degrees_east',
-                'long_name': 'Longitude',
-                'standard_name': 'longitude',
-            }
-
-            ds.prate.attrs={
-                'units': 'kg m-2 s-1',
-                'long_name': 'Precipitation rate'
-            }
-
-            #write to file
-            ds.to_netcdf(self.outdir / f'sflux_prc_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
-            ds.close()
+        fout.prmsl.attrs = {
+            'units': 'Pa',
+            'long_name': 'Pressure reduced to MSL',
+            'standard_name': 'air_pressure_at_sea_level'
+        }
 
-        if rad:
-            ds = xr.Dataset(
-                {
-                    'dlwrf': (['time', 'ny_grid', 'nx_grid'], np.array(dlwrf)),
-                    'dswrf': (['time', 'ny_grid', 'nx_grid'], np.array(dswrf)),
-                },
-                coords = {
-                    'time': np.round((times - date.to_datetime64()) / np.timedelta64(1, 'D'), 5).astype('float32'),
-                    'lon': (['ny_grid', 'nx_grid'], lon),
-                    'lat': (['ny_grid', 'nx_grid'], lat)
-                }
-            )
+        fout.stmp.attrs={
+            'units': 'K',
+            'long_name': '2m_above_ground/Temperature',
+        }
+
+        fout.prate.attrs={
+            'units': 'kg m-2 s-1',
+            'long_name': 'Precipitation rate'
+        }
+        fout.dlwrf.attrs = {
+            'units': 'W m-2',
+            'long_name': 'Downward long-wave radiation flux'
+        }
 
-            ds.time.attrs = {
-                'long_name': 'Time',
-                'standard_name': 'time',
-                'base_date': bdate,
-                'units': f"days since {date.strftime('%Y-%m-%d %H:00')} UTC",
-            }
-
-            ds.lat.attrs = {
-                'units': 'degrees_north',
-                'long_name': 'Latitude',
-                'standard_name': 'latitude',
-            }
-
-            ds.lon.attrs = {
-                'units': 'degrees_east',
-                'long_name': 'Longitude',
-                'standard_name': 'longitude',
-            }
-
-            ds.dlwrf.attrs = {
-                'units': 'W m-2',
-                'long_name': 'Downward short-wave radiation flux'
-            }
-
-            ds.dswrf.attrs = {
-                'units': 'W m-2',
-                'long_name': 'Downward long-wave radiation flux'
-            }
+        fout.dswrf.attrs = {
+            'units': 'W m-2',
+            'long_name': 'Downward short-wave radiation flux'
+        }
 
-            #write to file
-            ds.to_netcdf(self.outdir / f'sflux_rad_{self.level}.{istack:04d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
-            ds.close()
+        fout.to_netcdf(self.path / f'hrrr_{date.strftime("%Y%m%d")}{cycle:02d}.nc','w', 'NETCDF3_CLASSIC', unlimited_dims='time')
 
     def modified_latlon(self, grbfile):
diff --git a/setup.py b/setup.py
index 755978dd..be27b4ac 100755
--- a/setup.py
+++ b/setup.py
@@ -93,7 +93,7 @@ def run(self):
         long_description_content_type="text/markdown",
         url=meta['url'],
         packages=setuptools.find_packages(exclude=['tests', 'examples', 'docs', 'docker']),
-        python_requires='>=3.8',
+        python_requires='>3.6',
         setup_requires=['wheel', 'setuptools_scm', 'setuptools>=41.2', 'netcdf-flattener>=1.2.0'],
         include_package_data=True,
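
Note on the restored inventory classes (illustration only, not part of the patch): both AWSGrib2Inventory implementations reduce to one pattern, namely list a public NOAA bucket page by page with a list_objects_v2 paginator, filter the keys, then stream each GRIB2 file to disk with download_fileobj. Below is a minimal standalone sketch of that pattern. Assumptions are flagged: anonymous UNSIGNED access (the body of the s3 property sits outside these hunks, so unsigned reads are assumed, as is usual for NOAA open-data buckets), and the cycle date, prefix, and local 'grib' directory are illustrative.

from datetime import datetime
import pathlib

import boto3
from botocore import UNSIGNED
from botocore.config import Config

# Anonymous client (assumption: the bucket allows unsigned reads).
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
bucket = 'noaa-gfs-bdp-pds'
cycle = datetime(2021, 10, 13)  # illustrative cycle, per the example script
prefix = f'gfs.{cycle:%Y%m%d}/00/atmos/'

# list_objects_v2 caps each response at 1000 keys, so walk every page.
keys = []
for page in s3.get_paginator('list_objects_v2').paginate(Bucket=bucket, Prefix=prefix):
    for obj in page.get('Contents', []):  # .get guards prefixes with no objects
        key = obj['Key']
        # same filter as the GFS inventory: 0.25-degree pgrb2 forecast files only
        if '0p25' in key and 'pgrb2' in key and not any(
                s in key for s in ('goessim', 'anl', 'idx')):
            keys.append(key)

for key in sorted(keys)[:3]:  # first few forecast hours, for demonstration
    target = pathlib.Path('grib') / (key + '.grib2')
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, 'wb') as f:
        s3.download_fileobj(bucket, key, f)

The page.get('Contents', []) guard is a deliberate difference from the class, which indexes page['Contents'] directly and therefore raises KeyError when a prefix holds no objects yet.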
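A second note, on the time coordinate the revert reinstates in gfs2.py: instead of differencing GRIB valid times against the cycle start (the removed behavior), it synthesizes fractional days from the record count. A quick check of what np.round(np.arange(1, len(grbfiles)+1)/24, 4) yields for 24 hourly files, consistent with the download loop slicing file_metadata[1:record*24+1] and so starting at forecast hour f001:

import numpy as np

# 24 hourly records -> 1/24 .. 24/24 days after the cycle start (f001..f024)
t = np.round(np.arange(1, 25) / 24, 4).astype('float32')
print(t[:3], t[-1])  # [0.0417 0.0833 0.125 ] 1.0

This stays correct only while every file downloads successfully; a missing forecast hour shortens grbfiles and shifts all later time stamps, a failure mode the valid_time-based coordinate did not have.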