Adding Data Flow Spark Structured Streaming examples. (#3)
Signed-off-by: Dmitri Goroh <[email protected]>

Co-authored-by: Dmitri Goroh <[email protected]>
dmitrigoroh and dmitri-goroh-oracle authored Jan 26, 2022
1 parent f61caa2 commit 33f7816
Showing 14 changed files with 1,041 additions and 2 deletions.
30 changes: 30 additions & 0 deletions .gitignore
@@ -1 +1,31 @@
# Maven noise
target/
bin/
dependency-reduced-pom.xml

# Osx noise
.DS_Store
profile
.metadata/

# IntelliJ Idea noise
.idea
/.idea/
*.iws
*.ipr
*.iml
*.releaseBackup

# Eclipse noise
.settings
.settings/*
.project
.classpath
maven-eclipse.xml

# OCI Build noise
/input_ocibuild*/
/output_ocibuild*/

# Other
.externalToolBuilders
5 changes: 3 additions & 2 deletions README.md
@@ -31,8 +31,9 @@ You must have Set Up Your Tenancy and be able to Access Data Flow
|--------------------|:-----------:|:------:|:----:|:-----:|
| CSV to Parquet |This application shows how to use PySpark to convert CSV data stored in OCI Object Store to Apache Parquet format, which is then written back to Object Store. |[CSV to Parquet](./python/csv_to_parquet)| [CSV to Parquet](./java/csv_to_parquet)| [CSV to Parquet](./scala/csv_to_parquet)|
| Load to ADW |This application shows how to read a file from OCI Object Store, perform some transformations and write the results to an Autonomous Data Warehouse instance. |[Load to ADW](./python/loadadw)| [Load to ADW](./java/loadadw)|[Load to ADW](./scala/loadadw)|
| Structured Streaming Kafka Word Count |This Structured Streaming application shows how to read a Kafka stream and calculate word frequencies over a one-minute window interval. |[Structured Kafka Word Count](./python/structured_streaming_kafka_word_count)| [Structured Kafka Word Count](./java/structured_streaming_kafka_word_count)||
| Random Forest Regression |This application shows how to build a model and make predictions using Random Forest Regression. |[Random Forest Regression](./python/random_forest_regression)|
| Oracle NoSQL Database cloud service |This application shows how to interface with the Oracle NoSQL Database cloud service. |[Oracle NoSQL Database cloud service](./python/oracle_nosql)|

For step-by-step instructions, see the README files included with each sample.

31 changes: 31 additions & 0 deletions java/structured_streaming_kafka_word_count/.gitignore
@@ -0,0 +1,31 @@
# Maven noise
target/
bin/
dependency-reduced-pom.xml

# Osx noise
.DS_Store
profile
.metadata/

# IntelliJ Idea noise
.idea
/.idea/
*.iws
*.ipr
*.iml
*.releaseBackup

# Eclipse noise
.settings
.settings/*
.project
.classpath
maven-eclipse.xml

# OCI Build noise
/input_ocibuild*/
/output_ocibuild*/

# Other
.externalToolBuilders
71 changes: 71 additions & 0 deletions java/structured_streaming_kafka_word_count/README.md
@@ -0,0 +1,71 @@
# Overview
This Structured Streaming application shows how to read a Kafka stream and calculate word frequencies over one-minute window intervals.

The word count problem is a classic example used in the [Structured Streaming Programming Guide](https://spark.apache.org/docs/3.0.2/structured-streaming-programming-guide.html).
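
To make the windowed computation concrete, here is a minimal Java sketch of a one-minute windowed word count. It is an illustration, not the shipped `example.StructuredKafkaWordCount` source; the broker, topic, and class names are placeholders:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.StreamingQuery;

import static org.apache.spark.sql.functions.*;

public class WindowedWordCountSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("WindowedWordCountSketch")
                .getOrCreate();

        // Read raw Kafka records; `value` carries the text, `timestamp` the event time.
        Dataset<Row> lines = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "broker:9092") // placeholder
                .option("subscribe", "KafkaTopicName")            // placeholder
                .load()
                .selectExpr("CAST(value AS STRING) AS value", "timestamp");

        // Split each record into words and count occurrences per one-minute window.
        Dataset<Row> counts = lines
                .select(explode(split(col("value"), "\\s+")).alias("word"), col("timestamp"))
                .groupBy(window(col("timestamp"), "1 minute"), col("word"))
                .count();

        // Emit every updated window to the console sink.
        StreamingQuery query = counts.writeStream()
                .outputMode("complete")
                .format("console")
                .start();
        query.awaitTermination();
    }
}
```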

## Prerequisites
### For running Locally
* See [Set Up Spark Locally](https://docs.oracle.com/en-us/iaas/data-flow/data-flow-tutorial/develop-apps-locally/front.htm) for the prerequisites you need before deploying the application locally.
### For running on Data Flow
* Ensure your tenancy is configured according to the Data Flow onboarding instructions. [Getting Started with Data Flow](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/dfs_getting_started.htm#set_up_admin)
* Ensure that Data Flow Spark Streaming configuration is also in place. [Getting Started with Spark Streaming](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/spark-streaming.htm#streaming-get-started)

## Instructions
1. Set up an OSS Kafka instance. See [Getting Started with Spark Streaming](https://docs.cloud.oracle.com/en-us/iaas/data-flow/using/spark-streaming.htm#streaming-get-started).
2. Prepare `/producer/oss-producer-from-file.py` for use: download the source text and update the constants with the relevant information.
3. Prepare the command line for local and Data Flow based runs (a sketch of how the `plain` credentials map onto Kafka options follows this list):
```sh
Usage: StructuredKafkaWordCount <bootstrap-servers> <subscribe-topics> <kafkaAuthentication> <checkpoint-location> <type> ...
<kafkaAuthentication>: plain <username> <password>
<kafkaAuthentication>: RP <stream-pool-id>
<type>: console
<type>: csv <output-location>
```
4. Compile the Structured Streaming app (StructuredKafkaWordCount).
5. Start the Structured Streaming app (StructuredKafkaWordCount) locally or in the cloud.
6. Then start the data producer oss-producer-from-file.py (`python3 oss-producer-from-file.py`) locally or in the cloud.
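
With `plain` authentication, the `<username>` and `<password>` arguments correspond to standard Kafka SASL/PLAIN client settings. Continuing the sketch above, this is how they are typically passed to the Spark Kafka source (placeholder values, not the shipped wiring):

```java
// Kafka SASL/PLAIN settings, forwarded to the Kafka client through the
// Spark source's "kafka." option prefix.
String username = "<tenancy name>/<user name>/<stream pool id>"; // placeholder
String password = "<user security token>";                       // placeholder
String jaasConfig = String.format(
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"%s\" password=\"%s\";",
        username, password);

Dataset<Row> stream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "<kafka bootstrap server>:9092")
        .option("subscribe", "<kafka topic>")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", jaasConfig)
        .load();
```

These settings mirror the ones used by the Python producer below (`SASL_SSL`, `PLAIN`, a `tenancy/user/streampool` username, and an auth token as the password).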

### To Compile

```sh
mvn package
```

### To Test Locally

```sh
spark-submit --class example.StructuredKafkaWordCount target/StructuredKafkaWordCount.jar <kafka bootstrap server>:9092 <kafka topic> plain <tenancy name>/<user name>/<stream pool id> <user security token> /tmp/checkpoint csv /tmp/output
```
For more information on spark-submit, see [Submitting Applications](https://spark.apache.org/docs/3.0.2/submitting-applications.html); for what Data Flow supports, see [Spark-Submit Functionality in Data Flow](https://docs.oracle.com/en-us/iaas/data-flow/using/spark-submit.htm).

### To use the OCI CLI to run the Java Application

```sh
oci data-flow application create \
--compartment-id <compartment_ocid> \
--display-name "StructuredKafkaWordCount" \
--driver-shape VM.Standard2.1 \
--executor-shape VM.Standard2.1 \
--num-executors 1 \
--spark-version 3.0.2 \
--type streaming \
--file-uri "oci://<bucket>@<namespace>/StructuredKafkaWordCount.jar" \
--language Java \
--class-name example.StructuredKafkaWordCount \
--arguments "<kafka bootstrap server>:9092 <kafka topic> plain <tenancy name>/<user name>/<stream pool id> <user security token> oci://<bucket>@<namespace>/checkpoint csv oci://<bucket>@<namespace>/output"
```
Make note of the Application ID produced.

```sh
oci data-flow run create \
--compartment-id <compartment_ocid> \
--application-id <application_ocid> \
--display-name "StructuredKafkaWordCount"
```
The arguments can be updated to switch from plain password authentication with Kafka to the Data Flow Resource Principal, which is better suited to production scenarios. For example:
```sh
--arguments "<kafka bootstrap server>:9092 <kafka topic> RP <stream pool id> oci://<bucket>@<namespace>/checkpoint csv oci://<bucket>@<namespace>/output"
```
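
On the application side, `RP` mode can lean on the `oci-java-sdk-addons-sasl` dependency declared in the pom instead of a username and password. Continuing the sketch above, the option values below follow that addon's Resource Principal pattern but are an assumption here, not the shipped wiring:

```java
// Resource Principal authentication via the OCI SDK SASL addon (assumed wiring).
String streamPoolId = "<stream pool id>"; // placeholder
Dataset<Row> stream = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "<kafka bootstrap server>:9092")
        .option("subscribe", "<kafka topic>")
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "OCI-RSA-SHA256")
        .option("kafka.sasl.jaas.config",
                "com.oracle.bmc.auth.sasl.ResourcePrincipalsLoginModule required intent=\"streamPoolId:"
                        + streamPoolId + "\";")
        .option("kafka.sasl.client.callback.handler.class",
                "com.oracle.bmc.auth.sasl.ResourcePrincipalsClientCallbackHandler")
        .load();
```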
For more details on OCI CLI configuration options, see the [OCI CLI Command Reference](https://docs.oracle.com/en-us/iaas/tools/oci-cli/3.4.4/oci_cli_docs/cmdref/data-flow/application/create.html).
103 changes: 103 additions & 0 deletions java/structured_streaming_kafka_word_count/pom.xml
@@ -0,0 +1,103 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>StructuredKafkaWordCount</artifactId>
    <version>1.0.0-SNAPSHOT</version>

    <properties>
        <spark.version>3.0.2</spark.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>com.oracle.oci.sdk</groupId>
            <artifactId>oci-java-sdk-addons-sasl</artifactId>
            <optional>false</optional>
            <version>1.36.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.11.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.12</artifactId>
            <version>${spark.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <skipAssembly>false</skipAssembly>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <configuration>
                    <!-- The final uber jar file name will not have a version component. -->
                    <finalName>${project.artifactId}</finalName>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                    <transformers>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"/>
                    </transformers>
                    <relocations>
                        <relocation>
                            <pattern>com.google.</pattern>
                            <shadedPattern>com.shaded.google.</shadedPattern>
                        </relocation>
                        <relocation>
                            <pattern>com.oracle.bmc.</pattern>
                            <shadedPattern>com.shaded.oracle.bmc.</shadedPattern>
                        </relocation>
                    </relocations>
                    <!-- exclude signed Manifests -->
                    <filters>
                        <filter>
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
</project>
52 changes: 52 additions & 0 deletions java/structured_streaming_kafka_word_count/producer/oss-producer-from-file.py
@@ -0,0 +1,52 @@
import datetime
import re
import time

from kafka import KafkaProducer

# Constants
# Download enwik8 from http://mattmahoney.net/dc/textdata.html or any other large enough text
source_file_path = '/tmp/enwik8'
kafka_bootstrap_server = 'cell-1.streaming.us-phoenix-1.oci.oraclecloud.com'
kafka_topic = 'KafkaTopicName'
kafka_username = '[email protected]'
kafka_user_tenancy_name = 'customerTenancyName'
kafka_streampool_id = 'ocid1.streampool.oc1.phx.somehash'
kafka_token = 'sEcUrItY_ToKeN'
read_lines_limit = 101209
cadence_sec = 60  # start a new batch every 60 seconds

print("Creating Kafka producer...", end=" ")
producer = KafkaProducer(
    bootstrap_servers=kafka_bootstrap_server + ':9092',
    security_protocol='SASL_SSL', sasl_mechanism='PLAIN',
    sasl_plain_username=kafka_user_tenancy_name + '/' + kafka_username + '/' + kafka_streampool_id,
    sasl_plain_password=kafka_token)
print("Done.")

print("Streaming... (Press Ctrl+C to cancel)")
file_upload_iteration = 0
while True:
    # All records in this batch share one timestamp key, so the streaming
    # app can aggregate them into the same window.
    time_key = datetime.datetime.utcnow()
    file_upload_iteration += 1

    print(f"Iteration {file_upload_iteration}, key={time_key}", end=" ", flush=True)
    iteration_start_time = time.time()
    lines_counter = 0
    with open(source_file_path) as fp:
        for line in fp:
            if lines_counter > read_lines_limit:
                break

            # Split the line into words, dropping punctuation and empty tokens.
            word_list = list(filter(None, re.split(r"\W+", line.strip())))
            if not word_list:
                continue

            text = time_key.isoformat() + " " + ' '.join(word_list)
            producer.send(kafka_topic, key=time_key.isoformat().encode('utf-8'), value=text.encode('utf-8'))
            lines_counter += 1

    iteration_end_time = time.time()
    iteration_time_lapsed = iteration_end_time - iteration_start_time
    print(f" - Done. {iteration_time_lapsed:.2f} sec.")
    time_key += datetime.timedelta(seconds=cadence_sec)
    # Sleep out the rest of the cadence interval so batches start on a steady beat.
    alignment_sleep_time = cadence_sec - iteration_time_lapsed
    if alignment_sleep_time > 0:
        time.sleep(alignment_sleep_time)
