[SPARK-13817][BUILD][SQL] Re-enable MiMA and removes object DataFrame
## What changes were proposed in this pull request?

PR #11443 temporarily disabled the MiMA check; this PR re-enables it. One extra change is that `object DataFrame` is also removed. The only purpose of introducing `object DataFrame` was to use it as an internal factory for creating `Dataset[Row]`. By replacing this internal factory with `Dataset.newDataFrame`, both `DataFrame` and `DataFrame$` are entirely removed from the API, so that we can simply put a `MissingClassProblem` filter in `MimaExcludes.scala` for most DataFrame API changes.

## How was this patch tested?

Tested by the MiMA check triggered by Jenkins.

Author: Cheng Lian <lian@databricks.com>

Closes #11656 from liancheng/re-enable-mima.
This commit is contained in:
parent
07f1c54477
commit
6d37e1eb90
|
@ -573,8 +573,7 @@ def main():
|
|||
# backwards compatibility checks
|
||||
if build_tool == "sbt":
|
||||
# Note: compatibility tests only supported in sbt for now
|
||||
# TODO Temporarily disable MiMA check for DF-to-DS migration prototyping
|
||||
# detect_binary_inop_with_mima()
|
||||
detect_binary_inop_with_mima()
|
||||
# Since we did not build assembly/assembly before running dev/mima, we need to
|
||||
# do it here because the tests still rely on it; see SPARK-13294 for details.
|
||||
build_spark_assembly_sbt(hadoop_version)
|
||||
|
|
|
@ -26,7 +26,6 @@ import org.apache.spark.api.java.JavaSparkContext;
|
|||
import org.apache.spark.ml.regression.AFTSurvivalRegression;
|
||||
import org.apache.spark.ml.regression.AFTSurvivalRegressionModel;
|
||||
import org.apache.spark.mllib.linalg.*;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.RowFactory;
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.spark.api.java.function.Function;
|
|||
import org.apache.spark.ml.evaluation.RegressionEvaluator;
|
||||
import org.apache.spark.ml.recommendation.ALS;
|
||||
import org.apache.spark.ml.recommendation.ALSModel;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
// $example off$
|
||||
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Arrays;
|
|||
|
||||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.ml.feature.Binarizer;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.RowFactory;
|
||||
import org.apache.spark.sql.types.DataTypes;
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.spark.ml.param.ParamMap;
|
|||
import org.apache.spark.ml.tuning.CrossValidator;
|
||||
import org.apache.spark.ml.tuning.CrossValidatorModel;
|
||||
import org.apache.spark.ml.tuning.ParamGridBuilder;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
|
|
@ -33,7 +33,6 @@ import org.apache.spark.mllib.linalg.BLAS;
|
|||
import org.apache.spark.mllib.linalg.Vector;
|
||||
import org.apache.spark.mllib.linalg.Vectors;
|
||||
import org.apache.spark.mllib.regression.LabeledPoint;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.spark.ml.classification.LogisticRegressionModel;
|
|||
import org.apache.spark.ml.param.ParamMap;
|
||||
import org.apache.spark.mllib.linalg.Vectors;
|
||||
import org.apache.spark.mllib.regression.LabeledPoint;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
// $example off$
|
||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.spark.ml.clustering.KMeans;
|
|||
import org.apache.spark.mllib.linalg.Vector;
|
||||
import org.apache.spark.mllib.linalg.VectorUDT;
|
||||
import org.apache.spark.mllib.linalg.Vectors;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.types.Metadata;
|
||||
import org.apache.spark.sql.types.StructField;
|
||||
|
|
|
@ -28,7 +28,6 @@ import org.apache.spark.ml.clustering.LDAModel;
|
|||
import org.apache.spark.mllib.linalg.Vector;
|
||||
import org.apache.spark.mllib.linalg.VectorUDT;
|
||||
import org.apache.spark.mllib.linalg.Vectors;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
|
|
@ -29,7 +29,6 @@ import org.apache.spark.ml.util.MetadataUtils;
|
|||
import org.apache.spark.mllib.evaluation.MulticlassMetrics;
|
||||
import org.apache.spark.mllib.linalg.Matrix;
|
||||
import org.apache.spark.mllib.linalg.Vector;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.SQLContext;
|
||||
|
|
|
@ -24,7 +24,6 @@ import org.apache.spark.SparkConf;
|
|||
import org.apache.spark.api.java.JavaRDD;
|
||||
import org.apache.spark.api.java.JavaSparkContext;
|
||||
import org.apache.spark.ml.feature.SQLTransformer;
|
||||
import org.apache.spark.sql.DataFrame;
|
||||
import org.apache.spark.sql.Dataset;
|
||||
import org.apache.spark.sql.Row;
|
||||
import org.apache.spark.sql.RowFactory;
|
||||
|
|
|
@ -296,6 +296,28 @@ object MimaExcludes {
|
|||
// SPARK-12073: backpressure rate controller consumes events preferentially from lagging partitions
|
||||
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.KafkaTestUtils.createTopic"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.kafka.DirectKafkaInputDStream.maxMessagesPerPartition")
|
||||
) ++ Seq(
|
||||
// [SPARK-13244][SQL] Migrates DataFrame to Dataset
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.apply"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.toDF"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.copy$default$1"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrameHolder.df$1"),
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.DataFrameHolder.this"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.tables"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.sql"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.baseRelationToDataFrame"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.SQLContext.table"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.DataFrame.apply"),
|
||||
|
||||
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.DataFrame$"),
|
||||
|
||||
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.mllib.evaluation.MultilabelMetrics.this"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions"),
|
||||
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.ml.classification.LogisticRegressionSummary.predictions")
|
||||
)
|
||||
case v if v.startsWith("1.6") =>
|
||||
Seq(
|
||||
|
|
|
@ -48,18 +48,16 @@ import org.apache.spark.sql.types._
|
|||
import org.apache.spark.storage.StorageLevel
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
private[sql] object DataFrame {
|
||||
def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
|
||||
val qe = sqlContext.executePlan(logicalPlan)
|
||||
qe.assertAnalyzed()
|
||||
new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
|
||||
}
|
||||
}
|
||||
|
||||
private[sql] object Dataset {
|
||||
def apply[T: Encoder](sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = {
|
||||
new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]])
|
||||
}
|
||||
|
||||
def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
|
||||
val qe = sqlContext.executePlan(logicalPlan)
|
||||
qe.assertAnalyzed()
|
||||
new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2129,7 +2127,7 @@ class Dataset[T] private[sql](
|
|||
|
||||
/** A convenient function to wrap a logical plan and produce a DataFrame. */
|
||||
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
|
||||
DataFrame(sqlContext, logicalPlan)
|
||||
Dataset.newDataFrame(sqlContext, logicalPlan)
|
||||
}
|
||||
|
||||
/** A convenient function to wrap a logical plan and produce a DataFrame. */
|
||||
|
|
|
@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
|
|||
userSpecifiedSchema = userSpecifiedSchema,
|
||||
className = source,
|
||||
options = extraOptions.toMap)
|
||||
DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
|
||||
Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -175,7 +175,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
|
|||
userSpecifiedSchema = userSpecifiedSchema,
|
||||
className = source,
|
||||
options = extraOptions.toMap)
|
||||
DataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
|
||||
Dataset.newDataFrame(sqlContext, StreamingRelation(dataSource.createSource()))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
|
|||
InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
|
||||
}
|
||||
|
||||
DataFrame(
|
||||
Dataset.newDataFrame(
|
||||
sqlContext,
|
||||
LogicalRDD(
|
||||
schema.toAttributes,
|
||||
|
@ -393,7 +393,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
|
|||
* @since 1.4.0
|
||||
*/
|
||||
def table(tableName: String): DataFrame = {
|
||||
DataFrame(sqlContext,
|
||||
Dataset.newDataFrame(sqlContext,
|
||||
sqlContext.catalog.lookupRelation(sqlContext.sqlParser.parseTableIdentifier(tableName)))
|
||||
}
|
||||
|
||||
|
|
|
@ -55,17 +55,17 @@ class GroupedData protected[sql](
|
|||
|
||||
groupType match {
|
||||
case GroupedData.GroupByType =>
|
||||
DataFrame(
|
||||
Dataset.newDataFrame(
|
||||
df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan))
|
||||
case GroupedData.RollupType =>
|
||||
DataFrame(
|
||||
Dataset.newDataFrame(
|
||||
df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan))
|
||||
case GroupedData.CubeType =>
|
||||
DataFrame(
|
||||
Dataset.newDataFrame(
|
||||
df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan))
|
||||
case GroupedData.PivotType(pivotCol, values) =>
|
||||
val aliasedGrps = groupingExprs.map(alias)
|
||||
DataFrame(
|
||||
Dataset.newDataFrame(
|
||||
df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ class GroupedDataset[K, V] private[sql](
|
|||
|
||||
private def groupedData =
|
||||
new GroupedData(
|
||||
DataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
|
||||
Dataset.newDataFrame(sqlContext, logicalPlan), groupingAttributes, GroupedData.GroupByType)
|
||||
|
||||
/**
|
||||
* Returns a new [[GroupedDataset]] where the type of the key has been mapped to the specified
|
||||
|
|
|
@ -374,7 +374,7 @@ class SQLContext private[sql](
|
|||
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
|
||||
val attributeSeq = schema.toAttributes
|
||||
val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
|
||||
DataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
|
||||
Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -389,7 +389,7 @@ class SQLContext private[sql](
|
|||
SQLContext.setActive(self)
|
||||
val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
|
||||
val attributeSeq = schema.toAttributes
|
||||
DataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
|
||||
Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -399,7 +399,7 @@ class SQLContext private[sql](
|
|||
* @since 1.3.0
|
||||
*/
|
||||
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
|
||||
DataFrame(this, LogicalRelation(baseRelation))
|
||||
Dataset.newDataFrame(this, LogicalRelation(baseRelation))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -454,7 +454,7 @@ class SQLContext private[sql](
|
|||
rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
|
||||
}
|
||||
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
|
||||
DataFrame(this, logicalPlan)
|
||||
Dataset.newDataFrame(this, logicalPlan)
|
||||
}
|
||||
|
||||
|
||||
|
@ -489,7 +489,7 @@ class SQLContext private[sql](
|
|||
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
|
||||
// schema differs from the existing schema on any field data type.
|
||||
val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
|
||||
DataFrame(this, logicalPlan)
|
||||
Dataset.newDataFrame(this, logicalPlan)
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -517,7 +517,7 @@ class SQLContext private[sql](
|
|||
*/
|
||||
@DeveloperApi
|
||||
def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
|
||||
DataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
|
||||
Dataset.newDataFrame(self, LocalRelation.fromExternalRows(schema.toAttributes, rows.asScala))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -536,7 +536,7 @@ class SQLContext private[sql](
|
|||
val localBeanInfo = Introspector.getBeanInfo(Utils.classForName(className))
|
||||
SQLContext.beansToRows(iter, localBeanInfo, attributeSeq)
|
||||
}
|
||||
DataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
|
||||
Dataset.newDataFrame(this, LogicalRDD(attributeSeq, rowRdd)(this))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -564,7 +564,7 @@ class SQLContext private[sql](
|
|||
val className = beanClass.getName
|
||||
val beanInfo = Introspector.getBeanInfo(beanClass)
|
||||
val rows = SQLContext.beansToRows(data.asScala.iterator, beanInfo, attrSeq)
|
||||
DataFrame(self, LocalRelation(attrSeq, rows.toSeq))
|
||||
Dataset.newDataFrame(self, LocalRelation(attrSeq, rows.toSeq))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -770,7 +770,7 @@ class SQLContext private[sql](
|
|||
*/
|
||||
@Experimental
|
||||
def range(start: Long, end: Long, step: Long, numPartitions: Int): DataFrame = {
|
||||
DataFrame(this, Range(start, end, step, numPartitions))
|
||||
Dataset.newDataFrame(this, Range(start, end, step, numPartitions))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -781,7 +781,7 @@ class SQLContext private[sql](
|
|||
* @since 1.3.0
|
||||
*/
|
||||
def sql(sqlText: String): DataFrame = {
|
||||
DataFrame(this, parseSql(sqlText))
|
||||
Dataset.newDataFrame(this, parseSql(sqlText))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -795,7 +795,7 @@ class SQLContext private[sql](
|
|||
}
|
||||
|
||||
private def table(tableIdent: TableIdentifier): DataFrame = {
|
||||
DataFrame(this, catalog.lookupRelation(tableIdent))
|
||||
Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -807,7 +807,7 @@ class SQLContext private[sql](
|
|||
* @since 1.3.0
|
||||
*/
|
||||
def tables(): DataFrame = {
|
||||
DataFrame(this, ShowTablesCommand(None))
|
||||
Dataset.newDataFrame(this, ShowTablesCommand(None))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -819,7 +819,7 @@ class SQLContext private[sql](
|
|||
* @since 1.3.0
|
||||
*/
|
||||
def tables(databaseName: String): DataFrame = {
|
||||
DataFrame(this, ShowTablesCommand(Some(databaseName)))
|
||||
Dataset.newDataFrame(this, ShowTablesCommand(Some(databaseName)))
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -886,7 +886,7 @@ class SQLContext private[sql](
|
|||
schema: StructType): DataFrame = {
|
||||
|
||||
val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
|
||||
DataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
|
||||
Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.NoSuchElementException
|
|||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.rdd.RDD
|
||||
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
|
||||
import org.apache.spark.sql.{Dataset, Row, SQLContext}
|
||||
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
|
||||
import org.apache.spark.sql.catalyst.errors.TreeNodeException
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
|
||||
|
@ -252,7 +252,7 @@ case class CacheTableCommand(
|
|||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
plan.foreach { logicalPlan =>
|
||||
sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
|
||||
sqlContext.registerDataFrameAsTable(Dataset.newDataFrame(sqlContext, logicalPlan), tableName)
|
||||
}
|
||||
sqlContext.cacheTable(tableName)
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
|
|||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.deploy.SparkHadoopUtil
|
||||
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
|
||||
import org.apache.spark.sql.execution.streaming.{FileStreamSource, Sink, Source}
|
||||
import org.apache.spark.sql.sources._
|
||||
|
@ -154,7 +154,7 @@ case class DataSource(
|
|||
}
|
||||
|
||||
def dataFrameBuilder(files: Array[String]): DataFrame = {
|
||||
DataFrame(
|
||||
Dataset.newDataFrame(
|
||||
sqlContext,
|
||||
LogicalRelation(
|
||||
DataSource(
|
||||
|
|
|
@ -34,7 +34,7 @@ private[sql] case class InsertIntoDataSource(
|
|||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
|
||||
val data = DataFrame(sqlContext, query)
|
||||
val data = Dataset.newDataFrame(sqlContext, query)
|
||||
// Apply the schema of the existing table to the new data.
|
||||
val df = sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
|
||||
relation.insert(df, overwrite)
|
||||
|
|
|
@ -114,7 +114,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
|
|||
val partitionSet = AttributeSet(partitionColumns)
|
||||
val dataColumns = query.output.filterNot(partitionSet.contains)
|
||||
|
||||
val queryExecution = DataFrame(sqlContext, query).queryExecution
|
||||
val queryExecution = Dataset.newDataFrame(sqlContext, query).queryExecution
|
||||
SQLExecution.withNewExecutionId(sqlContext, queryExecution) {
|
||||
val relation =
|
||||
WriteRelation(
|
||||
|
|
|
@ -17,7 +17,7 @@
|
|||
|
||||
package org.apache.spark.sql.execution.datasources
|
||||
|
||||
import org.apache.spark.sql.{DataFrame, Row, SaveMode, SQLContext}
|
||||
import org.apache.spark.sql._
|
||||
import org.apache.spark.sql.catalyst.TableIdentifier
|
||||
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
|
||||
import org.apache.spark.sql.catalyst.plans.logical
|
||||
|
@ -100,7 +100,7 @@ case class CreateTempTableUsing(
|
|||
options = options)
|
||||
sqlContext.catalog.registerTable(
|
||||
tableIdent,
|
||||
DataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
|
||||
Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
|
||||
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
@ -115,7 +115,7 @@ case class CreateTempTableUsingAsSelect(
|
|||
query: LogicalPlan) extends RunnableCommand {
|
||||
|
||||
override def run(sqlContext: SQLContext): Seq[Row] = {
|
||||
val df = DataFrame(sqlContext, query)
|
||||
val df = Dataset.newDataFrame(sqlContext, query)
|
||||
val dataSource = DataSource(
|
||||
sqlContext,
|
||||
className = provider,
|
||||
|
@ -125,7 +125,7 @@ case class CreateTempTableUsingAsSelect(
|
|||
val result = dataSource.write(mode, df)
|
||||
sqlContext.catalog.registerTable(
|
||||
tableIdent,
|
||||
DataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
|
||||
Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
|
||||
|
||||
Seq.empty[Row]
|
||||
}
|
||||
|
@ -146,7 +146,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
|
|||
if (isCached) {
|
||||
// Create a data frame to represent the table.
|
||||
// TODO: Use uncacheTable once it supports database name.
|
||||
val df = DataFrame(sqlContext, logicalPlan)
|
||||
val df = Dataset.newDataFrame(sqlContext, logicalPlan)
|
||||
// Uncache the logicalPlan.
|
||||
sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
|
||||
// Cache it again.
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.stat
|
|||
import scala.collection.mutable.{Map => MutableMap}
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.sql.{Column, DataFrame, Row}
|
||||
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
|
||||
import org.apache.spark.sql.types._
|
||||
|
||||
|
@ -121,6 +121,6 @@ private[sql] object FrequentItems extends Logging {
|
|||
StructField(v._1 + "_freqItems", ArrayType(v._2, false))
|
||||
}
|
||||
val schema = StructType(outputCols).toAttributes
|
||||
DataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
|
||||
Dataset.newDataFrame(df.sqlContext, LocalRelation.fromExternalRows(schema, Seq(resultRow)))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.stat
|
|||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.apache.spark.Logging
|
||||
import org.apache.spark.sql.{Column, DataFrame, Row}
|
||||
import org.apache.spark.sql.{Column, DataFrame, Dataset, Row}
|
||||
import org.apache.spark.sql.catalyst.expressions.{Cast, GenericMutableRow}
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
|
||||
import org.apache.spark.sql.functions._
|
||||
|
@ -454,6 +454,6 @@ private[sql] object StatFunctions extends Logging {
|
|||
}
|
||||
val schema = StructType(StructField(tableName, StringType) +: headerNames)
|
||||
|
||||
DataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
|
||||
Dataset.newDataFrame(df.sqlContext, LocalRelation(schema.toAttributes, table)).na.fill(0.0)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -211,7 +211,7 @@ class StreamExecution(
|
|||
|
||||
// Construct the batch and send it to the sink.
|
||||
val batchOffset = streamProgress.toCompositeOffset(sources)
|
||||
val nextBatch = new Batch(batchOffset, DataFrame(sqlContext, newPlan))
|
||||
val nextBatch = new Batch(batchOffset, Dataset.newDataFrame(sqlContext, newPlan))
|
||||
sink.addBatch(nextBatch)
|
||||
}
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
|
|||
}
|
||||
|
||||
def toDF()(implicit sqlContext: SQLContext): DataFrame = {
|
||||
DataFrame(sqlContext, logicalPlan)
|
||||
Dataset.newDataFrame(sqlContext, logicalPlan)
|
||||
}
|
||||
|
||||
def addData(data: A*): Offset = {
|
||||
|
|
|
@ -936,7 +936,7 @@ object functions extends LegacyFunctions {
|
|||
* @since 1.5.0
|
||||
*/
|
||||
def broadcast(df: DataFrame): DataFrame = {
|
||||
DataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
|
||||
Dataset.newDataFrame(df.sqlContext, BroadcastHint(df.logicalPlan))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -933,7 +933,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
|
|||
assert(e2.getMessage.contains("Inserting into an RDD-based table is not allowed."))
|
||||
|
||||
// error case: insert into an OneRowRelation
|
||||
DataFrame(sqlContext, OneRowRelation).registerTempTable("one_row")
|
||||
Dataset.newDataFrame(sqlContext, OneRowRelation).registerTempTable("one_row")
|
||||
val e3 = intercept[AnalysisException] {
|
||||
insertion.write.insertInto("one_row")
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ import org.apache.spark.sql.execution.streaming._
|
|||
trait StreamTest extends QueryTest with Timeouts {
|
||||
|
||||
implicit class RichSource(s: Source) {
|
||||
def toDF(): DataFrame = DataFrame(sqlContext, StreamingRelation(s))
|
||||
def toDF(): DataFrame = Dataset.newDataFrame(sqlContext, StreamingRelation(s))
|
||||
|
||||
def toDS[A: Encoder](): Dataset[A] = Dataset(sqlContext, StreamingRelation(s))
|
||||
}
|
||||
|
|
|
@ -219,7 +219,7 @@ private[sql] trait SQLTestUtils
|
|||
* way to construct [[DataFrame]] directly out of local data without relying on implicits.
|
||||
*/
|
||||
protected implicit def logicalPlanToSparkQuery(plan: LogicalPlan): DataFrame = {
|
||||
DataFrame(sqlContext, plan)
|
||||
Dataset.newDataFrame(sqlContext, plan)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -246,7 +246,7 @@ case class CreateMetastoreDataSourceAsSelect(
|
|||
createMetastoreTable = true
|
||||
}
|
||||
|
||||
val data = DataFrame(hiveContext, query)
|
||||
val data = Dataset.newDataFrame(hiveContext, query)
|
||||
val df = existingSchema match {
|
||||
// If we are inserting into an existing table, just use the existing schema.
|
||||
case Some(s) => sqlContext.internalCreateDataFrame(data.queryExecution.toRdd, s)
|
||||
|
|
|
@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
|
|||
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
import org.apache.spark.sql.{DataFrame, QueryTest}
|
||||
import org.apache.spark.sql.{DataFrame, Dataset, QueryTest}
|
||||
import org.apache.spark.sql.catalyst.expressions.Expression
|
||||
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
|
||||
import org.apache.spark.sql.hive.test.TestHiveSingleton
|
||||
|
@ -63,7 +63,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
|
|||
""".stripMargin)
|
||||
}
|
||||
|
||||
checkAnswer(sqlContext.sql(generatedSQL), DataFrame(sqlContext, plan))
|
||||
checkAnswer(sqlContext.sql(generatedSQL), Dataset.newDataFrame(sqlContext, plan))
|
||||
}
|
||||
|
||||
protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
|
||||
|
|
|
@ -968,7 +968,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
|
|||
// Create a new df to make sure its physical operator picks up
|
||||
// spark.sql.TungstenAggregate.testFallbackStartsAt.
|
||||
// todo: remove it?
|
||||
val newActual = DataFrame(sqlContext, actual.logicalPlan)
|
||||
val newActual = Dataset.newDataFrame(sqlContext, actual.logicalPlan)
|
||||
|
||||
QueryTest.checkAnswer(newActual, expectedAnswer) match {
|
||||
case Some(errorMessage) =>
|
||||
|
|
Loading…
Reference in a new issue