Spark performance regression with sum aggregations


An interesting bug turned up during our latest round of performance tuning for Spark 2.2 (2.3 is also affected). The workload was a batch Spark job scheduled to run hourly and to process about 1 TB of data stored in Parquet format. We tuned a number of things, which gave us roughly a 10% performance boost.

While looking for known issues we found SPARK-21657, which is related to explode. Although our job used explode, it caused no problems because the number of exploded rows was low. It did, however, prompt us to take a closer look at whole-stage code generation. A simple test with spark.sql.codegen.wholeStage disabled cut the run time by 30-40%.
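As a rough sketch of how such a test can be set up, the setting can be flipped on the session before the query is planned. The stand-in query and the application name below are illustrative assumptions, not part of our production job.

```scala
import org.apache.spark.sql.SparkSession

// Reuses the active session in spark-shell, or starts a local one otherwise.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wholestage-codegen-toggle") // hypothetical name
  .getOrCreate()

// Turn whole-stage code generation off for queries planned from now on.
spark.conf.set("spark.sql.codegen.wholeStage", "false")

// A small stand-in query (an assumption, not the real job). With codegen
// disabled, the physical plan should no longer mark operators as part of a
// whole-stage codegen stage (shown with a '*' prefix in Spark 2.x).
val df = spark.range(1000)
  .selectExpr("id % 10 as grp", "id")
  .groupBy("grp")
  .sum("id")
df.explain()

// Restore the default when done experimenting.
spark.conf.set("spark.sql.codegen.wholeStage", "true")
```

The same setting can be passed at submit time with --conf spark.sql.codegen.wholeStage=false, which avoids touching the job code.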

We spent another day going through the job piece by piece and ended up with a test case containing nothing but sum aggregations.

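    // Assumes an existing SparkSession named spark (as in spark-shell)
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{lit, sum, udf}

    val cnt = 50        // number of columns to aggregate
    val rows = 5000000  // number of input rows

    // UDF that always returns 1; used to fill the generated columns
    val dummy = udf(() => 1)

    // Appends columns col_0 .. col_(cnt-1) to the input DataFrame
    def addConstColumns(inputDF: DataFrame): DataFrame =
      (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"col_$idx", dummy()))

    // Put every row into a single group and sum each generated column
    spark.range(rows).toDF()
      .withColumn("grp", lit(1))
      .transform(addConstColumns)
      .groupBy("grp")
      .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
      .collect()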

This code creates a DataFrame with 50 columns and sum-aggregates each of them. I ran the test locally on a laptop (HotSpot JVM) with WHOLESTAGE_CODEGEN_ENABLED set to true and then to false, and the timing was about 15 sec vs 3 sec respectively.
As a result, I created a Spark Jira issue, SPARK-23791.
I also found two very similar issues, SPARK-20184 and SPARK-20479, both targeted at Spark 2.4.
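For anyone who wants to repeat the comparison, here is a minimal timing sketch. The helper timeAggregation and its parameters are hypothetical names introduced for illustration; wall-clock timing with System.nanoTime is crude, so the numbers will vary between machines and runs.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{lit, sum, udf}

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-timing") // hypothetical name
  .getOrCreate()

val dummy = udf(() => 1)

// Hypothetical helper: runs the sum-only test case once with the given
// codegen setting and returns the elapsed wall-clock time in milliseconds.
def timeAggregation(cnt: Int, rows: Long, codegenEnabled: Boolean): Long = {
  spark.conf.set("spark.sql.codegen.wholeStage", codegenEnabled.toString)

  def addConstColumns(inputDF: DataFrame): DataFrame =
    (0 until cnt).foldLeft(inputDF)((df, idx) => df.withColumn(s"col_$idx", dummy()))

  val start = System.nanoTime()
  spark.range(rows).toDF()
    .withColumn("grp", lit(1))
    .transform(addConstColumns)
    .groupBy("grp")
    .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
    .collect()
  (System.nanoTime() - start) / 1000000
}

// Small warm-up run so JVM and Spark start-up costs do not skew the first result.
timeAggregation(cnt = 5, rows = 1000L, codegenEnabled = true)

Seq(true, false).foreach { enabled =>
  val millis = timeAggregation(cnt = 50, rows = 5000000L, codegenEnabled = enabled)
  println(s"wholeStage=$enabled: $millis ms")
}
```

Setting the flag inside the helper keeps each measurement tied to an explicit setting rather than to whatever the session happened to have before.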

After a series of tests we also found that for small and large column counts (cnt <= 13 and cnt >= 100) there was no noticeable difference; with 14 <= cnt <= 83 the difference was about 4-7x; and with 84 <= cnt <= 99 the code simply failed with a nasty error:

    java.lang.ClassFormatError: Too many arguments in method signature in class file org/apache/spark/sql/catalyst/expressions/GeneratedClass$GeneratedIteratorForCodegenStage2
        at java.lang.ClassLoader.defineClass1(Native Method)
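The column-count ranges above can be probed with a sweep like the following sketch. The helpers runSums and probe are hypothetical, and the exception handling is an assumption: ClassFormatError is a LinkageError, but Spark may wrap it in another exception type, so both cases are caught. The exact boundaries may differ on other Spark versions.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, sum, udf}
import scala.util.control.NonFatal

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("codegen-column-sweep") // hypothetical name
  .getOrCreate()

// Whole-stage codegen is on by default; set it explicitly for the sweep.
spark.conf.set("spark.sql.codegen.wholeStage", "true")

val dummy = udf(() => 1)

// Same shape as the test case above, with the column count as a parameter.
def runSums(cnt: Int, rows: Long): Unit = {
  val base = spark.range(rows).toDF().withColumn("grp", lit(1))
  val wide = (0 until cnt).foldLeft(base)((df, idx) => df.withColumn(s"col_$idx", dummy()))
  wide.groupBy("grp")
    .agg(sum("col_0"), (1 until cnt).map(idx => sum(s"col_$idx")): _*)
    .collect()
}

// Returns Right(elapsed ms) on success, or Left(error class name) on failure.
def probe(cnt: Int, rows: Long): Either[String, Long] =
  try {
    val start = System.nanoTime()
    runSums(cnt, rows)
    Right((System.nanoTime() - start) / 1000000)
  } catch {
    case e: LinkageError => Left(e.getClass.getName) // covers ClassFormatError
    case NonFatal(e)     => Left(e.getClass.getName) // covers wrapped failures
  }

// Column counts chosen around the boundaries reported above.
Seq(13, 14, 50, 83, 84, 99, 100).foreach { cnt =>
  println(s"cnt=$cnt -> ${probe(cnt, 5000000L)}")
}
```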

If you have a performance-critical Spark job that performs a lot of aggregations with the DataFrame API, be careful: there may be noticeable time savings to be had by disabling whole-stage code generation.


About the Author

Valentin is a specialist in Big Data and Cloud solutions. He has extensive expertise in Cloudera Hadoop Distribution and Google Cloud Platform, and is skilled in building scalable, performance-critical distributed systems and data visualization systems.
