[SPARK-34790][CORE] Disable fetching shuffle blocks in batch when io encryption is enabled

### What changes were proposed in this pull request?

This patch proposes to disable fetching shuffle blocks in batch when io encryption is enabled. Adaptive Query Execution fetch contiguous shuffle blocks for the same map task in batch to reduce IO and improve performance. However, we found that batch fetching is incompatible with io encryption.

### Why are the changes needed?
Before this patch, we set `spark.io.encryption.enabled` to true, then run some queries which coalesced partitions by AEQ, may got following error message:
```14:05:52.638 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 1.0 in stage 2.0 (TID 3) (11.240.37.88 executor driver): FetchFailed(BlockManagerId(driver, 11.240.37.88, 63574, None), shuffleId=0, mapIndex=0, mapId=0, reduceId=2, message=
org.apache.spark.shuffle.FetchFailedException: Stream is corrupted
	at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:772)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:845)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.readSize(UnsafeRowSerializer.scala:113)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:129)
	at org.apache.spark.sql.execution.UnsafeRowSerializerInstance$$anon$2$$anon$3.next(UnsafeRowSerializer.scala:110)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:494)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.util.CompletionIterator.next(CompletionIterator.scala:29)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1437)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Stream is corrupted
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:200)
	at net.jpountz.lz4.LZ4BlockInputStream.refill(LZ4BlockInputStream.java:226)
	at net.jpountz.lz4.LZ4BlockInputStream.read(LZ4BlockInputStream.java:157)
	at org.apache.spark.storage.BufferReleasingInputStream.read(ShuffleBlockFetcherIterator.scala:841)
	... 25 more

)
```

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

New tests.

Closes #31898 from hezuojiao/fetch_shuffle_in_batch.

Authored-by: hezuojiao <hezuojiao@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This commit is contained in:
hezuojiao 2021-03-22 13:06:12 -07:00 committed by Dongjoon Hyun
parent 31da90762e
commit 39542bb81f
3 changed files with 30 additions and 5 deletions

View file

@ -51,15 +51,17 @@ private[spark] class BlockStoreShuffleReader[K, C](
true true
} }
val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL) val useOldFetchProtocol = conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)
// SPARK-34790: Fetching continuous blocks in batch is incompatible with io encryption.
val ioEncryption = conf.get(config.IO_ENCRYPTION_ENABLED)
val doBatchFetch = shouldBatchFetch && serializerRelocatable && val doBatchFetch = shouldBatchFetch && serializerRelocatable &&
(!compressed || codecConcatenation) && !useOldFetchProtocol (!compressed || codecConcatenation) && !useOldFetchProtocol && !ioEncryption
if (shouldBatchFetch && !doBatchFetch) { if (shouldBatchFetch && !doBatchFetch) {
logDebug("The feature tag of continuous shuffle block fetching is set to true, but " + logDebug("The feature tag of continuous shuffle block fetching is set to true, but " +
"we can not enable the feature because other conditions are not satisfied. " + "we can not enable the feature because other conditions are not satisfied. " +
s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " + s"Shuffle compress: $compressed, serializer relocatable: $serializerRelocatable, " +
s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " + s"codec concatenation: $codecConcatenation, use old shuffle fetch protocol: " +
s"$useOldFetchProtocol.") s"$useOldFetchProtocol, io encryption: $ioEncryption.")
} }
doBatchFetch doBatchFetch
} }

View file

@ -500,8 +500,8 @@ object SQLConf {
"reduce IO and improve performance. Note, multiple contiguous blocks exist in single " + "reduce IO and improve performance. Note, multiple contiguous blocks exist in single " +
s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " + s"fetch request only happen when '${ADAPTIVE_EXECUTION_ENABLED.key}' and " +
s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " + s"'${COALESCE_PARTITIONS_ENABLED.key}' are both true. This feature also depends " +
"on a relocatable serializer, the concatenation support codec in use and the new version " + "on a relocatable serializer, the concatenation support codec in use, the new version " +
"shuffle fetch protocol.") "shuffle fetch protocol and io encryption is disabled.")
.version("3.0.0") .version("3.0.0")
.booleanConf .booleanConf
.createWithDefault(true) .createWithDefault(true)

View file

@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.scalatest.BeforeAndAfterAll import org.scalatest.BeforeAndAfterAll
import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED
import org.apache.spark.internal.config.UI.UI_ENABLED import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql._ import org.apache.spark.sql._
import org.apache.spark.sql.execution.adaptive._ import org.apache.spark.sql.execution.adaptive._
@ -57,15 +58,18 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
def withSparkSession( def withSparkSession(
f: SparkSession => Unit, f: SparkSession => Unit,
targetPostShuffleInputSize: Int, targetPostShuffleInputSize: Int,
minNumPostShufflePartitions: Option[Int]): Unit = { minNumPostShufflePartitions: Option[Int],
enableIOEncryption: Boolean = false): Unit = {
val sparkConf = val sparkConf =
new SparkConf(false) new SparkConf(false)
.setMaster("local[*]") .setMaster("local[*]")
.setAppName("test") .setAppName("test")
.set(UI_ENABLED, false) .set(UI_ENABLED, false)
.set(IO_ENCRYPTION_ENABLED, enableIOEncryption)
.set(SQLConf.SHUFFLE_PARTITIONS.key, "5") .set(SQLConf.SHUFFLE_PARTITIONS.key, "5")
.set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5") .set(SQLConf.COALESCE_PARTITIONS_INITIAL_PARTITION_NUM.key, "5")
.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true") .set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "true")
.set(SQLConf.FETCH_SHUFFLE_BLOCKS_IN_BATCH.key, "true")
.set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1") .set(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key, "-1")
.set( .set(
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key, SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key,
@ -408,6 +412,25 @@ class CoalesceShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterAl
} }
withSparkSession(test, 100, None) withSparkSession(test, 100, None)
} }
test("SPARK-34790: enable IO encryption in AQE partition coalescing") {
val test: SparkSession => Unit = { spark: SparkSession =>
val ds = spark.range(0, 100, 1, numInputPartitions)
val resultDf = ds.repartition(ds.col("id"))
resultDf.collect()
val finalPlan = resultDf.queryExecution.executedPlan
.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
assert(
finalPlan.collect {
case r @ CoalescedShuffleReader() => r
}.isDefinedAt(0))
}
Seq(true, false).foreach { enableIOEncryption =>
// Before SPARK-34790, it will throw an exception when io encryption enabled.
withSparkSession(test, Int.MaxValue, None, enableIOEncryption)
}
}
} }
object CoalescedShuffleReader { object CoalescedShuffleReader {