
Commit 5f77d4b

Merge pull request #520 from NVIDIA/branch-23.10

patch for 23.10 release [skip ci]

YanxuanLiu authored Nov 9, 2023
2 parents f6fc5b8 + 8f7cb6a commit 5f77d4b
Showing 23 changed files with 261 additions and 37 deletions.
44 changes: 44 additions & 0 deletions docs/site/performance.md
@@ -0,0 +1,44 @@
---
title: Performance Tuning
nav_order: 6
---
# Performance Tuning

* TOC
{:toc}

## Stage-level scheduling

Starting with spark-rapids-ml `23.10.0`, stage-level scheduling is automatically enabled.
Therefore, if you are using a Spark **standalone** cluster of version **`3.4.0`** or higher, we strongly recommend
configuring `"spark.task.resource.gpu.amount"` as a fractional value. This allows multiple tasks to run
in parallel during the ETL phase, which improves performance. A good rule of thumb is
`"spark.task.resource.gpu.amount" = 1 / spark.executor.cores`. For example,

``` bash
spark-submit \
--master spark://<master-ip>:7077 \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=0.08 \
...
```

The above spark-submit command requests 1 GPU and 12 CPU cores per executor, with each task using 1 CPU core
and 0.08 of a GPU (roughly 1/12), so up to 12 tasks per executor run concurrently during the ETL phase. Stage-level scheduling
is then used internally by the library to automatically carry out the ML training phases with the required 1 GPU per task.
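
For illustration only, here is a rough sketch of the kind of per-stage resource request that stage-level scheduling makes possible at the PySpark API level; spark-rapids-ml issues an equivalent request internally, so you do not need to write this yourself. The DataFrame `df` and the exact amounts are placeholder assumptions.

``` python
# Sketch: ask for 1 CPU and 1 full GPU per task for a particular stage,
# overriding the fractional ETL-level setting (requires Spark 3.4.0+ on standalone).
from pyspark.resource import ResourceProfileBuilder, TaskResourceRequests

task_reqs = TaskResourceRequests().cpus(1).resource("gpu", 1.0)
resource_profile = ResourceProfileBuilder().require(task_reqs).build

# Attach the profile to the RDD backing the training stage; `df` is a placeholder DataFrame.
training_rdd = df.rdd.withResources(resource_profile)
```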

However, if you are using a spark-rapids-ml version earlier than 23.10.0 or a Spark
standalone cluster version below 3.4.0, you need to ensure that only 1 task runs at any time per executor.
To do this, either set `spark.task.cpus` equal to `spark.executor.cores`, or set `"spark.task.resource.gpu.amount"` to 1, as in the following example:

``` bash
spark-submit \
--master spark://<master-ip>:7077 \
--conf spark.executor.cores=12 \
--conf spark.task.cpus=1 \
--conf spark.executor.resource.gpu.amount=1 \
--conf spark.task.resource.gpu.amount=1 \
...
```
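
If you build the `SparkSession` directly from Python (for example in a notebook) rather than via spark-submit, a minimal sketch of the same single-task-per-executor settings could look like this; the master URL is a placeholder:

``` python
# Sketch: equivalent configuration when constructing the SparkSession in Python.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .master("spark://<master-ip>:7077")   # placeholder master URL
    .config("spark.executor.cores", "12")
    .config("spark.task.cpus", "1")
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "1")
    .getOrCreate()
)
```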
6 changes: 3 additions & 3 deletions jvm/README.md
@@ -74,7 +74,7 @@ the _project root path_ with:
cd jvm
mvn clean package
```
Then `rapids-4-spark-ml_2.12-23.08.0-SNAPSHOT.jar` will be generated under `target` folder.
Then `rapids-4-spark-ml_2.12-23.10.0-SNAPSHOT.jar` will be generated under `target` folder.
Users can also use the _release_ version of the spark-rapids plugin as the dependency if it has already been
published to public Maven repositories; see the [rapids-4-spark maven repository](https://mvnrepository.com/artifact/com.nvidia/rapids-4-spark).
@@ -94,8 +94,8 @@ repository, usually in your `~/.m2/repository`.
Add the artifact jar to Spark, for example:
```bash
ML_JAR="target/rapids-4-spark-ml_2.12-23.08.0-SNAPSHOT.jar"
PLUGIN_JAR="~/.m2/repository/com/nvidia/rapids-4-spark_2.12/23.08.2-SNAPSHOT/rapids-4-spark_2.12-23.08.2-SNAPSHOT.jar"
ML_JAR="target/rapids-4-spark-ml_2.12-23.10.0-SNAPSHOT.jar"
PLUGIN_JAR="~/.m2/repository/com/nvidia/rapids-4-spark_2.12/23.10.0/rapids-4-spark_2.12-23.10.0.jar"
$SPARK_HOME/bin/spark-shell --master $SPARK_MASTER \
--driver-memory 20G \
4 changes: 2 additions & 2 deletions jvm/pom.xml
@@ -20,7 +20,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark-ml_2.12</artifactId>
<version>23.08.0-SNAPSHOT</version>
<version>23.10.0-SNAPSHOT</version>
<name>RAPIDS Accelerator for Apache Spark ML</name>
<description>The RAPIDS cuML library for Apache Spark</description>
<inceptionYear>2021</inceptionYear>
@@ -93,7 +93,7 @@
<dependency>
<groupId>com.nvidia</groupId>
<artifactId>rapids-4-spark_2.12</artifactId>
<version>23.08.0</version>
<version>23.10.0</version>
</dependency>


4 changes: 1 addition & 3 deletions notebooks/databricks/README.md
@@ -29,8 +29,6 @@ If you already have a Databricks account, you can run the example notebooks on a
- downloads and installs the [Spark-Rapids](https://github.com/NVIDIA/spark-rapids) plugin for accelerating data loading and Spark SQL.
- installs various `cuXX` dependencies via pip.

**Note**: as of the last update of this README, Azure Databricks requires a CUDA driver forward compatibility package; uncomment the designated lines for this in the init script. AWS Databricks does not need this, so leave those lines commented in that case.

- Copy the modified `init-pip-cuda-11.8.sh` init script to your *workspace* (not DBFS) (ex. workspace directory: /Users/< databricks-user-name >/init_scripts).
```bash
export WS_SAVE_DIR="/path/to/directory/in/workspace"
@@ -46,7 +44,7 @@ If you already have a Databricks account, you can run the example notebooks on a
spark.task.resource.gpu.amount 1
spark.databricks.delta.preview.enabled true
spark.python.worker.reuse true
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-23.08.2.jar:/databricks/spark/python
spark.executorEnv.PYTHONPATH /databricks/jars/rapids-4-spark_2.12-23.10.0.jar:/databricks/spark/python
spark.sql.execution.arrow.maxRecordsPerBatch 100000
spark.rapids.memory.gpu.minAllocFraction 0.0001
spark.plugins com.nvidia.spark.SQLPlugin
16 changes: 2 additions & 14 deletions notebooks/databricks/init-pip-cuda-11.8.sh
@@ -2,29 +2,17 @@
# set portion of path below after /dbfs/ to dbfs zip file location
SPARK_RAPIDS_ML_ZIP=/dbfs/path/to/zip/file
# IMPORTANT: specify RAPIDS_VERSION fully 23.10.0 and not 23.10
# also RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
# also in general, RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
# while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.2 and not 23.8.2)
RAPIDS_VERSION=23.10.0
SPARK_RAPIDS_VERSION=23.08.2
SPARK_RAPIDS_VERSION=23.10.0

curl -L https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/${SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}-cuda11.jar -o /databricks/jars/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}.jar

# install cudatoolkit 11.8 via runfile approach
wget https://developer.download.nvidia.com/compute/cuda/11.8.0/local_installers/cuda_11.8.0_520.61.05_linux.run
sh cuda_11.8.0_520.61.05_linux.run --silent --toolkit

# install forward compatibility package due to old driver
# uncomment below lines on Azure Databricks
# distro=ubuntu2004
# arch=x86_64
# apt-key del 7fa2af80
# wget https://developer.download.nvidia.com/compute/cuda/repos/$distro/$arch/cuda-keyring_1.0-1_all.deb
# dpkg -i cuda-keyring_1.0-1_all.deb
# apt-get update
# apt-get install -y cuda-compat-11-8
# export LD_LIBRARY_PATH=/usr/local/cuda/compat:/usr/local/cuda/lib64
# ldconfig

# reset symlink and update library loading paths
rm /usr/local/cuda
ln -s /usr/local/cuda-11.8 /usr/local/cuda
6 changes: 3 additions & 3 deletions python/README.md
@@ -8,9 +8,9 @@ For simplicity, the following instructions just use Spark local mode, assuming a

First, install RAPIDS cuML per [these instructions](https://rapids.ai/start.html). Example for CUDA Toolkit 11.8:
```bash
conda create -n rapids-23.08 \
conda create -n rapids-23.10 \
-c rapidsai -c conda-forge -c nvidia \
cuml=23.08 python=3.9 cuda-version=11.8
cuml=23.10 python=3.9 cuda-version=11.8
```

**Note**: while testing, we recommend using conda or docker to simplify installation and isolate your environment while experimenting. Once you have a working environment, you can then try installing directly, if necessary.
@@ -19,7 +19,7 @@ conda create -n rapids-23.08 \

Once you have the conda environment, activate it and install the required packages.
```bash
conda activate rapids-23.08
conda activate rapids-23.10

## for development access to notebooks, tests, and benchmarks
git clone --branch main https://github.com/NVIDIA/spark-rapids-ml.git
2 changes: 1 addition & 1 deletion python/benchmark/databricks/gpu_cluster_spec.sh
@@ -9,7 +9,7 @@ cat <<EOF
"spark.task.cpus": "1",
"spark.databricks.delta.preview.enabled": "true",
"spark.python.worker.reuse": "true",
"spark.executorEnv.PYTHONPATH": "/databricks/jars/rapids-4-spark_2.12-23.08.2.jar:/databricks/spark/python",
"spark.executorEnv.PYTHONPATH": "/databricks/jars/rapids-4-spark_2.12-23.10.0.jar:/databricks/spark/python",
"spark.sql.files.minPartitionNum": "2",
"spark.sql.execution.arrow.maxRecordsPerBatch": "10000",
"spark.executor.cores": "8",
4 changes: 2 additions & 2 deletions python/benchmark/databricks/init-pip-cuda-11.8.sh
@@ -3,10 +3,10 @@
SPARK_RAPIDS_ML_ZIP=/dbfs/path/to/spark-rapids-ml.zip
BENCHMARK_ZIP=/dbfs/path/to/benchmark.zip
# IMPORTANT: specify rapids fully 23.10.0 and not 23.10
# also RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
# also, in general, RAPIDS_VERSION (python) fields should omit any leading 0 in month/minor field (i.e. 23.8.0 and not 23.08.0)
# while SPARK_RAPIDS_VERSION (jar) should have leading 0 in month/minor (e.g. 23.08.2 and not 23.8.2)
RAPIDS_VERSION=23.10.0
SPARK_RAPIDS_VERSION=23.08.2
SPARK_RAPIDS_VERSION=23.10.0

curl -L https://repo1.maven.org/maven2/com/nvidia/rapids-4-spark_2.12/${SPARK_RAPIDS_VERSION}/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}-cuda11.jar -o /databricks/jars/rapids-4-spark_2.12-${SPARK_RAPIDS_VERSION}.jar

2 changes: 1 addition & 1 deletion python/run_benchmark.sh
@@ -99,7 +99,7 @@ EOF

if [[ $cluster_type == "gpu_etl" ]]
then
SPARK_RAPIDS_VERSION=23.08.2
SPARK_RAPIDS_VERSION=23.10.0
rapids_jar=${rapids_jar:-rapids-4-spark_2.12-$SPARK_RAPIDS_VERSION.jar}
if [ ! -f $rapids_jar ]; then
echo "downloading spark rapids jar"
10 changes: 9 additions & 1 deletion python/src/spark_rapids_ml/classification.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from abc import ABCMeta
from typing import (
TYPE_CHECKING,
Any,
@@ -27,6 +28,7 @@
cast,
)

import pyspark
from pyspark.ml.common import _py2java
from pyspark.ml.evaluation import Evaluator, MulticlassClassificationEvaluator

@@ -298,6 +300,9 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]:
mapping = super()._param_mapping()
return mapping

def _pyspark_class(self) -> Optional[ABCMeta]:
return pyspark.ml.classification.RandomForestClassifier


class RandomForestClassifier(
_RandomForestClassifierClass,
@@ -664,7 +669,7 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]:
def _param_value_mapping(
cls,
) -> Dict[str, Callable[[Any], Union[None, str, float, int]]]:
return {"C": lambda x: 1 / x if x != 0.0 else 0.0}
return {"C": lambda x: 1 / x if x > 0.0 else (0.0 if x == 0.0 else None)}
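
For context, a quick sketch of this value mapping evaluated on its own: the Spark-side regularization value is converted to cuML's inverse-regularization parameter `C`; reading a `None` result as "unmappable/invalid" is an assumption based on the surrounding comments.

``` python
# Sketch of the mapping lambda outside the estimator.
to_c = lambda x: 1 / x if x > 0.0 else (0.0 if x == 0.0 else None)

print(to_c(0.5))   # 2.0  -> cuML's C is the inverse of the regularization strength
print(to_c(0.0))   # 0.0  -> zero (no regularization) is passed through unchanged
print(to_c(-1.0))  # None -> negative values are left unmapped (treated as invalid downstream)
```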

def _get_cuml_params_default(self) -> Dict[str, Any]:
return {
@@ -704,6 +709,9 @@ def _reg_params_value_mapping(

return (penalty, C, l1_ratio)

def _pyspark_class(self) -> Optional[ABCMeta]:
return pyspark.ml.classification.LogisticRegression


class _LogisticRegressionCumlParams(
_CumlParams,
5 changes: 5 additions & 0 deletions python/src/spark_rapids_ml/clustering.py
@@ -14,10 +14,12 @@
# limitations under the License.
#

from abc import ABCMeta
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast

import numpy as np
import pandas as pd
import pyspark
from pyspark import keyword_only
from pyspark.ml.clustering import KMeansModel as SparkKMeansModel
from pyspark.ml.clustering import _KMeansParams
@@ -92,6 +94,9 @@ def _get_cuml_params_default(self) -> Dict[str, Any]:
"max_samples_per_batch": 32768,
}

def _pyspark_class(self) -> Optional[ABCMeta]:
return pyspark.ml.clustering.KMeans


class _KMeansCumlParams(_CumlParams, _KMeansParams, HasFeaturesCols):
"""
39 changes: 38 additions & 1 deletion python/src/spark_rapids_ml/core.py
@@ -16,7 +16,7 @@
import json
import os
import threading
from abc import abstractmethod
from abc import ABCMeta, abstractmethod
from collections import namedtuple
from typing import (
TYPE_CHECKING,
@@ -56,6 +56,7 @@
MLWritable,
MLWriter,
)
from pyspark.ml.wrapper import JavaParams
from pyspark.sql import Column, DataFrame
from pyspark.sql.functions import col, struct
from pyspark.sql.pandas.functions import pandas_udf
@@ -301,6 +302,16 @@ def _initialize_cuml_logging(verbose: Optional[Union[bool, int]]) -> None:

cuml_logger.set_level(log_level)

def _pyspark_class(self) -> Optional[ABCMeta]:
"""
Subclass should override to return corresponding pyspark.ml class
Ex. logistic regression should return pyspark.ml.classification.LogisticRegression
Return None if no corresponding class in pyspark, e.g. knn
"""
raise NotImplementedError(
"pyspark.ml class corresponding to estimator not specified."
)


class _CumlCaller(_CumlParams, _CumlCommon):
"""
@@ -419,6 +430,31 @@ def _require_nccl_ucx(self) -> Tuple[bool, bool]:
"""
return (True, False)

def _validate_parameters(self) -> None:
cls_name = self._pyspark_class()

if cls_name is not None:
pyspark_est = cls_name()
# Both pyspark and cuml may have a parameter with the same name,
# but cuml might have additional optional values that can be set.
# If we transfer these cuml-specific values to the Spark JVM,
# it would result in an exception.
# To avoid this issue, we skip transferring these parameters
# since the mapped parameters have been validated in _get_cuml_mapping_value.
cuml_est = self.copy()
cuml_params = cuml_est._param_value_mapping().keys()
param_mapping = cuml_est._param_mapping()
pyspark_params = [k for k, v in param_mapping.items() if v in cuml_params]
for p in pyspark_params:
cuml_est.clear(cuml_est.getParam(p))

cuml_est._copyValues(pyspark_est)
# validate the parameters
pyspark_est._transfer_params_to_java()

del pyspark_est
del cuml_est
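
As background for this validation approach, a minimal standalone sketch (assuming an active SparkSession and that `maxIter` is validated as non-negative on the JVM side):

``` python
# Sketch: Spark's JVM-side Param validators reject out-of-range values when the
# Python params are transferred to the Java estimator, well before fit() does real work.
from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.getOrCreate()

est = LogisticRegression(maxIter=-1)   # accepted on the Python side
try:
    est._transfer_params_to_java()     # JVM validators run here and should raise
except Exception as err:
    print(f"rejected early: {err}")
```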

@abstractmethod
def _get_cuml_fit_func(
self,
@@ -468,6 +504,7 @@ def _call_cuml_fit_func(
:class:`Transformer`
fitted model
"""
self._validate_parameters()

cls = self.__class__

5 changes: 5 additions & 0 deletions python/src/spark_rapids_ml/feature.py
@@ -15,10 +15,12 @@
#

import itertools
from abc import ABCMeta
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
import pyspark
from pyspark import keyword_only
from pyspark.ml.common import _py2java
from pyspark.ml.feature import PCAModel as SparkPCAModel
@@ -69,6 +71,9 @@ def _get_cuml_params_default(self) -> Dict[str, Any]:
"whiten": False,
}

def _pyspark_class(self) -> Optional[ABCMeta]:
return pyspark.ml.feature.PCA


class _PCACumlParams(_CumlParams, _PCAParams, HasInputCols):
"""
4 changes: 4 additions & 0 deletions python/src/spark_rapids_ml/knn.py
@@ -15,6 +15,7 @@
#

import asyncio
from abc import ABCMeta
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

import numpy as np
@@ -68,6 +69,9 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]:
def _get_cuml_params_default(self) -> Dict[str, Any]:
return {"n_neighbors": 5, "verbose": False, "batch_size": 2000000}

def _pyspark_class(self) -> Optional[ABCMeta]:
return None


class _NearestNeighborsCumlParams(_CumlParams, HasInputCol, HasLabelCol, HasInputCols):
"""
11 changes: 10 additions & 1 deletion python/src/spark_rapids_ml/params.py
@@ -429,7 +429,16 @@ def _set_cuml_param(

if cuml_param is not None:
# if Spark Param is mapped to cuML parameter, set cuml_params
self._set_cuml_value(cuml_param, spark_value)
try:
self._set_cuml_value(cuml_param, spark_value)
except ValueError:
# create more informative message
param_ref_str = (
cuml_param + " or " + spark_param
if cuml_param != spark_param
else spark_param
)
raise ValueError(f"{param_ref_str} given invalid value {spark_value}")

def _get_cuml_mapping_value(self, k: str, v: Any) -> Any:
value_map = self._param_value_mapping()