Skewed/Slow Write
Writes can be slow depending on the stage preceding the write(), the target table's partition scheme, and the write parallelism (spark.sql.shuffle.partitions).
The goal of this article is to walk through the options below and identify the most suitable transformation for producing well-sized files in the target table/partition.
When to use Sort
A global sort in Spark internally uses range partitioning to assign sort keys to partition ranges. This involves collecting sample rows (via reservoir sampling) from the input partitions and sending them to the driver to compute the range boundaries.
Use global sort
- If you are writing multiple partitions (especially heterogeneous ones) as part of your write(), since range partitioning can estimate the number of files/tasks for a given target table partition based on the number of sample rows it observes (see the sketch after this list).
- If you want to enable predicate pushdown on a set of target table fields for downstream consumption (sorted files carry tighter min/max statistics, which makes pushdown effective).
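A minimal sketch of a globally sorted write, assuming a hypothetical source table, column names, and output path:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sorted-write").getOrCreate()
df = spark.read.table("source_db.events")  # hypothetical source table

# orderBy triggers a range exchange: sampled rows are sent to the driver to
# compute range boundaries, then rows are routed to sorted shuffle partitions.
# Sorting by the table partition column first means each write task produces
# files for (mostly) one target partition.
(df.orderBy("event_date", "user_id")
   .write
   .partitionBy("event_date")
   .mode("overwrite")
   .parquet("/warehouse/events_sorted"))
```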
Tips:
1. You can increase the Spark property spark.sql.execution.rangeExchange.sampleSizePerPartition to improve the estimates if you are not seeing an optimal number of files per partition.
2. You can also add a salt column to the sort keys to increase the number of write tasks if the sort keys' cardinality is less than spark.sql.shuffle.partitions. Example:
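A minimal sketch of both tips, reusing the df from the earlier sketch; the column names, salt bucket count, and sample size are illustrative assumptions:

```python
from pyspark.sql import functions as F

# Tip 1: raise the per-partition sample size for more accurate range
# boundaries (200 is an illustrative value; the default is 100).
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", 200)

# Tip 2: add a random salt column to low-cardinality sort keys.
N_SALT = 8  # extra buckets per sort-key value (assumption)
salted = df.withColumn("salt", (F.rand() * N_SALT).cast("int"))

# The salt multiplies the number of distinct range boundaries, so more of
# the spark.sql.shuffle.partitions write tasks receive data.
(salted.orderBy("event_date", "salt")
       .drop("salt")  # projection only; does not reshuffle the sorted data
       .write
       .partitionBy("event_date")
       .mode("overwrite")
       .parquet("/warehouse/events_salted"))
```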
When to use Repartition
repartition(n) redistributes rows in a round-robin manner to produce a uniform distribution across tasks, while repartition(n, cols) hash-partitions on the given columns. Either form just before the write produces uniform files, and all write tasks should take about the same time.
Use repartition
- If you are writing into a single partition or a non-partitioned table and want to get uniform file sizes.
- If you want to produce a specific number of files. For example, repartition(100) would generate up to 100 files, as in the sketch below.
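A minimal sketch of repartitioning before a write, assuming the df from the earlier sketches and a hypothetical output path:

```python
# repartition(100) round-robins rows into 100 uniform partitions, so the
# write produces up to 100 files of roughly equal size.
(df.repartition(100)
   .write
   .mode("overwrite")
   .parquet("/warehouse/events_uniform"))

# Passing columns switches to hash partitioning, co-locating rows that share
# the same key values: df.repartition(100, "user_id")
```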
When to use Coalesce
Coalesce combines upstream partitions (and therefore output files) without invoking a shuffle, and is useful when you are going from higher parallelism to lower parallelism. Use Coalesce:
- If you are writing a very small number of files and the data size is relatively small.
Note that coalesce(N) is not an optimal way to merge files: it combines multiple partitions (until it reaches the target of N files) without taking size into account, and you could run into org.apache.spark.memory.SparkOutOfMemoryError: Unable to acquire 65536 bytes of memory, got 0 if the combined size grows too large.
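Keeping that note in mind, a minimal sketch of a coalesced write for a small result, with a hypothetical partition count and output path:

```python
# coalesce(4) merges the upstream partitions into 4 without a shuffle, but
# does not balance their sizes, so keep the total data small enough to fit
# within the write tasks' memory.
(df.coalesce(4)
   .write
   .mode("overwrite")
   .parquet("/warehouse/small_output"))
```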