Result size larger than spark.driver.maxResultSize error
ex: `org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of <n> tasks (<size>) is bigger than spark.driver.maxResultSize (<limit>)`
You typically run into this error for one of the following reasons.
- You are sending a large result set to the driver using `SELECT` (in SQL) or `collect()` (in DataFrames/Datasets/RDDs): Apply a `LIMIT` if your intention is to spot-check a few rows, as you won't be able to go through the full result set anyway when the row count is very high. If you need to query the results multiple times with a specific set of filters, writing the results to a temporary table in your schema and querying that table is a better alternative. (Collect best practices) See the first sketch below this list.
- You are broadcasting a table that is too big: Spark downloads all the rows of a table that needs to be broadcast to the driver before it starts shipping them to the executors, so if you broadcast a table larger than `spark.driver.maxResultSize`, you will run into this error. You can overcome this by either increasing `spark.driver.maxResultSize` or by not broadcasting the table, in which case Spark uses a shuffle-hash or sort-merge join instead. Note that Spark broadcasts a table referenced in a join if its size is less than `spark.sql.autoBroadcastJoinThreshold` (100 MB by default at Netflix). You can raise this threshold to include larger tables in broadcasts, lower it to exclude certain tables, or set it to -1 to disable broadcast joins altogether. See the second sketch below this list.
- You have a `sort` in your SQL/DataFrame: Spark internally uses range partitioning to assign sort keys to partition ranges. This involves collecting sample rows (reservoir sampling) from the input partitions and sending them to the driver to compute the range boundaries. This error then falls into one of the scenarios below.
  a. You have wide/bloated rows in your table: In this case you are not sending a lot of rows to the driver, but the sampled rows add up to more bytes than `spark.driver.maxResultSize`. The recommendation here is to lower the default sample size by setting the Spark property `spark.sql.execution.rangeExchange.sampleSizePerPartition` to something lower than 20. You can also increase `spark.driver.maxResultSize` if lowering the sample size causes an imbalance in the partition ranges (for example, skew in a subsequent stage or non-uniform output files). See the third sketch below this list.
  b. You have too many Spark partitions from the previous stage: In this case you have a large number of map tasks reading the table. Since Spark has to collect sample rows from every partition, the total bytes sampled (partitions * sampleSize) can be larger than `spark.driver.maxResultSize`. The recommended way to resolve this is to combine the splits for the table with the high map-task count (increase `spark.netflix.(db).(table).target-size`). Note that having a very large number of map tasks (>80k) will cause other OOM issues on the driver, as it needs to keep track of metadata for all of those tasks/partitions. See the fourth sketch below this list.
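The first sketch below illustrates the limit/temporary-table pattern from the first bullet. It is a minimal example, not a prescription; `my_schema.big_table`, the `dateint` filter, and the temp-table name are all placeholders.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Spot-check a handful of rows instead of collecting the full result set (SQL).
spark.sql("SELECT * FROM my_schema.big_table LIMIT 100").show(100, truncate = false)

// Same idea with the DataFrame API: apply limit() before collect().
val sample = spark.table("my_schema.big_table").limit(100).collect()

// If you need to re-query the result with different filters, write it to a
// temporary table in your schema and query that table instead of collecting.
spark.table("my_schema.big_table")
  .filter("dateint = 20240101")            // hypothetical filter
  .write
  .mode("overwrite")
  .saveAsTable("my_schema.big_table_tmp")  // hypothetical temp table
```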
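The second sketch covers the broadcast case. The `4g` value is purely illustrative, and note that `spark.driver.maxResultSize` has to be in place before the driver starts (for example via `--conf` at submit time or on a new session's builder), while the broadcast threshold is a SQL conf you can change at runtime.

```scala
import org.apache.spark.sql.SparkSession

// spark.driver.maxResultSize must be set before the driver starts, e.g.
// --conf spark.driver.maxResultSize=4g at submit time, or on the builder of a
// brand-new session. The 4g value is illustrative only.
val spark = SparkSession.builder()
  .config("spark.driver.maxResultSize", "4g")
  .getOrCreate()

// The broadcast threshold is a SQL conf and can be changed at runtime:
// drop it below the table's size to keep that table out of broadcast joins,
// or set it to -1 to disable broadcast joins entirely so Spark falls back to
// a shuffle-hash or sort-merge join.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
```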
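The third sketch shows scenario (a): lowering the range-exchange sample size for wide rows. The value 10 is just an example below the 20 mentioned above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Scenario (a): wide rows. Ship fewer sampled rows per partition to the driver
// when it computes the range boundaries for the sort. 10 is an illustrative
// value below the 20 mentioned above.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "10")

// If the smaller sample makes the resulting partition ranges uneven, raise the
// driver-side cap instead (set before the driver starts):
//   --conf spark.driver.maxResultSize=4g
```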
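The fourth sketch shows scenario (b): raising the per-table split target so the table is read with fewer, larger splits. `spark.netflix.(db).(table).target-size` is a Netflix-internal property; the table name, the byte value, and setting it via `spark.conf.set` (rather than at submit time) are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Scenario (b): too many map-side partitions. Increase the per-table split
// target so the table is read with fewer, larger splits, which also means fewer
// sampled rows shipped to the driver. The property name follows the
// spark.netflix.(db).(table).target-size pattern from above; the table name and
// the ~512 MB value are hypothetical, and your job may need this passed at
// submit time instead of at runtime.
spark.conf.set("spark.netflix.my_schema.big_table.target-size", "536870912")
```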