Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch for the ITs for MSQ #1

Open
wants to merge 12 commits into
base: target-msq-first-it
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonValue;
import com.google.common.base.Preconditions;
import org.apache.druid.msq.kernel.QueryDefinition;
import org.apache.druid.msq.kernel.StageDefinition;
Expand Down Expand Up @@ -84,7 +83,7 @@ public static MSQStagesReport create(
return new MSQStagesReport(stages);
}

@JsonValue
@JsonProperty("stages")
public List<Stage> getStages()
{
return stages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,13 @@ public String getReportKey()
return REPORT_KEY;
}

@JsonProperty("type")
private String getType()
{
return REPORT_KEY;
}


@Override
@JsonProperty
public Object getPayload()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ DRUID_INSTANCE=
# Optional as this is the default?
#druid_extensions_directory=/usr/local/druid/extensions
#druid_extensions_loadList=["mysql-metadata-storage","druid-basic-security","simple-client-sslcontext","it-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches","druid-parquet-extensions","druid-avro-extensions","druid-protobuf-extensions","druid-orc-extensions","druid-kafka-indexing-service","druid-s3-extensions"]
druid_extensions_loadList=["mysql-metadata-storage","it-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches","druid-parquet-extensions","druid-avro-extensions","druid-protobuf-extensions","druid-orc-extensions","druid-kafka-indexing-service","druid-s3-extensions"]
druid_extensions_loadList=["mysql-metadata-storage","it-tools","druid-lookups-cached-global","druid-histogram","druid-datasketches","druid-parquet-extensions","druid-avro-extensions","druid-protobuf-extensions","druid-orc-extensions","druid-kafka-indexing-service","druid-s3-extensions","druid-multi-stage-query"]
druid_extensions_hadoopDependenciesDir=/shared/hadoop-dependencies

# Logging
Expand Down
6 changes: 6 additions & 0 deletions integration-tests-ex/cases/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@
<artifactId>druid-integration-tests</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- WARNING ALERT - Remove this -->
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-multi-stage-query</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- See https://maven.apache.org/plugins/maven-jar-plugin/examples/create-test-jar.html -->
<dependency>
<groupId>org.apache.druid</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testsEx.msq;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.clients.MsqTestClient;
import org.apache.druid.testing.guice.models.MSQTaskReportDeserializable;
import org.apache.druid.testing.utils.MsqQueryWithResults;
import org.apache.druid.testing.utils.MsqTestQueryHelper;
import org.apache.druid.testsEx.categories.BatchIndex;
import org.apache.druid.testsEx.config.DruidTestRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.Map;

@RunWith(DruidTestRunner.class)
@Category(BatchIndex.class)
public class ITMultiStageQuery
{
@Inject
private MsqTestQueryHelper msqHelper;

@Inject
private MsqTestClient msqClient;

@Inject
private IntegrationTestingConfig config;
@Inject

private ObjectMapper jsonMapper;

@Test
public void test() throws Exception
{
String query =
"INSERT INTO dst SELECT *\n"
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be picked up from a file using a http data source

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not necessary to put queries in a file. Existing tests use files, but only because they have many queries. However, since this query is rather complex, it might be easier to maintain if it is in a file.

The existing format may not be a good fit for MSQ tests, so we might want to create a simplified form for use here.

+ "FROM TABLE(extern(\n"
+ " '{\n"
+ " \"type\": \"inline\",\n"
+ " \"data\": \"a,b,1\\nc,d,2\\n\"\n"
+ " }',\n"
+ " '{\n"
+ " \"type\": \"csv\",\n"
+ " \"columns\": [\"x\",\"y\",\"z\"],\n"
+ " \"listDelimiter\": null,\n"
+ " \"findColumnsFromHeader\": false,\n"
+ " \"skipHeaderRows\": 0\n"
+ " }',\n"
+ " '[\n"
+ " {\"name\": \"x\", \"type\": \"STRING\"},\n"
+ " {\"name\": \"y\", \"type\": \"STRING\"},\n"
+ " {\"name\": \"z\", \"type\": \"LONG\"}\n"
+ " ]'\n"
+ "))\n"
+ "PARTITIONED BY ALL TIME";
String taskId = msqHelper.submitMsqTask(query);
msqHelper.pollTaskIdForCompletion(taskId, 0);
Map<String, MSQTaskReportDeserializable> reports = msqHelper.fetchStatusReports(taskId);

String resultsQuery = "SELECT * FROM dst";
String resultsTaskId = msqHelper.submitMsqTask(resultsQuery);
msqHelper.pollTaskIdForCompletion(resultsTaskId, 0);
msqHelper.compareResults(resultsTaskId, new MsqQueryWithResults(
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Results should also be served from a file.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we can use the existing format which holds both the query and expected results.

query,
ImmutableList.of(
ImmutableMap.of("x", "a", "y", "b", "z", 1),
ImmutableMap.of("x", "c", "y", "d", "z", 2)
)
));
int x = 5;
x += 1;
}
}
6 changes: 6 additions & 0 deletions integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,12 @@
<artifactId>simple-client-sslcontext</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.druid.extensions</groupId>
<artifactId>druid-multi-stage-query</artifactId>
<version>${project.parent.version}</version>
<!-- <scope>test</scope>-->
</dependency>
<dependency>
<groupId>org.apache.druid</groupId>
<artifactId>druid-services</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.clients;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StatusResponseHolder;
import org.apache.druid.msq.guice.MSQIndexingModule;
import org.apache.druid.msq.indexing.report.MSQTaskReport;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;
import org.apache.druid.testing.guice.models.MSQTaskReportDeserializable;
import org.jboss.netty.handler.codec.http.HttpMethod;

import java.util.HashMap;
import java.util.Map;

public class MsqOverlordResourceTestClient extends OverlordResourceTestClient
{
ObjectMapper jsonMapper;

@Inject
MsqOverlordResourceTestClient(
@Json ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
super(jsonMapper, httpClient, config);
this.jsonMapper = jsonMapper;
}

public Map<String, MSQTaskReportDeserializable> getTaskReportForMsqTask(String taskId)
{
try {
StatusResponseHolder response = makeRequest(
HttpMethod.GET,
StringUtils.format(
"%s%s",
getIndexerURL(),
StringUtils.format("task/%s/reports", StringUtils.urlEncode(taskId))
)
);
jsonMapper.registerModules(new MSQIndexingModule().getJacksonModules());
jsonMapper.registerSubtypes(ImmutableList.of(MSQTaskReportDeserializable.class));
return jsonMapper.readValue(
response.getContent(),
new TypeReference<Map<String, MSQTaskReportDeserializable>>()
{
}
);
}
catch (ISE e) {
throw e;
}
catch (Exception e) {
throw new RuntimeException(e);
}
}

private static class MsqTaskReportType extends HashMap<String, MSQTaskReport>
{

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.clients;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.sql.http.SqlQuery;
import org.apache.druid.testing.IntegrationTestingConfig;
import org.apache.druid.testing.guice.TestClient;

import javax.ws.rs.core.MediaType;

public class MsqTestClient extends AbstractQueryResourceTestClient<SqlQuery>
{
@Inject
MsqTestClient(
ObjectMapper jsonMapper,
@TestClient HttpClient httpClient,
IntegrationTestingConfig config
)
{
super(jsonMapper, null, httpClient, config.getRouterUrl(), MediaType.APPLICATION_JSON, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.testing.guice.models;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.segment.column.RowSignature;

import javax.annotation.Nullable;
import java.util.List;

public class MSQResultsReportDeserializable
{
private final RowSignature signature;
@Nullable
private final List<String> sqlTypeNames;
private final List<Object[]> results;

@JsonCreator
public MSQResultsReportDeserializable(
@JsonProperty("signature") final RowSignature signature,
@JsonProperty("sqlTypeNames") @Nullable final List<String> sqlTypeNames,
@JsonProperty("results") final List<Object[]> results
)
{
this.signature = Preconditions.checkNotNull(signature, "signature");
this.sqlTypeNames = sqlTypeNames;
this.results = Preconditions.checkNotNull(results, "results");
}

@JsonProperty("signature")
public RowSignature getSignature()
{
return signature;
}

@JsonProperty("sqlTypeNames")
@JsonInclude(JsonInclude.Include.NON_NULL)
public List<String> getSqlTypeNames()
{
return sqlTypeNames;
}

@JsonProperty("results")
public List<Object[]> getResults()
{
return results;
}
}
Loading