diff --git a/README.md b/README.md index 22909296..64c4e105 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ Welcome to project Druidry! ======================================= -![build_status](https://api.travis-ci.org/zapr-oss/druidry.svg?branch=master) +![build_status](https://api.travis-ci.org/zapr-oss/druidry.svg?branch=master) [![License: GPL v3](https://img.shields.io/badge/License-GPL%20v3-blue.svg)](https://www.gnu.org/licenses/gpl-3.0) Druid is an extremely popular tool to perform OLAP queries on event data. Druid drives real-time dashboards in most of the organisations right now. We@Zapr love Druid! Therefore we want to contribute towards making Druid, even more, friendlier to the ever expanding community. @@ -27,6 +27,12 @@ This library is still growing and does not support each and every constructs, ho Getting Started --------------- +Prerequisite +----------- + +* Maven +* Java 8 + Usage ----- @@ -154,6 +160,19 @@ ObjectMapper mapper = new ObjectMapper(); String requiredJson = mapper.writeValueAsString(query); ``` +``` java +DruidConfiguration config = DruidConfiguration + .builder() + .host("druid.io") + .endpoint("druid/v2/") + .build(); + +DruidClient client = new DruidJerseyClient(druidConfiguration); +client.connect(); +List responses = client.query(query, DruidResponse.class); +client.close(); +``` + Supported Features ------------------ diff --git a/pom.xml b/pom.xml index 1bfeb137..4757622e 100644 --- a/pom.xml +++ b/pom.xml @@ -6,11 +6,11 @@ in.zapr.druid druidry - 1.5 + 2.0 - Druidry - Druid Query Generator + Druidry - Druid Java Client Druidry is an open-source Java based utility library which supports creating - query to Druid + query to Druid and connection handling with Druid https://github.com/zapr-oss/druidry @@ -23,6 +23,7 @@ 1.2.1 1.4.0 3.5 + 2.26 @@ -83,6 +84,30 @@ ${commons.version} + + org.glassfish.jersey.core + jersey-client + ${jersey.version} + + + + org.glassfish.jersey.media + jersey-media-json-jackson + ${jersey.version} + + + + org.glassfish.jersey.inject + jersey-hk2 + ${jersey.version} + + + + org.glassfish.jersey.connectors + jersey-apache-connector + ${jersey.version} + + @@ -231,4 +256,4 @@ - + \ No newline at end of file diff --git a/src/main/java/in/zapr/druid/druidry/client/DruidClient.java b/src/main/java/in/zapr/druid/druidry/client/DruidClient.java new file mode 100644 index 00000000..31dcf12e --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/DruidClient.java @@ -0,0 +1,43 @@ +package in.zapr.druid.druidry.client; + +import java.util.List; + +import in.zapr.druid.druidry.client.exception.ConnectionException; +import in.zapr.druid.druidry.client.exception.QueryException; +import in.zapr.druid.druidry.query.DruidQuery; + +public interface DruidClient { + + /** + * Connects with Druid + * + * @throws ConnectionException When connection is not formed + */ + void connect() throws ConnectionException; + + /** + * Closes connection with Druid + * @throws ConnectionException When connection is not closed + */ + void close() throws ConnectionException; + + /** + * Queries druid + * + * @param druidQuery Druid Query object + * @return Result from Druid + * @throws QueryException Error while querying + */ + String query(DruidQuery druidQuery) throws QueryException; + + /** + * Queries druid + * + * @param druidQuery Druid Query Object + * @param className Class according to which DruidResult should be converted to + * @param Class according to which DruidResult should be converted to + * @return Druid Result in the form of class T object + * @throws QueryException Error while querying + */ + List query(DruidQuery druidQuery, Class className) throws QueryException; +} \ No newline at end of file diff --git a/src/main/java/in/zapr/druid/druidry/client/DruidConfiguration.java b/src/main/java/in/zapr/druid/druidry/client/DruidConfiguration.java new file mode 100644 index 00000000..3e7368e1 --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/DruidConfiguration.java @@ -0,0 +1,104 @@ +package in.zapr.druid.druidry.client; + +import org.apache.commons.lang3.StringUtils; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class DruidConfiguration { + + private static final int DEFAULT_HTTP_PORT = 8082; + private static final int DEFAULT_HTTPS_PORT = 8282; + + /** + * Protocol by which Druid Broker is accessible + * Defaults to HTTP + */ + private DruidQueryProtocol protocol; + + /** + * Address of Druid Broker Instance + */ + private String host; + + /** + * Port at which Druid Broker is listening. + * {@value DEFAULT_HTTP_PORT} if protocol is 8082 + * {@value DEFAULT_HTTPS_PORT} if protocol is 8282 + */ + private Integer port; + + /** + * Endpoint (without host address) at which query needs to be fired + */ + private String endpoint; + + /** + * Number of connections to be maintained in connection pool + * Is Ignored when custom JerseyConfig is passed. + */ + private Integer concurrentConnectionsRequired; + + @Builder + private DruidConfiguration(DruidQueryProtocol protocol, + String host, + Integer port, + String endpoint, + Integer concurrentConnectionsRequired) { + + if (StringUtils.isEmpty(host)) { + throw new IllegalArgumentException("Host cannot be null or empty"); + } + + if (port != null && port < 0) { + throw new IllegalArgumentException("Port cannot be negative"); + } + + if (concurrentConnectionsRequired != null && concurrentConnectionsRequired < 1) { + throw new IllegalArgumentException("Connections required cannot be less than 1"); + } + + if (protocol == null) { + protocol = DruidQueryProtocol.HTTP; + } + + if (port == null) { + port = getDefaultPortOnBasisOfProtocol(protocol); + } + + this.protocol = protocol; + this.host = host; + this.port = port; + this.endpoint = endpoint; + this.concurrentConnectionsRequired = concurrentConnectionsRequired; + } + + protected String getUrl() { + + String endpoint = this.getEndpoint(); + if (endpoint == null) { + endpoint = ""; + } + + return String.format("%s://%s:%d/%s", + this.getProtocol(), + this.getHost(), + this.getPort(), + endpoint); + } + + private Integer getDefaultPortOnBasisOfProtocol(DruidQueryProtocol protocol) { + + switch (protocol) { + case HTTP: + return DEFAULT_HTTP_PORT; + case HTTPS: + return DEFAULT_HTTPS_PORT; + default: + throw new IllegalArgumentException("Druid Query Protocol not handled"); + } + } +} \ No newline at end of file diff --git a/src/main/java/in/zapr/druid/druidry/client/DruidError.java b/src/main/java/in/zapr/druid/druidry/client/DruidError.java new file mode 100644 index 00000000..4d703b67 --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/DruidError.java @@ -0,0 +1,12 @@ +package in.zapr.druid.druidry.client; + +import lombok.Getter; + +@Getter +public class DruidError { + + private String error; + private String errorMessage; + private String errorClass; + private String host; +} \ No newline at end of file diff --git a/src/main/java/in/zapr/druid/druidry/client/DruidJerseyClient.java b/src/main/java/in/zapr/druid/druidry/client/DruidJerseyClient.java new file mode 100644 index 00000000..b04b2c9b --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/DruidJerseyClient.java @@ -0,0 +1,144 @@ +package in.zapr.druid.druidry.client; + +import org.apache.http.conn.HttpClientConnectionManager; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; +import org.glassfish.jersey.apache.connector.ApacheClientProperties; +import org.glassfish.jersey.apache.connector.ApacheConnectorProvider; +import org.glassfish.jersey.client.ClientConfig; + +import java.util.List; + +import javax.ws.rs.client.Client; +import javax.ws.rs.client.ClientBuilder; +import javax.ws.rs.client.Entity; +import javax.ws.rs.client.WebTarget; +import javax.ws.rs.core.GenericType; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; + +import in.zapr.druid.druidry.client.exception.ConnectionException; +import in.zapr.druid.druidry.client.exception.QueryException; +import in.zapr.druid.druidry.query.DruidQuery; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class DruidJerseyClient implements DruidClient { + + private static final int DEFAULT_CONNECTION_POOL_LIMIT = 5; + private final String druidUrl; + private final DruidConfiguration druidConfiguration; + + private ClientConfig jerseyConfig; + private Client client; + private WebTarget queryWebTarget; + + public DruidJerseyClient(@NonNull DruidConfiguration druidConfiguration){ + this(druidConfiguration, null); + } + + public DruidJerseyClient(@NonNull DruidConfiguration druidConfiguration, + ClientConfig jerseyConfig) { + + this.druidUrl = druidConfiguration.getUrl(); + this.jerseyConfig = jerseyConfig; + + log.debug("Will query to {}", druidUrl); + this.druidConfiguration = druidConfiguration; + } + + @Override + public void connect() throws ConnectionException { + + try { + if (jerseyConfig == null) { + + HttpClientConnectionManager connectionManager = createConnectionManager(); + this.jerseyConfig = new ClientConfig(); + this.jerseyConfig.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager); + this.jerseyConfig.connectorProvider(new ApacheConnectorProvider()); + } + + this.client = ClientBuilder.newClient(this.jerseyConfig); + this.queryWebTarget = this.client.target(this.druidUrl); + + } catch (Exception e) { + throw new ConnectionException(e); + } + } + + @Override + public void close() throws ConnectionException { + try { + if (this.client == null) { + return; + } + + this.client.close(); + } catch (Exception e) { + throw new ConnectionException(e); + } + } + + @Override + public String query(DruidQuery druidQuery) throws QueryException { + + try (Response response = this.queryWebTarget + .request(MediaType.APPLICATION_JSON) + .post(Entity.entity(druidQuery, MediaType.APPLICATION_JSON))) { + + if (response.getStatus() == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) { + handleInternalServerResponse(response); + } + + return response.readEntity(String.class); + + } catch (QueryException e) { + log.error("Exception while querying {}", e); + throw e; + } catch (Exception e) { + log.error("Exception while querying {}", e); + throw new QueryException(e); + } + } + + @Override + public List query(DruidQuery druidQuery, Class className) throws QueryException { + try (Response response = this.queryWebTarget + .request(MediaType.APPLICATION_JSON) + .post(Entity.entity(druidQuery, MediaType.APPLICATION_JSON))) { + + if (response.getStatus() == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) { + handleInternalServerResponse(response); + } + + return response.readEntity(new GenericType>() { + }); + } catch (QueryException e) { + log.error("Exception while querying {}", e); + throw e; + } catch (Exception e) { + log.error("Exception while querying {}", e); + throw new QueryException(e); + } + } + + private DruidError handleInternalServerResponse(Response response) throws Exception { + DruidError error = response.readEntity(DruidError.class); + throw new QueryException(error); + } + + private HttpClientConnectionManager createConnectionManager() { + PoolingHttpClientConnectionManager connectionManager + = new PoolingHttpClientConnectionManager(); + + int numberOfConnectionsInPool = DEFAULT_CONNECTION_POOL_LIMIT; + + if (this.druidConfiguration.getConcurrentConnectionsRequired() != null) { + numberOfConnectionsInPool = this.druidConfiguration.getConcurrentConnectionsRequired(); + } + + connectionManager.setDefaultMaxPerRoute(numberOfConnectionsInPool); + return connectionManager; + } +} \ No newline at end of file diff --git a/src/main/java/in/zapr/druid/druidry/client/DruidQueryProtocol.java b/src/main/java/in/zapr/druid/druidry/client/DruidQueryProtocol.java new file mode 100644 index 00000000..888c0af8 --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/DruidQueryProtocol.java @@ -0,0 +1,17 @@ +package in.zapr.druid.druidry.client; + +public enum DruidQueryProtocol { + HTTP("http"), + HTTPS("https"); + + private String protocol; + + DruidQueryProtocol(String protocol) { + this.protocol = protocol; + } + + @Override + public String toString() { + return this.protocol; + } +} diff --git a/src/main/java/in/zapr/druid/druidry/client/exception/ConnectionException.java b/src/main/java/in/zapr/druid/druidry/client/exception/ConnectionException.java new file mode 100644 index 00000000..6b3c95d4 --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/exception/ConnectionException.java @@ -0,0 +1,7 @@ +package in.zapr.druid.druidry.client.exception; + +public class ConnectionException extends DruidryException { + public ConnectionException(Exception e) { + super(e); + } +} diff --git a/src/main/java/in/zapr/druid/druidry/client/exception/DruidryException.java b/src/main/java/in/zapr/druid/druidry/client/exception/DruidryException.java new file mode 100644 index 00000000..39733c00 --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/exception/DruidryException.java @@ -0,0 +1,12 @@ +package in.zapr.druid.druidry.client.exception; + +public class DruidryException extends Exception { + + protected DruidryException(Exception e) { + super(e); + } + + protected DruidryException(String message) { + super(message); + } +} \ No newline at end of file diff --git a/src/main/java/in/zapr/druid/druidry/client/exception/QueryException.java b/src/main/java/in/zapr/druid/druidry/client/exception/QueryException.java new file mode 100644 index 00000000..65bfa30d --- /dev/null +++ b/src/main/java/in/zapr/druid/druidry/client/exception/QueryException.java @@ -0,0 +1,23 @@ +package in.zapr.druid.druidry.client.exception; + +import in.zapr.druid.druidry.client.DruidError; +import lombok.Getter; + +public class QueryException extends DruidryException { + + @Getter + private DruidError druidError; + + public QueryException(Exception e) { + super(e); + } + + public QueryException(String message) { + super(message); + } + + public QueryException(DruidError error) { + super(error.getErrorMessage()); + this.druidError = error; + } +} diff --git a/src/main/java/in/zapr/druid/druidry/granularity/PeriodGranularity.java b/src/main/java/in/zapr/druid/druidry/granularity/PeriodGranularity.java index 06a69f30..5040c4f9 100644 --- a/src/main/java/in/zapr/druid/druidry/granularity/PeriodGranularity.java +++ b/src/main/java/in/zapr/druid/druidry/granularity/PeriodGranularity.java @@ -23,8 +23,6 @@ import org.joda.time.DateTime; import org.joda.time.DateTimeZone; -import java.util.Date; - import lombok.Builder; import lombok.Getter; diff --git a/src/test/java/in/zapr/druid/druidry/IntervalTest.java b/src/test/java/in/zapr/druid/druidry/IntervalTest.java index c9296f64..05638f15 100644 --- a/src/test/java/in/zapr/druid/druidry/IntervalTest.java +++ b/src/test/java/in/zapr/druid/druidry/IntervalTest.java @@ -16,15 +16,10 @@ * along with this program. If not, see . */ -import in.zapr.druid.druidry.Interval; - import org.joda.time.DateTime; import org.testng.Assert; import org.testng.annotations.Test; -import java.util.HashSet; -import java.util.Set; - public class IntervalTest { @Test(expectedExceptions = NullPointerException.class) diff --git a/src/test/java/in/zapr/druid/druidry/client/DruidConfigurationTest.java b/src/test/java/in/zapr/druid/druidry/client/DruidConfigurationTest.java new file mode 100644 index 00000000..fb33fd58 --- /dev/null +++ b/src/test/java/in/zapr/druid/druidry/client/DruidConfigurationTest.java @@ -0,0 +1,95 @@ +package in.zapr.druid.druidry.client; + +import org.testng.Assert; +import org.testng.annotations.Test; + +public class DruidConfigurationTest { + + @Test + public void testDruidConfiguration() { + DruidConfiguration config = DruidConfiguration + .builder() + .protocol(DruidQueryProtocol.HTTPS) + .host("druid.zapr.in") + .port(443) + .endpoint("druid/v2/") + .concurrentConnectionsRequired(8) + .build(); + + Assert.assertEquals(config.getProtocol(), DruidQueryProtocol.HTTPS); + Assert.assertEquals(config.getHost(), "druid.zapr.in"); + Assert.assertEquals(config.getPort().intValue(), 443); + Assert.assertEquals(config.getEndpoint(), "druid/v2/"); + Assert.assertEquals(config.getConcurrentConnectionsRequired().intValue(), 8); + Assert.assertEquals(config.getUrl(), "https://druid.zapr.in:443/druid/v2/"); + } + + @Test + public void testDefaultConfigProtocol() { + DruidConfiguration config = DruidConfiguration + .builder() + .host("druid.zapr.in") + .endpoint("druid/v2/") + .concurrentConnectionsRequired(8) + .build(); + + Assert.assertEquals(config.getProtocol(), DruidQueryProtocol.HTTP); + Assert.assertEquals(config.getPort().intValue(), 8082); + Assert.assertEquals(config.getUrl(), "http://druid.zapr.in:8082/druid/v2/"); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testNegativePort() { + DruidConfiguration config = DruidConfiguration + .builder() + .host("druid.zapr.in") + .port(-1) + .endpoint("druid/v2/") + .concurrentConnectionsRequired(8) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testNegativeConcurrentConnectionValue() { + DruidConfiguration config = DruidConfiguration + .builder() + .host("druid.zapr.in") + .port(443) + .endpoint("druid/v2/") + .concurrentConnectionsRequired(-1) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testNullHost() { + DruidConfiguration config = DruidConfiguration + .builder() + .port(443) + .endpoint("druid/v2/") + .concurrentConnectionsRequired(14) + .build(); + } + + @Test(expectedExceptions = IllegalArgumentException.class) + public void testEmptyHost() { + DruidConfiguration config = DruidConfiguration + .builder() + .host("") + .port(443) + .endpoint("druid/v2/") + .concurrentConnectionsRequired(16) + .build(); + } + + @Test + public void testNullEndpoint() { + DruidConfiguration config = DruidConfiguration + .builder() + .host("druid.zapr.in") + .port(443) + .concurrentConnectionsRequired(11) + .build(); + + Assert.assertEquals(config.getUrl(), "http://druid.zapr.in:443/"); + } +} \ No newline at end of file diff --git a/src/test/java/in/zapr/druid/druidry/granularity/DurationGranularityTest.java b/src/test/java/in/zapr/druid/druidry/granularity/DurationGranularityTest.java index b06c6db4..d32139df 100644 --- a/src/test/java/in/zapr/druid/druidry/granularity/DurationGranularityTest.java +++ b/src/test/java/in/zapr/druid/druidry/granularity/DurationGranularityTest.java @@ -46,7 +46,7 @@ public void testAllFields() throws JSONException, JsonProcessingException { DurationGranularity spec = new DurationGranularity(3141, originDate); JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "duration"); - jsonObject.put("duration", 3141l); + jsonObject.put("duration", 3141L); jsonObject.put("origin", originDate); String actualJSON = objectMapper.writeValueAsString(spec); @@ -59,7 +59,7 @@ public void testRequiredFields() throws JSONException, JsonProcessingException { DurationGranularity spec = new DurationGranularity(3141); JSONObject jsonObject = new JSONObject(); jsonObject.put("type", "duration"); - jsonObject.put("duration", 3141l); + jsonObject.put("duration", 3141L); String actualJSON = objectMapper.writeValueAsString(spec); String expectedJSON = jsonObject.toString();