Using intake-esm to load an ensemble: real-world problem that might lead to a tutorial/example #444

Open
jemmajeffree opened this issue Aug 23, 2024 · 28 comments

@jemmajeffree
Collaborator

jemmajeffree commented Aug 23, 2024

What I want to do is this:

import numpy as np
import xarray as xr

def filepath(m, var, freq, time='185001-201412'):
    '''Intended use: filepath('1', 'pr', 'Amon')'''
    return '/g/data/fs38/publications/CMIP6/CMIP/CSIRO/ACCESS-ESM1-5/historical/r'+m+'i1p1f1/'+freq+'/'+var+'/gn/latest/'+var+'_'+freq+'_ACCESS-ESM1-5_historical_r'+m+'i1p1f1_gn_'+time+'.nc'

pr = xr.open_mfdataset([filepath(str(m),'pr','Amon') for m in range(1,41)],
                  # coords='minimal',
                  # compat='override',
                  combine = 'nested',
                  concat_dim = ('SMILE_M',),
                  preprocess = lambda x: x['pr'],
                  parallel = True,
                 ).assign_coords(SMILE_M = np.array([str(m) for m in range(1,41)]))

but I understand that intake-esm should make these type of things easier and less hard-coded. So I tried to do this:

import intake
catalog = intake.cat.access_nri
catalog.search(model="ACCESS-ESM1-5",variable='pr')

cat = catalog["cmip6_fs38"].search(variable_id='pr',
                                      experiment_id='historical',
                                      frequency='mon',
                                      file_type="l",
                                      institution_id='CSIRO')

cat.esmcat.aggregation_control.groupby_attrs = [
    'file_type',
    'project_id',
    'institution_id',
    'source_id',
    'experiment_id',
    # 'member_id',
    'frequency',
    'realm',
    'table_id',
    'variable_id',
    'grid_label',
    # 'version',
]

cat.to_dask(xarray_open_kwargs={'combine':'nested','concat_dim':('M',),'parallel':True},)

This threw errors, as did any variant of to_dask I tried (mostly about it being unsure how to combine the ensemble members together).

Thomas-Moore-Creative self-assigned this Aug 23, 2024
@navidcy
Collaborator

navidcy commented Aug 24, 2024

[tip: when adding code within triple backticks, if you add ```python at the top instead of a plain ```, then GitHub adds some Python colouring. (This also works for ```julia or ```bash, and possibly others!)

I edited the post above to add that ;) ]

@Thomas-Moore-Creative
Collaborator

Thanks for documenting this use-case @jemmajeffree. Can we specify what the "typical" cluster size / resources should be to constrain this problem? For example "how much compute resource does the typical COSIMA affiliated student or post-doc have"?

@jemmajeffree
Collaborator Author

Thanks @navidcy

@Thomas-Moore-Creative, I was running this on an XL ARE job, which is a bit more than I'd usually use for data crunching, but I was feeling impatient. Most of the work I do is piggybacking on pre-existing model runs, which is fairly low compute; as a ballpark estimate, I've used 10 kSU for my PhD so far, which involved analysing 9 different CMIP runs, but I understand that this is almost nothing for anyone running models.

@edoddridge
Collaborator

@Thomas-Moore-Creative I'm not sure there's an easy answer to that question. I'd say most people within COSIMA have access to pretty generous compute, and because the cost of running the models far exceeds the cost of the analysis, we tend not to spend a lot of time thinking about the compute resources for analysis. A modelling-based PhD student would likely use hundreds of kSU over the course of their degree.

As an example, running the 0.25° version of ACCESS-OM2 with BGC consumes 13.1 kSU/year, so a 60-year run is ~0.8 MSU. The 0.1° version is much more expensive.

Whereas a 24-hour session on the XL ARE cluster that @jemmajeffree mentioned is 420 SU. In general, the problem for analysis is human time, not compute time.

@anton-seaice
Collaborator

Hi Jemma - apologies, I thought this was a dask question. You can use to_dataset_dict() on your search:

search = catalog["cmip6_fs38"].search(
    variable_id='pr',
    experiment_id='historical',
    frequency='mon',
    file_type="l",
    source_id="ACCESS-ESM1-5",
)

ds_dict = search.to_dataset_dict()

ds = xr.concat(
    ds_dict.values(),
    dim=list(ds_dict.keys()),
)

Your resulting dataarray has silly chunks:

[screenshot: dataset repr showing many small, per-file chunks]

These are basically defined by the source files. In this case you can't improve the opening time, and the dataset is small, so you could just .load() the whole dataset, or rechunk after opening (using .chunk()).
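A minimal sketch of those two options, using the ds from the concat above (the chunk sizes are only illustrative):

```python
# Option 1: rechunk lazily after opening (nothing is read yet)
ds = ds.chunk({"time": -1, "lat": -1, "lon": -1})

# Option 2: the monthly dataset is small, so just pull it into memory
ds = ds.load()
```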

@jemmajeffree
Collaborator Author

Thanks Anton

  • Is there a better way to fix up the ensemble members than manually relabelling them (currently the concat_dim axis is something like l.CMIP.CSIRO.ACCESS-ESM1-5.historical.r6i1p1f1.mon.atmos.Amon.pr.gn.v20200529 when I only want the r6i1p1f1 bit)?
  • Would the ensemble members always be read/combined in the same order? I'd be hesitant to trust this, but I can't see a way of putting them in numerical order except for sorting the entire dataarray by the concat_dim axis? (Do you know if xr.sortby actually does anything in memory or just changes the wrapper?)

With intake loading data now, I have a couple more comments on efficiency. I can get xr.open_mfdataset to work more than twice as fast as intake. To my understanding, intake wraps around open_dataset, not open_mfdataset, so the optimisations for open_mfdataset don't transfer. Is that correct? If so, the tiny chunks are a problem. I'd rather not pull everything into memory, though I agree that in the small example I've provided here it is a solution. Even doing that, the data loads faster with open_mfdataset and different chunking than with intake.

pr_daily = xr.open_mfdataset([[filepath(str(m),'pr','day',time) for m in range(1,41)] for time in ('18500101-18991231','19000101-19491231','19500101-19991231','20000101-20141231')],
                  coords='minimal', #Grab medium speed-up by not checking coordinates align
                  compat='override', #Grab medium speed-up by not checking coordinates align
                  combine = 'nested',
                  concat_dim = ('time','SMILE_M',),
                  preprocess = lambda x: x['pr'], #Often doing something more complicated in here that gets bonus efficiency
                  parallel = True,
                  chunks={'SMILE_M':1,'time':365,'lat':-1,'lon':-1}, #Chunks are big enough that dask scheduling overhead goes away
                 ).assign_coords(SMILE_M = np.array([str(m) for m in range(1,41)]))

Am I missing any intake optimisations to achieve comparable performance?

Direct comparisons of speed for loading and example computations are here: /scratch/nf33/jj8842/intake comparison.ipynb

I can see that I should have put this on the Hive to begin with, and digging into intake efficiencies is probably more valuable there - would you recommend moving it, or is that overcomplicating things?

@Thomas-Moore-Creative
Collaborator

Thomas-Moore-Creative commented Sep 6, 2024

file_type="l"

SIDEWAYS question, @anton-seaice et al.
I feel I've searched the NCI documentation at length to understand file_type="l" vs file_type="f", with no luck understanding the detail behind this choice.

I'm assuming "l" stands for "link" and "f" for "file"; my anecdotal experience is that the results can differ, and that not specifying one leads to duplication.

Anyone have more insight, or a link to authoritative NCI documentation?

@jemmajeffree
Collaborator Author

@Thomas-Moore-Creative, empirically, using "l" gets you just the most recent ACCESS-ESM1-5 data files (the ones reached via the "latest" version symlink, rather than via the actual most recent version directory) and not other bits and pieces, but I couldn't tell you exactly what it means
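One way to see the difference empirically is to compare the path column for the two file_type values (a sketch; it assumes the cmip6_fs38 datastore dataframe has path and file_type columns, which the searches in this thread already rely on):

```python
# Compare what file_type="l" and file_type="f" resolve to for the same search
# (a sketch; 'catalog' is the ACCESS-NRI catalog loaded earlier in the thread).
base = catalog["cmip6_fs38"].search(
    variable_id="pr",
    experiment_id="historical",
    frequency="mon",
    source_id="ACCESS-ESM1-5",
)
link_paths = sorted(base.search(file_type="l").df.path)
file_paths = sorted(base.search(file_type="f").df.path)

print(link_paths[:2])   # expected: paths that go via a ".../latest/" symlink
print(file_paths[:2])   # expected: paths under explicit version directories
print(len(link_paths), len(file_paths))
```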

@Thomas-Moore-Creative
Collaborator

@Thomas-Moore-Creative, empirically, using "l" gets you just the most recent ACCESS-ESM1-5 data files (the ones reached via the "latest" version symlink, rather than via the actual most recent version directory) and not other bits and pieces, but I couldn't tell you exactly what it means

Thanks for your experience on this. Sounds like you've tested this? Do we know if NCI documents this anywhere?

The very useful ACCESS-NRI intake catalog docs from @dougiesquire use "f" in the example here

cmip6_datastore_filtered = cmip6_datastore.search(
    source_id="ACCESS-ESM1-5", 
    table_id="Omon", 
    variable_id="tos", 
    experiment_id="historical", 
    file_type="f"
)

cmip6_datastore_filtered

but that example might not be best practice if you want the latest CMIP file versions?

Also, on the chunking-related settings for the intake workflow, the ACCESS-NRI documentation's "Chunking tutorial" notes:

Intake-ESM uses xarray.open_dataset to open datasets. By default, the argument chunks={} is used,

and that this approach to chunking could be used:

xarray_open_kwargs = {"chunks": {"time": -1, "st_ocean": 7, "xt_ocean": 400, "yt_ocean": 300}}

ds = esm_datastore_filtered.to_dask(xarray_open_kwargs=xarray_open_kwargs)

I use this with ACCESS-ESM1.5 data in ways like this:

xarray_open_kwargs = {"chunks": {"time": 12, "lev": 50, "i": 360, "j": 300}}
DS_dict = cmip6_fs38_cat_filtered.to_dataset_dict(progressbar=False, xarray_open_kwargs=xarray_open_kwargs)

I will find some time to run your example .ipynb file.

Without looking at it, I wonder whether the lists of file paths you are sending to .open_mfdataset and .open_dataset are the same files?

I also note that if you use intake you can simply pump out a list of file paths using an approach like cmip6_fs38_cat_filtered.unique().path.
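As a concrete check of the "same list of files" question, something like this would compare the two (a sketch; it reuses the filepath() helper from the first post and the cmip6_fs38_cat_filtered datastore from the snippet above):

```python
# Do intake and the hand-built open_mfdataset call resolve the same files?
# (a sketch; filepath() and cmip6_fs38_cat_filtered are defined earlier in the thread)
intake_paths = set(cmip6_fs38_cat_filtered.df.path)
manual_paths = {filepath(str(m), "pr", "Amon") for m in range(1, 41)}

print(len(intake_paths), len(manual_paths))
print(intake_paths == manual_paths)
```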

Apologies if these comments are obvious or unhelpful given the work you've already done.

@jemmajeffree
Collaborator Author

Starting from the bottom:

  • While you can pump out a list of paths from intake, you lose the information about how those filepaths fit together (ie, what ensemble member, and it's kinda stupid to manually pull that back out of the filepath again). Anyway, this is probably what I would do in future if I didn't know how the data I needed was stored.
  • I suspect they're the same list of files.
  • If you're passing chunks to open_dataset, I would expect that it gets sad about (or fails silently on) having chunks bigger than individual files, which is what I want here.

@Thomas-Moore-Creative
Collaborator

While you can pump out a list of paths from intake, you lose the information about how those filepaths fit together (ie, what ensemble member, and it's kinda stupid to manually pull that back out of the filepath again)

Depending on your needs, it might be. I probably do kinda stupid things all the time. 😄

@anton-seaice
Collaborator

  • Is there a better way to fix up the ensemble members than manually relabelling them (currently the concat_dim axis is something like l.CMIP.CSIRO.ACCESS-ESM1-5.historical.r6i1p1f1.mon.atmos.Amon.pr.gn.v20200529 when I only want the r6i1p1f1 bit)?

Not that I know of; you could use something like this:

import re

for i in search.keys():
    print(re.findall(r"r[^/.]*", i)[1])

Or write some better regex to get a better match :) (Interestingly, set(search.df.member_id) returns the same result! Although I'm not sure I totally trust that to always be in the same order as .to_dataset_dict())

  • Would the ensemble members always be read/combined in the same order? I'd be hesitant to trust this, but I can't see a way of putting them in numerical order except for sorting the entire dataarray by the concat_dim axis? (Do you know if xr.sortby actually does anything in memory or just changes the wrapper?)

It looks like they are read in alphabetical order. It's safe to assume sortby will just change the order of the index, and not move anything else around in memory.
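In other words, something like this should be cheap (a sketch; 'member' is a placeholder for whatever the concatenation dimension ends up being called):

```python
# A lazy reorder: only the index/coordinate changes, the dask-backed data
# variables are not loaded ('member' is a placeholder dimension name).
ds = ds.sortby("member")
# Note: string member ids sort lexically (r1, r10, r11, ..., r2); a purely
# numerical ordering needs an argsort-style approach like the one shown
# later in this thread.
```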

Am I missing any intake optimisations to achieve comparable performance?

Sorry - I missed this the first time. This dataset has more than one timestep per file! So you can supply a time chunk size, which reduces the number of times each file has to be opened/read and improves caching efficiency:

e.g. for the monthly data:

ds_dict = search.to_dataset_dict(xarray_open_kwargs={'chunks':{'lat':-1,'lon':-1, 'time':-1}})

for the daily data, those chunks would be too big, but a size of 365 could be good:

ds_dict = search.to_dataset_dict(xarray_open_kwargs={'chunks':{'lat':-1,'lon':-1, 'time':365}})

Direct comparisons of speed for loading and example computations are here: /scratch/nf33/jj8842/intake comparison.ipynb

The training fairies have been cleaning up nf33, so I can't see this! Note that caching makes good time comparisons hard!

I feel I've searched the NCI documentation at length to understand file_type="l" vs file_type="f", with no luck understanding the detail behind this choice.

I'm assuming "l" stands for "link" and "f" for "file"; my anecdotal experience is that the results can differ, and that not specifying one leads to duplication.

The only time I looked, the links just pointed to the files and they were the same. But I didn't look exhaustively at all.

Anyone have more insight, or a link to authoritative NCI documentation?

My only suggestion would be the NCI helpdesk

@jemmajeffree
Collaborator Author

@anton-seaice Can you see x77? g40? v45? I put it in nf33 because it was the only directory I knew you would have access to

@anton-seaice
Collaborator

v45 is ok but I think you can use /scratch/public

@jemmajeffree
Collaborator Author

It's now in /scratch/public/jj8842

@anton-seaice
Collaborator

It's now in /scratch/public/jj8842

The timing is not really reliable for performance comparisons; it's an indication, but it changed wildly. E.g. I ran this notebook with the cells in the opposite order, and open_mfdataset was slower.

Adding

xarray_open_kwargs={'chunks': {'time': 365, 'lat': -1, 'lon': -1}}

as an argument in to_dask() will help. You could also supply xarray_combine_by_kwargs in to_dask(), e.g.

dict(coords='minimal', compat='override', combine='nested')

but I don't think it's necessary.
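Put together, and hedging on names: the intake-esm API docs list the combine keyword as xarray_combine_by_coords_kwargs, and xarray.combine_by_coords accepts coords and compat but not combine, so a combined call might look roughly like this (a sketch using the filtered cat from the first post):

```python
# A sketch combining both suggestions; the keyword name follows the intake-esm
# API docs, and only kwargs that xarray.combine_by_coords accepts are passed.
ds = cat.to_dask(
    xarray_open_kwargs={"chunks": {"time": 365, "lat": -1, "lon": -1}},
    xarray_combine_by_coords_kwargs={"coords": "minimal", "compat": "override"},
)
```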

@jemmajeffree
Collaborator Author

With those additional keywords, to_dataset_dict is working faster than xr.open_mfdataset. The concatenate-a-dictionary-and-then-reorder-dimensions step is annoying, but I can work around it.

For any future readers of this thread: the documentation for what arguments to_dataset_dict takes is here https://intake-esm.readthedocs.io/en/stable/reference/api.html (I couldn't find it at the time I was originally trying to read in data, and had no idea xarray_combine_by_kwargs existed)

@Thomas-Moore-Creative
Collaborator

@anton-seaice - re: using %%time or %%timeit cell magics for evaluating performance in a notebook. My anecdotal experience and poorly tested opinion is that running garbage collection, deleting existing objects, and restarting the kernel between test (A) and test (B) makes notebook performance comparisons more accurate. Do you agree with that in any way?
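The non-kernel-restart part of that might look something like this between runs (a sketch; client.restart() assumes a dask.distributed Client named client, and the deleted names are just whatever the previous test created):

```python
import gc

# Reset state between timing test (A) and test (B) without restarting the kernel.
del ds_dict, ds      # hypothetical objects left over from the previous test
gc.collect()         # force Python garbage collection
client.restart()     # clear dask worker memory (assumes a dask.distributed Client)
```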

@adele-morrison
Collaborator

@Thomas-Moore-Creative I found %%timeit is not useful if you are timing opening data from Intake or the cookbook, because the first time is very slow and repeat tests are very fast, so averaging over multiple tests does not give an accurate time estimate of the first opening.

@Thomas-Moore-Creative
Collaborator

@jemmajeffree are you happy with where you landed in your "intake comparison" example? I'm going to compare it to my current approach for ACCESS-ESM1.5, see what I can learn from it, and possibly add to it.

@Thomas-Moore-Creative
Collaborator

@Thomas-Moore-Creative I found %%timeit is not useful if you are timing opening data from Intake or the cookbook, because the first time is very slow and repeat tests are very fast, so averaging over multiple tests does not give an accurate time estimate of the first opening.

😃
You can't restart the kernel in a %%timeit cell, but you could include clearing key objects and running garbage collection, which might address some of the issues - hence looking for @anton-seaice's views above.
🤔

@jemmajeffree
Collaborator Author

@jemmajeffree are you happy with where you landed in your "intake comparison" example? I'm going to compare it to my current approach for ACCESS-ESM1.5, see what I can learn from it, and possibly add to it.

I'm still not really happy with the way the ensemble members are combined, but I've ceased messing with it or trying to improve the approach.

@anton-seaice
Collaborator

@anton-seaice - re: using %%time or %%timeit cell magics for evaluating performance in a notebook. My anecdotal experience and poorly tested opinion is that running garbage collection, deleting existing objects, and restarting the kernel between test (A) and test (B) makes notebook performance comparisons more accurate. Do you agree with that in any way?

I don't really have an informed view ...

I've found it can be totally arbitrary. One day it's 20 seconds, the next day it's 2 minutes for no apparent reason.

I suspect restarting the kernel is a reasonable approach, I don't know to what extent / where data may be cached at a more fundamental level.

@Thomas-Moore-Creative
Collaborator

I don't really have an informed view ...

I do think my "view" is more a "feeling" and "opinion" (feelpinion?) from trial and error rather than a fundamental understanding of how things like dask and the NCI nodes and network actually work.

@access-hive-bot

This issue has been mentioned on ACCESS Hive Community Forum. There might be relevant details there:

https://forum.access-hive.org.au/t/join-in-investigating-analysis-ready-data-ard-strategies-to-increase-impact-of-ocean-and-climate-model-archives-at-nci/3723/1

@Thomas-Moore-Creative
Collaborator

@jemmajeffree, wondering if you settled on how best to retain the ensemble member name labels on loading, and whether you feel comfortable with how robust/reliable it is?

I am trying the approach below (which is likely very similar to @anton-seaice's suggestion above), based on the dataset_dict keys. I've only done light testing for robustness. I'm still building basic utilities that better allow me to use the catalog and discover data, and haven't even gotten to the "how fast can this be" part yet.

dataset_dict = catalog_search.to_dataset_dict(progressbar=False,xarray_open_kwargs=xarray_open_kwargs)
member_names = [key.split('.')[5] for key in dataset_dict.keys()]
# Concatenate the datasets along the 'member' dimension and retain the member names
ds = xr.concat(
    dataset_dict.values(), 
    dim=xr.DataArray(member_names, dims="member", name="member"))
# Extract the numeric part from each member name and sort based on it
sorted_member_indices = np.argsort([int(member[1:].split('i')[0]) for member in ds['member'].values])
# Reorder the dataset based on the sorted member indices
ds_sorted = ds.isel(member=sorted_member_indices)

@jemmajeffree
Collaborator Author

@Thomas-Moore-Creative I'm afraid not; I went back to what I was originally doing, reading from individual filepaths.
What you've got looks good, though.

@Thomas-Moore-Creative
Collaborator

@anton-seaice after some head-banging I did reach out to NCI about "f" vs "l". Apparently "f" means file and "l" does mean link, but it can't be relied on to deliver only the latest logical links that are provided in the "latest" directories?

@jemmajeffree if you or others are still using and relying on file_type =, I'm chatting about this here - just FYI. https://forum.access-hive.org.au/t/cmip6-data-analysis-at-nci-file-type-issue/3842
