[SPARK-31009][SQL] Support json_object_keys function

### What changes were proposed in this pull request?
This PR proposes a new function, `json_object_keys`. It takes a JSON object as an argument and returns all the keys of the outermost JSON object.

- If an invalid JSON expression is given, `NULL` is returned.
- If an empty string or a JSON array is given, `NULL` is returned.
- If a valid JSON object is given, all the keys of the outermost object are returned as an array.
- For an empty JSON object, an empty array is returned (see the usage sketch below this list).

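For illustration, a minimal `spark-shell` session; the expected output below is written by hand (assuming a build that includes this patch), not pasted from a run:
```
val df = spark.sql("""SELECT json_object_keys('{"a": 1, "b": [1, 2]}') AS keys""")
df.show(false)
// expected, per the rules above:
// +------+
// |keys  |
// +------+
// |[a, b]|
// +------+
```
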
We can also get JSON object keys using `from_json` + `map_keys`, but `json_object_keys` is more efficient:
```
Performance result for json_object = {"a":[1,2,3,4,5], "b":[2,4,5,12333321]}

Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
JSON functions:                           Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
json_object_keys                                  11666          12361         673          0.9        1166.6       1.0X
from_json+map_keys                                15309          15973         701          0.7        1530.9       0.8X

```
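
For reference, a sketch of both approaches as they would be run in `spark-shell`; the `map<string, array<bigint>>` schema is specific to this sample object and is an assumption of the sketch, not part of the benchmark code:
```
val json = """{"a":[1,2,3,4,5], "b":[2,4,5,12333321]}"""
// existing workaround: fully parse the object into a map, then take its keys
spark.sql(s"SELECT map_keys(from_json('$json', 'map<string, array<bigint>>'))").show(false)
// new built-in: stream over the top-level tokens only
spark.sql(s"SELECT json_object_keys('$json')").show(false)
// both print [a, b]
```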

### Why are the changes needed?
This function helps users directly extract the keys from a JSON string, and it is fairly intuitive as well. It also extends Spark SQL's functionality for JSON strings.

Some of the most popular DBMSs support this function:
- PostgreSQL
- MySQL
- MariaDB

### Does this PR introduce any user-facing change?
Yes. Users can now extract the keys of JSON objects using this function.

### How was this patch tested?
UTs added.

Closes #27836 from iRakson/jsonKeys.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
Commit b56242332d (parent 5dc9b9c7c1), authored by iRakson on 2020-04-08 13:04:59 -07:00 and committed by Dongjoon Hyun.
5 changed files with 223 additions and 1 deletion


@@ -539,6 +539,7 @@ object FunctionRegistry {
    expression[JsonToStructs]("from_json"),
    expression[SchemaOfJson]("schema_of_json"),
    expression[LengthOfJsonArray]("json_array_length"),
    expression[JsonObjectKeys]("json_object_keys"),

    // cast
    expression[Cast]("cast"),
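
Registering the expression in `FunctionRegistry` is what makes the name resolvable from SQL. Once built, the docs above surface through `DESCRIBE FUNCTION`; the output below is hand-written and abridged:
```
spark.sql("DESCRIBE FUNCTION json_object_keys").show(false)
// Function: json_object_keys
// Class: org.apache.spark.sql.catalyst.expressions.JsonObjectKeys
// Usage: json_object_keys(json_object) - Returns all the keys of the outermost JSON object as an array.
```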


@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions

import java.io._

import scala.collection.mutable.ArrayBuffer
import scala.util.parsing.combinator.RegexParsers

import com.fasterxml.jackson.core._
@@ -864,3 +865,71 @@ case class LengthOfJsonArray(child: Expression) extends UnaryExpression
    length
  }
}

/**
 * A function which returns all the keys of the outermost JSON object.
 */
@ExpressionDescription(
  usage = "_FUNC_(json_object) - Returns all the keys of the outermost JSON object as an array.",
  arguments = """
    Arguments:
      * json_object - A JSON object. If a valid JSON object is given, all the keys of the outermost
          object will be returned as an array. If it is any other valid JSON string, an invalid JSON
          string or an empty string, the function returns null.
  """,
  examples = """
    Examples:
      > SELECT _FUNC_('{}');
        []
      > SELECT _FUNC_('{"key": "value"}');
        ["key"]
      > SELECT _FUNC_('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}');
        ["f1","f2"]
  """,
  since = "3.1.0")
case class JsonObjectKeys(child: Expression) extends UnaryExpression with CodegenFallback
  with ExpectsInputTypes {

  override def inputTypes: Seq[DataType] = Seq(StringType)

  override def dataType: DataType = ArrayType(StringType)

  override def nullable: Boolean = true

  override def prettyName: String = "json_object_keys"

  override def eval(input: InternalRow): Any = {
    val json = child.eval(input).asInstanceOf[UTF8String]
    // return null for a `NULL` input
    if (json == null) {
      return null
    }

    try {
      Utils.tryWithResource(CreateJacksonParser.utf8String(SharedFactory.jsonFactory, json)) {
        parser => {
          // return null for an empty string or any valid JSON that is not an object
          if (parser.nextToken() == null || parser.currentToken() != JsonToken.START_OBJECT) {
            return null
          }
          // parse the JSON string to get all the keys of the outermost JSON object
          getJsonKeys(parser, input)
        }
      }
    } catch {
      case _: JsonProcessingException | _: IOException => null
    }
  }

  private def getJsonKeys(parser: JsonParser, input: InternalRow): GenericArrayData = {
    val arrayBufferOfKeys = ArrayBuffer.empty[UTF8String]

    // traverse until the end of the input and ensure each step yields a valid key
    while (parser.nextValue() != null && parser.currentName() != null) {
      // add the current field name to the ArrayBuffer
      arrayBufferOfKeys += UTF8String.fromString(parser.getCurrentName)

      // skip all the children of an inner object or array
      parser.skipChildren()
    }
    new GenericArrayData(arrayBufferOfKeys.toArray)
  }
}
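
For readers unfamiliar with streaming Jackson parsing, here is a standalone sketch of the same traversal without the Spark wrapper classes; `topLevelKeys` is a hypothetical helper for illustration only, not part of this patch:
```
import java.io.IOException
import scala.collection.mutable.ArrayBuffer
import com.fasterxml.jackson.core.{JsonFactory, JsonProcessingException, JsonToken}

// Same idea as JsonObjectKeys.eval/getJsonKeys: nextValue() steps to each
// top-level field's value, currentName() yields its key, and skipChildren()
// jumps over nested objects/arrays so inner keys are never collected.
def topLevelKeys(json: String): Option[Seq[String]] = {
  val parser = new JsonFactory().createParser(json)
  try {
    // anything that is not a JSON object (empty string, array, scalar) maps to None
    if (parser.nextToken() != JsonToken.START_OBJECT) return None
    val keys = ArrayBuffer.empty[String]
    while (parser.nextValue() != null && parser.currentName() != null) {
      keys += parser.currentName()
      parser.skipChildren()
    }
    Some(keys.toSeq)
  } catch {
    case _: JsonProcessingException | _: IOException => None // invalid JSON maps to None
  } finally {
    parser.close()
  }
}

// topLevelKeys("""{"f1":"abc","f2":{"f3":"a"}}""")  == Some(Seq("f1", "f2"))
// topLevelKeys("""[1, 2, 3]""")                     == None
```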


@@ -831,4 +831,26 @@ class JsonExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper with
      checkEvaluation(LengthOfJsonArray(Literal(literal)), expectedValue)
    }
  }

  test("json_object_keys") {
    Seq(
      // Invalid inputs
      ("", null),
      ("[]", null),
      ("""[{"key": "JSON"}]""", null),
      ("""{"key": 45, "random_string"}""", null),
      ("""{[1, 2, {"Key": "Invalid JSON"}]}""", null),
      // JSON objects
      ("{}", Seq.empty[UTF8String]),
      ("""{"key": 1}""", Seq("key")),
      ("""{"key": "value", "key2": 2}""", Seq("key", "key2")),
      ("""{"arrayKey": [1, 2, 3]}""", Seq("arrayKey")),
      ("""{"key":[1,2,3,{"key":"value"},[1,2,3]]}""", Seq("key")),
      ("""{"f1":"abc","f2":{"f3":"a", "f4":"b"}}""", Seq("f1", "f2")),
      ("""{"k1": [1, 2, {"key": 5}], "k2": {"key2": [1, 2]}}""", Seq("k1", "k2"))
    ).foreach {
      case (input, expected) =>
        checkEvaluation(JsonObjectKeys(Literal(input)), expected)
    }
  }
}


@@ -72,5 +72,21 @@ select json_array_length('[1,2,3,[33,44],{"key":[2,3,4]}]');
select json_array_length('{"key":"not a json array"}');
select json_array_length('[1,2,3,4,5');

-- json_object_keys
select json_object_keys();
select json_object_keys(null);
select json_object_keys(200);
select json_object_keys('');
select json_object_keys('{}');
select json_object_keys('{"key": 1}');
select json_object_keys('{"key": "value", "key2": 2}');
select json_object_keys('{"arrayKey": [1, 2, 3]}');
select json_object_keys('{"key":[1,2,3,{"key":"value"},[1,2,3]]}');
select json_object_keys('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}');
select json_object_keys('{"k1": [1, 2, {"key": 5}], "k2": {"key2": [1, 2]}}');
select json_object_keys('{[1,2]}');
select json_object_keys('{"key": 45, "random_string"}');
select json_object_keys('[1, 2, 3]');

-- Clean up
DROP VIEW IF EXISTS jsonTable;


@@ -1,5 +1,5 @@
-- Automatically generated by SQLQueryTestSuite
--- Number of queries: 53
+-- Number of queries: 67

-- !query
@@ -436,6 +436,120 @@ struct<json_array_length([1,2,3,4,5):int>
NULL

-- !query
select json_object_keys()
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
Invalid number of arguments for function json_object_keys. Expected: 1; Found: 0; line 1 pos 7

-- !query
select json_object_keys(null)
-- !query schema
struct<json_object_keys(CAST(NULL AS STRING)):array<string>>
-- !query output
NULL

-- !query
select json_object_keys(200)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.AnalysisException
cannot resolve 'json_object_keys(200)' due to data type mismatch: argument 1 requires string type, however, '200' is of int type.; line 1 pos 7

-- !query
select json_object_keys('')
-- !query schema
struct<json_object_keys():array<string>>
-- !query output
NULL

-- !query
select json_object_keys('{}')
-- !query schema
struct<json_object_keys({}):array<string>>
-- !query output
[]

-- !query
select json_object_keys('{"key": 1}')
-- !query schema
struct<json_object_keys({"key": 1}):array<string>>
-- !query output
["key"]

-- !query
select json_object_keys('{"key": "value", "key2": 2}')
-- !query schema
struct<json_object_keys({"key": "value", "key2": 2}):array<string>>
-- !query output
["key","key2"]

-- !query
select json_object_keys('{"arrayKey": [1, 2, 3]}')
-- !query schema
struct<json_object_keys({"arrayKey": [1, 2, 3]}):array<string>>
-- !query output
["arrayKey"]

-- !query
select json_object_keys('{"key":[1,2,3,{"key":"value"},[1,2,3]]}')
-- !query schema
struct<json_object_keys({"key":[1,2,3,{"key":"value"},[1,2,3]]}):array<string>>
-- !query output
["key"]

-- !query
select json_object_keys('{"f1":"abc","f2":{"f3":"a", "f4":"b"}}')
-- !query schema
struct<json_object_keys({"f1":"abc","f2":{"f3":"a", "f4":"b"}}):array<string>>
-- !query output
["f1","f2"]

-- !query
select json_object_keys('{"k1": [1, 2, {"key": 5}], "k2": {"key2": [1, 2]}}')
-- !query schema
struct<json_object_keys({"k1": [1, 2, {"key": 5}], "k2": {"key2": [1, 2]}}):array<string>>
-- !query output
["k1","k2"]

-- !query
select json_object_keys('{[1,2]}')
-- !query schema
struct<json_object_keys({[1,2]}):array<string>>
-- !query output
NULL

-- !query
select json_object_keys('{"key": 45, "random_string"}')
-- !query schema
struct<json_object_keys({"key": 45, "random_string"}):array<string>>
-- !query output
NULL

-- !query
select json_object_keys('[1, 2, 3]')
-- !query schema
struct<json_object_keys([1, 2, 3]):array<string>>
-- !query output
NULL

-- !query
DROP VIEW IF EXISTS jsonTable
-- !query schema