diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 87f4c8f5d9..7c25397e32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.StringUtils
 
-import org.apache.spark.TaskContext
+import org.apache.spark.{SparkException, TaskContext}
 import org.apache.spark.annotation.{DeveloperApi, Evolving, Experimental, Stable, Unstable}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.api.java.function._
@@ -184,11 +184,23 @@ private[sql] object Dataset {
  */
 @Stable
 class Dataset[T] private[sql](
-    @transient val sparkSession: SparkSession,
+    @transient private val _sparkSession: SparkSession,
     @DeveloperApi @Unstable @transient val queryExecution: QueryExecution,
     @DeveloperApi @Unstable @transient val encoder: Encoder[T])
   extends Serializable {
 
+  @transient lazy val sparkSession: SparkSession = {
+    if (_sparkSession == null) {
+      throw new SparkException(
+        "Dataset transformations and actions can only be invoked by the driver, not inside of" +
+          " other Dataset transformations; for example, dataset1.map(x => dataset2.values.count()" +
+          " * x) is invalid because the values transformation and count action cannot be " +
+          "performed inside of the dataset1.map transformation. For more information," +
+          " see SPARK-28702.")
+    }
+    _sparkSession
+  }
+
   // A globally unique id of this Dataset.
   private val id = Dataset.curId.getAndIncrement()
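
For illustration, here is a minimal driver program showing the misuse this guard reports. It is hypothetical (not part of the patch) and assumes a local-mode SparkSession: when a Dataset is captured in another Dataset's map closure, it is deserialized on the executor with its @transient _sparkSession field set to null, so the nested count() hits the new lazy val and fails with the descriptive SparkException above instead of an opaque NullPointerException.

// Hypothetical example (not from the patch): nesting a Dataset action
// inside another Dataset's transformation, the exact pattern the new
// error message names.
import org.apache.spark.sql.SparkSession

object NestedDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-28702 demo")
      .getOrCreate()
    import spark.implicits._

    val ds1 = Seq(1, 2, 3).toDS()
    val ds2 = Seq(10L, 20L).toDS()

    // Invalid: ds2 rides along in the map closure. On the executor its
    // @transient _sparkSession deserializes to null, so ds2.count() now
    // surfaces the SparkException added by this change.
    try {
      ds1.map(x => ds2.count() * x).collect()
    } catch {
      case e: Exception => println(s"Failed as expected: ${e.getMessage}")
    }

    // Valid: run the inner action on the driver first, then close over
    // the plain Long result.
    val n = ds2.count()                       // n == 2
    val fixed = ds1.map(x => n * x).collect() // Array(2L, 4L, 6L)
    println(fixed.mkString(", "))

    spark.stop()
  }
}

The second half of the sketch is the usual fix: materialize the inner Dataset's result on the driver (or express the relationship as a join or broadcast) rather than referencing a second Dataset from inside a closure.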