diff --git a/scripts/fabfile/run_queries.py b/scripts/fabfile/run_queries.py
index e2fa373..414514d 100644
--- a/scripts/fabfile/run_queries.py
+++ b/scripts/fabfile/run_queries.py
@@ -8,7 +8,7 @@ def run_tpch_queries():
     #cmd = './hadoop jar $JAR perf.benchmark.TPCHWorkload ' + \
     cmd = submit_script_path + ' --class perf.benchmark.TPCHWorkload --deploy-mode client --master spark://bits:7077 $JAR ' + \
         ' --conf $CONF' + \
-        ' --numQueries 3' + \
+        ' --numQueries 6' + \
        ' --schema "$SCHEMA"' + \
        ' --numFields $NUMFIELDS' + \
        ' --numTuples $NUMTUPLES' + \
diff --git a/src/main/java/core/access/HDFSPartition.java b/src/main/java/core/access/HDFSPartition.java
index 382b4da..9105330 100644
--- a/src/main/java/core/access/HDFSPartition.java
+++ b/src/main/java/core/access/HDFSPartition.java
@@ -19,7 +19,6 @@ import core.utils.HDFSUtils;
 
 public class HDFSPartition extends Partition {
 
-    private static final long serialVersionUID = 1L;
 
     protected FileSystem hdfs;
 
@@ -58,16 +57,16 @@ public HDFSPartition(FileSystem hdfs, String pathAndPartitionId,
         this.client = client;
     }
 
-    public HDFSPartition(FileSystem hdfs, String pathAndPartitionId,
-            CuratorFramework client) {
-        this(hdfs, pathAndPartitionId, (short) 3, client);
-    }
+//    public HDFSPartition(FileSystem hdfs, String pathAndPartitionId,
+//            CuratorFramework client) {
+//        this(hdfs, pathAndPartitionId, (short) 1, client);
+//    }
 
     @Override
     public Partition clone() {
         String clonePath = path
                 .replaceAll("partitions[0-9]*/$", "repartition/");
-        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, client);
+        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, replication, client);
         // p.bytes = new byte[bytes.length]; // heap space!
         p.bytes = new byte[1024];
         p.state = State.NEW;
diff --git a/src/main/java/core/access/ReusableHDFSPartition.java b/src/main/java/core/access/ReusableHDFSPartition.java
index 134ae3d..58c2717 100644
--- a/src/main/java/core/access/ReusableHDFSPartition.java
+++ b/src/main/java/core/access/ReusableHDFSPartition.java
@@ -63,16 +63,11 @@ public ReusableHDFSPartition(FileSystem hdfs, String pathAndPartitionId,
         this.client = client;
     }
 
-    public ReusableHDFSPartition(FileSystem hdfs, String pathAndPartitionId,
-            CuratorFramework client, BufferManager buffMgr) {
-        this(hdfs, pathAndPartitionId, (short) 3, client, buffMgr);
-    }
-
     @Override
     public Partition clone() {
         String clonePath = path
                 .replaceAll("partitions[0-9]*/$", "repartition/");
-        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, client);
+        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, replication, client);
         // p.bytes = new byte[bytes.length]; // heap space!
         p.bytes = new byte[1024];
         p.state = Partition.State.NEW;
diff --git a/src/main/java/core/access/iterator/DistributedRepartitionIterator.java b/src/main/java/core/access/iterator/DistributedRepartitionIterator.java
index 4ff6723..c814bef 100644
--- a/src/main/java/core/access/iterator/DistributedRepartitionIterator.java
+++ b/src/main/java/core/access/iterator/DistributedRepartitionIterator.java
@@ -22,7 +22,6 @@ public DistributedRepartitionIterator(FilterQuery query, RNode newIndexTree) {
 
     @Override
     public void finish() {
-        // PartitionLock l = new PartitionLock(zookeeperHosts);
 
         for (Partition p : newPartitions.values()) {
diff --git a/src/main/java/core/access/spark/HPInput.java b/src/main/java/core/access/spark/HPInput.java
index 98b1142..8b91c58 100644
--- a/src/main/java/core/access/spark/HPInput.java
+++ b/src/main/java/core/access/spark/HPInput.java
@@ -29,7 +29,6 @@ public void initialize(List<FileStatus> files, AccessMethod am) {
         this.am = am;
         partitionIdFileMap = ArrayListMultimap.create();
         partitionIdSizeMap = Maps.newHashMap();
-        System.out.println("FileStatus has " + files.size() + " files.");
         for (FileStatus file : files) {
             try {
                 int id = Integer.parseInt(FilenameUtils.getName(file.getPath()
diff --git a/src/main/java/core/access/spark/SparkRecordReader.java b/src/main/java/core/access/spark/SparkRecordReader.java
index be0caae..19e20eb 100644
--- a/src/main/java/core/access/spark/SparkRecordReader.java
+++ b/src/main/java/core/access/spark/SparkRecordReader.java
@@ -56,8 +56,9 @@ protected boolean initializeNext() throws IOException {
         else {
             Path filePath = sparkSplit.getPath(currentFile);
             final FileSystem fs = filePath.getFileSystem(conf);
-            HDFSPartition partition = new HDFSPartition(fs,
-                    filePath.toString(), client);
+            HDFSPartition partition = new HDFSPartition(fs, filePath.toString(),
+                    Short.parseShort(conf.get(SparkQueryConf.HDFS_REPLICATION_FACTOR)),
+                    client);
             System.out.println("INFO: Loading path: " + filePath.toString());
             try {
                 partition.loadNext();
diff --git a/src/main/java/core/access/spark/join/SparkJoinRecordReader.java b/src/main/java/core/access/spark/join/SparkJoinRecordReader.java
index 921610b..fdd6bb7 100644
--- a/src/main/java/core/access/spark/join/SparkJoinRecordReader.java
+++ b/src/main/java/core/access/spark/join/SparkJoinRecordReader.java
@@ -18,6 +18,7 @@ import core.access.iterator.IteratorRecord;
 import core.access.iterator.ReusablePartitionIterator;
 import core.access.spark.SparkInputFormat.SparkFileSplit;
+import core.access.spark.SparkQueryConf;
 import core.access.spark.join.SparkJoinRecordReader.JoinTuplePair;
 import core.utils.BufferManager;
 import core.utils.Pair;
@@ -112,7 +113,9 @@ protected boolean initializeNext() throws IOException {
             Path filePath = sparkSplit.getPath(currentFile);
             final FileSystem fs = filePath.getFileSystem(conf);
             ReusableHDFSPartition partition = new ReusableHDFSPartition(fs,
-                    filePath.toString(), client, buffMgr);
+                    filePath.toString(),
+                    Short.parseShort(conf.get(SparkQueryConf.HDFS_REPLICATION_FACTOR)),
+                    client, buffMgr);
             System.out.println("loading path: " + filePath.toString());
             try {
                 partition.loadNext();
@@ -142,6 +145,7 @@ protected boolean getNext() throws IOException {
         return false;
     }
 
+    @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
 
         if (firstRecords != null && firstRecords.hasNext())
diff --git a/src/main/java/core/adapt/opt/Optimizer.java b/src/main/java/core/adapt/opt/Optimizer.java
index 2636594..e0d7213 100644
--- a/src/main/java/core/adapt/opt/Optimizer.java
+++ b/src/main/java/core/adapt/opt/Optimizer.java
@@ -12,7 +12,6 @@ import core.access.AccessMethod.PartitionSplit;
 import core.access.Predicate;
-import core.access.Predicate.PREDTYPE;
 import core.access.Query;
 import core.access.Query.FilterQuery;
 import core.access.iterator.PartitionIterator;
@@ -147,23 +146,6 @@ public PartitionSplit[] buildAccessPlan(final Query q) {
         }
     }
 
-    // Just for debug
-    public PartitionSplit[] testRepartitionIteratorPlan(final Query q) {
-        Plan pl = new Plan();
-        pl.cost = 1;
-        pl.benefit = 1000;
-        Action ac = new Action();
-        ac.pid = 0;
-        ac.option = 1;
-        pl.actions = ac;
-
-        Predicate p1 = new Predicate(0, TYPE.INT, 283359707, PREDTYPE.GT);
-        FilterQuery dummy = new FilterQuery(new Predicate[] { p1 });
-
-        PartitionSplit[] psplits = this.getPartitionSplits(pl, dummy);
-        return psplits;
-    }
-
     public PartitionSplit[] buildPlan(final Query q) {
         if (q instanceof FilterQuery) {
             FilterQuery fq = (FilterQuery) q;
@@ -230,10 +212,6 @@ public PartitionSplit[] buildPlan(final Query q) {
     public Plan getBestPlan(Predicate[] ps) {
         // TODO: Multiple predicates seem to complicate the simple idea we had;
         // think more :-/
-
-        // Make sure all the bucket counts are non zero.
-        this.rt.checkSane();
-
         Plan plan = null;
         for (int i = 0; i < ps.length; i++) {
             Plan option = getBestPlanForPredicate(ps, i);
diff --git a/src/main/java/core/index/robusttree/RobustTree.java b/src/main/java/core/index/robusttree/RobustTree.java
index fec2898..b54ed5b 100644
--- a/src/main/java/core/index/robusttree/RobustTree.java
+++ b/src/main/java/core/index/robusttree/RobustTree.java
@@ -38,22 +38,6 @@ public RobustTree() {
     }
 
-    // TODO: Remove
-    public void checkSane() {
-        LinkedList<RNode> l = new LinkedList<RNode>();
-        l.add(root);
-
-        while (!(l.size() == 0)) {
-            RNode first = l.removeFirst();
-            if (first.bucket != null) {
-                System.out.println("Num Tuples: " + first.bucket.getBucketId() + " " + first.bucket.getEstimatedNumTuples());
-            } else {
-                l.add(first.leftChild);
-                l.add(first.rightChild);
-            }
-        }
-    }
-
     @Override
     public MDIndex clone() throws CloneNotSupportedException {
         throw new CloneNotSupportedException();
     }
@@ -334,7 +318,6 @@ public void initializeBucketSamplesAndCounts(RNode n,
             double numTuples = (sampleSize * totalTuples) / totalSamples;
             n.bucket.setSample(sample);
             n.bucket.setEstimatedNumTuples(numTuples);
-            System.out.println("DEB: Bucket " + n.bucket.getBucketId() + " " + n.bucket.getEstimatedNumTuples());
         } else {
             // By sorting we avoid memory allocation
             // Will most probably be faster
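
Note on the record-reader changes: both SparkRecordReader and SparkJoinRecordReader now call Short.parseShort(conf.get(SparkQueryConf.HDFS_REPLICATION_FACTOR)), which assumes the replication factor has been placed in the job's Hadoop Configuration before the readers initialize; an unset key would yield null and a NumberFormatException. The sketch below is illustrative only and is not part of this patch: the key string, the default value, and the helper method are assumptions, standing in for whatever SparkQueryConf actually defines, and only stock Hadoop Configuration calls are used.

// Illustrative sketch, not part of the patch. Assumes the replication factor is
// stored as a plain string entry in the Hadoop Configuration; the real key name
// lives in SparkQueryConf.HDFS_REPLICATION_FACTOR in this codebase.
import org.apache.hadoop.conf.Configuration;

public class ReplicationConfSketch {
    // Hypothetical key name; substitute SparkQueryConf.HDFS_REPLICATION_FACTOR.
    static final String HDFS_REPLICATION_FACTOR = "hdfs.replication.factor";

    // Read the replication factor, falling back to HDFS's usual default of 3
    // when the key is unset, so Short.parseShort(null) cannot be reached.
    static short replicationFor(Configuration conf) {
        return Short.parseShort(conf.get(HDFS_REPLICATION_FACTOR, "3"));
    }

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(HDFS_REPLICATION_FACTOR, "2");    // set once by the job driver
        short replication = replicationFor(conf);  // read by each record reader
        System.out.println("HDFSPartition would be created with replication " + replication);
    }
}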