273b28404c
## What changes were proposed in this pull request? When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset. With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further. Spark 2.4 ``` spark git:(master) ./build/mvn -DskipTests clean package spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar ``` Spark 2.3 + databricks avro ``` spark git:(branch-2.3) ./build/mvn -DskipTests clean package spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0 ``` Current master: ``` +-------+--------------------+ |summary| writeTimes| +-------+--------------------+ | count| 100| | mean| 2.95621| | stddev|0.030895815479469294| | min| 2.915| | max| 3.049| +-------+--------------------+ +-------+--------------------+ |summary| readTimes| +-------+--------------------+ | count| 100| | mean| 0.31072999999999995| | stddev|0.054139709842390006| | min| 0.259| | max| 0.692| +-------+--------------------+ ``` Current master with this PR: ``` +-------+--------------------+ |summary| writeTimes| +-------+--------------------+ | count| 100| | mean| 2.5804300000000002| | stddev|0.011175600225672079| | min| 2.558| | max| 2.62| +-------+--------------------+ +-------+--------------------+ |summary| readTimes| +-------+--------------------+ | count| 100| | mean| 0.29922000000000004| | stddev|0.058261961532514166| | min| 0.251| | max| 0.732| +-------+--------------------+ ``` Spark 2.3 + databricks avro: ``` +-------+--------------------+ |summary| writeTimes| +-------+--------------------+ | count| 100| | mean| 1.7730500000000005| | stddev|0.025199156230863575| | min| 1.729| | max| 1.833| +-------+--------------------+ +-------+-------------------+ |summary| readTimes| +-------+-------------------+ | count| 100| | mean| 0.29715| | stddev|0.05685643358850465| | min| 0.258| | max| 0.718| +-------+-------------------+ ``` The following is the test code to reproduce the result. ```scala spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed") val sparkSession = spark import sparkSession.implicits._ val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid => val features = Array.fill(16000)(scala.math.random) (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features) }.toDF("uid", "random", "uuid1", "uuid2", "features").cache() val size = df.count() // Write into ramdisk to rule out the disk IO impact val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/" val n = 150 val writeTimes = new Array[Double](n) var i = 0 while (i < n) { val t1 = System.currentTimeMillis() df.write .format("com.databricks.spark.avro") .mode("overwrite") .save(tempSaveDir) val t2 = System.currentTimeMillis() writeTimes(i) = (t2 - t1) / 1000.0 i += 1 } df.unpersist() // The first 50 runs are for warm-up val readTimes = new Array[Double](n) i = 0 while (i < n) { val t1 = System.currentTimeMillis() val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir) assert(readDF.count() == size) val t2 = System.currentTimeMillis() readTimes(i) = (t2 - t1) / 1000.0 i += 1 } spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show() spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show() ``` ## How was this patch tested? Existing tests. Author: DB Tsai <d_tsai@apple.com> Author: Brian Lindblom <blindblom@apple.com> Closes #21952 from dbtsai/avro-performance-fix. |
||
---|---|---|
.. | ||
src | ||
pom.xml |