Removing hard coded replication factor, removing some debug statements
anilshanbhag committed Oct 7, 2015
1 parent 5d65faf commit 3bd2dc0
Showing 9 changed files with 15 additions and 57 deletions.
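
The substance of the change, before the per-file hunks: the convenience constructors that pinned HDFS replication to (short) 3 are dropped (or commented out), and the Spark record readers instead parse the factor out of the job Configuration under SparkQueryConf.HDFS_REPLICATION_FACTOR and pass it to the partition constructors explicitly. Below is a minimal sketch of the resulting call pattern, assuming the project classes (HDFSPartition, SparkQueryConf) and the Hadoop client libraries are on the classpath; the sketch class name, the example path, the value "3", and the null CuratorFramework argument are illustrative placeholders, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import core.access.HDFSPartition;
import core.access.spark.SparkQueryConf;

public class ReplicationFactorSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The replication factor now travels through the conf instead of being hard-coded.
        conf.set(SparkQueryConf.HDFS_REPLICATION_FACTOR, "3");

        // Hypothetical partition path; the readers get theirs from the input split.
        Path filePath = new Path("hdfs://bits:9000/partitions0/42");
        FileSystem fs = filePath.getFileSystem(conf);

        // Same construction the updated readers use: parse the factor from the conf
        // and hand it to the constructor that takes an explicit replication factor.
        short replication = Short.parseShort(conf.get(SparkQueryConf.HDFS_REPLICATION_FACTOR));
        HDFSPartition partition = new HDFSPartition(fs, filePath.toString(), replication,
                null /* the readers pass a real CuratorFramework client here */);
        System.out.println("Created partition on " + filePath + " with replication " + replication);
    }
}
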
2 changes: 1 addition & 1 deletion scripts/fabfile/run_queries.py
@@ -8,7 +8,7 @@ def run_tpch_queries():
     #cmd = './hadoop jar $JAR perf.benchmark.TPCHWorkload ' + \
     cmd = submit_script_path + ' --class perf.benchmark.TPCHWorkload --deploy-mode client --master spark://bits:7077 $JAR ' + \
         ' --conf $CONF' + \
-        ' --numQueries 3' + \
+        ' --numQueries 6' + \
         ' --schema "$SCHEMA"' + \
         ' --numFields $NUMFIELDS' + \
         ' --numTuples $NUMTUPLES' + \
11 changes: 5 additions & 6 deletions src/main/java/core/access/HDFSPartition.java
@@ -19,7 +19,6 @@
 import core.utils.HDFSUtils;
 
 public class HDFSPartition extends Partition {
-
     private static final long serialVersionUID = 1L;
 
     protected FileSystem hdfs;
@@ -58,16 +57,16 @@ public HDFSPartition(FileSystem hdfs, String pathAndPartitionId,
         this.client = client;
     }
 
-    public HDFSPartition(FileSystem hdfs, String pathAndPartitionId,
-            CuratorFramework client) {
-        this(hdfs, pathAndPartitionId, (short) 3, client);
-    }
+    // public HDFSPartition(FileSystem hdfs, String pathAndPartitionId,
+    //         CuratorFramework client) {
+    //     this(hdfs, pathAndPartitionId, (short) 1, client);
+    // }
 
     @Override
     public Partition clone() {
         String clonePath = path
                 .replaceAll("partitions[0-9]*/$", "repartition/");
-        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, client);
+        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, replication, client);
         // p.bytes = new byte[bytes.length]; // heap space!
         p.bytes = new byte[1024];
         p.state = State.NEW;
7 changes: 1 addition & 6 deletions src/main/java/core/access/ReusableHDFSPartition.java
@@ -63,16 +63,11 @@ public ReusableHDFSPartition(FileSystem hdfs, String pathAndPartitionId,
         this.client = client;
     }
 
-    public ReusableHDFSPartition(FileSystem hdfs, String pathAndPartitionId,
-            CuratorFramework client, BufferManager buffMgr) {
-        this(hdfs, pathAndPartitionId, (short) 3, client, buffMgr);
-    }
-
     @Override
     public Partition clone() {
         String clonePath = path
                 .replaceAll("partitions[0-9]*/$", "repartition/");
-        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, client);
+        Partition p = new HDFSPartition(hdfs, clonePath + partitionId, replication, client);
         // p.bytes = new byte[bytes.length]; // heap space!
         p.bytes = new byte[1024];
         p.state = Partition.State.NEW;
@@ -22,7 +22,6 @@ public DistributedRepartitionIterator(FilterQuery query, RNode newIndexTree) {
 
     @Override
     public void finish() {
-
         // PartitionLock l = new PartitionLock(zookeeperHosts);
 
         for (Partition p : newPartitions.values()) {
1 change: 0 additions & 1 deletion src/main/java/core/access/spark/HPInput.java
@@ -29,7 +29,6 @@ public void initialize(List<FileStatus> files, AccessMethod am) {
         this.am = am;
         partitionIdFileMap = ArrayListMultimap.create();
         partitionIdSizeMap = Maps.newHashMap();
-        System.out.println("FileStatus has " + files.size() + " files.");
         for (FileStatus file : files) {
             try {
                 int id = Integer.parseInt(FilenameUtils.getName(file.getPath()
5 changes: 3 additions & 2 deletions src/main/java/core/access/spark/SparkRecordReader.java
@@ -56,8 +56,9 @@ protected boolean initializeNext() throws IOException {
         else {
             Path filePath = sparkSplit.getPath(currentFile);
             final FileSystem fs = filePath.getFileSystem(conf);
-            HDFSPartition partition = new HDFSPartition(fs,
-                    filePath.toString(), client);
+            HDFSPartition partition = new HDFSPartition(fs, filePath.toString(),
+                    Short.parseShort(conf.get(SparkQueryConf.HDFS_REPLICATION_FACTOR)),
+                    client);
             System.out.println("INFO: Loading path: " + filePath.toString());
             try {
                 partition.loadNext();
@@ -18,6 +18,7 @@
 import core.access.iterator.IteratorRecord;
 import core.access.iterator.ReusablePartitionIterator;
 import core.access.spark.SparkInputFormat.SparkFileSplit;
+import core.access.spark.SparkQueryConf;
 import core.access.spark.join.SparkJoinRecordReader.JoinTuplePair;
 import core.utils.BufferManager;
 import core.utils.Pair;
@@ -112,7 +113,9 @@ protected boolean initializeNext() throws IOException {
             Path filePath = sparkSplit.getPath(currentFile);
             final FileSystem fs = filePath.getFileSystem(conf);
             ReusableHDFSPartition partition = new ReusableHDFSPartition(fs,
-                    filePath.toString(), client, buffMgr);
+                    filePath.toString(),
+                    Short.parseShort(conf.get(SparkQueryConf.HDFS_REPLICATION_FACTOR)),
+                    client, buffMgr);
             System.out.println("loading path: " + filePath.toString());
             try {
                 partition.loadNext();
@@ -142,6 +145,7 @@ protected boolean getNext() throws IOException {
         return false;
     }
 
+    @Override
     public boolean nextKeyValue() throws IOException, InterruptedException {
 
         if (firstRecords != null && firstRecords.hasNext())
22 changes: 0 additions & 22 deletions src/main/java/core/adapt/opt/Optimizer.java
@@ -12,7 +12,6 @@
 
 import core.access.AccessMethod.PartitionSplit;
 import core.access.Predicate;
-import core.access.Predicate.PREDTYPE;
 import core.access.Query;
 import core.access.Query.FilterQuery;
 import core.access.iterator.PartitionIterator;
@@ -147,23 +146,6 @@ public PartitionSplit[] buildAccessPlan(final Query q) {
         }
     }
 
-    // Just for debug
-    public PartitionSplit[] testRepartitionIteratorPlan(final Query q) {
-        Plan pl = new Plan();
-        pl.cost = 1;
-        pl.benefit = 1000;
-        Action ac = new Action();
-        ac.pid = 0;
-        ac.option = 1;
-        pl.actions = ac;
-
-        Predicate p1 = new Predicate(0, TYPE.INT, 283359707, PREDTYPE.GT);
-        FilterQuery dummy = new FilterQuery(new Predicate[] { p1 });
-
-        PartitionSplit[] psplits = this.getPartitionSplits(pl, dummy);
-        return psplits;
-    }
-
     public PartitionSplit[] buildPlan(final Query q) {
         if (q instanceof FilterQuery) {
             FilterQuery fq = (FilterQuery) q;
@@ -230,10 +212,6 @@ public PartitionSplit[] buildPlan(final Query q) {
     public Plan getBestPlan(Predicate[] ps) {
         // TODO: Multiple predicates seem to complicate the simple idea we had;
         // think more :-/
-
-        // Make sure all the bucket counts are non zero.
-        this.rt.checkSane();
-
         Plan plan = null;
         for (int i = 0; i < ps.length; i++) {
             Plan option = getBestPlanForPredicate(ps, i);
17 changes: 0 additions & 17 deletions src/main/java/core/index/robusttree/RobustTree.java
@@ -38,22 +38,6 @@ public RobustTree() {
 
     }
 
-    // TODO: Remove
-    public void checkSane() {
-        LinkedList<RNode> l = new LinkedList<RNode>();
-        l.add(root);
-
-        while(!(l.size() == 0)) {
-            RNode first = l.removeFirst();
-            if (first.bucket != null ){
-                System.out.println("Num Tuples: " + first.bucket.getBucketId() + " " + first.bucket.getEstimatedNumTuples());
-            } else {
-                l.add(first.leftChild);
-                l.add(first.rightChild);
-            }
-        }
-    }
-
     @Override
     public MDIndex clone() throws CloneNotSupportedException {
         throw new CloneNotSupportedException();
@@ -334,7 +318,6 @@ public void initializeBucketSamplesAndCounts(RNode n,
             double numTuples = (sampleSize * totalTuples) / totalSamples;
             n.bucket.setSample(sample);
             n.bucket.setEstimatedNumTuples(numTuples);
-            System.out.println("DEB: Bucket " + n.bucket.getBucketId() + " " + n.bucket.getEstimatedNumTuples());
         } else {
             // By sorting we avoid memory allocation
             // Will most probably be faster
