feat: Spark Streaming using Spark Operator #516

Merged · 11 commits · May 16, 2024
49 changes: 49 additions & 0 deletions streaming/spark-streaming/examples/consumer/Dockerfile
@@ -0,0 +1,49 @@
# Dockerfile for Apache Spark with additional JARs downloaded at build time
FROM apache/spark-py:v3.3.2
WORKDIR /app

# Use root to create a new user and configure permissions
USER root

# Install wget to download JAR files
RUN apt-get update && apt-get install -y wget && \
    rm -rf /var/lib/apt/lists/*

# Create a new user 'spark-user' with UID 1001
RUN groupadd -r spark-group && useradd -r -u 1001 -g spark-group spark-user

RUN mkdir -p /home/spark-user/.ivy2/cache
RUN mkdir -p /app/jars

RUN cd /app/jars && \
    wget -q "https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/aws-java-sdk-bundle-1.11.1026.jar" && \
    wget -q "https://repo1.maven.org/maven2/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.11.1/commons-pool2-2.11.1.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/hadoop-aws-3.3.2.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-api/3.3.2/hadoop-client-api-3.3.2.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-client-runtime/3.3.2/hadoop-client-runtime-3.3.2.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.3_2.12/1.0.0/iceberg-spark-runtime-3.3_2.12-1.0.0.jar" && \
    wget -q "https://repo1.maven.org/maven2/com/google/code/findbugs/jsr305/3.0.0/jsr305-3.0.0.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.8.1/kafka-clients-2.8.1.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/lz4/lz4-java/1.7.1/lz4-java-1.7.1.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/parquet/parquet-avro/1.12.3/parquet-avro-1.12.3.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.12.15/scala-library-2.12.15.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.30/slf4j-api-1.7.30.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/xerial/snappy/snappy-java/1.1.8.1/snappy-java-1.1.8.1.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.3.2/spark-sql-kafka-0-10_2.12-3.3.2.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.3.2/spark-tags_2.12-3.3.2.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.3.2/spark-token-provider-kafka-0-10_2.12-3.3.2.jar" && \
    wget -q "https://repo1.maven.org/maven2/org/wildfly/openssl/wildfly-openssl/1.0.7.Final/wildfly-openssl-1.0.7.Final.jar"

# Set the owner of the Ivy cache directory and /app to the new user
RUN chown -R spark-user:spark-group /home/spark-user/
RUN chown -R spark-user:spark-group /app/

# Switch to the new user for running the application
USER spark-user

# Add the Spark application script to the container
ADD app.py /app

# Set the entry point for the container
ENTRYPOINT ["/opt/entrypoint.sh"]
81 changes: 81 additions & 0 deletions streaming/spark-streaming/examples/consumer/app.py
@@ -0,0 +1,81 @@
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import from_json
import os

# Variables
s3_bucket_name = os.getenv("S3_BUCKET_NAME", "my-iceberg-data-bucket")
kafka_address = os.getenv("KAFKA_ADDRESS", 'b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092')

def create_spark_session():
    spark = SparkSession.builder \
        .appName("KafkaToIceberg") \
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,org.apache.hadoop:hadoop-aws:3.3.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2") \
        .config("spark.jars.repositories", "https://repo1.maven.org/maven2/") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.local.type", "hadoop") \
        .config("spark.sql.catalog.local.warehouse", f"s3a://{s3_bucket_name}/iceberg/warehouse/") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.DefaultAWSCredentialsProviderChain") \
        .config("spark.sql.warehouse.dir", f"s3a://{s3_bucket_name}/iceberg/warehouse/") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.kryo.registrationRequired", "false") \
        .getOrCreate()
    return spark

def consume_and_write():
    spark = create_spark_session()
    # Keep driver logs quiet; raise to INFO/DEBUG when troubleshooting
    spark.sparkContext.setLogLevel("ERROR")
    # Create the Iceberg table if it does not exist yet
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS local.my_table (
            id STRING,
            timestamp STRING,
            alert_type STRING,
            severity STRING,
            description STRING
        )
        USING iceberg
        LOCATION 's3a://{s3_bucket_name}/iceberg/warehouse/my_table'
        TBLPROPERTIES (
            'write.format.default'='parquet' -- Explicitly specifying Parquet format
        )
    """)

    # Read from Kafka
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", kafka_address) \
        .option("subscribe", "security-topic") \
        .option("startingOffsets", "earliest") \
        .option("failOnDataLoss", "false") \
        .load()

    # Define the schema for the JSON data
    json_schema = StructType([
        StructField("id", StringType(), True),
        StructField("timestamp", StringType(), True),
        StructField("alert_type", StringType(), True),
        StructField("severity", StringType(), True),
        StructField("description", StringType(), True)
    ])

    # Parse JSON and select the required columns
    parsed_df = df.selectExpr("CAST(value AS STRING) as json") \
        .select(from_json("json", json_schema).alias("data")) \
        .select("data.id", "data.timestamp", "data.alert_type", "data.severity", "data.description")

    # Write the stream to Iceberg using the table path
    query = parsed_df.writeStream \
        .format("iceberg") \
        .option("checkpointLocation", f"s3a://{s3_bucket_name}/iceberg/checkpoints/") \
        .option("path", f"s3a://{s3_bucket_name}/iceberg/warehouse/my_table") \
        .outputMode("append") \
        .start()

    query.awaitTermination()  # Wait for the stream to finish

if __name__ == "__main__":
    consume_and_write()
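
For reference, app.py above expects JSON records on the security-topic topic with the fields id, timestamp, alert_type, severity, and description. A minimal, hypothetical producer sketch (using kafka-python, not the producer image shipped in this PR) that emits one record of that shape:

```python
# Hypothetical sketch: emit one JSON record matching the consumer's json_schema.
# Assumes kafka-python is installed and KAFKA_ADDRESS points at reachable brokers.
import json
import os
import uuid
from datetime import datetime, timezone

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=os.getenv("KAFKA_ADDRESS", "localhost:9093"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

event = {
    "id": str(uuid.uuid4()),
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "alert_type": "unauthorized_access",
    "severity": "high",
    "description": "Example security alert",
}
producer.send("security-topic", event)
producer.flush()
producer.close()
```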
@@ -0,0 +1,109 @@
# This manifest runs the spark-consumer application on EKS.
# Users need to replace __MY_BUCKET_NAME__ and __MY_KAFKA_BROKERS_ADRESS__ to match their environment.
---
apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-consumer
  namespace: spark-team-a
spec:
  type: Python
  pythonVersion: "3"
  mode: cluster
  image: "public.ecr.aws/data-on-eks/consumer-spark-streaming-3.3.2-kafka:1" # You can build your own image using the Dockerfile in this folder
  mainApplicationFile: "local:///app/app.py"
  sparkVersion: "3.3.2"
  deps:
    jars:
      - "local:///app/jars/commons-logging-1.1.3.jar"
      - "local:///app/jars/commons-pool2-2.11.1.jar"
      - "local:///app/jars/hadoop-client-api-3.3.2.jar"
      - "local:///app/jars/hadoop-client-runtime-3.3.2.jar"
      - "local:///app/jars/jsr305-3.0.0.jar"
      - "local:///app/jars/kafka-clients-2.8.1.jar"
      - "local:///app/jars/lz4-java-1.7.1.jar"
      - "local:///app/jars/scala-library-2.12.15.jar"
      - "local:///app/jars/slf4j-api-1.7.30.jar"
      - "local:///app/jars/snappy-java-1.1.8.1.jar"
      - "local:///app/jars/spark-sql-kafka-0-10_2.12-3.3.2.jar"
      - "local:///app/jars/spark-tags_2.12-3.3.2.jar"
      - "local:///app/jars/spark-token-provider-kafka-0-10_2.12-3.3.2.jar"
      - "local:///app/jars/iceberg-spark-runtime-3.3_2.12-1.0.0.jar"
      - "local:///app/jars/hadoop-aws-3.3.2.jar"
      - "local:///app/jars/aws-java-sdk-bundle-1.11.1026.jar"
      - "local:///app/jars/wildfly-openssl-1.0.7.Final.jar"
      - "local:///app/jars/parquet-avro-1.12.3.jar"
  sparkConf:
    "spark.app.name": "KafkaToIceberg"
    "spark.jars.repositories": "https://repo1.maven.org/maven2/"
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
    "spark.sql.catalog.local": "org.apache.iceberg.spark.SparkCatalog"
    "spark.sql.catalog.local.type": "hadoop"
    "spark.sql.catalog.local.warehouse": "s3a://__MY_BUCKET_NAME__/iceberg/warehouse/" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem"
    "spark.hadoop.fs.s3a.aws.credentials.provider": "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"
    "spark.sql.warehouse.dir": "s3a://__MY_BUCKET_NAME__/iceberg/warehouse/" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket
"spark.metrics.conf.*.sink.prometheusServlet.class": "org.apache.spark.metrics.sink.PrometheusServlet"
"spark.metrics.conf.*.sink.prometheusServlet.path": "/metrics"
"spark.metrics.conf.master.sink.prometheusServlet.path": "/metrics/master"
"spark.metrics.conf.applications.sink.prometheusServlet.path": "/metrics/applications"
"spark.ui.prometheus.enabled": "true"
"spark.ui.prometheus.port": "4040"
restartPolicy:
type: OnFailure
onFailureRetries: 2
onFailureRetryInterval: 10
onSubmissionFailureRetries: 3
onSubmissionFailureRetryInterval: 20
dynamicAllocation:
enabled: true
initialExecutors: 3
minExecutors: 3
maxExecutors: 10
driver:
cores: 1
coreLimit: "1200m"
memory: "1024m"
labels:
version: "3.3.2"
app: spark
annotations:
prometheus.io/scrape: 'true'
prometheus.io/path: /metrics
prometheus.io/port: '4040'
serviceAccount: spark-team-a
nodeSelector:
NodeGroupType: "SparkComputeOptimized"
tolerations:
- key: "spark-compute-optimized"
operator: "Exists"
effect: "NoSchedule"
env:
- name: S3_BUCKET_NAME
value: "__MY_BUCKET_NAME__" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket
- name: KAFKA_ADDRESS
value: "__MY_KAFKA_BROKERS_ADRESS__" # Replace with your Kafka brokers address: bootstrap_brokers
# value: "b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092"
executor:
    cores: 2
    memory: "1024m"
    labels:
      version: "3.3.2"
      app: spark
    annotations:
      prometheus.io/scrape: 'true'
      prometheus.io/path: /metrics
      prometheus.io/port: '4040'
    serviceAccount: spark-team-a
    nodeSelector:
      NodeGroupType: "SparkComputeOptimized"
    tolerations:
      - key: "spark-compute-optimized"
        operator: "Exists"
        effect: "NoSchedule"
    env:
      - name: S3_BUCKET_NAME
        value: "__MY_BUCKET_NAME__" # Replace with your S3 bucket name: s3_bucket_id_iceberg_bucket
      - name: KAFKA_ADDRESS
        value: "__MY_KAFKA_BROKERS_ADRESS__" # Replace with your Kafka brokers address: bootstrap_brokers
        # value: "b-1.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092,b-2.kafkademospark.mkjcj4.c12.kafka.us-west-2.amazonaws.com:9092"
2 changes: 2 additions & 0 deletions streaming/spark-streaming/examples/consumer/requirements.txt
@@ -0,0 +1,2 @@
py4j==0.10.9.5
pyspark==3.3.2
28 changes: 28 additions & 0 deletions streaming/spark-streaming/examples/docker-compose.yaml
@@ -0,0 +1,28 @@
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9093:9093" # Added a new port for the external listener
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,PLAINTEXT_HOST://0.0.0.0:9093 # PLAINTEXT_HOST listener on a separate port
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:9093 # Advertise the new port for host clients
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
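
The PLAINTEXT_HOST listener advertised on localhost:9093 is the one local (non-container) clients should use. A small, hypothetical smoke test with kafka-python that creates the security-topic topic on the local broker and lists topics:

```python
# Hypothetical smoke test against the docker-compose broker.
# Assumes kafka-python is installed; uses the PLAINTEXT_HOST listener on localhost:9093.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="localhost:9093")

# Create the topic the consumer subscribes to, if it does not exist yet
if "security-topic" not in admin.list_topics():
    admin.create_topics([NewTopic("security-topic", num_partitions=1, replication_factor=1)])

print(admin.list_topics())
admin.close()
```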
52 changes: 52 additions & 0 deletions streaming/spark-streaming/examples/producer/00_deployment.yaml
@@ -0,0 +1,52 @@
---
# This is the producer deployment file; you can adjust the number of replicas to produce more data.
# You will need to change __MY_AWS_REGION__, __MY_KAFKA_BROKERS__, and __MY_PRODUCER_ROLE_ARN__ to match your environment.
apiVersion: v1
kind: ServiceAccount
metadata:
  name: producer-sa
  annotations:
    eks.amazonaws.com/role-arn: __MY_PRODUCER_ROLE_ARN__ # Replace with your producer role ARN: producer_iam_role_arn
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer-deployment
spec:
  replicas: 100 # Adjusted to match the required number of replicas
  selector:
    matchLabels:
      app: producer
  template:
    metadata:
      labels:
        app: producer
    spec:
      serviceAccountName: producer-sa
      containers:
        - name: producer
          image: public.ecr.aws/data-on-eks/producer-kafka:1
          command: ["python", "app.py"]
          env:
            - name: RATE_PER_SECOND
              value: "100000"
            - name: NUM_OF_MESSAGES
              value: "10000000"
            - name: AWS_REGION
              value: "__MY_AWS_REGION__" # Replace with your AWS region
            - name: BOOTSTRAP_BROKERS
              value: "__MY_KAFKA_BROKERS__" # Replace with your bootstrap brokers: bootstrap_brokers
          resources:
            limits:
              cpu: "2" # Increased CPU limit
              memory: "4Gi" # Increased memory limit
            requests:
              cpu: "1" # Increased CPU request
              memory: "2Gi" # Increased memory request
          volumeMounts:
            - name: shared-volume
              mountPath: /mnt
      volumes:
        - name: shared-volume
          emptyDir: {}
50 changes: 50 additions & 0 deletions streaming/spark-streaming/examples/producer/01_delete_topic.yaml
@@ -0,0 +1,50 @@
apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-delete-topic-script
  namespace: default
data:
  delete_topic.py: |
    from kafka.admin import KafkaAdminClient

    def delete_topic(bootstrap_servers, topic_name):
        """Delete a Kafka topic."""
        client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
        try:
            client.delete_topics([topic_name])
            print(f"Topic {topic_name} deleted successfully.")
        except Exception as e:
            print(f"Failed to delete topic {topic_name}: {e}")

    # Configuration
    import os
    bootstrap_servers = os.getenv('BOOTSTRAP_BROKERS', 'localhost:9092')  # Replace with your Kafka broker address
    topic_name = os.getenv('TOPIC_NAME', 'security-topic')  # Replace with your topic name

    # Delete Kafka topic
    delete_topic(bootstrap_servers, topic_name)

---
apiVersion: v1
kind: Pod
metadata:
  name: kafka-delete-topic-pod
  namespace: default
spec:
  containers:
    - name: delete-topic
      image: public.ecr.aws/data-on-eks/producer-kafka:1 # Use an appropriate Python image
      command: ["python", "/scripts/delete_topic.py"]
      env:
        - name: BOOTSTRAP_BROKERS
          value: "__MY_KAFKA_BROKERS__" # Replace with your Kafka broker address
        - name: TOPIC_NAME
          value: "security-topic" # Replace with your topic name
      volumeMounts:
        - name: script-volume
          mountPath: /scripts
  restartPolicy: Never
  volumes:
    - name: script-volume
      configMap:
        name: kafka-delete-topic-script
14 changes: 14 additions & 0 deletions streaming/spark-streaming/examples/producer/Dockerfile
@@ -0,0 +1,14 @@
# Use an official Python runtime as a parent image
FROM python:3.9-slim

# Set the working directory in the container
WORKDIR /usr/src/app

# Copy the local code to the container
COPY . .

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements.txt

# Run app.py when the container launches
CMD ["python", "app.py"]