Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add queryRows function #209

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 1.1.0 [unreleased]

### Features

1. [#209](https://github.com/InfluxCommunity/influxdb3-java/pull/209) Add function return row as a map

## 1.0.0 [2024-12-11]

### Features
Expand Down
81 changes: 81 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,87 @@ Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel")) {
* rows.forEach(row -&gt; {
* // process row
* });
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
* Map.of("host", "server-a"))) {
* rows.forEach(row -&gt; {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* // process row
* })
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=intel",
* options)) {
* rows.forEach(row -&gt; {
* // process row
* })
* };
* </pre>
*
* @param query the query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Map&lt;String, Object&gt;&gt; rows = client.queryRows("select * from cpu where host=$host",
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
* // process row
* })
* };
* </pre>
*
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,41 @@
)));
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query) {
return queryRows(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters
) {
return queryRows(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryRows(query, NO_PARAMETERS, options);

Check warning on line 209 in src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java#L209

Added line #L209 was not covered by tests
}

@Nonnull
@Override
public Stream<Map<String, Object>> queryRows(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> IntStream.range(0, vector.getRowCount())
.mapToObj(rowNumber ->
VectorSchemaRootConverter.INSTANCE
.getMapFromVectorSchemaRoot(
vector,
rowNumber
)));
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@

import java.math.BigInteger;
import java.time.LocalDateTime;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -195,4 +197,25 @@ public Object[] getArrayObjectFromVectorSchemaRoot(@Nonnull final VectorSchemaRo

return row;
}

/**
* Get a Map from VectorSchemaRoot.
*
* @param vector The data return from InfluxDB.
* @param rowNumber The row number of data
* @return A Map represents a row of data
*/
public Map<String, Object> getMapFromVectorSchemaRoot(@Nonnull final VectorSchemaRoot vector, final int rowNumber) {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
Map<String, Object> row = new LinkedHashMap<>();
for (FieldVector fieldVector : vector.getFieldVectors()) {
Object mappedValue = getMappedValue(
fieldVector.getField(),
fieldVector.getObject(rowNumber)
);
row.put(fieldVector.getName(), mappedValue);

}

return row;
}
}
119 changes: 117 additions & 2 deletions src/test/java/com/influxdb/v3/client/InfluxDBClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@

import java.math.BigInteger;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.arrow.flight.FlightRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
Expand Down Expand Up @@ -138,7 +142,7 @@ public void testQuery() throws Exception {
String uuid = UUID.randomUUID().toString();
long timestamp = Instant.now().getEpochSecond();
String record = String.format(
"host10,tag=empty "
"host12,tag=empty "
+ "name=\"intel\","
+ "mem_total=2048,"
+ "disk_free=100i,"
Expand All @@ -151,7 +155,7 @@ public void testQuery() throws Exception {
client.writeRecord(record, new WriteOptions(null, WritePrecision.S, null));

Map<String, Object> parameters = Map.of("testId", uuid);
String sql = "Select * from host10 where \"testId\"=$testId";
String sql = "Select * from host12 where \"testId\"=$testId";
try (Stream<Object[]> stream = client.query(sql, parameters)) {
stream.findFirst()
.ifPresent(objects -> {
Expand All @@ -167,10 +171,121 @@ public void testQuery() throws Exception {
Assertions.assertThat(objects[3].getClass()).isEqualTo(String.class);
Assertions.assertThat(objects[3]).isEqualTo("intel");

Assertions.assertThat(objects[4].getClass()).isEqualTo(String.class);
Assertions.assertThat(objects[4]).isEqualTo("empty");

Assertions.assertThat(objects[7].getClass()).isEqualTo(BigInteger.class);
Assertions.assertThat(objects[7]).isEqualTo(BigInteger.valueOf(timestamp * 1_000_000_000));
});
}
}
}

@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
public void testQueryRows() throws Exception {
NguyenHoangSon96 marked this conversation as resolved.
Show resolved Hide resolved
try (InfluxDBClient client = InfluxDBClient.getInstance(
System.getenv("TESTING_INFLUXDB_URL"),
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
System.getenv("TESTING_INFLUXDB_DATABASE"),
null)) {
String uuid = UUID.randomUUID().toString();
String measurement = "host21";
List<Map<String, Object>> testDatas = new ArrayList<>();
for (int i = 0; i <= 9; i++) {
long timestamp = System.currentTimeMillis();
Map<String, Object> map = Map.of(
"measurement", measurement,
"tag", "tagValue",
"name", "intel",
"mem_total", 2048.0,
"disk_free", 100L,
"temperature", 100.86,
"isActive", true,
"time", timestamp,
"testId", uuid
);
String record = String.format(
"%s,tag=tagValue "
+ "name=\"%s\","
+ "mem_total=%f,"
+ "disk_free=%di,"
+ "temperature=%f,"
+ "isActive=%b,"
+ "testId=\"%s\" %d",
measurement,
map.get("name"),
(Double) map.get("mem_total"),
(Long) map.get("disk_free"),
(Double) map.get("temperature"),
map.get("isActive"),
uuid,
timestamp
);
client.writeRecord(record, new WriteOptions(null, WritePrecision.MS, null));
testDatas.add(map);
}

Map<String, Object> parameters = Map.of("testId", uuid);
// Result set much be ordered by time
String sql = String.format("Select * from %s where \"testId\"=$testId order by time", measurement);
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
List<Map<String, Object>> results = stream.collect(Collectors.toList());
for (int i = 0; i <= 9; i++) {
Map<String, Object> row = results.get(i);
Map<String, Object> testData = testDatas.get(i);
Assertions.assertThat(row.get("tag").getClass()).isEqualTo(String.class);
Assertions.assertThat(row.get("tag")).isEqualTo(testData.get("tag"));

Assertions.assertThat(row.get("name").getClass()).isEqualTo(String.class);
Assertions.assertThat(row.get("name")).isEqualTo(testData.get("name"));

Assertions.assertThat(row.get("mem_total").getClass()).isEqualTo(Double.class);
Assertions.assertThat(row.get("mem_total")).isEqualTo(testData.get("mem_total"));

Assertions.assertThat(row.get("disk_free").getClass()).isEqualTo(Long.class);
Assertions.assertThat(row.get("disk_free")).isEqualTo(testData.get("disk_free"));

Assertions.assertThat(row.get("isActive").getClass()).isEqualTo(Boolean.class);
Assertions.assertThat(row.get("isActive")).isEqualTo(testData.get("isActive"));

Assertions.assertThat(row.get("time").getClass()).isEqualTo(BigInteger.class);
Assertions.assertThat(row.get("time"))
.isEqualTo(BigInteger.valueOf((Long) testData.get("time") * 1_000_000));
}
}
}
}


@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_URL", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_TOKEN", matches = ".*")
@EnabledIfEnvironmentVariable(named = "TESTING_INFLUXDB_DATABASE", matches = ".*")
@Test
public void testQueryRowsExceptionCases() throws Exception {
try (InfluxDBClient client = InfluxDBClient.getInstance(
System.getenv("TESTING_INFLUXDB_URL"),
System.getenv("TESTING_INFLUXDB_TOKEN").toCharArray(),
System.getenv("TESTING_INFLUXDB_DATABASE"),
null)) {

// Empty result case
Map<String, Object> parameters = Map.of("testId", "NotExist");
String sql = "Select * from host21 where \"testId\"=$testId";
try (Stream<Map<String, Object>> stream = client.queryRows(sql, parameters)) {
Assertions.assertThat((int) stream.count()).isEqualTo(0);
}

// Malformed query case
Assertions.assertThatThrownBy(() -> {
String query = "Select * from host21 whereabs testId=2";
try (Stream<Map<String, Object>> stream = client.queryRows(query)) {
stream.findFirst();
}
})
.isInstanceOf(FlightRuntimeException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void testConverterWithMetaType() {
Assertions.assertThat(diskFree.getClass()).isEqualTo(Long.class);

Double temperature = (Double) pointValues.getField("temperature");
Assertions.assertThat(temperature).isEqualTo(100.8766f);
Assertions.assertThat(temperature).isEqualTo(100.8766);
Assertions.assertThat(temperature.getClass()).isEqualTo(Double.class);

String name = (String) pointValues.getField("name");
Expand All @@ -227,6 +227,21 @@ public void testConverterWithMetaType() {
}
}

@Test
void testGetMapFromVectorSchemaRoot() {
try (VectorSchemaRoot root = VectorSchemaRootUtils.generateVectorSchemaRoot()) {
Map<String, Object> map = VectorSchemaRootConverter.INSTANCE.getMapFromVectorSchemaRoot(root, 0);

Assertions.assertThat(map).hasSize(7);
Assertions.assertThat(map.get("measurement")).isEqualTo("host");
Assertions.assertThat(map.get("mem_total")).isEqualTo(2048L);
Assertions.assertThat(map.get("temperature")).isEqualTo(100.8766);
Assertions.assertThat(map.get("isActive")).isEqualTo(true);
Assertions.assertThat(map.get("name")).isEqualTo("intel");
Assertions.assertThat(map.get("time")).isEqualTo(BigInteger.valueOf(123_456L * 1_000_000));
}
}

@Test
void timestampWithoutMetadataAndFieldWithoutMetadata() {
FieldType timeType = new FieldType(true, new ArrowType.Time(TimeUnit.MILLISECOND, 32), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static VectorSchemaRoot generateVectorSchemaRoot() {

Float8Vector floatVector = (Float8Vector) root.getVector("temperature");
floatVector.allocateNew();
floatVector.set(0, 100.8766f);
floatVector.set(0, 100.8766);

VarCharVector stringVector = (VarCharVector) root.getVector("name");
stringVector.allocateNew();
Expand Down
Loading