-
Notifications
You must be signed in to change notification settings - Fork 374
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CELEBORN-1757] Add retry when sending RPC to LifecycleManager #3008
base: main
Are you sure you want to change the base?
Conversation
docs/configuration/network.md
Outdated
@@ -29,7 +29,7 @@ license: | | |||
| celeborn.<module>.io.enableVerboseMetrics | false | false | Whether to track Netty memory detailed metrics. If true, the detailed metrics of Netty PoolByteBufAllocator will be gotten, otherwise only general memory usage will be tracked. | | | | |||
| celeborn.<module>.io.lazyFD | true | false | Whether to initialize FileDescriptor lazily or not. If true, file descriptors are created only when data is going to be transferred. This can reduce the number of open files. If setting <module> to `fetch`, it works for worker fetch server. | | | | |||
| celeborn.<module>.io.maxRetries | 3 | false | Max number of times we will try IO exceptions (such as connection timeouts) per request. If set to 0, we will not do any retries. If setting <module> to `data`, it works for shuffle client push and fetch data. If setting <module> to `replicate`, it works for replicate client of worker replicating data to peer worker. If setting <module> to `push`, it works for Flink shuffle client push data. | | | | |||
| celeborn.<module>.io.mode | EPOLL | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | | |||
| celeborn.<module>.io.mode | NIO | false | Netty EventLoopGroup backend, available options: NIO, EPOLL. If epoll mode is available, the default IO mode is EPOLL; otherwise, the default is NIO. | | | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @SteNicholas Seems the doc generation depends on the developer environment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can not pass the GA, need to revert it.
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3008 +/- ##
==========================================
- Coverage 32.88% 32.55% -0.33%
==========================================
Files 331 336 +5
Lines 19800 20102 +302
Branches 1780 1800 +20
==========================================
+ Hits 6510 6542 +32
- Misses 12929 13195 +266
- Partials 361 365 +4 ☔ View full report in Codecov by Sentry. |
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
Outdated
Show resolved
Hide resolved
initDataClientFactoryIfNeeded(); | ||
} | ||
|
||
public <T> T callLifecycleManagerWithTimeoutRetry(Callable<T> callable, String name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of making changes everywhere - do we want to simply change askSync/askAsync to become retry aware ? With number of retries passed in as a param (for specific cases where we dont want retries for ex) ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree to change askSync/askAsync.
There are a lot of exception changes caused by that the setupLifecycleManagerRef
will throws RpcTimeoutExceptions
which we need to catch. I change the Exception type to RuntimeException
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Outdated
Show resolved
Hide resolved
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Outdated
Show resolved
Hide resolved
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
Outdated
Show resolved
Hide resolved
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
Outdated
Show resolved
Hide resolved
client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
Outdated
Show resolved
Hide resolved
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Outdated
Show resolved
Hide resolved
common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, I wonder that we can introduce two config items.
celeborn.rpc.retryWait
for the default retry wait.
celeborn.client.rpc.retryWait
for the client specific.
cc @pan3793
@@ -4884,6 +4885,14 @@ object CelebornConf extends Logging { | |||
.timeConf(TimeUnit.MILLISECONDS) | |||
.createWithDefaultString("3s") | |||
|
|||
val RPC_TIMEOUT_RETRY_WAIT: ConfigEntry[Long] = | |||
buildConf("celeborn.rpc.retryWait") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val RPC_RETRY_WAIT
And you can move this config to celeborn.rpc
part.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder that you can introduce a new config celeborn.client.rpc.retryWait
for client end.
@@ -30,6 +33,7 @@ abstract class RpcEndpointRef(conf: CelebornConf) | |||
extends Serializable with Logging { | |||
|
|||
private[this] val defaultAskTimeout = conf.rpcAskTimeout | |||
private[celeborn] val waitTimeBound = conf.rpcTimeoutRetryWaitMs.toInt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private[this] val defaultRetryWait
@@ -104,6 +106,7 @@ object RpcEnv { | |||
abstract class RpcEnv(config: RpcEnvConfig) { | |||
|
|||
private[celeborn] val defaultLookupTimeout = config.conf.rpcLookupTimeout | |||
private[celeborn] val waitTimeBound = config.conf.rpcTimeoutRetryWaitMs.toInt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private[celeborn] val defaultRetryWait
* @tparam T type of the reply message | ||
* @return the reply message from the corresponding [[RpcEndpoint]] | ||
*/ | ||
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int): T = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout, retryCount: Int, retryWait: Long)
common/src/main/scala/org/apache/celeborn/common/rpc/RpcEnv.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall, only nit comments
} | ||
} | ||
// should never be here | ||
val future = ask[T](message, timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about return null directly?
null
* loop of [[RpcEndpoint]]. | ||
* | ||
* @param message the message to send | ||
* @tparam T type of the reply message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add doc for other params retryCount
and retryWait
What changes were proposed in this pull request?
Retry seding RPC to LifecycleManager when TimeoutException.
Why are the changes needed?
RPC messages are processed by
Dispatcher.threadpool
which its numThreads depends onnumUsableCores
.In some cases (k8s) the numThreads of LifecycleManager are not enough while the RPCs are a lot so there are TimeoutExceptions.
Add retry when there are TimeoutExceptions.
Does this PR introduce any user-facing change?
No.
Another way is to adjust the configuration
celeborn.lifecycleManager.rpc.dispatcher.threads
to add the numThreads.This way is more affective.
How was this patch tested?
Cluster testing.