Skip to content

Commit

Permalink
fix: line protocol bug (openGemini#41)
Browse files Browse the repository at this point in the history
Signed-off-by: weiping-code <[email protected]>
  • Loading branch information
weiping-code authored May 28, 2024
1 parent 5f984be commit da1b121
Show file tree
Hide file tree
Showing 4 changed files with 243 additions and 37 deletions.
126 changes: 102 additions & 24 deletions opengemini-client-api/src/main/java/io/opengemini/client/api/Point.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,53 +2,131 @@

import lombok.Getter;
import lombok.Setter;
import lombok.SneakyThrows;

import java.util.HashMap;
import java.math.BigDecimal;
import java.util.Map;

@Getter
@Setter
public class Point {

private static final char[] MEASUREMENT_ESCAPE_CHARACTERS = new char[]{',', ' '};
private static final char[] TAG_KEY_ESCAPE_CHARACTERS = new char[]{',', '=', ' '};
private static final char[] TAG_VALUE_ESCAPE_CHARACTERS = new char[]{',', '=', ' '};
private static final char[] FIELD_KEY_ESCAPE_CHARACTERS = new char[]{',', '=', ' '};
private static final char[] FIELD_VALUE_ESCAPE_CHARACTERS = new char[]{'"', '\\'};

private static final ThreadLocal<StringBuilder> SB_CACHE = ThreadLocal.withInitial(() -> new StringBuilder(1024));

String measurement;
Precision precision = Precision.PRECISIONNANOSECOND;
long time;
HashMap<String, String> tags;
HashMap<String, Object> fields;
Map<String, String> tags;
Map<String, Object> fields;

@SneakyThrows
@Override
public String toString() {
StringBuilder sb = new StringBuilder().append(measurement);
tags.forEach((key, value) -> sb.append(",").append(key).append("=").append(value));
public String lineProtocol() {
StringBuilder sb = SB_CACHE.get();
sb.setLength(0);
appendMeasurement(sb);
appendTags(sb);
int validFields = appendFields(sb);
if (validFields <= 0) {
return "";
}
appendTimestamp(sb);
return sb.toString();
}

private void appendMeasurement(StringBuilder sb) {
appendWithEscape(sb, measurement, MEASUREMENT_ESCAPE_CHARACTERS);
}

private void appendTags(StringBuilder sb) {
if (tags != null && !tags.isEmpty()) {
for (Map.Entry<String, String> tag : tags.entrySet()) {
sb.append(',');
appendWithEscape(sb, tag.getKey(), TAG_KEY_ESCAPE_CHARACTERS);
sb.append('=');
appendWithEscape(sb, tag.getValue(), TAG_VALUE_ESCAPE_CHARACTERS);
}
}
sb.append(' ');
}

private int appendFields(StringBuilder sb) {
int validFields = 0;
if (fields == null || fields.isEmpty()) {
return validFields;
}

sb.append(" ");
boolean firstField = true;
for (Map.Entry<String, Object> entry : fields.entrySet()) {
Object fieldValue = entry.getValue();
if (fieldValue == null || isNotFinite(fieldValue)) {
continue;
}

if (firstField) {
firstField = false;
} else {
sb.append(",");
sb.append(',');
}

sb.append(entry.getKey()).append("=");
if (entry.getValue() instanceof String) {
sb.append("\"").append(entry.getValue()).append("\"");
} else if ((entry.getValue() instanceof Integer) || (entry.getValue() instanceof Long)) {
sb.append(entry.getValue()).append("i");
String fieldKey = entry.getKey();
appendWithEscape(sb, fieldKey, FIELD_KEY_ESCAPE_CHARACTERS);
sb.append('=');
if (fieldValue instanceof Number) {
if (fieldValue instanceof Double || fieldValue instanceof Float || fieldValue instanceof BigDecimal) {
sb.append(fieldValue);
} else {
sb.append(fieldValue).append('i');
}
} else if (fieldValue instanceof String) {
String stringValue = (String) fieldValue;
sb.append('"');
appendWithEscape(sb, stringValue, FIELD_VALUE_ESCAPE_CHARACTERS);
sb.append('"');
} else {
throw new Exception("ss");
sb.append(fieldValue);
}
validFields++;
}
sb.append(" ");
sb.append(converteTime());
return sb.toString();

return validFields;
}

private String converteTime() {
if (time == 0) {
return "";
private void appendTimestamp(StringBuilder sb) {
sb.append(' ');
if (time != 0 && precision != null) {
sb.append(precision.getTimeUnit().toNanos(time));
}
}

@Override
public String toString() {
return lineProtocol();
}

private static void appendWithEscape(StringBuilder sb, String origin, char[] escapeChars) {
for (char c : origin.toCharArray()) {
if (shouldEscape(c, escapeChars)) {
sb.append("\\");
}
sb.append(c);
}
return String.valueOf(time);
}

private static boolean shouldEscape(char c, char[] escapeChars) {
for (char escapeChar : escapeChars) {
if (escapeChar == c) {
return true;
}
}
return false;
}

private static boolean isNotFinite(final Object value) {
return (value instanceof Double && !Double.isFinite((Double) value))
|| (value instanceof Float && !Float.isFinite((Float) value));
}
}
Original file line number Diff line number Diff line change
@@ -1,25 +1,29 @@
package io.opengemini.client.api;

import lombok.Getter;

import java.util.concurrent.TimeUnit;

@Getter
public enum Precision {
PRECISIONNANOSECOND("PrecisionNanoSecond"),
PRECISIONNANOSECOND("PrecisionNanoSecond", TimeUnit.NANOSECONDS),

PRECISIONMICROSECOND("PrecisionMicrosecond"),
PRECISIONMICROSECOND("PrecisionMicrosecond", TimeUnit.MICROSECONDS),

PRECISIONMILLISECOND("PrecisionMillisecond"),
PRECISIONMILLISECOND("PrecisionMillisecond", TimeUnit.MILLISECONDS),

PRECISIONSECOND("PrecisionSecond"),
PRECISIONSECOND("PrecisionSecond", TimeUnit.SECONDS),

PRECISIONMINUTE("PrecisionMinute"),
PRECISIONMINUTE("PrecisionMinute", TimeUnit.MINUTES),

PRECISIONHOUR("PrecisionHour");
PRECISIONHOUR("PrecisionHour", TimeUnit.HOURS);

private final String description;

Precision(String description) {
this.description = description;
}
private final TimeUnit timeUnit;

public String getDescription() {
return description;
Precision(String description, TimeUnit timeUnit) {
this.description = description;
this.timeUnit = timeUnit;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package io.opengemini.client.api;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.math.BigDecimal;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;


class PointTest {

@Test
void lineProtocol_without_escaped_chars() {
Assertions.assertEquals("test,T0=0 a=1i 1", testPoint("test", "T0", "0", "a", 1).lineProtocol());
}

@Test
void lineProtocol_measurement_with_escaped_chars() {
Assertions.assertEquals("test\\,,T0=0 a=1i 1", testPoint("test,", "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test\\ ,T0=0 a=1i 1", testPoint("test ", "T0", "0", "a", 1).lineProtocol());
}

@Test
void lineProtocol_tag_key_with_escaped_chars() {
Assertions.assertEquals("test,T0\\,=0 a=1i 1", testPoint("test", "T0,", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0\\==0 a=1i 1", testPoint("test", "T0=", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0\\ =0 a=1i 1", testPoint("test", "T0 ", "0", "a", 1).lineProtocol());
}

@Test
void lineProtocol_tag_value_with_escaped_chars() {
Assertions.assertEquals("test,T0=0\\, a=1i 1", testPoint("test", "T0", "0,", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0\\= a=1i 1", testPoint("test", "T0", "0=", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0\\ a=1i 1", testPoint("test", "T0", "0 ", "a", 1).lineProtocol());
}

@Test
void lineProtocol_field_key_with_escaped_chars() {
Assertions.assertEquals("test,T0=0 a\\,=1i 1", testPoint("test", "T0", "0", "a,", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a\\==1i 1", testPoint("test", "T0", "0", "a=", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a\\ =1i 1", testPoint("test", "T0", "0", "a ", 1).lineProtocol());
}

@Test
void lineProtocol_field_value_with_escaped_chars() {
Assertions.assertEquals("test,T0=0 a=\"1\\\"\" 1", testPoint("test", "T0", "0", "a", "1\"").lineProtocol());
Assertions.assertEquals("test,T0=0 a=\"1\\\\\" 1", testPoint("test", "T0", "0", "a", "1\\").lineProtocol());
Assertions.assertEquals("test,T0=0 a=\"1\\\\\\\\\" 1",
testPoint("test", "T0", "0", "a", "1\\\\").lineProtocol());
Assertions.assertEquals("test,T0=0 a=\"1\\\\\\\\\\\\\" 1",
testPoint("test", "T0", "0", "a", "1\\\\\\").lineProtocol());
}

@Test
void lineProtocol_field_value_with_different_types() {
Assertions.assertEquals("test,T0=0 a=1.0 1", testPoint("test", "T0", "0", "a", 1D).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1.1 1", testPoint("test", "T0", "0", "a", 1.1D).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1.0 1", testPoint("test", "T0", "0", "a", 1F).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1.1 1", testPoint("test", "T0", "0", "a", 1.1F).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1.0 1",
testPoint("test", "T0", "0", "a", BigDecimal.valueOf(1D)).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1.1 1",
testPoint("test", "T0", "0", "a", BigDecimal.valueOf(1.1D)).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1", testPoint("test", "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1", testPoint("test", "T0", "0", "a", 1L).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1", testPoint("test", "T0", "0", "a", (short) 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1", testPoint("test", "T0", "0", "a", (byte) 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=true 1", testPoint("test", "T0", "0", "a", true).lineProtocol());
}

@Test
void lineProtocol_timestamp_convert_precision() {
Assertions.assertEquals("test,T0=0 a=1i 1",
testPoint("test", Precision.PRECISIONNANOSECOND, "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1000",
testPoint("test", Precision.PRECISIONMICROSECOND, "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1000000",
testPoint("test", Precision.PRECISIONMILLISECOND, "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 1000000000",
testPoint("test", Precision.PRECISIONSECOND, "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 60000000000",
testPoint("test", Precision.PRECISIONMINUTE, "T0", "0", "a", 1).lineProtocol());
Assertions.assertEquals("test,T0=0 a=1i 3600000000000",
testPoint("test", Precision.PRECISIONHOUR, "T0", "0", "a", 1).lineProtocol());
}

@Test
void lineProtocol_with_multi_tags_and_fields() {
Point point = new Point();
point.setMeasurement("test");
point.setPrecision(Precision.PRECISIONNANOSECOND);
point.setTime(1);

Map<String, String> tags = new LinkedHashMap<>();
tags.put("T0", "0");
tags.put("T1", "1");
point.setTags(tags);

Map<String, Object> fields = new LinkedHashMap<>();
fields.put("a", 1);
fields.put("b", 2);
point.setFields(fields);

Assertions.assertEquals("test,T0=0,T1=1 a=1i,b=2i 1", point.lineProtocol());
}

private static Point testPoint(String measurement, String tagKey, String tagValue, String fieldKey,
Object fieldValue) {
return testPoint(measurement, Precision.PRECISIONNANOSECOND, tagKey, tagValue, fieldKey, fieldValue);
}

private static Point testPoint(String measurement, Precision precision, String tagKey, String tagValue,
String fieldKey, Object fieldValue) {
Point point = new Point();
point.setMeasurement(measurement);
point.setPrecision(precision);
point.setTime(1);
point.setTags(Collections.singletonMap(tagKey, tagValue));
point.setFields(Collections.singletonMap(fieldKey, fieldValue));
return point;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,15 +144,15 @@ public CompletableFuture<QueryResult> query(Query query) {

public CompletableFuture<Void> write(String database, Point point) {
String writeUrl = getWriteUrl(database);
String body = point.toString();
String body = point.lineProtocol();

return httpExecute(writeUrl, Void.class, UrlConst.POST, HttpRequest.BodyPublishers.ofString(body));
}

public CompletableFuture<Void> writeBatch(String database, List<Point> points) {
String writeUrl = getWriteUrl(database);
StringJoiner sj = new StringJoiner("\n");
points.forEach(point -> sj.add(point.toString()));
points.forEach(point -> sj.add(point.lineProtocol()));

return httpExecute(writeUrl, Void.class, UrlConst.POST, HttpRequest.BodyPublishers.ofString(sj.toString()));
}
Expand Down

0 comments on commit da1b121

Please sign in to comment.