Skip to content

Commit

Permalink
rename config
Browse files Browse the repository at this point in the history
  • Loading branch information
jiang13021 committed Dec 18, 2024
1 parent 977ef8f commit 2eef55a
Show file tree
Hide file tree
Showing 7 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public class ShuffleClientImpl extends ShuffleClient {
JavaUtils.newConcurrentHashMap();
private boolean pushReplicateEnabled;
private boolean fetchExcludeWorkerOnFailureEnabled;
private boolean clientPushChecksumHeaderEnabled;
private boolean clientShuffleChecksumEnabled;

private final ExecutorService pushDataRetryPool;

Expand Down Expand Up @@ -184,7 +184,7 @@ public ShuffleClientImpl(String appUniqueId, CelebornConf conf, UserIdentifier u
shuffleCompressionEnabled = !conf.shuffleCompressionCodec().equals(CompressionCodec.NONE);
pushReplicateEnabled = conf.clientPushReplicateEnabled();
fetchExcludeWorkerOnFailureEnabled = conf.clientFetchExcludeWorkerOnFailureEnabled();
clientPushChecksumHeaderEnabled = conf.clientPushChecksumHeaderEnabled();
clientShuffleChecksumEnabled = conf.clientShuffleChecksumEnabled();
if (conf.clientPushReplicateEnabled()) {
pushDataTimeout = conf.pushDataTimeoutMs() * 2;
} else {
Expand Down Expand Up @@ -990,14 +990,14 @@ public int pushOrMergeData(
}

final int headerSize;
if (clientPushChecksumHeaderEnabled) {
if (clientShuffleChecksumEnabled) {
headerSize = PushDataHeaderUtils.BATCH_HEADER_SIZE;
} else {
headerSize = PushDataHeaderUtils.BATCH_HEADER_SIZE_WITHOUT_CHECKSUM;
}
final byte[] body = new byte[headerSize + length];
PushDataHeaderUtils.buildDataHeader(
body, mapId, attemptId, nextBatchId, length, clientPushChecksumHeaderEnabled);
body, mapId, attemptId, nextBatchId, length, clientShuffleChecksumEnabled);
System.arraycopy(data, offset, body, headerSize, length);

if (doPush) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
private int partitionId;
private ExceptionMaker exceptionMaker;
private boolean closed = false;
private boolean checksumHeaderEnable;
private boolean checksumEnabled;

CelebornInputStreamImpl(
CelebornConf conf,
Expand Down Expand Up @@ -226,8 +226,8 @@ private static final class CelebornInputStreamImpl extends CelebornInputStream {
init();
firstChunk = false;
}
this.checksumHeaderEnable = conf.clientPushChecksumHeaderEnabled();
if (checksumHeaderEnable) {
this.checksumEnabled = conf.clientShuffleChecksumEnabled();
if (checksumEnabled) {
sizeBuf = new byte[PushDataHeaderUtils.BATCH_HEADER_SIZE];
} else {
sizeBuf = new byte[PushDataHeaderUtils.BATCH_HEADER_SIZE_WITHOUT_CHECKSUM];
Expand Down Expand Up @@ -593,7 +593,7 @@ private boolean fillBuffer() throws IOException {
boolean hasData = false;
while (currentChunk.isReadable() || moveToNextChunk()) {
currentChunk.readBytes(sizeBuf);
if (checksumHeaderEnable && !PushDataHeaderUtils.checkHeaderChecksum32(sizeBuf)) {
if (checksumEnabled && !PushDataHeaderUtils.checkHeaderChecksum32(sizeBuf)) {
throw new CelebornIOException("Data Corrupted: checksum not match");
}
int mapId = PushDataHeaderUtils.getMapId(sizeBuf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def clientPushSendBufferPoolExpireTimeout: Long = get(CLIENT_PUSH_SENDBUFFERPOOL_EXPIRETIMEOUT)
def clientPushSendBufferPoolExpireCheckInterval: Long =
get(CLIENT_PUSH_SENDBUFFERPOOL_CHECKEXPIREINTERVAL)
def clientPushChecksumHeaderEnabled: Boolean = get(CLIENT_PUSH_CHECKSUM_HEADER_ENABLED)

// //////////////////////////////////////////////////////
// Client Shuffle //
Expand Down Expand Up @@ -1086,6 +1085,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
def registerShuffleFilterExcludedWorkerEnabled: Boolean =
get(REGISTER_SHUFFLE_FILTER_EXCLUDED_WORKER_ENABLED)
def reviseLostShufflesEnabled: Boolean = get(REVISE_LOST_SHUFFLES_ENABLED)
def clientShuffleChecksumEnabled: Boolean = get(CLIENT_SHUFFLE_CHECKSUM_ENABLED)

// //////////////////////////////////////////////////////
// Worker //
Expand Down Expand Up @@ -1206,7 +1206,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
}).reduce(_ ++ _)
}.orElse(Some(Map("MEMORY" -> List("SSD", "HDD", "HDFS", "OSS"))))

def workerVerifyChecksumEnabled: Boolean = get(WORKER_VERIFY_CHECKSUM_ENABLED)
def workerChecksumVerifyEnabled: Boolean = get(WORKER_CHECKSUM_VERIFY_ENABLED)

// //////////////////////////////////////////////////////
// Decommission //
Expand Down Expand Up @@ -4116,10 +4116,10 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(0)

val CLIENT_PUSH_CHECKSUM_HEADER_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.push.checksum.header.enabled")
val CLIENT_SHUFFLE_CHECKSUM_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.client.shuffle.checksum.enabled")
.categories("client")
.doc("Whether to enable checksum header in push data request.")
.doc("Whether to enable checksum for shuffle data.")
.version("0.6.0")
.booleanConf
.createWithDefault(false)
Expand Down Expand Up @@ -5851,8 +5851,8 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(0)

val WORKER_VERIFY_CHECKSUM_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.verify.checksum.enable")
val WORKER_CHECKSUM_VERIFY_ENABLED: ConfigEntry[Boolean] =
buildConf("celeborn.worker.checksum.verify.enabled")
.categories("worker")
.version("0.6.0")
.doc("Whether to verify checksum when handling pushed data.")
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/client.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ license: |
| celeborn.client.mr.pushData.max | 32m | false | Max size for a push data sent from mr client. | 0.4.0 | |
| celeborn.client.push.buffer.initial.size | 8k | false | | 0.3.0 | celeborn.push.buffer.initial.size |
| celeborn.client.push.buffer.max.size | 64k | false | Max size of reducer partition buffer memory for shuffle hash writer. The pushed data will be buffered in memory before sending to Celeborn worker. For performance consideration keep this buffer size higher than 32K. Example: If reducer amount is 2000, buffer size is 64K, then each task will consume up to `64KiB * 2000 = 125MiB` heap memory. | 0.3.0 | celeborn.push.buffer.max.size |
| celeborn.client.push.checksum.header.enabled | false | false | Whether to enable checksum header in push data request. | 0.6.0 | |
| celeborn.client.push.excludeWorkerOnFailure.enabled | false | false | Whether to enable shuffle client-side push exclude workers on failures. | 0.3.0 | |
| celeborn.client.push.limit.inFlight.sleepInterval | 50ms | false | Sleep interval when check netty in-flight requests to be done. | 0.3.0 | celeborn.push.limit.inFlight.sleepInterval |
| celeborn.client.push.limit.inFlight.timeout | <undefined> | false | Timeout for netty in-flight requests to be done. Default value should be `celeborn.client.push.timeout * 2`. | 0.3.0 | celeborn.push.limit.inFlight.timeout |
Expand Down Expand Up @@ -95,6 +94,7 @@ license: |
| celeborn.client.shuffle.batchHandleReleasePartition.threads | 8 | false | Threads number for LifecycleManager to handle release partition request in batch. | 0.3.0 | |
| celeborn.client.shuffle.batchHandleRemoveExpiredShuffles.enabled | false | false | Whether to batch remove expired shuffles. This is an optimization switch on removing expired shuffles. | 0.6.0 | |
| celeborn.client.shuffle.checkWorker.enabled | true | false | When true, before registering shuffle, LifecycleManager should check if current cluster have available workers, if cluster don't have available workers, fallback to default shuffle. | 0.5.0 | celeborn.client.spark.shuffle.checkWorker.enabled |
| celeborn.client.shuffle.checksum.enabled | false | false | Whether to enable checksum for shuffle data. | 0.6.0 | |
| celeborn.client.shuffle.compression.codec | LZ4 | false | The codec used to compress shuffle data. By default, Celeborn provides three codecs: `lz4`, `zstd`, `none`. `none` means that shuffle compression is disabled. Since Flink version 1.16, zstd is supported for Flink shuffle client. | 0.3.0 | celeborn.shuffle.compression.codec,remote-shuffle.job.compression.codec |
| celeborn.client.shuffle.compression.zstd.level | 1 | false | Compression level for Zstd compression codec, its value should be an integer between -5 and 22. Increasing the compression level will result in better compression at the expense of more CPU and memory. | 0.3.0 | celeborn.shuffle.compression.zstd.level |
| celeborn.client.shuffle.decompression.lz4.xxhash.instance | <undefined> | false | Decompression XXHash instance for Lz4. Available options: JNI, JAVASAFE, JAVAUNSAFE. | 0.3.2 | |
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ license: |
| celeborn.worker.activeConnection.max | <undefined> | false | If the number of active connections on a worker exceeds this configuration value, the worker will be marked as high-load in the heartbeat report, and the master will not include that node in the response of RequestSlots. | 0.3.1 | |
| celeborn.worker.applicationRegistry.cache.size | 10000 | false | Cache size of the application registry on Workers. | 0.5.0 | |
| celeborn.worker.bufferStream.threadsPerMountpoint | 8 | false | Threads count for read buffer per mount point. | 0.3.0 | |
| celeborn.worker.checksum.verify.enabled | false | false | Whether to verify checksum when handling pushed data. | 0.6.0 | |
| celeborn.worker.clean.threads | 64 | false | Thread number of worker to clean up expired shuffle keys. | 0.3.2 | |
| celeborn.worker.closeIdleConnections | false | false | Whether worker will close idle connections. | 0.2.0 | |
| celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.commit.threads |
Expand Down Expand Up @@ -180,7 +181,6 @@ license: |
| celeborn.worker.storage.storagePolicy.createFilePolicy | <undefined> | false | This defined the order for creating files across available storages. Available storages options are: MEMORY,SSD,HDD,HDFS,OSS | 0.5.1 | |
| celeborn.worker.storage.storagePolicy.evictPolicy | <undefined> | false | This define the order of evict files if the storages are available. Available storages: MEMORY,SSD,HDD,HDFS. Definition: StorageTypes|StorageTypes|StorageTypes. Example: MEMORY,SSD|SSD,HDFS. The example means that a MEMORY shuffle file can be evicted to SSD and a SSD shuffle file can be evicted to HDFS. | 0.5.1 | |
| celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | false | Worker's working dir path name. | 0.3.0 | celeborn.worker.workingDir |
| celeborn.worker.verify.checksum.enable | false | false | Whether to verify checksum when handling pushed data. | 0.6.0 | |
| celeborn.worker.writer.close.timeout | 120s | false | Timeout for a file writer to close | 0.2.0 | |
| celeborn.worker.writer.create.maxAttempts | 3 | false | Retry count for a file writer to create if its creation was failed. | 0.2.0 | |
<!--end-include-->
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
private var storageManager: StorageManager = _
private var workerPartitionSplitEnabled: Boolean = _
private var workerReplicateRandomConnectionEnabled: Boolean = _
private var workerVerifyChecksumEnabled: Boolean = _
private var workerChecksumVerifyEnabled: Boolean = _

private var testPushPrimaryDataTimeout: Boolean = _
private var testPushReplicaDataTimeout: Boolean = _
Expand All @@ -85,7 +85,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
shutdown = worker.shutdown
workerPartitionSplitEnabled = worker.conf.workerPartitionSplitEnabled
workerReplicateRandomConnectionEnabled = worker.conf.workerReplicateRandomConnectionEnabled
workerVerifyChecksumEnabled = worker.conf.workerVerifyChecksumEnabled
workerChecksumVerifyEnabled = worker.conf.workerChecksumVerifyEnabled

testPushPrimaryDataTimeout = worker.conf.testPushPrimaryDataTimeout
testPushReplicaDataTimeout = worker.conf.testPushReplicaDataTimeout
Expand Down Expand Up @@ -1490,7 +1490,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
shuffleKey: String,
index: Int): Unit = {
try {
if (workerVerifyChecksumEnabled) {
if (workerChecksumVerifyEnabled) {
if (!verifyDataChecksum(body)) {
throw new CelebornChecksumException(StatusCode.PUSH_DATA_CHECKSUM_FAIL)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PushDataWithChecksumSuite extends AnyFunSuite
override def beforeAll(): Unit = {
logInfo("test initialized , setup Celeborn mini cluster")
val workerConf = Map(
CelebornConf.WORKER_VERIFY_CHECKSUM_ENABLED.key -> "true")
CelebornConf.WORKER_CHECKSUM_VERIFY_ENABLED.key -> "true")
val (master, _) = setupMiniClusterWithRandomPorts(masterConf = Map(), workerConf)
masterPort = master.conf.masterPort
}
Expand All @@ -65,7 +65,7 @@ class PushDataWithChecksumSuite extends AnyFunSuite

val clientConf = new CelebornConf()
.set(CelebornConf.MASTER_ENDPOINTS.key, s"localhost:$masterPort")
.set(CelebornConf.CLIENT_PUSH_CHECKSUM_HEADER_ENABLED.key, "true")
.set(CelebornConf.CLIENT_SHUFFLE_CHECKSUM_ENABLED.key, "true")
val lifecycleManager = new LifecycleManager(APP, clientConf)
val shuffleClient = new ShuffleClientImpl(APP, clientConf, UserIdentifier("mock", "mock"))
shuffleClient.setupLifecycleManagerRef(lifecycleManager.self)
Expand Down

0 comments on commit 2eef55a

Please sign in to comment.