RepartiPy helps you handle PySpark DataFrame partition sizes precisely.
- Repartition your DataFrame precisely, without knowing the whole DataFrame size in advance (i.e. Dynamic Repartition)
- Estimate your DataFrame size with more accuracy
Although Spark's SizeEstimator can be used to estimate a DataFrame's size, it is sometimes inaccurate. RepartiPy instead leverages Spark's execution plan statistics as a workaround. It offers two approaches to achieve this:
- `repartipy.SizeEstimator`
- `repartipy.SamplingSizeEstimator`
`SizeEstimator` is recommended when your executor memory is large enough to cache the whole DataFrame. It simply caches the whole DataFrame in memory and extracts the execution plan statistics.
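As a point of reference, the underlying idea can be sketched in plain PySpark. The snippet below is only a rough sketch of the cache-and-read-plan-statistics approach, not RepartiPy's actual implementation, and it relies on the internal `_jdf.queryExecution()` accessor:

```python
# Rough sketch of the idea only -- not RepartiPy's actual implementation.
# Cache the DataFrame, materialize it, then read the optimizer's plan statistics.
df.cache()
df.count()  # force the cache to be populated

# Internal accessor; available on recent Spark versions (3.x).
plan_stats = df._jdf.queryExecution().optimizedPlan().stats()
size_in_bytes = int(str(plan_stats.sizeInBytes()))  # JVM BigInt -> Python int

df.unpersist()
print(size_in_bytes)
```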
`SamplingSizeEstimator` is recommended when your executor memory is NOT large enough to cache the whole DataFrame. It uses a 'disk write and re-read (HDFS)' approach behind the scenes, for two reasons:
- Prevent a double read from the source (e.g. S3), which might be inefficient -> better performance
- Reduce partition skewness by deliberately reading the data again (leveraging `spark.sql.files.maxPartitionBytes`) -> better sampling results

Therefore, you must have HDFS configured on your cluster and enough disk space. The result may be less accurate than `SizeEstimator` due to sampling; if you need more accurate results, tune the `sample_count` option accordingly. This approach will also be slower than `SizeEstimator`, as `SamplingSizeEstimator` requires disk I/O and additional logic.
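Conceptually, the sampling approach amounts to writing a sample to HDFS, re-reading it, and extrapolating. The sketch below is purely illustrative; the path, the sampling fraction, and the extrapolation are assumptions, not RepartiPy's internals:

```python
# Purely illustrative sketch of the sampling idea -- not RepartiPy's internals.
sample_fraction = 0.01                   # assumed sampling fraction
sample_path = "hdfs:///tmp/size_sample"  # hypothetical scratch path on HDFS

# Write a sample to HDFS, then re-read it so Spark re-splits the partitions
# according to spark.sql.files.maxPartitionBytes.
df.sample(fraction=sample_fraction).write.mode("overwrite").parquet(sample_path)
sample_df = spark.read.parquet(sample_path)

sample_stats = sample_df._jdf.queryExecution().optimizedPlan().stats()
sample_size_in_bytes = int(str(sample_stats.sizeInBytes()))  # JVM BigInt -> Python int

# Extrapolate from the sample fraction to the whole DataFrame.
estimated_total_size_in_bytes = int(sample_size_in_bytes / sample_fraction)
```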
Install RepartiPy via pip:

```shell
pip install repartipy
```
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

input_data = [
    (1, "Seoul"),
    (2, "Busan"),
]
df = spark.createDataFrame(data=input_data, schema=["id", "location"])
```
Calculate the ideal number of partitions for a DataFrame

`SizeEstimator` will suggest `desired_partition_count`, so that each partition holds about `desired_partition_size_in_bytes` (default: 1 GiB) after repartitioning.
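For intuition, the suggestion presumably boils down to arithmetic along these lines (a hypothetical illustration, not necessarily the library's exact formula):

```python
import math

# Hypothetical illustration of the underlying arithmetic -- not RepartiPy's exact code.
estimated_size_in_bytes = 3 * 1073741824       # suppose the DataFrame is ~3 GiB
desired_partition_size_in_bytes = 1073741824   # 1 GiB target per partition

desired_partition_count = max(1, math.ceil(estimated_size_in_bytes / desired_partition_size_in_bytes))
print(desired_partition_count)  # -> 3 partitions of ~1 GiB each
```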
`reproduce()` returns exactly the same `df`, but reproduced internally by the estimator for better performance:
- `SizeEstimator` reproduces `df` from memory (cache).
- `SamplingSizeEstimator` reproduces `df` from disk (HDFS).
```python
import repartipy

one_gib_in_bytes = 1073741824

# SizeEstimator: caches the whole DataFrame in memory
with repartipy.SizeEstimator(spark=spark, df=df) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)

    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```
```python
import repartipy

one_gib_in_bytes = 1073741824

# SamplingSizeEstimator: samples via disk write and re-read (HDFS)
with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)

    se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
    # or
    se.reproduce().coalesce(desired_partition_count).write.save("your/write/path")
```
Estimate size of a DataFrame
```python
import repartipy

with repartipy.SizeEstimator(spark=spark, df=df) as se:
    df_size_in_bytes = se.estimate()
```
```python
import repartipy

with repartipy.SamplingSizeEstimator(spark=spark, df=df, sample_count=10) as se:
    df_size_in_bytes = se.estimate()
```
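The estimate is a plain byte count (as the variable name suggests), so it can be logged or fed into your own sizing logic, for example:

```python
# Illustrative follow-up: format the estimated byte count for logging.
one_gib_in_bytes = 1073741824
print(f"Estimated DataFrame size: {df_size_in_bytes / one_gib_in_bytes:.2f} GiB")
```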
Overall, there appears to be a slight performance cost when employing RepartiPy. This benchmark compares the running time of Spark jobs in the following two cases to give a rough estimate:
- Static Repartition (repartition without RepartiPy)

  ```python
  # e.g.
  df.repartition(123).write.save("your/write/path")
  ```

- Dynamic Repartition (repartition with RepartiPy)

  ```python
  # e.g.
  with repartipy.SizeEstimator(spark=spark, df=df) as se:
      desired_partition_count = se.get_desired_partition_count(desired_partition_size_in_bytes=one_gib_in_bytes)

      se.reproduce().repartition(desired_partition_count).write.save("your/write/path")
  ```
All other conditions remain the same except for the use of RepartiPy.
Note
The benchmark results provided are for rough reference only, not absolute. Actual performance can vary depending on your own circumstances (e.g. your data, your Spark code, your cluster resources, ...).
- DataFrame Size ~= 256 MiB (decompressed size)

  |              | Static  | Dynamic |
  |--------------|---------|---------|
  | Running Time | 8.5 min | 8.6 min |
- DataFrame Size ~= 241 GiB (decompressed size)

  |              | Static | Dynamic |
  |--------------|--------|---------|
  | Running Time | 14 min | 16 min  |