
Cleaning up the output format. Making random query generator deterministic
anilshanbhag committed Oct 7, 2015
1 parent 9fda647 commit 5d65faf
Showing 9 changed files with 32 additions and 77 deletions.
3 changes: 0 additions & 3 deletions src/main/java/core/access/Query.java
@@ -32,11 +32,9 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, Joiner.on(";").join(predicates));
}
Text.writeString(out, key.toString());
// out.writeBytes(Joiner.on(",").join(predicates)+"\n");
}

public void readFields(DataInput in) throws IOException {
System.out.println("IN HERE !! !! Query:39");
String predicateStrings = Text.readString(in);
if (predicateStrings.equals(EMPTY)) {
predicates = new Predicate[0];
@@ -47,7 +45,6 @@ public void readFields(DataInput in) throws IOException {
predicates[i] = new Predicate(tokens[i]);
}
key = new CartilageIndexKey(Text.readString(in));
// String[] tokens = in.readLine().split(",");
}

public static class FilterQuery extends Query implements Serializable {
5 changes: 0 additions & 5 deletions src/main/java/core/access/iterator/PartitionIterator.java
@@ -84,11 +84,6 @@ public boolean hasNext() {
}

try {
if (recordBytes.length == 3) {
System.out.println(bytes.length + " " + previous + " " + offset);
System.out.println(new String(recordBytes));
}

// TODO: Hack. Observed that sometimes there are two \n between records.
// There is something wrong with the partition writer.
// This skips small records.
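
The TODO above describes a workaround: occasionally two '\n' characters appear between records, and the iterator skips the resulting tiny records. A minimal sketch of that kind of guard, assuming the record boundary is a '\n' byte (the method name below is illustrative, not the project's actual code):

    // Sketch only: advance past stray '\n' bytes so that back-to-back
    // newlines do not yield empty records.
    static int skipEmptyRecords(byte[] bytes, int offset) {
        while (offset < bytes.length && bytes[offset] == '\n') {
            offset++;
        }
        return offset;
    }
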
11 changes: 11 additions & 0 deletions src/main/java/core/access/iterator/RepartitionIterator.java
@@ -19,6 +19,13 @@
import core.index.robusttree.RobustTree;
import core.utils.HDFSUtils;

/**
 * Repartitions the input partitions and writes them out.
 * It does this by reading the new index: for each tuple, it looks up the
 * tuple's new bucket id and writes the tuple out to the corresponding bucket.
 * @author anil
 */
public class RepartitionIterator extends PartitionIterator {

private FilterQuery query;
@@ -64,6 +71,10 @@ public RNode getIndexTree() {
return this.newIndexTree;
}

/**
 * Gets an HDFS Partition as input.
 * Loads the new index; PartitionWriter::setPartition does the rest.
 */
@Override
public void setPartition(Partition partition) {
super.setPartition(partition);
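
The Javadoc above outlines the repartitioning step: for every tuple, look up its bucket under the new index and route it to that bucket. An illustrative sketch of that per-tuple loop follows; getBucketId and writeToPartition are hypothetical method names, not necessarily the project's actual API:

    // Sketch of the per-tuple routing described above; method names are assumed.
    void repartition(Iterator<String> tuples, RobustTree newIndex, PartitionWriter writer) {
        while (tuples.hasNext()) {
            String tuple = tuples.next();
            int bucketId = newIndex.getBucketId(tuple); // bucket under the new index
            writer.writeToPartition(bucketId, tuple);   // write the tuple to that bucket
        }
    }
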
5 changes: 5 additions & 0 deletions src/main/java/core/access/spark/SparkInputFormat.java
@@ -114,6 +114,11 @@ else if (queryConf.getRepartitionScan())
System.out.println("Number of partition splits = " + splits.length);
// splits = resizeSplits(splits, partitionIdFileMap,
// queryConf.getMaxSplitSize());
for (PartitionSplit split : splits) {
System.out.println("SPLIT: "
+ split.getIterator().getClass().getName() + " buckets: "
+ Arrays.toString(split.getPartitions()));
}
splits = resizeSplits(splits, hpInput.getPartitionIdSizeMap(),
queryConf.getMaxSplitSize(), queryConf.getMinSplitSize());
System.out.println("Number of partition splits after splitting= "
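
resizeSplits regroups the partition splits using the per-bucket size map so that each split respects the configured minimum and maximum split sizes. Its implementation is not part of this diff; below is a simplified sketch of size-bounded grouping under that assumption (only the maximum bound is modeled, and the method name is illustrative):

    import java.util.*;

    // Simplified sketch (not the actual resizeSplits): pack buckets greedily
    // into groups whose total size stays under maxSplitSize.
    static List<List<Integer>> groupBySize(Map<Integer, Long> bucketSizes, long maxSplitSize) {
        List<List<Integer>> groups = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentSize = 0;
        for (Map.Entry<Integer, Long> e : bucketSizes.entrySet()) {
            if (!current.isEmpty() && currentSize + e.getValue() > maxSplitSize) {
                groups.add(current);      // close the current group
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(e.getKey());      // add this bucket to the open group
            currentSize += e.getValue();
        }
        if (!current.isEmpty()) {
            groups.add(current);
        }
        return groups;
    }
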
24 changes: 5 additions & 19 deletions src/main/java/core/access/spark/SparkRecordReader.java
@@ -33,37 +33,23 @@ public class SparkRecordReader extends

CuratorFramework client;

// BucketCounts counter;
// PartitionLock locker;

@Override
public void initialize(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException {

System.out.println("Initializing SparkRecordReader");

conf = context.getConfiguration();
client = CuratorUtils.createAndStartClient(conf
.get(SparkQueryConf.ZOOKEEPER_HOSTS));
sparkSplit = (SparkFileSplit) split;

iterator = sparkSplit.getIterator();
currentFile = 0;

// FileSystem fs = sparkSplit.getPath(currentFile).getFileSystem(conf);
// counter = new BucketCounts(fs,
// conf.get(SparkQueryConf.COUNTERS_FILE));
// locker = new PartitionLock(fs, conf.get(SparkQueryConf.LOCK_DIR));

hasNext = initializeNext();
key = new LongWritable();
recordId = 0;
}

protected boolean initializeNext() throws IOException {

if (currentFile > 0)
System.out.println("Records read = " + recordId);
// if (currentFile > 0)
// System.out.println("Records read = " + recordId);

if (currentFile >= sparkSplit.getStartOffsets().length)
return false;
@@ -72,22 +58,22 @@ protected boolean initializeNext() throws IOException {
final FileSystem fs = filePath.getFileSystem(conf);
HDFSPartition partition = new HDFSPartition(fs,
filePath.toString(), client);
System.out.println("loading path: " + filePath.toString());
System.out.println("INFO: Loading path: " + filePath.toString());
try {
partition.loadNext();
iterator.setPartition(partition);
currentFile++;
return true;
} catch (java.lang.OutOfMemoryError e) {
System.out
.println("ERR: Failed to load " + filePath.toString());
System.out.println("ERR: Failed to load " + filePath.toString());
System.out.println(e.getMessage());
e.printStackTrace();
return false;
}
}
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
while (hasNext) {
if (iterator.hasNext()) {
41 changes: 0 additions & 41 deletions src/main/java/core/adapt/opt/Optimizer.java
@@ -227,47 +227,6 @@ public PartitionSplit[] buildPlan(final Query q) {
}
}

// public PartitionSplit[] buildMPPlan(final Query q) {
// if (q instanceof FilterQuery) {
// FilterQuery fq = (FilterQuery) q;
// Predicate[] ps = fq.getPredicates();
// int pLength = ps.length;
// // Find the initial set of buckets accessed
// List<RNode> nodes = this.rt.getRoot().search(ps);
// int[] initialBids = this.getBidFromRNodes(nodes);
//
// for (int i=0; i<pLength; i++) {
// Plan best = getBestPlan(ps);
// this.updateIndex(best, ps);
// ps[best.actions.pid] = null;
// }
//
// nodes = this.rt.getRoot().search(ps);
// int[] finalBids = this.getBidFromRNodes(nodes);
//
// float totalTuples = 0;
// for (int i=0; i<initialBids.length; i++) {
// int bid = initialBids[i];
// boolean found = false;
// for (int j=0; j<finalBids.length; j++) {
// if (finalBids[j] == bid) {
// found = true;
// break;
// }
// }
//
// if (!found) {
// totalTuples += Bucket.counters.getBucketCount(bid);
// }
// }
//
// return null;
// } else {
// System.err.println("Unimplemented query - Unable to build plan");
// return null;
// }
// }

public Plan getBestPlan(Predicate[] ps) {
// TODO: Multiple predicates seem to complicate the simple idea we had;
// think more :-/
8 changes: 3 additions & 5 deletions src/main/java/core/index/MDIndex.java
@@ -42,11 +42,9 @@ public Bucket(int id) {
}

public double getEstimatedNumTuples() {
if (estimatedTuples == 0.0) {
System.out.println("ERR " + bucketId);
}

// Assert.assertNotEquals(estimatedTuples, 0.0);
// TODO: This call used when restoring a replaced tree in Optimizer.
// Can't use the assert below.
// Assert.assertNotEquals(estimatedTuples, 0.0);
return estimatedTuples;
}

2 changes: 1 addition & 1 deletion src/main/java/core/index/key/Schema.java
@@ -62,7 +62,7 @@ public static Schema createSchema(String schemaString) {
fieldList[i] = new Field(fieldName, fieldType);
}

System.out.println("Created schema with " + fieldList.length + " fields");
System.out.println("INFO: Created schema with " + fieldList.length + " fields");
return new Schema(fieldList);
}

10 changes: 7 additions & 3 deletions src/main/java/perf/benchmark/TPCHWorkload.java
@@ -162,9 +162,8 @@ public List<FilterQuery> generateWorkload(int numQueries) {
ArrayList<FilterQuery> queries = new ArrayList<FilterQuery>();
int[] queryNums = new int[] { 3, 5, 6, 8, 10, 12, 14, 19 };

Random r = new Random();
for (int i = 0; i < numQueries; i++) {
int qNo = queryNums[r.nextInt(queryNums.length)];
int qNo = queryNums[rand.nextInt(queryNums.length)];
FilterQuery q = getQuery(qNo);
queries.add(q);
}
@@ -176,12 +175,17 @@ public void runWorkload(int numQueries) {
long start, end;
SparkQuery sq = new SparkQuery(cfg);
List<FilterQuery> queries = generateWorkload(numQueries);
System.out.println("INFO: Workload " + numQueries);
for (FilterQuery q: queries) {
System.out.println("INFO: Query:" + q.toString());
}
for (FilterQuery q : queries) {
start = System.currentTimeMillis();
long result = sq.createAdaptRDD(cfg.getHDFS_WORKING_DIR(),
q.getPredicates()).count();
end = System.currentTimeMillis();
System.out.println("Time Taken: " + (end - start) + " " + result);
System.out.println("RES: Result: " + result);
System.out.println("RES: Time Taken: " + (end - start) + " " + result);
}
}

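
The change above is what makes the query generator deterministic: the locally constructed new Random() is replaced by the shared rand field, which presumably carries a fixed seed (its initialization is not shown in this diff). A minimal sketch of the idea, with an assumed seed value:

    import java.util.Random;

    // A fixed seed makes the sequence of picked query numbers reproducible.
    Random rand = new Random(0); // illustrative seed; the actual seeding is not shown here
    int[] queryNums = new int[] { 3, 5, 6, 8, 10, 12, 14, 19 };
    for (int i = 0; i < 5; i++) {
        int qNo = queryNums[rand.nextInt(queryNums.length)];
        System.out.println("Picked TPC-H query " + qNo);
    }
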
