-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathconsumer.py
63 lines (52 loc) · 1.86 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import re
import findspark
from pyspark.ml import PipelineModel
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, udf
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
if __name__ == "__main__":
    findspark.init()

    # Path to the pre-trained Spark ML PipelineModel.
    # NOTE(review): left empty in the original source — must be filled in
    # before this consumer can run.
    path_to_model = r''

    # Local Spark session with the Kafka source connector on the classpath
    # (spark-sql-kafka artifact matching Spark 3.1.2 / Scala 2.12).
    spark = SparkSession \
        .builder \
        .master("local[*]") \
        .appName("TwitterSentimentAnalysis") \
        .config("spark.jars.packages",
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2") \
        .getOrCreate()

    # Keep the console readable: suppress everything below ERROR.
    sc = spark.sparkContext
    sc.setLogLevel('ERROR')

    # Schema of the JSON payload carried in each Kafka message value.
    schema = StructType([StructField("message", StringType())])

    # Stream tweets from the 'twitter' topic, starting from the latest
    # offsets; the raw Kafka 'value' bytes are cast to a string column
    # named 'message'.
    # FIX: the original also set .option("header", "true"), which is a CSV
    # reader option that the Kafka source silently ignores — removed.
    df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "twitter") \
        .option("startingOffsets", "latest") \
        .load() \
        .selectExpr("CAST(value AS STRING) as message")

    # Parsed JSON kept alongside the raw text (as in the original).
    df = df.withColumn("value", from_json("message", schema))

    # Cleaning pattern, compiled once and hoisted out of the per-record
    # lambda. FIX: the original wrote 'www.\S+' with an unescaped dot, so
    # any token like 'wwwfoo' was stripped as if it were a URL; the dot is
    # now a literal. URL alternatives come before the character-class strip
    # so a URL is removed whole rather than character by character.
    _junk_re = re.compile(r'(http\S+)|(www\.\S+)|[^A-Za-z\n ]')

    # Lowercase, strip URLs and non-alphabetic characters, then tokenize.
    pre_process = udf(
        lambda x: _junk_re.sub('', x.lower().strip()).split(),
        ArrayType(StringType())
    )

    # FIX: drop null messages BEFORE the UDF runs — the original applied
    # dropna() only after the UDF, so a null message reached x.lower() and
    # raised inside the executor. The trailing dropna() is kept so the
    # remaining semantics (drop rows with any null column) are unchanged.
    df = df \
        .dropna(subset=["message"]) \
        .withColumn("cleaned_data", pre_process(df.message)) \
        .dropna()

    # Score the cleaned tokens with the pre-trained pipeline.
    pipeline_model = PipelineModel.load(path_to_model)
    prediction = pipeline_model.transform(df)

    # Keep only the original text and its predicted label.
    prediction = prediction.select(prediction.message, prediction.prediction)

    # Print each updated prediction to the console until interrupted.
    prediction \
        .writeStream \
        .format("console") \
        .outputMode("update") \
        .start() \
        .awaitTermination()