From 5d65fafdabfc2cb99bdbce603afdb51b23f12767 Mon Sep 17 00:00:00 2001
From: Anil Shanbhag
Date: Tue, 6 Oct 2015 23:04:09 -0400
Subject: [PATCH] Cleaning up the output format. Making random query generator
 deterministic

---
 src/main/java/core/access/Query.java          |  3 --
 .../access/iterator/PartitionIterator.java    |  5 ---
 .../access/iterator/RepartitionIterator.java  | 11 +++++
 .../core/access/spark/SparkInputFormat.java   |  5 +++
 .../core/access/spark/SparkRecordReader.java  | 24 +++--------
 src/main/java/core/adapt/opt/Optimizer.java   | 41 -------------------
 src/main/java/core/index/MDIndex.java         |  8 ++--
 src/main/java/core/index/key/Schema.java      |  2 +-
 .../java/perf/benchmark/TPCHWorkload.java     | 10 +++--
 9 files changed, 32 insertions(+), 77 deletions(-)

diff --git a/src/main/java/core/access/Query.java b/src/main/java/core/access/Query.java
index c00c99c..698fe63 100644
--- a/src/main/java/core/access/Query.java
+++ b/src/main/java/core/access/Query.java
@@ -32,11 +32,9 @@ public void write(DataOutput out) throws IOException {
             Text.writeString(out, Joiner.on(";").join(predicates));
         }
         Text.writeString(out, key.toString());
-        // out.writeBytes(Joiner.on(",").join(predicates)+"\n");
     }
 
     public void readFields(DataInput in) throws IOException {
-        System.out.println("IN HERE !! !! Query:39");
         String predicateStrings = Text.readString(in);
         if (predicateStrings.equals(EMPTY)) {
             predicates = new Predicate[0];
@@ -47,7 +45,6 @@ public void readFields(DataInput in) throws IOException {
             predicates[i] = new Predicate(tokens[i]);
         }
         key = new CartilageIndexKey(Text.readString(in));
-        // String[] tokens = in.readLine().split(",");
     }
 
     public static class FilterQuery extends Query implements Serializable {
diff --git a/src/main/java/core/access/iterator/PartitionIterator.java b/src/main/java/core/access/iterator/PartitionIterator.java
index 9ac382b..413cdad 100644
--- a/src/main/java/core/access/iterator/PartitionIterator.java
+++ b/src/main/java/core/access/iterator/PartitionIterator.java
@@ -84,11 +84,6 @@ public boolean hasNext() {
         }
 
         try {
-            if (recordBytes.length == 3) {
-                System.out.println(bytes.length + " " + previous + " " + offset);
-                System.out.println(new String(recordBytes));
-            }
-
             // TODO: Hack. Observed that sometimes there are two \n between records.
             // There is something wrong with the partition writer.
             // This skips small records.
diff --git a/src/main/java/core/access/iterator/RepartitionIterator.java b/src/main/java/core/access/iterator/RepartitionIterator.java
index 5d729a0..7e80356 100644
--- a/src/main/java/core/access/iterator/RepartitionIterator.java
+++ b/src/main/java/core/access/iterator/RepartitionIterator.java
@@ -19,6 +19,13 @@
 import core.index.robusttree.RobustTree;
 import core.utils.HDFSUtils;
 
+/**
+ * Repartitions the input partitions and writes it out.
+ * Does this by reading the new index. For each tuple, gets its new bucket id.
+ * Writes it out the corresponding bucket.
+ * @author anil
+ *
+ */
 public class RepartitionIterator extends PartitionIterator {
 
     private FilterQuery query;
@@ -64,6 +71,10 @@ public RNode getIndexTree() {
         return this.newIndexTree;
     }
 
+    /**
+     * Gets a HDFS Partition as input.
+     * Loads the new index. PartitionWriter::setPartition does the rest.
+     */
     @Override
     public void setPartition(Partition partition) {
         super.setPartition(partition);
diff --git a/src/main/java/core/access/spark/SparkInputFormat.java b/src/main/java/core/access/spark/SparkInputFormat.java
index 6f5c693..9d5bc22 100644
--- a/src/main/java/core/access/spark/SparkInputFormat.java
+++ b/src/main/java/core/access/spark/SparkInputFormat.java
@@ -114,6 +114,11 @@ else if (queryConf.getRepartitionScan())
         System.out.println("Number of partition splits = " + splits.length);
         // splits = resizeSplits(splits, partitionIdFileMap,
         // queryConf.getMaxSplitSize());
+        for (PartitionSplit split : splits) {
+            System.out.println("SPLIT: "
+                    + split.getIterator().getClass().getName() + " buckets: "
+                    + Arrays.toString(split.getPartitions()));
+        }
         splits = resizeSplits(splits, hpInput.getPartitionIdSizeMap(),
                 queryConf.getMaxSplitSize(), queryConf.getMinSplitSize());
         System.out.println("Number of partition splits after splitting= "
diff --git a/src/main/java/core/access/spark/SparkRecordReader.java b/src/main/java/core/access/spark/SparkRecordReader.java
index a3f1b87..be0caae 100644
--- a/src/main/java/core/access/spark/SparkRecordReader.java
+++ b/src/main/java/core/access/spark/SparkRecordReader.java
@@ -33,37 +33,23 @@ public class SparkRecordReader extends
     CuratorFramework client;
 
-    // BucketCounts counter;
-    // PartitionLock locker;
-
     @Override
     public void initialize(InputSplit split, TaskAttemptContext context)
             throws IOException, InterruptedException {
-
-        System.out.println("Initializing SparkRecordReader");
-
         conf = context.getConfiguration();
         client = CuratorUtils.createAndStartClient(conf
                 .get(SparkQueryConf.ZOOKEEPER_HOSTS));
         sparkSplit = (SparkFileSplit) split;
-
         iterator = sparkSplit.getIterator();
         currentFile = 0;
-
-        // FileSystem fs = sparkSplit.getPath(currentFile).getFileSystem(conf);
-        // counter = new BucketCounts(fs,
-        // conf.get(SparkQueryConf.COUNTERS_FILE));
-        // locker = new PartitionLock(fs, conf.get(SparkQueryConf.LOCK_DIR));
-
         hasNext = initializeNext();
         key = new LongWritable();
         recordId = 0;
     }
 
     protected boolean initializeNext() throws IOException {
-
-        if (currentFile > 0)
-            System.out.println("Records read = " + recordId);
+//        if (currentFile > 0)
+//            System.out.println("Records read = " + recordId);
 
         if (currentFile >= sparkSplit.getStartOffsets().length)
             return false;
@@ -72,15 +58,14 @@ protected boolean initializeNext() throws IOException {
         final FileSystem fs = filePath.getFileSystem(conf);
         HDFSPartition partition = new HDFSPartition(fs, filePath.toString(),
                 client);
-        System.out.println("loading path: " + filePath.toString());
+        System.out.println("INFO: Loading path: " + filePath.toString());
         try {
             partition.loadNext();
             iterator.setPartition(partition);
             currentFile++;
             return true;
         } catch (java.lang.OutOfMemoryError e) {
-            System.out
-                    .println("ERR: Failed to load " + filePath.toString());
+            System.out.println("ERR: Failed to load " + filePath.toString());
             System.out.println(e.getMessage());
             e.printStackTrace();
             return false;
@@ -88,6 +73,7 @@
         }
     }
 
+    @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
         while (hasNext) {
             if (iterator.hasNext()) {
diff --git a/src/main/java/core/adapt/opt/Optimizer.java b/src/main/java/core/adapt/opt/Optimizer.java
index 42d3983..2636594 100644
--- a/src/main/java/core/adapt/opt/Optimizer.java
+++ b/src/main/java/core/adapt/opt/Optimizer.java
@@ -227,47 +227,6 @@ public PartitionSplit[] buildPlan(final Query q) {
         }
     }
 
-    // public PartitionSplit[] buildMPPlan(final Query q) {
-    //     if (q instanceof FilterQuery) {
-    //         FilterQuery fq = (FilterQuery) q;
-    //         Predicate[] ps = fq.getPredicates();
-    //         int pLength = ps.length;
-    //         // Find the initial set of buckets accessed
-    //         List nodes = this.rt.getRoot().search(ps);
-    //         int[] initialBids = this.getBidFromRNodes(nodes);
-    //
-    //         for (int i=0; i
diff --git a/src/main/java/perf/benchmark/TPCHWorkload.java b/src/main/java/perf/benchmark/TPCHWorkload.java
--- a/src/main/java/perf/benchmark/TPCHWorkload.java
+++ b/src/main/java/perf/benchmark/TPCHWorkload.java
@@ ... @@ generateWorkload(int numQueries) {
         ArrayList queries = new ArrayList();
         int[] queryNums = new int[] { 3, 5, 6, 8, 10, 12, 14, 19 };
-        Random r = new Random();
         for (int i = 0; i < numQueries; i++) {
-            int qNo = queryNums[r.nextInt(queryNums.length)];
+            int qNo = queryNums[rand.nextInt(queryNums.length)];
             FilterQuery q = getQuery(qNo);
             queries.add(q);
         }
@@ -176,12 +175,17 @@ public void runWorkload(int numQueries) {
         long start, end;
         SparkQuery sq = new SparkQuery(cfg);
         List queries = generateWorkload(numQueries);
+        System.out.println("INFO: Workload " + numQueries);
+        for (FilterQuery q: queries) {
+            System.out.println("INFO: Query:" + q.toString());
+        }
         for (FilterQuery q : queries) {
             start = System.currentTimeMillis();
             long result = sq.createAdaptRDD(cfg.getHDFS_WORKING_DIR(),
                     q.getPredicates()).count();
             end = System.currentTimeMillis();
-            System.out.println("Time Taken: " + (end - start) + " " + result);
+            System.out.println("RES: Result: " + result);
+            System.out.println("RES: Time Taken: " + (end - start) + " " + result);
         }
     }
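
Note on the determinism change: the commit message says the random query generator is made deterministic, and the visible hunk replaces the per-call "new Random()" in TPCHWorkload.generateWorkload with a shared "rand" field, which presumably carries a fixed seed elsewhere in the class. A minimal standalone sketch of that idea follows; the class name and the seed value are illustrative assumptions, not taken from the patch.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;

    public class DeterministicWorkloadSketch {
        // A single seeded Random shared across calls, so every run draws the
        // same sequence of query numbers (seed 42 is illustrative, not from the patch).
        private final Random rand = new Random(42);
        private final int[] queryNums = { 3, 5, 6, 8, 10, 12, 14, 19 };

        public List<Integer> generateWorkload(int numQueries) {
            List<Integer> queries = new ArrayList<>();
            for (int i = 0; i < numQueries; i++) {
                queries.add(queryNums[rand.nextInt(queryNums.length)]);
            }
            return queries;
        }

        public static void main(String[] args) {
            // Two instances built with the same seed print identical workloads.
            System.out.println(new DeterministicWorkloadSketch().generateWorkload(5));
            System.out.println(new DeterministicWorkloadSketch().generateWorkload(5));
        }
    }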