[SPARK-24169][SQL] JsonToStructs should not access SQLConf at executor side
## What changes were proposed in this pull request? This PR is extracted from #21190 , to make it easier to backport. `JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body. ## How was this patch tested? tested in #21190 Author: Wenchen Fan <wenchen@databricks.com> Closes #21226 from cloud-fan/minor4.
This commit is contained in:
parent
991b526992
commit
96a50016bb
|
@ -534,7 +534,9 @@ object FunctionRegistry {
|
|||
// Otherwise, find a constructor method that matches the number of arguments, and use that.
|
||||
val params = Seq.fill(expressions.size)(classOf[Expression])
|
||||
val f = constructors.find(_.getParameterTypes.toSeq == params).getOrElse {
|
||||
val validParametersCount = constructors.map(_.getParameterCount).distinct.sorted
|
||||
val validParametersCount = constructors
|
||||
.filter(_.getParameterTypes.forall(_ == classOf[Expression]))
|
||||
.map(_.getParameterCount).distinct.sorted
|
||||
val expectedNumberOfParameters = if (validParametersCount.length == 1) {
|
||||
validParametersCount.head.toString
|
||||
} else {
|
||||
|
|
|
@ -514,11 +514,10 @@ case class JsonToStructs(
|
|||
schema: DataType,
|
||||
options: Map[String, String],
|
||||
child: Expression,
|
||||
timeZoneId: Option[String] = None)
|
||||
timeZoneId: Option[String],
|
||||
forceNullableSchema: Boolean)
|
||||
extends UnaryExpression with TimeZoneAwareExpression with CodegenFallback with ExpectsInputTypes {
|
||||
|
||||
val forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)
|
||||
|
||||
// The JSON input data might be missing certain fields. We force the nullability
|
||||
// of the user-provided schema to avoid data corruptions. In particular, the parquet-mr encoder
|
||||
// can generate incorrect files if values are missing in columns declared as non-nullable.
|
||||
|
@ -532,14 +531,21 @@ case class JsonToStructs(
|
|||
schema = JsonExprUtils.validateSchemaLiteral(schema),
|
||||
options = Map.empty[String, String],
|
||||
child = child,
|
||||
timeZoneId = None)
|
||||
timeZoneId = None,
|
||||
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
|
||||
|
||||
def this(child: Expression, schema: Expression, options: Expression) =
|
||||
this(
|
||||
schema = JsonExprUtils.validateSchemaLiteral(schema),
|
||||
options = JsonExprUtils.convertToMapData(options),
|
||||
child = child,
|
||||
timeZoneId = None)
|
||||
timeZoneId = None,
|
||||
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
|
||||
|
||||
// Used in `org.apache.spark.sql.functions`
|
||||
def this(schema: DataType, options: Map[String, String], child: Expression) =
|
||||
this(schema, options, child, timeZoneId = None,
|
||||
forceNullableSchema = SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA))
|
||||
|
||||
override def checkInputDataTypes(): TypeCheckResult = nullableSchema match {
|
||||
case _: StructType | ArrayType(_: StructType, _) =>
|
||||
|
|
|
@ -392,7 +392,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
val jsonData = """{"a": 1}"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
|
||||
InternalRow(1)
|
||||
)
|
||||
}
|
||||
|
@ -401,13 +401,13 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
val jsonData = """{"a" 1}"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData), gmtId, true),
|
||||
null
|
||||
)
|
||||
|
||||
// Other modes should still return `null`.
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId),
|
||||
JsonToStructs(schema, Map("mode" -> PermissiveMode.name), Literal(jsonData), gmtId, true),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
@ -416,62 +416,62 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
val input = """[{"a": 1}, {"a": 2}]"""
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = InternalRow(1) :: InternalRow(2) :: Nil
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=object, schema=array, output=array of single row") {
|
||||
val input = """{"a": 1}"""
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = InternalRow(1) :: Nil
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty array, schema=array, output=empty array") {
|
||||
val input = "[ ]"
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = Nil
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty object, schema=array, output=array of single row with null") {
|
||||
val input = "{ }"
|
||||
val schema = ArrayType(StructType(StructField("a", IntegerType) :: Nil))
|
||||
val output = InternalRow(null) :: Nil
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=array of single object, schema=struct, output=single row") {
|
||||
val input = """[{"a": 1}]"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = InternalRow(1)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=array, schema=struct, output=null") {
|
||||
val input = """[{"a": 1}, {"a": 2}]"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = null
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty array, schema=struct, output=null") {
|
||||
val input = """[]"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = null
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json - input=empty object, schema=struct, output=single row with null") {
|
||||
val input = """{ }"""
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
val output = InternalRow(null)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId), output)
|
||||
checkEvaluation(JsonToStructs(schema, Map.empty, Literal(input), gmtId, true), output)
|
||||
}
|
||||
|
||||
test("from_json null input column") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal.create(null, StringType), gmtId, true),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
@ -479,7 +479,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
test("SPARK-20549: from_json bad UTF-8") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(badJson), gmtId, true),
|
||||
null)
|
||||
}
|
||||
|
||||
|
@ -491,14 +491,14 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
c.set(2016, 0, 1, 0, 0, 0)
|
||||
c.set(Calendar.MILLISECOND, 123)
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData1), gmtId, true),
|
||||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
// The result doesn't change because the json string includes timezone string ("Z" here),
|
||||
// which means the string represents the timestamp string in the timezone regardless of
|
||||
// the timeZoneId parameter.
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST")),
|
||||
JsonToStructs(schema, Map.empty, Literal(jsonData1), Option("PST"), true),
|
||||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
|
||||
|
@ -512,7 +512,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
schema,
|
||||
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss"),
|
||||
Literal(jsonData2),
|
||||
Option(tz.getID)),
|
||||
Option(tz.getID),
|
||||
true),
|
||||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
checkEvaluation(
|
||||
|
@ -521,7 +522,8 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
Map("timestampFormat" -> "yyyy-MM-dd'T'HH:mm:ss",
|
||||
DateTimeUtils.TIMEZONE_OPTION -> tz.getID),
|
||||
Literal(jsonData2),
|
||||
gmtId),
|
||||
gmtId,
|
||||
true),
|
||||
InternalRow(c.getTimeInMillis * 1000L)
|
||||
)
|
||||
}
|
||||
|
@ -530,7 +532,7 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
test("SPARK-19543: from_json empty input column") {
|
||||
val schema = StructType(StructField("a", IntegerType) :: Nil)
|
||||
checkEvaluation(
|
||||
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId),
|
||||
JsonToStructs(schema, Map.empty, Literal.create(" ", StringType), gmtId, true),
|
||||
null
|
||||
)
|
||||
}
|
||||
|
@ -685,27 +687,23 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
|
|||
|
||||
test("from_json missing fields") {
|
||||
for (forceJsonNullableSchema <- Seq(false, true)) {
|
||||
withSQLConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA.key -> forceJsonNullableSchema.toString) {
|
||||
val input =
|
||||
"""{
|
||||
| "a": 1,
|
||||
| "c": "foo"
|
||||
|}
|
||||
|""".stripMargin
|
||||
val jsonSchema = new StructType()
|
||||
.add("a", LongType, nullable = false)
|
||||
.add("b", StringType, nullable = false)
|
||||
.add("c", StringType, nullable = false)
|
||||
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
|
||||
checkEvaluation(
|
||||
JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId),
|
||||
output
|
||||
)
|
||||
val schema = JsonToStructs(jsonSchema, Map.empty, Literal.create(input, StringType), gmtId)
|
||||
.dataType
|
||||
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
|
||||
assert(schemaToCompare == schema)
|
||||
}
|
||||
val input =
|
||||
"""{
|
||||
| "a": 1,
|
||||
| "c": "foo"
|
||||
|}
|
||||
|""".stripMargin
|
||||
val jsonSchema = new StructType()
|
||||
.add("a", LongType, nullable = false)
|
||||
.add("b", StringType, nullable = false)
|
||||
.add("c", StringType, nullable = false)
|
||||
val output = InternalRow(1L, null, UTF8String.fromString("foo"))
|
||||
val expr = JsonToStructs(
|
||||
jsonSchema, Map.empty, Literal.create(input, StringType), gmtId, forceJsonNullableSchema)
|
||||
checkEvaluation(expr, output)
|
||||
val schema = expr.dataType
|
||||
val schemaToCompare = if (forceJsonNullableSchema) jsonSchema.asNullable else jsonSchema
|
||||
assert(schemaToCompare == schema)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3179,7 +3179,7 @@ object functions {
|
|||
* @since 2.2.0
|
||||
*/
|
||||
def from_json(e: Column, schema: DataType, options: Map[String, String]): Column = withExpr {
|
||||
JsonToStructs(schema, options, e.expr)
|
||||
new JsonToStructs(schema, options, e.expr)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -129,7 +129,7 @@ select to_json()
|
|||
struct<>
|
||||
-- !query 12 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function to_json. Expected: one of 1, 2 and 3; Found: 0; line 1 pos 7
|
||||
Invalid number of arguments for function to_json. Expected: one of 1 and 2; Found: 0; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 13
|
||||
|
@ -225,7 +225,7 @@ select from_json()
|
|||
struct<>
|
||||
-- !query 21 output
|
||||
org.apache.spark.sql.AnalysisException
|
||||
Invalid number of arguments for function from_json. Expected: one of 2, 3 and 4; Found: 0; line 1 pos 7
|
||||
Invalid number of arguments for function from_json. Expected: one of 2 and 3; Found: 0; line 1 pos 7
|
||||
|
||||
|
||||
-- !query 22
|
||||
|
|
Loading…
Reference in a new issue