Merge pull request #7 from ec-jrc/feature-dask
Parallel fitting
loicdtx authored Jan 15, 2024
2 parents 4a7c078 + 445af38 commit 341f357
Showing 8 changed files with 213 additions and 26 deletions.
11 changes: 11 additions & 0 deletions CHANGES.txt
@@ -1,6 +1,17 @@
Changes
=======

0.2.0
-----

- np.linalg.inv replaced by the recommended np.linalg.solve in many places
- Integration of numba parallel accelerator in most fitting functions (new argument
to control number of threads in the .fit method of BaseNrt class)
- Possibility to pass kwargs to the functions of the data module that load xarray.Dataset
objects (particularly useful to specify chunking and get a dask-based object)
- New example in gallery on parallel fitting


0.1.1
-----

6 changes: 6 additions & 0 deletions docs/conf.py
@@ -12,6 +12,8 @@

import os
import sys
import warnings
from numba import NumbaWarning
import nrt


@@ -45,6 +47,10 @@
'gallery_dirs': 'auto_examples', # path to where to save gallery generated output
}

# Avoid displaying some common warnings in gallery examples
warnings.filterwarnings('ignore', category=NumbaWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning)

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

138 changes: 138 additions & 0 deletions docs/gallery/plot_parallel_computing.py
@@ -0,0 +1,138 @@
r"""
Parallel model fitting
======================
The most computationally expensive part of a typical nrt workflow is the fitting
of a harmonic model over the stable history period. Starting with version ``0.2.0``,
``nrt`` uses multithreading to further speed up the already fast model fitting.
This example illustrates how multithreading can be enabled and adjusted to your use case.
"""

##############################################################
# Configure multithreading options of the linear algebra library
# ===============================================================
#
# Most of the low-level computation/numerical optimization occurring during model
# fitting with nrt relies on a linear algebra library. These libraries often implement
# low-level methods with built-in multithreading. ``nrt`` implements multithreading
# at a different, higher level, thanks to ``numba``.
# To prevent nested parallelism, which would result in over-subscription and potentially
# reduce performance, it is recommended to disable the built-in multithreading
# of the linear algebra library being used.
# Depending on how ``numpy`` was installed, it will rely on one of three linear
# algebra libraries: OpenBLAS, MKL or BLIS. At the time of writing this
# tutorial, PyPI wheels (obtained when installing ``numpy`` with pip) are shipped
# with OpenBLAS, while a conda installation from the default channel comes with
# MKL. All three libraries use an environment variable to control threading
# (``MKL_NUM_THREADS``, ``OPENBLAS_NUM_THREADS`` and ``BLIS_NUM_THREADS``); in the
# present example, we set them all to ``'1'`` directly from within python.
# Knowing which library is used on your system would allow you to remove the
# unnecessary configuration lines, but setting all three is harmless.
import os
# Note that 1 is a string, not an integer
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['BLIS_NUM_THREADS'] = '1'
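
##############################################################
# Alternatively, the BLAS thread pool can be capped at runtime with the
# third-party ``threadpoolctl`` package. This is a sketch, assuming
# ``threadpoolctl`` is installed (it is not a dependency of ``nrt``):
from threadpoolctl import threadpool_limits

# Limit whichever BLAS implementation numpy links against to a single thread
threadpool_limits(limits=1, user_api='blas')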

##############################################################
# Create benchmark data
# =====================
#
# Using the synthetic data generation functionalities of the package, we can create
# an xarray DataArray for benchmarking. Note that in order to keep the compilation time
# of this tutorial manageable, we limit the size of that object to 200 by 200 pixels.
# While this is significantly smaller than e.g. a Sentinel-2 MGRS tile, it is sufficient
# to illustrate differences in fitting time among the various fitting strategies
import xarray as xr
import numpy as np
from nrt import data

# Create synthetic ndvi data cube
dates = np.arange('2018-01-01', '2020-12-31', dtype='datetime64[W]')
params_ds = data.make_cube_parameters(shape=(200,200), unstable_proportion=0)
cube = data.make_cube(dates=dates, params_ds=params_ds)
# We also create a very small cube for running each fitting method once before
# the benchmark, ensuring compilation of the jitted functions and a fair comparison
cube_sub = cube.isel(indexers={'x': slice(1,5), 'y': slice(1,5)})
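
##############################################################
# As a quick sanity check, ``sizes`` (the standard xarray mapping of dimension
# names to lengths) confirms the expected 200 by 200 spatial dimensions
print(cube.sizes)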


##############################################################
# Benchmark fitting time of all methods
# =====================================
#
# Note that we are only interested in fitting time and therefore use a single
# class instance for the benchmark. The time required for any subsequent .monitor()
# call is usually negligible and is therefore not included in this benchmark.
# We use ``CuSum`` here, but any of the monitoring classes could be used and
# would produce the same results.
import time
import itertools
from collections import defaultdict
from nrt.monitor.cusum import CuSum
import matplotlib.pyplot as plt

# Benchmark parameters
benchmark_dict = defaultdict(dict)
monitor = CuSum()
methods = ['OLS', 'RIRLS', 'CCDC-stable', 'ROC']
threads = range(1,4)

# Make sure all numba jitted functions are compiled
monitor_ = CuSum()
for method in methods:
    monitor_.fit(cube_sub, method=method)

# Benchmark loop
for method, n_threads in itertools.product(methods, threads):
    t0 = time.time()
    monitor.fit(cube, n_threads=n_threads, method=method)
    t1 = time.time()
    benchmark_dict[method][n_threads] = t1 - t0

# Visualize the results
index = np.arange(len(methods))
for idx, n in enumerate(threads):
    values = [benchmark_dict[method][n] for method in methods]
    plt.bar(index + idx * 0.2, values, 0.2, label='%d thread(s)' % n)

plt.xlabel('Fitting method')
plt.ylabel('Time (seconds)')
plt.title('Fitting time')
plt.xticks(index + 0.2, methods)
plt.legend()
plt.tight_layout()
plt.show()

##############################################################
# The results above show large differences in fitting time among fitting
# methods. Unsurprisingly, OLS is the fastest, which is expected given that all
# other methods complement OLS with additional, sometimes iterative,
# refitting steps. All methods but ``ROC``, for which parallel fitting hasn't been
# implemented, benefit from using multiple threads.
# Note that a multithreading benefit can only be observed as long as the number of
# threads is lower than the computing resources available. The machine used for
# compiling this tutorial is not meant for heavy computation and has limited
# resources, as shown by the cpu_count below.
import multiprocessing
print(multiprocessing.cpu_count())
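
##############################################################
# The thread count of the jitted fitting functions can also be capped globally
# with numba itself, rather than through the ``n_threads`` argument of ``.fit()``.
# A minimal sketch (the value 2 is arbitrary; it cannot exceed numba's
# ``NUMBA_NUM_THREADS`` upper bound):
import numba

numba.set_num_threads(2)
print(numba.get_num_threads())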


##############################################################
# Further considerations
# ======================
#
# A deployment at scale may involve several levels of parallelization. The multithreaded
# example illustrated above is made possible by the numba parallel accelerator.
# However, it is also very common to handle the earlier steps of data loading and
# pre-processing with ``dask.distributed``, which facilitates lazy and distributed
# computation. There is no direct integration between the two parallelism mechanisms;
# while calling ``.fit()`` on a lazy, distributed dask array is possible, the lazy
# evaluation cannot be preserved and all the input data need to be evaluated and
# loaded into memory.
from nrt import data

# Lazy load test data using dask
cube = data.romania_10m(chunks={'x': 20, 'y': 20})
vi_cube = (cube.B8 - cube.B4) / (cube.B8 + cube.B4)
print(vi_cube)
monitor = CuSum()
monitor.fit(vi_cube, method='OLS', n_threads=3)
print(type(monitor.beta))
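
##############################################################
# If you prefer to control when that evaluation happens, the dask-backed array
# can also be materialized explicitly before fitting; a sketch using
# ``compute()``, the standard xarray/dask method that loads data into memory:
vi_cube_in_memory = vi_cube.compute()
monitor_bis = CuSum()
monitor_bis.fit(vi_cube_in_memory, method='OLS', n_threads=3)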
14 changes: 8 additions & 6 deletions nrt/data/__init__.py
@@ -24,20 +24,22 @@
data_dir = os.path.abspath(os.path.dirname(__file__))


def _load(f):
def _load(f, **kwargs):
"""Load a ncdf file located in the data directory as a xarray Dataset
Args:
f (str): File basename
**kwargs: Keyword arguments passed to ``xarray.open_dataset``
Returns:
xarray.Dataset: The Dataset
"""
xr_dataset = xr.open_dataset(os.path.join(data_dir, f))
xr_dataset = xr.open_dataset(os.path.join(data_dir, f),
**kwargs)
return xr_dataset


def romania_10m():
def romania_10m(**kwargs):
"""Sentinel 2 datacube of a small forested area in Romania at 10 m resolution
Examples:
@@ -49,10 +51,10 @@ def romania_10m():
>>> # Filter clouds
>>> s2_cube = s2_cube.where(s2_cube.SCL.isin([4,5,7]))
"""
return _load('sentinel2_cube_subset_romania_10m.nc')
return _load(f='sentinel2_cube_subset_romania_10m.nc', **kwargs)


def romania_20m():
def romania_20m(**kwargs):
"""Sentinel 2 datacube of a small forested area in Romania at 20 m resolution
Examples:
@@ -64,7 +66,7 @@ def romania_20m():
>>> # Filter clouds
>>> s2_cube = s2_cube.where(s2_cube.SCL.isin([4,5,7]))
"""
return _load('sentinel2_cube_subset_romania_20m.nc')
return _load(f='sentinel2_cube_subset_romania_20m.nc', **kwargs)


def romania_forest_cover_percentage():
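As a usage sketch of the new ``**kwargs`` pass-through (mirroring the gallery
example above), chunking can now be requested directly at load time, yielding a
dask-backed Dataset:

from nrt import data

# `chunks` is forwarded to xarray.open_dataset, producing dask-backed variables
s2_cube = data.romania_20m(chunks={'x': 50, 'y': 50})
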
38 changes: 26 additions & 12 deletions nrt/fit_methods.py
@@ -72,7 +72,7 @@ def ols(X, y):


@utils.numba_kwargs
@numba.jit(nopython=True, cache=True)
@numba.jit(nopython=True, cache=True, parallel=True)
def rirls(X, y, M=bisquare, tune=4.685,
scale_est=mad, scale_constant=0.6745,
update_scale=True, maxiter=50, tol=1e-8):
@@ -82,6 +82,14 @@ def rirls(X, y, M=bisquare, tune=4.685,
according to weight function and tuning parameter.
Basically a clone from `statsmodels` that should be much faster.
Note:
For best performance of the multithreaded implementation, it is
recommended to limit the number of threads used by MKL or OpenBLAS to 1.
This avoids over-subscription and improves performance.
By default the function will use all available cores; the number of cores
used can be controlled with the ``numba.set_num_threads`` function or
by modifying the ``NUMBA_NUM_THREADS`` environment variable.
Args:
X (np.ndarray): 2D (n_obs x n_features) design matrix
y (np.ndarray): 1D independent variable
@@ -102,7 +110,7 @@
"""
beta = np.zeros((X.shape[1], y.shape[1]), dtype=np.float64)
resid = np.full_like(y, np.nan, dtype=np.float64)
for idx in range(y.shape[1]):
for idx in numba.prange(y.shape[1]):
    y_sub = y[:, idx]
    isna = np.isnan(y_sub)
    X_sub = X[~isna]
@@ -156,7 +164,7 @@ def weighted_ols(X, y, w):
return beta, resid

@utils.numba_kwargs
@numba.jit(nopython=True, cache=True)
@numba.jit(nopython=True, cache=True, parallel=True)
def ccdc_stable_fit(X, y, dates, threshold=3):
"""Fitting stable regressions using an adapted CCDC method
@@ -176,6 +184,14 @@ def ccdc_stable_fit(X, y, dates, threshold=3):
2. first observation / RMSE < threshold
3. last observation / RMSE < threshold
Note:
For best performance of the multithreaded implementation, it is
recommended to limit the number of threads used by MKL or OpenBLAS to 1.
This avoids over-subscription and improves performance.
By default the function will use all available cores; the number of cores
used can be controlled with the ``numba.set_num_threads`` function or
by modifying the ``NUMBA_NUM_THREADS`` environment variable.
Args:
X ((M, N) np.ndarray): Matrix of independent variables
y ((M, K) np.ndarray): Matrix of dependent variables
@@ -194,7 +210,7 @@
residuals = np.full_like(y, np.nan)
stable = np.empty((y.shape[1]))
fit_start = np.empty((y.shape[1]))
for idx in range(y.shape[1]):
for idx in numba.prange(y.shape[1]):
    y_sub = y[:, idx]
    isna = np.isnan(y_sub)
    X_sub = X[~isna]
@@ -209,9 +225,7 @@
# each iteration
y_ = y_sub[-jdx:]
X_ = X_sub[-jdx:]
XTX = np.linalg.inv(np.dot(X_.T, X_))
XTY = np.dot(X_.T, y_)
beta_sub = np.dot(XTX, XTY)
beta_sub = np.linalg.solve(np.dot(X_.T, X_), np.dot(X_.T, y_))
resid_sub = np.dot(X_, beta_sub) - y_

# Check for stability
@@ -238,7 +252,7 @@


@utils.numba_kwargs
@numba.jit(nopython=True, cache=True)
@numba.jit(nopython=True, cache=True, parallel=False)
def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
"""Fitting stable regressions using Reverse Ordered Cumulative Sums
@@ -262,6 +276,7 @@ def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
crit (float): Critical value corresponding to the chosen alpha. Can be
calculated with ``_cusum_rec_test_crit``.
Default is the value for alpha=0.05
Returns:
beta (numpy.ndarray): The array of regression estimators
residuals (numpy.ndarray): The array of residuals
@@ -273,7 +288,7 @@
fit_start = np.zeros_like(is_stable, dtype=np.uint16)
beta = np.full((X.shape[1], y.shape[1]), np.nan, dtype=np.float64)
nreg = X.shape[1]
for idx in range(y.shape[1]):
for idx in numba.prange(y.shape[1]):
    # subset and remove nan
    is_nan = np.isnan(y[:, idx])
    _y = y[~is_nan, idx]
@@ -300,9 +315,8 @@
# Subset and fit
X_stable = _X[stable_idx:]
y_stable = _y[stable_idx:]
XTX = np.linalg.inv(np.dot(X_stable.T, X_stable))
XTY = np.dot(X_stable.T, y_stable)
beta[:, idx] = np.dot(XTX, XTY)
beta[:, idx] = np.linalg.solve(np.dot(X_stable.T, X_stable),
np.dot(X_stable.T, y_stable))
fit_start[idx] = _dates[stable_idx]

residuals = np.dot(X, beta) - y
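A standalone check of the ``np.linalg.inv`` to ``np.linalg.solve`` replacement made
throughout this file; a sketch showing that both forms yield the same normal-equation
solution, with ``solve`` avoiding the explicit, slower and less numerically stable
matrix inverse:

import numpy as np

rng = np.random.default_rng(42)
X = rng.normal(size=(100, 4))
y = rng.normal(size=100)
# Old approach: explicit inverse of the normal matrix
beta_inv = np.dot(np.linalg.inv(np.dot(X.T, X)), np.dot(X.T, y))
# New approach: direct solve of the normal equations
beta_solve = np.linalg.solve(np.dot(X.T, X), np.dot(X.T, y))
assert np.allclose(beta_inv, beta_solve)
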
8 changes: 7 additions & 1 deletion nrt/monitor/__init__.py
Expand Up @@ -19,6 +19,7 @@

import numpy as np
import pandas as pd
import numba
from netCDF4 import Dataset
import rasterio
from rasterio.crs import CRS
@@ -124,7 +125,8 @@ def __eq__(self, other):

def _fit(self, X, dataarray,
method='OLS',
screen_outliers=None, **kwargs):
screen_outliers=None,
n_threads=1, **kwargs):
"""Fit a regression model on an xarray.DataArray
Args:
X (numpy.ndarray): The design matrix used for the regression
@@ -134,6 +136,9 @@ def _fit(self, X, dataarray,
``'RIRLS'``, ``'LASSO'``, ``'ROC'`` and ``'CCDC-stable'``.
screen_outliers (str): The screening method. Possible values include
``'Shewhart'`` and ``'CCDC_RIRLS'``.
n_threads (int): Number of threads used for parallel fitting. Note that
    parallel fitting is not supported for ``ROC``; this argument therefore
    has no effect when combined with ``method='ROC'``
**kwargs: Other parameters specific to each fit and/or outlier
screening method
@@ -145,6 +150,7 @@ def _fit(self, X, dataarray,
NotImplementedError: If method is not yet implemented
ValueError: Unknown value for `method`
"""
numba.set_num_threads(n_threads)
# Check for strictly increasing time dimension:
if not np.all(dataarray.time.values[1:] >= dataarray.time.values[:-1]):
raise ValueError("Time dimension of dataarray has to be sorted chronologically.")