[SPARK-9763][SQL] Minimize exposure of internal SQL classes.

There are a few changes in this pull request:

1. Moved all data sources to execution.datasources, except the public JDBC APIs.
2. To maintain backward compatibility with (1), added a backward-compatibility translation map to data source resolution.
3. Moved the ui and metric packages into execution.
4. Added more documentation on some internal classes.
5. Renamed DataSourceRegister.format -> shortName (a usage sketch follows this list).
6. Added the "override" modifier on shortName implementations.
7. Removed IntSQLMetric.
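
For illustration only, a minimal sketch of what a third-party data source looks like after (5) and (6); the package, class, and alias names are hypothetical and not part of this patch. The provider overrides shortName() from DataSourceRegister and is listed in its jar's META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file, so users can write format("myformat") instead of the fully qualified class name.

package com.example.myformat   // hypothetical package

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}

class DefaultSource extends RelationProvider with DataSourceRegister {

  // Previously `def format(): String = "myformat"`; now an override of shortName().
  override def shortName(): String = "myformat"

  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Build the relation from the given options; elided in this sketch.
    ???
  }
}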

Author: Reynold Xin <rxin@databricks.com>

Closes #8056 from rxin/SPARK-9763 and squashes the following commits:

9df4801 [Reynold Xin] Removed hardcoded name in test cases.
d9babc6 [Reynold Xin] Shorten.
e484419 [Reynold Xin] Removed VisibleForTesting.
171b812 [Reynold Xin] MimaExcludes.
2041389 [Reynold Xin] Compile ...
79dda42 [Reynold Xin] Compile.
0818ba3 [Reynold Xin] Removed IntSQLMetric.
c46884f [Reynold Xin] Two more fixes.
f9aa88d [Reynold Xin] [SPARK-9763][SQL] Minimize exposure of internal SQL classes.
Reynold Xin 2015-08-10 13:49:23 -07:00
parent 0fe66744f1
commit 40ed2af587
76 changed files with 1124 additions and 976 deletions


@ -62,8 +62,6 @@ object MimaExcludes {
"org.apache.spark.ml.classification.LogisticCostFun.this"),
// SQL execution is considered private.
excludePackage("org.apache.spark.sql.execution"),
// Parquet support is considered private.
excludePackage("org.apache.spark.sql.parquet"),
// The old JSON RDD is removed in favor of streaming Jackson
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
@ -155,7 +153,27 @@ object MimaExcludes {
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.SqlNewHadoopRDD$NewHadoopMapPartitionsWithSplitRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.PartitionSpec$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DescribeCommand"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException")
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.sources.DDLException"),
// SPARK-9763 Minimize exposure of internal SQL classes
excludePackage("org.apache.spark.sql.parquet"),
excludePackage("org.apache.spark.sql.json"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$DecimalConversion"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartition$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$JDBCConversion"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$DriverWrapper"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCPartitioningInfo"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JdbcUtils"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.DefaultSource"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.package$"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.jdbc.JDBCRelation")
) ++ Seq(
// SPARK-4751 Dynamic allocation for standalone mode
ProblemFilters.exclude[MissingMethodProblem](


@ -1,3 +1,3 @@
org.apache.spark.sql.jdbc.DefaultSource
org.apache.spark.sql.json.DefaultSource
org.apache.spark.sql.parquet.DefaultSource
org.apache.spark.sql.execution.datasources.jdbc.DefaultSource
org.apache.spark.sql.execution.datasources.json.DefaultSource
org.apache.spark.sql.execution.datasources.parquet.DefaultSource


@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.sources.HadoopFsRelation
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel


@ -25,10 +25,10 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.execution.datasources.json.JSONRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}


@ -23,8 +23,8 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.HadoopFsRelation
@ -264,7 +264,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
// Create the table if the table didn't exist.
if (!tableExists) {
val schema = JDBCWriteDetails.schemaString(df, url)
val schema = JdbcUtils.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
}
@ -272,7 +272,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
conn.close()
}
JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
JdbcUtils.saveTable(df, url, table, connectionProperties)
}
/**


@ -43,7 +43,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.ui.{SQLListener, SQLTab}
import org.apache.spark.sql.execution.ui.{SQLListener, SQLTab}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils


@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.util.Utils
private[sql] object SQLExecution {


@ -32,7 +32,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.metric.{IntSQLMetric, LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric, SQLMetrics}
import org.apache.spark.sql.types.DataType
object SparkPlan {
@ -98,12 +98,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
*/
private[sql] def metrics: Map[String, SQLMetric[_, _]] = defaultMetrics
/**
* Return a IntSQLMetric according to the name.
*/
private[sql] def intMetric(name: String): IntSQLMetric =
metrics(name).asInstanceOf[IntSQLMetric]
/**
* Return a LongSQLMetric according to the name.
*/


@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.metric.SQLMetrics
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.ExternalSorter
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator


@ -0,0 +1,185 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import scala.language.implicitConversions
import scala.util.matching.Regex
import org.apache.spark.Logging
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.{TableIdentifier, AbstractSparkSQLParser}
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types._
/**
* A parser for foreign DDL commands.
*/
class DDLParser(parseQuery: String => LogicalPlan)
extends AbstractSparkSQLParser with DataTypeParser with Logging {
def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parse(input)
} catch {
case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => parseQuery(input)
case x: Throwable => throw x
}
}
// Keyword is a convention with AbstractSparkSQLParser, which scans all of the `Keyword`
// properties of this class via reflection at runtime to construct the SqlLexical object
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
protected def start: Parser[LogicalPlan] = ddl
/**
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] = {
// TODO: Support database.table.
(CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}
val options = opts.getOrElse(Map.empty[String, String])
if (query.isDefined) {
if (columns.isDefined) {
throw new DDLException(
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
// When the IF NOT EXISTS clause appears in the query, the save mode is Ignore.
val mode = if (allowExisting.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
} else {
SaveMode.ErrorIfExists
}
val queryPlan = parseQuery(query.get)
CreateTableUsingAsSelect(tableName,
provider,
temp.isDefined,
Array.empty[String],
mode,
options,
queryPlan)
} else {
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(
tableName,
userSpecifiedSchema,
provider,
temp.isDefined,
options,
allowExisting.isDefined,
managedIfNoPath = false)
}
}
}
protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
/*
* describe [extended] table avroTable
* This will display all columns of table `avroTable`, including column_name, column_type and comment
*/
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
case e ~ db ~ tbl =>
val tblIdentifier = db match {
case Some(dbName) =>
Seq(dbName, tbl)
case None =>
Seq(tbl)
}
DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
}
protected lazy val refreshTable: Parser[LogicalPlan] =
REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
case maybeDatabaseName ~ tableName =>
RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
}
protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
s"identifier matching regex $regex", {
case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
}
)
protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
case name => name
}
protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
case parts => parts.mkString(".")
}
protected lazy val pair: Parser[(String, String)] =
optionName ~ stringLit ^^ { case k ~ v => (k, v) }
protected lazy val column: Parser[StructField] =
ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
val meta = cm match {
case Some(comment) =>
new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
case None => Metadata.empty
}
StructField(columnName, typ, nullable = true, meta)
}
}


@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCRelation, JDBCPartitioningInfo, DriverRegistry}
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
class DefaultSource extends RelationProvider with DataSourceRegister {
override def shortName(): String = "jdbc"
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
val driver = parameters.getOrElse("driver", null)
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
val partitionColumn = parameters.getOrElse("partitionColumn", null)
val lowerBound = parameters.getOrElse("lowerBound", null)
val upperBound = parameters.getOrElse("upperBound", null)
val numPartitions = parameters.getOrElse("numPartitions", null)
if (driver != null) DriverRegistry.register(driver)
if (partitionColumn != null
&& (lowerBound == null || upperBound == null || numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}
val partitionInfo = if (partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
partitionColumn,
lowerBound.toLong,
upperBound.toLong,
numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
JDBCRelation(url, table, parts, properties)(sqlContext)
}
}


@ -17,27 +17,10 @@
package org.apache.spark.sql.execution.datasources
import java.io.IOException
import java.util.{Date, UUID}
import scala.collection.JavaConversions.asScalaIterator
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.{RunnableCommand, SQLExecution}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.{Utils, SerializableConfiguration}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.sources.InsertableRelation
/**


@ -0,0 +1,204 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources
import java.util.ServiceLoader
import scala.collection.JavaConversions._
import scala.language.{existentials, implicitConversions}
import scala.util.{Success, Failure, Try}
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{DataFrame, SaveMode, AnalysisException, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
import org.apache.spark.util.Utils
case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
object ResolvedDataSource extends Logging {
/** A map to maintain backward compatibility in case we move data sources around. */
private val backwardCompatibilityMap = Map(
"org.apache.spark.sql.jdbc" -> classOf[jdbc.DefaultSource].getCanonicalName,
"org.apache.spark.sql.jdbc.DefaultSource" -> classOf[jdbc.DefaultSource].getCanonicalName,
"org.apache.spark.sql.json" -> classOf[json.DefaultSource].getCanonicalName,
"org.apache.spark.sql.json.DefaultSource" -> classOf[json.DefaultSource].getCanonicalName,
"org.apache.spark.sql.parquet" -> classOf[parquet.DefaultSource].getCanonicalName,
"org.apache.spark.sql.parquet.DefaultSource" -> classOf[parquet.DefaultSource].getCanonicalName
)
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider0: String): Class[_] = {
val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
val provider2 = s"$provider.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
serviceLoader.iterator().filter(_.shortName().equalsIgnoreCase(provider)).toList match {
/** the provider format did not match any given registered aliases */
case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) => dataSource
case Failure(error) =>
if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
throw new ClassNotFoundException(
"The ORC data source must be used with Hive support enabled.", error)
} else {
throw new ClassNotFoundException(
s"Failed to load class for data source: $provider.", error)
}
}
/** there is exactly one registered alias */
case head :: Nil => head.getClass
/** There are multiple registered aliases for the input */
case sources => sys.error(s"Multiple sources found for $provider, " +
s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
"please specify the fully qualified class name.")
}
}
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
partitionColumns: Array[String],
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val clazz: Class[_] = lookupDataSource(provider)
def className: String = clazz.getCanonicalName
val relation = userSpecifiedSchema match {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case dataSource: HadoopFsRelationProvider =>
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
None
} else {
Some(partitionColumnsSchema(schema, partitionColumns))
}
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
}
val dataSchema =
StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
dataSource.createRelation(
sqlContext,
paths,
Some(dataSchema),
maybePartitionsSchema,
caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
throw new AnalysisException(s"$className does not allow user-specified schemas.")
case _ =>
throw new AnalysisException(s"$className is not a RelationProvider.")
}
case None => clazz.newInstance() match {
case dataSource: RelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
case dataSource: HadoopFsRelationProvider =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
}
dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
throw new AnalysisException(
s"A schema needs to be specified when using $className.")
case _ =>
throw new AnalysisException(
s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
}
}
new ResolvedDataSource(clazz, relation)
}
private def partitionColumnsSchema(
schema: StructType,
partitionColumns: Array[String]): StructType = {
StructType(partitionColumns.map { col =>
schema.find(_.name == col).getOrElse {
throw new RuntimeException(s"Partition column $col not found in schema $schema")
}
}).asNullable
}
/** Create a [[ResolvedDataSource]] for saving the content of the given DataFrame. */
def apply(
sqlContext: SQLContext,
provider: String,
partitionColumns: Array[String],
mode: SaveMode,
options: Map[String, String],
data: DataFrame): ResolvedDataSource = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
val clazz: Class[_] = lookupDataSource(provider)
val relation = clazz.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sqlContext, mode, options, data)
case dataSource: HadoopFsRelationProvider =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val outputPath = {
val path = new Path(caseInsensitiveOptions("path"))
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
val r = dataSource.createRelation(
sqlContext,
Array(outputPath.toString),
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
data.logicalPlan,
mode)).toRdd
r
case _ =>
sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
}
ResolvedDataSource(clazz, relation)
}
}
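
Editorial usage sketch (not part of this patch) of how the resolution above behaves, e.g. in spark-shell where sqlContext is predefined and examples/people.json is a hypothetical path: both reads resolve to the relocated org.apache.spark.sql.execution.datasources.json.DefaultSource, the first through the "json" alias found via the ServiceLoader, the second through the backwardCompatibilityMap entry for the old package name.

// Resolved through the "json" shortName alias registered via DataSourceRegister.
val byAlias = sqlContext.read.format("json").load("examples/people.json")

// Old Spark 1.4-style provider name; rewritten by backwardCompatibilityMap above.
val byOldName = sqlContext.read.format("org.apache.spark.sql.json").load("examples/people.json")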


@ -17,340 +17,12 @@
package org.apache.spark.sql.execution.datasources
import java.util.ServiceLoader
import scala.collection.Iterator
import scala.collection.JavaConversions._
import scala.language.{existentials, implicitConversions}
import scala.util.{Failure, Success, Try}
import scala.util.matching.Regex
import org.apache.hadoop.fs.Path
import org.apache.spark.Logging
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.{AbstractSparkSQLParser, TableIdentifier}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SQLContext, SaveMode}
import org.apache.spark.util.Utils
/**
* A parser for foreign DDL commands.
*/
private[sql] class DDLParser(
parseQuery: String => LogicalPlan)
extends AbstractSparkSQLParser with DataTypeParser with Logging {
def parse(input: String, exceptionOnError: Boolean): LogicalPlan = {
try {
parse(input)
} catch {
case ddlException: DDLException => throw ddlException
case _ if !exceptionOnError => parseQuery(input)
case x: Throwable => throw x
}
}
// Keyword is a convention with AbstractSparkSQLParser, which will scan all of the `Keyword`
// properties via reflection the class in runtime for constructing the SqlLexical object
protected val CREATE = Keyword("CREATE")
protected val TEMPORARY = Keyword("TEMPORARY")
protected val TABLE = Keyword("TABLE")
protected val IF = Keyword("IF")
protected val NOT = Keyword("NOT")
protected val EXISTS = Keyword("EXISTS")
protected val USING = Keyword("USING")
protected val OPTIONS = Keyword("OPTIONS")
protected val DESCRIBE = Keyword("DESCRIBE")
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
protected val REFRESH = Keyword("REFRESH")
protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
protected def start: Parser[LogicalPlan] = ddl
/**
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* or
* `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
* USING org.apache.spark.sql.avro
* OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
* AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] =
// TODO: Support database.table.
(CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(
"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
}
val options = opts.getOrElse(Map.empty[String, String])
if (query.isDefined) {
if (columns.isDefined) {
throw new DDLException(
"a CREATE TABLE AS SELECT statement does not allow column definitions.")
}
// When IF NOT EXISTS clause appears in the query, the save mode will be ignore.
val mode = if (allowExisting.isDefined) {
SaveMode.Ignore
} else if (temp.isDefined) {
SaveMode.Overwrite
} else {
SaveMode.ErrorIfExists
}
val queryPlan = parseQuery(query.get)
CreateTableUsingAsSelect(tableName,
provider,
temp.isDefined,
Array.empty[String],
mode,
options,
queryPlan)
} else {
val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
CreateTableUsing(
tableName,
userSpecifiedSchema,
provider,
temp.isDefined,
options,
allowExisting.isDefined,
managedIfNoPath = false)
}
}
protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
/*
* describe [extended] table avroTable
* This will display all columns of table `avroTable` includes column_name,column_type,comment
*/
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ (ident <~ ".").? ~ ident ^^ {
case e ~ db ~ tbl =>
val tblIdentifier = db match {
case Some(dbName) =>
Seq(dbName, tbl)
case None =>
Seq(tbl)
}
DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
}
protected lazy val refreshTable: Parser[LogicalPlan] =
REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
case maybeDatabaseName ~ tableName =>
RefreshTable(TableIdentifier(tableName, maybeDatabaseName))
}
protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
protected lazy val className: Parser[String] = repsep(ident, ".") ^^ { case s => s.mkString(".")}
override implicit def regexToParser(regex: Regex): Parser[String] = acceptMatch(
s"identifier matching regex $regex", {
case lexical.Identifier(str) if regex.unapplySeq(str).isDefined => str
case lexical.Keyword(str) if regex.unapplySeq(str).isDefined => str
}
)
protected lazy val optionPart: Parser[String] = "[_a-zA-Z][_a-zA-Z0-9]*".r ^^ {
case name => name
}
protected lazy val optionName: Parser[String] = repsep(optionPart, ".") ^^ {
case parts => parts.mkString(".")
}
protected lazy val pair: Parser[(String, String)] =
optionName ~ stringLit ^^ { case k ~ v => (k, v) }
protected lazy val column: Parser[StructField] =
ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
val meta = cm match {
case Some(comment) =>
new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
case None => Metadata.empty
}
StructField(columnName, typ, nullable = true, meta)
}
}
private[sql] object ResolvedDataSource extends Logging {
/** Given a provider name, look up the data source class definition. */
def lookupDataSource(provider: String): Class[_] = {
val provider2 = s"$provider.DefaultSource"
val loader = Utils.getContextOrSparkClassLoader
val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
serviceLoader.iterator().filter(_.format().equalsIgnoreCase(provider)).toList match {
/** the provider format did not match any given registered aliases */
case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
case Success(dataSource) => dataSource
case Failure(error) => if (provider.startsWith("org.apache.spark.sql.hive.orc")) {
throw new ClassNotFoundException(
"The ORC data source must be used with Hive support enabled.", error)
} else {
throw new ClassNotFoundException(
s"Failed to load class for data source: $provider", error)
}
}
/** there is exactly one registered alias */
case head :: Nil => head.getClass
/** There are multiple registered aliases for the input */
case sources => sys.error(s"Multiple sources found for $provider, " +
s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
"please specify the fully qualified class name")
}
}
/** Create a [[ResolvedDataSource]] for reading data in. */
def apply(
sqlContext: SQLContext,
userSpecifiedSchema: Option[StructType],
partitionColumns: Array[String],
provider: String,
options: Map[String, String]): ResolvedDataSource = {
val clazz: Class[_] = lookupDataSource(provider)
def className: String = clazz.getCanonicalName
val relation = userSpecifiedSchema match {
case Some(schema: StructType) => clazz.newInstance() match {
case dataSource: SchemaRelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
case dataSource: HadoopFsRelationProvider =>
val maybePartitionsSchema = if (partitionColumns.isEmpty) {
None
} else {
Some(partitionColumnsSchema(schema, partitionColumns))
}
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
}
val dataSchema =
StructType(schema.filterNot(f => partitionColumns.contains(f.name))).asNullable
dataSource.createRelation(
sqlContext,
paths,
Some(dataSchema),
maybePartitionsSchema,
caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.RelationProvider =>
throw new AnalysisException(s"$className does not allow user-specified schemas.")
case _ =>
throw new AnalysisException(s"$className is not a RelationProvider.")
}
case None => clazz.newInstance() match {
case dataSource: RelationProvider =>
dataSource.createRelation(sqlContext, new CaseInsensitiveMap(options))
case dataSource: HadoopFsRelationProvider =>
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val paths = {
val patternPath = new Path(caseInsensitiveOptions("path"))
val fs = patternPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
val qualifiedPattern = patternPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
SparkHadoopUtil.get.globPathIfNecessary(qualifiedPattern).map(_.toString).toArray
}
dataSource.createRelation(sqlContext, paths, None, None, caseInsensitiveOptions)
case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
throw new AnalysisException(
s"A schema needs to be specified when using $className.")
case _ =>
throw new AnalysisException(
s"$className is neither a RelationProvider nor a FSBasedRelationProvider.")
}
}
new ResolvedDataSource(clazz, relation)
}
private def partitionColumnsSchema(
schema: StructType,
partitionColumns: Array[String]): StructType = {
StructType(partitionColumns.map { col =>
schema.find(_.name == col).getOrElse {
throw new RuntimeException(s"Partition column $col not found in schema $schema")
}
}).asNullable
}
/** Create a [[ResolvedDataSource]] for saving the content of the given [[DataFrame]]. */
def apply(
sqlContext: SQLContext,
provider: String,
partitionColumns: Array[String],
mode: SaveMode,
options: Map[String, String],
data: DataFrame): ResolvedDataSource = {
if (data.schema.map(_.dataType).exists(_.isInstanceOf[CalendarIntervalType])) {
throw new AnalysisException("Cannot save interval data type into external storage.")
}
val clazz: Class[_] = lookupDataSource(provider)
val relation = clazz.newInstance() match {
case dataSource: CreatableRelationProvider =>
dataSource.createRelation(sqlContext, mode, options, data)
case dataSource: HadoopFsRelationProvider =>
// Don't glob path for the write path. The contracts here are:
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val outputPath = {
val path = new Path(caseInsensitiveOptions("path"))
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
val r = dataSource.createRelation(
sqlContext,
Array(outputPath.toString),
Some(dataSchema.asNullable),
Some(partitionColumnsSchema(data.schema, partitionColumns)),
caseInsensitiveOptions)
// For partitioned relation r, r.schema's column ordering can be different from the column
// ordering of data.logicalPlan (partition columns are all moved after data column). This
// will be adjusted within InsertIntoHadoopFsRelation.
sqlContext.executePlan(
InsertIntoHadoopFsRelation(
r,
data.logicalPlan,
mode)).toRdd
r
case _ =>
sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
}
new ResolvedDataSource(clazz, relation)
}
}
private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode}
/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
@ -358,11 +30,12 @@ private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRel
* @param isExtended True if "DESCRIBE EXTENDED" is used. Otherwise, false.
* It is effective only when the table is a Hive table.
*/
private[sql] case class DescribeCommand(
case class DescribeCommand(
table: LogicalPlan,
isExtended: Boolean) extends LogicalPlan with Command {
override def children: Seq[LogicalPlan] = Seq.empty
override val output: Seq[Attribute] = Seq(
// Column names are based on Hive.
AttributeReference("col_name", StringType, nullable = false,
@ -370,7 +43,8 @@ private[sql] case class DescribeCommand(
AttributeReference("data_type", StringType, nullable = false,
new MetadataBuilder().putString("comment", "data type of the column").build())(),
AttributeReference("comment", StringType, nullable = false,
new MetadataBuilder().putString("comment", "comment of the column").build())())
new MetadataBuilder().putString("comment", "comment of the column").build())()
)
}
/**
@ -378,7 +52,7 @@ private[sql] case class DescribeCommand(
* @param allowExisting If it is true, we will do nothing when the table already exists.
* If it is false, an exception will be thrown
*/
private[sql] case class CreateTableUsing(
case class CreateTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
@ -397,7 +71,7 @@ private[sql] case class CreateTableUsing(
* can analyze the logical plan that will be used to populate the table.
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
private[sql] case class CreateTableUsingAsSelect(
case class CreateTableUsingAsSelect(
tableName: String,
provider: String,
temporary: Boolean,
@ -410,7 +84,7 @@ private[sql] case class CreateTableUsingAsSelect(
// override lazy val resolved = databaseName != None && childrenResolved
}
private[sql] case class CreateTempTableUsing(
case class CreateTempTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
@ -425,7 +99,7 @@ private[sql] case class CreateTempTableUsing(
}
}
private[sql] case class CreateTempTableUsingAsSelect(
case class CreateTempTableUsingAsSelect(
tableName: String,
provider: String,
partitionColumns: Array[String],
@ -443,7 +117,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
}
}
private[sql] case class RefreshTable(tableIdent: TableIdentifier)
case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
@ -472,7 +146,7 @@ private[sql] case class RefreshTable(tableIdent: TableIdentifier)
/**
* Builds a map in which keys are case insensitive
*/
protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
with Serializable {
val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
@ -490,4 +164,4 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St
/**
* The exception thrown from the DDL parser.
*/
protected[sql] class DDLException(message: String) extends Exception(message)
class DDLException(message: String) extends RuntimeException(message)


@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.jdbc
import java.util.Properties
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, DataSourceRegister}
class DefaultSource extends RelationProvider with DataSourceRegister {
override def shortName(): String = "jdbc"
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
val driver = parameters.getOrElse("driver", null)
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
val partitionColumn = parameters.getOrElse("partitionColumn", null)
val lowerBound = parameters.getOrElse("lowerBound", null)
val upperBound = parameters.getOrElse("upperBound", null)
val numPartitions = parameters.getOrElse("numPartitions", null)
if (driver != null) DriverRegistry.register(driver)
if (partitionColumn != null
&& (lowerBound == null || upperBound == null || numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}
val partitionInfo = if (partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
partitionColumn,
lowerBound.toLong,
upperBound.toLong,
numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
JDBCRelation(url, table, parts, properties)(sqlContext)
}
}


@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Driver, DriverManager}
import scala.collection.mutable
import org.apache.spark.Logging
import org.apache.spark.util.Utils
/**
* java.sql.DriverManager is always loaded by the bootstrap classloader,
* so it can't load JDBC drivers accessible by the Spark ClassLoader.
*
* To solve the problem, drivers from user-supplied jars are wrapped in a thin wrapper.
*/
object DriverRegistry extends Logging {
private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
def register(className: String): Unit = {
val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
if (cls.getClassLoader == null) {
logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
} else if (wrapperMap.get(className).isDefined) {
logTrace(s"Wrapper for $className already exists")
} else {
synchronized {
if (wrapperMap.get(className).isEmpty) {
val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
DriverManager.registerDriver(wrapper)
wrapperMap(className) = wrapper
logTrace(s"Wrapper for $className registered")
}
}
}
}
def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
case driver => driver.getClass.getCanonicalName
}
}


@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, Driver, DriverPropertyInfo, SQLFeatureNotSupportedException}
import java.util.Properties
/**
* A wrapper for a JDBC Driver to work around SPARK-6913.
*
* The problem is that the `java.sql.DriverManager` class can't access drivers loaded by
* the Spark ClassLoader.
*/
class DriverWrapper(val wrapped: Driver) extends Driver {
override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
wrapped.getPropertyInfo(url, info)
}
override def getMinorVersion: Int = wrapped.getMinorVersion
def getParentLogger: java.util.logging.Logger = {
throw new SQLFeatureNotSupportedException(
s"${this.getClass.getName}.getParentLogger is not yet implemented.")
}
override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
override def getMajorVersion: Int = wrapped.getMajorVersion
}


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.jdbc
package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager, ResultSet, ResultSetMetaData, SQLException}
import java.util.Properties
@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
@ -180,9 +181,8 @@ private[sql] object JDBCRDD extends Logging {
try {
if (driver != null) DriverRegistry.register(driver)
} catch {
case e: ClassNotFoundException => {
logWarning(s"Couldn't find class $driver", e);
}
case e: ClassNotFoundException =>
logWarning(s"Couldn't find class $driver", e)
}
DriverManager.getConnection(url, properties)
}
@ -344,7 +344,6 @@ private[sql] class JDBCRDD(
}).toArray
}
/**
* Runs the SQL query against the JDBC driver.
*/


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.jdbc
package org.apache.spark.sql.execution.datasources.jdbc
import java.util.Properties
@ -77,45 +77,6 @@ private[sql] object JDBCRelation {
}
}
private[sql] class DefaultSource extends RelationProvider with DataSourceRegister {
def format(): String = "jdbc"
/** Returns a new base relation with the given parameters. */
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val url = parameters.getOrElse("url", sys.error("Option 'url' not specified"))
val driver = parameters.getOrElse("driver", null)
val table = parameters.getOrElse("dbtable", sys.error("Option 'dbtable' not specified"))
val partitionColumn = parameters.getOrElse("partitionColumn", null)
val lowerBound = parameters.getOrElse("lowerBound", null)
val upperBound = parameters.getOrElse("upperBound", null)
val numPartitions = parameters.getOrElse("numPartitions", null)
if (driver != null) DriverRegistry.register(driver)
if (partitionColumn != null
&& (lowerBound == null || upperBound == null || numPartitions == null)) {
sys.error("Partitioning incompletely specified")
}
val partitionInfo = if (partitionColumn == null) {
null
} else {
JDBCPartitioningInfo(
partitionColumn,
lowerBound.toLong,
upperBound.toLong,
numPartitions.toInt)
}
val parts = JDBCRelation.columnPartition(partitionInfo)
val properties = new Properties() // Additional properties that we will pass to getConnection
parameters.foreach(kv => properties.setProperty(kv._1, kv._2))
JDBCRelation(url, table, parts, properties)(sqlContext)
}
}
private[sql] case class JDBCRelation(
url: String,
table: String,


@ -0,0 +1,219 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.datasources.jdbc
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.Properties
import scala.util.Try
import org.apache.spark.Logging
import org.apache.spark.sql.jdbc.JdbcDialects
import org.apache.spark.sql.types._
import org.apache.spark.sql.{DataFrame, Row}
/**
* Util functions for JDBC tables.
*/
object JdbcUtils extends Logging {
/**
* Establishes a JDBC connection.
*/
def createConnection(url: String, connectionProperties: Properties): Connection = {
DriverManager.getConnection(url, connectionProperties)
}
/**
* Returns true if the table already exists in the JDBC database.
*/
def tableExists(conn: Connection, table: String): Boolean = {
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems, considering "table" could also include the database name.
Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}
/**
* Drops a table from the JDBC database.
*/
def dropTable(conn: Connection, table: String): Unit = {
conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
}
/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
var fieldsLeft = rddSchema.fields.length
while (fieldsLeft > 0) {
sql.append("?")
if (fieldsLeft > 1) sql.append(", ") else sql.append(")")
fieldsLeft = fieldsLeft - 1
}
conn.prepareStatement(sql.toString())
}
/**
* Saves a partition of a DataFrame to the JDBC database. This is done in a single
* database transaction in order to avoid, as much as possible, inserting the same
* data repeatedly.
*
* It is still theoretically possible for rows in a DataFrame to be
* inserted into the database more than once if a stage somehow fails after
* the commit occurs but before the stage can return successfully.
*
* This is not a closure inside saveTable() because apparently cosmetic
* implementation changes elsewhere might easily render such a closure
* non-Serializable. Instead, we explicitly close over all variables that
* are used.
*/
def savePartition(
getConnection: () => Connection,
table: String,
iterator: Iterator[Row],
rddSchema: StructType,
nullTypes: Array[Int]): Iterator[Byte] = {
val conn = getConnection()
var committed = false
try {
conn.setAutoCommit(false) // Everything in the same db transaction.
val stmt = insertStatement(conn, table, rddSchema)
try {
while (iterator.hasNext) {
val row = iterator.next()
val numFields = rddSchema.fields.length
var i = 0
while (i < numFields) {
if (row.isNullAt(i)) {
stmt.setNull(i + 1, nullTypes(i))
} else {
rddSchema.fields(i).dataType match {
case IntegerType => stmt.setInt(i + 1, row.getInt(i))
case LongType => stmt.setLong(i + 1, row.getLong(i))
case DoubleType => stmt.setDouble(i + 1, row.getDouble(i))
case FloatType => stmt.setFloat(i + 1, row.getFloat(i))
case ShortType => stmt.setInt(i + 1, row.getShort(i))
case ByteType => stmt.setInt(i + 1, row.getByte(i))
case BooleanType => stmt.setBoolean(i + 1, row.getBoolean(i))
case StringType => stmt.setString(i + 1, row.getString(i))
case BinaryType => stmt.setBytes(i + 1, row.getAs[Array[Byte]](i))
case TimestampType => stmt.setTimestamp(i + 1, row.getAs[java.sql.Timestamp](i))
case DateType => stmt.setDate(i + 1, row.getAs[java.sql.Date](i))
case t: DecimalType => stmt.setBigDecimal(i + 1, row.getDecimal(i))
case _ => throw new IllegalArgumentException(
s"Can't translate non-null value for field $i")
}
}
i = i + 1
}
stmt.executeUpdate()
}
} finally {
stmt.close()
}
conn.commit()
committed = true
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
conn.rollback()
conn.close()
} else {
// The stage must succeed. We cannot propagate any exception close() might throw.
try {
conn.close()
} catch {
case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
}
}
}
Array[Byte]().iterator
}
/**
* Compute the schema string for this DataFrame.
*/
def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
df.schema.fields foreach { field => {
val name = field.name
val typ: String =
dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
case DoubleType => "DOUBLE PRECISION"
case FloatType => "REAL"
case ShortType => "INTEGER"
case ByteType => "BYTE"
case BooleanType => "BIT(1)"
case StringType => "TEXT"
case BinaryType => "BLOB"
case TimestampType => "TIMESTAMP"
case DateType => "DATE"
case t: DecimalType => s"DECIMAL(${t.precision}},${t.scale}})"
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
})
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
}}
if (sb.length < 2) "" else sb.substring(2)
}
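Continuing the hypothetical three-column schema from the insertStatement sketch above (text, non-nullable integer, double), and assuming JdbcDialects.get(url) supplies no type overrides, the fragment produced would look roughly like:

// name TEXT , age INTEGER NOT NULL, height DOUBLE PRECISION
// Nullable columns contribute an empty nullability clause, which is why a
// space appears before the comma that follows them.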
/**
* Saves the DataFrame to the database, writing each partition in its own transaction.
*/
def saveTable(
df: DataFrame,
url: String,
table: String,
properties: Properties = new Properties()) {
val dialect = JdbcDialects.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
case DoubleType => java.sql.Types.DOUBLE
case FloatType => java.sql.Types.REAL
case ShortType => java.sql.Types.INTEGER
case ByteType => java.sql.Types.INTEGER
case BooleanType => java.sql.Types.BIT
case StringType => java.sql.Types.CLOB
case BinaryType => java.sql.Types.BLOB
case TimestampType => java.sql.Types.TIMESTAMP
case DateType => java.sql.Types.DATE
case t: DecimalType => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
s"Can't translate null value for field $field")
})
}
val rddSchema = df.schema
val driver: String = DriverRegistry.getDriverClassName(url)
val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
df.foreachPartition { iterator =>
savePartition(getConnection, table, iterator, rddSchema, nullTypes)
}
}
}
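End users are not expected to call these helpers directly; the public entry point is DataFrameWriter.jdbc, which ends up in saveTable/savePartition above. A minimal usage sketch, assuming an in-memory H2 database on the driver classpath and an existing SQLContext named sqlContext (both assumptions, not part of this patch):

import java.util.Properties

// Writing through the public API; this path resolves the dialect, computes
// nullTypes, and calls saveTable/savePartition shown above. The H2 URL and
// table name are illustrative only.
val props = new Properties()
sqlContext.range(0, 10).write.jdbc("jdbc:h2:mem:testdb", "ids", props)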

View file

@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
private[sql] object InferSchema {

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import java.io.CharArrayWriter
@ -39,9 +39,10 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
import org.apache.spark.util.SerializableConfiguration
private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
def format(): String = "json"
class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
override def shortName(): String = "json"
override def createRelation(
sqlContext: SQLContext,

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.sql.catalyst.InternalRow

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import java.io.ByteArrayOutputStream
@ -27,7 +27,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.json.JacksonUtils.nextUntil
import org.apache.spark.sql.execution.datasources.json.JacksonUtils.nextUntil
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import com.fasterxml.jackson.core.{JsonParser, JsonToken}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.util.{Map => JMap}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import scala.collection.JavaConversions._
@ -25,7 +25,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.parquet.schema._
import org.apache.spark.sql.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
import org.apache.spark.sql.execution.datasources.parquet.CatalystSchemaConverter.{MAX_PRECISION_FOR_INT32, MAX_PRECISION_FOR_INT64, maxPrecisionForBytes}
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLConf}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{MapData, ArrayData}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.io.Serializable
import java.nio.ByteBuffer

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.net.URI
import java.util.logging.{Level, Logger => JLogger}
@ -51,7 +51,7 @@ import org.apache.spark.util.{SerializableConfiguration, Utils}
private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
def format(): String = "parquet"
override def shortName(): String = "parquet"
override def createRelation(
sqlContext: SQLContext,

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.math.BigInteger
import java.nio.{ByteBuffer, ByteOrder}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.io.IOException

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.metric
package org.apache.spark.sql.execution.metric
import org.apache.spark.{Accumulable, AccumulableParam, SparkContext}
@ -93,22 +93,6 @@ private[sql] class LongSQLMetric private[metric](name: String)
}
}
/**
* A specialized int Accumulable to avoid boxing and unboxing when using Accumulator's
* `+=` and `add`.
*/
private[sql] class IntSQLMetric private[metric](name: String)
extends SQLMetric[IntSQLMetricValue, Int](name, IntSQLMetricParam) {
override def +=(term: Int): Unit = {
localValue.add(term)
}
override def add(term: Int): Unit = {
localValue.add(term)
}
}
private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Long] {
override def addAccumulator(r: LongSQLMetricValue, t: Long): LongSQLMetricValue = r.add(t)
@ -121,26 +105,8 @@ private object LongSQLMetricParam extends SQLMetricParam[LongSQLMetricValue, Lon
override def zero: LongSQLMetricValue = new LongSQLMetricValue(0L)
}
private object IntSQLMetricParam extends SQLMetricParam[IntSQLMetricValue, Int] {
override def addAccumulator(r: IntSQLMetricValue, t: Int): IntSQLMetricValue = r.add(t)
override def addInPlace(r1: IntSQLMetricValue, r2: IntSQLMetricValue): IntSQLMetricValue =
r1.add(r2.value)
override def zero(initialValue: IntSQLMetricValue): IntSQLMetricValue = zero
override def zero: IntSQLMetricValue = new IntSQLMetricValue(0)
}
private[sql] object SQLMetrics {
def createIntMetric(sc: SparkContext, name: String): IntSQLMetric = {
val acc = new IntSQLMetric(name)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
val acc = new LongSQLMetric(name)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))

View file

@ -18,12 +18,6 @@
package org.apache.spark.sql
/**
* :: DeveloperApi ::
* An execution engine for relational query plans that runs on top Spark and returns RDDs.
*
* Note that the operators in this package are created automatically by a query planner using a
* [[SQLContext]] and are not intended to be used directly by end users of Spark SQL. They are
* documented here in order to make it easier for others to understand the performance
* characteristics of query plans that are generated by Spark SQL.
* The physical execution component of Spark SQL. Note that this is a private package.
*/
package object execution

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.ui
package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.ui
package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.ui
package org.apache.spark.sql.execution.ui
import scala.collection.mutable
@ -26,7 +26,7 @@ import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener with Logging {
@ -51,17 +51,14 @@ private[sql] class SQLListener(sqlContext: SQLContext) extends SparkListener wit
private val completedExecutions = mutable.ListBuffer[SQLExecutionUIData]()
@VisibleForTesting
def executionIdToData: Map[Long, SQLExecutionUIData] = synchronized {
_executionIdToData.toMap
}
@VisibleForTesting
def jobIdToExecutionId: Map[Long, Long] = synchronized {
_jobIdToExecutionId.toMap
}
@VisibleForTesting
def stageIdToStageMetrics: Map[Long, SQLStageMetrics] = synchronized {
_stageIdToStageMetrics.toMap
}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.ui
package org.apache.spark.sql.execution.ui
import java.util.concurrent.atomic.AtomicInteger
@ -38,12 +38,12 @@ private[sql] class SQLTab(sqlContext: SQLContext, sparkUI: SparkUI)
private[sql] object SQLTab {
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/ui/static"
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
private val nextTabId = new AtomicInteger(0)
private def nextTabName: String = {
val nextId = nextTabId.getAndIncrement()
if (nextId == 0) "SQL" else s"SQL${nextId}"
if (nextId == 0) "SQL" else s"SQL$nextId"
}
}

View file

@ -15,14 +15,14 @@
* limitations under the License.
*/
package org.apache.spark.sql.ui
package org.apache.spark.sql.execution.ui
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.metric.{SQLMetricParam, SQLMetricValue}
import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
/**
* A graph used for storing information of an executionPlan of DataFrame.

View file

@ -1,52 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc
import java.sql.{Connection, DriverManager}
import java.util.Properties
import scala.util.Try
/**
* Util functions for JDBC tables.
*/
private[sql] object JdbcUtils {
/**
* Establishes a JDBC connection.
*/
def createConnection(url: String, connectionProperties: Properties): Connection = {
DriverManager.getConnection(url, connectionProperties)
}
/**
* Returns true if the table already exists in the JDBC database.
*/
def tableExists(conn: Connection, table: String): Boolean = {
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems, considering "table" could also include the database name.
Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}
/**
* Drops a table from the JDBC database.
*/
def dropTable(conn: Connection, table: String): Unit = {
conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
}
}

View file

@ -1,250 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
import java.sql.{Connection, Driver, DriverManager, DriverPropertyInfo, PreparedStatement, SQLFeatureNotSupportedException}
import java.util.Properties
import scala.collection.mutable
import org.apache.spark.Logging
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
package object jdbc {
private[sql] object JDBCWriteDetails extends Logging {
/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
def insertStatement(conn: Connection, table: String, rddSchema: StructType):
PreparedStatement = {
val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
var fieldsLeft = rddSchema.fields.length
while (fieldsLeft > 0) {
sql.append("?")
if (fieldsLeft > 1) sql.append(", ") else sql.append(")")
fieldsLeft = fieldsLeft - 1
}
conn.prepareStatement(sql.toString)
}
/**
* Saves a partition of a DataFrame to the JDBC database. This is done in
* a single database transaction in order to avoid repeatedly inserting
* data as much as possible.
*
* It is still theoretically possible for rows in a DataFrame to be
* inserted into the database more than once if a stage somehow fails after
* the commit occurs but before the stage can return successfully.
*
* This is not a closure inside saveTable() because apparently cosmetic
* implementation changes elsewhere might easily render such a closure
* non-Serializable. Instead, we explicitly close over all variables that
* are used.
*/
def savePartition(
getConnection: () => Connection,
table: String,
iterator: Iterator[Row],
rddSchema: StructType,
nullTypes: Array[Int]): Iterator[Byte] = {
val conn = getConnection()
var committed = false
try {
conn.setAutoCommit(false) // Everything in the same db transaction.
val stmt = insertStatement(conn, table, rddSchema)
try {
while (iterator.hasNext) {
val row = iterator.next()
val numFields = rddSchema.fields.length
var i = 0
while (i < numFields) {
if (row.isNullAt(i)) {
stmt.setNull(i + 1, nullTypes(i))
} else {
rddSchema.fields(i).dataType match {
case IntegerType => stmt.setInt(i + 1, row.getInt(i))
case LongType => stmt.setLong(i + 1, row.getLong(i))
case DoubleType => stmt.setDouble(i + 1, row.getDouble(i))
case FloatType => stmt.setFloat(i + 1, row.getFloat(i))
case ShortType => stmt.setInt(i + 1, row.getShort(i))
case ByteType => stmt.setInt(i + 1, row.getByte(i))
case BooleanType => stmt.setBoolean(i + 1, row.getBoolean(i))
case StringType => stmt.setString(i + 1, row.getString(i))
case BinaryType => stmt.setBytes(i + 1, row.getAs[Array[Byte]](i))
case TimestampType => stmt.setTimestamp(i + 1, row.getAs[java.sql.Timestamp](i))
case DateType => stmt.setDate(i + 1, row.getAs[java.sql.Date](i))
case t: DecimalType => stmt.setBigDecimal(i + 1, row.getDecimal(i))
case _ => throw new IllegalArgumentException(
s"Can't translate non-null value for field $i")
}
}
i = i + 1
}
stmt.executeUpdate()
}
} finally {
stmt.close()
}
conn.commit()
committed = true
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
conn.rollback()
conn.close()
} else {
// The stage must succeed. We cannot propagate any exception close() might throw.
try {
conn.close()
} catch {
case e: Exception => logWarning("Transaction succeeded, but closing failed", e)
}
}
}
Array[Byte]().iterator
}
/**
* Compute the schema string for this RDD.
*/
def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val dialect = JdbcDialects.get(url)
df.schema.fields foreach { field => {
val name = field.name
val typ: String =
dialect.getJDBCType(field.dataType).map(_.databaseTypeDefinition).getOrElse(
field.dataType match {
case IntegerType => "INTEGER"
case LongType => "BIGINT"
case DoubleType => "DOUBLE PRECISION"
case FloatType => "REAL"
case ShortType => "INTEGER"
case ByteType => "BYTE"
case BooleanType => "BIT(1)"
case StringType => "TEXT"
case BinaryType => "BLOB"
case TimestampType => "TIMESTAMP"
case DateType => "DATE"
case t: DecimalType => s"DECIMAL(${t.precision}},${t.scale}})"
case _ => throw new IllegalArgumentException(s"Don't know how to save $field to JDBC")
})
val nullable = if (field.nullable) "" else "NOT NULL"
sb.append(s", $name $typ $nullable")
}}
if (sb.length < 2) "" else sb.substring(2)
}
/**
* Saves the RDD to the database in a single transaction.
*/
def saveTable(
df: DataFrame,
url: String,
table: String,
properties: Properties = new Properties()) {
val dialect = JdbcDialects.get(url)
val nullTypes: Array[Int] = df.schema.fields.map { field =>
dialect.getJDBCType(field.dataType).map(_.jdbcNullType).getOrElse(
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
case LongType => java.sql.Types.BIGINT
case DoubleType => java.sql.Types.DOUBLE
case FloatType => java.sql.Types.REAL
case ShortType => java.sql.Types.INTEGER
case ByteType => java.sql.Types.INTEGER
case BooleanType => java.sql.Types.BIT
case StringType => java.sql.Types.CLOB
case BinaryType => java.sql.Types.BLOB
case TimestampType => java.sql.Types.TIMESTAMP
case DateType => java.sql.Types.DATE
case t: DecimalType => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
s"Can't translate null value for field $field")
})
}
val rddSchema = df.schema
val driver: String = DriverRegistry.getDriverClassName(url)
val getConnection: () => Connection = JDBCRDD.getConnector(driver, url, properties)
df.foreachPartition { iterator =>
JDBCWriteDetails.savePartition(getConnection, table, iterator, rddSchema, nullTypes)
}
}
}
private [sql] class DriverWrapper(val wrapped: Driver) extends Driver {
override def acceptsURL(url: String): Boolean = wrapped.acceptsURL(url)
override def jdbcCompliant(): Boolean = wrapped.jdbcCompliant()
override def getPropertyInfo(url: String, info: Properties): Array[DriverPropertyInfo] = {
wrapped.getPropertyInfo(url, info)
}
override def getMinorVersion: Int = wrapped.getMinorVersion
def getParentLogger: java.util.logging.Logger =
throw new SQLFeatureNotSupportedException(
s"${this.getClass().getName}.getParentLogger is not yet implemented.")
override def connect(url: String, info: Properties): Connection = wrapped.connect(url, info)
override def getMajorVersion: Int = wrapped.getMajorVersion
}
/**
* java.sql.DriverManager is always loaded by bootstrap classloader,
* so it can't load JDBC drivers accessible by Spark ClassLoader.
*
* To solve the problem, drivers from user-supplied jars are wrapped
* into thin wrapper.
*/
private [sql] object DriverRegistry extends Logging {
private val wrapperMap: mutable.Map[String, DriverWrapper] = mutable.Map.empty
def register(className: String): Unit = {
val cls = Utils.getContextOrSparkClassLoader.loadClass(className)
if (cls.getClassLoader == null) {
logTrace(s"$className has been loaded with bootstrap ClassLoader, wrapper is not required")
} else if (wrapperMap.get(className).isDefined) {
logTrace(s"Wrapper for $className already exists")
} else {
synchronized {
if (wrapperMap.get(className).isEmpty) {
val wrapper = new DriverWrapper(cls.newInstance().asInstanceOf[Driver])
DriverManager.registerDriver(wrapper)
wrapperMap(className) = wrapper
logTrace(s"Wrapper for $className registered")
}
}
}
}
def getDriverClassName(url: String): String = DriverManager.getDriver(url) match {
case wrapper: DriverWrapper => wrapper.wrapped.getClass.getCanonicalName
case driver => driver.getClass.getCanonicalName
}
}
} // package object jdbc

View file

@ -43,19 +43,24 @@ import org.apache.spark.util.SerializableConfiguration
* This allows users to give the data source alias as the format type over the fully qualified
* class name.
*
* ex: parquet.DefaultSource.format = "parquet".
*
* A new instance of this class will be instantiated each time a DDL call is made.
*
* @since 1.5.0
*/
@DeveloperApi
trait DataSourceRegister {
/**
* The string that represents the format that this data source provider uses. This is
* overridden by children to provide a nice alias for the data source,
* ex: override def format(): String = "parquet"
* overridden by children to provide a nice alias for the data source. For example:
*
* {{{
* override def shortName(): String = "parquet"
* }}}
*
* @since 1.5.0
*/
def format(): String
def shortName(): String
}
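A hedged sketch of how a third-party provider picks up the renamed method; the package, class body, and alias "example" are hypothetical. The alias is discovered via Java's ServiceLoader, so the class also needs to be listed in a META-INF/services/org.apache.spark.sql.sources.DataSourceRegister file on the classpath:

package com.example.spark  // hypothetical package, not part of this patch

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class DefaultSource extends RelationProvider with DataSourceRegister {

  // Users can now write .format("example") instead of the full class name.
  override def shortName(): String = "example"

  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Minimal single-column relation, just enough to compile.
    new BaseRelation {
      override val sqlContext: SQLContext = ctx
      override val schema: StructType = StructType(StructField("value", StringType) :: Nil)
    }
  }
}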
/**

View file

@ -3,7 +3,7 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.parquet.test.avro;
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
@ -12,6 +12,6 @@ public interface CompatibilityTest {
@SuppressWarnings("all")
public interface Callback extends CompatibilityTest {
public static final org.apache.avro.Protocol PROTOCOL = org.apache.spark.sql.parquet.test.avro.CompatibilityTest.PROTOCOL;
public static final org.apache.avro.Protocol PROTOCOL = org.apache.spark.sql.execution.datasources.parquet.test.avro.CompatibilityTest.PROTOCOL;
}
}

View file

@ -3,7 +3,7 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.parquet.test.avro;
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class Nested extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
@ -77,18 +77,18 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Creates a new Nested RecordBuilder */
public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder() {
return new org.apache.spark.sql.parquet.test.avro.Nested.Builder();
public static org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder newBuilder() {
return new org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder();
}
/** Creates a new Nested RecordBuilder by copying an existing Builder */
public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.Nested.Builder other) {
return new org.apache.spark.sql.parquet.test.avro.Nested.Builder(other);
public static org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder other) {
return new org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder(other);
}
/** Creates a new Nested RecordBuilder by copying an existing Nested instance */
public static org.apache.spark.sql.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.Nested other) {
return new org.apache.spark.sql.parquet.test.avro.Nested.Builder(other);
public static org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested other) {
return new org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder(other);
}
/**
@ -102,11 +102,11 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
/** Creates a new Builder */
private Builder() {
super(org.apache.spark.sql.parquet.test.avro.Nested.SCHEMA$);
super(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(org.apache.spark.sql.parquet.test.avro.Nested.Builder other) {
private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder other) {
super(other);
if (isValidValue(fields()[0], other.nested_ints_column)) {
this.nested_ints_column = data().deepCopy(fields()[0].schema(), other.nested_ints_column);
@ -119,8 +119,8 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Creates a Builder by copying an existing Nested instance */
private Builder(org.apache.spark.sql.parquet.test.avro.Nested other) {
super(org.apache.spark.sql.parquet.test.avro.Nested.SCHEMA$);
private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested other) {
super(org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.SCHEMA$);
if (isValidValue(fields()[0], other.nested_ints_column)) {
this.nested_ints_column = data().deepCopy(fields()[0].schema(), other.nested_ints_column);
fieldSetFlags()[0] = true;
@ -137,7 +137,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Sets the value of the 'nested_ints_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder setNestedIntsColumn(java.util.List<java.lang.Integer> value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder setNestedIntsColumn(java.util.List<java.lang.Integer> value) {
validate(fields()[0], value);
this.nested_ints_column = value;
fieldSetFlags()[0] = true;
@ -150,7 +150,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Clears the value of the 'nested_ints_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder clearNestedIntsColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder clearNestedIntsColumn() {
nested_ints_column = null;
fieldSetFlags()[0] = false;
return this;
@ -162,7 +162,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Sets the value of the 'nested_string_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder setNestedStringColumn(java.lang.String value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder setNestedStringColumn(java.lang.String value) {
validate(fields()[1], value);
this.nested_string_column = value;
fieldSetFlags()[1] = true;
@ -175,7 +175,7 @@ public class Nested extends org.apache.avro.specific.SpecificRecordBase implemen
}
/** Clears the value of the 'nested_string_column' field */
public org.apache.spark.sql.parquet.test.avro.Nested.Builder clearNestedStringColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested.Builder clearNestedStringColumn() {
nested_string_column = null;
fieldSetFlags()[1] = false;
return this;

View file

@ -3,7 +3,7 @@
*
* DO NOT EDIT DIRECTLY
*/
package org.apache.spark.sql.parquet.test.avro;
package org.apache.spark.sql.execution.datasources.parquet.test.avro;
@SuppressWarnings("all")
@org.apache.avro.specific.AvroGenerated
public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
@ -25,7 +25,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
@Deprecated public java.lang.String maybe_string_column;
@Deprecated public java.util.List<java.lang.String> strings_column;
@Deprecated public java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
@Deprecated public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
@Deprecated public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> complex_column;
/**
* Default constructor. Note that this does not initialize fields
@ -37,7 +37,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
/**
* All-args constructor.
*/
public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.List<java.lang.String> strings_column, java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column, java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column) {
public ParquetAvroCompat(java.lang.Boolean bool_column, java.lang.Integer int_column, java.lang.Long long_column, java.lang.Float float_column, java.lang.Double double_column, java.nio.ByteBuffer binary_column, java.lang.String string_column, java.lang.Boolean maybe_bool_column, java.lang.Integer maybe_int_column, java.lang.Long maybe_long_column, java.lang.Float maybe_float_column, java.lang.Double maybe_double_column, java.nio.ByteBuffer maybe_binary_column, java.lang.String maybe_string_column, java.util.List<java.lang.String> strings_column, java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column, java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> complex_column) {
this.bool_column = bool_column;
this.int_column = int_column;
this.long_column = long_column;
@ -101,7 +101,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
case 13: maybe_string_column = (java.lang.String)value$; break;
case 14: strings_column = (java.util.List<java.lang.String>)value$; break;
case 15: string_to_int_column = (java.util.Map<java.lang.String,java.lang.Integer>)value$; break;
case 16: complex_column = (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>)value$; break;
case 16: complex_column = (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>>)value$; break;
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
}
}
@ -349,7 +349,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
/**
* Gets the value of the 'complex_column' field.
*/
public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> getComplexColumn() {
public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> getComplexColumn() {
return complex_column;
}
@ -357,23 +357,23 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
* Sets the value of the 'complex_column' field.
* @param value the value to set.
*/
public void setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> value) {
public void setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> value) {
this.complex_column = value;
}
/** Creates a new ParquetAvroCompat RecordBuilder */
public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder() {
return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder();
public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder newBuilder() {
return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder();
}
/** Creates a new ParquetAvroCompat RecordBuilder by copying an existing Builder */
public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder other) {
return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder(other);
public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder other) {
return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder(other);
}
/** Creates a new ParquetAvroCompat RecordBuilder by copying an existing ParquetAvroCompat instance */
public static org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat other) {
return new org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder(other);
public static org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder newBuilder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat other) {
return new org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder(other);
}
/**
@ -398,15 +398,15 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
private java.lang.String maybe_string_column;
private java.util.List<java.lang.String> strings_column;
private java.util.Map<java.lang.String,java.lang.Integer> string_to_int_column;
private java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> complex_column;
private java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> complex_column;
/** Creates a new Builder */
private Builder() {
super(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
super(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
}
/** Creates a Builder by copying an existing Builder */
private Builder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder other) {
private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder other) {
super(other);
if (isValidValue(fields()[0], other.bool_column)) {
this.bool_column = data().deepCopy(fields()[0].schema(), other.bool_column);
@ -479,8 +479,8 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Creates a Builder by copying an existing ParquetAvroCompat instance */
private Builder(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat other) {
super(org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
private Builder(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat other) {
super(org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.SCHEMA$);
if (isValidValue(fields()[0], other.bool_column)) {
this.bool_column = data().deepCopy(fields()[0].schema(), other.bool_column);
fieldSetFlags()[0] = true;
@ -557,7 +557,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'bool_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setBoolColumn(boolean value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setBoolColumn(boolean value) {
validate(fields()[0], value);
this.bool_column = value;
fieldSetFlags()[0] = true;
@ -570,7 +570,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'bool_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearBoolColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearBoolColumn() {
fieldSetFlags()[0] = false;
return this;
}
@ -581,7 +581,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'int_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setIntColumn(int value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setIntColumn(int value) {
validate(fields()[1], value);
this.int_column = value;
fieldSetFlags()[1] = true;
@ -594,7 +594,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'int_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearIntColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearIntColumn() {
fieldSetFlags()[1] = false;
return this;
}
@ -605,7 +605,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'long_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setLongColumn(long value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setLongColumn(long value) {
validate(fields()[2], value);
this.long_column = value;
fieldSetFlags()[2] = true;
@ -618,7 +618,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'long_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearLongColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearLongColumn() {
fieldSetFlags()[2] = false;
return this;
}
@ -629,7 +629,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'float_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setFloatColumn(float value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setFloatColumn(float value) {
validate(fields()[3], value);
this.float_column = value;
fieldSetFlags()[3] = true;
@ -642,7 +642,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'float_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearFloatColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearFloatColumn() {
fieldSetFlags()[3] = false;
return this;
}
@ -653,7 +653,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'double_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setDoubleColumn(double value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setDoubleColumn(double value) {
validate(fields()[4], value);
this.double_column = value;
fieldSetFlags()[4] = true;
@ -666,7 +666,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'double_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearDoubleColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearDoubleColumn() {
fieldSetFlags()[4] = false;
return this;
}
@ -677,7 +677,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'binary_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setBinaryColumn(java.nio.ByteBuffer value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setBinaryColumn(java.nio.ByteBuffer value) {
validate(fields()[5], value);
this.binary_column = value;
fieldSetFlags()[5] = true;
@ -690,7 +690,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'binary_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearBinaryColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearBinaryColumn() {
binary_column = null;
fieldSetFlags()[5] = false;
return this;
@ -702,7 +702,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'string_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setStringColumn(java.lang.String value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setStringColumn(java.lang.String value) {
validate(fields()[6], value);
this.string_column = value;
fieldSetFlags()[6] = true;
@ -715,7 +715,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'string_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearStringColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearStringColumn() {
string_column = null;
fieldSetFlags()[6] = false;
return this;
@ -727,7 +727,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_bool_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBoolColumn(java.lang.Boolean value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBoolColumn(java.lang.Boolean value) {
validate(fields()[7], value);
this.maybe_bool_column = value;
fieldSetFlags()[7] = true;
@ -740,7 +740,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_bool_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBoolColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBoolColumn() {
maybe_bool_column = null;
fieldSetFlags()[7] = false;
return this;
@ -752,7 +752,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_int_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeIntColumn(java.lang.Integer value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeIntColumn(java.lang.Integer value) {
validate(fields()[8], value);
this.maybe_int_column = value;
fieldSetFlags()[8] = true;
@ -765,7 +765,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_int_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeIntColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeIntColumn() {
maybe_int_column = null;
fieldSetFlags()[8] = false;
return this;
@ -777,7 +777,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_long_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeLongColumn(java.lang.Long value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeLongColumn(java.lang.Long value) {
validate(fields()[9], value);
this.maybe_long_column = value;
fieldSetFlags()[9] = true;
@ -790,7 +790,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_long_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeLongColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeLongColumn() {
maybe_long_column = null;
fieldSetFlags()[9] = false;
return this;
@ -802,7 +802,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_float_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeFloatColumn(java.lang.Float value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeFloatColumn(java.lang.Float value) {
validate(fields()[10], value);
this.maybe_float_column = value;
fieldSetFlags()[10] = true;
@ -815,7 +815,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_float_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeFloatColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeFloatColumn() {
maybe_float_column = null;
fieldSetFlags()[10] = false;
return this;
@ -827,7 +827,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_double_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeDoubleColumn(java.lang.Double value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeDoubleColumn(java.lang.Double value) {
validate(fields()[11], value);
this.maybe_double_column = value;
fieldSetFlags()[11] = true;
@ -840,7 +840,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_double_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeDoubleColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeDoubleColumn() {
maybe_double_column = null;
fieldSetFlags()[11] = false;
return this;
@ -852,7 +852,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_binary_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBinaryColumn(java.nio.ByteBuffer value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeBinaryColumn(java.nio.ByteBuffer value) {
validate(fields()[12], value);
this.maybe_binary_column = value;
fieldSetFlags()[12] = true;
@ -865,7 +865,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_binary_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBinaryColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeBinaryColumn() {
maybe_binary_column = null;
fieldSetFlags()[12] = false;
return this;
@ -877,7 +877,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'maybe_string_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setMaybeStringColumn(java.lang.String value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setMaybeStringColumn(java.lang.String value) {
validate(fields()[13], value);
this.maybe_string_column = value;
fieldSetFlags()[13] = true;
@ -890,7 +890,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'maybe_string_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeStringColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearMaybeStringColumn() {
maybe_string_column = null;
fieldSetFlags()[13] = false;
return this;
@ -902,7 +902,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'strings_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setStringsColumn(java.util.List<java.lang.String> value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setStringsColumn(java.util.List<java.lang.String> value) {
validate(fields()[14], value);
this.strings_column = value;
fieldSetFlags()[14] = true;
@ -915,7 +915,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'strings_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearStringsColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearStringsColumn() {
strings_column = null;
fieldSetFlags()[14] = false;
return this;
@ -927,7 +927,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Sets the value of the 'string_to_int_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setStringToIntColumn(java.util.Map<java.lang.String,java.lang.Integer> value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setStringToIntColumn(java.util.Map<java.lang.String,java.lang.Integer> value) {
validate(fields()[15], value);
this.string_to_int_column = value;
fieldSetFlags()[15] = true;
@ -940,19 +940,19 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'string_to_int_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearStringToIntColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearStringToIntColumn() {
string_to_int_column = null;
fieldSetFlags()[15] = false;
return this;
}
/** Gets the value of the 'complex_column' field */
public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> getComplexColumn() {
public java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> getComplexColumn() {
return complex_column;
}
/** Sets the value of the 'complex_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>> value) {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder setComplexColumn(java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>> value) {
validate(fields()[16], value);
this.complex_column = value;
fieldSetFlags()[16] = true;
@ -965,7 +965,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
}
/** Clears the value of the 'complex_column' field */
public org.apache.spark.sql.parquet.test.avro.ParquetAvroCompat.Builder clearComplexColumn() {
public org.apache.spark.sql.execution.datasources.parquet.test.avro.ParquetAvroCompat.Builder clearComplexColumn() {
complex_column = null;
fieldSetFlags()[16] = false;
return this;
@ -991,7 +991,7 @@ public class ParquetAvroCompat extends org.apache.avro.specific.SpecificRecordBa
record.maybe_string_column = fieldSetFlags()[13] ? this.maybe_string_column : (java.lang.String) defaultValue(fields()[13]);
record.strings_column = fieldSetFlags()[14] ? this.strings_column : (java.util.List<java.lang.String>) defaultValue(fields()[14]);
record.string_to_int_column = fieldSetFlags()[15] ? this.string_to_int_column : (java.util.Map<java.lang.String,java.lang.Integer>) defaultValue(fields()[15]);
record.complex_column = fieldSetFlags()[16] ? this.complex_column : (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.parquet.test.avro.Nested>>) defaultValue(fields()[16]);
record.complex_column = fieldSetFlags()[16] ? this.complex_column : (java.util.Map<java.lang.String,java.util.List<org.apache.spark.sql.execution.datasources.parquet.test.avro.Nested>>) defaultValue(fields()[16]);
return record;
} catch (Exception e) {
throw new org.apache.avro.AvroRuntimeException(e);

View file

@ -25,8 +25,8 @@ import scala.util.Random
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.functions._
import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.json.JSONRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SQLTestUtils}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import java.io.{File, StringWriter}
import java.sql.{Date, Timestamp}
@ -28,7 +28,7 @@ import org.apache.spark.sql.{SQLContext, QueryTest, Row, SQLConf}
import org.apache.spark.sql.TestData._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.{ResolvedDataSource, LogicalRelation}
import org.apache.spark.sql.json.InferSchema.compatibleType
import org.apache.spark.sql.execution.datasources.json.InferSchema.compatibleType
import org.apache.spark.sql.types._
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.util.Utils

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.json
package org.apache.spark.sql.execution.datasources.json
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.nio.ByteBuffer
import java.util.{List => JList, Map => JMap}
@ -25,7 +25,7 @@ import scala.collection.JavaConversions._
import org.apache.hadoop.fs.Path
import org.apache.parquet.avro.AvroParquetWriter
import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}
import org.apache.spark.sql.execution.datasources.parquet.test.avro.{Nested, ParquetAvroCompat}
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import scala.collection.JavaConversions._

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.parquet.filter2.predicate.Operators._
import org.apache.parquet.filter2.predicate.{FilterPredicate, Operators}


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@ -373,7 +373,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest {
// _temporary should be missing if direct output committer works.
try {
configuration.set("spark.sql.parquet.output.committer.class",
"org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
classOf[DirectParquetOutputCommitter].getCanonicalName)
sqlContext.udf.register("div0", (x: Int) => x / 0)
withTempPath { dir =>
intercept[org.apache.spark.SparkException] {


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File
import java.math.BigInteger


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import java.io.File


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.parquet
package org.apache.spark.sql.execution.datasources.parquet
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.{Row, SQLContext}


@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.metric
package org.apache.spark.sql.execution.metric
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
@ -41,16 +41,6 @@ class SQLMetricsSuite extends SparkFunSuite {
}
}
test("IntSQLMetric should not box Int") {
val l = SQLMetrics.createIntMetric(TestSQLContext.sparkContext, "Int")
val f = () => { l += 1 }
BoxingFinder.getClassReader(f.getClass).foreach { cl =>
val boxingFinder = new BoxingFinder()
cl.accept(boxingFinder, 0)
assert(boxingFinder.boxingInvokes.isEmpty, s"Found boxing: ${boxingFinder.boxingInvokes}")
}
}
test("Normal accumulator should do boxing") {
// We need this test to make sure BoxingFinder works.
val l = TestSQLContext.sparkContext.accumulator(0L)
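
With IntSQLMetric removed, the boxing check is kept only for the Long-based metric. For reference, a hedged sketch of what the surviving test exercises, assuming SQLMetrics.createLongMetric mirrors the shape of the deleted createIntMetric call:

// Editor's sketch, mirroring the deleted Int test above. The += should compile down to
// primitive long arithmetic, which is exactly what BoxingFinder asserts.
val longMetric = SQLMetrics.createLongMetric(TestSQLContext.sparkContext, "longMetric")
val increment = () => { longMetric += 1L }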


@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.spark.sql.ui
package org.apache.spark.sql.execution.ui
import java.util.Properties
import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.metric.LongSQLMetricValue
import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.SQLExecution


@ -51,7 +51,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -75,7 +75,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -92,7 +92,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -107,7 +107,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -122,7 +122,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -139,7 +139,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -158,7 +158,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -175,7 +175,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable (a int, b string)
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -188,7 +188,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
@ -199,7 +199,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
sql(
s"""
|CREATE TEMPORARY TABLE jsonTable
|USING org.apache.spark.sql.json.DefaultSource
|USING json
|OPTIONS (
| path '${path.toString}'
|) AS
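
The USING json rewrites in this suite lean on the new shortName registration plus the backward-compatibility mapping, so the old fully qualified provider name keeps working as well. A hedged sketch of the equivalent programmatic calls (the path is a placeholder):

// Editor's sketch only; both forms should resolve to the same JSON data source.
val byShortName = caseInsensitiveContext.read.format("json").load("/tmp/people.json")
val byOldName = caseInsensitiveContext.read
  .format("org.apache.spark.sql.json").load("/tmp/people.json")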


@ -20,44 +20,7 @@ package org.apache.spark.sql.sources
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{StringType, StructField, StructType}
class FakeSourceOne extends RelationProvider with DataSourceRegister {
def format(): String = "Fluet da Bomb"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
override def sqlContext: SQLContext = cont
override def schema: StructType =
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
class FakeSourceTwo extends RelationProvider with DataSourceRegister {
def format(): String = "Fluet da Bomb"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
override def sqlContext: SQLContext = cont
override def schema: StructType =
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
class FakeSourceThree extends RelationProvider with DataSourceRegister {
def format(): String = "gathering quorum"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
override def sqlContext: SQLContext = cont
override def schema: StructType =
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
// please note that the META-INF/services had to be modified for the test directory for this to work
class DDLSourceLoadSuite extends DataSourceTest {
@ -77,9 +40,49 @@ class DDLSourceLoadSuite extends DataSourceTest {
.load().schema == StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
test("Loading Orc") {
test("should fail to load ORC without HiveContext") {
intercept[ClassNotFoundException] {
caseInsensitiveContext.read.format("orc").load()
}
}
}
class FakeSourceOne extends RelationProvider with DataSourceRegister {
def shortName(): String = "Fluet da Bomb"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
override def sqlContext: SQLContext = cont
override def schema: StructType =
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
class FakeSourceTwo extends RelationProvider with DataSourceRegister {
def shortName(): String = "Fluet da Bomb"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
override def sqlContext: SQLContext = cont
override def schema: StructType =
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
class FakeSourceThree extends RelationProvider with DataSourceRegister {
def shortName(): String = "gathering quorum"
override def createRelation(cont: SQLContext, param: Map[String, String]): BaseRelation =
new BaseRelation {
override def sqlContext: SQLContext = cont
override def schema: StructType =
StructType(Seq(StructField("stringType", StringType, nullable = false)))
}
}
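
For context on the format() to shortName() rename exercised by these fake sources: providers are discovered through META-INF/services entries for DataSourceRegister, and a user-supplied name is matched against shortName(). The sketch below is illustrative only, assuming a plain java.util.ServiceLoader scan and a hypothetical lookup helper; the real logic lives in ResolvedDataSource.lookupDataSource:

// Editor's sketch, not Spark's implementation: resolve a provider string by scanning
// DataSourceRegister implementations and comparing their shortName(), falling back to
// treating the string as a fully qualified class name.
import java.util.ServiceLoader

import scala.collection.JavaConversions._

import org.apache.spark.sql.sources.DataSourceRegister

object ShortNameLookupSketch {
  def lookup(provider: String, loader: ClassLoader): Class[_] = {
    val registered = ServiceLoader.load(classOf[DataSourceRegister], loader).iterator().toSeq
    registered.filter(_.shortName().equalsIgnoreCase(provider)) match {
      case Seq(single) => single.getClass           // unambiguous short name, e.g. "orc"
      case Seq() => loader.loadClass(provider)      // assume a fully qualified class name
      case multiple => throw new IllegalArgumentException(
        s"Multiple sources register '$provider': " +
          multiple.map(_.getClass.getName).mkString(", "))
    }
  }
}

The duplicated "Fluet da Bomb" short name above presumably exists to exercise the ambiguous-name path.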


@ -22,14 +22,39 @@ import org.apache.spark.sql.execution.datasources.ResolvedDataSource
class ResolvedDataSourceSuite extends SparkFunSuite {
test("builtin sources") {
assert(ResolvedDataSource.lookupDataSource("jdbc") ===
classOf[org.apache.spark.sql.jdbc.DefaultSource])
test("jdbc") {
assert(
ResolvedDataSource.lookupDataSource("jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
assert(
ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
assert(
ResolvedDataSource.lookupDataSource("org.apache.spark.sql.jdbc") ===
classOf[org.apache.spark.sql.execution.datasources.jdbc.DefaultSource])
}
assert(ResolvedDataSource.lookupDataSource("json") ===
classOf[org.apache.spark.sql.json.DefaultSource])
test("json") {
assert(
ResolvedDataSource.lookupDataSource("json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
assert(
ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
assert(
ResolvedDataSource.lookupDataSource("org.apache.spark.sql.json") ===
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource])
}
assert(ResolvedDataSource.lookupDataSource("parquet") ===
classOf[org.apache.spark.sql.parquet.DefaultSource])
test("parquet") {
assert(
ResolvedDataSource.lookupDataSource("parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
assert(
ResolvedDataSource.lookupDataSource("org.apache.spark.sql.execution.datasources.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
assert(
ResolvedDataSource.lookupDataSource("org.apache.spark.sql.parquet") ===
classOf[org.apache.spark.sql.execution.datasources.parquet.DefaultSource])
}
}
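
These three tests exercise the backward-compatibility translation added in this change: the pre-1.5 provider packages still resolve even though the classes moved under execution.datasources. A minimal sketch of the idea, assuming a plain string map keyed by the old names (the actual table and its error handling live in ResolvedDataSource.lookupDataSource):

// Editor's sketch of the translation idea only; not the code added by this commit.
val backwardCompatibilityMap: Map[String, String] = Map(
  "org.apache.spark.sql.jdbc" -> "org.apache.spark.sql.execution.datasources.jdbc",
  "org.apache.spark.sql.json" -> "org.apache.spark.sql.execution.datasources.json",
  "org.apache.spark.sql.parquet" -> "org.apache.spark.sql.execution.datasources.parquet")

def translateProvider(name: String): String =
  backwardCompatibilityMap.getOrElse(name, name)

// translateProvider("org.apache.spark.sql.json") returns
// "org.apache.spark.sql.execution.datasources.json", which then resolves normally.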


@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}


@ -32,7 +32,6 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.rdd.{HadoopRDD, RDD}
import org.apache.spark.sql.catalyst.InternalRow
@ -49,9 +48,9 @@ import scala.collection.JavaConversions._
private[sql] class DefaultSource extends HadoopFsRelationProvider with DataSourceRegister {
def format(): String = "orc"
override def shortName(): String = "orc"
def createRelation(
override def createRelation(
sqlContext: SQLContext,
paths: Array[String],
dataSchema: Option[StructType],
@ -144,7 +143,6 @@ private[orc] class OrcOutputWriter(
}
}
@DeveloperApi
private[sql] class OrcRelation(
override val paths: Array[String],
maybeDataSchema: Option[StructType],
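
The ORC hunks above show the provider-facing effect of the rename: shortName() now carries an override modifier and createRelation explicitly overrides the HadoopFsRelationProvider method. A minimal sketch of the resulting provider shape, with the signature copied from the surrounding diff and remaining details treated as assumptions:

// Editor's sketch of a provider after the rename; not part of this commit.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{DataSourceRegister, HadoopFsRelation, HadoopFsRelationProvider}
import org.apache.spark.sql.types.StructType

class ExampleSource extends HadoopFsRelationProvider with DataSourceRegister {

  // The name users pass to DataFrameReader.format(...) or SQL's USING clause.
  override def shortName(): String = "example"

  override def createRelation(
      sqlContext: SQLContext,
      paths: Array[String],
      dataSchema: Option[StructType],
      partitionColumns: Option[StructType],
      parameters: Map[String, String]): HadoopFsRelation = {
    // A real source would build and return its HadoopFsRelation here.
    throw new UnsupportedOperationException("illustrative sketch only")
  }
}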


@ -18,7 +18,7 @@
package org.apache.spark.sql.hive
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetTest
import org.apache.spark.sql.execution.datasources.parquet.ParquetTest
import org.apache.spark.sql.{QueryTest, Row}
case class Cases(lower: String, UPPER: String)


@ -32,7 +32,7 @@ import org.apache.spark.sql.hive.client.{HiveTable, ManagedTable}
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


@ -20,7 +20,7 @@ package org.apache.spark.sql.hive
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {


@ -30,7 +30,7 @@ import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.hive.{HiveContext, HiveQLDialect, MetastoreRelation}
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.CalendarInterval


@ -28,7 +28,7 @@ import org.apache.spark.sql.hive.execution.HiveTableScan
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.test.SQLTestUtils
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


@ -23,12 +23,12 @@ import com.google.common.io.Files
import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql.{AnalysisException, SaveMode, parquet}
import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String = classOf[parquet.DefaultSource].getCanonicalName
override val dataSourceName: String = "parquet"
import sqlContext._
import sqlContext.implicits._


@ -53,7 +53,7 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
class JsonHadoopFsRelationSuite extends HadoopFsRelationTest {
override val dataSourceName: String =
classOf[org.apache.spark.sql.json.DefaultSource].getCanonicalName
classOf[org.apache.spark.sql.execution.datasources.json.DefaultSource].getCanonicalName
import sqlContext._