Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add queryRows function #209

Open
wants to merge 11 commits into
base: main
from
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.1.0 [unreleased]

### Features

1. [#209](https://github.com/InfluxCommunity/influxdb3-java/pull/209) Add function return row as a map

## 1.0.0 [2024-12-11]

### Features
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,88 @@ Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the query string to execute, cannot be null
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* // process row
* }
* });
* </pre>
*
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* Map.of("host", "server-a"), options)) {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.query("select * from cpu where host=$host",;
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,41 @@
)));
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query) {
return queryRows(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters
) {
return queryRows(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryRows(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
.mapToObj(rowNumber ->
VectorSchemaRootConverter.INSTANCE
.getMapFromVectorSchemaRoot(
vector,
rowNumber
)));
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import java.math.BigInteger;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -195,4 +197,25 @@ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRo

return row;
}

/**
* Get a Map from VectorSchemaRoot.
*
* @param vector The data return from InfluxDB.
* @param rowNumber The row number of data
* @return A Map represents a row of data
*/
public Map<String, Object> getMapFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Object> row = new LinkedHashMap<>();
for (FieldVector fieldVector : vector.getFieldVectors()) {
Object mappedValue = getMappedValue(
fieldVector.getField(),
fieldVector.getObject(rowNumber)
);
row.put(fieldVector.getName(), mappedValue);

}

return row;
}
}
60 changes: 58 additions & 2 deletions src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public void testQuery() throws Exception {
String uuid = UUID.randomUUID().toString();
long timestamp = Instant.now().getEpochSecond();
String record = String.format(
"host10,tag=empty "
"host12,tag=empty "
+ "name=\"intel\","
+ "mem_total=2048,"
+ "disk_free=100i,"
Expand All @@ -151,7 +151,7 @@ public void testQuery() throws Exception {
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));

Map<String, Object> parameters = Map.of("testId", uuid);
String sql = "Select * from host10 where \"testId\"=$testId";
String sql = "Select * from host12 where \"testId\"=$testId";
try (Stream<Object[]> stream = client.query(sql, parameters)) {
stream.findFirst()
.ifPresent(objects -> {
Expand All @@ -167,10 +167,66 @@ public void testQuery() throws Exception {
Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class);
Assertions.assertThat(objects[3]).isEqualTo("intel");

Assertions.assertThat(objects[4].getClass()).isEqualTo(String.class);
Assertions.assertThat(objects[4]).isEqualTo("empty");

Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class);
Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
});
}
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
public void testQueryRows() throws Exception {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
try (InfluxDBClient client = InfluxDBClient.getInstance(
System.getenv("TESTING_INFLUXDB_URL"),
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
System.getenv("TESTING_INFLUXDB_DATABASE"),
null)) {
String uuid = UUID.randomUUID().toString();
long timestamp = Instant.now().getEpochSecond();
String record = String.format(
"host12,tag=tagValue "
+ "name=\"intel\","
+ "mem_total=2048,"
+ "disk_free=100i,"
+ "temperature=100.86,"
+ "isActive=true,"
+ "testId=\"%s\" %d",
uuid,
timestamp
);
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));

Map<String, Object> parameters = Map.of("testId", uuid);
String sql = "Select * from host12 where \"testId\"=$testId";
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
stream.findFirst()
.ifPresent(map -> {
Assertions.assertThat(map.get("tag").getClass()).isEqualTo(String.class);
Assertions.assertThat(map.get("tag")).isEqualTo("tagValue");

Assertions.assertThat(map.get("name").getClass()).isEqualTo(String.class);
Assertions.assertThat(map.get("name")).isEqualTo("intel");

Assertions.assertThat(map.get("mem_total").getClass()).isEqualTo(Double.class);
Assertions.assertThat(map.get("mem_total")).isEqualTo(2048.0);

Assertions.assertThat(map.get("disk_free").getClass()).isEqualTo(Long.class);
Assertions.assertThat(map.get("disk_free")).isEqualTo(100L);

Assertions.assertThat(map.get("isActive").getClass()).isEqualTo(Boolean.class);
Assertions.assertThat(map.get("isActive")).isEqualTo(true);

Assertions.assertThat(map.get("time").getClass()).isEqualTo(BigInteger.class);
Assertions.assertThat(map.get("time"))
.isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
});
}
}
}
}