diff --git a/src/main/scala/utils/JoinEdges.scala b/src/main/scala/utils/JoinEdges.scala index d81c807..574a9c5 100644 --- a/src/main/scala/utils/JoinEdges.scala +++ b/src/main/scala/utils/JoinEdges.scala @@ -26,9 +26,9 @@ class JoinEdges extends Serializable { //Neighbors for each node def getNeighbors(ss: SparkSession, dataset: RDD[Array[String]]) = { val edges = dataset.map(item => (item(0).toLong, item(1).toLong)) - val edgesReverted = dataset.map(item => (item(1).toLong, item(0).toLong)) - val edgesGrouped = edges.groupByKey() + + val edgesReverted = dataset.map(item => (item(1).toLong, item(0).toLong)) val edgesRevertedGrouped = edgesReverted.groupByKey() val neighbors = edgesGrouped.union(edgesRevertedGrouped).groupByKey().mapValues(item => item.flatten.toSeq) @@ -36,7 +36,6 @@ class JoinEdges extends Serializable { import ss.implicits._ neighbors.toDF().createOrReplaceTempView("neighbors") - val sortedEdges = edges.map(item => if (item._1 > item._2) (item._2: Long, item._1: Long) else (item._1: Long, item._2: Long)) - sortedEdges.toDF().createOrReplaceTempView("allEdges") + edges.toDF().createOrReplaceTempView("allEdges") } }