upgrade: scala -> 2.12.12, spark -> 3.1.1
ZhaoHonghong committed Dec 14, 2021
1 parent fe429eb commit 90bd63f
Showing 5 changed files with 17 additions and 21 deletions.
4 changes: 2 additions & 2 deletions build.gradle
@@ -36,7 +36,7 @@ allprojects {

// The gradle variables defined here are visible in sub-projects
ext {
-sparkVersion = '2.1.0'
+sparkVersion = '3.1.1'
}

subprojects {
@@ -93,7 +93,7 @@ subprojects {
}

tasks.withType(ScalaCompile) {
-scalaCompileOptions.additionalParameters = ["-feature", "-deprecation", "-verbose", "-optimize", "-unchecked", "-Yinline-warnings", "-g:vars"]
+scalaCompileOptions.additionalParameters = ["-feature", "-deprecation", "-verbose", "-unchecked", "-g:vars"]

configure(scalaCompileOptions.forkOptions) {
memoryMaximumSize = '1g'
4 changes: 2 additions & 2 deletions scanns/build.gradle
@@ -7,8 +7,8 @@ apply plugin: 'scala'
dependencies {
// This is Scala's option parsing library
compile("com.github.scopt:scopt$scalaSuffix:3.5.0")
compile("com.databricks:spark-avro$scalaSuffix:3.2.0")
compile("org.apache.spark:spark-mllib$scalaSuffix:2.1.0")
// compile("org.apache.spark:spark-avro$scalaSuffix:3.2.0")
compile("org.apache.spark:spark-mllib$scalaSuffix:3.1.1")

testCompile(project(":test-utils$scalaSuffix"))
testCompile("org.mockito:mockito-core:1.+")
5 changes: 2 additions & 3 deletions NearestNeighborSearchDriver.scala
@@ -4,7 +4,6 @@
*/
package com.linkedin.nn

-import com.databricks.spark.avro._
import com.linkedin.nn.Types.ItemId
import com.linkedin.nn.algorithm.{BruteForceNNS, CosineSignRandomProjectionNNS, JaccardMinHashNNS, L2ScalarRandomProjectionNNS}
import com.linkedin.nn.params.NNSCLIParams
@@ -60,7 +59,7 @@ object NearestNeighborSearchDriver {
sparkSession: SparkSession): RDD[(ItemId, Vector)] = {

val numFeatures = featureIndexMapB.value.size
-sparkSession.read.avro(path)
+sparkSession.read.parquet(path)
.select(idColumnName, attributesColumnName)
.rdd
.map(r =>
@@ -92,7 +91,7 @@ object NearestNeighborSearchDriver {
StructField("itemId", LongType, nullable = false),
StructField("candidateId", LongType, nullable = false),
StructField("distance", DoubleType, nullable = false)))
-sparkSession.createDataFrame(output, schema).write.mode(SaveMode.Overwrite).avro(outputPath)
+sparkSession.createDataFrame(output, schema).write.mode(SaveMode.Overwrite).parquet(outputPath)
}

/**
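With the com.databricks:spark-avro dependency gone, the driver's input and output now go through Spark's built-in Parquet source. Below is a minimal sketch of the same read/write pattern on stock Spark 3.x APIs; the paths and column names are placeholders, not the project's actual configuration.

import org.apache.spark.sql.{SaveMode, SparkSession}

object ParquetIOSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("parquet-io-sketch").getOrCreate()

    // Read the items to index; "uid" and "features" are placeholder column names.
    val items = spark.read.parquet("/tmp/items.parquet").select("uid", "features")

    // ... nearest-neighbor search would run here ...

    // Write results, overwriting any previous run, as the driver does with SaveMode.Overwrite.
    items.write.mode(SaveMode.Overwrite).parquet("/tmp/nn-output.parquet")

    spark.stop()
  }
}

If Avro input were still needed, the commented-out org.apache.spark:spark-avro dependency in scanns/build.gradle points at the replacement for the old com.databricks reader: with that module on the classpath, spark.read.format("avro").load(path) covers the same case.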
21 changes: 9 additions & 12 deletions LSHNearestNeighborSearchModel.scala
@@ -54,11 +54,11 @@ abstract class LSHNearestNeighborSearchModel[T <: LSHNearestNeighborSearchModel[
*/
private[model] class NearestNeighborIterator(bucketsIt: Iterator[Array[mutable.ArrayBuffer[ItemId]]],
itemVectors: mutable.Map[ItemId, Vector],
-numNearestNeighbors: Int) extends Iterator[(ItemId, Iterator[ItemIdDistancePair])]
+numNearestNeighbors: Int) extends Iterator[(ItemId, IndexedSeq[ItemIdDistancePair])]
with Serializable {

// this will be the next element that the iterator returns on a call to next()
-private var nextResult: Option[(ItemId, Iterator[ItemIdDistancePair])] = None
+private var nextResult: Option[(ItemId, IndexedSeq[ItemIdDistancePair])] = None

// this is the current tuple in the bucketsIt iterator that is being scanned
private var currentTuple = if (bucketsIt.hasNext) Some(bucketsIt.next) else None
@@ -77,7 +77,7 @@ abstract class LSHNearestNeighborSearchModel[T <: LSHNearestNeighborSearchModel[
.map(c => (c, distance.compute(itemVectors(c), itemVectors(x(0)(currentIndex)))))
.foreach(queue.enqueue(_))
if (queue.nonEmpty()) {
-nextResult = Some((x(0)(currentIndex), queue.iterator()))
+nextResult = Some((x(0)(currentIndex), queue.iterator().toIndexedSeq))
done = true
}
currentIndex += 1
@@ -98,7 +98,7 @@ abstract class LSHNearestNeighborSearchModel[T <: LSHNearestNeighborSearchModel[

override def hasNext: Boolean = nextResult.isDefined

-override def next(): (ItemId, Iterator[ItemIdDistancePair]) = {
+override def next(): (ItemId, IndexedSeq[ItemIdDistancePair]) = {
if (hasNext) {
val ret = nextResult.get
populateNext()
@@ -261,6 +261,9 @@ abstract class LSHNearestNeighborSearchModel[T <: LSHNearestNeighborSearchModel[
} else {
explodeData(transform(candidatePool)).partitionBy(hashPartitioner)
}
+val zero: TopNQueue = new TopNQueue(k)
+def seqOp(U: TopNQueue, V: IndexedSeq[ItemIdDistancePair]): TopNQueue = {U.enqueue(V:_*); U}
+def combOp(X: TopNQueue, Y: TopNQueue): TopNQueue = {X.enqueue(Y.iterator().toSeq:_*); X}
srcItemsExploded.zipPartitions(candidatePoolExploded) {
case (srcIt, candidateIt) => {
val itemVectors = mutable.Map[ItemId, Vector]()
@@ -284,14 +287,8 @@ abstract class LSHNearestNeighborSearchModel[T <: LSHNearestNeighborSearchModel[
new NearestNeighborIterator(hashBuckets.valuesIterator, itemVectors, k)
}
}
-.groupByKey()
-.mapValues { candidateIter =>
-val topN = new TopNQueue(k)
-candidateIter.flatten.foreach(topN.enqueue(_))
-topN.iterator()
-}
-.flatMap{ x => x._2.map(z => (x._1, z._1, z._2)) }
-.repartition($(numOutputPartitions))
+.aggregateByKey(zero, $(numOutputPartitions))(seqOp, combOp)
+.flatMap{ x => x._2.iterator().map(z => (x._1, z._1, z._2)) }
}

/**
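The main behavioral change here is the top-N merge: per-item candidate lists are now folded into a bounded TopNQueue through aggregateByKey instead of being collected with groupByKey and trimmed afterwards, so each partition can hold on to only k candidates per key before the shuffle, and the old repartition call is absorbed into aggregateByKey's partition count. This is presumably also why NearestNeighborIterator now yields IndexedSeq values rather than Iterator ones: the aggregation's seqOp splices them in with enqueue(V:_*), which needs a re-traversable sequence. Below is a rough, self-contained sketch of the same pattern, with a deliberately simple BoundedTopN standing in for the repository's TopNQueue; all names and the sample data are illustrative, not the project's API.

import org.apache.spark.sql.SparkSession
import scala.collection.mutable

// Simplified stand-in for TopNQueue: keeps the k candidates with the smallest distance.
class BoundedTopN(k: Int) extends Serializable {
  private val buf = mutable.ArrayBuffer.empty[(Long, Double)]

  def add(candidates: (Long, Double)*): Unit = {
    buf ++= candidates
    val kept = buf.sortBy(_._2).take(k) // ascending distance, keep the k best
    buf.clear()
    buf ++= kept
  }

  def toSeq: Seq[(Long, Double)] = buf.toList
}

object TopNAggregateSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("topn-aggregate-sketch").master("local[*]").getOrCreate()
    val sc = spark.sparkContext
    val k = 2

    // One record per (query item, candidates found in a single hash bucket).
    val perBucket = sc.parallelize(Seq(
      (1L, IndexedSeq((10L, 0.9), (11L, 0.2))),
      (1L, IndexedSeq((12L, 0.5))),
      (2L, IndexedSeq((10L, 0.1), (13L, 0.7)))
    ))

    val zero = new BoundedTopN(k)
    val topK = perBucket
      .aggregateByKey(zero, 4)(                       // 4 = output partition count, like $(numOutputPartitions)
        (acc, cands) => { acc.add(cands: _*); acc },  // fold one bucket's candidates into the queue
        (a, b) => { a.add(b.toSeq: _*); a })          // merge two partial queues
      .flatMap { case (item, q) => q.toSeq.map { case (cand, dist) => (item, cand, dist) } }

    topK.collect().foreach(println)
    spark.stop()
  }
}

Because the per-key state is bounded by k, the map-side combine of aggregateByKey avoids materializing every candidate list for a key in memory at once, which is the usual risk with groupByKey on skewed keys.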
4 changes: 2 additions & 2 deletions settings.gradle
@@ -6,8 +6,8 @@ apply plugin: 'scala-cross-build'

scalaCrossBuild {

-defaultScalaVersion '2.11.8'
-targetScalaVersions '2.10.6','2.11.8'
+defaultScalaVersion '2.12.12'
+targetScalaVersions '2.12.12'
buildDefaultOnly false

projectsToCrossBuild(
