Parallel fitting #7

Merged
merged 8 commits on Jan 15, 2024
11 changes: 11 additions & 0 deletions CHANGES.txt
@@ -1,6 +1,17 @@
Changes
=======

0.2.0
-----

- np.linalg.inv replaced by the recommended np.linalg.solve in many places
- Integration of the numba parallel accelerator in most fitting functions (new argument
  to control the number of threads in the .fit method of the BaseNrt class)
- Possibility to pass kwargs to the functions of the data module that load xarray.Dataset
  objects (particularly useful to specify chunking and get a dask-based object)
- New example in gallery on parallel fitting


0.1.1
-----

6 changes: 6 additions & 0 deletions docs/conf.py
@@ -12,6 +12,8 @@

import os
import sys
import warnings
from numba import NumbaWarning
import nrt


@@ -45,6 +47,10 @@
'gallery_dirs': 'auto_examples', # path to where to save gallery generated output
}

# Avoid displaying some common warnings in gallery examples
warnings.filterwarnings('ignore', category=NumbaWarning)
warnings.filterwarnings('ignore', category=RuntimeWarning)

# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']

138 changes: 138 additions & 0 deletions docs/gallery/plot_parallel_computing.py
@@ -0,0 +1,138 @@
r"""
Parallel model fitting
======================
The most computationally expensive part of a typical nrt workflow is the fitting
of a harmonic model over the stable history period. Starting with version ``0.2.0``,
``nrt`` uses multithreading to further speed up the already fast model fitting.
This example illustrates how multithreading can be enabled and adjusted to your use case.
"""

##############################################################
# Configure multithreading options of the linear algebra library
# ================================================================
#
# Most of the low level computation/numerical optimization occurring during model
# fitting with nrt relies on a linear algebra library. These libraries often implement
# low level methods with built-in multi-threading. ``nrt`` implements multi-threading
# at a different, higher level thanks to ``numba``.
# To prevent nested parallelism, which would result in over-subscription and potentially
# reduce performance, it is recommended to disable the built-in multi-threading
# of the linear algebra library being used.
# Depending on how ``numpy`` was installed, it relies on one of three linear
# algebra libraries: OpenBLAS, MKL or BLIS. At the time of writing this
# tutorial, PyPI wheels (obtained when installing ``numpy`` with pip) are shipped
# with OpenBLAS, while a conda installation from the default channel comes with
# MKL. All three libraries use an environment variable to control threading
# (``MKL_NUM_THREADS``, ``OPENBLAS_NUM_THREADS`` and ``BLIS_NUM_THREADS``); in the
# present example, we set them all to ``'1'`` directly from within Python.
# Knowing which library is used on your system would let you remove the
# unnecessary configuration lines, but setting all three is harmless.
import os
# Note that 1 is a string, not an integer
os.environ['MKL_NUM_THREADS'] = '1'
os.environ['OPENBLAS_NUM_THREADS'] = '1'
os.environ['BLIS_NUM_THREADS'] = '1'
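
##############################################################
# If you are unsure which linear algebra library your ``numpy`` installation
# relies on, ``numpy.show_config()`` prints the build information. This is a
# quick, optional check (a sketch, not part of the original example) and is
# not required for the rest of the tutorial.
import numpy
numpy.show_config()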

##############################################################
# Create benchmark data
# =====================
#
# Using the synthetic data generation functionalities of the package, we can create
# an xarray DataArray for benchmarking. Note that in order to keep the build time
# of this tutorial manageable, we limit the size of that object to 200 by 200 pixels.
# While this is significantly smaller than e.g. a Sentinel-2 MGRS tile, it is sufficient
# to illustrate differences in fitting time among the various fitting strategies.
import xarray as xr
import numpy as np
from nrt import data

# Create synthetic ndvi data cube
dates = np.arange('2018-01-01', '2020-12-31', dtype='datetime64[W]')
params_ds = data.make_cube_parameters(shape=(200,200), unstable_proportion=0)
cube = data.make_cube(dates=dates, params_ds=params_ds)
# We also create a very small cube for running each fitting method once before
# the benchmark, ensuring compilation of the jitted functions and fair comparison
cube_sub = cube.isel(indexers={'x': slice(1,5), 'y': slice(1,5)})


##############################################################
# Benchmark fitting time of all methods
# =====================================
#
# Note that we are only interested in fitting time and therefore use a single
# class instance for the benchmark. The time required for any subsequent ``.monitor()``
# call is usually negligible and therefore not included in this benchmark.
# We use ``CuSum`` here, but any of the monitoring classes could be used and
# would produce the same results.
import time
import itertools
from collections import defaultdict
from nrt.monitor.cusum import CuSum
import matplotlib.pyplot as plt

# Benchmark parameters
benchmark_dict = defaultdict(dict)
monitor = CuSum()
methods = ['OLS', 'RIRLS', 'CCDC-stable', 'ROC']
threads = range(1,4)

# Make sure all numba jitted functions are compiled
monitor_ = CuSum()
for method in methods:
    monitor_.fit(cube_sub, method=method)

# Benchmark loop
for method, n_threads in itertools.product(methods, threads):
t0 = time.time()
monitor.fit(cube, n_threads=n_threads, method=method)
t1 = time.time()
benchmark_dict[method][n_threads] = t1 - t0

# Visualize the results
index = np.arange(len(methods))
for idx, n in enumerate(threads):
values = [benchmark_dict[method][n] for method in methods]
plt.bar(index + idx * 0.2, values, 0.2, label='%d thread(s)' % n)

plt.xlabel('Fitting method')
plt.ylabel('Time (seconds)')
plt.title('Fitting time')
plt.xticks(index + 0.2, methods)
plt.legend()
plt.tight_layout()
plt.show()

##############################################################
# The results above show large differences in fitting time among fitting
# methods. Unsurprisingly, OLS is the fastest, which is expected given that all
# other methods build on OLS and add further, sometimes iterative,
# refitting steps. All methods but ``ROC``, for which parallel fitting hasn't
# been implemented, benefit from using multiple threads.
# Note that a multithreading benefit can only be observed as long as the number
# of threads is lower than the number of cores available. The machine used for
# building this tutorial is not meant for heavy computation and has limited
# resources, as shown by the CPU count below.
import multiprocessing
print(multiprocessing.cpu_count())
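
##############################################################
# As an optional aside (a sketch, not part of the original benchmark), the
# numba threading layer that ``.fit()`` configures internally through its
# ``n_threads`` argument can also be inspected and set directly using numba's
# public API.
import numba
print(numba.config.NUMBA_NUM_THREADS)  # threads available to numba
numba.set_num_threads(2)  # cap subsequent parallel sections at two threads
print(numba.get_num_threads())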


##############################################################
# Further considerations
# ======================
#
# A deployment at scale may involve several levels of parallelization. The multi-threaded
# example illustrated above is made possible by the numba parallel accelerator.
# However, it is also very common to handle the earlier steps of data loading and
# pre-processing with ``dask.distributed``, which facilitates lazy and distributed
# computation. There is no direct integration between the two parallelism mechanisms;
# while calling ``.fit()`` on a lazy distributed dask array is possible, the lazy
# evaluation cannot be preserved and all the input data needs to be evaluated and
# loaded into memory.
from nrt import data

# Lazy load test data using dask
cube = data.romania_10m(chunks={'x': 20, 'y': 20})
vi_cube = (cube.B8 - cube.B4) / (cube.B8 + cube.B4)
print(vi_cube)
monitor = CuSum()
monitor.fit(vi_cube, method='OLS', n_threads=3)
print(type(monitor.beta))
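
##############################################################
# If you prefer to make that evaluation explicit, a dask-backed DataArray can
# be loaded into memory before fitting. The lines below are a sketch of that
# variant (not part of the original example); they reuse the ``vi_cube``
# object created above.
vi_cube_in_memory = vi_cube.compute()  # triggers the dask computation
monitor_explicit = CuSum()
monitor_explicit.fit(vi_cube_in_memory, method='OLS', n_threads=3)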
14 changes: 8 additions & 6 deletions nrt/data/__init__.py
@@ -24,20 +24,22 @@
data_dir = os.path.abspath(os.path.dirname(__file__))


def _load(f):
def _load(f, **kwargs):
"""Load a ncdf file located in the data directory as a xarray Dataset

Args:
f (str): File basename
**kwargs: Keyword arguments passed to ``xarray.open_dataset``

Return:
xarray.Dataset: The Dataset
"""
xr_dataset = xr.open_dataset(os.path.join(data_dir, f))
xr_dataset = xr.open_dataset(os.path.join(data_dir, f),
**kwargs)
return xr_dataset


def romania_10m():
def romania_10m(**kwargs):
"""Sentinel 2 datacube of a small forested area in Romania at 10 m resolution

Examples:
@@ -49,10 +51,10 @@ def romania_10m():
>>> # Filter clouds
>>> s2_cube = s2_cube.where(s2_cube.SCL.isin([4,5,7]))
"""
return _load('sentinel2_cube_subset_romania_10m.nc')
return _load(f='sentinel2_cube_subset_romania_10m.nc', **kwargs)


def romania_20m():
def romania_20m(**kwargs):
"""Sentinel 2 datacube of a small forested area in Romania at 20 m resolution

Examples:
@@ -64,7 +66,7 @@ def romania_20m():
>>> # Filter clouds
>>> s2_cube = s2_cube.where(s2_cube.SCL.isin([4,5,7]))
"""
return _load('sentinel2_cube_subset_romania_20m.nc')
return _load(f='sentinel2_cube_subset_romania_20m.nc', **kwargs)


def romania_forest_cover_percentage():
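A usage sketch of the new keyword passthrough (not part of the diff; the ``chunks`` value below is illustrative): loading one of the bundled datasets as a dask-backed object now only requires forwarding the relevant ``xarray.open_dataset`` arguments.

from nrt import data

# chunks is forwarded to xarray.open_dataset, yielding a dask-backed Dataset
cube = data.romania_10m(chunks={'x': 100, 'y': 100})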
38 changes: 26 additions & 12 deletions nrt/fit_methods.py
@@ -72,7 +72,7 @@ def ols(X, y):


@utils.numba_kwargs
@numba.jit(nopython=True, cache=True)
@numba.jit(nopython=True, cache=True, parallel=True)
def rirls(X, y, M=bisquare, tune=4.685,
scale_est=mad, scale_constant=0.6745,
update_scale=True, maxiter=50, tol=1e-8):
@@ -82,6 +82,14 @@ def rirls(X, y, M=bisquare, tune=4.685,
according to weight function and tuning parameter.
Basically a clone from `statsmodels` that should be much faster.

Note:
For best performance of the multithreaded implementation, it is
recommended to limit the number of threads used by MKL or OpenBLAS to 1.
This avoids over-subscription and improves performance.
By default the function uses all available cores; the number of threads
used can be controlled with the ``numba.set_num_threads`` function or
by modifying the ``NUMBA_NUM_THREADS`` environment variable.

Args:
X (np.ndarray): 2D (n_obs x n_features) design matrix
y (np.ndarray): 1D independent variable
@@ -102,7 +110,7 @@
"""
beta = np.zeros((X.shape[1], y.shape[1]), dtype=np.float64)
resid = np.full_like(y, np.nan, dtype=np.float64)
for idx in range(y.shape[1]):
for idx in numba.prange(y.shape[1]):
y_sub = y[:,idx]
isna = np.isnan(y_sub)
X_sub = X[~isna]
@@ -156,7 +164,7 @@ def weighted_ols(X, y, w):
return beta, resid

@utils.numba_kwargs
@numba.jit(nopython=True, cache=True)
@numba.jit(nopython=True, cache=True, parallel=True)
def ccdc_stable_fit(X, y, dates, threshold=3):
"""Fitting stable regressions using an adapted CCDC method

@@ -176,6 +184,14 @@ def ccdc_stable_fit(X, y, dates, threshold=3):
2. first observation / RMSE < threshold
3. last observation / RMSE < threshold

Note:
For best performance of the multithreaded implementation, it is
recommended to limit the number of threads used by MKL or OpenBLAS to 1.
This avoids over-subscription and improves performance.
By default the function uses all available cores; the number of threads
used can be controlled with the ``numba.set_num_threads`` function or
by modifying the ``NUMBA_NUM_THREADS`` environment variable.

Args:
X ((M, N) np.ndarray): Matrix of independent variables
y ((M, K) np.ndarray): Matrix of dependent variables
@@ -194,7 +210,7 @@ def ccdc_stable_fit(X, y, dates, threshold=3):
residuals = np.full_like(y, np.nan)
stable = np.empty((y.shape[1]))
fit_start = np.empty((y.shape[1]))
for idx in range(y.shape[1]):
for idx in numba.prange(y.shape[1]):
y_sub = y[:, idx]
isna = np.isnan(y_sub)
X_sub = X[~isna]
@@ -209,9 +225,7 @@ def ccdc_stable_fit(X, y, dates, threshold=3):
# each iteration
y_ = y_sub[-jdx:]
X_ = X_sub[-jdx:]
XTX = np.linalg.inv(np.dot(X_.T, X_))
XTY = np.dot(X_.T, y_)
beta_sub = np.dot(XTX, XTY)
beta_sub = np.linalg.solve(np.dot(X_.T, X_), np.dot(X_.T, y_))
resid_sub = np.dot(X_, beta_sub) - y_

# Check for stability
@@ -238,7 +252,7 @@ def ccdc_stable_fit(X, y, dates, threshold=3):


@utils.numba_kwargs
@numba.jit(nopython=True, cache=True)
@numba.jit(nopython=True, cache=True, parallel=False)
def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
"""Fitting stable regressions using Reverse Ordered Cumulative Sums

@@ -262,6 +276,7 @@ def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
crit (float): Critical value corresponding to the chosen alpha. Can be
calculated with ``_cusum_rec_test_crit``.
Default is the value for alpha=0.05

Returns:
beta (numpy.ndarray): The array of regression estimators
residuals (numpy.ndarray): The array of residuals
@@ -273,7 +288,7 @@ def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
fit_start = np.zeros_like(is_stable, dtype=np.uint16)
beta = np.full((X.shape[1], y.shape[1]), np.nan, dtype=np.float64)
nreg = X.shape[1]
for idx in range(y.shape[1]):
for idx in numba.prange(y.shape[1]):
# subset and remove nan
is_nan = np.isnan(y[:, idx])
_y = y[~is_nan, idx]
@@ -300,9 +315,8 @@ def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
# Subset and fit
X_stable = _X[stable_idx:]
y_stable = _y[stable_idx:]
XTX = np.linalg.inv(np.dot(X_stable.T, X_stable))
XTY = np.dot(X_stable.T, y_stable)
beta[:, idx] = np.dot(XTX, XTY)
beta[:, idx] = np.linalg.solve(np.dot(X_stable.T, X_stable),
np.dot(X_stable.T, y_stable))
fit_start[idx] = _dates[stable_idx]

residuals = np.dot(X, beta) - y
8 changes: 7 additions & 1 deletion nrt/monitor/__init__.py
@@ -19,6 +19,7 @@

import numpy as np
import pandas as pd
import numba
from netCDF4 import Dataset
import rasterio
from rasterio.crs import CRS
@@ -124,7 +125,8 @@ def __eq__(self, other):

def _fit(self, X, dataarray,
method='OLS',
screen_outliers=None, **kwargs):
screen_outliers=None,
n_threads=1, **kwargs):
"""Fit a regression model on an xarray.DataArray
Args:
X (numpy.ndarray): The design matrix used for the regression
@@ -134,6 +136,9 @@ def _fit(self, X, dataarray,
``'RIRLS'``, ``'LASSO'``, ``'ROC'`` and ``'CCDC-stable'``.
screen_outliers (str): The screening method. Possible values include
``'Shewhart'`` and ``'CCDC_RIRLS'``.
n_threads (int): Number of threads used for parallel fitting. Note that
parallel fitting is not supported for ``ROC``; the argument
therefore has no effect when combined with ``method='ROC'``
**kwargs: Other parameters specific to each fit and/or outlier
screening method

Expand All @@ -145,6 +150,7 @@ def _fit(self, X, dataarray,
NotImplementedError: If method is not yet implemented
ValueError: Unknown value for `method`
"""
numba.set_num_threads(n_threads)
# Check for strictly increasing time dimension:
if not np.all(dataarray.time.values[1:] >= dataarray.time.values[:-1]):
raise ValueError("Time dimension of dataarray has to be sorted chronologically.")
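A minimal usage sketch of the new argument (not part of the diff; ``cube`` stands for any xarray.DataArray with a time dimension, as in the gallery example above):

from nrt.monitor.cusum import CuSum

monitor = CuSum()
# n_threads is forwarded to numba.set_num_threads before fitting
monitor.fit(cube, method='CCDC-stable', n_threads=4)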