Add perf test for time zone operators #10260

Merged · 1 commit · Jan 29, 2024
Changes from all commits
@@ -42,6 +42,7 @@ import org.apache.spark.sql.types._
* - c_long_of_ts: long value which is microseconds from epoch
* - c_date: date column
* - c_int_of_date: int value which is days from 1970-01-01
* - c_long_of_ts_seconds: long value which is seconds from epoch
* - c_str_for_cast: strings for cast to timestamp, formats are yyyy, yyyy-mm, ...
* - c_str_of_ts: strings with format: yyyy-MM-dd HH:mm:ss
* Each column is highly duplicated.
@@ -100,11 +101,14 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll
def createDF(spark: SparkSession): DataFrame = {
val id = col("id")
val tsArray = Array[Long](year1980, year2000, year2030)
val secondsArray = tsArray.map(e => e / 1000000L)
val dateArray = Array[Int](0, 100, 200)
val columns = Array[Column](
TimeZonePerfUtils.createColumn(id, TimestampType, TsGenFunc(tsArray)).alias("c_ts"),
TimeZonePerfUtils.createColumn(id, LongType, TsGenFunc(tsArray)).alias("c_long_of_ts"),
TimeZonePerfUtils.createColumn(id, DateType, DateGenFunc(dateArray)).alias("c_date"),
TimeZonePerfUtils.createColumn(id, LongType, TsGenFunc(secondsArray))
.alias("c_long_of_ts_seconds"),
TimeZonePerfUtils.createColumn(id, IntegerType, DateGenFunc(dateArray))
.alias("c_int_of_date"),
TimeZonePerfUtils.createColumn(id, StringType, StringGenFunc(stringsForCast))
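The division by 1000000L above matters because Spark's TimestampType stores microseconds since the epoch, while unix_timestamp and from_unixtime operate in seconds. A minimal standalone sketch of the conversion — the actual year1980/year2000/year2030 constants are defined elsewhere in the suite, so the value below is illustrative only:

object EpochUnits {
  def main(args: Array[String]): Unit = {
    // Illustrative constant only: 2000-01-01T00:00:00Z in microseconds since the epoch
    val year2000Micros = 946684800000000L
    // Spark TimestampType stores microseconds; from_unixtime expects seconds
    val year2000Seconds = year2000Micros / 1000000L
    println(year2000Seconds) // prints 946684800
  }
}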
@@ -137,7 +141,8 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll
val startOnCpu = System.nanoTime()
withCpuSparkSession(
spark => func(spark, zoneStr).collect(),
conf)
// set session time zone
conf.set("spark.sql.session.timeZone", zoneStr))
val endOnCpu = System.nanoTime()
val elapseOnCpuMS = (endOnCpu - startOnCpu) / 1000000L
if (i != 1) {
@@ -148,7 +153,8 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll
val startOnGpu = System.nanoTime()
withGpuSparkSession(
spark => func(spark, zoneStr).collect(),
conf)
// set session time zone
conf.set("spark.sql.session.timeZone", zoneStr))
val endOnGpu = System.nanoTime()
val elapseOnGpuMS = (endOnGpu - startOnGpu) / 1000000L
if (i != 1) {
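The two conf.set calls above are the functional core of this change: each timed CPU and GPU run now executes with spark.sql.session.timeZone set to the zone under test, which is also why the perfTest bodies below can ignore their zone parameter. A self-contained sketch (not part of the PR) of what that setting changes:

import org.apache.spark.sql.SparkSession

object SessionTimeZoneDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("tz-demo").getOrCreate()
    for (zone <- Seq("UTC", "Asia/Shanghai")) {
      spark.conf.set("spark.sql.session.timeZone", zone)
      // from_unixtime renders the same instant in the session time zone:
      //   UTC           -> 2000-01-01 00:00:00
      //   Asia/Shanghai -> 2000-01-01 08:00:00
      spark.sql(s"SELECT '$zone' AS zone, from_unixtime(946684800)").show(false)
    }
    spark.stop()
  }
}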
@@ -197,4 +203,106 @@ class TimeZonePerfSuite extends SparkQueryCompareTestSuite with BeforeAndAfterAll

runAndRecordTime("to_utc_timestamp", perfTest)
}

test("test hour") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.hour(functions.col("c_ts"))
))
}

runAndRecordTime("hour",
perfTest)
}
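Each of the new tests follows the same shape: the operator under test is wrapped in functions.count(...), so Spark must evaluate the expression for every row while collect() transfers only a single aggregate row, keeping driver-side overhead out of the measurement. A minimal sketch of the pattern in isolation (the parquet path is hypothetical; any file with a timestamp column named c_ts would do):

import org.apache.spark.sql.{SparkSession, functions}

object CountWrapperDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()
    // Hypothetical input path; the suite generates its own parquet data
    val df = spark.read.parquet("/tmp/tz_perf_data")
    val start = System.nanoTime()
    // count() forces a full evaluation of hour() but returns one row
    df.select(functions.count(functions.hour(functions.col("c_ts")))).collect()
    println(s"hour() over full scan took ${(System.nanoTime() - start) / 1000000L} ms")
    spark.stop()
  }
}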

test("test minute") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.minute(functions.col("c_ts"))
))
}

runAndRecordTime("minute",
perfTest)
}

test("test second") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.second(functions.col("c_ts"))
))
}

runAndRecordTime("second",
perfTest)
}

test("test unix_timestamp") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.unix_timestamp(functions.col("c_str_of_ts"))
))
}

runAndRecordTime("unix_timestamp",
perfTest)
}

test("test from_unixtime") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.from_unixtime(functions.col("c_long_of_ts_seconds"))
))
}

runAndRecordTime("from_unixtime",
perfTest)
}

test("test date_format") {
assume(enablePerfTest)

// cache time zone DB in advance
GpuTimeZoneDB.cacheDatabase()
Thread.sleep(5L)

def perfTest(spark: SparkSession, zone: String): DataFrame = {
spark.read.parquet(path).select(functions.count(
functions.date_format(functions.col("c_ts"), "yyyy-MM-dd HH:mm:ss")
))
}

runAndRecordTime("date_format",
perfTest)
}
}