Skip to content

Commit

Permalink
feat: query structured (#41)
Browse files Browse the repository at this point in the history
  • Loading branch information
Sciator authored Nov 8, 2023
1 parent 7acff83 commit 948bc14
Show file tree
Hide file tree
Showing 13 changed files with 1,665 additions and 559 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.4.0 [unreleased]

### Features

1. [#41](https://github.com/InfluxCommunity/influxdb3-java/pull/41): Add structured query support

## 0.3.1 [2023-10-17]

### Bug Fixes
Expand Down
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.write.Point;
import com.influxdb.v3.client.Point;

public class IOxExample {
public static void main(String[] args) throws Exception {
Expand All @@ -91,8 +91,8 @@ to insert data, you can use code like this:
// Write by Point
//
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55.15)
.setTag("location", "west")
.setField("value", 55.15)
.setTimestamp(Instant.now().minusSeconds(-10));
client.writePoint(point);

Expand Down Expand Up @@ -135,6 +135,31 @@ try (Stream<Object[]> stream = client.query(influxQL, QueryOptions.INFLUX_QL)) {
System.out.printf("-----------------------------------------%n");
```

or use `PointValues` structure with `client.queryPoints`:

```java
System.out.printf("--------------------------------------------------------%n");
System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time");
System.out.printf("--------------------------------------------------------%n");

//
// Query by SQL into Points
//
String sql1 = "select time,location,value from temperature order by time desc limit 10";
try (Stream<PointValues> stream = client.queryPoints(sql1, QueryOptions.DEFAULTS)) {
stream.forEach(
(PointValues p) -> {
var time = p.getField("time", LocalDateTime.class);
var location = p.getField("location", String.class);
var value = p.getField("value", Double.class);

System.out.printf("| %-8s | %-8s | %-30s |%n", location, value, time);
});
}

System.out.printf("--------------------------------------------------------%n%n");
```

## Feedback

If you need help, please use our [Community Slack](https://app.slack.com/huddle/TH8RGQX5Z/C02UDUPLQKA)
Expand Down
15 changes: 15 additions & 0 deletions examples/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@

<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<mainClass>com.influxdb.v3.IOxExample</mainClass>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down
115 changes: 115 additions & 0 deletions examples/src/main/java/com/influxdb/v3/DownsamplingExample.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* The MIT License
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package com.influxdb.v3;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;

/**
* The example depends on the "influxdb3-java" module and this module should be built first
* by running "mvn install" in the root directory.
*/
public final class DownsamplingExample {
private DownsamplingExample() {
}

public static void main(final String[] args) throws Exception {
String host = "https://us-east-1-1.aws.cloud2.influxdata.com";
String token = "my-token";
String database = "my-database";

try (InfluxDBClient client = InfluxDBClient.getInstance(host, token.toCharArray(), database)) {
//
// Write data
//
Point point1 = Point.measurement("stat")
.setTag("unit", "temperature")
.setField("avg", 24.5)
.setField("max", 45.0)
.setTimestamp(Instant.now().minus(20, ChronoUnit.MINUTES));
client.writePoint(point1);

Point point2 = Point.measurement("stat")
.setTag("unit", "temperature")
.setField("avg", 28.0)
.setField("max", 40.3)
.setTimestamp(Instant.now().minus(10, ChronoUnit.MINUTES));
client.writePoint(point2);

Point point3 = Point.measurement("stat")
.setTag("unit", "temperature")
.setField("avg", 20.5)
.setField("max", 49.0)
.setTimestamp(Instant.now());
client.writePoint(point3);

//
// Query downsampled data
//
String sql = "SELECT\n"
+ " date_bin('5 minutes', \"time\") as window_start,\n"
+ " AVG(\"avg\") as avg,\n"
+ " MAX(\"max\") as max\n"
+ " FROM \"stat\"\n"
+ " WHERE\n"
+ " \"time\" >= now() - interval '1 hour'\n"
+ " GROUP BY window_start\n"
+ " ORDER BY window_start ASC;\n";


//
// Execute downsampling query into pointValues
//
try (Stream<PointValues> stream = client.queryPoints(sql)) {
stream.forEach(
(PointValues row) -> {
var timestamp = row.getField("window_start", LocalDateTime.class);

if (timestamp == null) {
return;
}

System.out.printf("%s: avg is %s, max is %s%n",
timestamp, row.getFloatField("avg"), row.getFloatField("max"));

//
// write back downsampled date to 'stat_downsampled' measurement
//
var downsampledPoint = row
.asPoint("stat_downsampled")
.removeField("window_start")
.setTimestamp(timestamp.toInstant(ZoneOffset.UTC));

client.writePoint(downsampledPoint);
});
}


}
}
}
31 changes: 27 additions & 4 deletions examples/src/main/java/com/influxdb/v3/IOxExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
package com.influxdb.v3;

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
import com.influxdb.v3.client.Point;
import com.influxdb.v3.client.PointValues;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.write.Point;

/**
* The example depends on the "influxdb3-java" module and this module should be built first
Expand All @@ -47,8 +49,8 @@ public static void main(final String[] args) throws Exception {
// Write by Point
//
Point point = Point.measurement("temperature")
.addTag("location", "west")
.addField("value", 55.15)
.setTag("location", "west")
.setField("value", 55.15)
.setTimestamp(Instant.now().minusSeconds(-10));
client.writePoint(point);

Expand Down Expand Up @@ -85,7 +87,28 @@ public static void main(final String[] args) throws Exception {
stream.forEach(row -> System.out.printf("| %-16s | %-18s |%n", row[1], row[2]));
}

System.out.printf("-----------------------------------------%n");
System.out.printf("-----------------------------------------%n%n");


System.out.printf("--------------------------------------------------------%n");
System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time");
System.out.printf("--------------------------------------------------------%n");

//
// Query by SQL into Points
//
try (Stream<PointValues> stream = client.queryPoints(sql)) {
stream.forEach(
(PointValues p) -> {
var time = p.getField("time", LocalDateTime.class);
var location = p.getField("location", String.class);
var value = p.getField("value", Double.class);

System.out.printf("| %-8s | %-8s | %-30s |%n", location, value, time);
});
}

System.out.printf("--------------------------------------------------------%n%n");
}
}
}
38 changes: 37 additions & 1 deletion src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.influxdb.v3.client.config.ClientConfig;
import com.influxdb.v3.client.internal.InfluxDBClientImpl;
import com.influxdb.v3.client.query.QueryOptions;
import com.influxdb.v3.client.write.Point;
import com.influxdb.v3.client.write.WriteOptions;

/**
Expand Down Expand Up @@ -139,6 +138,43 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<Object[]> query(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;PointValues&gt; rows = client.queryPoints("select * from cpu", options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @return Batches of PointValues returned by the query
*/
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;PointValues&gt; rows = client.queryPoints("select * from cpu", options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of PointValues returned by the query
*/
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
Expand Down
Loading

0 comments on commit 948bc14

Please sign in to comment.