-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathspark_streaming.scala
123 lines (99 loc) · 4.87 KB
/
spark_streaming.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// To start Spark Streaming, launch spark-shell with the Kafka connector package
// (shell command, not Scala):
//   ./spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1 ...
// Kafka source: defines a streaming DataFrame over the `usyoutube` topic,
// using the broker at localhost:9092.
val UsYoutubeDf = {
  val kafkaSource = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "usyoutube")
  kafkaSource.load()
}
// import StructType for Schema (StringType, IntegerType, BooleanType)
import org.apache.spark.sql.types._
// Schema used to parse the JSON payload of each record.
// Fields are listed in payload order as (name, type); every field is nullable.
val activationSchema = StructType(
  Seq[(String, DataType)](
    "video_id"          -> StringType,
    "title"             -> StringType,
    "published_at"      -> StringType,
    "channel_id"        -> StringType,
    "channel_title"     -> StringType,
    "category_id"       -> IntegerType,
    "trending_date"     -> StringType,
    "view_count"        -> IntegerType,
    "likes"             -> IntegerType,
    "dislikes"          -> IntegerType,
    "comment_count"     -> IntegerType,
    "comments_disabled" -> BooleanType,
    "ratings_disabled"  -> BooleanType,
    "category_title"    -> StringType
  ).map { case (fieldName, fieldType) =>
    StructField(fieldName, fieldType, nullable = true)
  }
)
// JSON -> columns: cast the Kafka `value` column to a string, parse it with
// activationSchema into a struct named `usYoutube`, then expand that struct so
// each schema field becomes a top-level column.
val youTubeSchemaDf = {
  val rawJson = col("value").cast("string")
  val parsedRecord = from_json(rawJson, activationSchema).as("usYoutube")
  UsYoutubeDf
    .select(parsedRecord)
    .select("usYoutube.*")
}
// Show the resulting column layout (expected output is recorded in the comment below).
youTubeSchemaDf.printSchema()
/*
root
|-- video_id: string (nullable = true)
|-- title: string (nullable = true)
|-- published_at: string (nullable = true)
|-- channel_id: string (nullable = true)
|-- channel_title: string (nullable = true)
|-- category_id: integer (nullable = true)
|-- trending_date: string (nullable = true)
|-- view_count: integer (nullable = true)
|-- likes: integer (nullable = true)
|-- dislikes: integer (nullable = true)
|-- comment_count: integer (nullable = true)
|-- comments_disabled: boolean (nullable = true)
|-- ratings_disabled: boolean (nullable = true)
|-- category_title: string (nullable = true)
*/
// Most viewed channel: sum of view_count per (trending_date, channel_title),
// ordered by that total, largest first.
val mostviewedchannelDf = {
  val viewsPerChannel = youTubeSchemaDf
    .groupBy("trending_date", "channel_title")
    .agg(sum(col("view_count")).alias("total_view_count_by_channel"))
  viewsPerChannel.orderBy(desc("total_view_count_by_channel"))
}
import org.apache.spark.sql.streaming.Trigger.ProcessingTime
// Starts the streaming query that writes mostviewedchannelDf to the
// `mostviewedchannel` Kafka topic in "complete" output mode.
// The `value` column is to_json over a struct of all input columns.
// NOTE(review): the Kafka sink is documented to consume only key/value/headers/
// topic/partition columns, so the three CAST projections look unused — confirm
// before removing them.
// NOTE(review): failOnDataLoss is documented as a Kafka source option — confirm
// whether it has any effect on a sink.
val mostviewedchannelDfQuery = {
  val projections = Seq(
    "CAST(trending_date as STRING)",
    "CAST(channel_title AS STRING)",
    "CAST(total_view_count_by_channel AS INTEGER)",
    "to_json(struct(*)) AS value"
  )
  val sinkOptions = Map(
    "checkpointLocation"      -> "/home/tugrulgkc34/consumermostviewedchannel",
    "failOnDataLoss"          -> "false",
    "kafka.bootstrap.servers" -> "localhost:9092",
    "topic"                   -> "mostviewedchannel"
  )
  mostviewedchannelDf
    .selectExpr(projections: _*)
    .writeStream
    .format("kafka")
    .outputMode("complete")
    .options(sinkOptions)
    .start()
}
// Most viewed category: sum of view_count per (trending_date, category_title),
// ordered by that total, largest first.
val mostviewedcategoryDf = {
  val viewsPerCategory = youTubeSchemaDf
    .groupBy("trending_date", "category_title")
    .agg(sum(col("view_count")).alias("total_view_count_by_category"))
  viewsPerCategory.orderBy(desc("total_view_count_by_category"))
}
// Starts the streaming query that writes mostviewedcategoryDf to the
// `mostviewedcategory` Kafka topic in "complete" output mode.
// The `value` column is to_json over a struct of all input columns.
// NOTE(review): the Kafka sink is documented to consume only key/value/headers/
// topic/partition columns, so the three CAST projections look unused — confirm
// before removing them.
// NOTE(review): failOnDataLoss is documented as a Kafka source option — confirm
// whether it has any effect on a sink.
val mostviewedcategoryDfQuery = {
  val projections = Seq(
    "CAST(trending_date as STRING)",
    "CAST(category_title AS STRING)",
    "CAST(total_view_count_by_category AS INTEGER)",
    "to_json(struct(*)) AS value"
  )
  val sinkOptions = Map(
    "checkpointLocation"      -> "/home/tugrulgkc34/consumermostviewedcategory",
    "failOnDataLoss"          -> "false",
    "kafka.bootstrap.servers" -> "localhost:9092",
    "topic"                   -> "mostviewedcategory"
  )
  mostviewedcategoryDf
    .selectExpr(projections: _*)
    .writeStream
    .format("kafka")
    .outputMode("complete")
    .options(sinkOptions)
    .start()
}
// Most liked channel: sum of likes per (trending_date, channel_title),
// ordered by that total, largest first.
val mostlikedchannelDf = {
  val likesPerChannel = youTubeSchemaDf
    .groupBy("trending_date", "channel_title")
    .agg(sum(col("likes")).alias("total_likes_count_by_channel"))
  likesPerChannel.orderBy(desc("total_likes_count_by_channel"))
}
// Starts the streaming query that writes mostlikedchannelDf to the
// `mostlikedchannel` Kafka topic in "complete" output mode.
// The `value` column is to_json over a struct of all input columns.
// NOTE(review): the Kafka sink is documented to consume only key/value/headers/
// topic/partition columns, so the three CAST projections look unused — confirm
// before removing them.
// NOTE(review): failOnDataLoss is documented as a Kafka source option — confirm
// whether it has any effect on a sink.
val mostlikedchannelDfDfQuery = {
  val projections = Seq(
    "CAST(trending_date as STRING)",
    "CAST(channel_title AS STRING)",
    "CAST(total_likes_count_by_channel AS INTEGER)",
    "to_json(struct(*)) AS value"
  )
  val sinkOptions = Map(
    "checkpointLocation"      -> "/home/tugrulgkc34/consumermostlikedchannel",
    "failOnDataLoss"          -> "false",
    "kafka.bootstrap.servers" -> "localhost:9092",
    "topic"                   -> "mostlikedchannel"
  )
  mostlikedchannelDf
    .selectExpr(projections: _*)
    .writeStream
    .format("kafka")
    .outputMode("complete")
    .options(sinkOptions)
    .start()
}
// Most liked category: sum of likes per (trending_date, category_title),
// ordered by that total, largest first.
val mostlikedcategoryDf = {
  val likesPerCategory = youTubeSchemaDf
    .groupBy("trending_date", "category_title")
    .agg(sum(col("likes")).alias("total_likes_count_by_category"))
  likesPerCategory.orderBy(desc("total_likes_count_by_category"))
}
// Starts the streaming query that writes mostlikedcategoryDf to the
// `mostlikedcategory` Kafka topic in "complete" output mode.
// The `value` column is to_json over a struct of all input columns.
// NOTE(review): the Kafka sink is documented to consume only key/value/headers/
// topic/partition columns, so the three CAST projections look unused — confirm
// before removing them.
// NOTE(review): failOnDataLoss is documented as a Kafka source option — confirm
// whether it has any effect on a sink.
val mostlikedcategoryDfQuery = {
  val projections = Seq(
    "CAST(trending_date as STRING)",
    "CAST(category_title AS STRING)",
    "CAST(total_likes_count_by_category AS INTEGER)",
    "to_json(struct(*)) AS value"
  )
  val sinkOptions = Map(
    "checkpointLocation"      -> "/home/tugrulgkc34/consumermostlikedcategory",
    "failOnDataLoss"          -> "false",
    "kafka.bootstrap.servers" -> "localhost:9092",
    "topic"                   -> "mostlikedcategory"
  )
  mostlikedcategoryDf
    .selectExpr(projections: _*)
    .writeStream
    .format("kafka")
    .outputMode("complete")
    .options(sinkOptions)
    .start()
}