From 222ac0cf218070ed36f871dba3816eebc7bd4fe9 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Sun, 17 May 2015 20:16:34 -0700 Subject: [PATCH 1/8] Add ImageNet SIFT+LCS+FV pipeline --- src/main/scala/loaders/ImageNetLoader.scala | 2 + .../images/imagenet/ImageNetSiftLcsFV.scala | 252 ++++++++++++++++++ 2 files changed, 254 insertions(+) create mode 100644 src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala diff --git a/src/main/scala/loaders/ImageNetLoader.scala b/src/main/scala/loaders/ImageNetLoader.scala index 6e29a623..e0ce53a1 100644 --- a/src/main/scala/loaders/ImageNetLoader.scala +++ b/src/main/scala/loaders/ImageNetLoader.scala @@ -9,6 +9,8 @@ import utils.LabeledImage */ object ImageNetLoader { + + val NUM_CLASSES = 1000 /** * Loads images from @dataPath and associates images with the labels provided in @labelPath diff --git a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala new file mode 100644 index 00000000..ae356842 --- /dev/null +++ b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala @@ -0,0 +1,252 @@ +package pipelines.images.imagenet + +import java.io.File + +import breeze.linalg._ +import breeze.stats._ + +import scopt.OptionParser + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.rdd.RDD + +import evaluation.MulticlassClassifierEvaluator +import loaders.ImageNetLoader +import pipelines.Logging + +import nodes.images.external.{FisherVector, SIFTExtractor} +import nodes.images._ +import nodes.learning._ +import nodes.stats.{ColumnSampler, NormalizeRows, SignedHellingerMapper} +import nodes.util.{FloatToDouble, MatrixVectorizer, Cacher} +import nodes.util.{ClassLabelIndicatorsFromIntLabels, ZipVectors, MaxClassifier} + +import utils.Image + +object ImageNetSiftLcsFV extends Serializable with Logging { + val appName = "ImageNetSiftLcsFV" + + def getSiftFeatures( + conf: ImageNetSiftLcsFVConfig, + trainParsed: RDD[Image], + testParsed: RDD[Image]): (RDD[DenseVector[Double]], RDD[DenseVector[Double]]) = { + // Part 1: Scale and convert images to grayscale. + val grayscaler = PixelScaler then GrayScaler + val grayRDD = grayscaler(trainParsed) + + // Part 1a: If necessary, perform PCA on samples of the SIFT features, or load a PCA matrix from + // disk. + val pcaTransformer = conf.siftPcaFile match { + case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) + case None => { + val pcapipe = new SIFTExtractor(scaleStep = conf.siftScaleStep) then + new ColumnSampler(conf.numPcaSamples) + val pca = new PCAEstimator(conf.descDim).fit(pcapipe(grayRDD)) + + new BatchPCATransformer(pca.pcaMat) + } + } + + // Part 2: Compute dimensionality-reduced PCA features. + val featurizer = new SIFTExtractor(scaleStep = conf.siftScaleStep) then + pcaTransformer + val pcaTransformedRDD = featurizer(grayRDD) + + // Part 2a: If necessary, compute a GMM based on the dimensionality-reduced features, or load + // from disk. + val gmm = conf.siftGmmMeanFile match { + case Some(f) => + new GaussianMixtureModel( + csvread(new File(conf.siftGmmMeanFile.get)), + csvread(new File(conf.siftGmmVarFile.get)), + csvread(new File(conf.siftGmmWtsFile.get)).toDenseVector) + case None => + val sampler = new ColumnSampler(conf.numGmmSamples) + new GaussianMixtureModelEstimator(conf.vocabSize) + .fit(sampler(pcaTransformedRDD).map(convert(_, Double))) + } + + // Part 3: Compute Fisher Vectors and signed-square-root normalization. 
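+    // The chain below (a description of the steps that follow): compute the Fisher
+    // vector, convert it to double precision, flatten the matrix to a vector, normalize,
+    // apply a signed square root (Hellinger mapping), normalize again, and cache.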
+ val fisherFeaturizer = new FisherVector(gmm) then + FloatToDouble then + MatrixVectorizer then + NormalizeRows then + SignedHellingerMapper then + NormalizeRows then + new Cacher[DenseVector[Double]] + + val trainingFeatures = fisherFeaturizer(pcaTransformedRDD) + + val testFeatures = (grayscaler then featurizer then fisherFeaturizer).apply(testParsed) + (trainingFeatures, testFeatures) + } + + def getLcsFeatures( + conf: ImageNetSiftLcsFVConfig, + trainParsed: RDD[Image], + testParsed: RDD[Image]): (RDD[DenseVector[Double]], RDD[DenseVector[Double]]) = { + // Part 1a: If necessary, perform PCA on samples of the LCS features, or load a PCA matrix from + // disk. + val pcaTransformer = conf.lcsPcaFile match { + case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) + case None => { + val pcapipe = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) then + new ColumnSampler(conf.numPcaSamples) + val pca = new PCAEstimator(conf.descDim).fit(pcapipe(trainParsed)) + + new BatchPCATransformer(pca.pcaMat) + } + } + + // Part 2: Compute dimensionality-reduced PCA features. + val featurizer = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) then + pcaTransformer + val pcaTransformedRDD = featurizer(trainParsed) + + // Part 2a: If necessary, compute a GMM based on the dimensionality-reduced features, or load + // from disk. + val gmm = conf.lcsGmmMeanFile match { + case Some(f) => + new GaussianMixtureModel( + csvread(new File(conf.lcsGmmMeanFile.get)), + csvread(new File(conf.lcsGmmVarFile.get)), + csvread(new File(conf.lcsGmmWtsFile.get)).toDenseVector) + case None => + val sampler = new ColumnSampler(conf.numGmmSamples) + new GaussianMixtureModelEstimator(conf.vocabSize) + .fit(sampler(pcaTransformedRDD).map(convert(_, Double))) + } + + // Part 3: Compute Fisher Vectors and signed-square-root normalization. + val fisherFeaturizer = new FisherVector(gmm) then + FloatToDouble then + MatrixVectorizer then + NormalizeRows then + SignedHellingerMapper then + NormalizeRows then + new Cacher[DenseVector[Double]] + + val trainingFeatures = fisherFeaturizer(pcaTransformedRDD) + + val testFeatures = (featurizer then fisherFeaturizer).apply(testParsed) + (trainingFeatures, testFeatures) + } + + def run(sc: SparkContext, conf: ImageNetSiftLcsFVConfig) { + // Load the data and extract training labels. + val parsedRDD = ImageNetLoader( + sc, + conf.trainLocation, + conf.labelPath) + + val labelGrabber = LabelExtractor then + ClassLabelIndicatorsFromIntLabels(ImageNetLoader.NUM_CLASSES) then + new Cacher[DenseVector[Double]] + val trainingLabels = labelGrabber(parsedRDD) + + // Load test data and get actual labels + val testParsedRDD = ImageNetLoader( + sc, + conf.testLocation, + conf.labelPath) + val testActual = (LabelExtractor then new Cacher[Int]).apply(testParsedRDD) + + // Get SIFT + FV features + val (trainSift, testSift) = getSiftFeatures(conf, ImageExtractor(parsedRDD), + ImageExtractor(testParsedRDD)) + + // Get LCS + FV features + val (trainLcs, testLcs) = getLcsFeatures(conf, ImageExtractor(parsedRDD), + ImageExtractor(testParsedRDD)) + + val trainingFeatures = ZipVectors(Seq(trainSift, trainLcs)) + val testFeatures = ZipVectors(Seq(testSift, testLcs)) + + // Fit a weighted least squares model to the data. 
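+    // Solve in blocks of 4096 features with a single pass (numIter = 1); conf.lambda is
+    // the regularization strength and conf.mixtureWeight the solver's weighting parameter.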
+ val model = new BlockWeightedLeastSquaresEstimator( + 4096, 1, conf.lambda, conf.mixtureWeight).fit(trainingFeatures, trainingLabels) + + // Apply the model to test data and compute test error + val testPredictedValues = model(testFeatures) + val testPredicted = MaxClassifier(testPredictedValues) + val evaluator = MulticlassClassifierEvaluator(testPredicted, testActual, + ImageNetLoader.NUM_CLASSES) + + logInfo("TEST Error is " + (100.0 * evaluator.totalError) + "%") + } + + case class ImageNetSiftLcsFVConfig( + trainLocation: String = "", + testLocation: String = "", + labelPath: String = "", + lambda: Double = 1e-4, + mixtureWeight: Double = 0.25, + descDim: Int = 64, + vocabSize: Int = 16, + siftScaleStep: Int = 0, + lcsStride: Int = 4, + lcsBorder: Int = 16, + lcsPatch: Int = 6, + siftPcaFile: Option[String] = None, + siftGmmMeanFile: Option[String]= None, + siftGmmVarFile: Option[String] = None, + siftGmmWtsFile: Option[String] = None, + lcsPcaFile: Option[String] = None, + lcsGmmMeanFile: Option[String]= None, + lcsGmmVarFile: Option[String] = None, + lcsGmmWtsFile: Option[String] = None, + numPcaSamples: Int = 1e6.toInt, + numGmmSamples: Int = 1e6.toInt) + + def parse(args: Array[String]): ImageNetSiftLcsFVConfig = { + new OptionParser[ImageNetSiftLcsFVConfig](appName) { + head(appName, "0.1") + help("help") text("prints this usage text") + opt[String]("trainLocation") required() action { (x,c) => c.copy(trainLocation=x) } + opt[String]("testLocation") required() action { (x,c) => c.copy(testLocation=x) } + opt[String]("labelPath") required() action { (x,c) => c.copy(labelPath=x) } + + // Solver params + opt[Double]("lambda") action { (x,c) => c.copy(lambda=x) } + opt[Double]("mixtureWeight") action { (x,c) => c.copy(mixtureWeight=x) } + + // PCA, GMM params + opt[Int]("descDim") action { (x,c) => c.copy(descDim=x) } + opt[Int]("vocabSize") action { (x,c) => c.copy(vocabSize=x) } + opt[Int]("numPcaSamples") action { (x,c) => c.copy(numPcaSamples=x) } + opt[Int]("numGmmSamples") action { (x,c) => c.copy(numGmmSamples=x) } + + // SIFT, LCS params + opt[Int]("siftScaleStep") action { (x,c) => c.copy(siftScaleStep=x) } + opt[Int]("lcsStride") action { (x,c) => c.copy(lcsStride=x) } + opt[Int]("lcsBorder") action { (x,c) => c.copy(lcsBorder=x) } + opt[Int]("lcsPatch") action { (x,c) => c.copy(lcsPatch=x) } + + // Optional file to load stuff from + opt[String]("siftPcaFile") action { (x,c) => c.copy(siftPcaFile=Some(x)) } + opt[String]("siftGmmMeanFile") action { (x,c) => c.copy(siftGmmMeanFile=Some(x)) } + opt[String]("siftGmmVarFile") action { (x,c) => c.copy(siftGmmVarFile=Some(x)) } + opt[String]("siftGmmWtsFile") action { (x,c) => c.copy(siftGmmWtsFile=Some(x)) } + + opt[String]("lcsPcaFile") action { (x,c) => c.copy(lcsPcaFile=Some(x)) } + opt[String]("lcsGmmMeanFile") action { (x,c) => c.copy(lcsGmmMeanFile=Some(x)) } + opt[String]("lcsGmmVarFile") action { (x,c) => c.copy(lcsGmmVarFile=Some(x)) } + opt[String]("lcsGmmWtsFile") action { (x,c) => c.copy(lcsGmmWtsFile=Some(x)) } + }.parse(args, ImageNetSiftLcsFVConfig()).get + } + + /** + * The actual driver receives its configuration parameters from spark-submit usually. 
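+   * A typical invocation (jar name and paths are placeholders):
+   *   spark-submit --class pipelines.images.imagenet.ImageNetSiftLcsFV <assembly-jar> \
+   *     --trainLocation <train-dir> --testLocation <test-dir> --labelPath <label-file>
+   *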
+ * @param args + */ + def main(args: Array[String]) = { + val appConfig = parse(args) + + val conf = new SparkConf().setAppName(appName) + conf.setIfMissing("spark.master", "local[2]") + val sc = new SparkContext(conf) + run(sc, appConfig) + + sc.stop() + } +} From 6d11876dce47efafa6ac59e3bab58571ce875db3 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Mon, 18 May 2015 08:00:46 +0000 Subject: [PATCH 2/8] Fix shape of LCS output --- src/main/scala/nodes/images/LCSExtractor.scala | 6 +++--- src/test/scala/nodes/images/LCSExtractorSuite.scala | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/scala/nodes/images/LCSExtractor.scala b/src/main/scala/nodes/images/LCSExtractor.scala index b334c5bb..c9bcaa05 100644 --- a/src/main/scala/nodes/images/LCSExtractor.scala +++ b/src/main/scala/nodes/images/LCSExtractor.scala @@ -90,7 +90,7 @@ class LCSExtractor( c = c + 1 } - val lcsValues = new DenseMatrix[Float](numPoolsX * numPoolsY, numLCSValues) + val lcsValues = new DenseMatrix[Float](numLCSValues, numPoolsX * numPoolsY) var lcsIdx = 0 // Start at strideStart in (x, y) and @@ -114,10 +114,10 @@ class LCSExtractor( for (nx <- subPatchRange; ny <- subPatchRange) { // lcsValues(lcsIdx) = means(c).get((xPos + nx), (yPos + ny), 0) - lcsValues(xKeyPoint * numPoolsY + yKeyPoint, lcsIdx) = + lcsValues(lcsIdx, xKeyPoint * numPoolsY + yKeyPoint) = means(c).get((xPos + nx), (yPos + ny), 0).toFloat lcsIdx = lcsIdx + 1 - lcsValues(xKeyPoint * numPoolsY + yKeyPoint, lcsIdx) = + lcsValues(lcsIdx, xKeyPoint * numPoolsY + yKeyPoint) = stds(c).get((xPos + nx), (yPos + ny), 0).toFloat lcsIdx = lcsIdx + 1 } diff --git a/src/test/scala/nodes/images/LCSExtractorSuite.scala b/src/test/scala/nodes/images/LCSExtractorSuite.scala index f7c00da1..a05b1adb 100644 --- a/src/test/scala/nodes/images/LCSExtractorSuite.scala +++ b/src/test/scala/nodes/images/LCSExtractorSuite.scala @@ -13,7 +13,7 @@ class LCSExtractorSuite extends FunSuite with Logging { val lf = new LCSExtractor(stride=4, subPatchSize=6, strideStart=16) val lcsDescriptors = convert(lf.apply(testImage), Double) - val firstKeyPointSum = sum(lcsDescriptors(0, ::)) + val firstKeyPointSum = sum(lcsDescriptors(::, 0)) val fullFeatureSum = sum(lcsDescriptors) // Values found from running matlab code on same input file. From 8ef4b580e1f530878991af58fa7ac23dac44bb75 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Mon, 18 May 2015 08:01:42 +0000 Subject: [PATCH 3/8] Add ability to pass on numFeatures to VectorSplitter --- .../scala/nodes/learning/BlockLinearMapper.scala | 13 ++++++++++--- .../nodes/learning/BlockWeightedLeastSquares.scala | 11 +++++++++-- src/main/scala/nodes/util/VectorSplitter.scala | 10 +++++++--- 3 files changed, 26 insertions(+), 8 deletions(-) diff --git a/src/main/scala/nodes/learning/BlockLinearMapper.scala b/src/main/scala/nodes/learning/BlockLinearMapper.scala index a237849b..3617c7f6 100644 --- a/src/main/scala/nodes/learning/BlockLinearMapper.scala +++ b/src/main/scala/nodes/learning/BlockLinearMapper.scala @@ -136,7 +136,7 @@ class BlockLeastSquaresEstimator(blockSize: Int, numIter: Int, lambda: Double = val b = RowPartitionedMatrix.fromArray( labelScaler.apply(trainingLabels).map(_.toArray)).cache() val numRows = Some(b.numRows()) - val numCols = Some(trainingFeatures.head.first().length.toLong) + val numCols = Some(blockSize.toLong) // NOTE: This will cause trainingFeatures to be evaluated twice // which might not be optimal if its not cached ? 
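+    // Callers can avoid the repeated evaluation by caching trainingFeatures up front.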
@@ -159,8 +159,15 @@ class BlockLeastSquaresEstimator(blockSize: Int, numIter: Int, lambda: Double = override def fit( trainingFeatures: RDD[DenseVector[Double]], trainingLabels: RDD[DenseVector[Double]]): BlockLinearMapper = { - val vectorSplitter = new VectorSplitter(blockSize) + fit(trainingFeatures, trainingLabels, None) + } + + def fit( + trainingFeatures: RDD[DenseVector[Double]], + trainingLabels: RDD[DenseVector[Double]], + numFeaturesOpt: Option[Int]): BlockLinearMapper = { + val vectorSplitter = new VectorSplitter(blockSize, numFeaturesOpt) val featureBlocks = vectorSplitter.apply(trainingFeatures) - fit(featureBlocks, trainingLabels) + fit(featureBlocks, trainingLabels) } } diff --git a/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala b/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala index e1a9c13e..f27bada7 100644 --- a/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala +++ b/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala @@ -45,7 +45,14 @@ class BlockWeightedLeastSquaresEstimator( override def fit( trainingFeatures: RDD[DenseVector[Double]], trainingLabels: RDD[DenseVector[Double]]): BlockLinearMapper = { - val trainingFeaturesSplit = new VectorSplitter(blockSize).apply(trainingFeatures) + fit(trainingFeatures, trainingLabels, None) + } + + def fit( + trainingFeatures: RDD[DenseVector[Double]], + trainingLabels: RDD[DenseVector[Double]], + numFeaturesOpt: Option[Int]): BlockLinearMapper = { + val trainingFeaturesSplit = new VectorSplitter(blockSize, numFeaturesOpt).apply(trainingFeatures) fit(trainingFeaturesSplit, trainingLabels) } @@ -99,7 +106,7 @@ object BlockWeightedLeastSquaresEstimator extends Logging { // Initialize models to zero here. Each model is a (W, b) // NOTE: We get first element from every training block here val models = trainingFeatures.map { block => - val blockSize = block.first.length + // val blockSize = block.first.length DenseMatrix.zeros[Double](blockSize, nClasses) }.toArray diff --git a/src/main/scala/nodes/util/VectorSplitter.scala b/src/main/scala/nodes/util/VectorSplitter.scala index f3dd4ebd..6f43bdbf 100644 --- a/src/main/scala/nodes/util/VectorSplitter.scala +++ b/src/main/scala/nodes/util/VectorSplitter.scala @@ -7,9 +7,13 @@ import pipelines.FunctionNode /** * This transformer splits the input vector into a number of blocks. 
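+ * @param blockSize Number of features in each block.
+ * @param numFeaturesOpt Total number of features, if known up front; supplying it avoids
+ *                       an extra Spark job that reads the first vector to find its length.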
*/ -class VectorSplitter(blockSize: Int) extends FunctionNode[RDD[DenseVector[Double]], Seq[RDD[DenseVector[Double]]]] { +class VectorSplitter( + blockSize: Int, + numFeaturesOpt: Option[Int] = None) + extends FunctionNode[RDD[DenseVector[Double]], Seq[RDD[DenseVector[Double]]]] { + override def apply(in: RDD[DenseVector[Double]]): Seq[RDD[DenseVector[Double]]] = { - val numFeatures = in.first.length + val numFeatures = numFeaturesOpt.getOrElse(in.first.length) val numBlocks = math.ceil(numFeatures.toDouble / blockSize).toInt (0 until numBlocks).map { blockNum => in.map { vec => @@ -20,7 +24,7 @@ class VectorSplitter(blockSize: Int) extends FunctionNode[RDD[DenseVector[Double } def splitVector(in: DenseVector[Double]): Seq[DenseVector[Double]] = { - val numFeatures = in.length + val numFeatures = numFeaturesOpt.getOrElse(in.length) val numBlocks = math.ceil(numFeatures.toDouble / blockSize).toInt (0 until numBlocks).map { blockNum => // Expliclity call toArray as breeze's slice is lazy From c9e21edfed7a72ae4475eff41d678e6b48489592 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Mon, 18 May 2015 08:02:10 +0000 Subject: [PATCH 4/8] VOC: Set stepScale back to zero. Also pass in numFeatures to block solver --- src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala b/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala index 70aa4451..70812d23 100644 --- a/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala +++ b/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala @@ -80,7 +80,8 @@ object VOCSIFTFisher extends Serializable { val trainingFeatures = fisherFeaturizer(firstCachedRDD) // Part 4: Fit a linear model to the data. - val model = new BlockLeastSquaresEstimator(4096, 1, conf.lambda).fit(trainingFeatures, trainingLabels) + val model = new BlockLeastSquaresEstimator(4096, 1, conf.lambda).fit( + trainingFeatures, trainingLabels, Some(2 * conf.descDim * conf.vocabSize)) firstCachedRDD.unpersist() trainingFeatures.unpersist() @@ -113,7 +114,7 @@ object VOCSIFTFisher extends Serializable { lambda: Double = 0.5, descDim: Int = 80, vocabSize: Int = 256, - scaleStep: Int = 1, + scaleStep: Int = 0, pcaFile: Option[String] = None, gmmMeanFile: Option[String]= None, gmmVarFile: Option[String] = None, From ae8e72ce79830cc98bc182c9960b6cf564b4b24b Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Mon, 18 May 2015 08:03:06 +0000 Subject: [PATCH 5/8] Add option to pass in numRows to ColumnSampler Also add utility function to shuffle an Array --- src/main/scala/nodes/stats/Sampling.scala | 8 +++++--- src/main/scala/utils/MatrixUtils.scala | 16 ++++++++++++++++ 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/src/main/scala/nodes/stats/Sampling.scala b/src/main/scala/nodes/stats/Sampling.scala index 5dae15fb..000c41c0 100644 --- a/src/main/scala/nodes/stats/Sampling.scala +++ b/src/main/scala/nodes/stats/Sampling.scala @@ -8,9 +8,11 @@ import pipelines.{FunctionNode, Transformer} * Given a collection of Dense Matrices, this will generate a sample of `numSamples` columns from the entire set. 
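+ * If the total number of images is already known, pass it as `numImgsOpt` to avoid an
+ * extra count() over the input RDD.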
* @param numSamples */ -class ColumnSampler(numSamples: Int) extends Transformer[DenseMatrix[Float], DenseVector[Float]] { +class ColumnSampler( + numSamples: Int, + numImgsOpt: Option[Int] = None) extends Transformer[DenseMatrix[Float], DenseVector[Float]] { override def apply(in: RDD[DenseMatrix[Float]]): RDD[DenseVector[Float]] = { - val numImgs = in.count.toInt + val numImgs = numImgsOpt.getOrElse(in.count.toInt) val samplesPerImage = numSamples/numImgs in.flatMap(mat => { @@ -31,4 +33,4 @@ class Sampler[T](val size: Int, val seed: Int = 42) extends FunctionNode[RDD[T], def apply(in: RDD[T]): Array[T] = { in.takeSample(false, size, seed) } -} \ No newline at end of file +} diff --git a/src/main/scala/utils/MatrixUtils.scala b/src/main/scala/utils/MatrixUtils.scala index 1d288649..3c5fc712 100644 --- a/src/main/scala/utils/MatrixUtils.scala +++ b/src/main/scala/utils/MatrixUtils.scala @@ -68,4 +68,20 @@ object MatrixUtils extends Serializable { (in(rows,::)).toDenseMatrix } + // In place deterministic shuffle + def shuffleArray[T](arr: Array[T]) = { + // Shuffle each row in the same fashion + val rnd = new java.util.Random(42) + var i = arr.length - 1 + while (i > 0) { + val index = rnd.nextInt(i + 1) + // Simple swap + val a = arr(index) + arr(index) = arr(i) + arr(i) = a + i = i - 1 + } + arr + } + } From 8564ae0d6f230eb0c1ba617bb48579b3d9d994ac Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Mon, 18 May 2015 08:50:58 +0000 Subject: [PATCH 6/8] Use Top-5 error, re-use samples for PCA,GMM Also pass in appropriate values to sampler, solver to avoid multiple passes. Also caches the right set of things now --- .../images/imagenet/ImageNetSiftLcsFV.scala | 80 ++++++++++++------- 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala index ae356842..db8417a0 100644 --- a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala +++ b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala @@ -19,9 +19,9 @@ import nodes.images._ import nodes.learning._ import nodes.stats.{ColumnSampler, NormalizeRows, SignedHellingerMapper} import nodes.util.{FloatToDouble, MatrixVectorizer, Cacher} -import nodes.util.{ClassLabelIndicatorsFromIntLabels, ZipVectors, MaxClassifier} +import nodes.util.{ClassLabelIndicatorsFromIntLabels, ZipVectors, TopKClassifier} -import utils.Image +import utils.{Image, MatrixUtils, Stats} object ImageNetSiftLcsFV extends Serializable with Logging { val appName = "ImageNetSiftLcsFV" @@ -34,14 +34,18 @@ object ImageNetSiftLcsFV extends Serializable with Logging { val grayscaler = PixelScaler then GrayScaler val grayRDD = grayscaler(trainParsed) + val numImgs = trainParsed.count.toInt + var siftSamples: Option[RDD[DenseVector[Float]]] = None + // Part 1a: If necessary, perform PCA on samples of the SIFT features, or load a PCA matrix from // disk. 
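+    // The sampled SIFT columns are cached so that the same sample feeds both the PCA fit
+    // here and the GMM fit below, rather than extracting SIFT features twice.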
val pcaTransformer = conf.siftPcaFile match { case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) case None => { val pcapipe = new SIFTExtractor(scaleStep = conf.siftScaleStep) then - new ColumnSampler(conf.numPcaSamples) - val pca = new PCAEstimator(conf.descDim).fit(pcapipe(grayRDD)) + new ColumnSampler(conf.numPcaSamples, Some(numImgs)) + siftSamples = Some(pcapipe(grayRDD).cache()) + val pca = new PCAEstimator(conf.descDim).fit(siftSamples.get) new BatchPCATransformer(pca.pcaMat) } @@ -61,9 +65,15 @@ object ImageNetSiftLcsFV extends Serializable with Logging { csvread(new File(conf.siftGmmVarFile.get)), csvread(new File(conf.siftGmmWtsFile.get)).toDenseVector) case None => - val sampler = new ColumnSampler(conf.numGmmSamples) + val samples = siftSamples.getOrElse { + val siftSampler = new SIFTExtractor(scaleStep = conf.siftScaleStep) then + new ColumnSampler(conf.numGmmSamples, Some(numImgs)) + siftSampler(grayRDD) + } + val vectorPCATransformer = new PCATransformer(pcaTransformer.pcaMat) new GaussianMixtureModelEstimator(conf.vocabSize) - .fit(sampler(pcaTransformedRDD).map(convert(_, Double))) + .fit(MatrixUtils.shuffleArray( + vectorPCATransformer(samples).map(convert(_, Double)).collect()).take(1e6.toInt)) } // Part 3: Compute Fisher Vectors and signed-square-root normalization. @@ -73,7 +83,7 @@ object ImageNetSiftLcsFV extends Serializable with Logging { NormalizeRows then SignedHellingerMapper then NormalizeRows then - new Cacher[DenseVector[Double]] + new Cacher[DenseVector[Double]](Some("sift-fisher")) val trainingFeatures = fisherFeaturizer(pcaTransformedRDD) @@ -85,14 +95,18 @@ object ImageNetSiftLcsFV extends Serializable with Logging { conf: ImageNetSiftLcsFVConfig, trainParsed: RDD[Image], testParsed: RDD[Image]): (RDD[DenseVector[Double]], RDD[DenseVector[Double]]) = { + + val numImgs = trainParsed.count.toInt + var lcsSamples: Option[RDD[DenseVector[Float]]] = None // Part 1a: If necessary, perform PCA on samples of the LCS features, or load a PCA matrix from // disk. val pcaTransformer = conf.lcsPcaFile match { case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) case None => { val pcapipe = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) then - new ColumnSampler(conf.numPcaSamples) - val pca = new PCAEstimator(conf.descDim).fit(pcapipe(trainParsed)) + new ColumnSampler(conf.numPcaSamples, Some(numImgs)) + lcsSamples = Some(pcapipe(trainParsed).cache()) + val pca = new PCAEstimator(conf.descDim).fit(lcsSamples.get) new BatchPCATransformer(pca.pcaMat) } @@ -112,9 +126,15 @@ object ImageNetSiftLcsFV extends Serializable with Logging { csvread(new File(conf.lcsGmmVarFile.get)), csvread(new File(conf.lcsGmmWtsFile.get)).toDenseVector) case None => - val sampler = new ColumnSampler(conf.numGmmSamples) + val samples = lcsSamples.getOrElse { + val lcsSampler = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) then + new ColumnSampler(conf.numPcaSamples, Some(numImgs)) + lcsSampler(trainParsed) + } + val vectorPCATransformer = new PCATransformer(pcaTransformer.pcaMat) new GaussianMixtureModelEstimator(conf.vocabSize) - .fit(sampler(pcaTransformedRDD).map(convert(_, Double))) + .fit(MatrixUtils.shuffleArray( + vectorPCATransformer(samples).map(convert(_, Double)).collect()).take(1e6.toInt)) } // Part 3: Compute Fisher Vectors and signed-square-root normalization. 
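+    // Each Fisher vector has 2 * descDim * vocabSize entries (gradients with respect to
+    // the GMM means and variances); with the defaults that is 2 * 64 * 16 = 2048.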
@@ -124,7 +144,7 @@ object ImageNetSiftLcsFV extends Serializable with Logging { NormalizeRows then SignedHellingerMapper then NormalizeRows then - new Cacher[DenseVector[Double]] + new Cacher[DenseVector[Double]](Some("lcs-fisher")) val trainingFeatures = fisherFeaturizer(pcaTransformedRDD) @@ -137,53 +157,57 @@ object ImageNetSiftLcsFV extends Serializable with Logging { val parsedRDD = ImageNetLoader( sc, conf.trainLocation, - conf.labelPath) + conf.labelPath).cache().setName("trainData") val labelGrabber = LabelExtractor then ClassLabelIndicatorsFromIntLabels(ImageNetLoader.NUM_CLASSES) then new Cacher[DenseVector[Double]] val trainingLabels = labelGrabber(parsedRDD) + trainingLabels.count // Load test data and get actual labels val testParsedRDD = ImageNetLoader( sc, conf.testLocation, - conf.labelPath) - val testActual = (LabelExtractor then new Cacher[Int]).apply(testParsedRDD) + conf.labelPath).cache().setName("testData") + val testActual = (labelGrabber then TopKClassifier(1)).apply(testParsedRDD) + + val trainParsedImgs = (ImageExtractor).apply(parsedRDD) + val testParsedImgs = (ImageExtractor).apply(testParsedRDD) // Get SIFT + FV features - val (trainSift, testSift) = getSiftFeatures(conf, ImageExtractor(parsedRDD), - ImageExtractor(testParsedRDD)) + val (trainSift, testSift) = getSiftFeatures(conf, trainParsedImgs, testParsedImgs) // Get LCS + FV features - val (trainLcs, testLcs) = getLcsFeatures(conf, ImageExtractor(parsedRDD), - ImageExtractor(testParsedRDD)) + val (trainLcs, testLcs) = getLcsFeatures(conf, trainParsedImgs, testParsedImgs) val trainingFeatures = ZipVectors(Seq(trainSift, trainLcs)) val testFeatures = ZipVectors(Seq(testSift, testLcs)) + trainingFeatures.count + val numTestImgs = testFeatures.count + // Fit a weighted least squares model to the data. 
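+    // Two descriptor types (SIFT and LCS) are zipped, each contributing a
+    // 2 * descDim * vocabSize dimensional Fisher vector; passing numFeatures explicitly
+    // lets the solver size its blocks without an extra pass over the feature RDD.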
val model = new BlockWeightedLeastSquaresEstimator( - 4096, 1, conf.lambda, conf.mixtureWeight).fit(trainingFeatures, trainingLabels) + 4096, 1, conf.lambda, conf.mixtureWeight).fit( + trainingFeatures, trainingLabels, Some(2 * 2 * conf.descDim * conf.vocabSize)) // Apply the model to test data and compute test error val testPredictedValues = model(testFeatures) - val testPredicted = MaxClassifier(testPredictedValues) - val evaluator = MulticlassClassifierEvaluator(testPredicted, testActual, - ImageNetLoader.NUM_CLASSES) + val testPredicted = TopKClassifier(5).apply(testPredictedValues) - logInfo("TEST Error is " + (100.0 * evaluator.totalError) + "%") + logInfo("TEST Error is " + Stats.getErrPercent(testPredicted, testActual, numTestImgs) + "%") } case class ImageNetSiftLcsFVConfig( trainLocation: String = "", testLocation: String = "", labelPath: String = "", - lambda: Double = 1e-4, + lambda: Double = 6e-5, mixtureWeight: Double = 0.25, descDim: Int = 64, vocabSize: Int = 16, - siftScaleStep: Int = 0, + siftScaleStep: Int = 1, lcsStride: Int = 4, lcsBorder: Int = 16, lcsPatch: Int = 6, @@ -195,8 +219,8 @@ object ImageNetSiftLcsFV extends Serializable with Logging { lcsGmmMeanFile: Option[String]= None, lcsGmmVarFile: Option[String] = None, lcsGmmWtsFile: Option[String] = None, - numPcaSamples: Int = 1e6.toInt, - numGmmSamples: Int = 1e6.toInt) + numPcaSamples: Int = 1e7.toInt, + numGmmSamples: Int = 1e7.toInt) def parse(args: Array[String]): ImageNetSiftLcsFVConfig = { new OptionParser[ImageNetSiftLcsFVConfig](appName) { From 5416570c050046afda2f9dd95b5d5cc33cef6881 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Mon, 18 May 2015 17:54:46 -0700 Subject: [PATCH 7/8] Address code review, add sqrt to sift --- .../learning/BlockWeightedLeastSquares.scala | 4 +-- src/main/scala/nodes/stats/Sampling.scala | 7 +++-- .../nodes/stats/SignedHellingerMapper.scala | 10 +++++-- .../images/imagenet/ImageNetSiftLcsFV.scala | 29 +++++++++---------- .../pipelines/images/voc/VOCSIFTFisher.scala | 6 ++-- src/main/scala/utils/MatrixUtils.scala | 4 +-- 6 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala b/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala index f27bada7..f99bb763 100644 --- a/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala +++ b/src/main/scala/nodes/learning/BlockWeightedLeastSquares.scala @@ -104,9 +104,9 @@ object BlockWeightedLeastSquaresEstimator extends Logging { }.collect():_*) // Initialize models to zero here. Each model is a (W, b) - // NOTE: We get first element from every training block here val models = trainingFeatures.map { block => - // val blockSize = block.first.length + // TODO: This assumes uniform block sizes. We should check the number of columns + // in each block to ensure safety. 
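+      // (e.g. require(block.first.length == blockSize), at the cost of an extra action)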
DenseMatrix.zeros[Double](blockSize, nClasses) }.toArray diff --git a/src/main/scala/nodes/stats/Sampling.scala b/src/main/scala/nodes/stats/Sampling.scala index 000c41c0..ac24b1f9 100644 --- a/src/main/scala/nodes/stats/Sampling.scala +++ b/src/main/scala/nodes/stats/Sampling.scala @@ -10,8 +10,10 @@ import pipelines.{FunctionNode, Transformer} */ class ColumnSampler( numSamples: Int, - numImgsOpt: Option[Int] = None) extends Transformer[DenseMatrix[Float], DenseVector[Float]] { - override def apply(in: RDD[DenseMatrix[Float]]): RDD[DenseVector[Float]] = { + numImgsOpt: Option[Int] = None) + extends FunctionNode[RDD[DenseMatrix[Float]], RDD[DenseVector[Float]]] { + + def apply(in: RDD[DenseMatrix[Float]]): RDD[DenseVector[Float]] = { val numImgs = numImgsOpt.getOrElse(in.count.toInt) val samplesPerImage = numSamples/numImgs @@ -22,7 +24,6 @@ class ColumnSampler( }) } - def apply(in: DenseMatrix[Float]): DenseVector[Float] = ??? } /** diff --git a/src/main/scala/nodes/stats/SignedHellingerMapper.scala b/src/main/scala/nodes/stats/SignedHellingerMapper.scala index 0c79561c..a3d5534f 100644 --- a/src/main/scala/nodes/stats/SignedHellingerMapper.scala +++ b/src/main/scala/nodes/stats/SignedHellingerMapper.scala @@ -1,6 +1,6 @@ package nodes.stats -import breeze.linalg.DenseVector +import breeze.linalg.{DenseVector, DenseMatrix} import breeze.numerics._ import pipelines.Transformer @@ -13,4 +13,10 @@ object SignedHellingerMapper extends Transformer[DenseVector[Double], DenseVecto def apply(in: DenseVector[Double]): DenseVector[Double] = { signum(in) :* sqrt(abs(in)) } -} \ No newline at end of file +} + +object BatchSignedHellingerMapper extends Transformer[DenseMatrix[Float], DenseMatrix[Float]] { + def apply(in: DenseMatrix[Float]): DenseMatrix[Float] = { + in.map(x => (math.signum(x) * math.sqrt(math.abs(x))).toFloat) + } +} diff --git a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala index db8417a0..c58f0135 100644 --- a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala +++ b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala @@ -17,7 +17,7 @@ import pipelines.Logging import nodes.images.external.{FisherVector, SIFTExtractor} import nodes.images._ import nodes.learning._ -import nodes.stats.{ColumnSampler, NormalizeRows, SignedHellingerMapper} +import nodes.stats.{ColumnSampler, NormalizeRows, SignedHellingerMapper, BatchSignedHellingerMapper} import nodes.util.{FloatToDouble, MatrixVectorizer, Cacher} import nodes.util.{ClassLabelIndicatorsFromIntLabels, ZipVectors, TopKClassifier} @@ -37,14 +37,16 @@ object ImageNetSiftLcsFV extends Serializable with Logging { val numImgs = trainParsed.count.toInt var siftSamples: Option[RDD[DenseVector[Float]]] = None + val siftHellinger = (new SIFTExtractor(scaleStep = conf.siftScaleStep) then + BatchSignedHellingerMapper) + // Part 1a: If necessary, perform PCA on samples of the SIFT features, or load a PCA matrix from // disk. 
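+    // SIFT descriptors are now signed-square-root (Hellinger) mapped before PCA and GMM
+    // fitting, and the cached samples below already include this mapping.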
val pcaTransformer = conf.siftPcaFile match { case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) case None => { - val pcapipe = new SIFTExtractor(scaleStep = conf.siftScaleStep) then - new ColumnSampler(conf.numPcaSamples, Some(numImgs)) - siftSamples = Some(pcapipe(grayRDD).cache()) + siftSamples = Some( + new ColumnSampler(conf.numPcaSamples, Some(numImgs)).apply(siftHellinger(grayRDD)).cache()) val pca = new PCAEstimator(conf.descDim).fit(siftSamples.get) new BatchPCATransformer(pca.pcaMat) @@ -52,8 +54,7 @@ object ImageNetSiftLcsFV extends Serializable with Logging { } // Part 2: Compute dimensionality-reduced PCA features. - val featurizer = new SIFTExtractor(scaleStep = conf.siftScaleStep) then - pcaTransformer + val featurizer = siftHellinger then pcaTransformer val pcaTransformedRDD = featurizer(grayRDD) // Part 2a: If necessary, compute a GMM based on the dimensionality-reduced features, or load @@ -66,9 +67,7 @@ object ImageNetSiftLcsFV extends Serializable with Logging { csvread(new File(conf.siftGmmWtsFile.get)).toDenseVector) case None => val samples = siftSamples.getOrElse { - val siftSampler = new SIFTExtractor(scaleStep = conf.siftScaleStep) then - new ColumnSampler(conf.numGmmSamples, Some(numImgs)) - siftSampler(grayRDD) + new ColumnSampler(conf.numGmmSamples, Some(numImgs)).apply(siftHellinger(grayRDD)) } val vectorPCATransformer = new PCATransformer(pcaTransformer.pcaMat) new GaussianMixtureModelEstimator(conf.vocabSize) @@ -103,9 +102,10 @@ object ImageNetSiftLcsFV extends Serializable with Logging { val pcaTransformer = conf.lcsPcaFile match { case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) case None => { - val pcapipe = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) then - new ColumnSampler(conf.numPcaSamples, Some(numImgs)) - lcsSamples = Some(pcapipe(trainParsed).cache()) + val pcapipe = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) + lcsSamples = Some( + new ColumnSampler(conf.numPcaSamples, Some(numImgs)).apply( + pcapipe(trainParsed)).cache()) val pca = new PCAEstimator(conf.descDim).fit(lcsSamples.get) new BatchPCATransformer(pca.pcaMat) @@ -127,9 +127,8 @@ object ImageNetSiftLcsFV extends Serializable with Logging { csvread(new File(conf.lcsGmmWtsFile.get)).toDenseVector) case None => val samples = lcsSamples.getOrElse { - val lcsSampler = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) then - new ColumnSampler(conf.numPcaSamples, Some(numImgs)) - lcsSampler(trainParsed) + val lcs = new LCSExtractor(conf.lcsStride, conf.lcsBorder, conf.lcsPatch) + new ColumnSampler(conf.numPcaSamples, Some(numImgs)).apply(lcs(trainParsed)) } val vectorPCATransformer = new PCATransformer(pcaTransformer.pcaMat) new GaussianMixtureModelEstimator(conf.vocabSize) diff --git a/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala b/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala index 70812d23..2b38e700 100644 --- a/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala +++ b/src/main/scala/pipelines/images/voc/VOCSIFTFisher.scala @@ -40,9 +40,9 @@ object VOCSIFTFisher extends Serializable { val pcaTransformer = conf.pcaFile match { case Some(fname) => new BatchPCATransformer(convert(csvread(new File(fname)), Float).t) case None => { - val pcapipe = new SIFTExtractor(scaleStep = conf.scaleStep) then - new ColumnSampler(conf.numPcaSamples) - val pca = new PCAEstimator(conf.descDim).fit(pcapipe(grayRDD)) + val pcapipe = new SIFTExtractor(scaleStep 
= conf.scaleStep) + val colSampler = new ColumnSampler(conf.numPcaSamples) + val pca = new PCAEstimator(conf.descDim).fit(colSampler(pcapipe(grayRDD))) new BatchPCATransformer(pca.pcaMat) } diff --git a/src/main/scala/utils/MatrixUtils.scala b/src/main/scala/utils/MatrixUtils.scala index 3c5fc712..55eff6e0 100644 --- a/src/main/scala/utils/MatrixUtils.scala +++ b/src/main/scala/utils/MatrixUtils.scala @@ -69,9 +69,9 @@ object MatrixUtils extends Serializable { } // In place deterministic shuffle - def shuffleArray[T](arr: Array[T]) = { + def shuffleArray[T](arr: Array[T], seed: Int = 42) = { // Shuffle each row in the same fashion - val rnd = new java.util.Random(42) + val rnd = new java.util.Random(seed) var i = arr.length - 1 while (i > 0) { val index = rnd.nextInt(i + 1) From 7e93dc7536e49bf8e54ef32992779b80d208a0a5 Mon Sep 17 00:00:00 2001 From: Shivaram Venkataraman Date: Tue, 19 May 2015 11:16:43 -0700 Subject: [PATCH 8/8] Pull out fisherFeaturizer into a function --- .../images/imagenet/ImageNetSiftLcsFV.scala | 36 +++++++++---------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala index c58f0135..dd3c256d 100644 --- a/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala +++ b/src/main/scala/pipelines/images/imagenet/ImageNetSiftLcsFV.scala @@ -26,6 +26,18 @@ import utils.{Image, MatrixUtils, Stats} object ImageNetSiftLcsFV extends Serializable with Logging { val appName = "ImageNetSiftLcsFV" + def constructFisherFeaturizer(gmm: GaussianMixtureModel, name: Option[String] = None) = { + // Part 3: Compute Fisher Vectors and signed-square-root normalization. + val fisherFeaturizer = new FisherVector(gmm) then + FloatToDouble then + MatrixVectorizer then + NormalizeRows then + SignedHellingerMapper then + NormalizeRows then + new Cacher[DenseVector[Double]](name) + fisherFeaturizer + } + def getSiftFeatures( conf: ImageNetSiftLcsFVConfig, trainParsed: RDD[Image], @@ -74,19 +86,10 @@ object ImageNetSiftLcsFV extends Serializable with Logging { .fit(MatrixUtils.shuffleArray( vectorPCATransformer(samples).map(convert(_, Double)).collect()).take(1e6.toInt)) } - - // Part 3: Compute Fisher Vectors and signed-square-root normalization. - val fisherFeaturizer = new FisherVector(gmm) then - FloatToDouble then - MatrixVectorizer then - NormalizeRows then - SignedHellingerMapper then - NormalizeRows then - new Cacher[DenseVector[Double]](Some("sift-fisher")) - + val fisherFeaturizer = constructFisherFeaturizer(gmm, Some("sift-fisher")) val trainingFeatures = fisherFeaturizer(pcaTransformedRDD) - val testFeatures = (grayscaler then featurizer then fisherFeaturizer).apply(testParsed) + (trainingFeatures, testFeatures) } @@ -136,18 +139,11 @@ object ImageNetSiftLcsFV extends Serializable with Logging { vectorPCATransformer(samples).map(convert(_, Double)).collect()).take(1e6.toInt)) } - // Part 3: Compute Fisher Vectors and signed-square-root normalization. - val fisherFeaturizer = new FisherVector(gmm) then - FloatToDouble then - MatrixVectorizer then - NormalizeRows then - SignedHellingerMapper then - NormalizeRows then - new Cacher[DenseVector[Double]](Some("lcs-fisher")) + val fisherFeaturizer = constructFisherFeaturizer(gmm, Some("lcs-fisher")) val trainingFeatures = fisherFeaturizer(pcaTransformedRDD) - val testFeatures = (featurizer then fisherFeaturizer).apply(testParsed) + (trainingFeatures, testFeatures) }