[SPARK-21789][PYTHON] Remove obsolete codes for parsing abstract schema strings
## What changes were proposed in this pull request? This PR removes private functions that appear to be unused in the main codebase: `_split_schema_abstract`, `_parse_field_abstract`, `_parse_schema_abstract`, and `_infer_schema_type`. ## How was this patch tested? Existing tests. Author: hyukjinkwon <gurwls223@gmail.com> Closes #18647 from HyukjinKwon/remove-abstract.
This commit is contained in:
parent
5cd8ea99f0
commit
648a8626b8
|
@ -894,16 +894,6 @@ class SQLTests(ReusedPySparkTestCase):
|
|||
|
||||
self.assertEqual((126, -127, -32767, 32766, 2147483646, 2.5), tuple(r))
|
||||
|
||||
from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type
|
||||
rdd = self.sc.parallelize([(127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1),
|
||||
{"a": 1}, (2,), [1, 2, 3])])
|
||||
abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]"
|
||||
schema = _parse_schema_abstract(abstract)
|
||||
typedSchema = _infer_schema_type(rdd.first(), schema)
|
||||
df = self.spark.createDataFrame(rdd, typedSchema)
|
||||
r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3])
|
||||
self.assertEqual(r, tuple(df.first()))
|
||||
|
||||
def test_struct_in_map(self):
|
||||
d = [Row(m={Row(i=1): Row(s="")})]
|
||||
df = self.sc.parallelize(d).toDF()
|
||||
|
|
|
@ -1187,135 +1187,6 @@ def _create_converter(dataType):
|
|||
return convert_struct
|
||||
|
||||
|
||||
def _split_schema_abstract(s):
    """
    Split a schema abstract string into its top-level fields.

    Whitespace separates fields only when it occurs outside any bracket
    pair, so nested structures stay attached to their field name.

    :param s: the abstract schema string
    :return: list of field substrings
    :raises ValueError: on an unmatched or mismatched bracket

    >>> _split_schema_abstract("a b c")
    ['a', 'b', 'c']
    >>> _split_schema_abstract("a(a b)")
    ['a(a b)']
    >>> _split_schema_abstract("a b[] c{a b}")
    ['a', 'b[]', 'c{a b}']
    >>> _split_schema_abstract(" ")
    []
    """
    fields = []
    current = ''
    # Stack of currently-open opening brackets; non-empty means we are
    # inside a nested structure and spaces must not split the field.
    stack = []
    for ch in s:
        if ch == ' ' and not stack:
            # Top-level space: finish the current field, if any.
            if current:
                fields.append(current)
            current = ''
        else:
            current += ch
            if ch in _BRACKETS:
                stack.append(ch)
            elif ch in _BRACKETS.values():
                # A closer must match the most recently opened bracket.
                if not stack or ch != _BRACKETS[stack.pop()]:
                    raise ValueError("unexpected " + ch)

    if stack:
        raise ValueError("brackets not closed: %s" % stack)
    if current:
        fields.append(current)
    return fields
|
||||
|
||||
|
||||
def _parse_field_abstract(s):
    """
    Parse a single field of a schema abstract into a ``StructField``.

    A field with no bracket is a plain name typed ``NullType``; a bracket
    suffix describes a nested struct/array/map parsed recursively.

    :param s: one field substring, e.g. ``"a"`` or ``"b(c d)"``
    :return: the corresponding :class:`StructField` (always nullable)

    >>> _parse_field_abstract("a")
    StructField(a,NullType,true)
    >>> _parse_field_abstract("b(c d)")
    StructField(b,StructType(...c,NullType,true),StructField(d...
    >>> _parse_field_abstract("a[]")
    StructField(a,ArrayType(NullType,true),true)
    >>> _parse_field_abstract("a{[]}")
    StructField(a,MapType(NullType,ArrayType(NullType,true),true),true)
    """
    # Guard: no opening bracket anywhere -> untyped leaf field.
    if not (set(_BRACKETS.keys()) & set(s)):
        return StructField(s, NullType(), True)

    # Position of the earliest opening bracket splits name from type spec.
    split_at = min(s.index(opener) for opener in _BRACKETS if opener in s)
    field_name = s[:split_at]
    return StructField(field_name, _parse_schema_abstract(s[split_at:]), True)
|
||||
|
||||
|
||||
def _parse_schema_abstract(s):
    """
    Parse an abstract schema string into a DataType.

    The leading character decides the shape: ``(`` wraps a nested schema,
    ``[`` an array, ``{`` a map; anything else is a flat list of fields
    that becomes a ``StructType``.

    :param s: the abstract schema string
    :return: a DataType (``NullType`` for an empty string)

    >>> _parse_schema_abstract("a b c")
    StructType...a...b...c...
    >>> _parse_schema_abstract("a[b c] b{}")
    StructType...a,ArrayType...b...c...b,MapType...
    >>> _parse_schema_abstract("c{} d{a b}")
    StructType...c,MapType...d,MapType...a...b...
    >>> _parse_schema_abstract("a b(t)").fields[1]
    StructField(b,StructType(List(StructField(t,NullType,true))),true)
    """
    text = s.strip()

    # Empty abstract means an as-yet-unknown type.
    if not text:
        return NullType()

    first = text[0]
    if first == '(':
        # Parenthesized nested schema: strip the wrapper and recurse.
        return _parse_schema_abstract(text[1:-1])
    if first == '[':
        return ArrayType(_parse_schema_abstract(text[1:-1]), True)
    if first == '{':
        # Map keys are left untyped here; inference fills them in later.
        return MapType(NullType(), _parse_schema_abstract(text[1:-1]))

    # Flat field list -> struct of individually parsed fields.
    return StructType(
        [_parse_field_abstract(part) for part in _split_schema_abstract(text)])
|
||||
|
||||
|
||||
def _infer_schema_type(obj, dataType):
    """
    Fill the placeholder types in ``dataType`` using types inferred
    from a sample object ``obj``.

    :param obj: a sample value shaped like the schema
    :param dataType: a DataType, possibly containing ``NullType`` holes
    :return: a fully concrete DataType
    :raises TypeError: if ``dataType`` is not a supported container type

    >>> schema = _parse_schema_abstract("a b c d")
    >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10))
    >>> _infer_schema_type(row, schema)
    StructType...LongType...DoubleType...StringType...DateType...
    >>> row = [[1], {"key": (1, 2.0)}]
    >>> schema = _parse_schema_abstract("a[] b{c d}")
    >>> _infer_schema_type(row, schema)
    StructType...a,ArrayType...b,MapType(StringType,...c,LongType...
    """
    # A NullType hole: infer the concrete type directly from the sample.
    if isinstance(dataType, NullType):
        return _infer_type(obj)

    # Empty/None sample gives nothing to recurse into.
    if not obj:
        return NullType()

    if isinstance(dataType, ArrayType):
        # Use the first element as representative of the whole array.
        elem_type = _infer_schema_type(obj[0], dataType.elementType)
        return ArrayType(elem_type, True)

    if isinstance(dataType, MapType):
        # One arbitrary entry stands in for key and value types.
        sample_key, sample_value = next(iter(obj.items()))
        return MapType(_infer_schema_type(sample_key, dataType.keyType),
                       _infer_schema_type(sample_value, dataType.valueType))

    if isinstance(dataType, StructType):
        declared = dataType.fields
        assert len(declared) == len(obj), \
            "Obj(%s) have different length with fields(%s)" % (obj, declared)
        # Pair each sample value with its declared field, positionally.
        resolved = [StructField(fld.name,
                                _infer_schema_type(value, fld.dataType), True)
                    for value, fld in zip(obj, declared)]
        return StructType(resolved)

    raise TypeError("Unexpected dataType: %s" % type(dataType))
|
||||
|
||||
|
||||
_acceptable_types = {
|
||||
BooleanType: (bool,),
|
||||
ByteType: (int, long),
|
||||
|
|
Loading…
Reference in a new issue