From 35b781ef5106a2da330121ac7142d26c46a8e6c5 Mon Sep 17 00:00:00 2001 From: George Tzinos Date: Wed, 6 Jun 2018 12:11:58 +0300 Subject: [PATCH] Code optimization --- src/main/scala/utils/JoinEdges.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/main/scala/utils/JoinEdges.scala b/src/main/scala/utils/JoinEdges.scala index d81c807..574a9c5 100644 --- a/src/main/scala/utils/JoinEdges.scala +++ b/src/main/scala/utils/JoinEdges.scala @@ -26,9 +26,9 @@ class JoinEdges extends Serializable { //Neighbors for each node def getNeighbors(ss: SparkSession, dataset: RDD[Array[String]]) = { val edges = dataset.map(item => (item(0).toLong, item(1).toLong)) - val edgesReverted = dataset.map(item => (item(1).toLong, item(0).toLong)) - val edgesGrouped = edges.groupByKey() + + val edgesReverted = dataset.map(item => (item(1).toLong, item(0).toLong)) val edgesRevertedGrouped = edgesReverted.groupByKey() val neighbors = edgesGrouped.union(edgesRevertedGrouped).groupByKey().mapValues(item => item.flatten.toSeq) @@ -36,7 +36,6 @@ class JoinEdges extends Serializable { import ss.implicits._ neighbors.toDF().createOrReplaceTempView("neighbors") - val sortedEdges = edges.map(item => if (item._1 > item._2) (item._2: Long, item._1: Long) else (item._1: Long, item._2: Long)) - sortedEdges.toDF().createOrReplaceTempView("allEdges") + edges.toDF().createOrReplaceTempView("allEdges") } }