Commit a13e20b

fix

leixm committed Dec 6, 2024
1 parent 21cc1b7 commit a13e20b
Showing 16 changed files with 361 additions and 146 deletions.

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala: 40 additions & 26 deletions

@@ -665,8 +665,6 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Serializable
   def estimatedPartitionSizeForEstimationUpdateInterval: Long =
     get(ESTIMATED_PARTITION_SIZE_UPDATE_INTERVAL)
   def masterResourceConsumptionInterval: Long = get(MASTER_RESOURCE_CONSUMPTION_INTERVAL)
-  def masterUserDiskUsageThreshold: Long = get(MASTER_USER_DISK_USAGE_THRESHOLD)
-  def masterClusterDiskUsageThreshold: Long = get(MASTER_CLUSTER_DISK_USAGE_THRESHOLD)
   def clusterName: String = get(CLUSTER_NAME)
 
   // //////////////////////////////////////////////////////
@@ -2930,26 +2928,6 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("30s")
 
-  val MASTER_USER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
-    buildConf("celeborn.master.userResourceConsumption.user.threshold")
-      .categories("master")
-      .doc("When user resource consumption exceeds quota, Master will " +
-        "interrupt some apps until user resource consumption is less " +
-        "than this value. Default value is Long.MaxValue which means disable check.")
-      .version("0.6.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Long.MaxValue)
-
-  val MASTER_CLUSTER_DISK_USAGE_THRESHOLD: ConfigEntry[Long] =
-    buildConf("celeborn.master.userResourceConsumption.cluster.threshold")
-      .categories("master")
-      .doc("When cluster resource consumption exceeds quota, Master will " +
-        "interrupt some apps until cluster resource consumption is less " +
-        "than this value. Default value is Long.MaxValue which means disable check.")
-      .version("0.6.0")
-      .bytesConf(ByteUnit.BYTE)
-      .createWithDefault(Long.MaxValue)
-
   val CLUSTER_NAME: ConfigEntry[String] =
     buildConf("celeborn.cluster.name")
       .categories("master", "worker")
@@ -5396,7 +5374,7 @@ object CelebornConf extends Logging {
       .stringConf
       .createWithDefault(IdentityProvider.DEFAULT_USERNAME)
 
-  val QUOTA_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.diskBytesWritten")
       .categories("quota")
       .dynamic
@@ -5405,7 +5383,7 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
-  val QUOTA_DISK_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_DISK_FILE_COUNT: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.diskFileCount")
       .categories("quota")
       .dynamic
@@ -5414,7 +5392,7 @@ object CelebornConf extends Logging {
       .bytesConf(ByteUnit.BYTE)
       .createWithDefault(Long.MaxValue)
 
-  val QUOTA_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.hdfsBytesWritten")
       .categories("quota")
       .dynamic
@@ -5423,7 +5401,7 @@ object CelebornConf extends Logging {
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Long.MaxValue)
 
-  val QUOTA_HDFS_FILE_COUNT: ConfigEntry[Long] =
+  val QUOTA_TENANT_HDFS_FILE_COUNT: ConfigEntry[Long] =
     buildConf("celeborn.quota.tenant.hdfsFileCount")
       .categories("quota")
       .dynamic
@@ -6072,4 +6050,40 @@ object CelebornConf extends Logging {
       .version("0.6.0")
       .longConf
       .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written disk bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_DISK_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.diskFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written disk file count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_BYTES_WRITTEN: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsBytesWritten")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written hdfs bytes.")
+      .version("0.6.0")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefault(Long.MaxValue)
+
+  val QUOTA_USER_HDFS_FILE_COUNT: ConfigEntry[Long] =
+    buildConf("celeborn.quota.user.hdfsFileCount")
+      .categories("quota")
+      .dynamic
+      .doc("Quota dynamic configuration for user written hdfs file count.")
+      .version("0.6.0")
+      .longConf
+      .createWithDefault(Long.MaxValue)
 }
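
The four new `celeborn.quota.user.*` entries behave like any other `CelebornConf` setting, so they can be set programmatically as well as in `celeborn-defaults.conf`. A minimal sketch, assuming the SparkConf-style `set`/`get` accessors that `CelebornConf` exposes; the 1 TiB and 10000-file limits are invented values for illustration:

```scala
import org.apache.celeborn.common.CelebornConf

// Invented example limits: cap each user at 1 TiB of disk writes and
// 10000 disk files. Keys left unset keep the Long.MaxValue default,
// which effectively disables the corresponding check.
val conf = new CelebornConf()
  .set("celeborn.quota.user.diskBytesWritten", "1tb") // bytesConf: size suffix allowed
  .set("celeborn.quota.user.diskFileCount", "10000")  // longConf: plain number

// bytesConf entries resolve to bytes, so "1tb" reads back as 1024^4.
val bytesQuota: Long = conf.get(CelebornConf.QUOTA_USER_DISK_BYTES_WRITTEN)
val fileQuota: Long = conf.get(CelebornConf.QUOTA_USER_DISK_FILE_COUNT)
```

Because these entries are marked `.dynamic`, the same keys can also be supplied through Celeborn's dynamic configuration store rather than the static conf.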

docs/configuration/master.md: 0 additions & 2 deletions

@@ -79,9 +79,7 @@ license: |
 | celeborn.master.slot.assign.loadAware.numDiskGroups | 5 | false | This configuration is a guidance for the load-aware slot allocation algorithm. This value controls how many disk groups will be created. | 0.3.0 | celeborn.slots.assign.loadAware.numDiskGroups |
 | celeborn.master.slot.assign.maxWorkers | 10000 | false | Max workers that slots of one shuffle can be allocated on. Will choose the smaller positive one from Master side and Client side, see `celeborn.client.slot.assign.maxWorkers`. | 0.3.1 | |
 | celeborn.master.slot.assign.policy | ROUNDROBIN | false | Policy for master to assign slots. Celeborn supports two types of policy: roundrobin and loadaware. Loadaware policy will be ignored when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.slots.assign.policy |
-| celeborn.master.userResourceConsumption.cluster.threshold | 9223372036854775807b | false | When cluster resource consumption exceeds quota, Master will interrupt some apps until cluster resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
 | celeborn.master.userResourceConsumption.update.interval | 30s | false | Time window for computing user resource consumption. | 0.3.0 | |
-| celeborn.master.userResourceConsumption.user.threshold | 9223372036854775807b | false | When user resource consumption exceeds quota, Master will interrupt some apps until user resource consumption is less than this value. Default value is Long.MaxValue which means disable check. | 0.6.0 | |
 | celeborn.master.workerUnavailableInfo.expireTimeout | 1800s | false | Worker unavailable info would be cleared when the retention period is expired. Set -1 to disable the expiration. | 0.3.1 | |
 | celeborn.quota.enabled | true | false | When true on the Master side, the master checks quota via QuotaManager. When true on the Client side, LifecycleManager requests the Master to check whether the current user has enough quota before registering a shuffle, and falls back to the default shuffle service when the Master finds there is not enough quota for the current user. | 0.2.0 | |
 | celeborn.redaction.regex | (?i)secret|password|token|access[.]key | false | Regex to decide which Celeborn configuration properties and environment variables in master and worker environments contain sensitive information. When this regex matches a property key or value, the value is redacted from the logging. | 0.5.0 | |

docs/configuration/quota.md: 4 additions & 0 deletions

@@ -32,4 +32,8 @@ license: |
 | celeborn.quota.tenant.diskFileCount | 9223372036854775807b | true | Quota dynamic configuration for written disk file count. | 0.5.0 | |
 | celeborn.quota.tenant.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for written hdfs bytes. | 0.5.0 | |
 | celeborn.quota.tenant.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for written hdfs file count. | 0.5.0 | |
+| celeborn.quota.user.diskBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for user written disk bytes. | 0.6.0 | |
+| celeborn.quota.user.diskFileCount | 9223372036854775807 | true | Quota dynamic configuration for user written disk file count. | 0.6.0 | |
+| celeborn.quota.user.hdfsBytesWritten | 9223372036854775807b | true | Quota dynamic configuration for user written hdfs bytes. | 0.6.0 | |
+| celeborn.quota.user.hdfsFileCount | 9223372036854775807 | true | Quota dynamic configuration for user written hdfs file count. | 0.6.0 | |
 <!--end-include-->

docs/migration.md: 8 additions & 0 deletions

@@ -23,6 +23,14 @@ license: |
 
 # Upgrading from 0.5 to 0.6
 
+- Since 0.6.0, Celeborn renamed `celeborn.quota.tenant.diskBytesWritten` to `celeborn.quota.user.diskBytesWritten`. Please use `celeborn.quota.user.diskBytesWritten` to set a user-level quota.
+
+- Since 0.6.0, Celeborn renamed `celeborn.quota.tenant.diskFileCount` to `celeborn.quota.user.diskFileCount`. Please use `celeborn.quota.user.diskFileCount` to set a user-level quota.
+
+- Since 0.6.0, Celeborn renamed `celeborn.quota.tenant.hdfsBytesWritten` to `celeborn.quota.user.hdfsBytesWritten`. Please use `celeborn.quota.user.hdfsBytesWritten` to set a user-level quota.
+
+- Since 0.6.0, Celeborn renamed `celeborn.quota.tenant.hdfsFileCount` to `celeborn.quota.user.hdfsFileCount`. Please use `celeborn.quota.user.hdfsFileCount` to set a user-level quota.
+
 - Since 0.6.0, Celeborn deprecates `celeborn.worker.congestionControl.low.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.low.watermark` instead.
 
 - Since 0.6.0, Celeborn deprecates `celeborn.worker.congestionControl.high.watermark`. Please use `celeborn.worker.congestionControl.diskBuffer.high.watermark` instead.
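
If configurations generated for 0.5 still carry the tenant-level names, a small caller-side shim can copy them over before the conf is used. This helper is hypothetical, not part of Celeborn, and it assumes the `getOption`/`set` accessors on `CelebornConf`:

```scala
import org.apache.celeborn.common.CelebornConf

// Hypothetical migration shim: if only the pre-0.6 tenant-level key is set,
// copy its value to the renamed 0.6.0 user-level key.
def migrateQuotaKey(conf: CelebornConf, oldKey: String, newKey: String): Unit =
  conf.getOption(oldKey).foreach { value =>
    if (conf.getOption(newKey).isEmpty) conf.set(newKey, value)
  }

val conf = new CelebornConf()
// Cover the four renamed keys from the migration notes above.
Seq("diskBytesWritten", "diskFileCount", "hdfsBytesWritten", "hdfsFileCount").foreach { k =>
  migrateQuotaKey(conf, s"celeborn.quota.tenant.$k", s"celeborn.quota.user.$k")
}
```

If both keys are present, the shim leaves the explicit user-level value untouched.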

(changed file, path not shown)

@@ -250,21 +250,18 @@ public void updateWorkerHeartbeatMeta(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       WorkerStatus workerStatus,
       boolean highWorkload) {
     WorkerInfo worker =
-        new WorkerInfo(
-            host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, userResourceConsumption);
+        new WorkerInfo(host, rpcPort, pushPort, fetchPort, replicatePort, -1, disks, null);
     AtomicLong availableSlots = new AtomicLong();
     LOG.debug("update worker {}:{} heartbeat {}", host, rpcPort, disks);
     synchronized (workersMap) {
       Optional<WorkerInfo> workerInfo = Optional.ofNullable(workersMap.get(worker.toUniqueId()));
       workerInfo.ifPresent(
           info -> {
             info.updateThenGetDiskInfos(disks, Option.apply(estimatedPartitionSize));
-            info.updateThenGetUserResourceConsumption(userResourceConsumption);
             availableSlots.set(info.totalAvailableSlots());
             info.lastHeartbeat_$eq(time);
             info.setWorkerStatus(workerStatus);

(changed file, path not shown)

@@ -66,7 +66,6 @@ void handleWorkerHeartbeat(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       boolean highWorkload,
       WorkerStatus workerStatus,

(changed file, path not shown)

@@ -118,22 +118,12 @@ public void handleWorkerHeartbeat(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       boolean highWorkload,
       WorkerStatus workerStatus,
       String requestId) {
     updateWorkerHeartbeatMeta(
-        host,
-        rpcPort,
-        pushPort,
-        fetchPort,
-        replicatePort,
-        disks,
-        userResourceConsumption,
-        time,
-        workerStatus,
-        highWorkload);
+        host, rpcPort, pushPort, fetchPort, replicatePort, disks, time, workerStatus, highWorkload);
   }
 
   @Override

(changed file, path not shown)

@@ -272,7 +272,6 @@ public void handleWorkerHeartbeat(
       int fetchPort,
       int replicatePort,
       Map<String, DiskInfo> disks,
-      Map<UserIdentifier, ResourceConsumption> userResourceConsumption,
       long time,
       boolean highWorkload,
       WorkerStatus workerStatus,
@@ -290,8 +289,6 @@ public void handleWorkerHeartbeat(
             .setFetchPort(fetchPort)
             .setReplicatePort(replicatePort)
             .putAllDisks(MetaUtil.toPbDiskInfos(disks))
-            .putAllUserResourceConsumption(
-                MetaUtil.toPbUserResourceConsumption(userResourceConsumption))
             .setWorkerStatus(MetaUtil.toPbWorkerStatus(workerStatus))
             .setTime(time)
             .setHighWorkload(highWorkload)

(changed file, path not shown)

@@ -186,16 +186,16 @@ private[celeborn] class Master(
   private val hasHDFSStorage = conf.hasHDFSStorage
   private val hasS3Storage = conf.hasS3Storage
 
+  // workerUniqueId -> ResourceConsumption
+  private val workerToResourceConsumptions =
+    JavaUtils.newConcurrentHashMap[String, util.Map[UserIdentifier, ResourceConsumption]]()
   private val quotaManager = new QuotaManager(
     statusSystem,
+    workerToResourceConsumptions,
     masterSource,
     resourceConsumptionSource,
     conf,
     configService)
   private val tagsManager = new TagsManager(Option(configService))
-  private val masterResourceConsumptionInterval = conf.masterResourceConsumptionInterval
-  private val userResourceConsumptions =
-    JavaUtils.newConcurrentHashMap[UserIdentifier, (ResourceConsumption, Long)]()
 
   private val slotsAssignMaxWorkers = conf.masterSlotAssignMaxWorkers
   private val slotsAssignLoadAwareDiskGroupNum = conf.masterSlotAssignLoadAwareDiskGroupNum
@@ -670,13 +670,13 @@ private[celeborn] class Master(
         fetchPort,
         replicatePort,
         disks.map { disk => disk.mountPoint -> disk }.toMap.asJava,
-        userResourceConsumption,
         System.currentTimeMillis(),
         highWorkload,
         workerStatus,
         requestId)
     }
 
+    workerToResourceConsumptions.put(targetWorker.toUniqueId(), userResourceConsumption)
     val expiredShuffleKeys = new util.HashSet[String]
     activeShuffleKeys.asScala.foreach { shuffleKey =>
       val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
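
Taken together, the Java and Scala hunks above move per-worker resource consumption out of the replicated heartbeat metadata: the handlers no longer thread a `Map<UserIdentifier, ResourceConsumption>` through `updateWorkerHeartbeatMeta`, and the Master instead stashes the map from each heartbeat in the new `workerToResourceConsumptions` cache that `QuotaManager` is constructed with. A minimal standalone sketch of that caching pattern, not the actual QuotaManager code, with types reduced to `String`/`Long` for brevity (the real code uses `UserIdentifier` and `ResourceConsumption`):

```scala
import java.util.concurrent.ConcurrentHashMap

object ResourceConsumptionCacheSketch {
  // Master-local cache: worker unique id -> (user -> bytes written).
  private val workerToResourceConsumptions =
    new ConcurrentHashMap[String, Map[String, Long]]()

  // Heartbeat path: overwrite the cached map, mirroring the put() added in Master.
  def onHeartbeat(workerUniqueId: String, consumption: Map[String, Long]): Unit =
    workerToResourceConsumptions.put(workerUniqueId, consumption)

  // QuotaManager-style reader: sum one user's usage across all workers.
  def totalBytesFor(user: String): Long = {
    var total = 0L
    workerToResourceConsumptions.values().forEach(m => total += m.getOrElse(user, 0L))
    total
  }
}
```

One apparent upside of the change is that the consumption map no longer travels through the HA metadata and protobuf path at all, as the removed `putAllUserResourceConsumption` call shows.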