[SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.

Also moved all the deprecated functions into one place for SQLContext and DataFrame, and updated tests to use the new API.

Author: Reynold Xin <rxin@databricks.com>

Closes #6210 from rxin/df-writer-reader-jdbc and squashes the following commits:

7465c2c [Reynold Xin] Fixed unit test.
118e609 [Reynold Xin] Updated tests.
3441b57 [Reynold Xin] Updated javadoc.
13cdd1c [Reynold Xin] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.

(cherry picked from commit 517eb37a85)
Signed-off-by: Reynold Xin <rxin@databricks.com>
This commit is contained in:
Reynold Xin 2015-05-16 22:01:53 -07:00
parent 84949104c9
commit 17e078671e
20 changed files with 880 additions and 894 deletions

View file

@ -94,7 +94,7 @@ public class JavaSparkSQL {
System.out.println("=== Data source: Parquet File ===");
// DataFrames can be saved as parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");
schemaPeople.write().parquet("people.parquet");
// Read in the parquet file created above.
// Parquet files are self-describing so the schema is preserved.
@ -151,7 +151,7 @@ public class JavaSparkSQL {
List<String> jsonData = Arrays.asList(
"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd());
DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());
// Take a look at the schema of this new DataFrame.
peopleFromJsonRDD.printSchema();

View file

@ -18,7 +18,6 @@
package org.apache.spark.sql
import java.io.CharArrayWriter
import java.sql.DriverManager
import java.util.Properties
import scala.collection.JavaConversions._
@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.json.JacksonGenerator
import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource}
import org.apache.spark.sql.sources.CreateTableUsingAsSelect
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@ -227,10 +225,6 @@ class DataFrame private[sql](
}
}
/** Left here for backward compatibility. */
@deprecated("1.3.0", "use toDF")
def toSchemaRDD: DataFrame = this
/**
* Returns the object itself.
* @group basic
@ -1299,230 +1293,6 @@ class DataFrame private[sql](
@Experimental
def write: DataFrameWriter = new DataFrameWriter(this)
/**
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
* @group output
* @since 1.3.0
*/
@deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
if (sqlContext.conf.parquetUseDataSourceApi) {
write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
}
/**
* Creates a table from the the contents of this DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
* This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String): Unit = {
write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
}
/**
* Creates a table from the the contents of this DataFrame, using the default data source
* configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
// If table already exists and the save mode is Append,
// we will just call insertInto to append the contents of this DataFrame.
insertInto(tableName, overwrite = false)
} else {
write.mode(mode).saveAsTable(tableName)
}
}
/**
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source and a set of options,
* using [[SaveMode.ErrorIfExists]] as the save mode.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String): Unit = {
write.format(source).saveAsTable(tableName)
}
/**
* :: Experimental ::
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = {
write.format(source).mode(mode).saveAsTable(tableName)
}
/**
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
"1.4.0")
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: java.util.Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).saveAsTable(tableName)
}
/**
* (Scala-specific)
* Creates a table from the the contents of this DataFrame based on a given data source,
* [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
"1.4.0")
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).saveAsTable(tableName)
}
/**
* Saves the contents of this DataFrame to the given path,
* using the default data source configured by spark.sql.sources.default and
* [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.save(path)", "1.4.0")
def save(path: String): Unit = {
write.save(path)
}
/**
* Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
* using the default data source configured by spark.sql.sources.default.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.mode(mode).save(path)", "1.4.0")
def save(path: String, mode: SaveMode): Unit = {
write.mode(mode).save(path)
}
/**
* Saves the contents of this DataFrame to the given path based on the given data source,
* using [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).save(path)", "1.4.0")
def save(path: String, source: String): Unit = {
write.format(source).save(path)
}
/**
* Saves the contents of this DataFrame to the given path based on the given data source and
* [[SaveMode]] specified by mode.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0")
def save(path: String, source: String, mode: SaveMode): Unit = {
write.format(source).mode(mode).save(path)
}
/**
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options.
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
source: String,
mode: SaveMode,
options: java.util.Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).save()
}
/**
* (Scala-specific)
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options
* @group output
* @since 1.3.0
*/
@deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).save()
}
/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
@ -1576,100 +1346,6 @@ class DataFrame private[sql](
}
}
////////////////////////////////////////////////////////////////////////////
// JDBC Write Support
////////////////////////////////////////////////////////////////////////////
/**
* Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
* If you pass `true` for `allowExisting`, it will drop any table with the
* given name; if you pass `false`, it will throw if the table already
* exists.
* @group output
* @since 1.3.0
*/
def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
createJDBCTable(url, table, allowExisting, new Properties())
}
/**
* Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`
* using connection properties defined in `properties`.
* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
* If you pass `true` for `allowExisting`, it will drop any table with the
* given name; if you pass `false`, it will throw if the table already
* exists.
* @group output
* @since 1.4.0
*/
def createJDBCTable(
url: String,
table: String,
allowExisting: Boolean,
properties: Properties): Unit = {
val conn = DriverManager.getConnection(url, properties)
try {
if (allowExisting) {
val sql = s"DROP TABLE IF EXISTS $table"
conn.prepareStatement(sql).executeUpdate()
}
val schema = JDBCWriteDetails.schemaString(this, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
} finally {
conn.close()
}
JDBCWriteDetails.saveTable(this, url, table, properties)
}
/**
* Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
* Assumes the table already exists and has a compatible schema. If you
* pass `true` for `overwrite`, it will `TRUNCATE` the table before
* performing the `INSERT`s.
*
* The table must already exist on the database. It must have a schema
* that is compatible with the schema of this RDD; inserting the rows of
* the RDD in order via the simple statement
* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
* @group output
* @since 1.3.0
*/
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
insertIntoJDBC(url, table, overwrite, new Properties())
}
/**
* Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`
* using connection properties defined in `properties`.
* Assumes the table already exists and has a compatible schema. If you
* pass `true` for `overwrite`, it will `TRUNCATE` the table before
* performing the `INSERT`s.
*
* The table must already exist on the database. It must have a schema
* that is compatible with the schema of this RDD; inserting the rows of
* the RDD in order via the simple statement
* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
* @group output
* @since 1.4.0
*/
def insertIntoJDBC(
url: String,
table: String,
overwrite: Boolean,
properties: Properties): Unit = {
if (overwrite) {
val conn = DriverManager.getConnection(url, properties)
try {
val sql = s"TRUNCATE TABLE $table"
conn.prepareStatement(sql).executeUpdate()
} finally {
conn.close()
}
}
JDBCWriteDetails.saveTable(this, url, table, properties)
}
////////////////////////////////////////////////////////////////////////////
// for Python API
////////////////////////////////////////////////////////////////////////////
@ -1682,4 +1358,264 @@ class DataFrame private[sql](
val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
SerDeUtil.javaToPython(jrdd)
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// Deprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
/** Left here for backward compatibility. */
@deprecated("use toDF", "1.3.0")
def toSchemaRDD: DataFrame = this
/**
* Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
* If you pass `true` for `allowExisting`, it will drop any table with the
* given name; if you pass `false`, it will throw if the table already
* exists.
* @group output
*/
@deprecated("Use write.jdbc()", "1.4.0")
def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
val w = if (allowExisting) write.mode(SaveMode.Overwrite) else write
w.jdbc(url, table, new Properties)
}
/**
* Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
* Assumes the table already exists and has a compatible schema. If you
* pass `true` for `overwrite`, it will `TRUNCATE` the table before
* performing the `INSERT`s.
*
* The table must already exist on the database. It must have a schema
* that is compatible with the schema of this RDD; inserting the rows of
* the RDD in order via the simple statement
* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
* @group output
*/
@deprecated("Use write.jdbc()", "1.4.0")
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
val w = if (overwrite) write.mode(SaveMode.Overwrite) else write
w.jdbc(url, table, new Properties)
}
/**
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
* @group output
*/
@deprecated("Use write.parquet(path)", "1.4.0")
def saveAsParquetFile(path: String): Unit = {
if (sqlContext.conf.parquetUseDataSourceApi) {
write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
}
/**
* Creates a table from the the contents of this DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
* This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
*/
@deprecated("Use write.saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String): Unit = {
write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
}
/**
* Creates a table from the the contents of this DataFrame, using the default data source
* configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
*/
@deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
// If table already exists and the save mode is Append,
// we will just call insertInto to append the contents of this DataFrame.
insertInto(tableName, overwrite = false)
} else {
write.mode(mode).saveAsTable(tableName)
}
}
/**
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source and a set of options,
* using [[SaveMode.ErrorIfExists]] as the save mode.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
*/
@deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String): Unit = {
write.format(source).saveAsTable(tableName)
}
/**
* :: Experimental ::
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
*/
@deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0")
def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = {
write.format(source).mode(mode).saveAsTable(tableName)
}
/**
* Creates a table at the given path from the the contents of this DataFrame
* based on a given data source, [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
*/
@deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
"1.4.0")
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: java.util.Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).saveAsTable(tableName)
}
/**
* (Scala-specific)
* Creates a table from the the contents of this DataFrame based on a given data source,
* [[SaveMode]] specified by mode, and a set of options.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*
* Also note that while this function can persist the table metadata into Hive's metastore,
* the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
* @group output
*/
@deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
"1.4.0")
def saveAsTable(
tableName: String,
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).saveAsTable(tableName)
}
/**
* Saves the contents of this DataFrame to the given path,
* using the default data source configured by spark.sql.sources.default and
* [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
*/
@deprecated("Use write.save(path)", "1.4.0")
def save(path: String): Unit = {
write.save(path)
}
/**
* Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
* using the default data source configured by spark.sql.sources.default.
* @group output
*/
@deprecated("Use write.mode(mode).save(path)", "1.4.0")
def save(path: String, mode: SaveMode): Unit = {
write.mode(mode).save(path)
}
/**
* Saves the contents of this DataFrame to the given path based on the given data source,
* using [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
*/
@deprecated("Use write.format(source).save(path)", "1.4.0")
def save(path: String, source: String): Unit = {
write.format(source).save(path)
}
/**
* Saves the contents of this DataFrame to the given path based on the given data source and
* [[SaveMode]] specified by mode.
* @group output
*/
@deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0")
def save(path: String, source: String, mode: SaveMode): Unit = {
write.format(source).mode(mode).save(path)
}
/**
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options.
* @group output
*/
@deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
source: String,
mode: SaveMode,
options: java.util.Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).save()
}
/**
* (Scala-specific)
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options
* @group output
*/
@deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
def save(
source: String,
mode: SaveMode,
options: Map[String, String]): Unit = {
write.format(source).mode(mode).options(options).save()
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// End of eeprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
}

View file

@ -17,12 +17,16 @@
package org.apache.spark.sql
import java.util.Properties
import org.apache.hadoop.fs.Path
import org.apache.spark.Partition
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
import org.apache.spark.sql.parquet.ParquetRelation2
import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
@ -31,7 +35,7 @@ import org.apache.spark.sql.types.StructType
/**
* :: Experimental ::
* Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
* key-value stores, etc).
* key-value stores, etc). Use [[SQLContext.read]] to access this.
*
* @since 1.4.0
*/
@ -94,6 +98,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
* Specifies the input partitioning. If specified, the underlying data source does not need to
* discover the data partitioning scheme, and thus can speed up very large inputs.
*
* This is only applicable for Parquet at the moment.
*
* @since 1.4.0
*/
@scala.annotation.varargs
@ -128,6 +134,87 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
DataFrame(sqlContext, LogicalRelation(resolved.relation))
}
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
*
* @since 1.4.0
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. Partitions of the table will be retrieved in parallel based on the parameters
* passed to this function.
*
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param columnName the name of a column of integral type that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` used to decide partition stride
* @param upperBound the maximum value of `columnName` used to decide partition stride
* @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
* evenly into this many partitions
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
*
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
connectionProperties: Properties): DataFrame = {
val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, connectionProperties)
}
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table using connection properties. The `predicates` parameter gives a list
* expressions suitable for inclusion in WHERE clauses; each one defines one partition
* of the [[DataFrame]].
*
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param predicates Condition in the where clause for each partition.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
* @since 1.4.0
*/
def jdbc(
url: String,
table: String,
predicates: Array[String],
connectionProperties: Properties): DataFrame = {
val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
jdbc(url, table, parts, connectionProperties)
}
private def jdbc(
url: String,
table: String,
parts: Array[Partition],
connectionProperties: Properties): DataFrame = {
val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
sqlContext.baseRelationToDataFrame(relation)
}
/**
* Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
*

View file

@ -17,14 +17,17 @@
package org.apache.spark.sql
import java.util.Properties
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
/**
* :: Experimental ::
* Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
* key-value stores, etc).
* key-value stores, etc). Use [[DataFrame.write]] to access this.
*
* @since 1.4.0
*/
@ -110,6 +113,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
* Partitions the output by the given columns on the file system. If specified, the output is
* laid out on the file system similar to Hive's partitioning scheme.
*
* This is only applicable for Parquet at the moment.
*
* @since 1.4.0
*/
@scala.annotation.varargs
@ -161,6 +166,52 @@ final class DataFrameWriter private[sql](df: DataFrame) {
df.sqlContext.executePlan(cmd).toRdd
}
/**
* Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the
* table already exists in the external database, behavior of this function depends on the
* save mode, specified by the `mode` function (default to throwing an exception).
*
* Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
* your external database systems.
*
* @param url JDBC database url of the form `jdbc:subprotocol:subname`
* @param table Name of the table in the external database.
* @param connectionProperties JDBC database connection arguments, a list of arbitrary string
* tag/value. Normally at least a "user" and "password" property
* should be included.
*/
def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
val conn = JdbcUtils.createConnection(url, connectionProperties)
try {
var tableExists = JdbcUtils.tableExists(conn, table)
if (mode == SaveMode.Ignore && tableExists) {
return
}
if (mode == SaveMode.ErrorIfExists && tableExists) {
sys.error(s"Table $table already exists.")
}
if (mode == SaveMode.Overwrite && tableExists) {
JdbcUtils.dropTable(conn, table)
tableExists = false
}
// Create the table if the table didn't exist.
if (!tableExists) {
val schema = JDBCWriteDetails.schemaString(df, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
}
} finally {
conn.close()
}
JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
}
/**
* Saves the content of the [[DataFrame]] in JSON format at the specified path.
* This is equivalent to:

View file

@ -28,6 +28,7 @@ import scala.util.control.NonFatal
import com.google.common.reflect.TypeToken
import org.apache.spark.SparkContext
import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.rdd.RDD
@ -40,11 +41,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.ParserDialect
import org.apache.spark.sql.execution.{Filter, _}
import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}
/**
* The entry point for working with structured data (rows and columns) in Spark. Allows the
@ -531,67 +530,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
createDataFrame(rdd.rdd, beanClass)
}
/**
* :: DeveloperApi ::
* Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
* It is important to make sure that the structure of every [[Row]] of the provided RDD matches
* the provided schema. Otherwise, there will be runtime exception.
* Example:
* {{{
* import org.apache.spark.sql._
* import org.apache.spark.sql.types._
* val sqlContext = new org.apache.spark.sql.SQLContext(sc)
*
* val schema =
* StructType(
* StructField("name", StringType, false) ::
* StructField("age", IntegerType, true) :: Nil)
*
* val people =
* sc.textFile("examples/src/main/resources/people.txt").map(
* _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
* val dataFrame = sqlContext. applySchema(people, schema)
* dataFrame.printSchema
* // root
* // |-- name: string (nullable = false)
* // |-- age: integer (nullable = true)
*
* dataFrame.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
*/
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}
/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
*/
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd, beanClass)
}
/**
* Applies a schema to an RDD of Java Beans.
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
*/
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd, beanClass)
}
/**
* :: Experimental ::
* Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
@ -606,205 +544,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
@Experimental
def read: DataFrameReader = new DataFrameReader(this)
/**
* Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
* [[DataFrame]] if no paths are passed in.
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.parquet()", "1.4.0")
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame = {
if (paths.isEmpty) {
emptyDataFrame
} else if (conf.parquetUseDataSourceApi) {
read.parquet(paths : _*)
} else {
DataFrame(this, parquet.ParquetRelation(
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
}
}
/**
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonFile(path: String): DataFrame = {
read.json(path)
}
/**
* Loads a JSON file (one object per line) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonFile(path: String, schema: StructType): DataFrame = {
read.schema(schema).json(path)
}
/**
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonFile(path: String, samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(path)
}
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
/**
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
read.schema(schema).json(json)
}
/**
* Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
read.schema(schema).json(json)
}
/**
* Loads an RDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
* Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
* @since 1.3.0
*/
@deprecated("Use read.load(path)", "1.4.0")
def load(path: String): DataFrame = {
read.load(path)
}
/**
* Returns the dataset stored at path as a DataFrame, using the given data source.
*
* @group genericdata
* @since 1.3.0
*/
@deprecated("Use read.format(source).load(path)", "1.4.0")
def load(path: String, source: String): DataFrame = {
read.format(source).load(path)
}
/**
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
* @since 1.3.0
*/
@deprecated("Use read.format(source).options(options).load()", "1.4.0")
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
read.options(options).format(source).load()
}
/**
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
* @since 1.3.0
*/
@deprecated("Use read.format(source).options(options).load()", "1.4.0")
def load(source: String, options: Map[String, String]): DataFrame = {
read.options(options).format(source).load()
}
/**
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
* @since 1.3.0
*/
@deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
def load(
source: String,
schema: StructType,
options: java.util.Map[String, String]): DataFrame = {
read.format(source).schema(schema).options(options).load()
}
/**
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
* @group genericdata
* @since 1.3.0
*/
@deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
def load(
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
read.format(source).schema(schema).options(options).load()
}
/**
* :: Experimental ::
* Creates an external table from the given path and returns the corresponding DataFrame.
@ -923,132 +662,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
table(tableName)
}
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table.
*
* @group specificdata
* @since 1.3.0
*/
@Experimental
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
}
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table and connection properties.
*
* @group specificdata
* @since 1.4.0
*/
@Experimental
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null), properties)
}
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. Partitions of the table will be retrieved in parallel based on the parameters
* passed to this function.
*
* @param columnName the name of a column of integral type that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` used to decide partition stride
* @param upperBound the maximum value of `columnName` used to decide partition stride
* @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
* evenly into this many partitions
* @group specificdata
* @since 1.3.0
*/
@Experimental
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): DataFrame = {
jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties())
}
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. Partitions of the table will be retrieved in parallel based on the parameters
* passed to this function.
*
* @param columnName the name of a column of integral type that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` used to decide partition stride
* @param upperBound the maximum value of `columnName` used to decide partition stride
* @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
* evenly into this many partitions
* @param properties connection properties
* @group specificdata
* @since 1.4.0
*/
@Experimental
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int,
properties: Properties): DataFrame = {
val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
val parts = JDBCRelation.columnPartition(partitioning)
jdbc(url, table, parts, properties)
}
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. The theParts parameter gives a list expressions
* suitable for inclusion in WHERE clauses; each one defines one partition
* of the [[DataFrame]].
*
* @group specificdata
* @since 1.3.0
*/
@Experimental
def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
jdbc(url, table, theParts, new Properties())
}
/**
* :: Experimental ::
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table using connection properties. The theParts parameter gives a list expressions
* suitable for inclusion in WHERE clauses; each one defines one partition
* of the [[DataFrame]].
*
* @group specificdata
* @since 1.4.0
*/
@Experimental
def jdbc(
url: String,
table: String,
theParts: Array[String],
properties: Properties): DataFrame = {
val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
jdbc(url, table, parts, properties)
}
private def jdbc(
url: String,
table: String,
parts: Array[Partition],
properties: Properties): DataFrame = {
val relation = JDBCRelation(url, table, parts, properties)(this)
baseRelationToDataFrame(relation)
}
/**
* Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
* only during the lifetime of this instance of SQLContext.
@ -1372,6 +985,263 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// Deprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
createDataFrame(rowRDD, schema)
}
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd, beanClass)
}
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd, beanClass)
}
/**
* Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
* [[DataFrame]] if no paths are passed in.
*
* @group specificdata
*/
@deprecated("Use read.parquet()", "1.4.0")
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame = {
if (paths.isEmpty) {
emptyDataFrame
} else if (conf.parquetUseDataSourceApi) {
read.parquet(paths : _*)
} else {
DataFrame(this, parquet.ParquetRelation(
paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
}
}
/**
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonFile(path: String): DataFrame = {
read.json(path)
}
/**
* Loads a JSON file (one object per line) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonFile(path: String, schema: StructType): DataFrame = {
read.schema(schema).json(path)
}
/**
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonFile(path: String, samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(path)
}
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
/**
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
read.schema(schema).json(json)
}
/**
* Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
read.schema(schema).json(json)
}
/**
* Loads an RDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
* Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@deprecated("Use read.json()", "1.4.0")
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
read.option("samplingRatio", samplingRatio.toString).json(json)
}
/**
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
*/
@deprecated("Use read.load(path)", "1.4.0")
def load(path: String): DataFrame = {
read.load(path)
}
/**
* Returns the dataset stored at path as a DataFrame, using the given data source.
*
* @group genericdata
*/
@deprecated("Use read.format(source).load(path)", "1.4.0")
def load(path: String, source: String): DataFrame = {
read.format(source).load(path)
}
/**
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
*/
@deprecated("Use read.format(source).options(options).load()", "1.4.0")
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
read.options(options).format(source).load()
}
/**
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
*/
@deprecated("Use read.format(source).options(options).load()", "1.4.0")
def load(source: String, options: Map[String, String]): DataFrame = {
read.options(options).format(source).load()
}
/**
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
*/
@deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
def load(source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame =
{
read.format(source).schema(schema).options(options).load()
}
/**
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
*/
@deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = {
read.format(source).schema(schema).options(options).load()
}
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table.
*
* @group specificdata
*/
@deprecated("use read.jdbc()", "1.4.0")
def jdbc(url: String, table: String): DataFrame = {
read.jdbc(url, table, new Properties)
}
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. Partitions of the table will be retrieved in parallel based on the parameters
* passed to this function.
*
* @param columnName the name of a column of integral type that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` used to decide partition stride
* @param upperBound the maximum value of `columnName` used to decide partition stride
* @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
* evenly into this many partitions
* @group specificdata
*/
@deprecated("use read.jdbc()", "1.4.0")
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): DataFrame = {
read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
}
/**
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. The theParts parameter gives a list expressions
* suitable for inclusion in WHERE clauses; each one defines one partition
* of the [[DataFrame]].
*
* @group specificdata
*/
@deprecated("use read.jdbc()", "1.4.0")
def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
read.jdbc(url, table, theParts, new Properties)
}
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
// End of eeprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
}

View file

@ -29,7 +29,16 @@ import org.apache.spark.sql.catalyst.util.DateUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.sources._
/**
* Data corresponding to one partition of a JDBCRDD.
*/
private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
override def index: Int = idx
}
private[sql] object JDBCRDD extends Logging {
/**
* Maps a JDBC type to a Catalyst type. This function is called only when
* the DriverQuirks class corresponding to your database driver returns null.
@ -168,6 +177,7 @@ private[sql] object JDBCRDD extends Logging {
DriverManager.getConnection(url, properties)
}
}
/**
* Build and return JDBCRDD from the given information.
*
@ -193,18 +203,14 @@ private[sql] object JDBCRDD extends Logging {
requiredColumns: Array[String],
filters: Array[Filter],
parts: Array[Partition]): RDD[Row] = {
val prunedSchema = pruneSchema(schema, requiredColumns)
return new
JDBCRDD(
sc,
getConnector(driver, url, properties),
prunedSchema,
fqTable,
requiredColumns,
filters,
parts)
new JDBCRDD(
sc,
getConnector(driver, url, properties),
pruneSchema(schema, requiredColumns),
fqTable,
requiredColumns,
filters,
parts)
}
}

View file

@ -17,26 +17,16 @@
package org.apache.spark.sql.jdbc
import java.sql.DriverManager
import java.util.Properties
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Partition
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
/**
* Data corresponding to one partition of a JDBCRDD.
*/
private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
override def index: Int = idx
}
/**
* Instructions on how to partition the table among workers.
@ -152,6 +142,8 @@ private[sql] case class JDBCRelation(
}
override def insert(data: DataFrame, overwrite: Boolean): Unit = {
data.insertIntoJDBC(url, table, overwrite, properties)
data.write
.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
.jdbc(url, table, properties)
}
}

View file

@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc
import java.sql.{Connection, DriverManager}
import java.util.Properties
import scala.util.Try
/**
* Util functions for JDBC tables.
*/
private[sql] object JdbcUtils {
/**
* Establishes a JDBC connection.
*/
def createConnection(url: String, connectionProperties: Properties): Connection = {
DriverManager.getConnection(url, connectionProperties)
}
/**
* Returns true if the table already exists in the JDBC database.
*/
def tableExists(conn: Connection, table: String): Boolean = {
// Somewhat hacky, but there isn't a good way to identify whether a table exists for all
// SQL database systems, considering "table" could also include the database name.
Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
}
/**
* Drops a table from the JDBC database.
*/
def dropTable(conn: Connection, table: String): Unit = {
conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
}
}

View file

@ -163,8 +163,8 @@ package object jdbc {
table: String,
properties: Properties = new Properties()) {
val quirks = DriverQuirks.get(url)
var nullTypes: Array[Int] = df.schema.fields.map(field => {
var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
val nullTypes: Array[Int] = df.schema.fields.map { field =>
val nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
if (nullType.isEmpty) {
field.dataType match {
case IntegerType => java.sql.Types.INTEGER
@ -183,7 +183,7 @@ package object jdbc {
s"Can't translate null value for field $field")
}
} else nullType.get
}).toArray
}
val rddSchema = df.schema
df.foreachPartition { iterator =>

View file

@ -187,14 +187,14 @@ public class JavaApplySchemaSuite implements Serializable {
null,
"this is another simple string."));
DataFrame df1 = sqlContext.jsonRDD(jsonRDD);
DataFrame df1 = sqlContext.read().json(jsonRDD);
StructType actualSchema1 = df1.schema();
Assert.assertEquals(expectedSchema, actualSchema1);
df1.registerTempTable("jsonTable1");
List<Row> actual1 = sqlContext.sql("select * from jsonTable1").collectAsList();
Assert.assertEquals(expectedResult, actual1);
DataFrame df2 = sqlContext.jsonRDD(jsonRDD, expectedSchema);
DataFrame df2 = sqlContext.read().schema(expectedSchema).json(jsonRDD);
StructType actualSchema2 = df2.schema();
Assert.assertEquals(expectedSchema, actualSchema2);
df2.registerTempTable("jsonTable2");

View file

@ -67,7 +67,7 @@ public class JavaSaveLoadSuite {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = sc.parallelize(jsonObjects);
df = sqlContext.jsonRDD(rdd);
df = sqlContext.read().json(rdd);
df.registerTempTable("jsonTable");
}
@ -75,10 +75,8 @@ public class JavaSaveLoadSuite {
public void saveAndLoad() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
df.save("json", SaveMode.ErrorIfExists, options);
df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
DataFrame loadedDF = sqlContext.read().format("json").options(options).load();
checkAnswer(loadedDF, df.collectAsList());
}
@ -86,12 +84,12 @@ public class JavaSaveLoadSuite {
public void saveAndLoadWithSchema() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
df.save("json", SaveMode.ErrorIfExists, options);
df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame loadedDF = sqlContext.load("json", schema, options);
DataFrame loadedDF = sqlContext.read().format("json").schema(schema).options(options).load();
checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
}

View file

@ -221,22 +221,25 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
}
test("Basic API") {
assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect().size === 3)
assert(TestSQLContext.read.jdbc(
urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
}
test("Partitioning via JDBCPartitioningInfo API") {
assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3)
.collect.size === 3)
assert(
TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
.collect().length === 3)
}
test("Partitioning via list-of-where-clauses API") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts).collect().size === 3)
assert(TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
.collect().length === 3)
}
test("H2 integral types") {
val rows = sql("SELECT * FROM inttypes WHERE A IS NOT NULL").collect()
assert(rows.size === 1)
assert(rows.length === 1)
assert(rows(0).getInt(0) === 1)
assert(rows(0).getBoolean(1) === false)
assert(rows(0).getInt(2) === 3)
@ -246,7 +249,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
test("H2 null entries") {
val rows = sql("SELECT * FROM inttypes WHERE A IS NULL").collect()
assert(rows.size === 1)
assert(rows.length === 1)
assert(rows(0).isNullAt(0))
assert(rows(0).isNullAt(1))
assert(rows(0).isNullAt(2))
@ -286,24 +289,28 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
}
test("test DATE types") {
val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect()
val cachedRows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().collect()
val rows = TestSQLContext.read.jdbc(
urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
val cachedRows = TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
.cache().collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
assert(rows(1).getAs[java.sql.Date](1) === null)
assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
}
test("test DATE types in cache") {
val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect()
TestSQLContext
.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().registerTempTable("mycached_date")
val rows =
TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
.cache().registerTempTable("mycached_date")
val cachedRows = sql("select * from mycached_date").collect()
assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
}
test("test types for null value") {
val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.NULLTYPES").collect()
val rows = TestSQLContext.read.jdbc(
urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
assert((0 to 14).forall(i => rows(0).isNullAt(i)))
}

View file

@ -22,7 +22,7 @@ import java.util.Properties
import org.scalatest.{BeforeAndAfter, FunSuite}
import org.apache.spark.sql.Row
import org.apache.spark.sql.{SaveMode, Row}
import org.apache.spark.sql.test._
import org.apache.spark.sql.types._
@ -90,64 +90,66 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
test("Basic CREATE") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
df.createJDBCTable(url, "TEST.BASICCREATETEST", false)
assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties)
assert(2 == TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
assert(2 ==
TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
}
test("CREATE with overwrite") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
df.createJDBCTable(url1, "TEST.DROPTEST", false, properties)
assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count)
assert(3 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
df.write.jdbc(url1, "TEST.DROPTEST", properties)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count)
assert(3 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
df2.createJDBCTable(url1, "TEST.DROPTEST", true, properties)
assert(1 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
assert(1 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
}
test("CREATE then INSERT to append") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
df.createJDBCTable(url, "TEST.APPENDTEST", false)
df2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
df.write.jdbc(url, "TEST.APPENDTEST", new Properties)
df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties)
assert(3 == TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).count)
assert(2 ==
TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length)
}
test("CREATE then INSERT to truncate") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
df.createJDBCTable(url1, "TEST.TRUNCATETEST", false, properties)
df2.insertIntoJDBC(url1, "TEST.TRUNCATETEST", true, properties)
assert(1 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
assert(1 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
}
test("Incompatible INSERT to append") {
val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
df.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false)
df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
intercept[org.apache.spark.SparkException] {
df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
}
}
test("INSERT to JDBC Datasource") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
test("INSERT to JDBC Datasource with overwrite") {
TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
}
}

View file

@ -81,7 +81,7 @@ public class JavaMetastoreDataSourcesSuite {
jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
}
JavaRDD<String> rdd = sc.parallelize(jsonObjects);
df = sqlContext.jsonRDD(rdd);
df = sqlContext.read().json(rdd);
df.registerTempTable("jsonTable");
}
@ -96,7 +96,11 @@ public class JavaMetastoreDataSourcesSuite {
public void saveExternalTableAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
df.write()
.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
.options(options)
.saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
@ -115,7 +119,11 @@ public class JavaMetastoreDataSourcesSuite {
public void saveExternalTableWithSchemaAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
df.write()
.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
.options(options)
.saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),
@ -138,7 +146,11 @@ public class JavaMetastoreDataSourcesSuite {
@Test
public void saveTableAndQueryIt() {
Map<String, String> options = new HashMap<String, String>();
df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
df.write()
.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
.options(options)
.saveAsTable("javaSavedTable");
checkAnswer(
sqlContext.sql("SELECT * FROM javaSavedTable"),

View file

@ -162,7 +162,7 @@ class CachedTableSuite extends QueryTest {
test("REFRESH TABLE also needs to recache the data (data source tables)") {
val tempPath: File = Utils.createTempDir()
tempPath.delete()
table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
sql("DROP TABLE IF EXISTS refreshTable")
createExternalTable("refreshTable", tempPath.toString, "parquet")
checkAnswer(
@ -172,7 +172,7 @@ class CachedTableSuite extends QueryTest {
sql("CACHE TABLE refreshTable")
assertCached(table("refreshTable"))
// Append new data.
table("src").save(tempPath.toString, "parquet", SaveMode.Append)
table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
// We are still using the old data.
assertCached(table("refreshTable"))
checkAnswer(

View file

@ -409,11 +409,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = jsonRDD(rdd)
val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifiying the path).
df.saveAsTable("savedJsonTable")
df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@ -443,11 +443,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = jsonRDD(rdd)
val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
// Save the df as a managed table (by not specifiying the path).
df.saveAsTable("savedJsonTable")
df.write.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
@ -455,17 +455,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Right now, we cannot append to an existing JSON table.
intercept[RuntimeException] {
df.saveAsTable("savedJsonTable", SaveMode.Append)
df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable")
}
// We can overwrite it.
df.saveAsTable("savedJsonTable", SaveMode.Overwrite)
df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
// When the save mode is Ignore, we will do nothing when the table already exists.
df.select("b").saveAsTable("savedJsonTable", SaveMode.Ignore)
df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
assert(df.schema === table("savedJsonTable").schema)
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
@ -479,11 +479,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
// Create an external table by specifying the path.
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
df.saveAsTable(
"savedJsonTable",
"org.apache.spark.sql.json",
SaveMode.Append,
Map("path" -> tempPath.toString))
df.write
.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
.option("path", tempPath.toString)
.saveAsTable("savedJsonTable")
checkAnswer(
sql("SELECT * FROM savedJsonTable"),
df.collect())
@ -501,14 +501,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
val originalDefaultSource = conf.defaultDataSourceName
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
val df = jsonRDD(rdd)
val df = read.json(rdd)
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
df.saveAsTable(
"savedJsonTable",
"org.apache.spark.sql.json",
SaveMode.Append,
Map("path" -> tempPath.toString))
df.write.format("org.apache.spark.sql.json")
.mode(SaveMode.Append)
.option("path", tempPath.toString)
.saveAsTable("savedJsonTable")
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
createExternalTable("createdJsonTable", tempPath.toString)
@ -566,7 +565,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
jsonRDD(rdd).registerTempTable("jt")
read.json(rdd).registerTempTable("jt")
sql(
"""
|create table test_parquet_ctas STORED AS parquET
@ -601,7 +600,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
df1.saveAsTable("arrayInParquet", "parquet", SaveMode.Overwrite)
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet")
val df2 =
createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
@ -610,10 +609,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.insertInto("arrayInParquet", overwrite = false)
createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
.saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a")
.saveAsTable("arrayInParquet", "parquet", SaveMode.Append)
createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
.saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
.mode(SaveMode.Append).saveAsTable("arrayInParquet")
refreshTable("arrayInParquet")
checkAnswer(
@ -634,7 +633,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructType(
StructField("a", mapType1, nullable = true) :: Nil)
assert(df1.schema === expectedSchema1)
df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite)
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet")
val df2 =
createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
@ -644,10 +643,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
StructField("a", mapType2, nullable = true) :: Nil)
assert(df2.schema === expectedSchema2)
df2.insertInto("mapInParquet", overwrite = false)
createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
.saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
.saveAsTable("mapInParquet", "parquet", SaveMode.Append)
createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
.saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
.format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet")
refreshTable("mapInParquet")
checkAnswer(
@ -711,30 +710,30 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
def createDF(from: Int, to: Int): DataFrame =
createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
createDF(0, 9).saveAsTable("insertParquet", "parquet")
createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
(6 to 9).map(i => Row(i, s"str$i")))
intercept[AnalysisException] {
createDF(10, 19).saveAsTable("insertParquet", "parquet")
createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
}
createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
(6 to 19).map(i => Row(i, s"str$i")))
createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
(6 to 24).map(i => Row(i, s"str$i")))
intercept[AnalysisException] {
createDF(30, 39).saveAsTable("insertParquet")
createDF(30, 39).write.saveAsTable("insertParquet")
}
createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
(6 to 34).map(i => Row(i, s"str$i")))
@ -744,11 +743,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
(6 to 44).map(i => Row(i, s"str$i")))
createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
(52 to 54).map(i => Row(i, s"str$i")))
createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
checkAnswer(
sql("SELECT p.c1, c2 FROM insertParquet p"),
(50 to 59).map(i => Row(i, s"str$i")))

View file

@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.hive.test.TestHive.{sparkContext, jsonRDD, sql}
import org.apache.spark.sql.hive.test.TestHive.{read, sparkContext, jsonRDD, sql}
import org.apache.spark.sql.hive.test.TestHive.implicits._
case class Nested(a: Int, B: Int)
@ -31,14 +31,14 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested])
class HiveResolutionSuite extends HiveComparisonTest {
test("SPARK-3698: case insensitive test for nested data") {
jsonRDD(sparkContext.makeRDD(
read.json(sparkContext.makeRDD(
"""{"a": [{"a": {"a": 1}}]}""" :: Nil)).registerTempTable("nested")
// This should be successfully analyzed
sql("SELECT a[0].A.A from nested").queryExecution.analyzed
}
test("SPARK-5278: check ambiguous reference to fields") {
jsonRDD(sparkContext.makeRDD(
read.json(sparkContext.makeRDD(
"""{"a": [{"b": 1, "B": 2}]}""" :: Nil)).registerTempTable("nested")
// there are 2 filed matching field name "b", we should report Ambiguous reference error

View file

@ -535,14 +535,14 @@ class SQLQuerySuite extends QueryTest {
test("SPARK-4296 Grouping field with Hive UDF as sub expression") {
val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil)
jsonRDD(rdd).registerTempTable("data")
read.json(rdd).registerTempTable("data")
checkAnswer(
sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"),
Row("str-1", 1970))
dropTempTable("data")
jsonRDD(rdd).registerTempTable("data")
read.json(rdd).registerTempTable("data")
checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971))
dropTempTable("data")
@ -550,7 +550,7 @@ class SQLQuerySuite extends QueryTest {
test("resolve udtf with single alias") {
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
jsonRDD(rdd).registerTempTable("data")
read.json(rdd).registerTempTable("data")
val df = sql("SELECT explode(a) AS val FROM data")
val col = df("val")
}
@ -563,7 +563,7 @@ class SQLQuerySuite extends QueryTest {
// PreInsertionCasts will actually start to work before ImplicitGenerate and then
// generates an invalid query plan.
val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}"""))
jsonRDD(rdd).registerTempTable("data")
read.json(rdd).registerTempTable("data")
val originalConf = getConf("spark.sql.hive.convertCTAS", "false")
setConf("spark.sql.hive.convertCTAS", "false")

View file

@ -150,9 +150,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest {
}
val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}"""))
jsonRDD(rdd1).registerTempTable("jt")
read.json(rdd1).registerTempTable("jt")
val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}"""))
jsonRDD(rdd2).registerTempTable("jt_array")
read.json(rdd2).registerTempTable("jt_array")
setConf("spark.sql.hive.convertMetastoreParquet", "true")
}
@ -617,16 +617,16 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
sql("drop table if exists spark_6016_fix")
// Create a DataFrame with two partitions. So, the created table will have two parquet files.
val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
val df1 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2))
df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
checkAnswer(
sql("select * from spark_6016_fix"),
(1 to 10).map(i => Row(i))
)
// Create a DataFrame with four partitions. So, the created table will have four parquet files.
val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite)
val df2 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4))
df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix")
// For the bug of SPARK-6016, we are caching two outdated footers for df1. Then,
// since the new table has four parquet files, we are trying to read new footers from two files
// and then merge metadata in footers of these four (two outdated ones and two latest one),
@ -663,7 +663,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
StructField("a", arrayType1, nullable = true) :: Nil)
assert(df.schema === expectedSchema1)
df.saveAsTable("alwaysNullable", "parquet")
df.write.format("parquet").saveAsTable("alwaysNullable")
val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true)
val arrayType2 = ArrayType(IntegerType, containsNull = true)

View file

@ -120,10 +120,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
test("save()/load() - non-partitioned table - ErrorIfExists") {
withTempDir { file =>
intercept[RuntimeException] {
testDF.save(
path = file.getCanonicalPath,
source = dataSourceName,
mode = SaveMode.ErrorIfExists)
testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath)
}
}
}
@ -233,10 +230,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
test("save()/load() - partitioned table - Ignore") {
withTempDir { file =>
partitionedTestDF.save(
path = file.getCanonicalPath,
source = dataSourceName,
mode = SaveMode.Ignore)
partitionedTestDF.write
.format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath)
val path = new Path(file.getCanonicalPath)
val fs = path.getFileSystem(SparkHadoopUtil.get.conf)
@ -249,11 +244,9 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
}
test("saveAsTable()/load() - non-partitioned table - Overwrite") {
testDF.saveAsTable(
tableName = "t",
source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
testDF.write.format(dataSourceName).mode(SaveMode.Overwrite)
.option("dataSchema", dataSchema.json)
.saveAsTable("t")
withTable("t") {
checkAnswer(table("t"), testDF.collect())
@ -261,15 +254,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
}
test("saveAsTable()/load() - non-partitioned table - Append") {
testDF.saveAsTable(
tableName = "t",
source = dataSourceName,
mode = SaveMode.Overwrite)
testDF.saveAsTable(
tableName = "t",
source = dataSourceName,
mode = SaveMode.Append)
testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t")
testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t")
withTable("t") {
checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect())
@ -281,10 +267,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
withTempTable("t") {
intercept[AnalysisException] {
testDF.saveAsTable(
tableName = "t",
source = dataSourceName,
mode = SaveMode.ErrorIfExists)
testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t")
}
}
}
@ -293,21 +276,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest {
Seq.empty[(Int, String)].toDF().registerTempTable("t")
withTempTable("t") {
testDF.saveAsTable(
tableName = "t",
source = dataSourceName,
mode = SaveMode.Ignore)
testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t")
assert(table("t").collect().isEmpty)
}
}
test("saveAsTable()/load() - partitioned table - simple queries") {
partitionedTestDF.saveAsTable(
tableName = "t",
source = dataSourceName,
mode = SaveMode.Overwrite,
Map("dataSchema" -> dataSchema.json))
partitionedTestDF.write.format(dataSourceName)
.mode(SaveMode.Overwrite)
.option("dataSchema", dataSchema.json)
.saveAsTable("t")
withTable("t") {
checkQueries(table("t"))
@ -492,11 +470,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest {
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
load(
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchemaWithPartition.json)))
read.format(dataSourceName)
.option("dataSchema", dataSchemaWithPartition.json)
.load(file.getCanonicalPath))
}
}
}
@ -518,18 +494,16 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
sparkContext
.parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1))
.toDF("a", "b", "p1")
.saveAsParquetFile(partitionDir.toString)
.write.parquet(partitionDir.toString)
}
val dataSchemaWithPartition =
StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true))
checkQueries(
load(
source = dataSourceName,
options = Map(
"path" -> file.getCanonicalPath,
"dataSchema" -> dataSchemaWithPartition.json)))
read.format(dataSourceName)
.option("dataSchema", dataSchemaWithPartition.json)
.load(file.getCanonicalPath))
}
}
}