Skip to content

Commit

Permalink
Include support of 3d CM2.6 data
Browse files Browse the repository at this point in the history
  • Loading branch information
Pperezhogin committed Apr 16, 2024
1 parent 40816b4 commit 033d28e
Show file tree
Hide file tree
Showing 3 changed files with 2,260 additions and 73 deletions.
137 changes: 64 additions & 73 deletions experiments/ANN-Results/helpers/cm26.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def read_datasets(operator_str = 'Filtering(FGR=2)+CoarsenKochkov()', factors =
return d

def mask_from_nans(variable):
mask = (1 - np.isnan(variable).astype('float32'))
mask = np.logical_not(np.isnan(variable)).astype('float32')
if 'time' in variable.dims:
mask = mask.isel(time=-1)
if 'time' in variable.coords:
Expand Down Expand Up @@ -50,7 +50,7 @@ def discard_land(x, percentile=1):
return (x>percentile).astype('float32')

class DatasetCM26():
def from_cloud(self, source='cmip6'):
def from_cloud(self, source='cmip6', compute_param=True):
'''
Algorithm:
* Initialize data and grid information from cloud in a lazy way
Expand All @@ -59,13 +59,17 @@ def from_cloud(self, source='cmip6'):
* Hint: put wall to the north pole for simplicity
'''
############ Read datasets ###########
rename_surf = {'usurf': 'u', 'vsurf': 'v', 'surface_salt': 'salt', 'surface_temp': 'temp'}
if source == 'leap':
from intake import open_catalog
ds = xr.open_dataset("gs://leap-persistent-ro/groundpepper/GFDL_cm2.6/GFDL_CM2_6_CONTROL_DAILY_SURF.zarr", engine='zarr', chunks={}, use_cftime=True)
ds = xr.open_dataset("gs://leap-persistent-ro/groundpepper/GFDL_cm2.6/GFDL_CM2_6_CONTROL_DAILY_SURF.zarr", engine='zarr', chunks={}, use_cftime=True).rename(**rename_surf)
cat = open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean/GFDL_CM2.6.yaml")
param_init = cat["GFDL_CM2_6_grid"].to_dask()
elif source == 'cmip6':
ds = xr.open_dataset("gs://cmip6/GFDL_CM2_6/control/surface", engine='zarr', chunks={}, use_cftime=True)
ds = xr.open_dataset("gs://cmip6/GFDL_CM2_6/control/surface", engine='zarr', chunks={}, use_cftime=True).rename(**rename_surf)
param_init = xr.open_dataset('gs://cmip6/GFDL_CM2_6/grid', engine='zarr')
elif source == 'cmip6-3d':
ds = xr.open_dataset("gs://cmip6/GFDL_CM2_6/control/ocean_3d", engine='zarr', chunks={}, use_cftime=True).rename({'st_ocean': 'zl'})
param_init = xr.open_dataset('gs://cmip6/GFDL_CM2_6/grid', engine='zarr')
else:
print('Error: wrong source parameter')
Expand Down Expand Up @@ -99,7 +103,7 @@ def from_cloud(self, source='cmip6'):
############ Compute masks for C-grid ###########
# Note, we assume that coastline goes accross U,V and corner points,
# while land points are uniquely defined by the T points
param['wet'] = mask_from_nans(ds.surface_salt)
param['wet'] = mask_from_nans(ds.temp)

# Set manually wall for the layer of points
# close to the north (and southern) pole
Expand Down Expand Up @@ -128,15 +132,19 @@ def from_cloud(self, source='cmip6'):

########### Interpolate velocities to C-grid ############
data = xr.Dataset()
data['u'] = grid.interp(ds.usurf.fillna(0.) * param.wet_c,'Y') * param.wet_u
data['v'] = grid.interp(ds.vsurf.fillna(0.) * param.wet_c,'X') * param.wet_v
data['u'] = grid.interp(ds.u.fillna(0.) * param.wet_c,'Y') * param.wet_u
data['v'] = grid.interp(ds.v.fillna(0.) * param.wet_c,'X') * param.wet_v
data['time'] = ds['time']

return data, param.compute()
if compute_param:
param = param.compute()
param = param.chunk()

return data, param

def __init__(self, data=None, param=None, source='cmip6'):
def __init__(self, data=None, param=None, source='cmip6', compute_param=True):
if data is None or param is None:
self.data, self.param = self.from_cloud(source=source)
self.data, self.param = self.from_cloud(source=source, compute_param=compute_param)
else:
self.data = data
self.param = param
Expand Down Expand Up @@ -217,9 +225,9 @@ def init_coarse_grid(self, factor=10, percentile=0):
param['wet_v'] = discard_land(grid.interp(param['wet'], 'Y'))
param['wet_c'] = discard_land(grid.interp(param['wet'], ['X', 'Y']))

return param.compute()
return param.compute().chunk()

def coarsen(self, factor=10, operator=CoarsenWeighted(), percentile=0, param=None):
def coarsen(self, factor=10, operator=CoarsenWeighted(), percentile=0):
'''
Coarsening of the dataset with a given factor
Expand All @@ -229,8 +237,7 @@ def coarsen(self, factor=10, operator=CoarsenWeighted(), percentile=0, param=Non
* Return new dataset with coarse velocities
'''
# Initialize coarse grid
if param is None:
param = self.init_coarse_grid(factor=factor, percentile=percentile)
param = self.init_coarse_grid(factor=factor, percentile=percentile)

##################### Coarsegraining velocities ########################
data = xr.Dataset()
Expand All @@ -240,71 +247,55 @@ def coarsen(self, factor=10, operator=CoarsenWeighted(), percentile=0, param=Non

# Coarsegrain velocities
ds_coarse.data['u'], ds_coarse.data['v'] = operator(self.data.u, self.data.v, self, ds_coarse)

if 'time' in self.data.dims:
ds_coarse.data['time'] = self.data.time
if 'zl' in self.data.dims:
ds_coarse.data['zl'] = self.data.zl

return ds_coarse

def generate_coarse_grids(self, factors, percentile=0):
'''
Generate and save grid objects if they are not generated yet
'''
try:
for factor in factors:
self.params[factor]
except:
self.params = {}
for factor in factors:
self.params[factor] = self.init_coarse_grid(factor=factor, percentile=percentile)

def split(self, time = None, compute = lambda x: x):
if time is None:
time=np.random.randint(0,len(self))
if isinstance(time, int):
compute = lambda x: x.compute() # Enforce load to memory if a single snapshot
return DatasetCM26(compute(self.data.isel(time=time)), self.param)

def sample_batch(self, time=np.random.randint(0,7305,1), factors = [4,6,9,12], operator=CoarsenWeighted(), percentile=0):

def split(self, time = None, zl=None):
data = self.data
if 'time' in self.data.dims:
if time is None:
time = np.random.randint(0,len(self))
data = data.isel(time=time)

if 'zl' in self.data.dims:
if zl is None:
zl = np.random.randint(0,len(self.data.zl))
data = data.isel(zl=zl)

return DatasetCM26(data, self.param)

def compute_subgrid_forcing(self, factor=4, operator=CoarsenWeighted(), percentile=0):
'''
This function samples batch and produces training dataset
consisting of velocities on a coarse grid and
corresponding subgrid forcing, for a range of factors
This function computes the subgrid forcing, that is
SGSx = filter(advection) - advection(filtered_state),
for a given coarsegraining factor and coarsegraining operator.
And returns the xr.Dataset() with lazy computations
Optionally, numerical approximation errors of numerical
schemes can be included into the subgrid forcing. This
would allow to effectively improve the order of approximation
but will make subgrid model sensitive to the numerical scheme
'''
############## Parallelization control ##################
if isinstance(time, int):
# Disable dask arrays and use numpy
compute = lambda x: x.compute()
else:
# Enable parallelization across chunks
compute = lambda x: x.compute().chunk({'time':1})

############## Automatic generation of coarse grids ###################
self.generate_coarse_grids(factors, percentile=percentile)
# Advection in high resolution model
hires_advection = self.state.advection()

############# Sampling batch from the dataset ###################
data = self.data.isel(time=time)
batch = DatasetCM26(compute(data), self.param)
# Coarsegrained state
ds_coarse = self.coarsen(factor, operator=operator, percentile=percentile)

############# High-resolution advection #################
hires_advection = batch.state.advection()
advx = compute(hires_advection[0])
advy = compute(hires_advection[1])

############## Coarsegraining and SGS ###################
output = {}
for factor in factors:
ds_coarse = batch.coarsen(factor, operator=operator, percentile=percentile,
param=self.params[factor])
coarse_advection = ds_coarse.state.advection()

ds_coarse.data['SGSx'], ds_coarse.data['SGSy'] = operator(advx, advy, batch, ds_coarse)
ds_coarse.data['SGSx'] = ds_coarse.data['SGSx'] - coarse_advection[0]
ds_coarse.data['SGSy'] = ds_coarse.data['SGSy'] - coarse_advection[1]
ds_coarse.data = compute(ds_coarse.data).squeeze()
if 'time' in ds_coarse.data.dims:
ds_coarse.data['time'] = batch.data['time']

output[factor] = ds_coarse

return output
# Compute advection on a coarse grid
coarse_advection = ds_coarse.state.advection()

# Compute subgrid forcing
advx_coarsen, advy_coarsen = operator(hires_advection[0], hires_advection[1], self, ds_coarse)
ds_coarse.data['SGSx'] = advx_coarsen - coarse_advection[0]
ds_coarse.data['SGSy'] = advy_coarsen - coarse_advection[1]

return ds_coarse

def predict_ANN(self, ann_Txy, ann_Txx_Tyy):
'''
Expand Down
Loading

0 comments on commit 033d28e

Please sign in to comment.