diff --git a/CHANGES.txt b/CHANGES.txt
index 4cccf8b..8e33874 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,17 @@
 Changes
 =======
 
+0.2.0
+-----
+
+- np.linalg.inv replaced by the more recommended np.linalg.solve in many places
+- Integration of numba parallel accelerator in most fitting functions (new argument
+  to control the number of threads in the .fit method of the BaseNrt class)
+- Possibility to pass kwargs to functions of the data module that load xarray.Dataset
+  objects (particularly useful to specify chunking and get a dask-based object)
+- New example in gallery on parallel fitting
+
+
 0.1.1
 -----
 
diff --git a/docs/conf.py b/docs/conf.py
index 5ebde23..e99e099 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -12,6 +12,8 @@
 import os
 import sys
+import warnings
+from numba import NumbaWarning
 
 import nrt
 
@@ -45,6 +47,10 @@
     'gallery_dirs': 'auto_examples',  # path to where to save gallery generated output
 }
 
+# Avoid displaying some common warnings in gallery examples
+warnings.filterwarnings('ignore', category=NumbaWarning)
+warnings.filterwarnings('ignore', category=RuntimeWarning)
+
 # Add any paths that contain templates here, relative to this directory.
 templates_path = ['_templates']
 
diff --git a/docs/gallery/plot_parallel_computing.py b/docs/gallery/plot_parallel_computing.py
new file mode 100644
index 0000000..a5d088f
--- /dev/null
+++ b/docs/gallery/plot_parallel_computing.py
@@ -0,0 +1,138 @@
+r"""
+Parallel model fitting
+======================
+The most computationally expensive part of a typical nrt workflow is the fitting
+of a harmonic model over the stable history period. Starting with version ``0.2.0``,
+``nrt`` uses multithreading to further speed up the already fast model fitting.
+This example illustrates how multithreading can be enabled and adjusted to your
+use case.
+"""
+
+##############################################################
+# Configure multithreading options of the linear algebra library
+# ===============================================================
+#
+# Most of the low-level computation/numerical optimization occurring during model
+# fitting with nrt relies on a linear algebra library. These libraries often
+# implement low-level methods with built-in multithreading. ``nrt`` implements
+# multithreading thanks to ``numba`` at a different, higher level.
+# To prevent nested parallelism, which would result in over-subscription and
+# potentially reduce performance, it is recommended to disable the built-in
+# multithreading of the linear algebra library being used.
+# Depending on how ``numpy`` was installed, it will rely on one of three linear
+# algebra libraries: OpenBLAS, MKL or BLIS. At the time of writing this
+# tutorial, PyPI wheels (obtained when installing ``numpy`` using pip) are shipped
+# with OpenBLAS, while a conda installation from the default channel will come with
+# MKL. All three libraries use an environment variable to control threading
+# (``MKL_NUM_THREADS``, ``OPENBLAS_NUM_THREADS`` and ``BLIS_NUM_THREADS``); in the
+# present example, we set them all to ``'1'`` directly from within Python.
+# Knowing which library is used on your system would let you drop the unnecessary
+# configuration lines, but setting all three variables is harmless.
+import os
+# Note that 1 is a string, not an integer
+os.environ['MKL_NUM_THREADS'] = '1'
+os.environ['OPENBLAS_NUM_THREADS'] = '1'
+os.environ['BLIS_NUM_THREADS'] = '1'
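+
+##############################################################
+# The same limit can also be set at runtime with the third-party
+# ``threadpoolctl`` package. The snippet below is an illustrative sketch only;
+# ``threadpoolctl`` is not a dependency of ``nrt`` and would need to be
+# installed separately.
+#
+# .. code-block:: python
+#
+#    from threadpoolctl import threadpool_limits
+#
+#    # Cap all BLAS implementations found in the process at one thread
+#    threadpool_limits(limits=1, user_api='blas')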
+
+##############################################################
+# Create benchmark data
+# =====================
+#
+# Using the synthetic data generation functionalities of the package, we can create
+# an xarray DataArray for benchmarking. Note that in order to keep the build time
+# of this tutorial manageable, we limit the size of that object to 200 by 200 pixels.
+# While this is significantly smaller than e.g. a Sentinel-2 MGRS tile, it is
+# sufficient to illustrate differences in fitting time among the various fitting
+# strategies.
+import xarray as xr
+import numpy as np
+from nrt import data
+
+# Create synthetic ndvi data cube
+dates = np.arange('2018-01-01', '2020-12-31', dtype='datetime64[W]')
+params_ds = data.make_cube_parameters(shape=(200,200), unstable_proportion=0)
+cube = data.make_cube(dates=dates, params_ds=params_ds)
+# We also create a very small cube for running each fitting method once before
+# the benchmark, ensuring compilation of the jitted functions and a fair comparison
+cube_sub = cube.isel(indexers={'x': slice(1,5), 'y': slice(1,5)})
+
+
+##############################################################
+# Benchmark fitting time of all methods
+# =====================================
+#
+# Note that we are only interested in fitting time and therefore use a single
+# class instance for the benchmark. The time required for any subsequent .monitor()
+# call is usually negligible and is consequently not included in this benchmark.
+# We use ``CuSum`` here, but any of the monitoring classes could be used and
+# would produce the same results.
+import time
+import itertools
+from collections import defaultdict
+from nrt.monitor.cusum import CuSum
+import matplotlib.pyplot as plt
+
+# Benchmark parameters
+benchmark_dict = defaultdict(dict)
+monitor = CuSum()
+methods = ['OLS', 'RIRLS', 'CCDC-stable', 'ROC']
+threads = range(1,4)
+
+# Make sure all numba jitted functions are compiled
+monitor_ = CuSum()
+for method in methods:
+    monitor_.fit(cube_sub, method=method)
+
+# Benchmark loop
+for method, n_threads in itertools.product(methods, threads):
+    t0 = time.time()
+    monitor.fit(cube, n_threads=n_threads, method=method)
+    t1 = time.time()
+    benchmark_dict[method][n_threads] = t1 - t0
+
+# Visualize the results
+index = np.arange(len(methods))
+for idx, n in enumerate(threads):
+    values = [benchmark_dict[method][n] for method in methods]
+    plt.bar(index + idx * 0.2, values, 0.2, label='%d thread(s)' % n)
+
+plt.xlabel('Fitting method')
+plt.ylabel('Time (seconds)')
+plt.title('Fitting time')
+plt.xticks(index + 0.2, methods)
+plt.legend()
+plt.tight_layout()
+plt.show()
+
+##############################################################
+# The results above show large differences in fitting time among the fitting
+# methods. Unsurprisingly, OLS is the fastest, since all other methods complement
+# OLS with additional, sometimes iterative, refitting steps. All methods but
+# ``ROC``, for which parallel fitting hasn't been implemented, benefit from using
+# multiple threads.
+# Note that a multithreading benefit can only be observed as long as the number
+# of threads does not exceed the computing resources available. The machine used
+# for building this tutorial is not meant for heavy computation and has limited
+# resources, as shown by the cpu_count below.
+import multiprocessing
+print(multiprocessing.cpu_count())
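+
+##############################################################
+# As a quick sanity check, the thread count seen by the numba parallel
+# accelerator can be inspected directly; ``numba.get_num_threads`` returns the
+# value currently in use and ``numba.config.NUMBA_NUM_THREADS`` the default
+# upper bound (these helpers assume a reasonably recent numba version).
+import numba
+print(numba.config.NUMBA_NUM_THREADS)
+print(numba.get_num_threads())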
+
+
+##############################################################
+# Further considerations
+# ======================
+#
+# A deployment at scale may involve several levels of parallelization. The
+# multithreaded example illustrated above is made possible by the numba parallel
+# accelerator. However, it is also very common to handle the earlier steps of
+# data loading and data pre-processing with ``dask.distributed``, which
+# facilitates lazy and distributed computation. There is no direct integration
+# between the two parallelism mechanisms; while calling ``.fit()`` on a lazy,
+# distributed dask array is possible, lazy evaluation cannot be preserved and
+# all the input data needs to be evaluated and loaded into memory.
+from nrt import data
+
+# Lazy load test data using dask
+cube = data.romania_10m(chunks={'x': 20, 'y': 20})
+vi_cube = (cube.B8 - cube.B4) / (cube.B8 + cube.B4)
+print(vi_cube)
+monitor = CuSum()
+monitor.fit(vi_cube, method='OLS', n_threads=3)
+print(type(monitor.beta))
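+
+##############################################################
+# A minimal sketch of the explicit alternative: materializing the dask-backed
+# array yourself (here with ``.compute()``) before calling ``.fit()`` makes the
+# loading step visible rather than implicit, which can help when tracking
+# memory use.
+vi_cube_in_memory = vi_cube.compute()
+monitor = CuSum()
+monitor.fit(vi_cube_in_memory, method='OLS', n_threads=3)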
diff --git a/nrt/data/__init__.py b/nrt/data/__init__.py
index 4530f88..7015d1f 100644
--- a/nrt/data/__init__.py
+++ b/nrt/data/__init__.py
@@ -24,20 +24,22 @@
 data_dir = os.path.abspath(os.path.dirname(__file__))
 
 
-def _load(f):
+def _load(f, **kwargs):
     """Load a ncdf file located in the data directory as a xarray Dataset
 
     Args:
         f (str): File basename
+        **kwargs: Keyword arguments passed to ``xarray.open_dataset``
 
     Return:
         xarray.Dataset: The Dataset
     """
-    xr_dataset = xr.open_dataset(os.path.join(data_dir, f))
+    xr_dataset = xr.open_dataset(os.path.join(data_dir, f),
+                                 **kwargs)
     return xr_dataset
 
 
-def romania_10m():
+def romania_10m(**kwargs):
     """Sentinel 2 datacube of a small forested area in Romania at 10 m resolution
 
     Examples:
@@ -49,10 +51,10 @@ def romania_10m():
         >>> # Filter clouds
         >>> s2_cube = s2_cube.where(s2_cube.SCL.isin([4,5,7]))
     """
-    return _load('sentinel2_cube_subset_romania_10m.nc')
+    return _load(f='sentinel2_cube_subset_romania_10m.nc', **kwargs)
 
 
-def romania_20m():
+def romania_20m(**kwargs):
     """Sentinel 2 datacube of a small forested area in Romania at 20 m resolution
 
     Examples:
@@ -64,7 +66,7 @@
         >>> # Filter clouds
         >>> s2_cube = s2_cube.where(s2_cube.SCL.isin([4,5,7]))
     """
-    return _load('sentinel2_cube_subset_romania_20m.nc')
+    return _load(f='sentinel2_cube_subset_romania_20m.nc', **kwargs)
 
 
 def romania_forest_cover_percentage():
diff --git a/nrt/fit_methods.py b/nrt/fit_methods.py
index 3c1fc8c..30192af 100644
--- a/nrt/fit_methods.py
+++ b/nrt/fit_methods.py
@@ -72,7 +72,7 @@ def ols(X, y):
 
 
 @utils.numba_kwargs
-@numba.jit(nopython=True, cache=True)
+@numba.jit(nopython=True, cache=True, parallel=True)
 def rirls(X, y, M=bisquare, tune=4.685,
           scale_est=mad, scale_constant=0.6745,
           update_scale=True, maxiter=50, tol=1e-8):
@@ -82,6 +82,14 @@
     according to weight function and tuning parameter.
 
     Basically a clone from `statsmodels` that should be much faster.
 
+    Note:
+        For best performance of the multithreaded implementation, it is
+        recommended to limit the number of threads used by MKL or OpenBLAS to 1.
+        This avoids over-subscription and improves performance.
+        By default the function will use all cores available; the number of cores
+        used can be controlled using the ``numba.set_num_threads`` function or
+        by modifying the ``NUMBA_NUM_THREADS`` environment variable.
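+
+        A minimal sketch of thread control (illustrative only; it assumes a
+        design matrix ``X`` and a matrix of dependent variables ``y`` are
+        already available)::
+
+            import numba
+
+            numba.set_num_threads(2)  # cap the parallel accelerator at 2 threads
+            beta, residuals = rirls(X, y)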
+
     Args:
         X (np.ndarray): 2D (n_obs x n_features) design matrix
         y (np.ndarray): 1D independent variable
@@ -102,7 +110,7 @@
     """
     beta = np.zeros((X.shape[1], y.shape[1]), dtype=np.float64)
     resid = np.full_like(y, np.nan, dtype=np.float64)
-    for idx in range(y.shape[1]):
+    for idx in numba.prange(y.shape[1]):
        y_sub = y[:,idx]
        isna = np.isnan(y_sub)
        X_sub = X[~isna]
@@ -156,7 +164,7 @@ def weighted_ols(X, y, w):
     return beta, resid
 
 
 @utils.numba_kwargs
-@numba.jit(nopython=True, cache=True)
+@numba.jit(nopython=True, cache=True, parallel=True)
 def ccdc_stable_fit(X, y, dates, threshold=3):
     """Fitting stable regressions using an adapted CCDC method
 
@@ -176,6 +184,14 @@
     2. first observation / RMSE < threshold
     3. last observation / RMSE < threshold
 
+    Note:
+        For best performance of the multithreaded implementation, it is
+        recommended to limit the number of threads used by MKL or OpenBLAS to 1.
+        This avoids over-subscription and improves performance.
+        By default the function will use all cores available; the number of cores
+        used can be controlled using the ``numba.set_num_threads`` function or
+        by modifying the ``NUMBA_NUM_THREADS`` environment variable.
+
     Args:
         X ((M, N) np.ndarray): Matrix of independant variables
         y ((M, K) np.ndarray): Matrix of dependant variables
@@ -194,7 +210,7 @@
     residuals = np.full_like(y, np.nan)
     stable = np.empty((y.shape[1]))
     fit_start = np.empty((y.shape[1]))
-    for idx in range(y.shape[1]):
+    for idx in numba.prange(y.shape[1]):
         y_sub = y[:, idx]
         isna = np.isnan(y_sub)
         X_sub = X[~isna]
@@ -209,9 +225,7 @@
             # each iteration
             y_ = y_sub[-jdx:]
             X_ = X_sub[-jdx:]
-            XTX = np.linalg.inv(np.dot(X_.T, X_))
-            XTY = np.dot(X_.T, y_)
-            beta_sub = np.dot(XTX, XTY)
+            beta_sub = np.linalg.solve(np.dot(X_.T, X_), np.dot(X_.T, y_))
             resid_sub = np.dot(X_, beta_sub) - y_
 
             # Check for stability
@@ -238,7 +252,7 @@
 
 
 @utils.numba_kwargs
-@numba.jit(nopython=True, cache=True)
+@numba.jit(nopython=True, cache=True, parallel=False)
 def roc_stable_fit(X, y, dates, alpha=0.05, crit=0.9478982340418134):
     """Fitting stable regressions using Reverse Ordered Cumulative Sums
 
@@ -262,6 +276,7 @@
         crit (float): Critical value corresponding to the chosen alpha. Can
             be calculated with ``_cusum_rec_test_crit``.
            Default is the value for alpha=0.05
+
     Returns:
         beta (numpy.ndarray): The array of regression estimators
         residuals (numpy.ndarray): The array of residuals
@@ -273,7 +288,7 @@
     fit_start = np.zeros_like(is_stable, dtype=np.uint16)
     beta = np.full((X.shape[1], y.shape[1]), np.nan, dtype=np.float64)
     nreg = X.shape[1]
-    for idx in range(y.shape[1]):
+    for idx in numba.prange(y.shape[1]):
         # subset and remove nan
         is_nan = np.isnan(y[:, idx])
         _y = y[~is_nan, idx]
@@ -300,9 +315,8 @@
         # Subset and fit
         X_stable = _X[stable_idx:]
         y_stable = _y[stable_idx:]
-        XTX = np.linalg.inv(np.dot(X_stable.T, X_stable))
-        XTY = np.dot(X_stable.T, y_stable)
-        beta[:, idx] = np.dot(XTX, XTY)
+        beta[:, idx] = np.linalg.solve(np.dot(X_stable.T, X_stable),
+                                       np.dot(X_stable.T, y_stable))
         fit_start[idx] = _dates[stable_idx]
 
     residuals = np.dot(X, beta) - y
diff --git a/nrt/monitor/__init__.py b/nrt/monitor/__init__.py
index 522a6a6..bd34280 100644
--- a/nrt/monitor/__init__.py
+++ b/nrt/monitor/__init__.py
@@ -19,6 +19,7 @@
 import numpy as np
 import pandas as pd
+import numba
 from netCDF4 import Dataset
 import rasterio
 from rasterio.crs import CRS
@@ -124,7 +125,8 @@ def __eq__(self, other):
     def _fit(self, X, dataarray,
              method='OLS',
-             screen_outliers=None, **kwargs):
+             screen_outliers=None,
+             n_threads=1, **kwargs):
         """Fit a regression model on an xarray.DataArray
 
         Args:
             X (numpy.ndarray): The design matrix used for the regression
@@ -134,6 +136,9 @@ def _fit(self, X, dataarray,
                 ``'RIRLS'``, ``'LASSO'``, ``'ROC'`` and ``'CCDC-stable'``.
             screen_outliers (str): The screening method. Possible values include
                 ``'Shewhart'`` and ``'CCDC_RIRLS'``.
+            n_threads (int): Number of threads used for parallel fitting. Note that
+                parallel fitting is not supported for ``ROC``; the argument
+                therefore has no effect when combined with ``method='ROC'``.
             **kwargs: Other parameters specific to each fit and/or outlier
                 screening method
 
@@ -145,6 +150,7 @@ def _fit(self, X, dataarray,
             NotImplementedError: If method is not yet implemented
             ValueError: Unknown value for `method`
         """
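+        # numba.set_num_threads applies process-wide to numba's threading
+        # layer; jitted functions compiled with parallel=True will use up to
+        # n_threads threads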
+        numba.set_num_threads(n_threads)
         # Check for strictly increasing time dimension:
         if not np.all(dataarray.time.values[1:] >= dataarray.time.values[:-1]):
             raise ValueError("Time dimension of dataarray has to be sorted chronologically.")
diff --git a/nrt/stats.py b/nrt/stats.py
index f71c480..dcc0a81 100644
--- a/nrt/stats.py
+++ b/nrt/stats.py
@@ -17,17 +17,30 @@
 import numpy as np
 
 
-@numba.jit(nopython=True, cache=True)
+@numba.jit(nopython=True, cache=True, parallel=True)
 def nanlstsq(X, y):
     """Return the least-squares solution to a linear matrix equation
 
     Analog to ``numpy.linalg.lstsq`` for dependant variable containing ``Nan``
 
+    Note:
+        For best performance of the multithreaded implementation, it is
+        recommended to limit the number of threads used by MKL or OpenBLAS to 1.
+        This avoids over-subscription and improves performance.
+        By default the function will use all cores available; the number of cores
+        used can be controlled using the ``numba.set_num_threads`` function or
+        by modifying the ``NUMBA_NUM_THREADS`` environment variable.
+
     Args:
         X ((M, N) np.ndarray): Matrix of independant variables
         y ({(M,), (M, K)} np.ndarray): Matrix of dependant variables
 
     Examples:
+        >>> import os
+        >>> # Adjust linear algebra configuration (only one should be required
+        >>> # depending on how numpy was installed/compiled)
+        >>> os.environ['OPENBLAS_NUM_THREADS'] = '1'
+        >>> os.environ['MKL_NUM_THREADS'] = '1'
         >>> import numpy as np
         >>> from sklearn.datasets import make_regression
         >>> from nrt.stats import nanlstsq
@@ -46,19 +59,15 @@ def nanlstsq(X, y):
         np.ndarray: Least-squares solution, ignoring ``Nan``
     """
     beta = np.zeros((X.shape[1], y.shape[1]), dtype=np.float64)
-    for idx in range(y.shape[1]):
+    for idx in numba.prange(y.shape[1]):
         # subset y and X
         isna = np.isnan(y[:,idx])
         X_sub = X[~isna]
         y_sub = y[~isna,idx]
-        # Compute beta on data subset
-        XTX = np.linalg.inv(np.dot(X_sub.T, X_sub))
-        XTY = np.dot(X_sub.T, y_sub)
-        beta[:,idx] = np.dot(XTX, XTY)
+        beta[:, idx] = np.linalg.solve(np.dot(X_sub.T, X_sub), np.dot(X_sub.T, y_sub))
     return beta
 
-
 @numba.jit(nopython=True, cache=True)
 def mad(resid, c=0.6745):
     """Returns Median-Absolute-Deviation (MAD) for residuals
 
diff --git a/nrt/utils_efp.py b/nrt/utils_efp.py
index cadc8e3..3453474 100644
--- a/nrt/utils_efp.py
+++ b/nrt/utils_efp.py
@@ -215,6 +215,7 @@ def _cusum_rec_sctest(x):
 @numba.jit(nopython=True, cache=True)
 def _recresid(X, y, span):
     """ Return standardized recursive residuals for y ~ X
+
     Args:
         X ((M, N) np.ndarray): Matrix of independant variables
         y ((M, K) np.ndarray): Matrix of dependant variables