diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a2a3ceb29d..3d87ccfc03 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -894,16 +894,6 @@ class SQLTests(ReusedPySparkTestCase): self.assertEqual((126, -127, -32767, 32766, 2147483646, 2.5), tuple(r)) - from pyspark.sql.types import _parse_schema_abstract, _infer_schema_type - rdd = self.sc.parallelize([(127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), - {"a": 1}, (2,), [1, 2, 3])]) - abstract = "byte1 short1 float1 time1 map1{} struct1(b) list1[]" - schema = _parse_schema_abstract(abstract) - typedSchema = _infer_schema_type(rdd.first(), schema) - df = self.spark.createDataFrame(rdd, typedSchema) - r = (127, -32768, 1.0, datetime(2010, 1, 1, 1, 1, 1), {"a": 1}, Row(b=2), [1, 2, 3]) - self.assertEqual(r, tuple(df.first())) - def test_struct_in_map(self): d = [Row(m={Row(i=1): Row(s="")})] df = self.sc.parallelize(d).toDF() diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index ecb8eb9a2f..51bf7bef49 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1187,135 +1187,6 @@ def _create_converter(dataType): return convert_struct -def _split_schema_abstract(s): - """ - split the schema abstract into fields - - >>> _split_schema_abstract("a b c") - ['a', 'b', 'c'] - >>> _split_schema_abstract("a(a b)") - ['a(a b)'] - >>> _split_schema_abstract("a b[] c{a b}") - ['a', 'b[]', 'c{a b}'] - >>> _split_schema_abstract(" ") - [] - """ - - r = [] - w = '' - brackets = [] - for c in s: - if c == ' ' and not brackets: - if w: - r.append(w) - w = '' - else: - w += c - if c in _BRACKETS: - brackets.append(c) - elif c in _BRACKETS.values(): - if not brackets or c != _BRACKETS[brackets.pop()]: - raise ValueError("unexpected " + c) - - if brackets: - raise ValueError("brackets not closed: %s" % brackets) - if w: - r.append(w) - return r - - -def _parse_field_abstract(s): - """ - Parse a field in schema abstract - - >>> _parse_field_abstract("a") - StructField(a,NullType,true) - >>> _parse_field_abstract("b(c d)") - StructField(b,StructType(...c,NullType,true),StructField(d... - >>> _parse_field_abstract("a[]") - StructField(a,ArrayType(NullType,true),true) - >>> _parse_field_abstract("a{[]}") - StructField(a,MapType(NullType,ArrayType(NullType,true),true),true) - """ - if set(_BRACKETS.keys()) & set(s): - idx = min((s.index(c) for c in _BRACKETS if c in s)) - name = s[:idx] - return StructField(name, _parse_schema_abstract(s[idx:]), True) - else: - return StructField(s, NullType(), True) - - -def _parse_schema_abstract(s): - """ - parse abstract into schema - - >>> _parse_schema_abstract("a b c") - StructType...a...b...c... - >>> _parse_schema_abstract("a[b c] b{}") - StructType...a,ArrayType...b...c...b,MapType... - >>> _parse_schema_abstract("c{} d{a b}") - StructType...c,MapType...d,MapType...a...b... - >>> _parse_schema_abstract("a b(t)").fields[1] - StructField(b,StructType(List(StructField(t,NullType,true))),true) - """ - s = s.strip() - if not s: - return NullType() - - elif s.startswith('('): - return _parse_schema_abstract(s[1:-1]) - - elif s.startswith('['): - return ArrayType(_parse_schema_abstract(s[1:-1]), True) - - elif s.startswith('{'): - return MapType(NullType(), _parse_schema_abstract(s[1:-1])) - - parts = _split_schema_abstract(s) - fields = [_parse_field_abstract(p) for p in parts] - return StructType(fields) - - -def _infer_schema_type(obj, dataType): - """ - Fill the dataType with types inferred from obj - - >>> schema = _parse_schema_abstract("a b c d") - >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10)) - >>> _infer_schema_type(row, schema) - StructType...LongType...DoubleType...StringType...DateType... - >>> row = [[1], {"key": (1, 2.0)}] - >>> schema = _parse_schema_abstract("a[] b{c d}") - >>> _infer_schema_type(row, schema) - StructType...a,ArrayType...b,MapType(StringType,...c,LongType... - """ - if isinstance(dataType, NullType): - return _infer_type(obj) - - if not obj: - return NullType() - - if isinstance(dataType, ArrayType): - eType = _infer_schema_type(obj[0], dataType.elementType) - return ArrayType(eType, True) - - elif isinstance(dataType, MapType): - k, v = next(iter(obj.items())) - return MapType(_infer_schema_type(k, dataType.keyType), - _infer_schema_type(v, dataType.valueType)) - - elif isinstance(dataType, StructType): - fs = dataType.fields - assert len(fs) == len(obj), \ - "Obj(%s) have different length with fields(%s)" % (obj, fs) - fields = [StructField(f.name, _infer_schema_type(o, f.dataType), True) - for o, f in zip(obj, fs)] - return StructType(fields) - - else: - raise TypeError("Unexpected dataType: %s" % type(dataType)) - - _acceptable_types = { BooleanType: (bool,), ByteType: (int, long),