Commit

Merge.

ghislainfourny committed Dec 3, 2020
2 parents ed66c8f + aba9803 commit 6308381
Showing 71 changed files with 1,198 additions and 597 deletions.
4 changes: 2 additions & 2 deletions docs/Getting started.md
@@ -39,14 +39,14 @@ Create, in the same directory as Rumble to keep it simple, a file data.json and

In a shell, from the directory where the rumble .jar lies, type, all on one line:

spark-submit spark-rumble-1.9.0.jar --shell yes
spark-submit spark-rumble-1.9.1.jar --shell yes
The Rumble shell appears:

____ __ __
/ __ \__ ______ ___ / /_ / /__
/ /_/ / / / / __ `__ \/ __ \/ / _ \ The distributed JSONiq engine
/ _, _/ /_/ / / / / / / /_/ / / __/ 1.9.0 "Scots pine" beta
/ _, _/ /_/ / / / / / / /_/ / / __/ 1.9.1 "Scots pine" beta
/_/ |_|\__,_/_/ /_/ /_/_.___/_/\___/


8 changes: 4 additions & 4 deletions docs/HTTPServer.md
@@ -4,7 +4,7 @@

Rumble can be run as an HTTP server that listens for queries. In order to do so, you can use the --server and --port parameters:

spark-submit spark-rumble-1.9.0.jar --server yes --port 8001
spark-submit spark-rumble-1.9.1.jar --server yes --port 8001

This command will not return until you force it to (Ctrl+C on Linux and Mac). This is because the server has to run permanently to listen to incoming requests.

@@ -92,19 +92,19 @@ Then there are two options
- Connect to the master with SSH with an extra parameter for securely tunneling the HTTP connection (for example `-L 8001:localhost:8001` or any port of your choosing)
- Download the Rumble jar to the master node

wget https://github.com/RumbleDB/rumble/releases/download/v1.9.0/spark-rumble-1.9.0.jar
wget https://github.com/RumbleDB/rumble/releases/download/v1.9.1/spark-rumble-1.9.1.jar

- Launch the HTTP server on the master node (it will be accessible under `http://localhost:8001/jsoniq`).

spark-submit spark-rumble-1.9.0.jar --server yes --port 8001
spark-submit spark-rumble-1.9.1.jar --server yes --port 8001

- And then use Jupyter notebooks in the same way you would do it locally (it magically works because of the tunneling)

### With the EC2 hostname

There is also another way that does not need any tunnelling: you can specify the hostname of your EC2 machine (copied over from the EC2 dashboard) with the --host parameter. For example, with the placeholder <ec2-hostname>:

spark-submit spark-rumble-1.9.0.jar --server yes --port 8001 --host <ec2-hostname>
spark-submit spark-rumble-1.9.1.jar --server yes --port 8001 --host <ec2-hostname>

You also need to make sure in your EMR security group that the chosen port (e.g., 8001) is accessible from the machine in which you run your Jupyter notebook. Then, you can point your Jupyter notebook on this machine to `http://<ec2-hostname>:8001/jsoniq`.

12 changes: 6 additions & 6 deletions docs/Run on a cluster.md
@@ -5,21 +5,21 @@ simply by modifying the command line parameters as documented [here for spark-su

If the Spark cluster is running on yarn, then the --master option can be changed from local[\*] to yarn compared to the getting started guide. Most of the time, though (e.g., on Amazon EMR), it need not be specified, as this is already set up in the environment.

spark-submit spark-rumble-1.9.0.jar --shell yes
spark-submit spark-rumble-1.9.1.jar --shell yes
or explicitly:

spark-submit --master yarn --deploy-mode client spark-rumble-1.9.0.jar --shell yes
spark-submit --master yarn --deploy-mode client spark-rumble-1.9.1.jar --shell yes

You can also adapt the number of executors, etc.

spark-submit --num-executors 30 --executor-cores 3 --executor-memory 10g
spark-rumble-1.9.0.jar --shell yes
spark-rumble-1.9.1.jar --shell yes

The size limit for materialization can also be made higher with --materialization-cap (the default is 200). This affects the number of items displayed in the shell as the answer to a query, as well as any materialization happening within the query when push-down is not supported. Warnings are issued if the cap is reached.

spark-submit --num-executors 30 --executor-cores 3 --executor-memory 10g
spark-rumble-1.9.0.jar
spark-rumble-1.9.1.jar
--shell yes --materialization-cap 10000

## Creation functions
@@ -59,15 +59,15 @@ Note that by default only the first 1000 items in the output will be displayed o
Rumble also supports executing a single query from the command line, reading from HDFS and outputting the results to HDFS, with the query file being either local or on HDFS. For this, use the --query-path, --output-path and --log-path parameters.

spark-submit --num-executors 30 --executor-cores 3 --executor-memory 10g
spark-rumble-1.9.0.jar
spark-rumble-1.9.1.jar
--query-path "hdfs:///user/me/query.jq"
--output-path "hdfs:///user/me/results/output"
--log-path "hdfs:///user/me/logging/mylog"

The query path, output path and log path can be any of the supported schemes (HDFS, file, S3, WASB...) and can be relative or absolute.

spark-submit --num-executors 30 --executor-cores 3 --executor-memory 10g
spark-rumble-1.9.0.jar
spark-rumble-1.9.1.jar
--query-path "/home/me/my-local-machine/query.jq"
--output-path "/user/me/results/output"
--log-path "hdfs:///user/me/logging/mylog"
4 changes: 2 additions & 2 deletions docs/install.md
@@ -64,7 +64,7 @@ After successful completion, you can check the `target` directory, which should

The most straightforward way to test if the above steps were successful is to run the Rumble shell locally, like so:

$ spark-submit target/spark-rumble-1.9.0.jar --shell yes
$ spark-submit target/spark-rumble-1.9.1.jar --shell yes

The Rumble shell should start:

@@ -73,7 +73,7 @@ The Rumble shell should start:
____ __ __
/ __ \__ ______ ___ / /_ / /__
/ /_/ / / / / __ `__ \/ __ \/ / _ \ The distributed JSONiq engine
/ _, _/ /_/ / / / / / / /_/ / / __/ 1.9.0 "Scots pine" beta
/ _, _/ /_/ / / / / / / /_/ / / __/ 1.9.1 "Scots pine" beta
/_/ |_|\__,_/_/ /_/ /_/_.___/_/\___/

Master: local[2]
2 changes: 1 addition & 1 deletion pom.xml
@@ -26,7 +26,7 @@

<groupId>com.github.rumbledb</groupId>
<artifactId>spark-rumble</artifactId>
<version>1.9.0</version>
<version>1.9.1</version>
<packaging>jar</packaging>
<name>Rumble</name>
<description>A JSONiq engine to query large-scale JSON datasets stored on HDFS. Spark under the hood.</description>
16 changes: 0 additions & 16 deletions src/main/java/org/rumbledb/api/Item.java
@@ -620,22 +620,6 @@ public Item subtract(Item other) {
throw new UnsupportedOperationException("Operation not defined");
}

public Item multiply(Item other) {
throw new UnsupportedOperationException("Operation not defined");
}

public Item divide(Item other) {
throw new UnsupportedOperationException("Operation not defined");
}

public Item modulo(Item other) {
throw new UnsupportedOperationException("Operation not defined");
}

public Item idivide(Item other) {
throw new UnsupportedOperationException("Operation not defined");
}

/**
* Returns the dynamic type of the item (for error message purposes).
*
4 changes: 2 additions & 2 deletions src/main/java/org/rumbledb/cli/JsoniqQueryExecutor.java
@@ -121,10 +121,10 @@ public List<Item> runQuery() throws IOException {
Map<String, String> options = this.configuration.getOutputFormatOptions();
for (String key : options.keySet()) {
writer.option(key, options.get(key));
System.out.println("[INFO] Writing with option " + key + " : " + options.get(key));
System.err.println("[INFO] Writing with option " + key + " : " + options.get(key));
}
String format = this.configuration.getOutputFormat();
System.out.println("[INFO] Writing to format " + format);
System.err.println("[INFO] Writing to format " + format);
switch (format) {
case "json":
writer.json(outputPath);
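The switch from `System.out` to `System.err` in the hunk above is not cosmetic: it reserves stdout for query results, so redirecting or piping the output captures only data while `[INFO]` diagnostics still reach the console via stderr. A hypothetical sketch of that convention (`StreamSeparation` is an illustrative name, not a Rumble class):

```java
// Illustrative sketch of the stream-separation convention this commit
// adopts: diagnostics on stderr, results on stdout.
import java.util.Arrays;
import java.util.List;

public class StreamSeparation {

    // Diagnostics are formatted once and written to stderr.
    static String infoLine(String message) {
        return "[INFO] " + message;
    }

    static void logInfo(String message) {
        System.err.println(infoLine(message));
    }

    // Results are the only thing written to stdout.
    static void writeResults(List<String> items) {
        for (String item : items) {
            System.out.println(item);
        }
    }

    public static void main(String[] args) {
        logInfo("Writing to format json");
        writeResults(Arrays.asList("{\"foo\": 1}", "{\"foo\": 2}"));
    }
}
```

With this split, something like `spark-submit ... > results.json` writes only the result items to the file, while the `[INFO]` lines remain visible on the terminal.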
8 changes: 4 additions & 4 deletions src/main/java/org/rumbledb/cli/Main.java
@@ -38,7 +38,7 @@ public static void main(String[] args) throws IOException {
try {
sparksoniqConf = new RumbleRuntimeConfiguration(args);

JsonIterUtils.applyJsonIterFaultyInitializationWorkAround();
JsonIterUtils.applyJsonIterFaultyInitializationWorkAround(sparksoniqConf);

if (sparksoniqConf.isShell()) {
launchShell(sparksoniqConf);
@@ -112,11 +112,11 @@ private static void handleException(Throwable ex, boolean showErrorInfo) {
}
System.exit(43);
} else {
System.out.println("An error has occured: " + ex.getMessage());
System.out.println(
System.err.println("An error has occured: " + ex.getMessage());
System.err.println(
"We should investigate this 🙈. Please contact us or file an issue on GitHub with your query."
);
System.out.println("Link: https://github.com/RumbleDB/rumble/issues");
System.err.println("Link: https://github.com/RumbleDB/rumble/issues");
if (showErrorInfo) {
ex.printStackTrace();
}
1 change: 1 addition & 0 deletions src/main/java/org/rumbledb/compiler/InferTypeVisitor.java
@@ -42,6 +42,7 @@
*/
public class InferTypeVisitor extends AbstractNodeVisitor<StaticContext> {

@SuppressWarnings("unused")
private RumbleRuntimeConfiguration rumbleRuntimeConfiguration;

/**
7 changes: 6 additions & 1 deletion src/main/java/org/rumbledb/compiler/TranslationVisitor.java
@@ -898,7 +898,12 @@ public Node visitUnaryExpr(JsoniqParser.UnaryExprContext ctx) {
if (ctx.op == null || ctx.op.isEmpty()) {
return mainExpression;
}
boolean negated = !ctx.op.isEmpty() && ctx.op.get(0).getText().contentEquals("-");
boolean negated = false;
for (Token t : ctx.op) {
if (t.getText().contentEquals("-")) {
negated = !negated;
}
}
return new UnaryExpression(
mainExpression,
negated,
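The old line in the hunk above negated the expression whenever the first operator token was `-`, so a chain like `- - x` was wrongly treated as negated. The replacement toggles a flag once per minus sign, so an even count cancels out. A standalone sketch of that parity logic, with a plain string list standing in for ANTLR's `ctx.op` token list:

```java
// Sketch of the sign-parity logic from the TranslationVisitor change:
// each "-" in a chain of unary operators flips the sign, so "- - x"
// ends up positive. The String list is a stand-in for ANTLR tokens.
import java.util.Arrays;
import java.util.List;

public class UnarySign {
    static boolean isNegated(List<String> ops) {
        boolean negated = false;
        for (String op : ops) {
            if (op.equals("-")) {
                negated = !negated;   // parity flip per minus sign
            }
        }
        return negated;
    }

    public static void main(String[] args) {
        System.out.println(isNegated(Arrays.asList("-")));            // true
        System.out.println(isNegated(Arrays.asList("-", "-")));       // false
        System.out.println(isNegated(Arrays.asList("+", "-", "-")));  // false
    }
}
```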
@@ -443,8 +443,8 @@ private Map<Name, Node> buildNameToNodeMap(Prolog prolog) {
visit(variableDeclaration, null);
nameToNodeMap.put(variableDeclaration.getVariableName(), variableDeclaration);
if (this.rumbleRuntimeConfiguration.isPrintIteratorTree()) {
System.out.print(variableDeclaration.getVariableName());
System.out.println(
System.err.print(variableDeclaration.getVariableName());
System.err.println(
String.join(
", ",
getInputVariableDependencies(variableDeclaration).stream()
@@ -458,8 +458,8 @@
visit(functionDeclaration, null);
nameToNodeMap.put(functionDeclaration.getFunctionIdentifier().getNameWithArity(), functionDeclaration);
if (this.rumbleRuntimeConfiguration.isPrintIteratorTree()) {
System.out.print(functionDeclaration.getFunctionIdentifier().toString());
System.out.println(
System.err.print(functionDeclaration.getFunctionIdentifier().toString());
System.err.println(
String.join(
", ",
getInputVariableDependencies(functionDeclaration).stream()
16 changes: 8 additions & 8 deletions src/main/java/org/rumbledb/compiler/VisitorHelpers.java
@@ -37,7 +37,7 @@ public static RuntimeIterator generateRuntimeIterator(Node node, RumbleRuntimeCo
if (conf.isPrintIteratorTree()) {
StringBuffer sb = new StringBuffer();
result.print(sb, 0);
System.out.println(sb);
System.err.println(sb);
}
return result;
}
@@ -60,13 +60,13 @@ private static void inferTypes(Module module, RumbleRuntimeConfiguration conf) {
}

private static void printTree(Module node, RumbleRuntimeConfiguration conf) {
System.out.println("***************");
System.out.println("Expression tree");
System.out.println("***************");
System.out.println("Unset execution modes: " + node.numberOfUnsetExecutionModes());
System.out.println(node);
System.out.println();
System.out.println(node.getStaticContext());
System.err.println("***************");
System.err.println("Expression tree");
System.err.println("***************");
System.err.println("Unset execution modes: " + node.numberOfUnsetExecutionModes());
System.err.println(node);
System.err.println();
System.err.println(node.getStaticContext());
}

public static MainModule parseMainModuleFromLocation(URI location, RumbleRuntimeConfiguration configuration)
14 changes: 14 additions & 0 deletions src/main/java/org/rumbledb/config/RumbleRuntimeConfiguration.java
@@ -51,6 +51,7 @@ public class RumbleRuntimeConfiguration implements Serializable, KryoSerializabl
private int numberOfOutputPartitions;
private Map<Name, List<Item>> externalVariableValues;
private Map<Name, String> unparsedExternalVariableValues;
private boolean deactivateJsoniterStreaming;

private static final RumbleRuntimeConfiguration defaultConfiguration = new RumbleRuntimeConfiguration();

@@ -187,6 +188,11 @@ public void init() {
this.unparsedExternalVariableValues.put(name, this.arguments.get(s));
}
}
if (this.arguments.containsKey("deactivate-jsoniter-streaming")) {
this.deactivateJsoniterStreaming = this.arguments.get("deactivate-jsoniter-streaming").equals("yes");
} else {
this.deactivateJsoniterStreaming = false;
}
}

public boolean getOverwrite() {
@@ -293,6 +299,14 @@ public boolean printInferredTypes() {
return this.arguments.containsKey("print-inferred-types")
&& this.arguments.get("print-inferred-types").equals("yes");
}

public boolean getDeactivateJsoniterStreaming() {
return this.deactivateJsoniterStreaming;
}

public void setDeactivateJsoniterStreaming(boolean b) {
this.deactivateJsoniterStreaming = b;
}

public boolean isLocal() {
String masterConfig = SparkSessionManager.getInstance().getJavaSparkContext().getConf().get("spark.master");
1 change: 0 additions & 1 deletion src/main/java/org/rumbledb/context/NamedFunctions.java
@@ -139,7 +139,6 @@ public static RuntimeIterator getBuiltInFunctionIterator(
ExceptionMetadata metadata
) {
BuiltinFunction builtinFunction = BuiltinFunctionCatalogue.getBuiltinFunction(identifier);

for (int i = 0; i < arguments.size(); i++) {
if (
!builtinFunction.getSignature()
@@ -1,6 +1,9 @@
package org.rumbledb.exceptions;

public class NoNativeQueryException extends RuntimeException {

private static final long serialVersionUID = 1L;

public NoNativeQueryException() {
super("It was not possible to generate a native sparkSQL query for this expression");
}
4 changes: 3 additions & 1 deletion src/main/java/org/rumbledb/exceptions/RumbleException.java
@@ -113,7 +113,9 @@ public static RumbleException unnestException(Throwable ex) {
} else if (ex instanceof RumbleException) {
return (RumbleException) ex;
} else {
return null;
RumbleException e2 = new OurBadException("Unanticipated exception!");
e2.initCause(ex);
return e2;
}
}
}
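Before this change, `unnestException` returned `null` for any throwable it did not recognize, losing the error entirely; the new fallback wraps it and preserves the original via `initCause`, so the real stack trace survives for debugging. A simplified sketch of the same pattern, with `RuntimeException` standing in for Rumble's exception hierarchy:

```java
// Sketch of the unnesting fallback: instead of returning null for an
// unanticipated Throwable, wrap it so the original survives as the cause.
public class Unnest {
    static RuntimeException unnest(Throwable ex) {
        if (ex instanceof RuntimeException) {
            return (RuntimeException) ex;     // already one of ours, pass through
        }
        RuntimeException wrapped = new RuntimeException("Unanticipated exception!");
        wrapped.initCause(ex);                // keep the original for debugging
        return wrapped;
    }

    public static void main(String[] args) {
        RuntimeException e = unnest(new java.io.IOException("disk full"));
        System.out.println(e.getCause().getMessage()); // disk full
    }
}
```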
@@ -67,7 +67,7 @@ public void initHighestExecutionMode(VisitorConfig visitorConfig) {
}

for (Expression expression : this.expressions) {
if (!expression.getHighestExecutionMode(visitorConfig).isRDD()) {
if (!expression.getHighestExecutionMode(visitorConfig).isRDDOrDataFrame()) {
this.highestExecutionMode = ExecutionMode.LOCAL;
return;
}
6 changes: 5 additions & 1 deletion src/main/java/org/rumbledb/expressions/ExecutionMode.java
@@ -6,14 +6,18 @@ public enum ExecutionMode {
RDD,
DATAFRAME;

public boolean isRDD() {
public boolean isRDDOrDataFrame() {
return this == ExecutionMode.RDD || this == ExecutionMode.DATAFRAME;
}

public boolean isDataFrame() {
return this == ExecutionMode.DATAFRAME;
}

public boolean isRDD() {
return this == ExecutionMode.RDD;
}

public String toString() {
switch (this) {
case UNSET:
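The rename above disambiguates the predicate: `isRDD()` previously answered "RDD or DataFrame?", which is why all the call sites in the following hunks switch to `isRDDOrDataFrame()`, while the new strict `isRDD()` matches only the RDD mode. A self-contained sketch of the resulting predicates (`ExecModeDemo` is an illustrative wrapper, not a Rumble class):

```java
// Sketch of the renamed ExecutionMode predicates: isRDDOrDataFrame keeps
// the old broad meaning, while isRDD is now strict.
public class ExecModeDemo {
    enum ExecutionMode {
        UNSET, LOCAL, RDD, DATAFRAME;

        boolean isRDDOrDataFrame() { return this == RDD || this == DATAFRAME; }
        boolean isDataFrame()      { return this == DATAFRAME; }
        boolean isRDD()            { return this == RDD; }
    }

    public static void main(String[] args) {
        System.out.println(ExecutionMode.DATAFRAME.isRDDOrDataFrame()); // true
        System.out.println(ExecutionMode.DATAFRAME.isRDD());            // false
        System.out.println(ExecutionMode.RDD.isRDDOrDataFrame());       // true
    }
}
```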
@@ -74,8 +74,8 @@ public List<Node> getChildren() {
@Override
public void initHighestExecutionMode(VisitorConfig visitorConfig) {
if (
this.thenExpression.getHighestExecutionMode(visitorConfig).isRDD()
&& this.elseExpression.getHighestExecutionMode(visitorConfig).isRDD()
this.thenExpression.getHighestExecutionMode(visitorConfig).isRDDOrDataFrame()
&& this.elseExpression.getHighestExecutionMode(visitorConfig).isRDDOrDataFrame()
) {
this.highestExecutionMode = ExecutionMode.RDD;
return;
@@ -78,7 +78,7 @@ public void initHighestExecutionMode(VisitorConfig visitorConfig) {

if (this.highestExecutionMode == ExecutionMode.RDD) {
for (SwitchCase c : this.cases) {
if (!c.getReturnExpression().getHighestExecutionMode(visitorConfig).isRDD()) {
if (!c.getReturnExpression().getHighestExecutionMode(visitorConfig).isRDDOrDataFrame()) {
this.highestExecutionMode = ExecutionMode.LOCAL;
break;
}
@@ -59,7 +59,7 @@ public void initHighestExecutionMode(VisitorConfig visitorConfig) {

if (this.highestExecutionMode == ExecutionMode.RDD) {
for (TypeswitchCase c : this.cases) {
if (!c.getReturnExpression().getHighestExecutionMode(visitorConfig).isRDD()) {
if (!c.getReturnExpression().getHighestExecutionMode(visitorConfig).isRDDOrDataFrame()) {
this.highestExecutionMode = ExecutionMode.LOCAL;
break;
}
@@ -93,7 +93,7 @@ public Expression getExpression() {
@Override
public void initHighestExecutionMode(VisitorConfig visitorConfig) {
this.highestExecutionMode =
(this.expression.getHighestExecutionMode(visitorConfig).isRDD()
(this.expression.getHighestExecutionMode(visitorConfig).isRDDOrDataFrame()
|| (this.previousClause != null
&& this.previousClause.getHighestExecutionMode(visitorConfig).isDataFrame()))
? ExecutionMode.DATAFRAME