[SPARK-36338][PYTHON][SQL] Move distributed-sequence implementation to Scala side
### What changes were proposed in this pull request?

This PR proposes to implement the `distributed-sequence` index on the Scala side.

### Why are the changes needed?

- Avoid unnecessary (de)serialization.
- Keep the nullability of the input DataFrame when `distributed-sequence` is enabled. During serialization, all fields currently become nullable (see https://github.com/apache/spark/pull/32775#discussion_r645882104).

### Does this PR introduce _any_ user-facing change?

No to end users, since pandas API on Spark is not released yet.

```python
import pyspark.pandas as ps
ps.set_option('compute.default_index_type', 'distributed-sequence')
ps.range(1).spark.print_schema()
```

Before:

```
root
 |-- id: long (nullable = true)
```

After:

```
root
 |-- id: long (nullable = false)
```

### How was this patch tested?

Manually tested; the existing tests should cover this.

Closes #33570 from HyukjinKwon/SPARK-36338.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
parent dd2ca0aee2 · commit c6140d4d0a
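The diff below repeats one mechanical change at every call site: the attach-index helpers used to return a `(DataFrame, force_nullable)` pair, and now return just the DataFrame. A minimal sketch of the new calling convention (a reading aid, not part of the commit; `InternalFrame` is pandas-on-Spark internal API):

```python
import pyspark.pandas as ps
from pyspark.pandas.internal import InternalFrame

sdf = ps.DataFrame({"a": [1, 2, 3]}).to_spark()

# Before this commit, callers had to unpack a pair and thread `force_nullable`
# through to every InternalFrame they built:
#     sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(...)
# After it, the helper returns just the DataFrame and nullability is preserved:
sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name="seq")
sdf.printSchema()  # `seq` is long, nullable = false
```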
@@ -169,7 +169,7 @@ class PandasOnSparkFrameMethods(object):
                 for scol, label in zip(internal.data_spark_columns, internal.column_labels)
             ]
         )
-        sdf, force_nullable = attach_func(sdf, name_like_string(column))
+        sdf = attach_func(sdf, name_like_string(column))

         return DataFrame(
             InternalFrame(
@@ -178,28 +178,18 @@ class PandasOnSparkFrameMethods(object):
                     scol_for(sdf, SPARK_INDEX_NAME_FORMAT(i)) for i in range(internal.index_level)
                 ],
                 index_names=internal.index_names,
-                index_fields=(
-                    [field.copy(nullable=True) for field in internal.index_fields]
-                    if force_nullable
-                    else internal.index_fields
-                ),
+                index_fields=internal.index_fields,
                 column_labels=internal.column_labels + [column],
                 data_spark_columns=(
                     [scol_for(sdf, name_like_string(label)) for label in internal.column_labels]
                     + [scol_for(sdf, name_like_string(column))]
                 ),
-                data_fields=(
-                    (
-                        [field.copy(nullable=True) for field in internal.data_fields]
-                        if force_nullable
-                        else internal.data_fields
-                    )
-                    + [
-                        InternalField.from_struct_field(
-                            StructField(name_like_string(column), LongType(), nullable=False)
-                        )
-                    ]
-                ),
+                data_fields=internal.data_fields
+                + [
+                    InternalField.from_struct_field(
+                        StructField(name_like_string(column), LongType(), nullable=False)
+                    )
+                ],
                 column_label_names=internal.column_label_names,
             ).resolved_copy
         )
@@ -175,7 +175,7 @@ _options = [
     Option(
         key="compute.default_index_type",
         doc=("This sets the default index type: sequence, distributed and distributed-sequence."),
-        default="sequence",
+        default="distributed-sequence",
         types=str,
         check_func=(
             lambda v: v in ("sequence", "distributed", "distributed-sequence"),
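For context (not part of the diff), the option documented above can also be scoped to a block of code with the public pandas-on-Spark API; a small sketch:

```python
import pyspark.pandas as ps

# 'sequence' uses a global window (consecutive ids, but single-partition);
# 'distributed' uses monotonically_increasing_id (parallel, non-consecutive);
# 'distributed-sequence' computes consecutive ids in a distributed manner.
with ps.option_context("compute.default_index_type", "distributed-sequence"):
    print(ps.range(3).index)
```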
@@ -1692,9 +1692,7 @@ class Index(IndexOpsMixin):
         ]
         sdf = sdf.select(index_value_columns)

-        sdf, force_nullable = InternalFrame.attach_default_index(
-            sdf, default_index_type="distributed-sequence"
-        )
+        sdf = InternalFrame.attach_default_index(sdf, default_index_type="distributed-sequence")
         # sdf here looks as below
         # +-----------------+-----------------+-----------------+-----------------+
         # |__index_level_0__|__index_value_0__|__index_value_1__|__index_value_2__|
@@ -1727,11 +1725,7 @@ class Index(IndexOpsMixin):
                 scol_for(sdf, col) for col in self._internal.index_spark_column_names
             ],
             index_names=self._internal.index_names,
-            index_fields=(
-                [field.copy(nullable=True) for field in self._internal.index_fields]
-                if force_nullable
-                else self._internal.index_fields
-            ),
+            index_fields=self._internal.index_fields,
         )

         return DataFrame(internal).index
@@ -1829,7 +1823,7 @@ class Index(IndexOpsMixin):
         """
         sdf = self._internal.spark_frame.select(self.spark.column)
         sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+        sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
         # spark_frame here looks like below
         # +-----------------+---------------+
         # |__index_level_0__|__index_value__|
@@ -1877,7 +1871,7 @@ class Index(IndexOpsMixin):
         """
         sdf = self._internal.spark_frame.select(self.spark.column)
         sequence_col = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)
+        sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name=sequence_col)

         return (
             sdf.orderBy(
@@ -2475,7 +2469,7 @@ class Index(IndexOpsMixin):
                 scol_for(sdf, col) for col in self._internal.index_spark_column_names
             ],
             index_names=self._internal.index_names,
-            index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
+            index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
         )
         return DataFrame(internal).index

@@ -1054,7 +1054,7 @@ class MultiIndex(Index):
                 scol_for(sdf, col) for col in self._internal.index_spark_column_names
             ],
             index_names=self._internal.index_names,
-            index_fields=[field.copy(nullable=True) for field in self._internal.index_fields],
+            index_fields=[InternalField(field.dtype) for field in self._internal.index_fields],
         )
         return DataFrame(internal).index

@@ -1536,22 +1536,10 @@ class iLocIndexer(LocIndexerLike):
     def _internal(self) -> "InternalFrame":
         # Use resolved_copy to fix the natural order.
         internal = super()._internal.resolved_copy
-        sdf, force_nullable = InternalFrame.attach_distributed_sequence_column(
+        sdf = InternalFrame.attach_distributed_sequence_column(
             internal.spark_frame, column_name=self._sequence_col
         )
-        return internal.with_new_sdf(
-            spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME),
-            index_fields=(
-                [field.copy(nullable=True) for field in internal.index_fields]
-                if force_nullable
-                else internal.index_fields
-            ),
-            data_fields=(
-                [field.copy(nullable=True) for field in internal.data_fields]
-                if force_nullable
-                else internal.data_fields
-            ),
-        )
+        return internal.with_new_sdf(spark_frame=sdf.orderBy(NATURAL_ORDER_COLUMN_NAME))

     @lazy_property
     def _sequence_col(self) -> str:
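As a usage note (not part of the diff), `iloc` is one public entry point that exercises this code path; a small sketch:

```python
import pyspark.pandas as ps

psdf = ps.DataFrame({"a": ["x", "y", "z", "w"]})
# Positional slicing: iLocIndexer attaches the distributed-sequence column and
# orders by the natural-order column to pin down row positions.
print(psdf.iloc[1:3])
```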
@@ -20,15 +20,12 @@ An internal immutable DataFrame with some metadata to manage indexes.
 """
 import re
 from typing import Any, Dict, List, Optional, Sequence, Tuple, Union, TYPE_CHECKING, cast
-from itertools import accumulate
-import py4j

 import numpy as np
 import pandas as pd
 from pandas.api.types import CategoricalDtype  # noqa: F401
 from pyspark._globals import _NoValue, _NoValueType
 from pyspark.sql import functions as F, Column, DataFrame as SparkDataFrame, Window
-from pyspark.sql.functions import pandas_udf
 from pyspark.sql.types import (  # noqa: F401
     BooleanType,
     DataType,
@@ -64,7 +61,6 @@ from pyspark.pandas.utils import (
     name_like_string,
     scol_for,
     spark_column_equals,
-    verify_temp_column_name,
 )


@@ -636,7 +632,7 @@ class InternalFrame(object):
             )

             # Create default index.
-            spark_frame, force_nullable = InternalFrame.attach_default_index(spark_frame)
+            spark_frame = InternalFrame.attach_default_index(spark_frame)
             index_spark_columns = [scol_for(spark_frame, SPARK_DEFAULT_INDEX_NAME)]

             index_fields = [
@@ -658,7 +654,6 @@ class InternalFrame(object):
             data_fields = [
                 field.copy(
                     name=name_like_string(struct_field.name),
-                    nullable=(force_nullable or field.nullable),
                 )
                 for field, struct_field in zip(data_fields, data_struct_fields)
             ]
@@ -836,7 +831,7 @@ class InternalFrame(object):
     @staticmethod
     def attach_default_index(
         sdf: SparkDataFrame, default_index_type: Optional[str] = None
-    ) -> Tuple[SparkDataFrame, bool]:
+    ) -> SparkDataFrame:
         """
         This method attaches a default index to Spark DataFrame. Spark does not have the index
         notion so corresponding column should be generated.
@@ -848,13 +843,13 @@ class InternalFrame(object):

         It adds the default index column '__index_level_0__'.

-        >>> spark_frame = InternalFrame.attach_default_index(spark_frame)[0]
+        >>> spark_frame = InternalFrame.attach_default_index(spark_frame)
         >>> spark_frame
         DataFrame[__index_level_0__: bigint, id: bigint]

         It throws an exception if the given column name already exists.

-        >>> InternalFrame.attach_default_index(spark_frame)[0]
+        >>> InternalFrame.attach_default_index(spark_frame)
         ... # doctest: +ELLIPSIS
         Traceback (most recent call last):
         ...
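A hedged sketch of the post-commit API that the doctest above exercises; `default_session` is pandas-on-Spark-internal, and when `default_index_type` is omitted the helper is expected to fall back to the `compute.default_index_type` option:

```python
from pyspark.pandas.internal import InternalFrame
from pyspark.pandas.utils import default_session

spark_frame = default_session().range(3)
for index_type in ("sequence", "distributed", "distributed-sequence"):
    # Post-commit, this returns a plain DataFrame rather than a (DataFrame, bool) pair.
    InternalFrame.attach_default_index(spark_frame, default_index_type=index_type).show()
```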
@@ -881,34 +876,26 @@ class InternalFrame(object):
         )

     @staticmethod
-    def attach_sequence_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
+    def attach_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
         sequential_index = (
             F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
         )
-        return sdf.select(sequential_index.alias(column_name), *scols), False
+        return sdf.select(sequential_index.alias(column_name), *scols)

     @staticmethod
-    def attach_distributed_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
+    def attach_distributed_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         scols = [scol_for(sdf, column) for column in sdf.columns]
-        return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols), False
+        return sdf.select(F.monotonically_increasing_id().alias(column_name), *scols)

     @staticmethod
-    def attach_distributed_sequence_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
+    def attach_distributed_sequence_column(sdf: SparkDataFrame, column_name: str) -> SparkDataFrame:
         """
         This method attaches a Spark column that has a sequence in a distributed manner.
         This is equivalent to the column assigned when default index type 'distributed-sequence'.

         >>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
-        >>> sdf, force_nullable = (
-        ...     InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
-        ... )
+        >>> sdf = InternalFrame.attach_distributed_sequence_column(sdf, column_name="sequence")
         >>> sdf.show()  # doctest: +NORMALIZE_WHITESPACE
         +--------+---+
         |sequence|  0|
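The two simpler strategies above can be reproduced with public PySpark API; a minimal sketch (not part of the commit) showing their trade-off:

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(5).repartition(3)

# 'sequence': row_number over a global window; ids are consecutive, but the
# unpartitioned window funnels all rows through a single partition.
seq = F.row_number().over(Window.orderBy(F.monotonically_increasing_id())).cast("long") - 1
sdf.select(seq.alias("sequence"), "*").show()

# 'distributed': fully parallel and cheap, but ids are only monotonically
# increasing, not consecutive.
sdf.select(F.monotonically_increasing_id().alias("distributed"), "*").show()
```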
@@ -917,124 +904,21 @@ class InternalFrame(object):
         |       1|  b|
         |       2|  c|
         +--------+---+
-        >>> force_nullable
-        True
         """
         if len(sdf.columns) > 0:
-            try:
-                jdf = sdf._jdf.toDF()  # type: ignore
-
-                sql_ctx = sdf.sql_ctx
-                encoders = sql_ctx._jvm.org.apache.spark.sql.Encoders  # type: ignore
-                encoder = encoders.tuple(jdf.exprEnc(), encoders.scalaLong())
-
-                jrdd = jdf.localCheckpoint(False).rdd().zipWithIndex()
-
-                df = SparkDataFrame(
-                    sql_ctx.sparkSession._jsparkSession.createDataset(  # type: ignore
-                        jrdd, encoder
-                    ).toDF(),
-                    sql_ctx,
-                )
-                columns = df.columns
-                return (
-                    df.selectExpr(
-                        "`{}` as `{}`".format(columns[1], column_name), "`{}`.*".format(columns[0])
-                    ),
-                    True,
-                )
-            except py4j.protocol.Py4JError:
-                if is_testing():
-                    raise
-                return InternalFrame._attach_distributed_sequence_column(sdf, column_name)
+            return SparkDataFrame(
+                sdf._jdf.toDF().withSequenceColumn(column_name),  # type: ignore
+                sdf.sql_ctx,
+            )
         else:
             cnt = sdf.count()
             if cnt > 0:
-                return default_session().range(cnt).toDF(column_name), False
+                return default_session().range(cnt).toDF(column_name)
             else:
-                return (
-                    default_session().createDataFrame(
-                        [],
-                        schema=StructType().add(column_name, data_type=LongType(), nullable=False),
-                    ),
-                    False,
-                )
-
-    @staticmethod
-    def _attach_distributed_sequence_column(
-        sdf: SparkDataFrame, column_name: str
-    ) -> Tuple[SparkDataFrame, bool]:
-        """
-        >>> sdf = ps.DataFrame(['a', 'b', 'c']).to_spark()
-        >>> sdf, force_nullable = (
-        ...     InternalFrame._attach_distributed_sequence_column(sdf, column_name="sequence")
-        ... )
-        >>> sdf.sort("sequence").show()  # doctest: +NORMALIZE_WHITESPACE
-        +--------+---+
-        |sequence|  0|
-        +--------+---+
-        |       0|  a|
-        |       1|  b|
-        |       2|  c|
-        +--------+---+
-        >>> force_nullable
-        False
-        """
-        scols = [scol_for(sdf, column) for column in sdf.columns]
-
-        spark_partition_column = verify_temp_column_name(sdf, "__spark_partition_id__")
-        offset_column = verify_temp_column_name(sdf, "__offset__")
-        row_number_column = verify_temp_column_name(sdf, "__row_number__")
-
-        # 1. Calculates counts per each partition ID. `counts` here is, for instance,
-        #     {
-        #         1: 83,
-        #         6: 83,
-        #         3: 83,
-        #         ...
-        #     }
-        sdf = sdf.withColumn(spark_partition_column, F.spark_partition_id())
-
-        # Checkpoint the DataFrame to fix the partition ID.
-        sdf = sdf.localCheckpoint(eager=False)
-
-        counts = map(
-            lambda x: (x["key"], x["count"]),
-            sdf.groupby(sdf[spark_partition_column].alias("key")).count().collect(),
-        )
-
-        # 2. Calculates cumulative sum in an order of partition id.
-        #     Note that it does not matter if partition id guarantees its order or not.
-        #     We just need a one-by-one sequential id.
-
-        # sort by partition key.
-        sorted_counts = sorted(counts, key=lambda x: x[0])
-        # get cumulative sum in an order of partition key.
-        cumulative_counts = [0] + list(accumulate(map(lambda count: count[1], sorted_counts)))
-        # zip it with partition key.
-        sums = dict(zip(map(lambda count: count[0], sorted_counts), cumulative_counts))
-
-        # 3. Attach offset for each partition.
-        @pandas_udf(returnType=LongType())  # type: ignore
-        def offset(id: pd.Series) -> pd.Series:
-            current_partition_offset = sums[id.iloc[0]]
-            return pd.Series(current_partition_offset).repeat(len(id))
-
-        sdf = sdf.withColumn(offset_column, offset(spark_partition_column))
-
-        # 4. Calculate row_number in each partition.
-        w = Window.partitionBy(spark_partition_column).orderBy(F.monotonically_increasing_id())
-        row_number = F.row_number().over(w)
-        sdf = sdf.withColumn(row_number_column, row_number)
-
-        # 5. Calculate the index.
-        return (
-            sdf.select(
-                (sdf[offset_column] + sdf[row_number_column] - 1).alias(column_name), *scols
-            ),
-            False,
-        )
+                return default_session().createDataFrame(
+                    [], schema=StructType().add(column_name, data_type=LongType(), nullable=False)
+                )

     def spark_column_for(self, label: Label) -> Column:
         """Return Spark Column for the given column label."""
         column_labels_to_scol = dict(zip(self.column_labels, self.data_spark_columns))
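The removed fallback above derives consecutive ids from per-partition counts instead of a global sort. A pure-Python sketch of that offset arithmetic on toy data (illustrative only, not part of the commit):

```python
from itertools import accumulate

# Toy stand-ins for Spark partitions, keyed by partition id.
partitions = {0: ["a", "b"], 1: ["c"], 2: ["d", "e", "f"]}

# 1. Count rows per partition id (the only data pulled to the "driver").
counts = {pid: len(rows) for pid, rows in partitions.items()}

# 2. Cumulative sums in partition-id order give each partition's starting offset.
sorted_ids = sorted(counts)
offsets = dict(zip(sorted_ids, [0] + list(accumulate(counts[pid] for pid in sorted_ids))))

# 3.-5. Offset plus the within-partition row number yields globally
# consecutive ids: 0 a, 1 b, 2 c, 3 d, 4 e, 5 f.
for pid in sorted_ids:
    for row_number, value in enumerate(partitions[pid]):
        print(offsets[pid] + row_number, value)
```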
@@ -5533,7 +5533,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         sdf_for_index = notnull._internal.spark_frame.select(notnull._internal.index_spark_columns)

         tmp_join_key = verify_temp_column_name(sdf_for_index, "__tmp_join_key__")
-        sdf_for_index, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf_for_index = InternalFrame.attach_distributed_sequence_column(
             sdf_for_index, tmp_join_key
         )
         # sdf_for_index:
@@ -5550,7 +5550,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         sdf_for_data = notnull._internal.spark_frame.select(
             notnull.spark.column.alias("values"), NATURAL_ORDER_COLUMN_NAME
         )
-        sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf_for_data = InternalFrame.attach_distributed_sequence_column(
             sdf_for_data, SPARK_DEFAULT_SERIES_NAME
         )
         # sdf_for_data:
@@ -5569,9 +5569,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
         ).drop("values", NATURAL_ORDER_COLUMN_NAME)

         tmp_join_key = verify_temp_column_name(sdf_for_data, "__tmp_join_key__")
-        sdf_for_data, _ = InternalFrame.attach_distributed_sequence_column(
-            sdf_for_data, tmp_join_key
-        )
+        sdf_for_data = InternalFrame.attach_distributed_sequence_column(sdf_for_data, tmp_join_key)
         # sdf_for_index:                         sdf_for_data:
         # +----------------+-----------------+  +----------------+---+
         # |__tmp_join_key__|__index_level_0__|  |__tmp_join_key__|  0|
@@ -5639,7 +5637,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             return -1
         # We should remember the natural sequence started from 0
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf = InternalFrame.attach_distributed_sequence_column(
             sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
         )
         # If the maximum is achieved in multiple locations, the first row position is returned.
@@ -5686,7 +5684,7 @@ class Series(Frame, IndexOpsMixin, Generic[T]):
             return -1
         # We should remember the natural sequence started from 0
         seq_col_name = verify_temp_column_name(sdf, "__distributed_sequence_column__")
-        sdf, _ = InternalFrame.attach_distributed_sequence_column(
+        sdf = InternalFrame.attach_distributed_sequence_column(
             sdf.drop(NATURAL_ORDER_COLUMN_NAME), seq_col_name
         )
         # If the minimum is achieved in multiple locations, the first row position is returned.
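A small usage sketch (not part of the diff) of the tie-breaking behavior the comments above describe:

```python
import pyspark.pandas as ps

psser = ps.Series([4, 1, 4, 1])
# The attached sequence column remembers natural row positions, so ties resolve
# to the first occurrence: argmax() is 0 and argmin() is 1 here.
print(psser.argmax(), psser.argmin())
```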
@@ -5146,23 +5146,26 @@ class DataFrameTest(PandasOnSparkTestCase, SQLTestUtils):
             sys.stdout = prev

     def test_explain_hint(self):
-        psdf1 = ps.DataFrame(
-            {"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]}, columns=["lkey", "value"]
-        )
-        psdf2 = ps.DataFrame(
-            {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]}, columns=["rkey", "value"]
-        )
-        merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
-        prev = sys.stdout
-        try:
-            out = StringIO()
-            sys.stdout = out
-            merged.spark.explain()
-            actual = out.getvalue().strip()
-
-            self.assertTrue("Broadcast" in actual, actual)
-        finally:
-            sys.stdout = prev
+        with ps.option_context("compute.default_index_type", "sequence"):
+            psdf1 = ps.DataFrame(
+                {"lkey": ["foo", "bar", "baz", "foo"], "value": [1, 2, 3, 5]},
+                columns=["lkey", "value"],
+            )
+            psdf2 = ps.DataFrame(
+                {"rkey": ["foo", "bar", "baz", "foo"], "value": [5, 6, 7, 8]},
+                columns=["rkey", "value"],
+            )
+            merged = psdf1.merge(psdf2.spark.hint("broadcast"), left_on="lkey", right_on="rkey")
+            prev = sys.stdout
+            try:
+                out = StringIO()
+                sys.stdout = out
+                merged.spark.explain()
+                actual = out.getvalue().strip()
+
+                self.assertTrue("Broadcast" in actual, actual)
+            finally:
+                sys.stdout = prev

     def test_mad(self):
         pdf = pd.DataFrame(
@@ -3536,6 +3536,31 @@ class Dataset[T] private[sql](
   // For Python API
   ////////////////////////////////////////////////////////////////////////////

+  /**
+   * It adds a new long column with the name `name` that increases one by one.
+   * This is for 'distributed-sequence' default index in pandas API on Spark.
+   */
+  private[sql] def withSequenceColumn(name: String) = {
+    val rdd: RDD[InternalRow] =
+      // Checkpoint the DataFrame to fix the partition ID.
+      localCheckpoint(false)
+        .queryExecution.toRdd.zipWithIndex().mapPartitions { iter =>
+          val joinedRow = new JoinedRow
+          val unsafeRowWriter =
+            new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(1)
+
+          iter.map { case (row, id) =>
+            // Writes to an UnsafeRow directly
+            unsafeRowWriter.reset()
+            unsafeRowWriter.write(0, id)
+            joinedRow(unsafeRowWriter.getRow, row)
+          }
+        }
+
+    sparkSession.internalCreateDataFrame(
+      rdd, StructType(StructField(name, LongType, nullable = false) +: schema), isStreaming)
+  }
+
   /**
    * Converts a JavaRDD to a PythonRDD.
    */
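The Scala implementation above hinges on `zipWithIndex`, which assigns consecutive ids from per-partition sizes rather than a global sort. A rough PySpark analogue of the same idea (a sketch, not the commit's actual code path):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(10).repartition(5)

# zipWithIndex runs one job to collect partition sizes, then assigns each row
# its partition offset plus its position within the partition.
indexed = (
    df.rdd.zipWithIndex()
    .map(lambda pair: (pair[1],) + tuple(pair[0]))
    .toDF(["default_index"] + df.columns)
)
indexed.show()
```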
@@ -2981,6 +2981,12 @@ class DataFrameSuite extends QueryTest
       checkAnswer(sql("SELECT sum(c1 * c3) + sum(c2 * c3) FROM tbl"), Row(2.00000000000) :: Nil)
     }
   }

+  test("SPARK-36338: DataFrame.withSequenceColumn should append unique sequence IDs") {
+    val ids = spark.range(10).repartition(5)
+      .withSequenceColumn("default_index").collect().map(_.getLong(0))
+    assert(ids.toSet === Range(0, 10).toSet)
+  }
+
 }

 case class GroupByKey(a: Int, b: Int)
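For completeness, the same method can be reached from Python the way the new `attach_distributed_sequence_column` does, through py4j (`withSequenceColumn` is `private[sql]`, so this sketch only works against a build containing this commit):

```python
from pyspark.sql import DataFrame as SparkDataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(10).repartition(5)

# Mirrors the call in InternalFrame.attach_distributed_sequence_column.
out = SparkDataFrame(sdf._jdf.toDF().withSequenceColumn("default_index"), sdf.sql_ctx)
out.printSchema()  # default_index: long (nullable = false)
```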