Get Started_0.3.1
Now, you’re ready to start doing some awesome things with TsFile. This section demonstrates the detailed usage of TsFile.
A time series is regarded as a set of quadruples. A quadruple is defined as (deltaObject, measurement, time, value).
- deltaObject: In many situations, a device that contains many sensors can be considered a deltaObject.
- measurement: A sensor can be considered a measurement.
Table 1 illustrates a set of time-series data. The set shown in the following table contains one deltaObject named "device_1" with three measurements named "sensor_1", "sensor_2" and "sensor_3".
Table 1: time-series data of device_1

| sensor_1 time | sensor_1 value | sensor_2 time | sensor_2 value | sensor_3 time | sensor_3 value |
|---|---|---|---|---|---|
| 1 | 1.2 | 1 | 20 | 2 | 50 |
| 3 | 1.4 | 2 | 20 | 4 | 51 |
| 5 | 1.1 | 3 | 21 | 6 | 52 |
| 7 | 1.8 | 4 | 20 | 8 | 53 |
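To make the quadruple model concrete, here is a minimal plain-Java sketch (not part of the TsFile API) that represents the data of Table 1 as (deltaObject, measurement, time, value) quadruples and groups them into one time-ordered series per "deltaObject.measurement" pair. The class QuadrupleSketch and its methods are hypothetical, chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of the quadruple model; not TsFile code. */
public class QuadrupleSketch {

    /** One quadruple; the value is kept as a Number for simplicity. */
    public record Quadruple(String deltaObject, String measurement, long time, Number value) {}

    /** The data of Table 1 as a flat list of quadruples. */
    public static List<Quadruple> table1() {
        long[] t1 = {1, 3, 5, 7};
        double[] v1 = {1.2, 1.4, 1.1, 1.8};
        long[] t2 = {1, 2, 3, 4};
        int[] v2 = {20, 20, 21, 20};
        long[] t3 = {2, 4, 6, 8};
        int[] v3 = {50, 51, 52, 53};
        List<Quadruple> data = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            data.add(new Quadruple("device_1", "sensor_1", t1[i], v1[i]));
            data.add(new Quadruple("device_1", "sensor_2", t2[i], v2[i]));
            data.add(new Quadruple("device_1", "sensor_3", t3[i], v3[i]));
        }
        return data;
    }

    /** Groups quadruples by "deltaObject.measurement"; each series is sorted by time. */
    public static Map<String, TreeMap<Long, Number>> bySeries(List<Quadruple> data) {
        Map<String, TreeMap<Long, Number>> series = new TreeMap<>();
        for (Quadruple q : data) {
            series.computeIfAbsent(q.deltaObject() + "." + q.measurement(), k -> new TreeMap<>())
                  .put(q.time(), q.value());
        }
        return series;
    }

    public static void main(String[] args) {
        bySeries(table1()).forEach((path, points) -> System.out.println(path + " -> " + points));
    }
}
```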
One Line of Data: In many industrial applications, a device contains more than one sensor, and these sensors may have values at the same timestamp; such a group of values is called one line of data.
Formally, one line of data consists of a deltaObject_id, a timestamp indicating the milliseconds since January 1, 1970, 00:00:00, and several data pairs, each composed of a measurement_id and its corresponding value. All data pairs in one line belong to this deltaObject_id and share the same timestamp. If one of the measurements has no value at that timestamp, use a space instead (TsFile does not actually store null values). The format is as follows:
deltaObject_id, timestamp, <measurement_id, value>...
An example is shown below. In this example, the data types of the three measurements are INT32, FLOAT and ENUMS, respectively.
device_1, 1490860659000, m1, 10, m2, 12.12, m3, MAN
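A minimal plain-Java sketch of how a line in this format breaks down into its parts. It is not TsFile's own parser: the class LineOfDataSketch is hypothetical, values are kept as strings (typing them requires the schema), and a blank value is simply skipped, mirroring the fact that nulls are not stored.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for "deltaObject_id, timestamp, <measurement_id, value>..."; not TsFile code. */
public class LineOfDataSketch {

    public record ParsedLine(String deltaObjectId, long timestamp, Map<String, String> values) {}

    public static ParsedLine parse(String line) {
        // limit -1 keeps a trailing empty field such as the value after "sensor_3,"
        String[] fields = line.split(",", -1);
        if (fields.length < 2 || fields.length % 2 != 0) {
            throw new IllegalArgumentException(
                    "expected deltaObject_id, timestamp and <measurement_id, value> pairs: " + line);
        }
        String deltaObjectId = fields[0].trim();
        long timestamp = Long.parseLong(fields[1].trim());
        Map<String, String> values = new LinkedHashMap<>();
        for (int i = 2; i < fields.length; i += 2) {
            String measurementId = fields[i].trim();
            String value = fields[i + 1].trim();
            if (!value.isEmpty()) {   // a missing value is simply not recorded
                values.put(measurementId, value);
            }
        }
        return new ParsedLine(deltaObjectId, timestamp, values);
    }

    public static void main(String[] args) {
        System.out.println(parse("device_1, 1490860659000, m1, 10, m2, 12.12, m3, MAN"));
        System.out.println(parse("device_1,1, sensor_1, 1.2, sensor_2, 20, sensor_3,"));
    }
}
```

The split uses a limit of -1 so that a trailing empty field, as in "sensor_3," at the end of a line, is kept and recognised as a missing value instead of being silently dropped by String.split.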
A TsFile can be generated with the following steps; the complete code is given in the section "Example for writing TsFile".
- First, construct a TsFile instance. There are two ways to do this, and both generate the same file:
(1) use the TsFileWriter constructor:
public TsFileWriter(File file) throws WriteProcessException, IOException
Parameters:
  - file : The TsFile to write
(2) construct a TsFile from a TsRandomAccessFileWriter and a SchemaJSON object (described below), as class TsFileWrite1 does. The measurements are then defined by the schema, so the second step is not needed.
- Second, add measurements:
public void addMeasurement(MeasurementDescriptor measurementDescriptor) throws WriteProcessException
Parameters:
  - measurementDescriptor : The measurement information, including name, data type and encoding
- Third, write data continuously:
public void write(TSRecord record) throws IOException, WriteProcessException
The details of constructing a TSRecord can be found in the section "Example for writing TsFile".
- Finally, call close to finish the writing process:
public void close() throws IOException
SchemaJSON consists of two parts: user settings of the TsFile and a schema specifying a list of allowable time series. The schema describes each measurement's measurement_id, data_type, encoding, etc.
An example is shown as follows:
{
"schema": [
{
"measurement_id": "m1",
"data_type": "INT32",
"encoding": "RLE"
},
{
"measurement_id": "m2",
"data_type": "FLOAT",
"encoding": "TS_2DIFF",
"max_point_number": 2
},
{
"measurement_id": "m3",
"data_type": "ENUMS",
"encoding": "BITMAP",
"enum_values":["MAN","WOMAN"]
},
{
"measurement_id": "m4",
"data_type": "INT64",
"encoding": "RLE",
"compressor": "SNAPPY"
}
],
"row_group_size": 8388608,
"page_size": 1048576
}
SchemaJSON consists of a required field schema of type JSONArray and two optional fields: row_group_size and page_size. Each entry in schema corresponds to one time series; its fields are described below:
| key | is required | description | allowed values |
|---|---|---|---|
| measurement_id | required | name of the time series | any combination of letters, numbers and other symbols such as _ and . |
| data_type | required | data type | BOOLEAN, INT32, INT64, FLOAT, DOUBLE, ENUM and TEXT (namely String) |
| encoding | required | encoding approach for time domain | PLAIN (for all data types), {TS_2DIFF, RLE} (for INT32, INT64, FLOAT, DOUBLE, ENUM), BITMAP (for ENUM) |
| enum_values | required if data_type is ENUM | the fields of ENUM | in the format ["MAN","WOMAN"] |
| max_point_number | optional | the number of reserved decimal digits; useful if the data type is FLOAT, DOUBLE or BigDecimal | natural number, defaults to 2 |
| compressor | optional | the type of compression | SNAPPY and UNCOMPRESSED, defaults to UNCOMPRESSED |
| max_string_length | optional | maximal length of a string; useful if the data type is TEXT | positive integer, defaults to 128 |
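The rules in the "allowed values" column can be checked mechanically before a schema is handed to TsFile. The following plain-Java sketch, built around the hypothetical class SchemaRulesSketch, encodes the encoding/data-type combinations and the enum_values requirement exactly as the table states them, using the table's type names. It is not TsFile's own validation and does not cover the optional fields.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical checker for the SchemaJSON field table; not part of TsFile. */
public class SchemaRulesSketch {

    // Encodings permitted per data type, transcribed from the "allowed values" column:
    // PLAIN for all types, TS_2DIFF and RLE for numeric types and ENUM, BITMAP for ENUM only.
    private static final Map<String, Set<String>> ALLOWED_ENCODINGS = Map.of(
            "BOOLEAN", Set.of("PLAIN"),
            "INT32", Set.of("PLAIN", "TS_2DIFF", "RLE"),
            "INT64", Set.of("PLAIN", "TS_2DIFF", "RLE"),
            "FLOAT", Set.of("PLAIN", "TS_2DIFF", "RLE"),
            "DOUBLE", Set.of("PLAIN", "TS_2DIFF", "RLE"),
            "ENUM", Set.of("PLAIN", "TS_2DIFF", "RLE", "BITMAP"),
            "TEXT", Set.of("PLAIN"));

    /** Returns the problems found in one schema entry; an empty list means no rule was violated. */
    public static List<String> check(String measurementId, String dataType,
                                     String encoding, List<String> enumValues) {
        List<String> problems = new ArrayList<>();
        if (measurementId == null || measurementId.isEmpty()) {
            problems.add("measurement_id is required");
        }
        // Map.of and Set.of reject null lookups, so the null checks come first
        Set<String> encodings = dataType == null ? null : ALLOWED_ENCODINGS.get(dataType);
        if (encodings == null) {
            problems.add("unknown data_type: " + dataType);
        } else if (encoding == null || !encodings.contains(encoding)) {
            problems.add("encoding " + encoding + " is not allowed for " + dataType);
        }
        if ("ENUM".equals(dataType) && (enumValues == null || enumValues.isEmpty())) {
            problems.add("enum_values is required when data_type is ENUM");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check("m3", "ENUM", "BITMAP", List.of("MAN", "WOMAN"))); // prints []
        System.out.println(check("m1", "TEXT", "RLE", null)); // prints [encoding RLE is not allowed for TEXT]
    }
}
```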
You should first install tsfile to your local maven repository.
reference: Installation_0.3.0
/**
 * There are two ways to construct a TsFile instance; they generate the same TsFile file.
 * This class builds a TsFile from a TsRandomAccessFileWriter and a SchemaJSON object,
 * then writes data with writeLine and writeRecord.
 */
package cn.edu.tsinghua.tsfile;
import java.io.File;
import java.util.ArrayList;
import org.json.JSONObject;
import cn.edu.tsinghua.tsfile.common.utils.TsRandomAccessFileWriter;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
import cn.edu.tsinghua.tsfile.timeseries.write.record.datapoint.FloatDataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.datapoint.IntDataPoint;
public class TsFileWrite1 {
public static void main(String args[]) {
try {
String path = "test.ts";
String s = "{\n" +
" \"schema\": [\n" +
" {\n" +
" \"measurement_id\": \"sensor_1\",\n" +
" \"data_type\": \"FLOAT\",\n" +
" \"encoding\": \"RLE\"\n" +
" },\n" +
" {\n" +
" \"measurement_id\": \"sensor_2\",\n" +
" \"data_type\": \"INT32\",\n" +
" \"encoding\": \"TS_2DIFF\"\n" +
" },\n" +
" {\n" +
" \"measurement_id\": \"sensor_3\",\n" +
" \"data_type\": \"INT32\",\n" +
" \"encoding\": \"TS_2DIFF\"\n" +
" }\n" +
" ],\n" +
" \"row_group_size\": 134217728\n" +
"}";
JSONObject schemaObject = new JSONObject(s);
TsRandomAccessFileWriter output = new TsRandomAccessFileWriter(new File(path));
TsFile tsFile = new TsFile(output, schemaObject);
//format : deltaObject_id, timestamp, <measurement_id, value>...
tsFile.writeLine("device_1,1, sensor_1, 1.2, sensor_2, 20, sensor_3,");
tsFile.writeLine("device_1,2, sensor_1, , sensor_2, 20, sensor_3, 50");
tsFile.writeLine("device_1,3, sensor_1, 1.4, sensor_2, 21, sensor_3,");
tsFile.writeLine("device_1,4, sensor_1, 1.2, sensor_2, 20, sensor_3, 51");
TSRecord tsRecord1 = new TSRecord(6, "device_1");
tsRecord1.dataPointList = new ArrayList<DataPoint>() {
{
add(new FloatDataPoint("sensor_1", 7.2f));
add(new IntDataPoint("sensor_2", 10));
add(new IntDataPoint("sensor_3", 11));
}
};
TSRecord tsRecord2 = new TSRecord(7, "device_1");
tsRecord2.dataPointList = new ArrayList<DataPoint>() {
{
add(new FloatDataPoint("sensor_1", 6.2f));
add(new IntDataPoint("sensor_2", 20));
add(new IntDataPoint("sensor_3", 21));
}
};
TSRecord tsRecord3 = new TSRecord(8, "device_1");
tsRecord3.dataPointList = new ArrayList<DataPoint>() {
{
add(new FloatDataPoint("sensor_1", 9.2f));
add(new IntDataPoint("sensor_2", 30));
add(new IntDataPoint("sensor_3", 31));
}
};
tsFile.writeRecord(tsRecord1);
tsFile.writeRecord(tsRecord2);
tsFile.writeRecord(tsRecord3);
tsFile.close();
} catch (Throwable e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
}
}
/**
 * There are two ways to construct a TsFile instance; they generate the same TsFile file.
 * This class uses the constructor TsFileWriter(File) and registers each measurement with
 * addMeasurement(MeasurementDescriptor) before writing TSRecords.
 */
package cn.edu.tsinghua.tsfile;
import java.io.File;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSDataType;
import cn.edu.tsinghua.tsfile.file.metadata.enums.TSEncoding;
import cn.edu.tsinghua.tsfile.timeseries.write.TsFileWriter;
import cn.edu.tsinghua.tsfile.timeseries.write.desc.MeasurementDescriptor;
import cn.edu.tsinghua.tsfile.timeseries.write.record.DataPoint;
import cn.edu.tsinghua.tsfile.timeseries.write.record.TSRecord;
import cn.edu.tsinghua.tsfile.timeseries.write.record.datapoint.*;
public class TsFileWrite2 {
public static void main(String args[]) {
try {
TsFileWriter tsFileWriter = new TsFileWriter(new File("test.ts"));
// add measurements
tsFileWriter.addMeasurement(new MeasurementDescriptor("sensor_1", TSDataType.FLOAT, TSEncoding.RLE));
tsFileWriter.addMeasurement(new MeasurementDescriptor("sensor_2", TSDataType.INT32, TSEncoding.TS_2DIFF));
tsFileWriter.addMeasurement(new MeasurementDescriptor("sensor_3", TSDataType.INT32, TSEncoding.TS_2DIFF));
// construct TSRecord
TSRecord tsRecord = new TSRecord(1, "device_1");
DataPoint dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
DataPoint dPoint2 = new IntDataPoint("sensor_2", 20);
DataPoint dPoint3;
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsFileWriter.write(tsRecord);
tsRecord = new TSRecord(2, "device_1");
dPoint2 = new IntDataPoint("sensor_2", 20);
dPoint3 = new IntDataPoint("sensor_3", 50);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
tsFileWriter.write(tsRecord);
tsRecord = new TSRecord(3, "device_1");
dPoint1 = new FloatDataPoint("sensor_1", 1.4f);
dPoint2 = new IntDataPoint("sensor_2", 21);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsFileWriter.write(tsRecord);
tsRecord = new TSRecord(4, "device_1");
dPoint1 = new FloatDataPoint("sensor_1", 1.2f);
dPoint2 = new IntDataPoint("sensor_2", 20);
dPoint3 = new IntDataPoint("sensor_3", 51);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
tsFileWriter.write(tsRecord);
tsRecord = new TSRecord(6, "device_1");
dPoint1 = new FloatDataPoint("sensor_1", 7.2f);
dPoint2 = new IntDataPoint("sensor_2", 10);
dPoint3 = new IntDataPoint("sensor_3", 11);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
tsFileWriter.write(tsRecord);
tsRecord = new TSRecord(7, "device_1");
dPoint1 = new FloatDataPoint("sensor_1", 6.2f);
dPoint2 = new IntDataPoint("sensor_2", 20);
dPoint3 = new IntDataPoint("sensor_3", 21);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
tsFileWriter.write(tsRecord);
tsRecord = new TSRecord(8, "device_1");
dPoint1 = new FloatDataPoint("sensor_1", 9.2f);
dPoint2 = new IntDataPoint("sensor_2", 30);
dPoint3 = new IntDataPoint("sensor_3", 31);
tsRecord.addTuple(dPoint1);
tsRecord.addTuple(dPoint2);
tsRecord.addTuple(dPoint3);
tsFileWriter.write(tsRecord);
// close TsFile
tsFileWriter.close();
} catch (Throwable e) {
e.printStackTrace();
System.out.println(e.getMessage());
}
}
}
The set of time-series data in section "Time-series Data" is used here for a concrete introduction. The set shown in the following table contains one deltaObject named "device_1" with three measurements named "sensor_1", "sensor_2" and "sensor_3". The measurements have been simplified for illustration and contain only 4 time-value pairs each.
Time-series data of device_1

| sensor_1 time | sensor_1 value | sensor_2 time | sensor_2 value | sensor_3 time | sensor_3 value |
|---|---|---|---|---|---|
| 1 | 1.2 | 1 | 20 | 2 | 50 |
| 3 | 1.4 | 2 | 20 | 4 | 51 |
| 5 | 1.1 | 3 | 21 | 6 | 52 |
| 7 | 1.8 | 4 | 20 | 8 | 53 |
A path represents a series instance in TsFile. In the example given above, "device_1.sensor_1" is a path.
In the read interfaces, the parameter paths indicates the measurements to be selected.
A Path instance can be constructed easily through the class Path. For example:
Path p = new Path("device_1.sensor_1");
If "sensor_1" and "sensor_3" need to be selected in a query, use the following code:
List<Path> paths = new ArrayList<Path>();
paths.add(new Path("device_1.sensor_1"));
paths.add(new Path("device_1.sensor_3"));
Notice: When constructing a Path, the parameter must have the format "<deltaObjectId>.<measurementId>".
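A path can also be split back into its two parts. The sketch below is plain Java, not TsFile's Path class: the class PathSketch is hypothetical, and it assumes the measurement id is the segment after the last dot, because a deltaObjectId may itself contain dots (as in root.beijing.vehicle, used later in this section).

```java
/** Hypothetical splitter for "<deltaObjectId>.<measurementId>" paths; not TsFile's Path class. */
public class PathSketch {

    public record SeriesPath(String deltaObjectId, String measurementId) {
        @Override
        public String toString() {
            return deltaObjectId + "." + measurementId;
        }
    }

    /** Assumes the measurement id contains no dot, so the last dot separates the two parts. */
    public static SeriesPath split(String fullPath) {
        int dot = fullPath.lastIndexOf('.');
        if (dot <= 0 || dot == fullPath.length() - 1) {
            throw new IllegalArgumentException("expected <deltaObjectId>.<measurementId>: " + fullPath);
        }
        return new SeriesPath(fullPath.substring(0, dot), fullPath.substring(dot + 1));
    }

    public static void main(String[] args) {
        System.out.println(split("device_1.sensor_1").measurementId());        // prints sensor_1
        System.out.println(split("root.beijing.vehicle.car").deltaObjectId()); // prints root.beijing.vehicle
    }
}
```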
Filters are used when reading a TsFile.
A filter expression consists of FilterSeries and FilterOperators.
- FilterSeries
There are two kinds of FilterSeries.
  - FilterSeriesType.TIME_FILTER: used to construct a filter on the time of time-series data.
FilterSeries timeSeries = FilterFactory.timeFilterSeries();
  - FilterSeriesType.VALUE_FILTER: used to construct a filter on the value of time-series data.
FilterSeries valueSeries = FilterFactory.intFilterSeries("device_1", "sensor_1", FilterSeriesType.VALUE_FILTER);
The FilterSeries above defines a series 'device_1.sensor_1' whose data type is INT32 and whose FilterSeriesType is VALUE_FILTER.
- FilterOperator
FilterOperator can be used to construct diverse filters.
Basic filter operations:
  - Lt: Less than
  - Gt: Greater than
  - Eq: Equals
  - NotEq: Not equals
  - Not: Flip a filter
  - And(left, right): Conjunction of two filters
  - Or(left, right): Disjunction of two filters
- TimeFilterExpression Usage
First, define a FilterSeries with TIME_FILTER type.
FilterSeries timeSeries = FilterFactory.timeFilterSeries();
Then, construct a FilterExpression. Some typical FilterExpression definitions using the timeSeries defined above are shown below (timestamps are long values, hence the L suffix, as in the reading example):
FilterExpression expression = FilterFactory.eq(timeSeries, 15L); // series time = 15
FilterExpression expression = FilterFactory.ltEq(timeSeries, 15L, true); // series time <= 15
FilterExpression expression = FilterFactory.ltEq(timeSeries, 15L, false); // series time < 15
FilterExpression expression = FilterFactory.gtEq(timeSeries, 15L, true); // series time >= 15
FilterExpression expression = FilterFactory.NotEq(timeSeries, 15L); // series time != 15
FilterExpression expression = FilterFactory.and(FilterFactory.gtEq(timeSeries, 15L, true), FilterFactory.ltEq(timeSeries, 25L, false)); // 15 <= series time < 25
FilterExpression expression = FilterFactory.or(FilterFactory.gtEq(timeSeries, 15L, true), FilterFactory.ltEq(timeSeries, 25L, false)); // series time >= 15 or series time < 25
- ValueFilterExpression Usage
First, define a FilterSeries with VALUE_FILTER type.
FilterSeries valueSeries = FilterFactory.intFilterSeries("root.beijing.vehicle", "car", FilterSeriesType.VALUE_FILTER);
Then, construct a FilterExpression. Some typical FilterExpression definitions using the valueSeries defined above are shown below:
FilterExpression expression = FilterFactory.eq(valueSeries, 15); // series value = 15
FilterExpression expression = FilterFactory.ltEq(valueSeries, 15, true); // series value <= 15
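The semantics of the time and value filter expressions above can be modelled with java.util.function.Predicate. The sketch below is not TsFile code: the class FilterSketch and its methods are hypothetical stand-ins named after the FilterFactory calls, and, as in the comments of the time filter examples, the boolean flag of ltEq/gtEq selects an inclusive (<=, >=) or exclusive (<, >) bound.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/** Hypothetical Predicate-based model of filter expressions; not TsFile's FilterFactory. */
public class FilterSketch {

    /** value <= bound when inclusive is true, value < bound otherwise. */
    public static <T extends Comparable<T>> Predicate<T> ltEq(T bound, boolean inclusive) {
        return v -> inclusive ? v.compareTo(bound) <= 0 : v.compareTo(bound) < 0;
    }

    /** value >= bound when inclusive is true, value > bound otherwise. */
    public static <T extends Comparable<T>> Predicate<T> gtEq(T bound, boolean inclusive) {
        return v -> inclusive ? v.compareTo(bound) >= 0 : v.compareTo(bound) > 0;
    }

    public static <T extends Comparable<T>> Predicate<T> eq(T value) {
        return v -> v.compareTo(value) == 0;
    }

    public static <T extends Comparable<T>> Predicate<T> notEq(T value) {
        return eq(value).negate();
    }

    public static <T> Predicate<T> and(Predicate<T> left, Predicate<T> right) {
        return left.and(right);
    }

    public static <T> Predicate<T> or(Predicate<T> left, Predicate<T> right) {
        return left.or(right);
    }

    public static void main(String[] args) {
        // time filter 15 <= time < 25
        Predicate<Long> window = and(gtEq(15L, true), ltEq(25L, false));
        List<Long> times = List.of(10L, 15L, 20L, 25L, 30L);
        System.out.println(times.stream().filter(window).collect(Collectors.toList())); // prints [15, 20]
        // value filter value <= 15 on an INT32-like series
        System.out.println(ltEq(15, true).test(15)); // prints true
    }
}
```

Composing small predicates mirrors how And and Or combine two existing filters rather than introducing new comparison kinds.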
The method query() can be used to read from a TsFile. Class TsFile provides two overloaded query methods, described as follows:
- Method 1
QueryDataSet query(List<Path> paths, FilterExpression timeFilter, FilterExpression valueFilter) throws IOException
Parameters:
  - paths : the selected series
  - timeFilter : filter on timestamps; pass null if no time filter is required
  - valueFilter : filter on the values of specific series; pass null if no value filter is required
What does valueFilter mean in a query?
When a query is executed in TsFile, all series involved are viewed as a "Table". This special Table has (1 + n) columns, where n is the number of series and the extra column holds the timestamps.
The timestamp column is the union of the timestamps of all series involved, in ascending order. Each field of a series column holds that series' value at the corresponding timestamp, or null if the series has no value there.
For example, suppose the query parameters are:
- paths : ["device_1.sensor_1","device_1.sensor_3"]
- timeFilter : timestamp <= 3
- valueFilter : device_1.sensor_3 <= 51 or device_1.sensor_1 < 1.4
The virtual "Table" is:

| timestamp | device_1.sensor_1 | device_1.sensor_2 | device_1.sensor_3 |
|---|---|---|---|
| 1 | 1.2 | 20 | null |
| 2 | null | 20 | 50 |
| 3 | 1.4 | 21 | null |
| 4 | null | 20 | 51 |
| 5 | 1.1 | null | null |
| 6 | null | null | 52 |
| 7 | 1.8 | null | null |
| 8 | null | null | 53 |

Only timestamps 1, 2 and 3 pass the time filter. Row 1 satisfies device_1.sensor_1 < 1.4 and row 2 satisfies device_1.sensor_3 <= 51, while row 3 satisfies neither (device_1.sensor_1 is 1.4 and device_1.sensor_3 is null). The query result is therefore:

| timestamp | device_1.sensor_1 | device_1.sensor_3 |
|---|---|---|
| 1 | 1.2 | null |
| 2 | null | 50 |
- Method 2
QueryDataSet query(List<Path> paths, FilterExpression timeFilter, FilterExpression valueFilter, Map<String, Long> params) throws IOException
This method is designed for advanced applications such as the TsFile-Spark Connector. It differs from Method 1 in its additional parameter, params:
  - params : a Map instance that stores additional options for a specific query. In the current version, a partial query is supported by adding two options to this parameter:
    - QueryConstant.PARTITION_START_OFFSET : the start offset in the TsFile
    - QueryConstant.PARTITION_END_OFFSET : the end offset in the TsFile
What is Partial Query?
In some distributed file systems (e.g. HDFS), a file is split into several parts called "Blocks", which are stored on different nodes. Executing the query in parallel on each node involved improves efficiency, which is why Partial Query is needed. A Partial Query selects only the results stored in the part of the TsFile delimited by QueryConstant.PARTITION_START_OFFSET and QueryConstant.PARTITION_END_OFFSET.
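A minimal plain-Java sketch of how byte ranges for partial queries might be computed, one per storage block, which a caller could then pass as the PARTITION_START_OFFSET and PARTITION_END_OFFSET options. The class PartitionSketch is hypothetical, and the rule that a row group belongs to the range containing its start offset is an assumption made for illustration, not taken from TsFile.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical partitioning helper; the row-group assignment rule is an assumption. */
public class PartitionSketch {

    /** A half-open byte range [start, end) of the file. */
    public record Range(long start, long end) {}

    /** Splits [0, fileLength) into consecutive ranges of at most blockSize bytes. */
    public static List<Range> split(long fileLength, long blockSize) {
        if (blockSize <= 0) {
            throw new IllegalArgumentException("blockSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        for (long start = 0; start < fileLength; start += blockSize) {
            ranges.add(new Range(start, Math.min(start + blockSize, fileLength)));
        }
        return ranges;
    }

    /** ASSUMED rule: a row group is read by the range that contains its start offset. */
    public static List<Long> rowGroupsIn(Range range, List<Long> rowGroupStartOffsets) {
        List<Long> selected = new ArrayList<>();
        for (long offset : rowGroupStartOffsets) {
            if (offset >= range.start() && offset < range.end()) {
                selected.add(offset);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        // hypothetical 300-byte file, 128-byte blocks, row groups starting at 0, 100 and 200
        List<Long> rowGroups = List.of(0L, 100L, 200L);
        for (Range r : split(300, 128)) {
            System.out.println(r + " -> row groups starting at " + rowGroupsIn(r, rowGroups));
        }
    }
}
```

Because the ranges are disjoint and together cover the whole file, this assumed rule reads each row group in exactly one partition, so running one partial query per block and concatenating the results covers the file without duplicates.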
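The virtual "Table" semantics described under Method 1 can be sketched in plain Java. This is not TsFile's query engine: the class VirtualTableSketch is hypothetical, every value is stored as a Double for simplicity, a null filter means "no filter" as in query(), and a comparison against a null value is assumed to be false. Running main with the Method 1 example parameters prints the rows at timestamps 1 and 2.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.LongPredicate;
import java.util.function.Predicate;

/** Hypothetical model of the virtual-table query semantics; not TsFile code. */
public class VirtualTableSketch {

    /** One result row: a timestamp plus one (possibly null) value per selected path. */
    public record Row(long time, List<Double> values) {}

    public static List<Row> query(List<String> selectedPaths,
                                  Map<String, Map<Long, Double>> data,
                                  LongPredicate timeFilter,
                                  Predicate<Map<String, Double>> valueFilter) {
        // timestamp column: ascending union of the timestamps of every series involved
        TreeSet<Long> timestamps = new TreeSet<>();
        for (Map<Long, Double> series : data.values()) {
            timestamps.addAll(series.keySet());
        }
        List<Row> result = new ArrayList<>();
        for (long t : timestamps) {
            if (timeFilter != null && !timeFilter.test(t)) {
                continue;
            }
            // one field per series: its value at t, or null if it has no point there
            Map<String, Double> row = new HashMap<>();
            for (Map.Entry<String, Map<Long, Double>> e : data.entrySet()) {
                row.put(e.getKey(), e.getValue().get(t));
            }
            if (valueFilter != null && !valueFilter.test(row)) {
                continue;
            }
            List<Double> values = new ArrayList<>();
            for (String path : selectedPaths) {
                values.add(row.get(path));
            }
            result.add(new Row(t, values));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<Long, Double>> data = Map.of(
                "device_1.sensor_1", Map.of(1L, 1.2, 3L, 1.4, 5L, 1.1, 7L, 1.8),
                "device_1.sensor_2", Map.of(1L, 20.0, 2L, 20.0, 3L, 21.0, 4L, 20.0),
                "device_1.sensor_3", Map.of(2L, 50.0, 4L, 51.0, 6L, 52.0, 8L, 53.0));
        // valueFilter: device_1.sensor_3 <= 51 or device_1.sensor_1 < 1.4 (null never matches)
        Predicate<Map<String, Double>> valueFilter = row -> {
            Double s1 = row.get("device_1.sensor_1");
            Double s3 = row.get("device_1.sensor_3");
            return (s3 != null && s3 <= 51) || (s1 != null && s1 < 1.4);
        };
        List<Row> rows = query(List.of("device_1.sensor_1", "device_1.sensor_3"),
                data, t -> t <= 3, valueFilter);
        rows.forEach(System.out::println); // Row[time=1, values=[1.2, null]] then Row[time=2, values=[null, 50.0]]
    }
}
```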
You should first install tsfile to your local maven repository.
reference: Installation_0.3.0
/**
 * This class shows how to read the TsFile named "test.ts".
 * "test.ts" is generated by class TsFileWrite1 or class TsFileWrite2,
 * which produce the same TsFile in two different ways.
 */
package cn.edu.tsinghua.tsfile;
import cn.edu.tsinghua.tsfile.timeseries.basis.TsFile;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterExpression;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.FilterFactory;
import cn.edu.tsinghua.tsfile.timeseries.filter.definition.filterseries.FilterSeriesType;
import cn.edu.tsinghua.tsfile.timeseries.read.TsRandomAccessLocalFileReader;
import cn.edu.tsinghua.tsfile.timeseries.read.query.QueryDataSet;
import cn.edu.tsinghua.tsfile.timeseries.read.support.Path;
import java.io.IOException;
import java.util.ArrayList;
public class TsFileRead {
public static void main(String[] args) throws IOException {
String path = "test.ts";
// read example : no filter
TsRandomAccessLocalFileReader input = new TsRandomAccessLocalFileReader(path);
TsFile readTsFile = new TsFile(input);
ArrayList<Path> paths = new ArrayList<>();
paths.add(new Path("device_1.sensor_1"));
paths.add(new Path("device_1.sensor_2"));
paths.add(new Path("device_1.sensor_3"));
QueryDataSet queryDataSet = readTsFile.query(paths, null, null);
while (queryDataSet.hasNextRecord()) {
System.out.println(queryDataSet.getNextRecord());
}
System.out.println("------------");
// time filter : 4 <= time < 10
FilterExpression timeFilter = FilterFactory.and(FilterFactory.gtEq(FilterFactory.timeFilterSeries(), 4L, true),
FilterFactory.ltEq(FilterFactory.timeFilterSeries(), 10L, false));
input = new TsRandomAccessLocalFileReader(path);
readTsFile = new TsFile(input);
paths = new ArrayList<>();
paths.add(new Path("device_1.sensor_1"));
paths.add(new Path("device_1.sensor_2"));
paths.add(new Path("device_1.sensor_3"));
queryDataSet = readTsFile.query(paths, timeFilter, null);
while (queryDataSet.hasNextRecord()) {
System.out.println(queryDataSet.getNextRecord());
}
System.out.println("------------");
// value filter : device_1.sensor_2 < 20
FilterExpression valueFilter = FilterFactory
.ltEq(FilterFactory.intFilterSeries("device_1", "sensor_2", FilterSeriesType.VALUE_FILTER), 20, false);
input = new TsRandomAccessLocalFileReader(path);
readTsFile = new TsFile(input);
paths = new ArrayList<>();
paths.add(new Path("device_1.sensor_1"));
paths.add(new Path("device_1.sensor_2"));
paths.add(new Path("device_1.sensor_3"));
queryDataSet = readTsFile.query(paths, null, valueFilter);
while (queryDataSet.hasNextRecord()) {
System.out.println(queryDataSet.getNextRecord());
}
System.out.println("------------");
// time filter : 4 <= time < 10, value filter : device_1.sensor_3 > 20
timeFilter = FilterFactory.and(FilterFactory.gtEq(FilterFactory.timeFilterSeries(), 4L, true),
FilterFactory.ltEq(FilterFactory.timeFilterSeries(), 10L, false));
valueFilter = FilterFactory
.gtEq(FilterFactory.intFilterSeries("device_1", "sensor_3", FilterSeriesType.VALUE_FILTER), 20, false);
input = new TsRandomAccessLocalFileReader(path);
readTsFile = new TsFile(input);
paths = new ArrayList<>();
paths.add(new Path("device_1.sensor_1"));
paths.add(new Path("device_1.sensor_2"));
paths.add(new Path("device_1.sensor_3"));
queryDataSet = readTsFile.query(paths, timeFilter, valueFilter);
while (queryDataSet.hasNextRecord()) {
System.out.println(queryDataSet.getNextRecord());
}
}
}
The default configuration file, tsfile-format.properties, is located in the tsfile/conf/ directory. To use a configuration file at another path, set:
System.setProperty(SystemConstant.TSFILE_CONF, "your config file path");
and then call:
TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
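The override mechanism above follows a common pattern: a system property, when set, replaces the default location of a properties file. The plain-Java sketch below shows that pattern with java.util.Properties. The class ConfigPathSketch and its property key example.tsfile.conf are hypothetical (the real key is whatever SystemConstant.TSFILE_CONF holds), and real code should obtain its configuration through TSFileDescriptor as shown above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical illustration of a system-property override for a config path; not TsFile code. */
public class ConfigPathSketch {

    public static final String CONF_PROPERTY = "example.tsfile.conf";   // hypothetical key
    public static final String DEFAULT_PATH = "tsfile/conf/tsfile-format.properties";

    /** Returns the path from the system property if it is set, otherwise the default path. */
    public static Path resolve() {
        return Paths.get(System.getProperty(CONF_PROPERTY, DEFAULT_PATH));
    }

    /** Loads key/value pairs from the resolved file; this sketch returns empty Properties if it is missing. */
    public static Properties load() throws IOException {
        Properties props = new Properties();
        Path path = resolve();
        if (Files.exists(path)) {
            try (InputStream in = Files.newInputStream(path)) {
                props.load(in);
            }
        }
        return props;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Reading configuration from " + resolve().toAbsolutePath());
        System.out.println(load());
    }
}
```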