
feat: Spark benchmarking updates with generic files (#688)
vara-bonthu authored Oct 30, 2024
1 parent 4889f09 commit 13ecbc4
Showing 7 changed files with 102 additions and 327 deletions.
72 changes: 10 additions & 62 deletions analytics/terraform/spark-k8s-operator/addons.tf
@@ -119,6 +119,13 @@ module "eks_data_addons" {
tags:
Name: ${module.eks.cluster_name}-node
instanceStorePolicy: RAID0
blockDeviceMappings:
- deviceName: /dev/xvda
ebs:
volumeSize: 200Gi
volumeType: gp3
encrypted: true
deleteOnTermination: true
nodePool:
labels:
- type: karpenter
@@ -136,10 +143,10 @@ module "eks_data_addons" {
values: ["r"]
- key: "karpenter.k8s.aws/instance-family"
operator: In
values: ["r6gd"]
- key: "karpenter.k8s.aws/instance-cpu"
values: ["r6g", "r6gd", "r7g", "r7gd", "r8g"]
- key: "karpenter.k8s.aws/instance-size"
operator: In
values: ["4", "8", "16", "32"]
values: ["4xlarge", "8xlarge", "12xlarge", "16xlarge"]
- key: "karpenter.k8s.aws/instance-hypervisor"
operator: In
values: ["nitro"]
@@ -155,61 +162,6 @@ module "eks_data_addons" {
EOT
]
}
spark-graviton-benchmark = {
values = [
<<-EOT
name: spark-graviton-benchmark
clusterName: ${module.eks.cluster_name}
ec2NodeClass:
amiFamily: AL2023
amiSelectorTerms:
- alias: al2023@latest # Amazon Linux 2023
karpenterRole: ${split("/", module.eks_blueprints_addons.karpenter.node_iam_role_arn)[1]}
subnetSelectorTerms:
tags:
Name: "${module.eks.cluster_name}-private*"
securityGroupSelectorTerms:
tags:
Name: ${module.eks.cluster_name}-node
instanceStorePolicy: RAID0
blockDeviceMappings:
- deviceName: /dev/xvda
ebs:
volumeSize: 300Gi
volumeType: gp3
encrypted: true
deleteOnTermination: true
nodePool:
labels:
- NodeGroupType: SparkGravitonBenchmark
requirements:
- key: "karpenter.sh/capacity-type"
operator: In
values: ["on-demand"]
- key: "kubernetes.io/arch"
operator: In
values: ["arm64"]
- key: "karpenter.k8s.aws/instance-category"
operator: In
values: ["r"]
- key: "karpenter.k8s.aws/instance-family"
operator: In
values: ["r6g", "r6gd", "r7g", "r7gd", "r8g"]
- key: "karpenter.k8s.aws/instance-size"
operator: In
values: ["8xlarge", "12xlarge", "16xlarge"]
- key: "karpenter.k8s.aws/instance-generation"
operator: Gt
values: ["2"]
limits:
cpu: 2000
disruption:
consolidationPolicy: WhenEmptyOrUnderutilized
consolidateAfter: 1m
weight: 100
EOT
]
}
spark-memory-optimized = {
values = [
<<-EOT
@@ -410,10 +362,6 @@ module "eks_data_addons" {
version = "2.0.2"
values = [
<<-EOT
controller:
batchScheduler:
enable: true
default: "yunikorn"
spark:
# -- List of namespaces where to run spark jobs.
# If empty string is included, all namespaces will be allowed.
140 changes: 43 additions & 97 deletions analytics/terraform/spark-k8s-operator/eks.tf
@@ -142,86 +142,25 @@ module "eks" {
}
}

spark_ondemand_r5d = {
name = "spark-ondemand-r5d"
description = "Spark managed node group for Driver pods"
# Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned
subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) :
substr(cidr_block, 0, 4) == "100." ? subnet_id : null]), 0)
]

min_size = 0
max_size = 20
desired_size = 0

instance_types = ["r5d.xlarge"] # r5d.xlarge 4vCPU - 32GB - 1 x 150 NVMe SSD - Up to 10Gbps - Up to 4,750 Mbps EBS Bandwidth

labels = {
WorkerType = "ON_DEMAND"
NodeGroupType = "spark-on-demand-ca"
}

taints = [{
key = "spark-on-demand-ca",
value = true
effect = "NO_SCHEDULE"
}]

tags = {
Name = "spark-ondemand-r5d"
WorkerType = "ON_DEMAND"
NodeGroupType = "spark-on-demand-ca"
}
}

# ec2-instance-selector --vcpus=48 --gpus 0 -a arm64 --allow-list '.*d.*'
# This command will give you the list of the instances with similar vcpus for arm64 dense instances
spark_spot_x86_48cpu = {
name = "spark-spot-48cpu"
description = "Spark Spot node group for executor workloads"
# Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned
subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) :
substr(cidr_block, 0, 4) == "100." ? subnet_id : null]), 0)
]

min_size = 0
max_size = 12
desired_size = 0

instance_types = ["r5d.12xlarge", "r6id.12xlarge", "c5ad.12xlarge", "c5d.12xlarge", "c6id.12xlarge", "m5ad.12xlarge", "m5d.12xlarge", "m6id.12xlarge"] # 48cpu - 2 x 1425 NVMe SSD

labels = {
WorkerType = "SPOT"
NodeGroupType = "spark-spot-ca"
}

taints = [{
key = "spark-spot-ca"
value = true
effect = "NO_SCHEDULE"
}]

tags = {
Name = "spark-node-grp"
WorkerType = "SPOT"
NodeGroupType = "spark"
}
}

# The following Node groups are a placeholder to create Node groups for running Spark TPC-DS benchmarks
spark_graviton_r8g = {
name = "spark-graviton-r8g"
description = "Spark managed node group for Graviton Benchmarks"
spark_benchmark_ebs = {
name = "spark_benchmark_ebs"
description = "Managed node group for Spark Benchmarks with EBS using x86 or ARM"
# Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned
subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) :
substr(cidr_block, 0, 4) == "100." ? subnet_id : null]), 0)
]

ami_type = "AL2023_ARM_64_STANDARD"
# Change ami_type to AL2023_x86_64_STANDARD for x86 instances
ami_type = "AL2023_ARM_64_STANDARD" # arm64

# Node group will be created with zero instances when you deploy the blueprint.
# You can change the min_size and desired_size to 6 instances
# desired_size might not be applied through Terraform once the node group is created, so this needs to be adjusted in the AWS Console.
min_size = 0 # Change min and desired to 6 for running benchmarks
max_size = 8
desired_size = 0
desired_size = 0 # Change min and desired to 6 for running benchmarks

# This storage is used for shuffle data on instances without local NVMe SSDs, e.g., r8g instances
block_device_mappings = {
xvda = {
@@ -236,55 +175,62 @@ module "eks" {
}
}

# Change the instance type as desired and make sure it matches the ami_type; a sketch of these overrides follows this file's diff
instance_types = ["r8g.12xlarge"] # Change the instance type to run the benchmark with various instance types

labels = {
NodeGroupType = "spark-graviton-benchmark-mng-r8g"
NodeGroupType = "spark_benchmark_ebs"
}

tags = {
Name = "spark-graviton-benchmark-mng-r8g"
NodeGroupType = "spark-graviton-benchmark-mng-r8g"
Name = "spark_benchmark_ebs"
NodeGroupType = "spark_benchmark_ebs"
}
}

spark_graviton_r6g = {
name = "spark-graviton-r6g"
description = "Spark managed node group for Graviton Benchmarks"
spark_benchmark_ssd = {
name = "spark_benchmark_ssd"
description = "Managed node group for Spark Benchmarks with NVMe SSD using x86 or ARM"
# Filtering only Secondary CIDR private subnets starting with "100.". Subnet IDs where the nodes/node groups will be provisioned
subnet_ids = [element(compact([for subnet_id, cidr_block in zipmap(module.vpc.private_subnets, module.vpc.private_subnets_cidr_blocks) :
substr(cidr_block, 0, 4) == "100." ? subnet_id : null]), 0)
]

ami_type = "AL2023_ARM_64_STANDARD"
ami_type = "AL2023_x86_64_STANDARD" # x86

min_size = 0
# Node group will be created with zero instances when you deploy the blueprint.
# You can change the min_size and desired_size to 6 instances
# desired_size might not be applied through Terraform once the node group is created, so this needs to be adjusted in the AWS Console.
min_size = 0 # Change min and desired to 6 for running benchmarks
max_size = 8
desired_size = 0
# This storage is used as a shuffle for non NVMe SSD instances. e.g., r8g instances
block_device_mappings = {
xvda = {
device_name = "/dev/xvda"
ebs = {
volume_size = 300
volume_type = "gp3"
iops = 3000
encrypted = true
delete_on_termination = true
}
desired_size = 0 # Change min and desired to 6 for running benchmarks

instance_types = ["c5d.12xlarge"] # c5d.12xlarge = 2 x 900 NVMe SSD

cloudinit_pre_nodeadm = [
{
content_type = "application/node.eks.aws"
content = <<-EOT
---
apiVersion: node.eks.aws/v1alpha1
kind: NodeConfig
spec:
instance:
localStorage:
strategy: RAID0
EOT
}
}

instance_types = ["r6g.12xlarge"] # Change Instance type to run the benchmark with various instance types
]

labels = {
NodeGroupType = "spark-graviton-benchmark-mng-r6g"
NodeGroupType = "spark_benchmark_ssd"
}

tags = {
Name = "spark-graviton-benchmark-mng-r6g"
NodeGroupType = "spark-graviton-benchmark-mng-r6g"
Name = "spark_benchmark_ssd"
NodeGroupType = "spark_benchmark_ssd"
}
}

}
}
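
As the inline comments in eks.tf note, both benchmark node groups are created with zero instances and the spark_benchmark_ebs group defaults to ARM (r8g.12xlarge). Switching it to x86 or scaling it up for a run only touches a few attributes; a minimal sketch of those overrides, assuming an EBS-only x86 type such as r6i.12xlarge (the instance type here is an assumption, not part of this commit):

spark_benchmark_ebs = {
  # Sketch of the attributes to edit per the comments above; everything else
  # (subnets, block_device_mappings, labels, tags) stays as defined in eks.tf.
  ami_type       = "AL2023_x86_64_STANDARD" # x86 variant; the default above is AL2023_ARM_64_STANDARD
  instance_types = ["r6i.12xlarge"]         # assumption: any EBS-backed type that matches the ami_type
  min_size       = 6                        # scale to 6 for the benchmark run
  max_size       = 8
  desired_size   = 6                        # set back to 0 once the run is finished
}
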
@@ -1,22 +1,25 @@
# > **Note:** Before running these jobs, make sure the following prerequisites are completed:

# 1. Set up an S3 bucket to store the generated data.
# 2. Update the `eks.tf` file to modify `min_size=6` and `desired_size=6` for the managed node group `"spark_graviton_r8g"`, then run `terraform apply`.
# 2. Update the `eks.tf` file to modify `min_size=6` and `desired_size=6` for the managed node group `"spark_benchmark_ebs"`, then run `terraform apply`.
# 3. Confirm that the `"spark-team-a"` namespace exists.
# 4. Replace `<S3_BUCKET>` in the configuration with your actual bucket name.
# 5. Run `kubectl apply -f <filename>` to apply the configuration.

# If you are repeating the tests to re-run with different instance types, make sure you update the following (a sketch of these changes follows this file's diff):
# 1/ Job name to match the instance type, e.g., name: tpcds-benchmark-1tb-ebs-r6g

---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
name: tpcds-benchmark-1tb-r8g
name: tpcds-benchmark-1tb-ebs # Change for each test, e.g., to include the instance type
namespace: spark-team-a

spec:
batchScheduler: yunikorn
batchSchedulerOptions:
queue: root.default
# Temporarily commented out until the YuniKorn issue is resolved; falls back to the default Kubernetes scheduler
# batchScheduler: yunikorn
# batchSchedulerOptions:
# queue: root.default
type: Scala
mode: cluster
image: public.ecr.aws/data-on-eks/spark3.5.3-scala2.12-java17-python3-ubuntu-tpcds:v2
@@ -51,7 +54,7 @@ spec:
"spark.sql.adaptive.localShuffleReader.enabled": "true"
"spark.sql.adaptive.coalescePartitions.enabled": "true"
"spark.sql.adaptive.skewJoin.enabled": "true"
"spark.kubernetes.executor.podNamePrefix": "benchmark-exec-r8g-"
"spark.kubernetes.executor.podNamePrefix": "benchmark-exec-ebs"
# S3 Optimizations
# "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider" # This is using AWS SDK V1 in maintenance mode
"spark.hadoop.fs.s3a.aws.credentials.provider.mapping": "com.amazonaws.auth.WebIdentityTokenCredentialsProvider=software.amazon.awssdk.auth.credentials.WebIdentityTokenFileCredentialsProvider"
@@ -94,32 +97,30 @@ spec:
"spark.hadoop.fs.s3a.multipart.purge": "true" # Automatically clear failed multipart uploads
"spark.hadoop.fs.s3a.multipart.threshold": "134217728" # 128 MB threshold to start multi-part upload
driver:
cores: 4
coreLimit: "4.3"
memory: "5g"
memoryOverhead: "1g"
cores: 5
memory: "20g"
memoryOverhead: "6g"
serviceAccount: spark-team-a
securityContext:
runAsUser: 185
env:
- name: JAVA_HOME
value: "/opt/java/openjdk"
nodeSelector:
NodeGroupType: spark-graviton-benchmark-mng-r8g
NodeGroupType: spark_benchmark_ebs
executor:
cores: 4
coreLimit: "4.3"
memory: "6g"
memoryOverhead: "2g"
cores: 5
memory: "20g"
memoryOverhead: "6g"
# 8 executors per node
instances: 47
instances: 36 # 6 pods per node; 6 nodes with EKS Managed Node group
serviceAccount: spark-team-a
securityContext:
runAsUser: 185
env:
- name: JAVA_HOME
value: "/opt/java/openjdk"
nodeSelector:
NodeGroupType: spark-graviton-benchmark-mng-r8g
NodeGroupType: spark_benchmark_ebs
restartPolicy:
type: Never
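
Per the note at the top of this manifest, re-running the benchmark against a different node group or instance type only requires renaming the job and repointing the executor pod-name prefix and node selectors. A sketch of the fields to change for the spark_benchmark_ssd node group defined in eks.tf (the job name is illustrative, not part of this commit):

# Only the fields that differ from the manifest above; all other spec fields
# (image, sparkConf tuning, driver/executor resources) stay the same.
metadata:
  name: tpcds-benchmark-1tb-ssd # include the instance type per the note above, e.g., tpcds-benchmark-1tb-ssd-c5d
spec:
  sparkConf:
    "spark.kubernetes.executor.podNamePrefix": "benchmark-exec-ssd"
  driver:
    nodeSelector:
      NodeGroupType: spark_benchmark_ssd
  executor:
    nodeSelector:
      NodeGroupType: spark_benchmark_ssd
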
