Skip to content

Commit

Permalink
Code optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
gtzinos committed Jun 6, 2018
1 parent 9ee2ebe commit 35b781e
Showing 1 changed file with 3 additions and 4 deletions.
7 changes: 3 additions & 4 deletions src/main/scala/utils/JoinEdges.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@ class JoinEdges extends Serializable {
//Neighbors for each node
def getNeighbors(ss: SparkSession, dataset: RDD[Array[String]]) = {
val edges = dataset.map(item => (item(0).toLong, item(1).toLong))
val edgesReverted = dataset.map(item => (item(1).toLong, item(0).toLong))

val edgesGrouped = edges.groupByKey()

val edgesReverted = dataset.map(item => (item(1).toLong, item(0).toLong))
val edgesRevertedGrouped = edgesReverted.groupByKey()

val neighbors = edgesGrouped.union(edgesRevertedGrouped).groupByKey().mapValues(item => item.flatten.toSeq)

import ss.implicits._
neighbors.toDF().createOrReplaceTempView("neighbors")

val sortedEdges = edges.map(item => if (item._1 > item._2) (item._2: Long, item._1: Long) else (item._1: Long, item._2: Long))
sortedEdges.toDF().createOrReplaceTempView("allEdges")
edges.toDF().createOrReplaceTempView("allEdges")
}
}

0 comments on commit 35b781e

Please sign in to comment.