[SPARK-18368] Fix regexp_replace with task serialization.

## What changes were proposed in this pull request?

This makes the result value both transient and lazy, so that if the RegExpReplace object is initialized then serialized, `result: StringBuffer` will be correctly initialized.

## How was this patch tested?

* Verified that this patch fixed the query that found the bug.
* Added a test case that fails without the fix.

Author: Ryan Blue <blue@apache.org>

Closes #15816 from rdblue/SPARK-18368-fix-regexp-replace.
This commit is contained in:
Ryan Blue 2016-11-08 23:47:48 -08:00 committed by Reynold Xin
parent 4afa39e223
commit b9192bb3ff
2 changed files with 10 additions and 7 deletions

View file

@ -230,7 +230,7 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
@transient private var lastReplacement: String = _
@transient private var lastReplacementInUTF8: UTF8String = _
// result buffer write by Matcher
@transient private val result: StringBuffer = new StringBuffer
@transient private lazy val result: StringBuffer = new StringBuffer
override def nullSafeEval(s: Any, p: Any, r: Any): Any = {
if (!p.equals(lastRegex)) {

View file

@ -22,7 +22,8 @@ import org.scalactic.TripleEqualsSupport.Spread
import org.scalatest.exceptions.TestFailedException
import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.apache.spark.SparkFunSuite
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
@ -43,13 +44,15 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
protected def checkEvaluation(
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
val serializer = new JavaSerializer(new SparkConf()).newInstance
val expr: Expression = serializer.deserialize(serializer.serialize(expression))
val catalystValue = CatalystTypeConverters.convertToCatalyst(expected)
checkEvaluationWithoutCodegen(expression, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expression, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expression.dataType)) {
checkEvalutionWithUnsafeProjection(expression, catalystValue, inputRow)
checkEvaluationWithoutCodegen(expr, catalystValue, inputRow)
checkEvaluationWithGeneratedMutableProjection(expr, catalystValue, inputRow)
if (GenerateUnsafeProjection.canSupport(expr.dataType)) {
checkEvalutionWithUnsafeProjection(expr, catalystValue, inputRow)
}
checkEvaluationWithOptimization(expression, catalystValue, inputRow)
checkEvaluationWithOptimization(expr, catalystValue, inputRow)
}
/**