[SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side

### What changes were proposed in this pull request?

This PR proposes to implement the `distributed-sequence` default index on the Scala side.

### Why are the changes needed?

- Avoid unnecessary (de)serialization
- Keep the nullability of the input DataFrame when `distributed-sequence` is enabled. Currently, all fields become nullable during serialization (see https://github.com/apache/spark/pull/32775#discussion_r645882104)

### Does this PR introduce _any_ user-facing change?

No to end users, since the pandas API on Spark has not been released yet.

```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(1).spark.print_schema()
```

Before:

```
root
 |-- id: long (nullable = true)
```

After:

```
root
 |-- id: long (nullable = false)
```

### How was this patch tested?

Manually tested; the existing tests should cover the changes.

Closes #33570 from HyukjinKwon/SPARK-36338.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit c6140d4d0a)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
commit fee87f13d1 (parent 9cd370894b), 2021-07-30 22:29:23 +09:00
10 changed files with 88 additions and 200 deletions


@@ -169,7 +169,7 @@ class PandasOnSparkFrameMethods(object):
for scol, label in zip(internal.data_spark_columns, internal.column_labels)
]
)
sdf, force_nullable = attach_func(sdf, name_like_string(column))
sdf = attach_func(sdf, name_like_string(column))
return DataFrame(
InternalFrame(
@@ -178,28 +178,18 @@ class PandasOnSparkFrameMethods(object):
scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level)
],
index_names=internal.index_names,
index_fields=(
[field.copy(nullable=True) for field in internal.index_fields]
if force_nullable
else internal.index_fields
),
index_fields=internal.index_fields,
column_labels=internal.column_labels + [column],
data_spark_columns=(
[scol_for(sdf, name_like_string(label)) for label in internal.column_labels]
+ [scol_for(sdf, name_like_string(column))]
),
data_fields=(
(
[field.copy(nullable=True) for field in internal.data_fields]
if force_nullable
else internal.data_fields
data_fields=internal.data_fields
+ [
InternalField.from_struct_field(
StructField(name_like_string(column), LongType(), nullable=False)
)
+ [
InternalField.from_struct_field(
StructField(name_like_string(column), LongType(), nullable=False)
)
]
),
],
column_label_names=internal.column_label_names,
).resolved_copy
)


@@ -175,7 +175,7 @@ _options = [
Option(
key="compute.default_index_type",
doc=("This sets the default index type: sequence, distributed and distributed-sequence."),
default="sequence",
default="distributed-sequence",
types=str,
check_func=(
lambda v: v in ("sequence", "distributed", "distributed-sequence"),
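
With `distributed-sequence` becoming the default here, code that depends on the old `sequence` index can opt back in per scope (the test change further below uses the same `option_context` pattern). A minimal, hypothetical sketch:

```python
import pyspark.pandas as ps

# Temporarily switch the default index type back to 'sequence' for one block.
with ps.option_context("compute.default_index_type", "sequence"):
    ps.range(3).spark.print_schema()

# Outside the block, the configured default ('distributed-sequence') applies again.
```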


@@ -1692,9 +1692,7 @@ class Index(IndexOpsMixin):
]
sdf = sdf.select(index_value_columns)
sdf, force_nullable = InternalFrame.attach_default_index(
sdf, default_index_type="distributed-sequence"
)
sdf = InternalFrame.attach_default_index(sdf, default_index_type="distributed-sequence")
# sdf here looks as below
# +-----------------+-----------------+-----------------+-----------------+
# |__index_level_0__|__index_value_0__|__index_value_1__|__index_value_2__|
@@ -1727,11 +1725,7 @@ class Index(IndexOpsMixin):
scol_for(sdf, col) for col in self._internal.index_spark_column_names
],
index_names=self._internal.index_names,
index_fields=(
[field.copy(nullable=True) for field in self._internal.index_fields]
if force_nullable
else self._internal.index_fields
),
index_fields=self._internal.index_fields,
)
return DataFrame(internal).index
@@ -1829,7 +1823,7 @@ class Index(IndexOpsMixin):
"""
sdf = self._internal.spark_frame.select(self.spark.column)
sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
# spark_frame here looks like below
# +-----------------+---------------+
# |__index_level_0__|__index_value__|
@@ -1877,7 +1871,7 @@ class Index(IndexOpsMixin):
"""
sdf = self._internal.spark_frame.select(self.spark.column)
sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
return (
sdf.orderBy(
@@ -2475,7 +2469,7 @@ class Index(IndexOpsMixin):
scol_for(sdf, col) for col in self._internal.index_spark_column_names
],
index_names=self._internal.index_names,
index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
)
return DataFrame(internal).index


@@ -1054,7 +1054,7 @@ class MultiIndex(Index):
scol_for(sdf, col) for col in self._internal.index_spark_column_names
],
index_names=self._internal.index_names,
index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
)
return DataFrame(internal).index


@@ -1536,22 +1536,10 @@ class iLocIndexer(LocIndexerLike):
def _internal(self) -> "InternalFrame":
# Use resolved_copy to fix the natural order.
internal = super()._internal.resolved_copy
sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(
sdf = InternalFrame.attach_distributed_sequence_column(
internal.spark_frame, column_name=self._sequence_col
)
return internal.with_new_sdf(
spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME),
index_fields=(
[field.copy(nullable=True) for field in internal.index_fields]
if force_nullable
else internal.index_fields
),
data_fields=(
[field.copy(nullable=True) for field in internal.data_fields]
if force_nullable
else internal.data_fields
),
)
return internal.with_new_sdf(spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME))
@lazy_property
def _sequence_col(self) -> str:


@@ -20,15 +20,12 @@ An internal immutable DataFrame with some metadata to manage indexes.
"""
import re
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING, cast
from itertools import accumulate
import py4j
import numpy as np
import pandas as pd
from pandas.api.types import CategoricalDtype # noqa: F401
from pyspark._globals import _NoValue, _NoValueType
from pyspark.sql import functions as F, Column, DataFrame as SparkDataFrame, Window
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ( # noqa: F401
BooleanType,
DataType,
@@ -64,7 +61,6 @@ from pyspark.pandas.utils import (
name_like_string,
scol_for,
spark_column_equals,
verify_temp_column_name,
)
@@ -636,7 +632,7 @@ class InternalFrame(object):
)
# Create default index.
spark_frame, force_nullable = InternalFrame.attach_default_index(spark_frame)
spark_frame = InternalFrame.attach_default_index(spark_frame)
index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)]
index_fields = [
@@ -658,7 +654,6 @@ class InternalFrame(object):
data_fields = [
field.copy(
name=name_like_string(struct_field.name),
nullable=(force_nullable or field.nullable),
)
for field, struct_field in zip(data_fields, data_struct_fields)
]
@@ -836,7 +831,7 @@ class InternalFrame(object):
@staticmethod
def attach_default_index(
sdf: SparkDataFrame, default_index_type: Optional[str] = None
) -> Tuple[SparkDataFrame, bool]:
) -> SparkDataFrame:
"""
This method attaches a default index to Spark DataFrame. Spark does not have the index
notion so corresponding column should be generated.
@@ -848,13 +843,13 @@ class InternalFrame(object):
It adds the default index column '__index_level_0__'.
>>> spark_frame = InternalFrame.attach_default_index(spark_frame)[0]
>>> spark_frame = InternalFrame.attach_default_index(spark_frame)
>>> spark_frame
DataFrame[__index_level_0__: bigint, id: bigint]
It throws an exception if the given column name already exists.
>>> InternalFrame.attach_default_index(spark_frame)[0]
>>> InternalFrame.attach_default_index(spark_frame)
... # doctest: +ELLIPSIS
Traceback (most recent call last):
...
@@ -881,34 +876,26 @@ class InternalFrame(object):
)
@staticmethod
def attach_sequence_column(
sdf: SparkDataFrame, column_name: str
) -> Tuple[SparkDataFrame, bool]:
def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
scols = [scol_for(sdf, column) for column in sdf.columns]
sequential_index = (
F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
)
return sdf.select(sequential_index.alias(column_name), *scols), False
return sdf.select(sequential_index.alias(column_name), *scols)
@staticmethod
def attach_distributed_column(
sdf: SparkDataFrame, column_name: str
) -> Tuple[SparkDataFrame, bool]:
def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
scols = [scol_for(sdf, column) for column in sdf.columns]
return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols), False
return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)
@staticmethod
def attach_distributed_sequence_column(
sdf: SparkDataFrame, column_name: str
) -> Tuple[SparkDataFrame, bool]:
def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
"""
This method attaches a Spark column that has a sequence in a distributed manner.
This is equivalent to the column assigned when default index type 'distributed-sequence'.
>>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
>>> sdf, force_nullable = (
... InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
... )
>>> sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
>>> sdf.show() # doctest: +NORMALIZE_WHITESPACE
+--------+---+
|sequence| 0|
@@ -917,124 +904,21 @@ class InternalFrame(object):
| 1| b|
| 2| c|
+--------+---+
>>> force_nullable
True
"""
if len(sdf.columns) > 0:
try:
jdf = sdf._jdf.toDF() # type: ignore
sql_ctx = sdf.sql_ctx
encoders = sql_ctx._jvm.org.apache.spark.sql.Encoders # type: ignore
encoder = encoders.tuple(jdf.exprEnc(), encoders.scalaLong())
jrdd = jdf.localCheckpoint(False).rdd().zipWithIndex()
df = SparkDataFrame(
sql_ctx.sparkSession._jsparkSession.createDataset( # type: ignore
jrdd, encoder
).toDF(),
sql_ctx,
)
columns = df.columns
return (
df.selectExpr(
"`{}` as `{}`".format(columns[1], column_name), "`{}`.*".format(columns[0])
),
True,
)
except py4j.protocol.Py4JError:
if is_testing():
raise
return InternalFrame._attach_distributed_sequence_column(sdf, column_name)
return SparkDataFrame(
sdf._jdf.toDF().withSequenceColumn(column_name), # type: ignore
sdf.sql_ctx,
)
else:
cnt = sdf.count()
if cnt > 0:
return default_session().range(cnt).toDF(column_name), False
return default_session().range(cnt).toDF(column_name)
else:
return (
default_session().createDataFrame(
[],
schema=StructType().add(column_name, data_type=LongType(), nullable=False),
),
False,
return default_session().createDataFrame(
[], schema=StructType().add(column_name, data_type=LongType(), nullable=False)
)
@staticmethod
def _attach_distributed_sequence_column(
sdf: SparkDataFrame, column_name: str
) -> Tuple[SparkDataFrame, bool]:
"""
>>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
>>> sdf, force_nullable = (
... InternalFrame._attach_distributed_sequence_column(sdf, column_name="sequence")
... )
>>> sdf.sort("sequence").show() # doctest: +NORMALIZE_WHITESPACE
+--------+---+
|sequence| 0|
+--------+---+
| 0| a|
| 1| b|
| 2| c|
+--------+---+
>>> force_nullable
False
"""
scols = [scol_for(sdf, column) for column in sdf.columns]
spark_partition_column = verify_temp_column_name(sdf, "__spark_partition_id__")
offset_column = verify_temp_column_name(sdf, "__offset__")
row_number_column = verify_temp_column_name(sdf, "__row_number__")
# 1. Calculates counts per each partition ID. `counts` here is, for instance,
# {
# 1: 83,
# 6: 83,
# 3: 83,
# ...
# }
sdf = sdf.withColumn(spark_partition_column, F.spark_partition_id())
# Checkpoint the DataFrame to fix the partition ID.
sdf = sdf.localCheckpoint(eager=False)
counts = map(
lambda x: (x["key"], x["count"]),
sdf.groupby(sdf[spark_partition_column].alias("key")).count().collect(),
)
# 2. Calculates cumulative sum in an order of partition id.
# Note that it does not matter if partition id guarantees its order or not.
# We just need a one-by-one sequential id.
# sort by partition key.
sorted_counts = sorted(counts, key=lambda x: x[0])
# get cumulative sum in an order of partition key.
cumulative_counts = [0] + list(accumulate(map(lambda count: count[1], sorted_counts)))
# zip it with partition key.
sums = dict(zip(map(lambda count: count[0], sorted_counts), cumulative_counts))
# 3. Attach offset for each partition.
@pandas_udf(returnType=LongType()) # type: ignore
def offset(id: pd.Series) -> pd.Series:
current_partition_offset = sums[id.iloc[0]]
return pd.Series(current_partition_offset).repeat(len(id))
sdf = sdf.withColumn(offset_column, offset(spark_partition_column))
# 4. Calculate row_number in each partition.
w = Window.partitionBy(spark_partition_column).orderBy(F.monotonically_increasing_id())
row_number = F.row_number().over(w)
sdf = sdf.withColumn(row_number_column, row_number)
# 5. Calculate the index.
return (
sdf.select(
(sdf[offset_column] + sdf[row_number_column] - 1).alias(column_name), *scols
),
False,
)
def spark_column_for(self, label: Label) -> Column:
"""Return Spark Column for the given column label."""
column_labels_to_scol = dict(zip(self.column_labels, self.data_spark_columns))


@@ -5533,7 +5533,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
sdf_for_index = notnull._internal.spark_frame.select(notnull._internal.index_spark_columns)
tmp_join_key = verify_temp_column_name(sdf_for_index, "__tmp_join_key__")
sdf_for_index, _ = InternalFrame.attach_distributed_sequence_column(
sdf_for_index = InternalFrame.attach_distributed_sequence_column(
sdf_for_index, tmp_join_key
)
# sdf_for_index:
@@ -5550,7 +5550,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
sdf_for_data = notnull._internal.spark_frame.select(
notnull.spark.column.alias("values"), NATURAL_ORDER_COLUMN_NAME
)
sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
sdf_for_data = InternalFrame.attach_distributed_sequence_column(
sdf_for_data, SPARK_DEFAULT_SERIES_NAME
)
# sdf_for_data:
@@ -5569,9 +5569,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
).drop("values", NATURAL_ORDER_COLUMN_NAME)
tmp_join_key = verify_temp_column_name(sdf_for_data, "__tmp_join_key__")
sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
sdf_for_data, tmp_join_key
)
sdf_for_data = InternalFrame.attach_distributed_sequence_column(sdf_for_data, tmp_join_key)
# sdf_for_index: sdf_for_data:
# +----------------+-----------------+ +----------------+---+
# |__tmp_join_key__|__index_level_0__| |__tmp_join_key__| 0|
@@ -5639,7 +5637,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
return -1
# We should remember the natural sequence started from 0
seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
sdf, _ = InternalFrame.attach_distributed_sequence_column(
sdf = InternalFrame.attach_distributed_sequence_column(
sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
)
# If the maximum is achieved in multiple locations, the first row position is returned.
@@ -5686,7 +5684,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
return -1
# We should remember the natural sequence started from 0
seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
sdf, _ = InternalFrame.attach_distributed_sequence_column(
sdf = InternalFrame.attach_distributed_sequence_column(
sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
)
# If the minimum is achieved in multiple locations, the first row position is returned.


@@ -5146,23 +5146,26 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
sys.stdout = prev
def test_explain_hint(self):
psdf1 = ps.DataFrame(
{"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]}, columns=["lkey", "value"]
)
psdf2 = ps.DataFrame(
{"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]}, columns=["rkey", "value"]
)
merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
prev = sys.stdout
try:
out = StringIO()
sys.stdout = out
merged.spark.explain()
actual = out.getvalue().strip()
with ps.option_context("compute.default_index_type", "sequence"):
psdf1 = ps.DataFrame(
{"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]},
columns=["lkey", "value"],
)
psdf2 = ps.DataFrame(
{"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]},
columns=["rkey", "value"],
)
merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
prev = sys.stdout
try:
out = StringIO()
sys.stdout = out
merged.spark.explain()
actual = out.getvalue().strip()
self.assertTrue("Broadcast" in actual, actual)
finally:
sys.stdout = prev
self.assertTrue("Broadcast" in actual, actual)
finally:
sys.stdout = prev
def test_mad(self):
pdf = pd.DataFrame(


@@ -3509,6 +3509,31 @@ class Dataset[T] private[sql](
// For Python API
////////////////////////////////////////////////////////////////////////////
/**
* It adds a new long column with the name `name` that increases one by one.
* This is for 'distributed-sequence' default index in pandas API on Spark.
*/
private[sql] def withSequenceColumn(name: String) = {
val rdd: RDD[InternalRow] =
// Checkpoint the DataFrame to fix the partition ID.
localCheckpoint(false)
.queryExecution.toRdd.zipWithIndex().mapPartitions { iter =>
val joinedRow = new JoinedRow
val unsafeRowWriter =
new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1)
iter.map { case (row, id) =>
// Writes to an UnsafeRow directly
unsafeRowWriter.reset()
unsafeRowWriter.write(0, id)
joinedRow(unsafeRowWriter.getRow, row)
}
}
sparkSession.internalCreateDataFrame(
rdd, StructType(StructField(name, LongType, nullable = false) +: schema), isStreaming)
}
/**
* Converts a JavaRDD to a PythonRDD.
*/
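
As a rough, hypothetical sketch of the idea behind `withSequenceColumn` using only public PySpark APIs: zip every row with a globally consecutive id and prepend it as a non-nullable long column. The actual implementation above stays on `InternalRow`s via `UnsafeRowWriter` and avoids this Row round-trip; the column name `default_index` here is just a placeholder.

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StructField, StructType

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(10).repartition(5)

# Pair each row with a consecutive id, then put the id first in the row.
zipped = sdf.rdd.zipWithIndex().map(lambda pair: (pair[1],) + tuple(pair[0]))

# Prepend a non-nullable long field for the sequence column.
schema = StructType(
    [StructField("default_index", LongType(), nullable=False)] + sdf.schema.fields
)
spark.createDataFrame(zipped, schema).show()
```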


@@ -2937,6 +2937,12 @@ class DataFrameSuite extends QueryTest
checkAnswer(sql("SELECT sum(c1 * c3) + sum(c2 * c3) FROM tbl"), Row(2.00000000000) :: Nil)
}
}
test("SPARK-36338: DataFrame.withSequenceColumn should append unique sequence IDs") {
val ids = spark.range(10).repartition(5)
.withSequenceColumn("default_index").collect().map(_.getLong(0))
assert(ids.toSet === Range(0, 10).toSet)
}
}
case class GroupByKey(a: Int, b: Int)