Skip to content

Commit

Permalink
Merge branch 'master' into LocalTextFile
Browse files Browse the repository at this point in the history
  • Loading branch information
ghislainfourny authored Oct 1, 2020
2 parents 95f13dc + 4e06459 commit 7dc495c
Show file tree
Hide file tree
Showing 14 changed files with 200 additions and 40 deletions.
7 changes: 5 additions & 2 deletions docs/CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@ The parameters that can be used on the command line as well as on the planned HT
| --shell | N/A | yes, no | yes runs the interactive shell. No executes a query specified with --query-path |
| --query-path | query-path | file:///folder/file.jq | A JSONiq query file to read from (from any file system, even the Web!). |
| --output-path | output-path | file:///folder/output | Where to output to (if the output is large, it will create a sharded directory, otherwise it will create a file) |
| --log-path | log-path | file:///folder/log.txt | Where to output log information |
| --materialization-cap | materialization-cap | 200 | A cap on the maximum number of items to materialize for large sequences within a query or for outputting on screen (used to be called --result-size). |
| --output-format | N/A | json, csv, avro, parquet, or any other format supported by Spark | An output format to use for the output. Formats other than json can only be output if the query outputs a highly structured sequence of objects (you can nest your query in an annotate() call to specify a schema if it does not). |
| --output-format-option:foo | N/A | bar | Options to further specify the output format (example: separator character for CSV, compression format...) |
| --overwrite | overwrite | yes, no | Whether to overwrite the output at --output-path. No throws an error if the output file/folder exists. |
| --materialization-cap | materialization-cap | 200 | A cap on the maximum number of items to materialize for large sequences within a query or for outputting on screen (used to be called --result-size). |
| --number-of-output-partitions | N/A | ad hoc | How many partitions to create in the output, i.e., the number of files that will be created in the output path directory.
| --log-path | log-path | file:///folder/log.txt | Where to output log information |
| --print-iterator-tree | N/A | yes, no | For debugging purposes, prints out the expression tree and runtime iterator tree. |
| --show-error-info | show-error-info | yes, no | For debugging purposes. If you want to report a bug, you can use this to get the full exception stack. If no, then only a short message is shown in case of error. |
| --server | N/A | yes, no | yes runs Rumble as a server on port 8001. Run queries with http://localhost:8001/jsoniq?query-path=/folder/foo.json |
Expand Down
24 changes: 24 additions & 0 deletions src/main/java/org/rumbledb/api/SequenceOfItems.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.rumbledb.config.RumbleRuntimeConfiguration;
import org.rumbledb.context.DynamicContext;
import org.rumbledb.runtime.RuntimeIterator;
Expand Down Expand Up @@ -98,6 +100,15 @@ public boolean availableAsRDD() {
return this.iterator.isRDD();
}

/**
 * Tells whether this sequence of items can be obtained as a Spark data frame,
 * allowing further processing without collecting the items locally.
 *
 * @return true if a data frame representation is available, false otherwise.
 */
public boolean availableAsDataFrame() {
    boolean dataFrameBacked = this.iterator.isDataFrame();
    return dataFrameBacked;
}

/**
* Returns the sequence of items as an RDD of Items rather than iterating over them locally.
* It is not possible to do so if the iterator is open.
Expand All @@ -111,6 +122,19 @@ public JavaRDD<Item> getAsRDD() {
return this.iterator.getRDD(this.dynamicContext);
}

/**
 * Returns the sequence of items as a Spark data frame rather than iterating over them locally.
 * It is not possible to do so if the iterator is open.
 *
 * @return a data frame.
 * @throws RuntimeException if the iterator is currently open.
 */
public Dataset<Row> getAsDataFrame() {
    if (this.isOpen) {
        // Fixed message: it previously said "an RDD", copy-pasted from getAsRDD().
        throw new RuntimeException("Cannot obtain a data frame if the iterator is open.");
    }
    return this.iterator.getDataFrame(this.dynamicContext);
}

public long populateList(List<Item> resultList) {
resultList.clear();
this.iterator.open(this.dynamicContext);
Expand Down
45 changes: 44 additions & 1 deletion src/main/java/org/rumbledb/cli/JsoniqQueryExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
package org.rumbledb.cli;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.rumbledb.api.Item;
import org.rumbledb.api.Rumble;
import org.rumbledb.api.SequenceOfItems;
Expand All @@ -36,6 +39,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;


Expand Down Expand Up @@ -99,9 +103,48 @@ public List<Item> runQuery() throws IOException {
Rumble rumble = new Rumble(this.configuration);
SequenceOfItems sequence = rumble.runQuery(queryUri);

if (sequence.availableAsRDD() && outputPath != null) {
if (
!this.configuration.getOutputFormat().equals("json")
&&
!sequence.availableAsDataFrame()
) {
throw new CliException(
"Rumble cannot output another format than JSON if the query does not output a structured collection. You can create a structured collection from a sequence of objects by calling the function annotate(<your query here> , <a schema here>)."
);
}
if (sequence.availableAsDataFrame() && outputPath != null) {
Dataset<Row> df = sequence.getAsDataFrame();
if (this.configuration.getNumberOfOutputPartitions() > 0) {
df = df.repartition(this.configuration.getNumberOfOutputPartitions());
}
DataFrameWriter<Row> writer = df.write();
Map<String, String> options = this.configuration.getOutputFormatOptions();
for (String key : options.keySet()) {
writer.option(key, options.get(key));
System.out.println("[INFO] Writing with option " + key + " : " + options.get(key));
}
String format = this.configuration.getOutputFormat();
System.out.println("[INFO] Writing to format " + format);
switch (format) {
case "json":
writer.json(outputPath);
break;
case "csv":
writer.csv(outputPath);
break;
case "parquet":
writer.parquet(outputPath);
break;
default:
writer.format(format).save(outputPath);
}
} else if (sequence.availableAsRDD() && outputPath != null) {
JavaRDD<Item> rdd = sequence.getAsRDD();
JavaRDD<String> outputRDD = rdd.map(o -> o.serialize());
if (this.configuration.getNumberOfOutputPartitions() > 0) {
outputRDD = outputRDD.repartition(this.configuration.getNumberOfOutputPartitions());
}

outputRDD.saveAsTextFile(outputPath);
} else {
outputList = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,10 +279,8 @@ private RuntimeTupleIterator visitFlowrClause(
OrderByClauseSortingKey.EMPTY_ORDER emptyOrder = orderExpr.getEmptyOrder();
if (emptyOrder == OrderByClauseSortingKey.EMPTY_ORDER.NONE) {
if (clause.getStaticContext().isEmptySequenceOrderLeast()) {
System.out.println("Setting to least.");
emptyOrder = OrderByClauseSortingKey.EMPTY_ORDER.LEAST;
} else {
System.out.println("Setting to greatest.");
emptyOrder = OrderByClauseSortingKey.EMPTY_ORDER.GREATEST;
}
}
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/org/rumbledb/config/RumbleRuntimeConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RumbleRuntimeConfiguration implements Serializable, KryoSerializable {

Expand All @@ -45,6 +46,9 @@ public class RumbleRuntimeConfiguration implements Serializable, KryoSerializabl

List<String> allowedPrefixes;
private int resultsSizeCap;
private String outputFormat;
private Map<String, String> outputFormatOptions;
private int numberOfOutputPartitions;

private static final RumbleRuntimeConfiguration defaultConfiguration = new RumbleRuntimeConfiguration();

Expand Down Expand Up @@ -112,12 +116,54 @@ public void setAllowedURIPrefixes(List<String> newValue) {
this.allowedPrefixes = newValue;
}

/**
 * Returns the output format used to write query results (e.g. json, csv, parquet).
 * Defaults to "json" and is lowercased when read from the CLI arguments in init().
 *
 * @return the output format.
 */
public String getOutputFormat() {
    return this.outputFormat;
}

/**
 * Sets the output format used to write query results.
 * NOTE(review): unlike init(), this setter does not lowercase the value —
 * callers should pass a lowercase format name; confirm intended behavior.
 *
 * @param newValue the output format to use.
 */
public void setOutputFormat(String newValue) {
    this.outputFormat = newValue;
}

/**
 * Returns the number of partitions to create in the output.
 * init() sets this to -1 when --number-of-output-partitions was not supplied.
 *
 * @return the number of output partitions, or -1 when unset.
 */
public int getNumberOfOutputPartitions() {
    return this.numberOfOutputPartitions;
}

/**
 * Sets the number of partitions to create in the output.
 *
 * @param newValue the number of output partitions (callers use -1 to mean unset).
 */
public void setNumberOfOutputPartitions(int newValue) {
    this.numberOfOutputPartitions = newValue;
}

/**
 * Returns the output format options (e.g. a CSV separator) collected from
 * --output-format-option:* arguments in init().
 * NOTE(review): this exposes the internal mutable map directly; mutations by the
 * caller will be visible here — consider an unmodifiable view if that is unintended.
 *
 * @return the map of output format option names to values.
 */
public Map<String, String> getOutputFormatOptions() {
    return this.outputFormatOptions;
}

/**
 * Registers an additional output format option (e.g. a separator character for CSV)
 * that is forwarded to the output writer.
 *
 * @param key the option name.
 * @param value the option value.
 */
public void setOutputFormatOption(String key, String value) {
    // Robustness: the map is otherwise only allocated in init(); guard against
    // a call made before init() to avoid a NullPointerException.
    if (this.outputFormatOptions == null) {
        this.outputFormatOptions = new HashMap<>();
    }
    this.outputFormatOptions.put(key, value);
}

public void init() {
if (this.arguments.containsKey("allowed-uri-prefixes")) {
this.allowedPrefixes = Arrays.asList(this.arguments.get("allowed-uri-prefixes").split(";"));
} else {
this.allowedPrefixes = Arrays.asList();
}
if (this.arguments.containsKey("output-format")) {
this.outputFormat = this.arguments.get("output-format").toLowerCase();
} else {
this.outputFormat = "json";
}
if (this.arguments.containsKey("number-of-output-partitions")) {
this.numberOfOutputPartitions = Integer.valueOf(this.arguments.get("number-of-output-partitions"));
} else {
this.numberOfOutputPartitions = -1;
}
this.outputFormatOptions = new HashMap<>();
for (String s : this.arguments.keySet()) {
if (s.startsWith("output-format-option:")) {
String key = s.substring(21);
String value = this.arguments.get(s);
this.outputFormatOptions.put(key, value);
}
}
if (this.arguments.containsKey("materialization-cap")) {
this.resultsSizeCap = Integer.parseInt(this.arguments.get("materialization-cap"));
} else {
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/rumbledb/runtime/RuntimeTupleIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ public abstract Dataset<Row> getDataFrame(
* Builds the DataFrame projection that this clause needs to receive from its child clause.
* The intent is that the result of this method is forwarded to the child clause in getDataFrame() so it can
* optimize some values away.
* Invariant: all keys in getProjection(...) MUST be input tuple variables,
* i.e., appear in this.child.getOutputTupleVariableNames()
*
* @param parentProjection the projection needed by the parent clause.
* @return the projection needed by this clause.
Expand Down Expand Up @@ -179,13 +181,13 @@ public Map<Name, DynamicContext.VariableDependency> getVariableDependencies() {
}

/**
* Returns the variables bound in descendant (previous) clauses of the current FLWOR.
* Returns the output tuple variable names.
* These variables can be removed from the dependencies of expressions in ascendent (subsequent) clauses,
* because their values are provided in the tuples rather than the dynamic context object.
*
* @return the set of variable names that are bound by descendant clauses.
*/
public Set<Name> getVariablesBoundInCurrentFLWORExpression() {
public Set<Name> getOutputTupleVariableNames() {
return new HashSet<Name>();
}

Expand All @@ -204,7 +206,7 @@ public void print(StringBuffer buffer, int indent) {
buffer.append(" | ");

buffer.append("Variables bound in current FLWOR: ");
for (Name v : getVariablesBoundInCurrentFLWORExpression()) {
for (Name v : getOutputTupleVariableNames()) {
buffer.append(v + " ");
}
buffer.append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,9 @@ public Map<Name, DynamicContext.VariableDependency> getVariableDependencies() {
return result;
}

public Set<Name> getVariablesBoundInCurrentFLWORExpression() {
public Set<Name> getOutputTupleVariableNames() {
Set<Name> result = new HashSet<>();
result.addAll(this.child.getVariablesBoundInCurrentFLWORExpression());
result.addAll(this.child.getOutputTupleVariableNames());
result.add(this.variableName);
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,7 +1006,7 @@ public Map<Name, DynamicContext.VariableDependency> getVariableDependencies() {
Map<Name, DynamicContext.VariableDependency> result =
new TreeMap<>(this.assignmentIterator.getVariableDependencies());
if (this.child != null) {
for (Name var : this.child.getVariablesBoundInCurrentFLWORExpression()) {
for (Name var : this.child.getOutputTupleVariableNames()) {
result.remove(var);
}
result.putAll(this.child.getVariableDependencies());
Expand All @@ -1015,10 +1015,10 @@ public Map<Name, DynamicContext.VariableDependency> getVariableDependencies() {
}

@Override
public Set<Name> getVariablesBoundInCurrentFLWORExpression() {
public Set<Name> getOutputTupleVariableNames() {
Set<Name> result = new HashSet<>();
if (this.child != null) {
result.addAll(this.child.getVariablesBoundInCurrentFLWORExpression());
result.addAll(this.child.getOutputTupleVariableNames());
}
result.add(this.variableName);
if (this.positionalVariableName != null) {
Expand Down Expand Up @@ -1071,10 +1071,16 @@ public Map<Name, DynamicContext.VariableDependency> getProjection(
if (projection.get(variable) != exprDependency.get(variable)) {
// If the projection already needed a different kind of dependency, we fall back to the full
// sequence of items.
projection.put(variable, DynamicContext.VariableDependency.FULL);
if (
this.child != null && this.child.getOutputTupleVariableNames().contains(variable)
) {
projection.put(variable, DynamicContext.VariableDependency.FULL);
}
}
} else {
projection.put(variable, exprDependency.get(variable));
if (this.child != null && this.child.getOutputTupleVariableNames().contains(variable)) {
projection.put(variable, exprDependency.get(variable));
}
}
}
return projection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,19 +392,19 @@ public Map<Name, DynamicContext.VariableDependency> getVariableDependencies() {
result.put(iterator.getVariableName(), DynamicContext.VariableDependency.FULL);
}
}
for (Name var : this.child.getVariablesBoundInCurrentFLWORExpression()) {
for (Name var : this.child.getOutputTupleVariableNames()) {
result.remove(var);
}
result.putAll(this.child.getVariableDependencies());
return result;
}

public Set<Name> getVariablesBoundInCurrentFLWORExpression() {
public Set<Name> getOutputTupleVariableNames() {
Set<Name> result = new HashSet<>();
for (GroupByClauseSparkIteratorExpression iterator : this.groupingExpressions) {
result.add(iterator.getVariableName());
}
result.addAll(this.child.getVariablesBoundInCurrentFLWORExpression());
result.addAll(this.child.getOutputTupleVariableNames());
return result;
}

Expand Down Expand Up @@ -436,18 +436,24 @@ public Map<Name, DynamicContext.VariableDependency> getProjection(
for (GroupByClauseSparkIteratorExpression iterator : this.groupingExpressions) {
if (iterator.getExpression() == null) {
Name variable = iterator.getVariableName();
projection.put(variable, DynamicContext.VariableDependency.FULL);
if (this.child.getOutputTupleVariableNames().contains(variable)) {
projection.put(variable, DynamicContext.VariableDependency.FULL);
}
continue;
}
Map<Name, DynamicContext.VariableDependency> exprDependency = iterator.getExpression()
.getVariableDependencies();
for (Name variable : exprDependency.keySet()) {
if (projection.containsKey(variable)) {
if (projection.get(variable) != exprDependency.get(variable)) {
projection.put(variable, DynamicContext.VariableDependency.FULL);
if (this.child.getOutputTupleVariableNames().contains(variable)) {
projection.put(variable, DynamicContext.VariableDependency.FULL);
}
}
} else {
projection.put(variable, exprDependency.get(variable));
if (this.child.getOutputTupleVariableNames().contains(variable)) {
projection.put(variable, exprDependency.get(variable));
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,26 +425,26 @@ public static boolean isExpressionIndependentFromInputTuple(
Set<Name> intersection = new HashSet<>(
sequenceIterator.getVariableDependencies().keySet()
);
intersection.retainAll(tupleIterator.getVariablesBoundInCurrentFLWORExpression());
intersection.retainAll(tupleIterator.getOutputTupleVariableNames());
return intersection.isEmpty();
}

public Map<Name, DynamicContext.VariableDependency> getVariableDependencies() {
Map<Name, DynamicContext.VariableDependency> result =
new TreeMap<>(this.assignmentIterator.getVariableDependencies());
if (this.child != null) {
for (Name var : this.child.getVariablesBoundInCurrentFLWORExpression()) {
for (Name var : this.child.getOutputTupleVariableNames()) {
result.remove(var);
}
result.putAll(this.child.getVariableDependencies());
}
return result;
}

public Set<Name> getVariablesBoundInCurrentFLWORExpression() {
public Set<Name> getOutputTupleVariableNames() {
Set<Name> result = new HashSet<>();
if (this.child != null) {
result.addAll(this.child.getVariablesBoundInCurrentFLWORExpression());
result.addAll(this.child.getOutputTupleVariableNames());
}
result.add(this.variableName);
return result;
Expand Down Expand Up @@ -481,10 +481,16 @@ public Map<Name, DynamicContext.VariableDependency> getProjection(
for (Name variable : exprDependency.keySet()) {
if (projection.containsKey(variable)) {
if (projection.get(variable) != exprDependency.get(variable)) {
projection.put(variable, DynamicContext.VariableDependency.FULL);
if (
this.child != null && this.child.getOutputTupleVariableNames().contains(variable)
) {
projection.put(variable, DynamicContext.VariableDependency.FULL);
}
}
} else {
projection.put(variable, exprDependency.get(variable));
if (this.child != null && this.child.getOutputTupleVariableNames().contains(variable)) {
projection.put(variable, exprDependency.get(variable));
}
}
}
return projection;
Expand Down
Loading

0 comments on commit 7dc495c

Please sign in to comment.