From 948bc14f724b613aac761c25cdc607a291c898ac Mon Sep 17 00:00:00 2001 From: Sciator <39964450+Sciator@users.noreply.github.com> Date: Wed, 8 Nov 2023 12:36:55 +0100 Subject: [PATCH] feat: query structured (#41) --- CHANGELOG.md | 4 + README.md | 31 +- examples/pom.xml | 15 + .../com/influxdb/v3/DownsamplingExample.java | 115 +++ .../main/java/com/influxdb/v3/IOxExample.java | 31 +- .../influxdb/v3/client/InfluxDBClient.java | 38 +- .../java/com/influxdb/v3/client/Point.java | 723 ++++++++++++++++++ .../com/influxdb/v3/client/PointValues.java | 540 +++++++++++++ .../client/internal/InfluxDBClientImpl.java | 70 +- .../com/influxdb/v3/client/write/Point.java | 451 ----------- .../com/influxdb/v3/client/ITQueryWrite.java | 7 + .../v3/client/InfluxDBClientWriteTest.java | 13 +- .../influxdb/v3/client/write/PointTest.java | 186 ++--- 13 files changed, 1665 insertions(+), 559 deletions(-) create mode 100644 examples/src/main/java/com/influxdb/v3/DownsamplingExample.java create mode 100644 src/main/java/com/influxdb/v3/client/Point.java create mode 100644 src/main/java/com/influxdb/v3/client/PointValues.java delete mode 100644 src/main/java/com/influxdb/v3/client/write/Point.java diff --git a/CHANGELOG.md b/CHANGELOG.md index c1ffab3..cb1602f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## 0.4.0 [unreleased] +### Features + +1. [#41](https://github.com/InfluxCommunity/influxdb3-java/pull/41): Add structured query support + ## 0.3.1 [2023-10-17] ### Bug Fixes diff --git a/README.md b/README.md index ef26790..7115077 100644 --- a/README.md +++ b/README.md @@ -69,7 +69,7 @@ import java.util.stream.Stream; import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.query.QueryOptions; -import com.influxdb.v3.client.write.Point; +import com.influxdb.v3.client.Point; public class IOxExample { public static void main(String[] args) throws Exception { @@ -91,8 +91,8 @@ to insert data, you can use code like this: // Write by Point // Point point = Point.measurement("temperature") - .addTag("location", "west") - .addField("value", 55.15) + .setTag("location", "west") + .setField("value", 55.15) .setTimestamp(Instant.now().minusSeconds(-10)); client.writePoint(point); @@ -135,6 +135,31 @@ try (Stream stream = client.query(influxQL, QueryOptions.INFLUX_QL)) { System.out.printf("-----------------------------------------%n"); ``` +or use `PointValues` structure with `client.queryPoints`: + +```java +System.out.printf("--------------------------------------------------------%n"); +System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time"); +System.out.printf("--------------------------------------------------------%n"); + +// +// Query by SQL into Points +// +String sql1 = "select time,location,value from temperature order by time desc limit 10"; +try (Stream stream = client.queryPoints(sql1, QueryOptions.DEFAULTS)) { + stream.forEach( + (PointValues p) -> { + var time = p.getField("time", LocalDateTime.class); + var location = p.getField("location", String.class); + var value = p.getField("value", Double.class); + + System.out.printf("| %-8s | %-8s | %-30s |%n", location, value, time); + }); +} + +System.out.printf("--------------------------------------------------------%n%n"); +``` + ## Feedback If you need help, please use our [Community Slack](https://app.slack.com/huddle/TH8RGQX5Z/C02UDUPLQKA) diff --git a/examples/pom.xml b/examples/pom.xml index 1f037ce..b5f03c5 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -44,6 +44,21 @@ + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + + java + + + + + com.influxdb.v3.IOxExample + + org.apache.maven.plugins maven-compiler-plugin diff --git a/examples/src/main/java/com/influxdb/v3/DownsamplingExample.java b/examples/src/main/java/com/influxdb/v3/DownsamplingExample.java new file mode 100644 index 0000000..60bb00c --- /dev/null +++ b/examples/src/main/java/com/influxdb/v3/DownsamplingExample.java @@ -0,0 +1,115 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3; + +import java.time.Instant; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.stream.Stream; + +import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.PointValues; + +/** + * The example depends on the "influxdb3-java" module and this module should be built first + * by running "mvn install" in the root directory. + */ +public final class DownsamplingExample { + private DownsamplingExample() { + } + + public static void main(final String[] args) throws Exception { + String host = "https://us-east-1-1.aws.cloud2.influxdata.com"; + String token = "my-token"; + String database = "my-database"; + + try (InfluxDBClient client = InfluxDBClient.getInstance(host, token.toCharArray(), database)) { + // + // Write data + // + Point point1 = Point.measurement("stat") + .setTag("unit", "temperature") + .setField("avg", 24.5) + .setField("max", 45.0) + .setTimestamp(Instant.now().minus(20, ChronoUnit.MINUTES)); + client.writePoint(point1); + + Point point2 = Point.measurement("stat") + .setTag("unit", "temperature") + .setField("avg", 28.0) + .setField("max", 40.3) + .setTimestamp(Instant.now().minus(10, ChronoUnit.MINUTES)); + client.writePoint(point2); + + Point point3 = Point.measurement("stat") + .setTag("unit", "temperature") + .setField("avg", 20.5) + .setField("max", 49.0) + .setTimestamp(Instant.now()); + client.writePoint(point3); + + // + // Query downsampled data + // + String sql = "SELECT\n" + + " date_bin('5 minutes', \"time\") as window_start,\n" + + " AVG(\"avg\") as avg,\n" + + " MAX(\"max\") as max\n" + + " FROM \"stat\"\n" + + " WHERE\n" + + " \"time\" >= now() - interval '1 hour'\n" + + " GROUP BY window_start\n" + + " ORDER BY window_start ASC;\n"; + + + // + // Execute downsampling query into pointValues + // + try (Stream stream = client.queryPoints(sql)) { + stream.forEach( + (PointValues row) -> { + var timestamp = row.getField("window_start", LocalDateTime.class); + + if (timestamp == null) { + return; + } + + System.out.printf("%s: avg is %s, max is %s%n", + timestamp, row.getFloatField("avg"), row.getFloatField("max")); + + // + // write back downsampled date to 'stat_downsampled' measurement + // + var downsampledPoint = row + .asPoint("stat_downsampled") + .removeField("window_start") + .setTimestamp(timestamp.toInstant(ZoneOffset.UTC)); + + client.writePoint(downsampledPoint); + }); + } + + + } + } +} diff --git a/examples/src/main/java/com/influxdb/v3/IOxExample.java b/examples/src/main/java/com/influxdb/v3/IOxExample.java index b27e6a4..147baeb 100644 --- a/examples/src/main/java/com/influxdb/v3/IOxExample.java +++ b/examples/src/main/java/com/influxdb/v3/IOxExample.java @@ -22,11 +22,13 @@ package com.influxdb.v3; import java.time.Instant; +import java.time.LocalDateTime; import java.util.stream.Stream; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.query.QueryOptions; -import com.influxdb.v3.client.write.Point; /** * The example depends on the "influxdb3-java" module and this module should be built first @@ -47,8 +49,8 @@ public static void main(final String[] args) throws Exception { // Write by Point // Point point = Point.measurement("temperature") - .addTag("location", "west") - .addField("value", 55.15) + .setTag("location", "west") + .setField("value", 55.15) .setTimestamp(Instant.now().minusSeconds(-10)); client.writePoint(point); @@ -85,7 +87,28 @@ public static void main(final String[] args) throws Exception { stream.forEach(row -> System.out.printf("| %-16s | %-18s |%n", row[1], row[2])); } - System.out.printf("-----------------------------------------%n"); + System.out.printf("-----------------------------------------%n%n"); + + + System.out.printf("--------------------------------------------------------%n"); + System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time"); + System.out.printf("--------------------------------------------------------%n"); + + // + // Query by SQL into Points + // + try (Stream stream = client.queryPoints(sql)) { + stream.forEach( + (PointValues p) -> { + var time = p.getField("time", LocalDateTime.class); + var location = p.getField("location", String.class); + var value = p.getField("value", Double.class); + + System.out.printf("| %-8s | %-8s | %-30s |%n", location, value, time); + }); + } + + System.out.printf("--------------------------------------------------------%n%n"); } } } diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index 1976bbd..86201b7 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -32,7 +32,6 @@ import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.internal.InfluxDBClientImpl; import com.influxdb.v3.client.query.QueryOptions; -import com.influxdb.v3.client.write.Point; import com.influxdb.v3.client.write.WriteOptions; /** @@ -139,6 +138,43 @@ public interface InfluxDBClient extends AutoCloseable { @Nonnull Stream query(@Nonnull final String query, @Nonnull final QueryOptions options); + /** + * Query data from InfluxDB IOx into Point structure using FlightSQL. + *

+ * The result stream should be closed after use, you can use try-resource pattern to close it automatically: + *

+     * try (Stream<PointValues> rows = client.queryPoints("select * from cpu", options)) {
+     *      rows.forEach(row -> {
+     *          // process row
+     *      }
+     * });
+     * 
+ * + * @param query the SQL query string to execute, cannot be null + * @return Batches of PointValues returned by the query + */ + @Nonnull + Stream queryPoints(@Nonnull final String query); + + /** + * Query data from InfluxDB IOx into Point structure using FlightSQL. + *

+ * The result stream should be closed after use, you can use try-resource pattern to close it automatically: + *

+     * try (Stream<PointValues> rows = client.queryPoints("select * from cpu", options)) {
+     *      rows.forEach(row -> {
+     *          // process row
+     *      }
+     * });
+     * 
+ * + * @param query the SQL query string to execute, cannot be null + * @param options the options for querying data from InfluxDB + * @return Batches of PointValues returned by the query + */ + @Nonnull + Stream queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options); + /** * Query data from InfluxDB IOx using FlightSQL. *

diff --git a/src/main/java/com/influxdb/v3/client/Point.java b/src/main/java/com/influxdb/v3/client/Point.java new file mode 100644 index 0000000..24f1381 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/Point.java @@ -0,0 +1,723 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.text.NumberFormat; +import java.time.Instant; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import org.checkerframework.checker.nullness.qual.NonNull; + +import com.influxdb.v3.client.internal.Arguments; +import com.influxdb.v3.client.write.WriteOptions; +import com.influxdb.v3.client.write.WritePrecision; + + +/** + * Point defines the values that will be written to the database. + * See Go Implementation. + * + * @author Jakub Bednar (bednar@github) (11/10/2018 11:40) + */ +@NotThreadSafe +public final class Point { + + private static final int MAX_FRACTION_DIGITS = 340; + private static final ThreadLocal NUMBER_FORMATTER = + ThreadLocal.withInitial(() -> { + NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH); + numberFormat.setMaximumFractionDigits(MAX_FRACTION_DIGITS); + numberFormat.setGroupingUsed(false); + numberFormat.setMinimumFractionDigits(1); + return numberFormat; + }); + + private final PointValues values; + + private Point(final PointValues values) { + this.values = values; + } + + /** + * Create a new Point. + * + * @param measurementName the measurement name + */ + public Point(@Nonnull final String measurementName) { + Arguments.checkNotNull(measurementName, "measurement"); + + values = new PointValues(); + values.setMeasurement(measurementName); + } + + /** + * Create a new Point withe specified a measurement name. + * + * @param measurementName the measurement name + * @return new instance of {@link Point} + */ + @Nonnull + public static Point measurement(@Nonnull final String measurementName) { + + Arguments.checkNotNull(measurementName, "measurement"); + + return new Point(new PointValues()).setMeasurement(measurementName); + } + + /** + * Create a new Point with given values. + * + * @param values the point values + * @return the new Point + * @throws Exception if measurement is missing + */ + public static Point fromValues(final PointValues values) throws Exception { + if (values.getMeasurement() == null) { + throw new Exception("Missing measurement!"); + } + return new Point(values); + } + + /** + * Get measurement name. + * + * @return Measurement name + */ + @Nonnull + public String getMeasurement() { + assert values.getMeasurement() != null; + + return values.getMeasurement(); + } + + /** + * Updates the measurement for the point. + * + * @param measurement the measurement + * @return this + */ + @Nonnull + public Point setMeasurement(@Nonnull final String measurement) { + + Arguments.checkNotNull(measurement, "precision"); + + values.setMeasurement(measurement); + + return this; + } + + /** + * Get timestamp. Can be null if not set. + * + * @return timestamp or null + */ + @Nullable + public Number getTimestamp() { + return values.getTimestamp(); + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @return this + */ + @Nonnull + public Point setTimestamp(@Nullable final Instant time) { + values.setTimestamp(time); + + return this; + } + + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @param precision the timestamp precision + * @return this + */ + @Nonnull + public Point setTimestamp(@Nullable final Number time, @Nonnull final WritePrecision precision) { + + Arguments.checkNotNull(precision, "precision"); + + values.setTimestamp(time, precision); + + return this; + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @param precision the timestamp precision + * @return this + */ + @Nonnull + public Point setTimestamp(@Nullable final Long time, @Nonnull final WritePrecision precision) { + + return setTimestamp((Number) time, precision); + } + + /** + * Gets value of tag with given name. Returns null if tag not found. + * + * @param name the tag name + * @return tag value or null + */ + @Nullable + public String getTag(@Nonnull final String name) { + Arguments.checkNotNull(name, "tagName"); + + return values.getTag(name); + } + + /** + * Adds or replaces a tag value for this point. + * + * @param key the tag name + * @param value the tag value + * @return this + */ + @Nonnull + public Point setTag(@Nonnull final String key, @Nullable final String value) { + + Arguments.checkNotNull(key, "tagName"); + + values.setTag(key, value); + + return this; + } + + /** + * Adds or replaces tags for this point. + * + * @param tagsToAdd the Map of tags to add + * @return this + */ + @Nonnull + public Point setTags(@Nonnull final Map tagsToAdd) { + + Arguments.checkNotNull(tagsToAdd, "tagsToAdd"); + + values.setTags(tagsToAdd); + + return this; + } + + /** + * Removes a tag with the specified name if it exists; otherwise, it does nothing. + * + * @param name the tag name + * @return this + */ + @Nonnull + public Point removeTag(@Nonnull final String name) { + + Arguments.checkNotNull(name, "tagName"); + + values.removeTag(name); + + return this; + } + + /** + * Gets an array of tag names. + * + * @return An array of tag names + */ + @Nonnull + public String[] getTagNames() { + return values.getTagNames(); + } + + /** + * Gets the float field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The float field value or null + */ + @Nullable + public Double getFloatField(@Nonnull final String name) throws ClassCastException { + return getField(name, Double.class); + } + + /** + * Adds or replaces a float field. + * + * @param name the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point setFloatField(@Nonnull final String name, final double value) { + return putField(name, value); + } + + /** + * Gets the integer field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The integer field value or null + */ + @Nullable + public Long getIntegerField(@Nonnull final String name) throws ClassCastException { + return getField(name, Long.class); + } + + /** + * Adds or replaces a integer field. + * + * @param name the field name + * @param value the field value + * @return this + */ + public Point setIntegerField(@Nonnull final String name, final long value) { + return putField(name, value); + } + + /** + * Gets the string field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The string field value or null + */ + @Nullable + public String getStringField(@Nonnull final String name) throws ClassCastException { + return getField(name, String.class); + } + + /** + * Adds or replaces a string field. + * + * @param name the field name + * @param value the field value + * @return this + */ + public Point setStringField(@Nonnull final String name, final String value) { + return putField(name, value); + } + + /** + * Gets the boolean field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The boolean field value or null + */ + @Nullable + public Boolean getBooleanField(@Nonnull final String name) throws ClassCastException { + return getField(name, Boolean.class); + } + + /** + * Adds or replaces a boolean field. + * + * @param name the field name + * @param value the field value + * @return this + */ + public Point setBooleanField(@Nonnull final String name, final boolean value) { + return putField(name, value); + } + + /** + * Get field of given name. Can be null if field doesn't exist. + * + * @param name the field name + * @return Field as object + */ + @Nullable + public Object getField(@Nonnull final String name) { + return values.getField(name); + } + + /** + * Get field of given name as type. Can be null if field doesn't exist. + * + * @param name the field name + * @param type the field type Class + * @param the field type + * @return Field as given type + */ + @Nullable + public T getField(final String name, final Class type) throws ClassCastException { + Object field = getField(name); + if (field == null) { + return null; + } + return type.cast(field); + } + + /** + * Gets the type of field with given name, if it exists. + * If the field is not present, returns null. + * + * @param name the field name + * @return The field type or null. + */ + @Nullable + public Class getFieldType(@Nonnull final String name) { + Object field = getField(name); + if (field == null) { + return null; + } + return field.getClass(); + } + + /** + * Add {@link Double} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point setField(@Nonnull final String field, final double value) { + return putField(field, value); + } + + /** + * Add {@link Long} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + public Point setField(@Nonnull final String field, final long value) { + return putField(field, value); + } + + /** + * Add {@link Number} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point setField(@Nonnull final String field, @Nullable final Number value) { + return putField(field, value); + } + + /** + * Add {@link String} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point setField(@Nonnull final String field, @Nullable final String value) { + return putField(field, value); + } + + /** + * Add {@link Boolean} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point setField(@Nonnull final String field, final boolean value) { + return putField(field, value); + } + + /** + * Add {@link Object} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public Point setField(@Nonnull final String field, @Nullable final Object value) { + return putField(field, value); + } + + /** + * Adds or replaces fields for this point. + * + * @param fieldsToAdd the Map of fields to add + * @return this + */ + @Nonnull + public Point setFields(@Nonnull final Map fieldsToAdd) { + + Arguments.checkNotNull(fieldsToAdd, "fieldsToAdd"); + + fieldsToAdd.forEach(this::putField); + + return this; + } + + /** + * Removes a field with the specified name if it exists; otherwise, it does nothing. + * + * @param name the field name + * @return this + */ + @Nonnull + public Point removeField(@NonNull final String name) { + values.removeField(name); + + return this; + } + + /** + * Gets an array of field names. + * + * @return An array of field names + */ + @Nonnull + public String[] getFieldNames() { + return values.getFieldNames(); + } + + /** + * Has point any fields? + * + * @return true, if the point contains any fields, false otherwise. + */ + public boolean hasFields() { + return values.hasFields(); + } + + /** + * Creates a copy of this object. + * + * @return A new instance with same values. + */ + @Nonnull + public Point copy() { + return new Point(values.copy()); + } + + /** + * Transform to Line Protocol with nanosecond precision. + * + * @return Line Protocol + */ + @Nonnull + public String toLineProtocol() { + return toLineProtocol(null); + } + + /** + * Transform to Line Protocol. + * + * @param precision required precision + * @return Line Protocol + */ + @Nonnull + public String toLineProtocol(@Nullable final WritePrecision precision) { + + StringBuilder sb = new StringBuilder(); + + escapeKey(sb, getMeasurement(), false); + appendTags(sb); + boolean appendedFields = appendFields(sb); + if (!appendedFields) { + return ""; + } + appendTime(sb, precision); + + return sb.toString(); + } + + @Nonnull + private Point putField(@Nonnull final String field, @Nullable final Object value) { + + Arguments.checkNonEmpty(field, "fieldName"); + + values.setField(field, value); + return this; + } + + private void appendTags(@Nonnull final StringBuilder sb) { + + for (String name : values.getTagNames()) { + + String value = values.getTag(name); + + if (name.isEmpty() || value == null || value.isEmpty()) { + continue; + } + + sb.append(','); + escapeKey(sb, name); + sb.append('='); + escapeKey(sb, value); + } + sb.append(' '); + } + + private boolean appendFields(@Nonnull final StringBuilder sb) { + + boolean appended = false; + + for (String field : values.getFieldNames()) { + Object value = values.getField(field); + if (isNotDefined(value)) { + continue; + } + escapeKey(sb, field); + sb.append('='); + if (value instanceof Number) { + if (value instanceof Double || value instanceof Float || value instanceof BigDecimal) { + sb.append(NUMBER_FORMATTER.get().format(value)); + } else { + sb.append(value).append('i'); + } + } else if (value instanceof String) { + String stringValue = (String) value; + sb.append('"'); + escapeValue(sb, stringValue); + sb.append('"'); + } else { + sb.append(value); + } + + sb.append(','); + + appended = true; + } + + // efficiently chop off the trailing comma + int lengthMinusOne = sb.length() - 1; + if (sb.charAt(lengthMinusOne) == ',') { + sb.setLength(lengthMinusOne); + } + + return appended; + } + + private void appendTime(@Nonnull final StringBuilder sb, @Nullable final WritePrecision precision) { + + var time = getTimestamp(); + if (time == null) { + return; + } + + sb.append(" "); + + WritePrecision precisionNotNull = precision != null ? precision : WriteOptions.DEFAULT_WRITE_PRECISION; + + if (WritePrecision.NS.equals(precisionNotNull)) { + if (time instanceof BigDecimal) { + sb.append(((BigDecimal) time).toBigInteger()); + } else if (time instanceof BigInteger) { + sb.append(time); + } else { + sb.append(time.longValue()); + } + } else { + long timeLong; + if (time instanceof BigDecimal) { + timeLong = ((BigDecimal) time).longValueExact(); + } else if (time instanceof BigInteger) { + timeLong = ((BigInteger) time).longValueExact(); + } else { + timeLong = time.longValue(); + } + sb.append(toTimeUnit(precisionNotNull).convert(timeLong, toTimeUnit(WritePrecision.NS))); + } + } + + private void escapeKey(@Nonnull final StringBuilder sb, @Nonnull final String key) { + escapeKey(sb, key, true); + } + + private void escapeKey(@Nonnull final StringBuilder sb, @Nonnull final String key, final boolean escapeEqual) { + for (int i = 0; i < key.length(); i++) { + switch (key.charAt(i)) { + case '\n': + sb.append("\\n"); + continue; + case '\r': + sb.append("\\r"); + continue; + case '\t': + sb.append("\\t"); + continue; + case ' ': + case ',': + sb.append('\\'); + break; + case '=': + if (escapeEqual) { + sb.append('\\'); + } + break; + default: + } + + sb.append(key.charAt(i)); + } + } + + private void escapeValue(@Nonnull final StringBuilder sb, @Nonnull final String value) { + for (int i = 0; i < value.length(); i++) { + switch (value.charAt(i)) { + case '\\': + case '\"': + sb.append('\\'); + default: + sb.append(value.charAt(i)); + } + } + } + + private boolean isNotDefined(final Object value) { + return value == null + || (value instanceof Double && !Double.isFinite((Double) value)) + || (value instanceof Float && !Float.isFinite((Float) value)); + } + + @Nonnull + private TimeUnit toTimeUnit(@Nonnull final WritePrecision precision) { + switch (precision) { + case MS: + return TimeUnit.MILLISECONDS; + case S: + return TimeUnit.SECONDS; + case US: + return TimeUnit.MICROSECONDS; + case NS: + return TimeUnit.NANOSECONDS; + default: + throw new IllegalStateException("Unexpected value: " + precision); + } + } +} diff --git a/src/main/java/com/influxdb/v3/client/PointValues.java b/src/main/java/com/influxdb/v3/client/PointValues.java new file mode 100644 index 0000000..0259ea2 --- /dev/null +++ b/src/main/java/com/influxdb/v3/client/PointValues.java @@ -0,0 +1,540 @@ +/* + * The MIT License + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package com.influxdb.v3.client; + +import java.math.BigInteger; +import java.time.Instant; +import java.util.Map; +import java.util.TreeMap; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.NotThreadSafe; + +import org.checkerframework.checker.nullness.qual.NonNull; + +import com.influxdb.v3.client.internal.Arguments; +import com.influxdb.v3.client.internal.NanosecondConverter; +import com.influxdb.v3.client.write.WritePrecision; + + +/** + * PointValues defines the values that will be written to the database. + * See Go Implementation. + * + * @author Jakub Bednar (bednar@github) (11/10/2018 11:40) + */ +@NotThreadSafe +public final class PointValues { + + private String name; + private final Map tags = new TreeMap<>(); + private final Map fields = new TreeMap<>(); + private Number time; + + /** + * Create a new PointValues. + */ + public PointValues() { } + + /** + * Create a new PointValues withe specified a measurement name. + * + * @param measurementName the measurement name + * @return new instance of {@link PointValues} + */ + @Nonnull + public static PointValues measurement(@Nonnull final String measurementName) { + + Arguments.checkNotNull(measurementName, "measurement"); + + return new PointValues().setMeasurement(measurementName); + } + + /** + * Get measurement name. + * + * @return Measurement name + */ + @Nullable + public String getMeasurement() { + return name; + } + + /** + * Updates the measurement for the point. + * + * @param measurement the measurement + * @return this + */ + @Nonnull + public PointValues setMeasurement(@Nonnull final String measurement) { + + Arguments.checkNotNull(measurement, "precision"); + + this.name = measurement; + + return this; + } + + /** + * Get timestamp. Can be null if not set. + * + * @return timestamp or null + */ + @Nullable + public Number getTimestamp() { + return time; + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @return this + */ + @Nonnull + public PointValues setTimestamp(@Nullable final Instant time) { + + if (time == null) { + return setTimestamp(null, WritePrecision.NS); + } + + BigInteger convertedTime = NanosecondConverter.convert(time, WritePrecision.NS); + + return setTimestamp(convertedTime, WritePrecision.NS); + } + + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @param precision the timestamp precision + * @return this + */ + @Nonnull + public PointValues setTimestamp(@Nullable final Number time, @Nonnull final WritePrecision precision) { + + Arguments.checkNotNull(precision, "precision"); + + this.time = NanosecondConverter.convertToNanos(time, precision); + + return this; + } + + /** + * Updates the timestamp for the point. + * + * @param time the timestamp + * @param precision the timestamp precision + * @return this + */ + @Nonnull + public PointValues setTimestamp(@Nullable final Long time, @Nonnull final WritePrecision precision) { + + return setTimestamp((Number) time, precision); + } + + /** + * Gets value of tag with given name. Returns null if tag not found. + * + * @param name the tag name + * @return tag value or null + */ + @Nullable + public String getTag(@Nonnull final String name) { + + Arguments.checkNotNull(name, "tagName"); + + return tags.get(name); + } + + /** + * Adds or replaces a tag value for this point. + * + * @param key the tag name + * @param value the tag value + * @return this + */ + @Nonnull + public PointValues setTag(@Nonnull final String key, @Nullable final String value) { + + Arguments.checkNotNull(key, "tagName"); + + tags.put(key, value); + + return this; + } + + /** + * Adds or replaces tags for this point. + * + * @param tagsToAdd the Map of tags to add + * @return this + */ + @Nonnull + public PointValues setTags(@Nonnull final Map tagsToAdd) { + + Arguments.checkNotNull(tagsToAdd, "tagsToAdd"); + + tagsToAdd.forEach(this::setTag); + + return this; + } + + /** + * Removes a tag with the specified name if it exists; otherwise, it does nothing. + * + * @param name the tag name + * @return this + */ + @Nonnull + public PointValues removeTag(@Nonnull final String name) { + + Arguments.checkNotNull(name, "tagName"); + + tags.remove(name); + + return this; + } + + /** + * Gets an array of tag names. + * + * @return An array of tag names + */ + @Nonnull + public String[] getTagNames() { + return tags.keySet().toArray(new String[0]); + } + + /** + * Gets the float field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The float field value or null + */ + @Nullable + public Double getFloatField(@Nonnull final String name) throws ClassCastException { + return getField(name, Double.class); + } + + /** + * Adds or replaces a float field. + * + * @param name the field name + * @param value the field value + * @return this + */ + @Nonnull + public PointValues setFloatField(@Nonnull final String name, final double value) { + return putField(name, value); + } + + /** + * Gets the integer field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The integer field value or null + */ + @Nullable + public Long getIntegerField(@Nonnull final String name) throws ClassCastException { + return getField(name, Long.class); + } + + /** + * Adds or replaces a integer field. + * + * @param name the field name + * @param value the field value + * @return this + */ + public PointValues setIntegerField(@Nonnull final String name, final long value) { + return putField(name, value); + } + + /** + * Gets the string field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The string field value or null + */ + @Nullable + public String getStringField(@Nonnull final String name) throws ClassCastException { + return getField(name, String.class); + } + + /** + * Adds or replaces a string field. + * + * @param name the field name + * @param value the field value + * @return this + */ + public PointValues setStringField(@Nonnull final String name, final String value) { + return putField(name, value); + } + + /** + * Gets the boolean field value associated with the specified name. + * If the field is not present, returns null. + * + * @param name the field name + * @return The boolean field value or null + */ + @Nullable + public Boolean getBooleanField(@Nonnull final String name) throws ClassCastException { + return getField(name, Boolean.class); + } + + /** + * Adds or replaces a boolean field. + * + * @param name the field name + * @param value the field value + * @return this + */ + public PointValues setBooleanField(@Nonnull final String name, final boolean value) { + return putField(name, value); + } + + /** + * Get field of given name. Can be null if field doesn't exist. + * + * @param name the field name + * @return Field as object + */ + @Nullable + public Object getField(@Nonnull final String name) { + return fields.get(name); + } + + /** + * Get field of given name as type. Can be null if field doesn't exist. + * + * @param name the field name + * @param type the field type Class + * @param the field type + * @return Field as given type + */ + @Nullable + public T getField(final String name, final Class type) throws ClassCastException { + Object field = getField(name); + if (field == null) { + return null; + } + return type.cast(field); + } + + /** + * Gets the type of field with given name, if it exists. + * If the field is not present, returns null. + * + * @param name the field name + * @return The field type or null. + */ + @Nullable + public Class getFieldType(@Nonnull final String name) { + Object field = getField(name); + if (field == null) { + return null; + } + return field.getClass(); + } + + /** + * Add {@link Double} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public PointValues setField(@Nonnull final String field, final double value) { + return putField(field, value); + } + + /** + * Add {@link Long} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + public PointValues setField(@Nonnull final String field, final long value) { + return putField(field, value); + } + + /** + * Add {@link Number} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public PointValues setField(@Nonnull final String field, @Nullable final Number value) { + return putField(field, value); + } + + /** + * Add {@link String} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public PointValues setField(@Nonnull final String field, @Nullable final String value) { + return putField(field, value); + } + + /** + * Add {@link Boolean} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public PointValues setField(@Nonnull final String field, final boolean value) { + return putField(field, value); + } + + /** + * Add {@link Object} field. + * + * @param field the field name + * @param value the field value + * @return this + */ + @Nonnull + public PointValues setField(@Nonnull final String field, @Nullable final Object value) { + return putField(field, value); + } + + /** + * Adds or replaces fields for this point. + * + * @param fieldsToAdd the Map of fields to add + * @return this + */ + @Nonnull + public PointValues setFields(@Nonnull final Map fieldsToAdd) { + + Arguments.checkNotNull(fieldsToAdd, "fieldsToAdd"); + + fieldsToAdd.forEach(this::putField); + + return this; + } + + /** + * Removes a field with the specified name if it exists; otherwise, it does nothing. + * + * @param name the field name + * @return this + */ + @Nonnull + public PointValues removeField(@NonNull final String name) { + fields.remove(name); + + return this; + } + + /** + * Gets an array of field names. + * + * @return An array of field names + */ + @Nonnull + public String[] getFieldNames() { + return fields.keySet().toArray(new String[0]); + } + + /** + * Has point any fields? + * + * @return true, if the point contains any fields, false otherwise. + */ + public boolean hasFields() { + return !fields.isEmpty(); + } + + /** + * Creates a copy of this object. + * + * @return A new instance with same values. + */ + @Nonnull + public PointValues copy() { + PointValues copy = new PointValues(); + + copy.name = this.name; + copy.tags.putAll(this.tags); + copy.fields.putAll(this.fields); + copy.time = this.time; + + return copy; + } + + /** + * Creates new Point with this as values with given measurement. + * + * @param measurement the point measurement + * @return Point from this values with given measurement. + */ + @Nonnull + public Point asPoint(@Nonnull final String measurement) { + setMeasurement(measurement); + try { + return asPoint(); + } catch (Exception e) { + // never + throw new RuntimeException(e); + } + } + + /** + * Creates new Point with this as values. + * + * @return Point from this values with given measurement. + * @throws Exception if measurement is missing + */ + @Nonnull + public Point asPoint() throws Exception { + return Point.fromValues(this); + } + + @Nonnull + private PointValues putField(@Nonnull final String field, @Nullable final Object value) { + + Arguments.checkNonEmpty(field, "fieldName"); + + fields.put(field, value); + return this; + } + +} diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index 80199e6..94d72e5 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -24,11 +24,13 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.logging.Logger; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -40,12 +42,14 @@ import io.netty.handler.codec.http.HttpMethod; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.util.Text; import com.influxdb.v3.client.InfluxDBApiException; import com.influxdb.v3.client.InfluxDBClient; +import com.influxdb.v3.client.Point; +import com.influxdb.v3.client.PointValues; import com.influxdb.v3.client.config.ClientConfig; import com.influxdb.v3.client.query.QueryOptions; -import com.influxdb.v3.client.write.Point; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -157,6 +161,70 @@ public Stream query(@Nonnull final String query, @Nonnull final QueryO }); } + @Nonnull + @Override + public Stream queryPoints(@Nonnull final String query) { + return queryPoints(query, QueryOptions.DEFAULTS); + } + + @Nonnull + @Override + public Stream queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options) { + return queryData(query, options) + .flatMap(vector -> { + List fieldVectors = vector.getFieldVectors(); + return IntStream + .range(0, vector.getRowCount()) + .mapToObj(rowNumber -> { + PointValues p = new PointValues(); + + + ArrayList row = new ArrayList<>(); + for (int i = 0; i < fieldVectors.size(); i++) { + var schema = vector.getSchema().getFields().get(i); + var value = fieldVectors.get(i).getObject(rowNumber); + var name = schema.getName(); + var metaType = schema.getMetadata().get("iox::column::type"); + + if (value instanceof Text) { + value = value.toString(); + } + + if ((Objects.equals(name, "measurement") + || Objects.equals(name, "iox::measurement")) + && value instanceof String) { + p.setMeasurement((String) value); + continue; + } + + if (metaType == null) { + if (Objects.equals(name, "time") && value instanceof Instant) { + p.setTimestamp((Instant) value); + } else { + // just push as field If you don't know what type is it + p.setField(name, value); + } + + continue; + } + + String[] parts = metaType.split(":"); + String valueType = parts[2]; + + if ("field".equals(valueType)) { + p.setField(name, value); + } else if ("tag".equals(valueType) && value instanceof String) { + p.setTag(name, (String) value); + } else if ("timestamp".equals(valueType) && value instanceof Instant) { + p.setTimestamp((Instant) value); + } + } + + return p; + }); + }); + } + @Nonnull @Override public Stream queryBatches(@Nonnull final String query) { diff --git a/src/main/java/com/influxdb/v3/client/write/Point.java b/src/main/java/com/influxdb/v3/client/write/Point.java deleted file mode 100644 index 9375381..0000000 --- a/src/main/java/com/influxdb/v3/client/write/Point.java +++ /dev/null @@ -1,451 +0,0 @@ -/* - * The MIT License - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package com.influxdb.v3.client.write; - -import java.math.BigDecimal; -import java.math.BigInteger; -import java.text.NumberFormat; -import java.time.Instant; -import java.util.Locale; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import javax.annotation.Nullable; -import javax.annotation.concurrent.NotThreadSafe; - -import com.influxdb.v3.client.internal.Arguments; -import com.influxdb.v3.client.internal.NanosecondConverter; - - -/** - * Point defines the values that will be written to the database. - * See Go Implementation. - * - * @author Jakub Bednar (bednar@github) (11/10/2018 11:40) - */ -@NotThreadSafe -public final class Point { - - private static final int MAX_FRACTION_DIGITS = 340; - private static final ThreadLocal NUMBER_FORMATTER = - ThreadLocal.withInitial(() -> { - NumberFormat numberFormat = NumberFormat.getInstance(Locale.ENGLISH); - numberFormat.setMaximumFractionDigits(MAX_FRACTION_DIGITS); - numberFormat.setGroupingUsed(false); - numberFormat.setMinimumFractionDigits(1); - return numberFormat; - }); - - - private final String name; - private final Map tags = new TreeMap<>(); - private final Map fields = new TreeMap<>(); - private Number time; - - /** - * Create a new Point with specified a measurement name. - * - * @param measurementName the measurement name - */ - public Point(@Nonnull final String measurementName) { - - Arguments.checkNotNull(measurementName, "measurement"); - - this.name = measurementName; - } - - /** - * Create a new Point withe specified a measurement name. - * - * @param measurementName the measurement name - * @return new instance of {@link Point} - */ - @Nonnull - public static Point measurement(@Nonnull final String measurementName) { - - Arguments.checkNotNull(measurementName, "measurement"); - - return new Point(measurementName); - } - - /** - * Adds or replaces a tag value for this point. - * - * @param key the tag name - * @param value the tag value - * @return this - */ - @Nonnull - public Point addTag(@Nonnull final String key, @Nullable final String value) { - - Arguments.checkNotNull(key, "tagName"); - - tags.put(key, value); - - return this; - } - - /** - * Adds or replaces tags for this point. - * - * @param tagsToAdd the Map of tags to add - * @return this - */ - @Nonnull - public Point addTags(@Nonnull final Map tagsToAdd) { - - Arguments.checkNotNull(tagsToAdd, "tagsToAdd"); - - tagsToAdd.forEach(this::addTag); - - return this; - } - - /** - * Add {@link Boolean} field. - * - * @param field the field name - * @param value the field value - * @return this - */ - @Nonnull - public Point addField(@Nonnull final String field, final boolean value) { - return putField(field, value); - } - - /** - * Add {@link Long} field. - * - * @param field the field name - * @param value the field value - * @return this - */ - public Point addField(@Nonnull final String field, final long value) { - return putField(field, value); - } - - /** - * Add {@link Double} field. - * - * @param field the field name - * @param value the field value - * @return this - */ - @Nonnull - public Point addField(@Nonnull final String field, final double value) { - return putField(field, value); - } - - /** - * Add {@link Number} field. - * - * @param field the field name - * @param value the field value - * @return this - */ - @Nonnull - public Point addField(@Nonnull final String field, @Nullable final Number value) { - return putField(field, value); - } - - /** - * Add {@link String} field. - * - * @param field the field name - * @param value the field value - * @return this - */ - @Nonnull - public Point addField(@Nonnull final String field, @Nullable final String value) { - return putField(field, value); - } - - /** - * Adds or replaces fields for this point. - * - * @param fieldsToAdd the Map of fields to add - * @return this - */ - @Nonnull - public Point addFields(@Nonnull final Map fieldsToAdd) { - - Arguments.checkNotNull(fieldsToAdd, "fieldsToAdd"); - - fieldsToAdd.forEach(this::putField); - - return this; - } - - /** - * Updates the timestamp for the point. - * - * @param time the timestamp - * @return this - */ - @Nonnull - public Point setTimestamp(@Nullable final Instant time) { - - if (time == null) { - return setTimestamp(null, WritePrecision.NS); - } - - BigInteger convertedTime = NanosecondConverter.convert(time, WritePrecision.NS); - - return setTimestamp(convertedTime, WritePrecision.NS); - } - - /** - * Updates the timestamp for the point. - * - * @param time the timestamp - * @param precision the timestamp precision - * @return this - */ - @Nonnull - public Point setTimestamp(@Nullable final Number time, @Nonnull final WritePrecision precision) { - - Arguments.checkNotNull(precision, "precision"); - - this.time = NanosecondConverter.convertToNanos(time, precision); - - return this; - } - - /** - * Updates the timestamp for the point. - * - * @param time the timestamp - * @param precision the timestamp precision - * @return this - */ - @Nonnull - public Point setTimestamp(@Nullable final Long time, @Nonnull final WritePrecision precision) { - - return setTimestamp((Number) time, precision); - } - - /** - * Has point any fields? - * - * @return true, if the point contains any fields, false otherwise. - */ - public boolean hasFields() { - return !fields.isEmpty(); - } - - /** - * Transform to Line Protocol with nanosecond precision. - * - * @return Line Protocol - */ - @Nonnull - public String toLineProtocol() { - return toLineProtocol(null); - } - - /** - * Transform to Line Protocol. - * - * @param precision required precision - * @return Line Protocol - */ - @Nonnull - public String toLineProtocol(@Nullable final WritePrecision precision) { - - StringBuilder sb = new StringBuilder(); - - escapeKey(sb, name, false); - appendTags(sb); - boolean appendedFields = appendFields(sb); - if (!appendedFields) { - return ""; - } - appendTime(sb, precision); - - return sb.toString(); - } - - @Nonnull - private Point putField(@Nonnull final String field, @Nullable final Object value) { - - Arguments.checkNonEmpty(field, "fieldName"); - - fields.put(field, value); - return this; - } - - private void appendTags(@Nonnull final StringBuilder sb) { - - for (Map.Entry tag : this.tags.entrySet()) { - - String key = tag.getKey(); - String value = tag.getValue(); - - if (key.isEmpty() || value == null || value.isEmpty()) { - continue; - } - - sb.append(','); - escapeKey(sb, key); - sb.append('='); - escapeKey(sb, value); - } - sb.append(' '); - } - - private boolean appendFields(@Nonnull final StringBuilder sb) { - - boolean appended = false; - for (Map.Entry field : this.fields.entrySet()) { - Object value = field.getValue(); - if (isNotDefined(value)) { - continue; - } - escapeKey(sb, field.getKey()); - sb.append('='); - if (value instanceof Number) { - if (value instanceof Double || value instanceof Float || value instanceof BigDecimal) { - sb.append(NUMBER_FORMATTER.get().format(value)); - } else { - sb.append(value).append('i'); - } - } else if (value instanceof String) { - String stringValue = (String) value; - sb.append('"'); - escapeValue(sb, stringValue); - sb.append('"'); - } else { - sb.append(value); - } - - sb.append(','); - - appended = true; - } - - // efficiently chop off the trailing comma - int lengthMinusOne = sb.length() - 1; - if (sb.charAt(lengthMinusOne) == ',') { - sb.setLength(lengthMinusOne); - } - - return appended; - } - - private void appendTime(@Nonnull final StringBuilder sb, @Nullable final WritePrecision precision) { - - if (this.time == null) { - return; - } - - sb.append(" "); - - WritePrecision precisionNotNull = precision != null ? precision : WriteOptions.DEFAULT_WRITE_PRECISION; - - if (WritePrecision.NS.equals(precisionNotNull)) { - if (this.time instanceof BigDecimal) { - sb.append(((BigDecimal) this.time).toBigInteger()); - } else if (this.time instanceof BigInteger) { - sb.append(this.time); - } else { - sb.append(this.time.longValue()); - } - } else { - long time; - if (this.time instanceof BigDecimal) { - time = ((BigDecimal) this.time).longValueExact(); - } else if (this.time instanceof BigInteger) { - time = ((BigInteger) this.time).longValueExact(); - } else { - time = this.time.longValue(); - } - sb.append(toTimeUnit(precisionNotNull).convert(time, toTimeUnit(WritePrecision.NS))); - } - } - - private void escapeKey(@Nonnull final StringBuilder sb, @Nonnull final String key) { - escapeKey(sb, key, true); - } - - private void escapeKey(@Nonnull final StringBuilder sb, @Nonnull final String key, final boolean escapeEqual) { - for (int i = 0; i < key.length(); i++) { - switch (key.charAt(i)) { - case '\n': - sb.append("\\n"); - continue; - case '\r': - sb.append("\\r"); - continue; - case '\t': - sb.append("\\t"); - continue; - case ' ': - case ',': - sb.append('\\'); - break; - case '=': - if (escapeEqual) { - sb.append('\\'); - } - break; - default: - } - - sb.append(key.charAt(i)); - } - } - - private void escapeValue(@Nonnull final StringBuilder sb, @Nonnull final String value) { - for (int i = 0; i < value.length(); i++) { - switch (value.charAt(i)) { - case '\\': - case '\"': - sb.append('\\'); - default: - sb.append(value.charAt(i)); - } - } - } - - private boolean isNotDefined(final Object value) { - return value == null - || (value instanceof Double && !Double.isFinite((Double) value)) - || (value instanceof Float && !Float.isFinite((Float) value)); - } - - @Nonnull - private TimeUnit toTimeUnit(@Nonnull final WritePrecision precision) { - switch (precision) { - case MS: - return TimeUnit.MILLISECONDS; - case S: - return TimeUnit.SECONDS; - case US: - return TimeUnit.MICROSECONDS; - case NS: - return TimeUnit.NANOSECONDS; - default: - throw new IllegalStateException("Unexpected value: " + precision); - } - } -} diff --git a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java index a7098df..2e56cf7 100644 --- a/src/test/java/com/influxdb/v3/client/ITQueryWrite.java +++ b/src/test/java/com/influxdb/v3/client/ITQueryWrite.java @@ -74,6 +74,13 @@ void queryWrite() { Assertions.assertThat(rows).hasSize(1); } + try (Stream stream = client.queryPoints(sql)) { + + List rows = stream.collect(Collectors.toList()); + + Assertions.assertThat(rows).hasSize(1); + } + String influxQL = String.format("SELECT MEAN(value) FROM %s WHERE \"testId\"=%d " + "group by time(1s) fill(none) order by time desc limit 1", measurement, testId); try (Stream stream = client.query(influxQL, QueryOptions.INFLUX_QL)) { diff --git a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java index 97b8bf3..56df9f5 100644 --- a/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java +++ b/src/test/java/com/influxdb/v3/client/InfluxDBClientWriteTest.java @@ -30,7 +30,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import com.influxdb.v3.client.write.Point; import com.influxdb.v3.client.write.WriteOptions; import com.influxdb.v3.client.write.WritePrecision; @@ -228,8 +227,8 @@ void bodyPoint() throws InterruptedException { mockServer.enqueue(createResponse(200)); Point point = new Point("mem"); - point.addTag("tag", "one"); - point.addField("value", 1.0); + point.setTag("tag", "one"); + point.setField("value", 1.0); client.writePoint(point); @@ -244,12 +243,12 @@ void bodyConcat() throws InterruptedException { mockServer.enqueue(createResponse(200)); Point point1 = Point.measurement("mem") - .addTag("tag", "one") - .addField("value", 1.0); + .setTag("tag", "one") + .setField("value", 1.0); Point point2 = Point.measurement("cpu") - .addTag("tag", "two") - .addField("value", 2.0); + .setTag("tag", "two") + .setField("value", 2.0); client.writePoints(Arrays.asList(point1, point2)); diff --git a/src/test/java/com/influxdb/v3/client/write/PointTest.java b/src/test/java/com/influxdb/v3/client/write/PointTest.java index 9adf88e..8bffdc6 100644 --- a/src/test/java/com/influxdb/v3/client/write/PointTest.java +++ b/src/test/java/com/influxdb/v3/client/write/PointTest.java @@ -29,6 +29,8 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.Point; + /** * @author Jakub Bednar (bednar@github) (11/10/2018 12:57) */ @@ -38,23 +40,23 @@ class PointTest { void measurementEscape() { Point point = Point.measurement("h2 o") - .addTag("location", "europe") - .addTag("", "warn") - .addField("level", 2); + .setTag("location", "europe") + .setTag("", "warn") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2\\ o,location=europe level=2i"); point = Point.measurement("h2=o") - .addTag("location", "europe") - .addTag("", "warn") - .addField("level", 2); + .setTag("location", "europe") + .setTag("", "warn") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2=o,location=europe level=2i"); point = Point.measurement("h2,o") - .addTag("location", "europe") - .addTag("", "warn") - .addField("level", 2); + .setTag("location", "europe") + .setTag("", "warn") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2\\,o,location=europe level=2i"); } @@ -62,8 +64,8 @@ void measurementEscape() { @Test public void createByConstructor() { Point point = new Point("h2o") - .addTag("location", "europe") - .addField("level", 2); + .setTag("location", "europe") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); } @@ -80,9 +82,9 @@ public void createByConstructorMeasurementRequired() { void tagEmptyKey() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addTag("", "warn") - .addField("level", 2); + .setTag("location", "europe") + .setTag("", "warn") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); } @@ -91,9 +93,9 @@ void tagEmptyKey() { void tagEmptyValue() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addTag("log", "") - .addField("level", 2); + .setTag("location", "europe") + .setTag("log", "") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); } @@ -102,9 +104,9 @@ void tagEmptyValue() { void tagNullValue() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addTag("log", null) - .addField("level", 2); + .setTag("location", "europe") + .setTag("log", null) + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); } @@ -113,10 +115,10 @@ void tagNullValue() { public void tagEscapingKeyAndValue() { Point point = Point.measurement("h\n2\ro\t_data") - .addTag("new\nline", "new\nline") - .addTag("carriage\rreturn", "carriage\rreturn") - .addTag("t\tab", "t\tab") - .addField("level", 2); + .setTag("new\nline", "new\nline") + .setTag("carriage\rreturn", "carriage\rreturn") + .setTag("t\tab", "t\tab") + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()) .isEqualTo("h\\n2\\ro\\t_data,carriage\\rreturn=carriage\\rreturn,new\\nline=new\\nline," @@ -127,8 +129,8 @@ public void tagEscapingKeyAndValue() { public void equalSignEscaping() { Point point = Point.measurement("h=2o") - .addTag("l=ocation", "e=urope") - .addField("l=evel", 2); + .setTag("l=ocation", "e=urope") + .setField("l=evel", 2); Assertions.assertThat(point.toLineProtocol()) .isEqualTo("h=2o,l\\=ocation=e\\=urope l\\=evel=2i"); @@ -137,19 +139,19 @@ public void equalSignEscaping() { @Test void fieldTypes() { - Point point = Point.measurement("h2o").addTag("location", "europe") - .addField("long", 1L) - .addField("double", 2D) - .addField("float", 3F) - .addField("longObject", Long.valueOf("4")) - .addField("doubleObject", Double.valueOf("5")) - .addField("floatObject", Float.valueOf("6")) - .addField("bigDecimal", new BigDecimal("33.45")) - .addField("integer", 7) - .addField("integerObject", Integer.valueOf("8")) - .addField("boolean", false) - .addField("booleanObject", Boolean.TRUE) - .addField("string", "string value"); + Point point = Point.measurement("h2o").setTag("location", "europe") + .setField("long", 1L) + .setField("double", 2D) + .setField("float", 3F) + .setField("longObject", Long.valueOf("4")) + .setField("doubleObject", Double.valueOf("5")) + .setField("floatObject", Float.valueOf("6")) + .setField("bigDecimal", new BigDecimal("33.45")) + .setField("integer", 7) + .setField("integerObject", Integer.valueOf("8")) + .setField("boolean", false) + .setField("booleanObject", Boolean.TRUE) + .setField("string", "string value"); String expected = "h2o,location=europe bigDecimal=33.45,boolean=false,booleanObject=true,double=2.0," + "doubleObject=5.0,float=3.0,floatObject=6.0,integer=7i,integerObject=8i,long=1i,longObject=4i," @@ -160,8 +162,8 @@ void fieldTypes() { @Test void fieldNullValue() { - Point point = Point.measurement("h2o").addTag("location", "europe").addField("level", 2) - .addField("warning", (String) null); + Point point = Point.measurement("h2o").setTag("location", "europe").setField("level", 2) + .setField("warning", (String) null); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); } @@ -170,15 +172,15 @@ void fieldNullValue() { void fieldEscape() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", "string esc\\ape value"); + .setTag("location", "europe") + .setField("level", "string esc\\ape value"); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe " + "level=\"string esc\\\\ape value\""); point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", "string esc\"ape value"); + .setTag("location", "europe") + .setField("level", "string esc\"ape value"); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe " + "level=\"string esc\\\"ape value\""); @@ -188,8 +190,8 @@ void fieldEscape() { void time() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(123L, WritePrecision.S); Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); @@ -199,16 +201,16 @@ void time() { void timeBigInteger() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(new BigInteger("123"), WritePrecision.S); Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); // Friday, June 22, 3353 point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(new BigInteger("43658216763800123456"), WritePrecision.NS); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 43658216763800123456"); @@ -218,23 +220,23 @@ void timeBigInteger() { void timeBigDecimal() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(new BigDecimal("123"), WritePrecision.S); Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(new BigDecimal("1.23E+02"), WritePrecision.NS); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 123"); // Friday, June 22, 3353 point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(new BigDecimal("43658216763800123456"), WritePrecision.NS); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 43658216763800123456"); @@ -244,15 +246,15 @@ void timeBigDecimal() { void timeFloat() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(Float.valueOf("123"), WritePrecision.S); Assertions.assertThat(point.toLineProtocol(WritePrecision.S)).isEqualTo("h2o,location=europe level=2i 123"); point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(Float.valueOf("1.23"), WritePrecision.NS); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 1"); @@ -264,8 +266,8 @@ void timeInstantOver2262() { Instant time = Instant.parse("3353-06-22T10:26:03.800123456Z"); Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(time); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 43658216763800123456"); @@ -275,8 +277,8 @@ void timeInstantOver2262() { void timeInstantNull() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(null); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); @@ -288,8 +290,8 @@ void timeGetTime() { Instant time = Instant.parse("2022-06-12T10:26:03.800123456Z"); Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("level", 2) + .setTag("location", "europe") + .setField("level", 2) .setTimestamp(time); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i 1655029563800123456"); @@ -298,14 +300,14 @@ void timeGetTime() { @Test public void infinityValues() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("double-infinity-positive", Double.POSITIVE_INFINITY) - .addField("double-infinity-negative", Double.NEGATIVE_INFINITY) - .addField("double-nan", Double.NaN) - .addField("flout-infinity-positive", Float.POSITIVE_INFINITY) - .addField("flout-infinity-negative", Float.NEGATIVE_INFINITY) - .addField("flout-nan", Float.NaN) - .addField("level", 2); + .setTag("location", "europe") + .setField("double-infinity-positive", Double.POSITIVE_INFINITY) + .setField("double-infinity-negative", Double.NEGATIVE_INFINITY) + .setField("double-nan", Double.NaN) + .setField("flout-infinity-positive", Float.POSITIVE_INFINITY) + .setField("flout-infinity-negative", Float.NEGATIVE_INFINITY) + .setField("flout-nan", Float.NaN) + .setField("level", 2); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe level=2i"); } @@ -313,13 +315,13 @@ public void infinityValues() { @Test public void onlyInfinityValues() { Point point = Point.measurement("h2o") - .addTag("location", "europe") - .addField("double-infinity-positive", Double.POSITIVE_INFINITY) - .addField("double-infinity-negative", Double.NEGATIVE_INFINITY) - .addField("double-nan", Double.NaN) - .addField("flout-infinity-positive", Float.POSITIVE_INFINITY) - .addField("flout-infinity-negative", Float.NEGATIVE_INFINITY) - .addField("flout-nan", Float.NaN); + .setTag("location", "europe") + .setField("double-infinity-positive", Double.POSITIVE_INFINITY) + .setField("double-infinity-negative", Double.NEGATIVE_INFINITY) + .setField("double-nan", Double.NaN) + .setField("flout-infinity-positive", Float.POSITIVE_INFINITY) + .setField("flout-infinity-negative", Float.NEGATIVE_INFINITY) + .setField("flout-nan", Float.NaN); Assertions.assertThat(point.toLineProtocol()).isEqualTo(""); } @@ -328,19 +330,19 @@ public void onlyInfinityValues() { void hasFields() { Assertions.assertThat(Point.measurement("h2o").hasFields()).isFalse(); - Assertions.assertThat(Point.measurement("h2o").addTag("location", "europe").hasFields()).isFalse(); - Assertions.assertThat(Point.measurement("h2o").addField("level", 2).hasFields()).isTrue(); + Assertions.assertThat(Point.measurement("h2o").setTag("location", "europe").hasFields()).isFalse(); + Assertions.assertThat(Point.measurement("h2o").setField("level", 2).hasFields()).isTrue(); Assertions.assertThat( Point .measurement("h2o") - .addTag("location", "europe") - .addField("level", 3) + .setTag("location", "europe") + .setField("level", 3) .hasFields()) .isTrue(); } @Test - void addTags() { + void setTags() { HashMap tags = new HashMap<>(); tags.put("type", "production"); @@ -348,14 +350,14 @@ void addTags() { tags.put("expensive", ""); Point point = Point.measurement("h2o") - .addField("level", 2) - .addTags(tags); + .setField("level", 2) + .setTags(tags); Assertions.assertThat(point.toLineProtocol()).isEqualTo("h2o,location=europe,type=production level=2i"); } @Test - void addFields() { + void setFields() { HashMap fields = new HashMap<>(); fields.put("level", 2); @@ -365,8 +367,8 @@ void addFields() { Point point = Point .measurement("h2o") - .addTag("location", "europe") - .addFields(fields); + .setTag("location", "europe") + .setFields(fields); Assertions .assertThat(point.toLineProtocol())