Preload (FCI) filehandlers for eager processing #2686

Open · gerritholl wants to merge 83 commits into main
Conversation

@gerritholl (Collaborator) commented Dec 12, 2023

Add functionality to preload filehandlers for files that have not yet arrived on disk. This works by passing a single file (the first chunk); the reader then generates glob patterns for all remaining chunks.
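
The idea of deriving the remaining chunks from the first one can be sketched roughly as follows; the file pattern here is purely illustrative (the reader takes the real pattern from its YAML configuration, and satpy's YAML readers already use trollsift for filename patterns):

from trollsift import globify, parse

# Illustrative pattern with a chunk counter; NOT the real FCI filename pattern.
pattern = "{platform}-FCI-1C-{start_time:%Y%m%d%H%M%S}-{count:04d}.nc"
fields = parse(pattern, "MTI1-FCI-1C-20231204081000-0001.nc")

# One glob pattern per remaining chunk: fix the counter, wildcard the per-chunk
# fields (here the sensing start time) that differ between the files.
globs = [globify(pattern, {"platform": fields["platform"], "count": count})
         for count in range(2, 41)]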

So far this appears to work only for loading FDHSI data.

Deferred: support for mixing FDHSI and HRFI. This PR is ambitious enough as it is, and I would like to postpone that complication to a later PR.

As of 2024-06-20, it works with some limitations.

The default scheduler offers no control over the order of tasks. As a result, dask tasks waiting for segments 33–40 might be scheduled first while segments 2–32 are still coming in, because the corresponding tasks sit later in the queue. With the dask.distributed scheduler we can avoid this problem, but that scheduler has limited support in Satpy. PR #2822 makes the FCI reader work with dask.distributed, but nearest neighbour resampling and the GeoTIFF writer still fail (see #1762). Therefore, to use eager processing, the user must either:

  • accept suboptimal ordering of tasks,
  • use many workers for many chunks, or
  • use dask.distributed and accept its limitations.
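
For context, a minimal usage sketch of how the feature is switched on (the configuration key is the one used in the test script further down in this thread, and may still change while the PR evolves):

import satpy
from satpy import Scene

first_chunk_filename = "path/to/first/FCI/chunk.nc"  # placeholder

# Enable preloading of filehandlers for segments that are not yet on disk.
satpy.config.set({"readers.preload_segments": True})

# Pass only the first chunk; filehandlers for the remaining chunks are preloaded
# from glob patterns derived from this one.
sc = Scene(filenames={"fci_l1c_nc": [first_chunk_filename]})
sc.load(["ir_105"])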

First steps of work to preload filehandlers before files are
present.  Not much implementation yet, just a skeleton of what it might
look like in the YAMLReader.

codecov bot commented Dec 12, 2023

Codecov Report

Attention: Patch coverage is 98.47199% with 9 lines in your changes missing coverage. Please review.

Project coverage is 96.08%. Comparing base (5e27be4) to head (3736fbf).
Report is 152 commits behind head on main.

Files with missing lines                        Patch %   Missing lines
satpy/readers/netcdf_utils.py                   96.17%    6 ⚠️
satpy/tests/reader_tests/test_netcdf_utils.py   98.88%    2 ⚠️
satpy/readers/yaml_reader.py                    98.57%    1 ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2686      +/-   ##
==========================================
+ Coverage   96.05%   96.08%   +0.02%     
==========================================
  Files         370      370              
  Lines       54320    54861     +541     
==========================================
+ Hits        52177    52713     +536     
- Misses       2143     2148       +5     
Flag Coverage Δ
behaviourtests 3.98% <2.54%> (-0.02%) ⬇️
unittests 96.18% <98.47%> (+0.02%) ⬆️




coveralls commented Dec 12, 2023

Pull Request Test Coverage Report for Build 10528467872

Details

  • 580 of 589 (98.47%) changed or added relevant lines in 9 files are covered.
  • No unchanged relevant lines lost coverage.
  • Overall coverage increased (+0.03%) to 96.18%

File                                            Covered Lines   Changed/Added Lines   %
satpy/readers/yaml_reader.py                    69              70                    98.57%
satpy/tests/reader_tests/test_netcdf_utils.py   178             180                   98.89%
satpy/readers/netcdf_utils.py                   151             157                   96.18%
Totals:
Change from base Build 10528275069: 0.03%
Covered Lines: 52944
Relevant Lines: 55047

💛 - Coveralls

Prepare GEOSegmentYAMLReader for preloading filehandlers for files
corresponding to segments that don't exist yet.  Early draft
implementation that appears to work with limitations.  The implementation in
GEOSegmentYAMLReader still needs tweaking, tests need to be improved,
and the corresponding file handler (for now just FCI) needs to be able
to handle it.
Continue the pre-loading implementation in the GEOSegmentYAMLReader.
Add unit tests.
Don't raise an error about being unable to predict the remaining files
if this functionality was not requested.
Add a Preloadable class to netcdf_utils.  So far this implements
pre-loading filehandlers for files that are expected but not yet
present, provided a single segment already exists, taking a defined set
of data variables and their attributes from the first segment.  Still to
be implemented: taking other information from previous repeat cycles via
on-disk caching.
Cache data variables between repeat cycles.
Continue working on repeat-cycle caching.
@gerritholl (Collaborator, Author) commented:

The good news is that my test script now passes without errors.

The bad news is that the resulting image has no data / is all black.

@gerritholl (Collaborator, Author) commented:

sc._readers["fci_l1c_nc"].file_handlers["fci_l1c_fdhsi"][27]["data/ir_105/measured/effective_radiance"][149, 5233].compute() shows valid values, but in the scene it's all NaN... ;·(

@djhoese (Member) left a review comment:

Side request as mentioned on slack: Move the "does NetCDF file exist" functionality to its own function that is Delayed. Hold on to the result of this in your file handler(s). Pass the result of that (the filename I think) to the _get_delayed_value function which is also delayed, and then go on as you are. This way the globbing and sleeping only has to happen once.
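
A sketch of the suggested split into two delayed functions (the names follow the comment above; the bodies are illustrative, not the PR's actual code):

import glob
import time

import dask
import netCDF4

@dask.delayed
def _wait_for_file(pattern, poll_interval=10):
    # Poll the filesystem until a file matching the glob pattern appears, then
    # return its name; the globbing and sleeping thus happens only once per segment.
    while not (matches := glob.glob(pattern)):
        time.sleep(poll_interval)
    return matches[0]

@dask.delayed
def _get_delayed_value(filename, variable_name):
    # Open the (by now existing) NetCDF file and read a single variable.
    with netCDF4.Dataset(filename, "r") as nc:
        return nc[variable_name][:]

# The filename result is computed once and reused for every variable read from it.
fname = _wait_for_file("*FDHSI-FD--CHK-BODY*_0002.nc")  # illustrative glob
radiance = _get_delayed_value(fname, "data/ir_105/measured/effective_radiance")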

@gerritholl (Collaborator, Author) commented Dec 18, 2023

There is something wrong with the values and the calibration. For one test case, for one pixel, loading counts gives array(93.51871, dtype=float32), loading radiance gives array(-5.7980175, dtype=float32), and loading brightness_temperature gives NaN (unsurprising with such radiances). Maybe calibration is applied twice? To be investigated.

@gerritholl (Collaborator, Author) commented:

The wrong values were caused by automaskandscale being applied automatically by the NetCDF library. After unsetting this, the values seem to match the reference.
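
For reference, the relevant netCDF4 switch looks roughly like this (a sketch outside the PR's actual code path; the filename is a placeholder):

import netCDF4

nc = netCDF4.Dataset("some_fci_chunk.nc", "r")  # placeholder filename
# Keep the raw stored values; otherwise scaling would be applied a second time
# on top of the reader's own calibration.
nc.set_auto_maskandscale(False)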

This image was created with scene creation, loading, and construction of the resampling graph all happening before accessing any data:

[image: MTG-I1-fci-202312040810-eurol-airmass-test-preloaded]

Processing took 28.4 seconds, used 1.89 GB RAM, and 121% CPU.

Classical processing took 19.0 seconds, used 2.03 GB RAM, and 73% CPU.

Now that it's functional, I will work on adding tests and improving the implementation.

@mraspaud (Member) commented:

Very nice! How long did it take after the last segment had "arrived"?

Split the delayed loading from file into one delayed function that waits
for the file and another that loads a variable from the file.
Add tests for the functionality to wait for a file to appear in a
delayed object.
Add a test method to test getting a delayed value from a (delayed) file
Progress on unit tests for the purposes of delayed loading.
@gerritholl (Collaborator, Author) commented Dec 18, 2023

Very nice! How long did it take after the last segment had "arrived"?

I haven't done a full test including a realistic simulation of delayed segment arrival yet. But when I skip computing, it takes 19 seconds with 421 MB RAM.

Add more tests for the preloadable mixin in netcdf_utils.  Clean up some
unused code and add checks/guards in this mixin.
Improve tests for the yaml reader in case of preloading file handlers.
Tests fail.
Improve the tests for the YAML reader and the NetCDF utils.  Tolerate
absence of time_tags and variable_tags for preloadables.  Verify
presence of required_netcdf_filehandlers on creation rather than on
caching time.
Using datetime.min for an artificial date leads to different strftime
results between platforms (see https://bugs.python.org/msg307401).
Use datetime.max instead.
Simplify _new_filehandler_instances.  Hopefully this will satisfy
codescene.

Replace "/dev/null" by os.devnull for cross-platform support.
In test waiting for file to appear, wait longer.  Maybe this will fix
the "file not found" on 3.11 problem.
Fixing three merge conflicts.
When caching, make sure we use the CachingFileManager already upon scene
creation and not only by the time we are loading.
Don't subclass netCDF4.Dataset, rather just return an instance from a
helper function.  Seems good enough and gets rid of the weird error
messages upon exit.
Some readers read entire groups; this needs xarray kwargs to be set even
if caching is used.
Guard against file handle caching with the xarray manager when
preloading.  We can't cache very well when data are not there yet.
@gerritholl (Collaborator, Author) commented:

I get mixed results with .secede(...) and .rejoin(...). Sometimes this leads to dask workers switching to tasks waiting for other files. Sometimes it doesn't. Still trying to work out why and how.
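
For context, the pattern being experimented with looks roughly like this (a sketch, not the PR's code):

import glob
import time

import dask
from dask.distributed import secede, rejoin

@dask.delayed
def _wait_for_file(pattern, poll_interval=10):
    # Step out of the worker's thread pool while blocking on I/O, so the worker
    # can pick up other tasks; rejoin once the file has appeared.
    secede()
    while not (matches := glob.glob(pattern)):
        time.sleep(poll_interval)
    rejoin()
    return matches[0]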

@gerritholl (Collaborator, Author) commented:

Creating the dask distributed client makes subsequent scene loading extremely slow. The test script (intended to simulate what trollflow2 does):

import hdf5plugin  # registers additional HDF5 compression filters
import satpy
satpy.config.set({"readers.preload_segments": True})
satpy.config.set({"readers.preload_dask_distributed": True})
from satpy import Scene
from satpy.writers import compute_writer_results
from dask.distributed import Client
import dask.config
import time
fci_file = "/media/nas/x21308/scratch/FCI/202312201500b/W_XX-EUMETSAT-Darmstadt,IMG+SAT,MTI1+FCI-1C-RRAD-FDHSI-FD--CHK-BODY--DIS-NC4E_C_EUMT_20231220150628_IDPFI_OPE_20231220150007_20231220150017_N_JLS_C_0091_0001.nc"
res500 = ["vis_06"]
res1000 = ["vis_08", "vis_08", "nir_16", "ir_38", "ir_105"]
res2000 = ["wv_63", "wv_63", "wv_73", "ir_87", "ir_97", "ir_123", "ir_133", "airmass", "convection", "dust", "ash"]
def main():
    cl = Client()
    t1 = time.time()
    sc = Scene(filenames={"fci_l1c_nc": [fci_file]})
    #sc = Scene(filenames={"fci_l1c_nc": fci_files})
    t2 = time.time()
    sc.load(res500 + res1000 + res2000, generate=False)
    t3 = time.time()
    ls1 = sc.resample("nqceur500m", resampler="gradient_search")
    ls2 = sc.resample("nqceur1km", resampler="gradient_search")
    ls3 = sc.resample("nqceur2km", resampler="gradient_search")
    t4 = time.time()
    comps = []
    for c in res500:
        comps.append(ls1.save_dataset(c, filename=f"{c:s}.png",
                                      compute=False, writer="simple_image"))
    for c in res1000:
        comps.append(ls2.save_dataset(c, filename=f"{c:s}.png",
                                      compute=False, writer="simple_image"))
    for c in res2000:
        comps.append(ls3.save_dataset(c, filename=f"{c:s}.png",
                                      compute=False, writer="simple_image"))
    t5 = time.time()
    print("Scene creation", t2-t1)
    print("Loading", t3-t2)
    print("Resampling", t4-t3)
    print("Saving (without computing)", t5-t4)

if __name__ == "__main__":
    main()

Testing with aca09ff, without Client(), Scene.load(...) takes 10.8 seconds. With Client(), it takes 376 seconds:

Without Client():

Scene creation 6.005215167999268
Loading 10.792689323425293
Resampling 25.876293897628784
Saving (without computing) 3.311211347579956

With Client():

Scene creation 6.583888292312622
Loading 376.2109725475311
Resampling 27.489383697509766
Saving (without computing) 3.4741480350494385

No such difference is observed with satpy main or with #2822 (which I merged into this one). According to CustomScheduler(max_computes=0), there are no computations happening.

@gerritholl (Collaborator, Author) commented:

Correction: according to dask.config.set(scheduler=CustomScheduler(max_computes=0)) there are computes happening, with both schedulers. This happens for the preloaded segments.
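
For readers unfamiliar with this check: CustomScheduler from satpy.tests.utils raises as soon as more than the allowed number of computes happens, roughly like this:

import dask.config
from satpy import Scene
from satpy.tests.utils import CustomScheduler

# Fail loudly if anything triggers a dask compute during scene creation and loading.
with dask.config.set(scheduler=CustomScheduler(max_computes=0)):
    sc = Scene(filenames={"fci_l1c_nc": [fci_file]})  # fci_file as defined in the script above
    sc.load(["ir_105"], generate=False)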

@gerritholl marked this pull request as draft on July 31, 2024 09:11
@gerritholl marked this pull request as ready for review on July 31, 2024 10:15
@gerritholl (Collaborator, Author) commented:

Solved the dask distributed performance problem in ec49a6f:

When segment-shareable data are obtained from the reference filehandler,
get them directly from its file_content attribute and not via
the filehandler's __getitem__ method. This solves the performance
problem when using preloading with dask distributed.

Details: NetCDF4FileHandler.__getitem__ turns a netCDF4.Variable
into an xarray.DataArray encapsulating a dask array. Therefore, a
small read, such as the one the FCI L1C NC file handler needs to perform upon
Scene.load, becomes a small compute. Those small computes have a
small negative impact when using the normal dask scheduler, but a terrible
negative impact when using the dask distributed scheduler, where in one
case I tested, Scene.load increases from 10 seconds to 375 seconds.

Time for Scene.load with preloaded segments with the regular scheduler, before this commit: 10.8 seconds

Time for Scene.load with preloaded segments with the distributed scheduler, before this commit: 376 seconds

Time for Scene.load with preloaded segments with the regular scheduler, after this commit: 6.5 seconds

Time for Scene.load with preloaded segments with the distributed scheduler, after this commit: 6.9 seconds

@djhoese (Member) commented Jul 31, 2024

So the item in file_content is a NetCDF4 Variable? And the item from __getitem__ is the DataArray version of that? You mention that a small load is needed as part of the preload process on this item, which load is that (sorry I'm unfamiliar with the overall preload process done here)?

@gerritholl (Collaborator, Author) commented Jul 31, 2024

So the item in file_content is a NetCDF4 Variable? And the item from __getitem__ is the DataArray version of that?

Yes. That predates this PR.

You mention that a small load is needed as part of the preload process on this item, which load is that (sorry I'm unfamiliar with the overall preload process done here)?

Not as part of the preload process, but in the FCI file handler for either regular loading or pre-loading. For example, it loads the satellite location and the Sun-Earth distance:

def get_parameters_lon_lat_alt(self):
    """Compute the orbital parameters.

    Compute satellite_actual_longitude, satellite_actual_latitude, satellite_actual_altitude.
    """
    actual_subsat_lon = float(np.nanmean(self._get_aux_data_lut_vector("subsatellite_longitude")))
    actual_subsat_lat = float(np.nanmean(self._get_aux_data_lut_vector("subsatellite_latitude")))
    actual_sat_alt = float(np.nanmean(self._get_aux_data_lut_vector("platform_altitude")))
    return actual_subsat_lon, actual_subsat_lat, actual_sat_alt

Those are parameters that are constant between segments, so when we are pre-loading segments 2–40 (by pre-loading I mean the FileHandler handling a file that is expected, but not yet available), those parameters are taken from segment 1. When passed a NetCDF4.Variable, this triggers a load, but when passed an xarray.DataArray encapsulating this variable using dask, it triggers a compute. I didn't notice that before testing dask distributed, because those computes are cheap with the regular scheduler and I did not do a CustomScheduler(max_computes=0) test (or not correctly). But with the dask distributed scheduler, for some reason, those computes are almost two orders of magnitude more expensive.
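
Schematically, the difference between the two access paths (with reference_filehandler standing in for the segment-1 filehandler, and a shortened key for readability):

import numpy as np

key = "subsatellite_longitude"  # shorthand; real keys are full group paths inside the file

# Via the filehandler's __getitem__: an xarray.DataArray wrapping a dask array,
# so reducing it triggers a compute (very expensive under dask.distributed).
value = float(np.nanmean(reference_filehandler[key]))

# Via the file_content mapping: the underlying netCDF4.Variable, so slicing it
# is a plain, cheap NetCDF read with no dask graph involved.
value = float(np.nanmean(reference_filehandler.file_content[key][:]))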

@djhoese (Member) commented Jul 31, 2024

Ah ok and your change was specifically for things being loaded from the other "ref" segment. Ok. And these things being loaded are stored as variables in the NetCDF files and not attributes?

@gerritholl (Collaborator, Author) commented:

Ah ok and your change was specifically for things being loaded from the other "ref" segment. Ok. And these things being loaded are stored as variables in the NetCDF files and not attributes?

Yes and yes.

@gerritholl (Collaborator, Author) commented Aug 1, 2024

I do not manage to get the task ordering to work reliably with dask distributed using the secede/rejoin approach proposed by @mraspaud on 2024-06-12, in a situation of realistic complexity. Even with seceding and rejoining, it seems hard to control when dask finally reaches the tasks that depend on early chunks. A case study, based on debug statements, shows an example with 8 workers:

  • 14:55:26: T=0, starting Satpy (via satpy_cli in trollflow2 for a workflow of realistic complexity).
  • 14:55:36: chunk 2 arrives
  • 14:55:42: chunk 3 arrives
  • 14:56:02: building of dask graphs (overhead) finished, looking for chunks 13, 15, 18, 17, 14, 16, 9, 8, 20, 19
  • 14:56:08: chunk 5 arrives
  • 14:56:15: chunk 4 arrives
  • 14:56:27: chunk 6 arrives
  • 14:56:35: chunk 7 arrives
  • 14:56:46: looking for chunk 30
  • 14:56:47: looking for chunk 27
  • 14:56:50: looking for chunks 11, 12, 2, 3: start of calculations
  • 14:56:52: looking for chunks 10, 21, 22, 24, 26, 28, 29, 32, 33, 35, 37, 38, 39, 40
  • 14:56:54: looking for chunks 4, 34, 36
  • 14:56:55: looking for chunks 6, 7
  • 14:56:57: looking for chunks 5, 31
  • 14:56:58: looking for chunk 25
  • 14:57:04: looking for chunk 23
  • 14:57:09-15:13:01: chunks 8–40 arrive

Results differ in detail on a different machine, or even depending on how I run it (in a satpy test script, through trollflow2's satpy_cli, or via trollflow2 messaging).

So even with the secede/rejoin complications, we idle for 48 seconds before calculations start, even though by then 7 chunks are already available. Although the workers are doing plenty of seceding and rejoining, they just jump back and forth between tasks depending on ten chunks that aren't there yet, then on two more after 44–45 seconds. Only after 62 seconds have the workers alternated between tasks that have dependencies on all possible chunks.

I don't know whether dask is ping-ponging between the same tasks (which immediately secede again) and leaving the others untouched, or whether it is actually going through all tasks and we simply have so many of them that it takes 64 seconds to reach the last chunk, in an order we still do not control.

Based on this and other problems I encounter with dask distributed, and the fact that it would take significant development work on satpy to make it suit our needs (writing GeoTIFF), I am inclined to give up on full dask distributed compatibility.

Move the preloading documentation to its own section under "Advanced
topics".  Fix internal reference links.
Rename the preloading configuration parameters to use their own
subsection.
Merge three very similar test functions to a single parametrised one.
@gerritholl (Collaborator, Author) commented:

A thought from @mraspaud on another way to possibly resolve the ordering: https://pytroll.slack.com/archives/C0LNH7LMB/p1723793710070629

Or explicitly make tasks depend on other tasks; not sure how (one possible pattern is sketched below).
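
One generic way to express such explicit dependencies with dask (a sketch of the idea only, not something this PR implements) is to pass the previous task's delayed result as an otherwise unused argument, which forces an ordering:

import dask

@dask.delayed
def _process_segment(segment_file, _after=None):
    # ``_after`` is not used inside the function; passing the previous task's
    # delayed result here makes dask schedule this task only after that one.
    ...

previous = None
tasks = []
for segment_file in expected_segment_files:  # hypothetical list of per-chunk inputs
    previous = _process_segment(segment_file, _after=previous)
    tasks.append(previous)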
