[SPARK-24896][SQL] Uuid should produce different values for each execution in streaming query

## What changes were proposed in this pull request?

`Uuid`'s results depend on the random seed assigned during analysis. Because a streaming query reuses the same analyzed plan across executions, every micro-batch produces the same uuids. This is incorrect for streaming query execution.
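To illustrate, here is a minimal sketch (not part of this patch; it assumes a `SparkSession` named `spark` and uses the internal `Uuid` expression directly, so it is for demonstration only): once the seed is fixed, re-running the same plan replays the same pseudo-random sequence.

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Uuid

// With a seed fixed once (as analysis did before this patch), Uuid is
// deterministic: both runs seed the generator identically.
val seed = Some(42L)
val run1 = spark.range(5).select(new Column(Uuid(seed))).collect()
val run2 = spark.range(5).select(new Column(Uuid(seed))).collect()
// run1 and run2 hold the same uuid strings, element by element; this is
// exactly what a streaming query observed across micro-batches before this fix.
```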

## How was this patch tested?

Added a test to `StreamingQuerySuite`.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21854 from viirya/uuid_in_streaming.
Authored by Liang-Chi Hsieh on 2018-08-02 15:35:46 -07:00; committed by Shixiong Zhu.
parent efef55388f
commit d0bc3ed679
2 changed files with 28 additions and 2 deletions

IncrementalExecution.scala:

```diff
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.util.Random
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
+import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -73,10 +75,14 @@ class IncrementalExecution(
    * with the desired literal
    */
   override lazy val optimizedPlan: LogicalPlan = {
+    val random = new Random()
+
     sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
         logInfo(s"Current batch timestamp = $timestamp")
         ts.toLiteral
+
+      // SPARK-24896: Set the seed for random number generation in Uuid expressions.
+      case _: Uuid => Uuid(Some(random.nextLong()))
     }
   }
 
```
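Conceptually, the rule added above can be written as a standalone helper (a hypothetical `reseedUuids`, not code from this patch): every `Uuid` in the plan is replaced by a copy carrying a freshly drawn seed. Because a new `IncrementalExecution` is created for each micro-batch, each execution draws new seeds and therefore produces new uuids.

```scala
import scala.util.Random

import org.apache.spark.sql.catalyst.expressions.Uuid
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical standalone sketch of the rewrite performed in optimizedPlan:
// replace every Uuid with a re-seeded copy, leaving other expressions intact.
def reseedUuids(plan: LogicalPlan): LogicalPlan = {
  val random = new Random()
  plan transformAllExpressions {
    case _: Uuid => Uuid(Some(random.nextLong()))
  }
}
```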

StreamingQuerySuite.scala:

```diff
@@ -21,6 +21,8 @@ import java.{util => ju}
 import java.util.Optional
 import java.util.concurrent.CountDownLatch
 
+import scala.collection.mutable
+
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
@@ -29,8 +31,9 @@ import org.scalatest.mockito.MockitoSugar
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.sources.TestForeachWriter
 import org.apache.spark.sql.functions._
@@ -834,6 +837,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       CheckLastBatch(("A", 1)))
   }
 
+  test("Uuid in streaming query should not produce same uuids in each execution") {
+    val uuids = mutable.ArrayBuffer[String]()
+    def collectUuid: Seq[Row] => Unit = { rows: Seq[Row] =>
+      rows.foreach(r => uuids += r.getString(0))
+    }
+
+    val stream = MemoryStream[Int]
+    val df = stream.toDF().select(new Column(Uuid()))
+    testStream(df)(
+      AddData(stream, 1),
+      CheckAnswer(collectUuid),
+      AddData(stream, 2),
+      CheckAnswer(collectUuid)
+    )
+    assert(uuids.distinct.size == 2)
+  }
+
   test("StreamingRelationV2/StreamingExecutionRelation/ContinuousExecutionRelation.toJSON " +
     "should not fail") {
     val df = spark.readStream.format("rate").load()
```
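As an end-to-end illustration (a hedged sketch, not from this patch; it assumes a `SparkSession` named `spark` and uses the built-in SQL `uuid()` function, which resolves to the `Uuid` expression), a console-sink query can show the behavior directly: with the fix, each micro-batch prints fresh uuids instead of replaying the previous batch's values.

```scala
import org.apache.spark.sql.functions.expr

// Rate source emits one row per second; every micro-batch re-seeds Uuid,
// so consecutive batches print different uuid values.
val query = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "1")
  .load()
  .select(expr("uuid()").as("id"))
  .writeStream
  .format("console")
  .start()

query.awaitTermination()
```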