Skip to content

Commit

Permalink
use new format for reading/writing on spark
Browse files: browse the repository at this point in the history
  • Loading branch information
Gerard Casas Saez committed Jun 25, 2019
1 parent 99bb4f5 commit 0d6748b
Showing 1 changed file with 6 additions and 2 deletions.
8 changes: 6 additions & 2 deletions media-spark/src/main/java/MediaSpark.java
Original file line number | Diff line number | Diff line change
Expand Up @@ -7,6 +7,7 @@


import java.io.File;
import java.util.Date;

public class MediaSpark {
public static void main(String[] args) {
Expand All @@ -17,9 +18,10 @@ public static void main(String[] args) {
SparkSession spark = SparkSession.builder().appName("Media Spark").getOrCreate();
//Dataset<String> logData = spark.read().textFile(logFile).cache();

String eventName = args[0];
// A JSON dataset is pointed to by path
// Iterate through the directory to input all the timeline JSON files from an event
Dataset<Row> timeline = spark.read().json(args[0]);
Dataset<Row> timeline = spark.read().json(String.format("gs://epic-collect/%s/*/*/*/*/*", eventName));

// Creates a temporary view using the DataFrame
timeline.createOrReplaceTempView("timeline");
Expand All @@ -34,8 +36,10 @@ public static void main(String[] args) {
// Select the media id, the image link, and the tweet url from the media block
Dataset<Row> namesDF2 = spark.sql("SELECT user, col.id AS media_id, col.media_url_https AS image_link, col.expanded_url AS tweet_url FROM expTimeline");

long count = namesDF2.count();

// Write the result of the query to our destination JSON file
namesDF2.coalesce(1).write().mode(SaveMode.Overwrite).json(args[1]);
namesDF2.coalesce(1).write().mode(SaveMode.Overwrite).json(String.format("gs://epic-analysis-results/spark/media/%s/%d/%d/", eventName, (new Date()).getTime(), count));

// Stop Spark session
spark.stop();
Expand Down

0 comments on commit 0d6748b

Please sign in to comment.