
TsFile Spark Connector v0.1.2

jixuan1989 edited this page Nov 30, 2017 · 1 revision

TsFile-Spark Connector

This library lets you expose a TsFile as Spark RDDs and execute arbitrary SparkSQL queries against it.

Requirements

The versions required for Spark and Java are as follows:

Spark Version   Scala Version   Java Version
2.0+            2.11            1.8

Building From Source

To build the TsFile-Spark connector, run the following commands at the root folder of TsFile:

mvn install -Dmaven.test.skip=true

cd integration-parent/tsfile-spark
mvn package -Dmaven.test.skip=true

The jar tsfile-spark-0.1.0-jar-with-dependencies.jar can then be found in the target folder.

Data Type Reflection from TsFile to SparkSQL

This library uses the following mapping from TsFile data types to SparkSQL data types:

TsFile       SparkSQL
BOOLEAN      BooleanType
INT32        IntegerType
INT64        LongType
FLOAT        FloatType
DOUBLE       DoubleType
ENUMS        StringType
BYTE_ARRAY   BinaryType
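
The mapping above can be checked by inspecting the schema Spark infers. A minimal sketch, assuming a TsFile named test.ts and the spark.read.tsfile API shown in the Examples section below:

```scala
import com.corp.delta.tsfile.spark._
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").getOrCreate()
val df = spark.read.tsfile("test.ts")

// Each TsFile measurement becomes a column with the mapped SparkSQL type,
// e.g. an INT64 time-series appears as a LongType column.
df.printSchema()
```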

TsFile Schema -> SparkSQL Table Structure

The set of time-series data in section "Time-series Data" is used here to illustrate the mapping from TsFile Schema to SparkSQL Table Structure.

device_1
sensor_1       sensor_2       sensor_3
time  value    time  value    time  value
1     1.2      1     20       2     50
3     1.4      2     20       4     51
5     1.1      3     21       6     52
7     1.8      4     20       8     53

A set of time-series data

There are two reserved columns in Spark SQL Table:

  • time : Timestamp, LongType
  • delta_object : Delta_object ID, StringType

The SparkSQL Table Structure is as follows:

time(LongType)  delta_object(StringType)  sensor_1(FloatType)  sensor_2(IntegerType)  sensor_3(IntegerType)
1               device_1                  1.2                  20                     null
2               device_1                  null                 20                     50
3               device_1                  1.4                  21                     null
4               device_1                  null                 20                     51
5               device_1                  1.1                  null                   null
6               device_1                  null                 null                   52
7               device_1                  1.8                  null                   null
8               device_1                  null                 null                   53
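
Since time and delta_object are ordinary columns of the resulting table, they can appear in predicates like any sensor column. A minimal sketch, assuming a SparkSession named spark and a temporary view TsFile_table registered as in Example 1 below:

```scala
// Restrict the query to one device and a time range via the reserved columns.
val slice = spark.sql(
  "select time, sensor_1 from TsFile_table " +
  "where delta_object = 'device_1' and time >= 3 and time <= 7")
slice.show()
```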

Examples

Scala API
  • Example 1

     // import this library and Spark
     import com.corp.delta.tsfile.spark._
     import org.apache.spark.sql.SparkSession
    
     val spark = SparkSession.builder().master("local").getOrCreate()
    
     //read data in TsFile and create a table
     val df = spark.read.tsfile("test.ts")
     df.createOrReplaceTempView("TsFile_table")
    
     //query with filter
     val newDf = spark.sql("select * from TsFile_table where sensor_1 > 1.2").cache()
    
     newDf.show()
    
  • Example 2

     val spark = SparkSession.builder().master("local").getOrCreate()

     //read data in TsFile via the DataSource API
     val df = spark.read
           .format("com.corp.delta.tsfile")
           .load("test.ts")

     df.filter("sensor_1 > 1.2").show()
    
  • Example 3

     val spark = SparkSession.builder().master("local").getOrCreate()
    
     //create a table in SparkSQL and build relation with a TsFile
     spark.sql("create temporary view TsFile using com.corp.delta.tsfile options(path = \"test.ts\")")
    
     spark.sql("select * from TsFile where sensor_1 > 1.2").show()
    
spark-shell

This library can be used in spark-shell.

$ bin/spark-shell --jars tsfile-spark-0.1.0-jar-with-dependencies.jar

scala> sql("CREATE TEMPORARY TABLE TsFile_table USING com.corp.delta.tsfile OPTIONS (path \"hdfs://localhost:9000/test.ts\")")

scala> sql("select * from TsFile_table where sensor_1 > 1.2").show()