Skip to content

Commit

Permalink
Pipe / Load / Subscription: Support new TsDataTypes (STRING / BLOB / …
Browse files Browse the repository at this point in the history
…TIMESTAMP / DATE) (#12665)
  • Loading branch information
Caideyipi authored Jun 13, 2024
1 parent a42e644 commit 0dbb389
Show file tree
Hide file tree
Showing 32 changed files with 1,511 additions and 527 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import java.io.File;
import java.io.IOException;
import java.time.LocalDate;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -52,7 +53,7 @@ public class TsFileGenerator implements AutoCloseable {
private final Map<String, List<MeasurementSchema>> device2MeasurementSchema;
private Random random;

public TsFileGenerator(File tsFile) throws IOException {
public TsFileGenerator(final File tsFile) throws IOException {
this.tsFile = tsFile;
this.writer = new TsFileWriter(tsFile);
this.device2TimeSet = new HashMap<>();
Expand All @@ -64,11 +65,12 @@ public void resetRandom() {
random = new Random();
}

public void resetRandom(long seed) {
public void resetRandom(final long seed) {
random = new Random(seed);
}

public void registerTimeseries(String path, List<MeasurementSchema> measurementSchemaList) {
public void registerTimeseries(
final String path, final List<MeasurementSchema> measurementSchemaList) {
if (device2MeasurementSchema.containsKey(path)) {
LOGGER.error("Register same device {}.", path);
return;
Expand All @@ -78,7 +80,8 @@ public void registerTimeseries(String path, List<MeasurementSchema> measurementS
device2MeasurementSchema.put(path, measurementSchemaList);
}

public void registerAlignedTimeseries(String path, List<MeasurementSchema> measurementSchemaList)
public void registerAlignedTimeseries(
final String path, final List<MeasurementSchema> measurementSchemaList)
throws WriteProcessException {
if (device2MeasurementSchema.containsKey(path)) {
LOGGER.error("Register same device {}.", path);
Expand All @@ -89,14 +92,15 @@ public void registerAlignedTimeseries(String path, List<MeasurementSchema> measu
device2MeasurementSchema.put(path, measurementSchemaList);
}

public void generateData(String device, int number, long timeGap, boolean isAligned)
public void generateData(
final String device, final int number, final long timeGap, final boolean isAligned)
throws IOException, WriteProcessException {
List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
TreeSet<Long> timeSet = device2TimeSet.get(device);
Tablet tablet = new Tablet(device, schemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
long sensorNum = schemas.size();
final List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
final TreeSet<Long> timeSet = device2TimeSet.get(device);
final Tablet tablet = new Tablet(device, schemas);
final long[] timestamps = tablet.timestamps;
final Object[] values = tablet.values;
final long sensorNum = schemas.size();
long startTime = timeSet.isEmpty() ? 0L : timeSet.last();

for (long r = 0; r < number; r++) {
Expand Down Expand Up @@ -131,18 +135,22 @@ public void generateData(String device, int number, long timeGap, boolean isAlig
}

public void generateData(
String device, int number, long timeGap, boolean isAligned, long startTimestamp)
final String device,
final int number,
final long timeGap,
final boolean isAligned,
final long startTimestamp)
throws IOException, WriteProcessException {
List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
TreeSet<Long> timeSet = device2TimeSet.get(device);
Tablet tablet = new Tablet(device, schemas);
long[] timestamps = tablet.timestamps;
Object[] values = tablet.values;
long sensorNum = schemas.size();
final List<MeasurementSchema> schemas = device2MeasurementSchema.get(device);
final TreeSet<Long> timeSet = device2TimeSet.get(device);
final Tablet tablet = new Tablet(device, schemas);
final long[] timestamps = tablet.timestamps;
final Object[] values = tablet.values;
final long sensorNum = schemas.size();
long startTime = startTimestamp;

for (long r = 0; r < number; r++) {
int row = tablet.rowSize++;
final int row = tablet.rowSize++;
startTime += timeGap;
timestamps[row] = startTime;
timeSet.add(startTime);
Expand Down Expand Up @@ -172,12 +180,16 @@ public void generateData(
LOGGER.info("Write {} points into device {}", number, device);
}

private void generateDataPoint(Object obj, int row, MeasurementSchema schema) {
private void generateDataPoint(final Object obj, final int row, final MeasurementSchema schema) {
switch (schema.getType()) {
case INT32:
generateINT32(obj, row);
break;
case DATE:
generateDATE(obj, row);
break;
case INT64:
case TIMESTAMP:
generateINT64(obj, row);
break;
case FLOAT:
Expand All @@ -190,60 +202,69 @@ private void generateDataPoint(Object obj, int row, MeasurementSchema schema) {
generateBOOLEAN(obj, row);
break;
case TEXT:
case BLOB:
case STRING:
generateTEXT(obj, row);
break;
default:
LOGGER.error("Wrong data type {}.", schema.getType());
}
}

private void generateINT32(Object obj, int row) {
int[] ints = (int[]) obj;
private void generateINT32(final Object obj, final int row) {
final int[] ints = (int[]) obj;
ints[row] = random.nextInt();
}

private void generateINT64(Object obj, int row) {
long[] longs = (long[]) obj;
private void generateDATE(final Object obj, final int row) {
final LocalDate[] dates = (LocalDate[]) obj;
dates[row] =
LocalDate.of(1000 + random.nextInt(9000), 1 + random.nextInt(12), 1 + random.nextInt(28));
}

private void generateINT64(final Object obj, final int row) {
final long[] longs = (long[]) obj;
longs[row] = random.nextLong();
}

private void generateFLOAT(Object obj, int row) {
float[] floats = (float[]) obj;
private void generateFLOAT(final Object obj, final int row) {
final float[] floats = (float[]) obj;
floats[row] = random.nextFloat();
}

private void generateDOUBLE(Object obj, int row) {
double[] doubles = (double[]) obj;
private void generateDOUBLE(final Object obj, final int row) {
final double[] doubles = (double[]) obj;
doubles[row] = random.nextDouble();
}

private void generateBOOLEAN(Object obj, int row) {
boolean[] booleans = (boolean[]) obj;
private void generateBOOLEAN(final Object obj, final int row) {
final boolean[] booleans = (boolean[]) obj;
booleans[row] = random.nextBoolean();
}

private void generateTEXT(Object obj, int row) {
Binary[] binaries = (Binary[]) obj;
private void generateTEXT(final Object obj, final int row) {
final Binary[] binaries = (Binary[]) obj;
binaries[row] =
new Binary(String.format("test point %d", random.nextInt()), TSFileConfig.STRING_CHARSET);
}

public void generateDeletion(String device, int number) throws IOException, IllegalPathException {
try (ModificationFile modificationFile =
public void generateDeletion(final String device, final int number)
throws IOException, IllegalPathException {
try (final ModificationFile modificationFile =
new ModificationFile(tsFile.getAbsolutePath() + ModificationFile.FILE_SUFFIX)) {
writer.flushAllChunkGroups();
TreeSet<Long> timeSet = device2TimeSet.get(device);
final TreeSet<Long> timeSet = device2TimeSet.get(device);
if (timeSet.isEmpty()) {
return;
}

long fileOffset = tsFile.length();
long maxTime = timeSet.last() - 1;
final long fileOffset = tsFile.length();
final long maxTime = timeSet.last() - 1;
for (int i = 0; i < number; i++) {
int endTime = random.nextInt((int) (maxTime)) + 1;
int startTime = random.nextInt(endTime);
for (MeasurementSchema measurementSchema : device2MeasurementSchema.get(device)) {
Deletion deletion =
final int endTime = random.nextInt((int) (maxTime)) + 1;
final int startTime = random.nextInt(endTime);
for (final MeasurementSchema measurementSchema : device2MeasurementSchema.get(device)) {
final Deletion deletion =
new Deletion(
new PartialPath(
device
Expand Down
Loading

0 comments on commit 0dbb389

Please sign in to comment.