ExternalShuffleService is an external shuffle service that serves shuffle blocks from outside an Executor process. It runs as a standalone application and manages shuffle output files so they are available to executors at all times. Because the shuffle output files are managed outside the executors, it offers uninterrupted access to them regardless of executors being killed or down.
You start ExternalShuffleService using the start-shuffle-service.sh shell script and enable its use by the driver and executors using spark.shuffle.service.enabled.
There is a custom external shuffle service for Spark on YARN: YarnShuffleService.
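Enable INFO logging level for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.deploy.ExternalShuffleService=INFO. Refer to Logging.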
The start-shuffle-service.sh shell script allows you to launch ExternalShuffleService. The script is under the sbin directory.
When executed, it runs the sbin/spark-config.sh and bin/load-spark-env.sh shell scripts. It then executes sbin/spark-daemon.sh with the start command and the parameters org.apache.spark.deploy.ExternalShuffleService and 1.
$ ./sbin/start-shuffle-service.sh
starting org.apache.spark.deploy.ExternalShuffleService, logging to ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out

$ tail -f ...logs/spark-jacek-org.apache.spark.deploy.ExternalShuffleService-1-japila.local.out
Spark Command: /Library/Java/JavaVirtualMachines/Current/Contents/Home/bin/java -cp /Users/jacek/dev/oss/spark/conf/:/Users/jacek/dev/oss/spark/assembly/target/scala-2.11/jars/* -Xmx1g org.apache.spark.deploy.ExternalShuffleService
========================================
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/06/07 08:02:02 INFO ExternalShuffleService: Started daemon with process name: [pid]@japila.local
16/06/07 08:02:03 INFO ExternalShuffleService: Starting shuffle service on port 7337 with useSasl = false
You can also use spark-class to launch ExternalShuffleService, i.e. ./bin/spark-class org.apache.spark.deploy.ExternalShuffleService.
When started, it executes Utils.initDaemon(log).
FIXME Utils.initDaemon(log) ? See spark-submit.
It loads the default Spark properties and creates a SecurityManager.
It sets spark.shuffle.service.enabled to true (as it is later checked whether it is enabled or not).
A shutdown hook is registered so that when ExternalShuffleService is shut down, it prints the following INFO message to the logs and the stop method is executed.
INFO ExternalShuffleService: Shutting down shuffle service.
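The launch sequence above (force the flag on, start the service, register a shutdown hook that logs and stops it) can be sketched in Java as follows. This is a minimal illustration, not Spark's code: ShuffleServiceLauncherSketch and ToyService are hypothetical names, a plain Map stands in for SparkConf, and loading the default properties, creating the SecurityManager and keeping the process alive are left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the launch sequence; not Spark's implementation.
// A plain Map stands in for SparkConf.
class ShuffleServiceLauncherSketch {

    static final String ENABLED_KEY = "spark.shuffle.service.enabled";

    // Hypothetical stand-in for ExternalShuffleService with start/stop only.
    static class ToyService {
        private final boolean enabled;
        private volatile boolean running;

        ToyService(Map<String, String> conf) {
            // The service reads the flag that the launcher forced on
            this.enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        }

        void start() { running = true; }
        void stop() { running = false; }
        boolean isEnabled() { return enabled; }
        boolean isRunning() { return running; }
    }

    // Copies the default properties and sets the flag to true,
    // leaving the caller's map untouched.
    static Map<String, String> prepareConf(Map<String, String> defaults) {
        Map<String, String> conf = new HashMap<>(defaults);
        conf.put(ENABLED_KEY, "true");
        return conf;
    }

    // Creates and starts the service, then registers a JVM shutdown hook
    // that logs the shutdown message and stops the service.
    static ToyService launch(Map<String, String> defaults) {
        ToyService service = new ToyService(prepareConf(defaults));
        service.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            System.out.println("INFO ExternalShuffleService: Shutting down shuffle service.");
            service.stop();
        }));
        return service;
    }
}
```

The shutdown message is printed only when the JVM exits, which is when registered shutdown hooks run.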
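Enable DEBUG logging level for the org.apache.spark.network.shuffle.ExternalShuffleBlockResolver logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=DEBUG. Refer to Logging.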
When an executor registers with the shuffle service, you should see the following INFO message in the logs:
INFO ExternalShuffleBlockResolver: Registered executor [AppExecId] with [executorInfo]
You should also see the following messages when a SparkContext is closed:
INFO ExternalShuffleBlockResolver: Application [appId] removed, cleanupLocalDirs = [cleanupLocalDirs]
INFO ExternalShuffleBlockResolver: Cleaning up executor [AppExecId]'s [executor.localDirs.length] local dirs
DEBUG ExternalShuffleBlockResolver: Successfully cleaned up directory: [localDir]
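The registration and cleanup behind these messages can be pictured as a registry keyed by application and executor ids. The sketch below is an assumption-laden illustration, not Spark's ExternalShuffleBlockResolver: ExecutorRegistrySketch and AppExecKey are hypothetical names, an executor's info is reduced to its local directories, and the directories are returned rather than deleted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry keyed by (appId, execId); executor info is reduced
// to its local dirs.
class ExecutorRegistrySketch {

    // Value-based key so that equal ids map to the same entry
    static final class AppExecKey {
        final String appId;
        final String execId;

        AppExecKey(String appId, String execId) {
            this.appId = appId;
            this.execId = execId;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof AppExecKey)) return false;
            AppExecKey other = (AppExecKey) o;
            return appId.equals(other.appId) && execId.equals(other.execId);
        }

        @Override
        public int hashCode() { return Objects.hash(appId, execId); }

        @Override
        public String toString() { return "[appId=" + appId + ", execId=" + execId + "]"; }
    }

    private final Map<AppExecKey, String[]> executors = new ConcurrentHashMap<>();

    void registerExecutor(String appId, String execId, String[] localDirs) {
        AppExecKey key = new AppExecKey(appId, execId);
        executors.put(key, localDirs);
        System.out.println("INFO Registered executor " + key + " with " + localDirs.length + " local dir(s)");
    }

    // Drops every executor of appId and, if cleanupLocalDirs is true, collects
    // their local dirs (a real service would delete them; this sketch does not).
    List<String> applicationRemoved(String appId, boolean cleanupLocalDirs) {
        List<String> dirsToClean = new ArrayList<>();
        Iterator<Map.Entry<AppExecKey, String[]>> it = executors.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<AppExecKey, String[]> entry = it.next();
            if (entry.getKey().appId.equals(appId)) {
                String[] dirs = entry.getValue();
                it.remove();
                if (cleanupLocalDirs) {
                    dirsToClean.addAll(Arrays.asList(dirs));
                }
            }
        }
        return dirsToClean;
    }

    int size() { return executors.size(); }
}
```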
ExternalShuffleService requires a SparkConf and a SecurityManager.
When created, it reads the spark.shuffle.service.enabled (disabled by default) and spark.shuffle.service.port (defaults to 7337) configuration settings. It also checks whether authentication is enabled.
FIXME Review securityManager.isAuthenticationEnabled()
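Reading spark.shuffle.service.enabled and spark.shuffle.service.port with their defaults can be sketched as below. This assumes a plain Map in place of SparkConf; ShuffleServiceSettingsSketch is a hypothetical name.

```java
import java.util.Map;

// Hypothetical sketch; a plain Map stands in for SparkConf.
final class ShuffleServiceSettingsSketch {
    static final String ENABLED_KEY = "spark.shuffle.service.enabled";
    static final String PORT_KEY = "spark.shuffle.service.port";

    final boolean enabled;
    final int port;

    ShuffleServiceSettingsSketch(Map<String, String> conf) {
        // spark.shuffle.service.enabled is disabled (false) by default
        this.enabled = Boolean.parseBoolean(conf.getOrDefault(ENABLED_KEY, "false"));
        // spark.shuffle.service.port defaults to 7337
        this.port = Integer.parseInt(conf.getOrDefault(PORT_KEY, "7337"));
    }
}
```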
It then creates a TransportConf (as transportConf), an ExternalShuffleBlockHandler (as blockHandler) and a TransportContext (as transportContext).
FIXME TransportContext?
No internal TransportServer (as server) is created at this point; it is created when start is executed.
start(): Unit
The start method starts ExternalShuffleService.
When start is executed, you should see the following INFO message in the logs:
INFO ExternalShuffleService: Starting shuffle service on port [port] with useSasl = [useSasl]
If useSasl is enabled, a SaslServerBootstrap is created.
FIXME SaslServerBootstrap?
The internal server reference (a TransportServer) is then created, which attempts to bind to port (i.e. the spark.shuffle.service.port setting).
stop(): Unit
The stop method closes the internal server reference and clears it (i.e. sets it to null).
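The start and stop lifecycle of the internal server reference can be sketched as follows. The sketch assumes a java.net.ServerSocket bound to the loopback address as a stand-in for TransportServer and skips SASL entirely; ShuffleServerLifecycleSketch is a hypothetical name.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical sketch: a ServerSocket stands in for Spark's TransportServer,
// and SASL is only reported, never set up.
class ShuffleServerLifecycleSketch {
    private final int port;
    private final boolean useSasl;
    // Mirrors the internal server reference: null until start, null after stop
    private ServerSocket server;

    ShuffleServerLifecycleSketch(int port, boolean useSasl) {
        this.port = port;
        this.useSasl = useSasl;
    }

    static String startMessage(int port, boolean useSasl) {
        return "Starting shuffle service on port " + port + " with useSasl = " + useSasl;
    }

    // Logs the start message and binds the server to the configured port
    void start() {
        System.out.println("INFO ExternalShuffleService: " + startMessage(port, useSasl));
        try {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
            } catch (IOException e) {
                socket.close();
                throw e;
            }
            server = socket;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Closes the server and clears the reference
    void stop() {
        if (server != null) {
            try {
                server.close();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } finally {
                server = null;
            }
        }
    }

    boolean isStarted() { return server != null; }

    int boundPort() { return server == null ? -1 : server.getLocalPort(); }
}
```

Passing port 0 lets the operating system pick a free port, which is handy for trying the sketch out; the shuffle service itself uses spark.shuffle.service.port (7337 by default).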
ExternalShuffleBlockHandler is an RpcHandler (i.e. a handler for sendRPC() messages sent by TransportClients).
When created, ExternalShuffleBlockHandler requires a OneForOneStreamManager and a TransportConf with a registeredExecutorFile to create an ExternalShuffleBlockResolver.
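Enable TRACE logging level for the org.apache.spark.network.shuffle.ExternalShuffleBlockHandler logger to see what happens inside. Add the following line to conf/log4j.properties: log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockHandler=TRACE. Refer to Logging.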
void handleMessage(
    BlockTransferMessage msgObj,
    TransportClient client,
    RpcResponseCallback callback)
handleMessage handles two types of BlockTransferMessage messages: OpenBlocks and RegisterExecutor.
For any other BlockTransferMessage message, it throws an UnsupportedOperationException:
Unexpected message: [msgObj]
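The dispatch in handleMessage boils down to checking the message type and rejecting anything unknown. The sketch below uses hypothetical message classes (not Spark's BlockTransferMessage hierarchy) and returns a description string instead of replying through an RpcResponseCallback.

```java
// Hypothetical message types for illustration; these are NOT Spark's
// BlockTransferMessage classes.
class MessageDispatchSketch {

    interface Message {}

    static final class OpenBlocks implements Message {
        final String appId;
        final String execId;
        final String[] blockIds;

        OpenBlocks(String appId, String execId, String[] blockIds) {
            this.appId = appId;
            this.execId = execId;
            this.blockIds = blockIds;
        }
    }

    static final class RegisterExecutor implements Message {
        final String appId;
        final String execId;

        RegisterExecutor(String appId, String execId) {
            this.appId = appId;
            this.execId = execId;
        }
    }

    // Dispatches on the runtime type; returns a description instead of
    // replying through a callback, and rejects unknown message types.
    static String handleMessage(Message msgObj) {
        if (msgObj instanceof OpenBlocks) {
            OpenBlocks msg = (OpenBlocks) msgObj;
            return "open " + msg.blockIds.length + " block(s) for " + msg.appId + "/" + msg.execId;
        } else if (msgObj instanceof RegisterExecutor) {
            RegisterExecutor msg = (RegisterExecutor) msgObj;
            return "register " + msg.appId + "/" + msg.execId;
        } else {
            throw new UnsupportedOperationException("Unexpected message: " + msgObj);
        }
    }
}
```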
OpenBlocks(String appId, String execId, String[] blockIds)
When OpenBlocks is received, handleMessage authorizes the client.
FIXME checkAuth ?
It then gets the block data for each block id in blockIds (using ExternalShuffleBlockResolver).
Finally, it registers a stream and calls callback.onSuccess with a serialized byte buffer (a StreamHandle with the streamId and the number of blocks in msg).
FIXME callback.onSuccess ?
You should see the following TRACE message in the logs:
TRACE Registered streamId [streamId] with [length] buffers for client [clientId] from host [remoteAddress]
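The OpenBlocks steps (resolve every block, register a stream, reply with the stream id and block count) can be sketched as follows. OpenBlocksSketch is a hypothetical name, byte arrays stand in for ManagedBuffers, and the reply layout (an 8-byte stream id followed by a 4-byte count) is an assumption for illustration rather than StreamHandle's actual encoding.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical sketch of the OpenBlocks steps; byte[] stands in for
// ManagedBuffer and the reply layout is an assumption.
class OpenBlocksSketch {
    private final AtomicLong nextStreamId = new AtomicLong();
    private final Map<Long, List<byte[]>> streams = new ConcurrentHashMap<>();

    ByteBuffer openBlocks(String[] blockIds, Function<String, byte[]> getBlockData) {
        // 1. Resolve the data of every requested block
        List<byte[]> buffers = new ArrayList<>(blockIds.length);
        for (String blockId : blockIds) {
            buffers.add(getBlockData.apply(blockId));
        }
        // 2. Register the buffers under a fresh stream id
        long streamId = nextStreamId.getAndIncrement();
        streams.put(streamId, Collections.unmodifiableList(buffers));
        // 3. Serialize (streamId, number of blocks) as the reply
        ByteBuffer reply = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
        reply.putLong(streamId).putInt(blockIds.length);
        reply.flip();
        return reply;
    }

    int registeredBuffers(long streamId) {
        List<byte[]> buffers = streams.get(streamId);
        return buffers == null ? 0 : buffers.size();
    }
}
```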
ManagedBuffer getBlockData(String appId, String execId, String blockId)
getBlockData parses blockId (in the format of shuffle_[shuffleId]_[mapId]_[reduceId]) and returns the FileSegmentManagedBuffer that corresponds to shuffle_[shuffleId]_[mapId]_0.data.
It splits blockId into 4 parts using _ (underscore). It works exclusively with shuffle block ids, with the other three parts being shuffleId, mapId, and reduceId.
It looks up an executor (i.e. an ExecutorShuffleInfo in the private executors registry) for appId and execId to search for a ManagedBuffer.
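The ManagedBuffer is indexed using a binary file shuffle_[shuffleId]_[mapId]_0.index, which holds the offsets of the buffers: the offset of the buffer for reduceId is the entry at reduceId, and its length is the difference with the next entry. The data file is shuffle_[shuffleId]_[mapId]_0.data, and the segment is returned as a FileSegmentManagedBuffer.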
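Turning the index file into an offset and a length can be sketched as below, assuming the index is a sequence of 8-byte big-endian longs in which the entry at reduceId marks where that reducer's bytes start and the next entry marks where they end. ShuffleIndexSketch is a hypothetical name, and the sketch writes its own index file to have something to read.

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UncheckedIOException;

// Hypothetical sketch, assuming the index file is a run of 8-byte big-endian
// longs: entry i is where reducer i's bytes start in the data file and
// entry i + 1 is where they end.
class ShuffleIndexSketch {

    // A byte range inside the data file
    static final class Segment {
        final long offset;
        final long length;

        Segment(long offset, long length) {
            this.offset = offset;
            this.length = length;
        }
    }

    // Writes offsets the way this sketch expects to read them back
    static void writeIndex(File indexFile, long[] offsets) {
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(indexFile))) {
            for (long offset : offsets) {
                out.writeLong(offset);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads the entries at reduceId and reduceId + 1 and turns them into
    // an (offset, length) pair
    static Segment readSegment(File indexFile, int reduceId) {
        try (RandomAccessFile index = new RandomAccessFile(indexFile, "r")) {
            index.seek(reduceId * 8L);
            long offset = index.readLong();
            long nextOffset = index.readLong();
            return new Segment(offset, nextOffset - offset);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With offsets 0, 10, 25 and 40, the bytes for reduceId 1 start at 10 and span 15 bytes.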
It throws an IllegalArgumentException for block ids with fewer than four parts:
Unexpected block id format: [blockId]
or for non-shuffle block ids:
Expected shuffle block id, got: [blockId]
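The block id checks and the derived file names can be sketched as follows. ShuffleBlockIdSketch is a hypothetical name, and the three ids are parsed as int values (an assumption of the sketch).

```java
// Hypothetical sketch of the block id checks; the exception messages mirror
// the ones quoted in the text.
final class ShuffleBlockIdSketch {
    final int shuffleId;
    final int mapId;
    final int reduceId;

    private ShuffleBlockIdSketch(int shuffleId, int mapId, int reduceId) {
        this.shuffleId = shuffleId;
        this.mapId = mapId;
        this.reduceId = reduceId;
    }

    // Splits on '_' and accepts only shuffle_[shuffleId]_[mapId]_[reduceId]
    static ShuffleBlockIdSketch parse(String blockId) {
        String[] parts = blockId.split("_");
        if (parts.length < 4) {
            throw new IllegalArgumentException("Unexpected block id format: " + blockId);
        } else if (!parts[0].equals("shuffle")) {
            throw new IllegalArgumentException("Expected shuffle block id, got: " + blockId);
        }
        return new ShuffleBlockIdSketch(
            Integer.parseInt(parts[1]), Integer.parseInt(parts[2]), Integer.parseInt(parts[3]));
    }

    // Data and index files are named after shuffleId and mapId with a trailing 0
    String dataFileName() { return "shuffle_" + shuffleId + "_" + mapId + "_0.data"; }

    String indexFileName() { return "shuffle_" + shuffleId + "_" + mapId + "_0.index"; }
}
```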
It throws a RuntimeException when no ExecutorShuffleInfo could be found:
Executor is not registered (appId=[appId], execId=[execId])
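The executor lookup and its failure mode can be sketched as follows, assuming a hypothetical ExecutorLookupSketch whose executor info is reduced to an array of local directories.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: executor info is reduced to its local dirs, and the
// key is the (appId, execId) pair as a value-based List.
class ExecutorLookupSketch {
    private final Map<List<String>, String[]> executors = new ConcurrentHashMap<>();

    void register(String appId, String execId, String[] localDirs) {
        executors.put(Arrays.asList(appId, execId), localDirs);
    }

    // Fails with the message quoted in the text when the executor is unknown
    String[] lookup(String appId, String execId) {
        String[] info = executors.get(Arrays.asList(appId, execId));
        if (info == null) {
            throw new RuntimeException(String.format(
                "Executor is not registered (appId=%s, execId=%s)", appId, execId));
        }
        return info;
    }
}
```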
The spark.shuffle.service.enabled flag (default: false) controls whether the External Shuffle Service is used or not. When enabled (true), the executors' BlockManagers register with the shuffle service.
It has to be enabled for dynamic allocation of executors.
It is used in CoarseMesosSchedulerBackend to instantiate MesosExternalShuffleClient.
It is explicitly disabled for LocalSparkCluster (and any attempt to enable it is overridden).
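The dependency between dynamic allocation and the shuffle service can be expressed as a simple settings check. This is a hypothetical sketch (the class name and exception message are made up) that reads spark.dynamicAllocation.enabled and spark.shuffle.service.enabled from a Map standing in for SparkConf.

```java
import java.util.Map;

// Hypothetical settings check; a plain Map stands in for SparkConf and the
// exception message is illustrative only.
final class DynamicAllocationCheckSketch {
    static void validate(Map<String, String> conf) {
        boolean dynamicAllocation =
            Boolean.parseBoolean(conf.getOrDefault("spark.dynamicAllocation.enabled", "false"));
        boolean shuffleService =
            Boolean.parseBoolean(conf.getOrDefault("spark.shuffle.service.enabled", "false"));
        // Dynamic allocation without the external shuffle service is rejected
        if (dynamicAllocation && !shuffleService) {
            throw new IllegalStateException(
                "Dynamic allocation of executors requires spark.shuffle.service.enabled=true");
        }
    }
}
```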