Home
Welcome to the spark-perf wiki. This page lists several useful scripts, helper functions, and analysis tools for running spark-perf tests.
In config.py:
import os
SPARK_COMMIT_ID = os.environ["SPARK_COMMIT_ID"]
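Reading the variable with `os.environ[...]` raises a bare `KeyError` when it is unset. A minimal sketch of a more explicit lookup, assuming a clearer error message is wanted; the helper name `read_commit_id` is hypothetical and not part of spark-perf:

```python
import os


def read_commit_id(environ=os.environ):
    """Return SPARK_COMMIT_ID from the environment, or fail with a clear message.

    `read_commit_id` is an illustrative helper, not a spark-perf function.
    """
    commit_id = environ.get("SPARK_COMMIT_ID", "").strip()
    if not commit_id:
        raise RuntimeError("SPARK_COMMIT_ID must be set before calling bin/run")
    return commit_id
```

In config.py this could be used as `SPARK_COMMIT_ID = read_commit_id()`.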
To run against multiple commits, use a shell script to repeatedly call bin/run with different environment variables:
#!/usr/bin/env bash
FWDIR="$(cd "`dirname "$0"`"; pwd)"
if [ "$#" -ne 4 ]; then
  echo "Test every Nth commit between two commits"
  echo "Usage: oldest_commit newest_commit N config_file"
  exit 1
fi
oldestCommit="$1"
newestCommit="$2"
NR="$3"
configFile="$4"
# Find the SHAs of every NRth commit between two git commits:
cd "$FWDIR/spark-build-cache/master"
git fetch
versions=($(git log --oneline "$oldestCommit..$newestCommit" | awk "NR == 0 || NR % $NR == 0" | cut -d ' ' -f1))
versions+=("$oldestCommit")
versions+=("$newestCommit")
cd "$FWDIR"
echo "Going to test against ${#versions[@]} commits using config file $configFile"
echo "Commits (every ${NR}th commit between $newestCommit and $oldestCommit): ${versions[@]}"
read -p "Confirm? " -n 1 -r
echo
if [[ ! $REPLY =~ ^[Yy]$ ]]
then
  echo "Exiting!"
  exit 1
fi
# Quote the expansions so that SHAs and the config path are passed through unmodified:
for version in "${versions[@]}"
do
  echo "Testing against commit $version"
  export SPARK_COMMIT_ID="$version"
  ./bin/run --config "$configFile"
done
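The sampling step of the script (the `git log ... | awk ...` pipeline plus the two appended endpoints) can be sketched in Python to check which commits a given N would select. This is only an illustration of the selection logic; the function name `select_commits` and the sample SHAs are made up, and the input list stands in for the `git log --oneline` output (newest first, excluding the oldest commit):

```python
def select_commits(log_shas, oldest, newest, n):
    """Mimic the script's sampling of commits.

    Keeps every n-th line of the `git log` output (1-based, like awk's NR),
    then appends both endpoints, as the shell script does.
    """
    sampled = [sha for i, sha in enumerate(log_shas, start=1) if i % n == 0]
    return sampled + [oldest, newest]


# Hypothetical history a <- b <- c <- d <- e, testing every 2nd commit:
print(select_commits(["e", "d", "c", "b"], oldest="a", newest="e", n=2))
```

Note that with N = 1 the newest commit is selected twice, because `git log oldest..newest` already includes it.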
Upgrade to a newer version of the aws tool and configure AWS credentials:
sudo easy_install --upgrade awscli
aws configure
# AWS Access Key ID [None]: AKIAIOSFODNN7EXAMPLE
# AWS Secret Access Key [None]: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
# Default region name [None]:
# Default output format [None]:
Sync the results folder to an S3 bucket:
aws s3 cp --recursive /local/path/to/results/directory s3://bucket-name/resultsdir/
Many test suites report their results as JSON objects that contain information on the test environment and configuration, such as the Spark version / commit SHA, the contents of SparkConf, environment variables, etc. Using Spark SQL, we can create a SchemaRDD from these JSON objects in order to analyze test results.
Running spark-perf produces a results directory that contains one subdirectory per run of bin/run. Each subdirectory contains several log files; we're interested in the *.out files, which contain the test results. First, list the paths of those files:
- In Databricks Cloud:
Note: this step is only relevant to Databricks employees; the rest of this tutorial should work in any environment.
import dbutils.fs
val resultsDir = "/mount/path/in/dbfs"
fs.mount("s3n://AWSKEY:AWSSECRETKEY@bucket-name/bucket-folder/", resultsDir)
val logFiles: Seq[String] = for (
  logDir: String <- fs.ls(resultsDir).filter(_.name.endsWith("_logs/")).map(d => s"$resultsDir/${d.name}");
  logFiles: Seq[String] = fs.ls(logDir).map(f => s"$logDir/${f.name}").filter(_.endsWith(".out"));
  logFile: String <- logFiles
) yield logFile
- On a local filesystem:
import java.io.File
val resultsDir = "/mount/path/in/filesystem"
val logFiles: Seq[String] = for (
  logDir: String <- new File(resultsDir).list.filter(_.endsWith("_logs")).map(d => s"$resultsDir/${d}");
  logFiles: Seq[String] = new File(logDir).list.map(f => s"$logDir/${f}").filter(_.endsWith(".out"));
  logFile: String <- logFiles
) yield logFile
Next, create an RDD that loads these files, extracts the results lines (which start with "results: "), and grabs the JSON objects (each of which is written on a single line):
import org.apache.spark.rdd.RDD
val allLogLines: RDD[String] = sc.union(logFiles.map(f => sc.textFile(f)))
val allResultJsons = allLogLines.filter(_.startsWith("results: ")).map(_.stripPrefix("results: "))
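For a quick sanity check of a single log file without a Spark cluster, the same filter-and-strip step can be sketched in plain Python. This assumes the "results: " prefix used in the Scala snippet above; the function name `extract_results` and the sample lines are illustrative only:

```python
import json

PREFIX = "results: "  # assumed to match the prefix used in the Scala snippet


def extract_results(lines):
    """Yield one parsed JSON object per line that starts with the results prefix."""
    for line in lines:
        if line.startswith(PREFIX):
            yield json.loads(line[len(PREFIX):])


# Made-up log lines; a real *.out file would be read with open(path).
sample = ["INFO starting test\n", 'results: {"testName": "count"}\n']
print(list(extract_results(sample)))
```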
Using Spark SQL, create a SchemaRDD from these logs. In the JSON output that spark-perf writes, some properties have names that contain dots (such as Java system properties like spark.shuffle.manager). Spark SQL does not currently support column names with dots (see SPARK-2775), so we'll need to post-process the inferred schema to convert dots into underscores:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
import sqlContext.applySchema
import org.apache.spark.sql._
/** Replaces . in column names with _ (underscore) */
def cleanSchema(dataType: DataType): DataType = dataType match {
  case StructType(fields) =>
    StructType(
      fields.map(f =>
        f.copy(name = f.name.replaceAllLiterally(".", "_"), dataType = cleanSchema(f.dataType))))
  case ArrayType(elemType, nullable) => ArrayType(cleanSchema(elemType), nullable)
  case other => other
}
/** Replaces . in column names with _ */
def cleanSchema(schemaRDD: SchemaRDD): SchemaRDD = {
  applySchema(schemaRDD, cleanSchema(schemaRDD.schema).asInstanceOf[StructType])
}
val resultsRdd = cleanSchema(sqlContext.jsonRDD(allResultJsons))
resultsRdd.registerTempTable("results")
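The same renaming idea can be illustrated on an already-parsed JSON object rather than on a Spark schema. The sketch below is not the wiki's method, only a plain-Python analogue of the recursive walk over structs and arrays; `clean_keys` and the sample record are hypothetical:

```python
def clean_keys(value):
    """Recursively replace '.' with '_' in every dictionary key.

    Dicts play the role of StructType and lists the role of ArrayType;
    all other values are returned unchanged.
    """
    if isinstance(value, dict):
        return {key.replace(".", "_"): clean_keys(item) for key, item in value.items()}
    if isinstance(value, list):
        return [clean_keys(item) for item in value]
    return value


# Made-up record with a dotted property name:
record = {"sparkConf": {"spark.app.name": "x"}, "results": [{"time": 1.5}]}
print(clean_keys(record))
```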
Now, we have a SQL table with a schema that looks something like this:
root
|-- options: struct (nullable = true)
| |-- closure-size: string (nullable = true)
| |-- inter-trial-wait: string (nullable = true)
| |-- key-length: string (nullable = true)
| |-- num-jobs: string (nullable = true)
| |-- num-partitions: string (nullable = true)
| |-- num-records: string (nullable = true)
[...]
|-- results: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- time: double (nullable = true)
|-- sparkConf: struct (nullable = true)
| |-- spark_app_id: string (nullable = true)
| |-- spark_app_name: string (nullable = true)
| |-- spark_driver_host: string (nullable = true)
| |-- spark_driver_memory: string (nullable = true)
| |-- spark_driver_port: string (nullable = true)
[...]
|-- sparkVersion: string (nullable = true)
|-- systemProperties: struct (nullable = true)
| |-- SPARK_SUBMIT: string (nullable = true)
| |-- awt_toolkit: string (nullable = true)
| |-- file_encoding: string (nullable = true)
[...]
|-- testName: string (nullable = true)
In this schema, results is an array of per-test-run times (or other outcome metrics); the rest of the schema describes the configuration used by those runs. To analyze these results using SQL, we can use Hive's lateral view explode to un-nest this data and produce one row per test run. For example:
SELECT result.time, results.systemProperties.spark_shuffle_blockTransferService as bts, testName, options, sparkVersion, results.systemProperties.sparkperf_commitSHA as commit
FROM results LATERAL VIEW EXPLODE(results) r as result
WHERE testName = "count";
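To make the effect of the lateral view concrete, here is a small plain-Python sketch of the un-nesting: each entry of the results array becomes its own row, and the remaining fields are repeated on every row. The function name `explode_results` and the sample record (including its version string) are made up for illustration:

```python
def explode_results(record):
    """Return one flat row per entry of record["results"].

    Records with a missing or empty results array produce no rows,
    which matches a lateral view without the OUTER keyword.
    """
    shared = {key: value for key, value in record.items() if key != "results"}
    return [dict(shared, result=entry) for entry in record.get("results") or []]


# Made-up record with two trial times:
record = {"testName": "count", "sparkVersion": "1.2.0",
          "results": [{"time": 1.0}, {"time": 2.0}]}
for row in explode_results(record):
    print(row["testName"], row["result"]["time"])
```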