diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 6ae7f2869b..e9ffe129ca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming import java.util.UUID import java.util.concurrent.atomic.AtomicInteger +import scala.util.Random + import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy} -import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp +import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition} import org.apache.spark.sql.catalyst.rules.Rule @@ -73,10 +75,14 @@ class IncrementalExecution( * with the desired literal */ override lazy val optimizedPlan: LogicalPlan = { + val random = new Random() + sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions { case ts @ CurrentBatchTimestamp(timestamp, _, _) => logInfo(s"Current batch timestamp = $timestamp") ts.toLiteral + // SPARK-24896: Set the seed for random number generation in Uuid expressions. + case _: Uuid => Uuid(Some(random.nextLong())) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index 78199b0a1c..f37f3682b0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -21,6 +21,8 @@ import java.{util => ju} import java.util.Optional import java.util.concurrent.CountDownLatch +import scala.collection.mutable + import org.apache.commons.lang3.RandomStringUtils import org.scalactic.TolerantNumerics import org.scalatest.BeforeAndAfter @@ -29,8 +31,9 @@ import org.scalatest.mockito.MockitoSugar import org.apache.spark.SparkException import org.apache.spark.internal.Logging -import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.{Column, DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Uuid import org.apache.spark.sql.execution.streaming._ import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter import org.apache.spark.sql.functions._ @@ -834,6 +837,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi CheckLastBatch(("A", 1))) } + test("Uuid in streaming query should not produce same uuids in each execution") { + val uuids = mutable.ArrayBuffer[String]() + def collectUuid: Seq[Row] => Unit = { rows: Seq[Row] => + rows.foreach(r => uuids += r.getString(0)) + } + + val stream = MemoryStream[Int] + val df = stream.toDF().select(new Column(Uuid())) + testStream(df)( + AddData(stream, 1), + CheckAnswer(collectUuid), + AddData(stream, 2), + CheckAnswer(collectUuid) + ) + assert(uuids.distinct.size == 2) + } + test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " + "should not fail") { val df = spark.readStream.format("rate").load()