diff --git a/sql/hive/src/test/resources/test_script.py b/sql/core/src/test/resources/test_script.py similarity index 100% rename from sql/hive/src/test/resources/test_script.py rename to sql/core/src/test/resources/test_script.py diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala new file mode 100644 index 0000000000..aa800000e0 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.scalatest.Assertions._ +import org.scalatest.BeforeAndAfterEach +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.{SparkException, TaskContext, TestUtils} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Column +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression, GenericInternalRow} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestUtils + with BeforeAndAfterEach { + import testImplicits._ + import ScriptTransformationIOSchema._ + + protected val uncaughtExceptionHandler = new TestUncaughtExceptionHandler + + private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ + + protected override def beforeAll(): Unit = { + super.beforeAll() + defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler + Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + } + + protected override def afterAll(): Unit = { + super.afterAll() + Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) + } + + override protected def afterEach(): Unit = { + super.afterEach() + uncaughtExceptionHandler.cleanStatus() + } + + def isHive23OrSpark: Boolean + + def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, + ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec + + test("cat without SerDe") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + checkAnswer( + 
rowsDf, + (child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = child, + ioschema = defaultIOSchema + ), + rowsDf.collect()) + assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("script transformation should not swallow errors from upstream operators (no serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + val e = intercept[TestFailedException] { + checkAnswer( + rowsDf, + (child: SparkPlan) => createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "cat", + output = Seq(AttributeReference("a", StringType)()), + child = ExceptionInjectingOperator(child), + ioschema = defaultIOSchema + ), + rowsDf.collect()) + } + assert(e.getMessage().contains("intentional exception")) + // Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException + assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle different data types correctly") { + assume(TestUtils.testCommandAvailable("python")) + val scriptFilePath = getTestResourcePath("test_script.py") + + withTempView("v") { + val df = Seq( + (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), + (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), + (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + df.createTempView("v") + + val query = sql( + s""" + |SELECT + |TRANSFORM(a, b, c, d, e) + |USING 'python $scriptFilePath' AS (a, b, c, d, e) + |FROM v + """.stripMargin) + + // In Hive 1.2, the string representation of a decimal omits trailing zeroes. + // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. 
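+      // Normalize the expected value of column d accordingly so the check passes on both.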
+ val decimalToString: Column => Column = if (isHive23OrSpark) { + c => c.cast("string") + } else { + c => c.cast("decimal(1, 0)").cast("string") + } + checkAnswer(query, identity, df.select( + 'a.cast("string"), + 'b.cast("string"), + 'c.cast("string"), + decimalToString('d), + 'e.cast("string")).collect()) + } + } + + test("SPARK-25990: TRANSFORM should handle schema less correctly (no serde)") { + assume(TestUtils.testCommandAvailable("python")) + val scriptFilePath = getTestResourcePath("test_script.py") + + withTempView("v") { + val df = Seq( + (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)), + (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)), + (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3)) + ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18) + + checkAnswer( + df, + (child: SparkPlan) => createScriptTransformationExec( + input = Seq( + df.col("a").expr, + df.col("b").expr, + df.col("c").expr, + df.col("d").expr, + df.col("e").expr), + script = s"python $scriptFilePath", + output = Seq( + AttributeReference("key", StringType)(), + AttributeReference("value", StringType)()), + child = child, + ioschema = defaultIOSchema.copy(schemaLess = true) + ), + df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) + } + } + + test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + val e = intercept[SparkException] { + val plan = + createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "some_non_existent_command", + output = Seq(AttributeReference("a", StringType)()), + child = rowsDf.queryExecution.sparkPlan, + ioschema = defaultIOSchema) + SparkPlanTest.executePlan(plan, spark.sqlContext) + } + assert(e.getMessage.contains("Subprocess exited with status")) + assert(uncaughtExceptionHandler.exception.isEmpty) + } + + def testBasicInputDataTypesWith(serde: ScriptTransformationIOSchema, testName: String): Unit = { + test(s"SPARK-32400: TRANSFORM should support basic data types as input ($testName)") { + assume(TestUtils.testCommandAvailable("python")) + withTempView("v") { + val df = Seq( + (1, "1", 1.0f, 1.0, 11.toByte, BigDecimal(1.0), new Timestamp(1), + new Date(2020, 7, 1), true), + (2, "2", 2.0f, 2.0, 22.toByte, BigDecimal(2.0), new Timestamp(2), + new Date(2020, 7, 2), true), + (3, "3", 3.0f, 3.0, 33.toByte, BigDecimal(3.0), new Timestamp(3), + new Date(2020, 7, 3), false) + ).toDF("a", "b", "c", "d", "e", "f", "g", "h", "i") + .withColumn("j", lit("abc").cast("binary")) + + checkAnswer( + df, + (child: SparkPlan) => createScriptTransformationExec( + input = Seq( + df.col("a").expr, + df.col("b").expr, + df.col("c").expr, + df.col("d").expr, + df.col("e").expr, + df.col("f").expr, + df.col("g").expr, + df.col("h").expr, + df.col("i").expr, + df.col("j").expr), + script = "cat", + output = Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", FloatType)(), + AttributeReference("d", DoubleType)(), + AttributeReference("e", ByteType)(), + AttributeReference("f", DecimalType(38, 18))(), + AttributeReference("g", TimestampType)(), + AttributeReference("h", DateType)(), + AttributeReference("i", BooleanType)(), + AttributeReference("j", BinaryType)()), + child = child, + ioschema = serde + ), + df.select('a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, 'i, 'j).collect()) + } + } + } + + 
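+  // Exercise the shared basic-input-type test with the default (no serde) IO schema; the Hive
+  // suite reuses it further down with a LazySimpleSerDe-backed schema ("hive serde").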
testBasicInputDataTypesWith(defaultIOSchema, "no serde") + + test("SPARK-32400: TRANSFORM should support more data types (interval, array, map, struct " + + "and udt) as input (no serde)") { + assume(TestUtils.testCommandAvailable("python")) + withTempView("v") { + val df = Seq( + (new CalendarInterval(7, 1, 1000), Array(0, 1, 2), Map("a" -> 1), (1, 2), + new SimpleTuple(1, 1L)), + (new CalendarInterval(7, 2, 2000), Array(3, 4, 5), Map("b" -> 2), (3, 4), + new SimpleTuple(1, 1L)), + (new CalendarInterval(7, 3, 3000), Array(6, 7, 8), Map("c" -> 3), (5, 6), + new SimpleTuple(1, 1L)) + ).toDF("a", "b", "c", "d", "e") + + // Can't support convert script output data to ArrayType/MapType/StructType now, + // return these column still as string. + // For UserDefinedType, if user defined deserialize method to support convert string + // to UserType like [[SimpleTupleUDT]], we can support convert to this UDT, else we + // will return null value as column. + checkAnswer( + df, + (child: SparkPlan) => createScriptTransformationExec( + input = Seq( + df.col("a").expr, + df.col("b").expr, + df.col("c").expr, + df.col("d").expr, + df.col("e").expr), + script = "cat", + output = Seq( + AttributeReference("a", CalendarIntervalType)(), + AttributeReference("b", StringType)(), + AttributeReference("c", StringType)(), + AttributeReference("d", StringType)(), + AttributeReference("e", new SimpleTupleUDT)()), + child = child, + ioschema = defaultIOSchema + ), + df.select('a, 'b.cast("string"), 'c.cast("string"), 'd.cast("string"), 'e).collect()) + } + } + + test("SPARK-32400: TRANSFORM should respect DATETIME_JAVA8API_ENABLED (no serde)") { + assume(TestUtils.testCommandAvailable("python")) + Array(false, true).foreach { java8AapiEnable => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8AapiEnable.toString) { + withTempView("v") { + val df = Seq( + (new Timestamp(1), new Date(2020, 7, 1)), + (new Timestamp(2), new Date(2020, 7, 2)), + (new Timestamp(3), new Date(2020, 7, 3)) + ).toDF("a", "b") + df.createTempView("v") + + val query = sql( + """ + |SELECT TRANSFORM (a, b) + |USING 'cat' AS (a timestamp, b date) + |FROM v + """.stripMargin) + checkAnswer(query, identity, df.select('a, 'b).collect()) + } + } + } + } +} + +case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode { + override protected def doExecute(): RDD[InternalRow] = { + child.execute().map { x => + assert(TaskContext.get() != null) // Make sure that TaskContext is defined. + Thread.sleep(1000) // This sleep gives the external process time to start. 
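+      // Failing here, after the script has started, checks that the transform node surfaces
+      // upstream errors instead of swallowing them.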
+ throw new IllegalArgumentException("intentional exception") + } + } + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = child.outputPartitioning +} + +@SQLUserDefinedType(udt = classOf[SimpleTupleUDT]) +private class SimpleTuple(val id: Int, val size: Long) extends Serializable { + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = other match { + case v: SimpleTuple => this.id == v.id && this.size == v.size + case _ => false + } + + override def toString: String = + compact(render( + ("id" -> id) ~ + ("size" -> size) + )) +} + +private class SimpleTupleUDT extends UserDefinedType[SimpleTuple] { + + override def sqlType: DataType = StructType( + StructField("id", IntegerType, false) :: + StructField("size", LongType, false) :: + Nil) + + override def serialize(sql: SimpleTuple): Any = { + val row = new GenericInternalRow(2) + row.setInt(0, sql.id) + row.setLong(1, sql.size) + row + } + + override def deserialize(datum: Any): SimpleTuple = { + datum match { + case str: String => + implicit val format = DefaultFormats + val json = parse(str) + new SimpleTuple((json \ "id").extract[Int], (json \ "size").extract[Long]) + case data: InternalRow if data.numFields == 2 => + new SimpleTuple(data.getInt(0), data.getLong(1)) + case _ => null + } + } + + override def userClass: Class[SimpleTuple] = classOf[SimpleTuple] + + override def asNullable: SimpleTupleUDT = this + + override def hashCode(): Int = getClass.hashCode() + + override def equals(other: Any): Boolean = { + other.isInstanceOf[SimpleTupleUDT] + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestUncaughtExceptionHandler.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestUncaughtExceptionHandler.scala similarity index 96% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestUncaughtExceptionHandler.scala rename to sql/core/src/test/scala/org/apache/spark/sql/execution/TestUncaughtExceptionHandler.scala index 681eb4e255..360f465834 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/TestUncaughtExceptionHandler.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestUncaughtExceptionHandler.scala @@ -15,7 +15,7 @@ * limitations under the License. 
*/ -package org.apache.spark.sql.hive.execution +package org.apache.spark.sql.execution class TestUncaughtExceptionHandler extends Thread.UncaughtExceptionHandler { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index a6826a6546..8ab6e28366 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -1063,6 +1063,9 @@ private[hive] trait HiveInspectors { case DateType => dateTypeInfo case TimestampType => timestampTypeInfo case NullType => voidTypeInfo + case dt => + throw new AnalysisException( + s"${dt.catalogString} cannot be converted to Hive TypeInfo") } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala index 38bc8d0429..affa4935b5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala @@ -20,67 +20,44 @@ package org.apache.spark.sql.hive.execution import java.sql.Timestamp import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe -import org.scalatest.Assertions._ -import org.scalatest.BeforeAndAfterEach import org.scalatest.exceptions.TestFailedException -import org.apache.spark.{SparkException, TaskContext, TestUtils} -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.Column -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{ScriptTransformationIOSchema, SparkPlan, SparkPlanTest, UnaryExecNode} +import org.apache.spark.{SparkException, TestUtils} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.HiveUtils import org.apache.spark.sql.hive.test.TestHiveSingleton -import org.apache.spark.sql.test.SQLTestUtils -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types._ +import org.apache.spark.unsafe.types.CalendarInterval + +class HiveScriptTransformationSuite extends BaseScriptTransformationSuite with TestHiveSingleton { + import testImplicits._ -class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with TestHiveSingleton - with BeforeAndAfterEach { - import spark.implicits._ import ScriptTransformationIOSchema._ - private val serdeIOSchema = defaultIOSchema.copy( - inputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName), - outputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName) - ) + override def isHive23OrSpark: Boolean = HiveUtils.isHive23 - private var defaultUncaughtExceptionHandler: Thread.UncaughtExceptionHandler = _ - - private val uncaughtExceptionHandler = new TestUncaughtExceptionHandler - - protected override def beforeAll(): Unit = { - super.beforeAll() - defaultUncaughtExceptionHandler = Thread.getDefaultUncaughtExceptionHandler - Thread.setDefaultUncaughtExceptionHandler(uncaughtExceptionHandler) + override def createScriptTransformationExec( + input: Seq[Expression], + script: String, + output: Seq[Attribute], + child: SparkPlan, 
+ ioschema: ScriptTransformationIOSchema): BaseScriptTransformationExec = { + HiveScriptTransformationExec( + input = input, + script = script, + output = output, + child = child, + ioschema = ioschema + ) } - protected override def afterAll(): Unit = { - super.afterAll() - Thread.setDefaultUncaughtExceptionHandler(defaultUncaughtExceptionHandler) - } - - override protected def afterEach(): Unit = { - super.afterEach() - uncaughtExceptionHandler.cleanStatus() - } - - test("cat without SerDe") { - assume(TestUtils.testCommandAvailable("/bin/bash")) - - val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - checkAnswer( - rowsDf, - (child: SparkPlan) => new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = "cat", - output = Seq(AttributeReference("a", StringType)()), - child = child, - ioschema = defaultIOSchema - ), - rowsDf.collect()) - assert(uncaughtExceptionHandler.exception.isEmpty) + private val hiveIOSchema: ScriptTransformationIOSchema = { + defaultIOSchema.copy( + inputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName), + outputSerdeClass = Some(classOf[LazySimpleSerDe].getCanonicalName) + ) } test("cat with LazySimpleSerDe") { @@ -89,30 +66,30 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") checkAnswer( rowsDf, - (child: SparkPlan) => new HiveScriptTransformationExec( + (child: SparkPlan) => createScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), child = child, - ioschema = serdeIOSchema + ioschema = hiveIOSchema ), rowsDf.collect()) assert(uncaughtExceptionHandler.exception.isEmpty) } - test("script transformation should not swallow errors from upstream operators (no serde)") { + test("script transformation should not swallow errors from upstream operators (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") val e = intercept[TestFailedException] { checkAnswer( rowsDf, - (child: SparkPlan) => new HiveScriptTransformationExec( + (child: SparkPlan) => createScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), child = ExceptionInjectingOperator(child), - ioschema = defaultIOSchema + ioschema = hiveIOSchema ), rowsDf.collect()) } @@ -121,67 +98,65 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with assert(uncaughtExceptionHandler.exception.isEmpty) } - test("script transformation should not swallow errors from upstream operators (with serde)") { - assume(TestUtils.testCommandAvailable("/bin/bash")) - - val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[TestFailedException] { - checkAnswer( - rowsDf, - (child: SparkPlan) => new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = "cat", - output = Seq(AttributeReference("a", StringType)()), - child = ExceptionInjectingOperator(child), - ioschema = serdeIOSchema - ), - rowsDf.collect()) - } - assert(e.getMessage().contains("intentional exception")) - // Before SPARK-25158, uncaughtExceptionHandler will catch IllegalArgumentException - assert(uncaughtExceptionHandler.exception.isEmpty) - } - - test("SPARK-14400 script transformation should fail for bad script command") { + test("SPARK-14400 script transformation should fail for bad script command (hive serde)") { 
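+    // The non-existent command makes the launched subprocess fail; the exec node must report its exit status.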
assume(TestUtils.testCommandAvailable("/bin/bash")) val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") val e = intercept[SparkException] { val plan = - new HiveScriptTransformationExec( + createScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "some_non_existent_command", output = Seq(AttributeReference("a", StringType)()), child = rowsDf.queryExecution.sparkPlan, - ioschema = serdeIOSchema) + ioschema = hiveIOSchema) SparkPlanTest.executePlan(plan, hiveContext) } assert(e.getMessage.contains("Subprocess exited with status")) assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-24339 verify the result after pruning the unused columns") { + test("SPARK-24339 verify the result after pruning the unused columns (hive serde)") { val rowsDf = Seq( ("Bob", 16, 176), ("Alice", 32, 164), ("David", 60, 192), - ("Amy", 24, 180)).toDF("name", "age", "height") + ("Amy", 24, 180) + ).toDF("name", "age", "height") checkAnswer( rowsDf, - (child: SparkPlan) => new HiveScriptTransformationExec( + (child: SparkPlan) => createScriptTransformationExec( input = Seq(rowsDf.col("name").expr), script = "cat", output = Seq(AttributeReference("name", StringType)()), child = child, - ioschema = serdeIOSchema + ioschema = hiveIOSchema ), rowsDf.select("name").collect()) assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-25990: TRANSFORM should handle different data types correctly") { + test("SPARK-30973: TRANSFORM should wait for the termination of the script (hive serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + + val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") + val e = intercept[SparkException] { + val plan = + createScriptTransformationExec( + input = Seq(rowsDf.col("a").expr), + script = "some_non_existent_command", + output = Seq(AttributeReference("a", StringType)()), + child = rowsDf.queryExecution.sparkPlan, + ioschema = hiveIOSchema) + SparkPlanTest.executePlan(plan, hiveContext) + } + assert(e.getMessage.contains("Subprocess exited with status")) + assert(uncaughtExceptionHandler.exception.isEmpty) + } + + test("SPARK-25990: TRANSFORM should handle schema less correctly (hive serde)") { assume(TestUtils.testCommandAvailable("python")) val scriptFilePath = getTestResourcePath("test_script.py") @@ -195,75 +170,142 @@ class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with val query = sql( s""" - |SELECT - |TRANSFORM(a, b, c, d, e) - |USING 'python $scriptFilePath' AS (a, b, c, d, e) - |FROM v + |SELECT TRANSFORM(a, b, c, d, e) + |USING 'python ${scriptFilePath}' + |FROM v """.stripMargin) - // In Hive 1.2, the string representation of a decimal omits trailing zeroes. - // But in Hive 2.3, it is always padded to 18 digits with trailing zeroes if necessary. 
- val decimalToString: Column => Column = if (HiveUtils.isHive23) { - c => c.cast("string") - } else { - c => c.cast("decimal(1, 0)").cast("string") - } - checkAnswer(query, identity, df.select( - 'a.cast("string"), - 'b.cast("string"), - 'c.cast("string"), - decimalToString('d), - 'e.cast("string")).collect()) + // In hive default serde mode, if we don't define output schema, it will choose first + // two column as output schema (key: String, value: String) + checkAnswer( + query, + identity, + df.select( + 'a.cast("string").as("key"), + 'b.cast("string").as("value")).collect()) } } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") { - assume(TestUtils.testCommandAvailable("/bin/bash")) + testBasicInputDataTypesWith(hiveIOSchema, "hive serde") - val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[SparkException] { - val plan = - new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = "some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = defaultIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + test("SPARK-32400: TRANSFORM supports complex data types type (hive serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + withTempView("v") { + val df = Seq( + (1, "1", Array(0, 1, 2), Map("a" -> 1)), + (2, "2", Array(3, 4, 5), Map("b" -> 2)) + ).toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") + + // Hive serde support ArrayType/MapType/StructType as input and output data type + checkAnswer( + df, + (child: SparkPlan) => createScriptTransformationExec( + input = Seq( + df.col("c").expr, + df.col("d").expr, + df.col("e").expr), + script = "cat", + output = Seq( + AttributeReference("c", ArrayType(IntegerType))(), + AttributeReference("d", MapType(StringType, IntegerType))(), + AttributeReference("e", StructType( + Seq( + StructField("col1", IntegerType, false), + StructField("col2", StringType, true))))()), + child = child, + ioschema = hiveIOSchema + ), + df.select('c, 'd, 'e).collect()) } - assert(e.getMessage.contains("Subprocess exited with status")) - assert(uncaughtExceptionHandler.exception.isEmpty) } - test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") { + test("SPARK-32400: TRANSFORM supports complex data types end to end (hive serde)") { assume(TestUtils.testCommandAvailable("/bin/bash")) + withTempView("v") { + val df = Seq( + (1, "1", Array(0, 1, 2), Map("a" -> 1)), + (2, "2", Array(3, 4, 5), Map("b" -> 2)) + ).toDF("a", "b", "c", "d") + .select('a, 'b, 'c, 'd, struct('a, 'b).as("e")) + df.createTempView("v") - val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") - val e = intercept[SparkException] { - val plan = - new HiveScriptTransformationExec( - input = Seq(rowsDf.col("a").expr), - script = "some_non_existent_command", - output = Seq(AttributeReference("a", StringType)()), - child = rowsDf.queryExecution.sparkPlan, - ioschema = serdeIOSchema) - SparkPlanTest.executePlan(plan, hiveContext) + // Hive serde support ArrayType/MapType/StructType as input and output data type + val query = sql( + """ + |SELECT TRANSFORM (c, d, e) + |USING 'cat' AS (c array, d map, e struct) + |FROM v + """.stripMargin) + checkAnswer(query, identity, df.select('c, 'd, 'e).collect()) + } + } + + test("SPARK-32400: TRANSFORM doesn't support CalenderIntervalType/UserDefinedType (hive serde)") { + 
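+    // CalendarIntervalType and the UDT's underlying array type have no Hive TypeInfo mapping, so
+    // the new check in HiveInspectors should reject both columns.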
assume(TestUtils.testCommandAvailable("/bin/bash")) + withTempView("v") { + val df = Seq( + (1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3))), + (1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3))) + ).toDF("a", "b", "c") + df.createTempView("v") + + val e1 = intercept[SparkException] { + val plan = createScriptTransformationExec( + input = Seq(df.col("a").expr, df.col("b").expr), + script = "cat", + output = Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("b", CalendarIntervalType)()), + child = df.queryExecution.sparkPlan, + ioschema = hiveIOSchema) + SparkPlanTest.executePlan(plan, hiveContext) + }.getMessage + assert(e1.contains("interval cannot be converted to Hive TypeInfo")) + + val e2 = intercept[SparkException] { + val plan = createScriptTransformationExec( + input = Seq(df.col("a").expr, df.col("c").expr), + script = "cat", + output = Seq( + AttributeReference("a", IntegerType)(), + AttributeReference("c", new TestUDT.MyDenseVectorUDT)()), + child = df.queryExecution.sparkPlan, + ioschema = hiveIOSchema) + SparkPlanTest.executePlan(plan, hiveContext) + }.getMessage + assert(e2.contains("array cannot be converted to Hive TypeInfo")) + } + } + + test("SPARK-32400: TRANSFORM doesn't support" + + " CalenderIntervalType/UserDefinedType end to end (hive serde)") { + assume(TestUtils.testCommandAvailable("/bin/bash")) + withTempView("v") { + val df = Seq( + (1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3))), + (1, new CalendarInterval(7, 1, 1000), new TestUDT.MyDenseVector(Array(1, 2, 3))) + ).toDF("a", "b", "c") + df.createTempView("v") + + val e1 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(a, b) USING 'cat' AS (a, b) + |FROM v + """.stripMargin).collect() + }.getMessage + assert(e1.contains("interval cannot be converted to Hive TypeInfo")) + + val e2 = intercept[SparkException] { + sql( + """ + |SELECT TRANSFORM(a, c) USING 'cat' AS (a, c) + |FROM v + """.stripMargin).collect() + }.getMessage + assert(e2.contains("array cannot be converted to Hive TypeInfo")) } - assert(e.getMessage.contains("Subprocess exited with status")) - assert(uncaughtExceptionHandler.exception.isEmpty) } } - -private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode { - override protected def doExecute(): RDD[InternalRow] = { - child.execute().map { x => - assert(TaskContext.get() != null) // Make sure that TaskContext is defined. - Thread.sleep(1000) // This sleep gives the external process time to start. 
- throw new IllegalArgumentException("intentional exception") - } - } - - override def output: Seq[Attribute] = child.output - - override def outputPartitioning: Partitioning = child.outputPartitioning -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index 62a411a561..431790e1fb 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, Functio import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, HiveTableRelation} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} +import org.apache.spark.sql.execution.TestUncaughtExceptionHandler import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite} import org.apache.spark.sql.execution.command.{FunctionsCommand, LoadDataCommand} import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}