Result size larger than spark.driver.maxResultSize error

Example of the error (attached screenshot: spark-driver-maxResultsSize-Error)

You typically run into this error for one of the following reasons.

  1. You are sending a large result set to the driver using SELECT (in SQL) or collect() (on DataFrames/Datasets/RDDs): apply a limit if your intention is to spot check a few rows, since you won't be able to go through the full result anyway when the row count is very high. If you need to query the results multiple times with specific filters, an alternative is to write the results to a temporary table in your schema and query that table instead (see Collect best practices and the first sketch after this list).
  2. You are broadcasting a table that is too big: Spark downloads all the rows of a table that needs to be broadcast to the driver before it starts shipping them to the executors, so if you broadcast a table larger than spark.driver.maxResultSize you will run into this error. You can either increase spark.driver.maxResultSize or stop broadcasting the table so that Spark uses a shuffle hash or sort-merge join instead. Note that Spark broadcasts a table referenced in a join when its size is below spark.sql.autoBroadcastJoinThreshold (100 MB by default at Netflix). You can raise this threshold to broadcast larger tables, lower it to exclude certain tables, or set it to -1 to disable broadcast joins entirely (see the configuration sketch after this list).
  3. You have a sort in your SQL/DataFrame: Spark internally uses range partitioning to assign sort keys to partition ranges. This involves collecting sample rows (reservoir sampling) from the input partitions and sending them to the driver to compute the range boundaries. The error then falls into one of the scenarios below (see the sort-related sketch after this list).
     a. You have wide/bloated rows in your table: in this case you are not sending many rows to the driver, but the sampled bytes exceed spark.driver.maxResultSize. The recommendation here is to lower the default sample size by setting the Spark property spark.sql.execution.rangeExchange.sampleSizePerPartition to something lower than 20. You can also increase spark.driver.maxResultSize if lowering the sample size causes an imbalance in partition ranges (for example, skew in a subsequent stage or non-uniform output files).
     b. You have too many Spark partitions from the previous stage: in this case you have a large number of map tasks while reading from a table. Since Spark has to collect sample rows from every partition, the total bytes from the sampled rows (partitions * sampleSize) can be larger than spark.driver.maxResultSize. The recommended fix is to combine the splits for the table with the high map-task count (increase spark.netflix.(db).(table).target-size). Note that a very large number of map tasks (>80k) will cause other OOM issues on the driver, since it needs to keep track of metadata for all of those tasks/partitions.
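
A minimal sketch of the first recommendation, assuming a spark-shell/notebook session where spark is already defined; the table name, filter, and output table (my_schema.events, ds, my_schema.events_subset) are hypothetical:

    // Spot check: cap the number of rows returned to the driver instead of collecting everything.
    val sample = spark.sql("SELECT * FROM my_schema.events WHERE ds = '20240101'")
      .limit(100)   // at most 100 rows are serialized back to the driver
      .collect()

    // If the result needs to be queried repeatedly with specific filters,
    // persist it as a table and query that table instead of collecting it.
    spark.sql("""
      CREATE TABLE my_schema.events_subset AS
      SELECT * FROM my_schema.events WHERE ds = '20240101'
    """)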
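
A sketch of the broadcast-related settings from item 2; the 4g and 50 MB values are illustrative, not recommendations:

    import org.apache.spark.sql.SparkSession

    // spark.driver.maxResultSize is a driver property: set it before the session is
    // created (via the session builder or --conf on spark-submit), not at runtime.
    val spark = SparkSession.builder()
      .config("spark.driver.maxResultSize", "4g")
      .getOrCreate()

    // The broadcast threshold is a SQL conf and can be changed at runtime:
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024) // broadcast only tables under ~50 MB
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)                // or disable broadcast joins entirely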
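
A sketch of the sort-related mitigations from item 3, again assuming an existing session spark; mydb/mytable in the split-size property are placeholders for your own table, and whether that internal property is applied as a session conf or a job-submission conf may differ:

    // Wide rows: sample fewer rows per input partition when computing range boundaries.
    spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", 10)

    // Too many map tasks: combine input splits so fewer partitions are sampled
    // (value is in bytes, ~512 MB here).
    spark.conf.set("spark.netflix.mydb.mytable.target-size", "536870912")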

Broadcast join related articles

SQL Broadcast Join Hints

Tables getting broadcasted even when broadcast is disabled