From 17e078671ef3214ac2415b4602c62f979713a9fe Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 16 May 2015 22:01:53 -0700 Subject: [PATCH] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface. Also moved all the deprecated functions into one place for SQLContext and DataFrame, and updated tests to use the new API. Author: Reynold Xin Closes #6210 from rxin/df-writer-reader-jdbc and squashes the following commits: 7465c2c [Reynold Xin] Fixed unit test. 118e609 [Reynold Xin] Updated tests. 3441b57 [Reynold Xin] Updated javadoc. 13cdd1c [Reynold Xin] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface. (cherry picked from commit 517eb37a85e0a28820bcfd5d98c50d02df6521c6) Signed-off-by: Reynold Xin --- .../spark/examples/sql/JavaSparkSQL.java | 4 +- .../org/apache/spark/sql/DataFrame.scala | 586 +++++++--------- .../apache/spark/sql/DataFrameReader.scala | 89 ++- .../apache/spark/sql/DataFrameWriter.scala | 53 +- .../org/apache/spark/sql/SQLContext.scala | 646 +++++++----------- .../org/apache/spark/sql/jdbc/JDBCRDD.scala | 30 +- .../apache/spark/sql/jdbc/JDBCRelation.scala | 16 +- .../org/apache/spark/sql/jdbc/JdbcUtils.scala | 52 ++ .../org/apache/spark/sql/jdbc/jdbc.scala | 6 +- .../spark/sql/JavaApplySchemaSuite.java | 4 +- .../spark/sql/sources/JavaSaveLoadSuite.java | 10 +- .../org/apache/spark/sql/jdbc/JDBCSuite.scala | 31 +- .../spark/sql/jdbc/JDBCWriteSuite.scala | 54 +- .../hive/JavaMetastoreDataSourcesSuite.java | 20 +- .../spark/sql/hive/CachedTableSuite.scala | 4 +- .../sql/hive/MetastoreDataSourcesSuite.scala | 73 +- .../hive/execution/HiveResolutionSuite.scala | 6 +- .../sql/hive/execution/SQLQuerySuite.scala | 8 +- .../apache/spark/sql/hive/parquetSuites.scala | 14 +- .../sql/sources/hadoopFsRelationSuites.scala | 68 +- 20 files changed, 880 insertions(+), 894 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java index 173633ce05..afee279ec3 100644 --- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java +++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java @@ -94,7 +94,7 @@ public class JavaSparkSQL { System.out.println("=== Data source: Parquet File ==="); // DataFrames can be saved as parquet files, maintaining the schema information. - schemaPeople.saveAsParquetFile("people.parquet"); + schemaPeople.write().parquet("people.parquet"); // Read in the parquet file created above. // Parquet files are self-describing so the schema is preserved. @@ -151,7 +151,7 @@ public class JavaSparkSQL { List jsonData = Arrays.asList( "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}"); JavaRDD anotherPeopleRDD = ctx.parallelize(jsonData); - DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd()); + DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd()); // Take a look at the schema of this new DataFrame. peopleFromJsonRDD.printSchema(); diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 55ef357a99..27e9af49f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql import java.io.CharArrayWriter -import java.sql.DriverManager import java.util.Properties import scala.collection.JavaConversions._ @@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _} import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser} import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD} -import org.apache.spark.sql.jdbc.JDBCWriteDetails import org.apache.spark.sql.json.JacksonGenerator -import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource} +import org.apache.spark.sql.sources.CreateTableUsingAsSelect import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils @@ -227,10 +225,6 @@ class DataFrame private[sql]( } } - /** Left here for backward compatibility. */ - @deprecated("1.3.0", "use toDF") - def toSchemaRDD: DataFrame = this - /** * Returns the object itself. * @group basic @@ -1299,230 +1293,6 @@ class DataFrame private[sql]( @Experimental def write: DataFrameWriter = new DataFrameWriter(this) - /** - * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema. - * Files that are written out using this method can be read back in as a [[DataFrame]] - * using the `parquetFile` function in [[SQLContext]]. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.parquet(path)", "1.4.0") - def saveAsParquetFile(path: String): Unit = { - if (sqlContext.conf.parquetUseDataSourceApi) { - write.format("parquet").mode(SaveMode.ErrorIfExists).save(path) - } else { - sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd - } - } - - /** - * Creates a table from the the contents of this DataFrame. - * It will use the default data source configured by spark.sql.sources.default. - * This will fail if the table already exists. - * - * Note that this currently only works with DataFrames that are created from a HiveContext as - * there is no notion of a persisted catalog in a standard SQL context. Instead you can write - * an RDD out to a parquet file, and then register that file as a table. This "table" can then - * be the target of an `insertInto`. - * - * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.saveAsTable(tableName)", "1.4.0") - def saveAsTable(tableName: String): Unit = { - write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName) - } - - /** - * Creates a table from the the contents of this DataFrame, using the default data source - * configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode. - * - * Note that this currently only works with DataFrames that are created from a HiveContext as - * there is no notion of a persisted catalog in a standard SQL context. Instead you can write - * an RDD out to a parquet file, and then register that file as a table. This "table" can then - * be the target of an `insertInto`. - * - * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0") - def saveAsTable(tableName: String, mode: SaveMode): Unit = { - if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) { - // If table already exists and the save mode is Append, - // we will just call insertInto to append the contents of this DataFrame. - insertInto(tableName, overwrite = false) - } else { - write.mode(mode).saveAsTable(tableName) - } - } - - /** - * Creates a table at the given path from the the contents of this DataFrame - * based on a given data source and a set of options, - * using [[SaveMode.ErrorIfExists]] as the save mode. - * - * Note that this currently only works with DataFrames that are created from a HiveContext as - * there is no notion of a persisted catalog in a standard SQL context. Instead you can write - * an RDD out to a parquet file, and then register that file as a table. This "table" can then - * be the target of an `insertInto`. - * - * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0") - def saveAsTable(tableName: String, source: String): Unit = { - write.format(source).saveAsTable(tableName) - } - - /** - * :: Experimental :: - * Creates a table at the given path from the the contents of this DataFrame - * based on a given data source, [[SaveMode]] specified by mode, and a set of options. - * - * Note that this currently only works with DataFrames that are created from a HiveContext as - * there is no notion of a persisted catalog in a standard SQL context. Instead you can write - * an RDD out to a parquet file, and then register that file as a table. This "table" can then - * be the target of an `insertInto`. - * - * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0") - def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = { - write.format(source).mode(mode).saveAsTable(tableName) - } - - /** - * Creates a table at the given path from the the contents of this DataFrame - * based on a given data source, [[SaveMode]] specified by mode, and a set of options. - * - * Note that this currently only works with DataFrames that are created from a HiveContext as - * there is no notion of a persisted catalog in a standard SQL context. Instead you can write - * an RDD out to a parquet file, and then register that file as a table. This "table" can then - * be the target of an `insertInto`. - * - * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)", - "1.4.0") - def saveAsTable( - tableName: String, - source: String, - mode: SaveMode, - options: java.util.Map[String, String]): Unit = { - write.format(source).mode(mode).options(options).saveAsTable(tableName) - } - - /** - * (Scala-specific) - * Creates a table from the the contents of this DataFrame based on a given data source, - * [[SaveMode]] specified by mode, and a set of options. - * - * Note that this currently only works with DataFrames that are created from a HiveContext as - * there is no notion of a persisted catalog in a standard SQL context. Instead you can write - * an RDD out to a parquet file, and then register that file as a table. This "table" can then - * be the target of an `insertInto`. - * - * Also note that while this function can persist the table metadata into Hive's metastore, - * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)", - "1.4.0") - def saveAsTable( - tableName: String, - source: String, - mode: SaveMode, - options: Map[String, String]): Unit = { - write.format(source).mode(mode).options(options).saveAsTable(tableName) - } - - /** - * Saves the contents of this DataFrame to the given path, - * using the default data source configured by spark.sql.sources.default and - * [[SaveMode.ErrorIfExists]] as the save mode. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.save(path)", "1.4.0") - def save(path: String): Unit = { - write.save(path) - } - - /** - * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode, - * using the default data source configured by spark.sql.sources.default. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.mode(mode).save(path)", "1.4.0") - def save(path: String, mode: SaveMode): Unit = { - write.mode(mode).save(path) - } - - /** - * Saves the contents of this DataFrame to the given path based on the given data source, - * using [[SaveMode.ErrorIfExists]] as the save mode. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).save(path)", "1.4.0") - def save(path: String, source: String): Unit = { - write.format(source).save(path) - } - - /** - * Saves the contents of this DataFrame to the given path based on the given data source and - * [[SaveMode]] specified by mode. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0") - def save(path: String, source: String, mode: SaveMode): Unit = { - write.format(source).mode(mode).save(path) - } - - /** - * Saves the contents of this DataFrame based on the given data source, - * [[SaveMode]] specified by mode, and a set of options. - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0") - def save( - source: String, - mode: SaveMode, - options: java.util.Map[String, String]): Unit = { - write.format(source).mode(mode).options(options).save() - } - - /** - * (Scala-specific) - * Saves the contents of this DataFrame based on the given data source, - * [[SaveMode]] specified by mode, and a set of options - * @group output - * @since 1.3.0 - */ - @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0") - def save( - source: String, - mode: SaveMode, - options: Map[String, String]): Unit = { - write.format(source).mode(mode).options(options).save() - } - /** * :: Experimental :: * Adds the rows from this RDD to the specified table, optionally overwriting the existing data. @@ -1576,100 +1346,6 @@ class DataFrame private[sql]( } } - //////////////////////////////////////////////////////////////////////////// - // JDBC Write Support - //////////////////////////////////////////////////////////////////////////// - - /** - * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`. - * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements. - * If you pass `true` for `allowExisting`, it will drop any table with the - * given name; if you pass `false`, it will throw if the table already - * exists. - * @group output - * @since 1.3.0 - */ - def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = { - createJDBCTable(url, table, allowExisting, new Properties()) - } - - /** - * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table` - * using connection properties defined in `properties`. - * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements. - * If you pass `true` for `allowExisting`, it will drop any table with the - * given name; if you pass `false`, it will throw if the table already - * exists. - * @group output - * @since 1.4.0 - */ - def createJDBCTable( - url: String, - table: String, - allowExisting: Boolean, - properties: Properties): Unit = { - val conn = DriverManager.getConnection(url, properties) - try { - if (allowExisting) { - val sql = s"DROP TABLE IF EXISTS $table" - conn.prepareStatement(sql).executeUpdate() - } - val schema = JDBCWriteDetails.schemaString(this, url) - val sql = s"CREATE TABLE $table ($schema)" - conn.prepareStatement(sql).executeUpdate() - } finally { - conn.close() - } - JDBCWriteDetails.saveTable(this, url, table, properties) - } - - /** - * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`. - * Assumes the table already exists and has a compatible schema. If you - * pass `true` for `overwrite`, it will `TRUNCATE` the table before - * performing the `INSERT`s. - * - * The table must already exist on the database. It must have a schema - * that is compatible with the schema of this RDD; inserting the rows of - * the RDD in order via the simple statement - * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail. - * @group output - * @since 1.3.0 - */ - def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = { - insertIntoJDBC(url, table, overwrite, new Properties()) - } - - /** - * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table` - * using connection properties defined in `properties`. - * Assumes the table already exists and has a compatible schema. If you - * pass `true` for `overwrite`, it will `TRUNCATE` the table before - * performing the `INSERT`s. - * - * The table must already exist on the database. It must have a schema - * that is compatible with the schema of this RDD; inserting the rows of - * the RDD in order via the simple statement - * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail. - * @group output - * @since 1.4.0 - */ - def insertIntoJDBC( - url: String, - table: String, - overwrite: Boolean, - properties: Properties): Unit = { - if (overwrite) { - val conn = DriverManager.getConnection(url, properties) - try { - val sql = s"TRUNCATE TABLE $table" - conn.prepareStatement(sql).executeUpdate() - } finally { - conn.close() - } - } - JDBCWriteDetails.saveTable(this, url, table, properties) - } //////////////////////////////////////////////////////////////////////////// // for Python API //////////////////////////////////////////////////////////////////////////// @@ -1682,4 +1358,264 @@ class DataFrame private[sql]( val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD() SerDeUtil.javaToPython(jrdd) } + + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + // Deprecated methods + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + + /** Left here for backward compatibility. */ + @deprecated("use toDF", "1.3.0") + def toSchemaRDD: DataFrame = this + + /** + * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`. + * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements. + * If you pass `true` for `allowExisting`, it will drop any table with the + * given name; if you pass `false`, it will throw if the table already + * exists. + * @group output + */ + @deprecated("Use write.jdbc()", "1.4.0") + def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = { + val w = if (allowExisting) write.mode(SaveMode.Overwrite) else write + w.jdbc(url, table, new Properties) + } + + /** + * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`. + * Assumes the table already exists and has a compatible schema. If you + * pass `true` for `overwrite`, it will `TRUNCATE` the table before + * performing the `INSERT`s. + * + * The table must already exist on the database. It must have a schema + * that is compatible with the schema of this RDD; inserting the rows of + * the RDD in order via the simple statement + * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail. + * @group output + */ + @deprecated("Use write.jdbc()", "1.4.0") + def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = { + val w = if (overwrite) write.mode(SaveMode.Overwrite) else write + w.jdbc(url, table, new Properties) + } + + /** + * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema. + * Files that are written out using this method can be read back in as a [[DataFrame]] + * using the `parquetFile` function in [[SQLContext]]. + * @group output + */ + @deprecated("Use write.parquet(path)", "1.4.0") + def saveAsParquetFile(path: String): Unit = { + if (sqlContext.conf.parquetUseDataSourceApi) { + write.format("parquet").mode(SaveMode.ErrorIfExists).save(path) + } else { + sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd + } + } + + /** + * Creates a table from the the contents of this DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * This will fail if the table already exists. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * Also note that while this function can persist the table metadata into Hive's metastore, + * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * @group output + */ + @deprecated("Use write.saveAsTable(tableName)", "1.4.0") + def saveAsTable(tableName: String): Unit = { + write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName) + } + + /** + * Creates a table from the the contents of this DataFrame, using the default data source + * configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * Also note that while this function can persist the table metadata into Hive's metastore, + * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * @group output + */ + @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0") + def saveAsTable(tableName: String, mode: SaveMode): Unit = { + if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) { + // If table already exists and the save mode is Append, + // we will just call insertInto to append the contents of this DataFrame. + insertInto(tableName, overwrite = false) + } else { + write.mode(mode).saveAsTable(tableName) + } + } + + /** + * Creates a table at the given path from the the contents of this DataFrame + * based on a given data source and a set of options, + * using [[SaveMode.ErrorIfExists]] as the save mode. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * Also note that while this function can persist the table metadata into Hive's metastore, + * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * @group output + */ + @deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0") + def saveAsTable(tableName: String, source: String): Unit = { + write.format(source).saveAsTable(tableName) + } + + /** + * :: Experimental :: + * Creates a table at the given path from the the contents of this DataFrame + * based on a given data source, [[SaveMode]] specified by mode, and a set of options. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * Also note that while this function can persist the table metadata into Hive's metastore, + * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * @group output + */ + @deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0") + def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = { + write.format(source).mode(mode).saveAsTable(tableName) + } + + /** + * Creates a table at the given path from the the contents of this DataFrame + * based on a given data source, [[SaveMode]] specified by mode, and a set of options. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * Also note that while this function can persist the table metadata into Hive's metastore, + * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * @group output + */ + @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)", + "1.4.0") + def saveAsTable( + tableName: String, + source: String, + mode: SaveMode, + options: java.util.Map[String, String]): Unit = { + write.format(source).mode(mode).options(options).saveAsTable(tableName) + } + + /** + * (Scala-specific) + * Creates a table from the the contents of this DataFrame based on a given data source, + * [[SaveMode]] specified by mode, and a set of options. + * + * Note that this currently only works with DataFrames that are created from a HiveContext as + * there is no notion of a persisted catalog in a standard SQL context. Instead you can write + * an RDD out to a parquet file, and then register that file as a table. This "table" can then + * be the target of an `insertInto`. + * + * Also note that while this function can persist the table metadata into Hive's metastore, + * the table will NOT be accessible from Hive, until SPARK-7550 is resolved. + * @group output + */ + @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)", + "1.4.0") + def saveAsTable( + tableName: String, + source: String, + mode: SaveMode, + options: Map[String, String]): Unit = { + write.format(source).mode(mode).options(options).saveAsTable(tableName) + } + + /** + * Saves the contents of this DataFrame to the given path, + * using the default data source configured by spark.sql.sources.default and + * [[SaveMode.ErrorIfExists]] as the save mode. + * @group output + */ + @deprecated("Use write.save(path)", "1.4.0") + def save(path: String): Unit = { + write.save(path) + } + + /** + * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode, + * using the default data source configured by spark.sql.sources.default. + * @group output + */ + @deprecated("Use write.mode(mode).save(path)", "1.4.0") + def save(path: String, mode: SaveMode): Unit = { + write.mode(mode).save(path) + } + + /** + * Saves the contents of this DataFrame to the given path based on the given data source, + * using [[SaveMode.ErrorIfExists]] as the save mode. + * @group output + */ + @deprecated("Use write.format(source).save(path)", "1.4.0") + def save(path: String, source: String): Unit = { + write.format(source).save(path) + } + + /** + * Saves the contents of this DataFrame to the given path based on the given data source and + * [[SaveMode]] specified by mode. + * @group output + */ + @deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0") + def save(path: String, source: String, mode: SaveMode): Unit = { + write.format(source).mode(mode).save(path) + } + + /** + * Saves the contents of this DataFrame based on the given data source, + * [[SaveMode]] specified by mode, and a set of options. + * @group output + */ + @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0") + def save( + source: String, + mode: SaveMode, + options: java.util.Map[String, String]): Unit = { + write.format(source).mode(mode).options(options).save() + } + + /** + * (Scala-specific) + * Saves the contents of this DataFrame based on the given data source, + * [[SaveMode]] specified by mode, and a set of options + * @group output + */ + @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0") + def save( + source: String, + mode: SaveMode, + options: Map[String, String]): Unit = { + write.format(source).mode(mode).options(options).save() + } + + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + // End of eeprecated methods + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 4d63faad6f..381c10f48f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -17,12 +17,16 @@ package org.apache.spark.sql +import java.util.Properties + import org.apache.hadoop.fs.Path +import org.apache.spark.Partition import org.apache.spark.annotation.Experimental import org.apache.spark.api.java.JavaRDD import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.rdd.RDD +import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.json.{JsonRDD, JSONRelation} import org.apache.spark.sql.parquet.ParquetRelation2 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource} @@ -31,7 +35,7 @@ import org.apache.spark.sql.types.StructType /** * :: Experimental :: * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems, - * key-value stores, etc). + * key-value stores, etc). Use [[SQLContext.read]] to access this. * * @since 1.4.0 */ @@ -94,6 +98,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { * Specifies the input partitioning. If specified, the underlying data source does not need to * discover the data partitioning scheme, and thus can speed up very large inputs. * + * This is only applicable for Parquet at the moment. + * * @since 1.4.0 */ @scala.annotation.varargs @@ -128,6 +134,87 @@ class DataFrameReader private[sql](sqlContext: SQLContext) { DataFrame(sqlContext, LogicalRelation(resolved.relation)) } + /** + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table and connection properties. + * + * @since 1.4.0 + */ + def jdbc(url: String, table: String, properties: Properties): DataFrame = { + jdbc(url, table, JDBCRelation.columnPartition(null), properties) + } + + /** + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table. Partitions of the table will be retrieved in parallel based on the parameters + * passed to this function. + * + * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash + * your external database systems. + * + * @param url JDBC database url of the form `jdbc:subprotocol:subname` + * @param table Name of the table in the external database. + * @param columnName the name of a column of integral type that will be used for partitioning. + * @param lowerBound the minimum value of `columnName` used to decide partition stride + * @param upperBound the maximum value of `columnName` used to decide partition stride + * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split + * evenly into this many partitions + * @param connectionProperties JDBC database connection arguments, a list of arbitrary string + * tag/value. Normally at least a "user" and "password" property + * should be included. + * + * @since 1.4.0 + */ + def jdbc( + url: String, + table: String, + columnName: String, + lowerBound: Long, + upperBound: Long, + numPartitions: Int, + connectionProperties: Properties): DataFrame = { + val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions) + val parts = JDBCRelation.columnPartition(partitioning) + jdbc(url, table, parts, connectionProperties) + } + + /** + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table using connection properties. The `predicates` parameter gives a list + * expressions suitable for inclusion in WHERE clauses; each one defines one partition + * of the [[DataFrame]]. + * + * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash + * your external database systems. + * + * @param url JDBC database url of the form `jdbc:subprotocol:subname` + * @param table Name of the table in the external database. + * @param predicates Condition in the where clause for each partition. + * @param connectionProperties JDBC database connection arguments, a list of arbitrary string + * tag/value. Normally at least a "user" and "password" property + * should be included. + * @since 1.4.0 + */ + def jdbc( + url: String, + table: String, + predicates: Array[String], + connectionProperties: Properties): DataFrame = { + val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) => + JDBCPartition(part, i) : Partition + } + jdbc(url, table, parts, connectionProperties) + } + + private def jdbc( + url: String, + table: String, + parts: Array[Partition], + connectionProperties: Properties): DataFrame = { + val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext) + sqlContext.baseRelationToDataFrame(relation) + } + /** * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]]. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 9f42f0f1f4..f2e721d4db 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -17,14 +17,17 @@ package org.apache.spark.sql +import java.util.Properties + import org.apache.spark.annotation.Experimental +import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils} import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect} /** * :: Experimental :: * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems, - * key-value stores, etc). + * key-value stores, etc). Use [[DataFrame.write]] to access this. * * @since 1.4.0 */ @@ -110,6 +113,8 @@ final class DataFrameWriter private[sql](df: DataFrame) { * Partitions the output by the given columns on the file system. If specified, the output is * laid out on the file system similar to Hive's partitioning scheme. * + * This is only applicable for Parquet at the moment. + * * @since 1.4.0 */ @scala.annotation.varargs @@ -161,6 +166,52 @@ final class DataFrameWriter private[sql](df: DataFrame) { df.sqlContext.executePlan(cmd).toRdd } + /** + * Saves the content of the [[DataFrame]] to a external database table via JDBC. In the case the + * table already exists in the external database, behavior of this function depends on the + * save mode, specified by the `mode` function (default to throwing an exception). + * + * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash + * your external database systems. + * + * @param url JDBC database url of the form `jdbc:subprotocol:subname` + * @param table Name of the table in the external database. + * @param connectionProperties JDBC database connection arguments, a list of arbitrary string + * tag/value. Normally at least a "user" and "password" property + * should be included. + */ + def jdbc(url: String, table: String, connectionProperties: Properties): Unit = { + val conn = JdbcUtils.createConnection(url, connectionProperties) + + try { + var tableExists = JdbcUtils.tableExists(conn, table) + + if (mode == SaveMode.Ignore && tableExists) { + return + } + + if (mode == SaveMode.ErrorIfExists && tableExists) { + sys.error(s"Table $table already exists.") + } + + if (mode == SaveMode.Overwrite && tableExists) { + JdbcUtils.dropTable(conn, table) + tableExists = false + } + + // Create the table if the table didn't exist. + if (!tableExists) { + val schema = JDBCWriteDetails.schemaString(df, url) + val sql = s"CREATE TABLE $table ($schema)" + conn.prepareStatement(sql).executeUpdate() + } + } finally { + conn.close() + } + + JDBCWriteDetails.saveTable(df, url, table, connectionProperties) + } + /** * Saves the content of the [[DataFrame]] in JSON format at the specified path. * This is equivalent to: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 34a50e522c..ac1a800219 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -28,6 +28,7 @@ import scala.util.control.NonFatal import com.google.common.reflect.TypeToken +import org.apache.spark.SparkContext import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.rdd.RDD @@ -40,11 +41,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.ParserDialect import org.apache.spark.sql.execution.{Filter, _} -import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.util.Utils -import org.apache.spark.{Partition, SparkContext} /** * The entry point for working with structured data (rows and columns) in Spark. Allows the @@ -531,67 +530,6 @@ class SQLContext(@transient val sparkContext: SparkContext) createDataFrame(rdd.rdd, beanClass) } - /** - * :: DeveloperApi :: - * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD. - * It is important to make sure that the structure of every [[Row]] of the provided RDD matches - * the provided schema. Otherwise, there will be runtime exception. - * Example: - * {{{ - * import org.apache.spark.sql._ - * import org.apache.spark.sql.types._ - * val sqlContext = new org.apache.spark.sql.SQLContext(sc) - * - * val schema = - * StructType( - * StructField("name", StringType, false) :: - * StructField("age", IntegerType, true) :: Nil) - * - * val people = - * sc.textFile("examples/src/main/resources/people.txt").map( - * _.split(",")).map(p => Row(p(0), p(1).trim.toInt)) - * val dataFrame = sqlContext. applySchema(people, schema) - * dataFrame.printSchema - * // root - * // |-- name: string (nullable = false) - * // |-- age: integer (nullable = true) - * - * dataFrame.registerTempTable("people") - * sqlContext.sql("select name from people").collect.foreach(println) - * }}} - */ - @deprecated("use createDataFrame", "1.3.0") - def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = { - createDataFrame(rowRDD, schema) - } - - @deprecated("use createDataFrame", "1.3.0") - def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { - createDataFrame(rowRDD, schema) - } - - /** - * Applies a schema to an RDD of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. - */ - @deprecated("use createDataFrame", "1.3.0") - def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = { - createDataFrame(rdd, beanClass) - } - - /** - * Applies a schema to an RDD of Java Beans. - * - * WARNING: Since there is no guaranteed ordering for fields in a Java Bean, - * SELECT * queries will return the columns in an undefined order. - */ - @deprecated("use createDataFrame", "1.3.0") - def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = { - createDataFrame(rdd, beanClass) - } - /** * :: Experimental :: * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]]. @@ -606,205 +544,6 @@ class SQLContext(@transient val sparkContext: SparkContext) @Experimental def read: DataFrameReader = new DataFrameReader(this) - /** - * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty - * [[DataFrame]] if no paths are passed in. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.parquet()", "1.4.0") - @scala.annotation.varargs - def parquetFile(paths: String*): DataFrame = { - if (paths.isEmpty) { - emptyDataFrame - } else if (conf.parquetUseDataSourceApi) { - read.parquet(paths : _*) - } else { - DataFrame(this, parquet.ParquetRelation( - paths.mkString(","), Some(sparkContext.hadoopConfiguration), this)) - } - } - - /** - * Loads a JSON file (one object per line), returning the result as a [[DataFrame]]. - * It goes through the entire dataset once to determine the schema. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonFile(path: String): DataFrame = { - read.json(path) - } - - /** - * Loads a JSON file (one object per line) and applies the given schema, - * returning the result as a [[DataFrame]]. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonFile(path: String, schema: StructType): DataFrame = { - read.schema(schema).json(path) - } - - /** - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonFile(path: String, samplingRatio: Double): DataFrame = { - read.option("samplingRatio", samplingRatio.toString).json(path) - } - - /** - * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a - * [[DataFrame]]. - * It goes through the entire dataset once to determine the schema. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonRDD(json: RDD[String]): DataFrame = read.json(json) - - /** - * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a - * [[DataFrame]]. - * It goes through the entire dataset once to determine the schema. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json) - - /** - * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema, - * returning the result as a [[DataFrame]]. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonRDD(json: RDD[String], schema: StructType): DataFrame = { - read.schema(schema).json(json) - } - - /** - * Loads an JavaRDD storing JSON objects (one object per record) and applies the given - * schema, returning the result as a [[DataFrame]]. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = { - read.schema(schema).json(json) - } - - /** - * Loads an RDD[String] storing JSON objects (one object per record) inferring the - * schema, returning the result as a [[DataFrame]]. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = { - read.option("samplingRatio", samplingRatio.toString).json(json) - } - - /** - * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the - * schema, returning the result as a [[DataFrame]]. - * - * @group specificdata - * @since 1.3.0 - */ - @deprecated("Use read.json()", "1.4.0") - def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = { - read.option("samplingRatio", samplingRatio.toString).json(json) - } - - /** - * Returns the dataset stored at path as a DataFrame, - * using the default data source configured by spark.sql.sources.default. - * - * @group genericdata - * @since 1.3.0 - */ - @deprecated("Use read.load(path)", "1.4.0") - def load(path: String): DataFrame = { - read.load(path) - } - - /** - * Returns the dataset stored at path as a DataFrame, using the given data source. - * - * @group genericdata - * @since 1.3.0 - */ - @deprecated("Use read.format(source).load(path)", "1.4.0") - def load(path: String, source: String): DataFrame = { - read.format(source).load(path) - } - - /** - * (Java-specific) Returns the dataset specified by the given data source and - * a set of options as a DataFrame. - * - * @group genericdata - * @since 1.3.0 - */ - @deprecated("Use read.format(source).options(options).load()", "1.4.0") - def load(source: String, options: java.util.Map[String, String]): DataFrame = { - read.options(options).format(source).load() - } - - /** - * (Scala-specific) Returns the dataset specified by the given data source and - * a set of options as a DataFrame. - * - * @group genericdata - * @since 1.3.0 - */ - @deprecated("Use read.format(source).options(options).load()", "1.4.0") - def load(source: String, options: Map[String, String]): DataFrame = { - read.options(options).format(source).load() - } - - /** - * (Java-specific) Returns the dataset specified by the given data source and - * a set of options as a DataFrame, using the given schema as the schema of the DataFrame. - * - * @group genericdata - * @since 1.3.0 - */ - @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0") - def load( - source: String, - schema: StructType, - options: java.util.Map[String, String]): DataFrame = { - read.format(source).schema(schema).options(options).load() - } - - /** - * (Scala-specific) Returns the dataset specified by the given data source and - * a set of options as a DataFrame, using the given schema as the schema of the DataFrame. - * @group genericdata - * @since 1.3.0 - */ - @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0") - def load( - source: String, - schema: StructType, - options: Map[String, String]): DataFrame = { - read.format(source).schema(schema).options(options).load() - } - /** * :: Experimental :: * Creates an external table from the given path and returns the corresponding DataFrame. @@ -923,132 +662,6 @@ class SQLContext(@transient val sparkContext: SparkContext) table(tableName) } - /** - * :: Experimental :: - * Construct a [[DataFrame]] representing the database table accessible via JDBC URL - * url named table. - * - * @group specificdata - * @since 1.3.0 - */ - @Experimental - def jdbc(url: String, table: String): DataFrame = { - jdbc(url, table, JDBCRelation.columnPartition(null), new Properties()) - } - - /** - * :: Experimental :: - * Construct a [[DataFrame]] representing the database table accessible via JDBC URL - * url named table and connection properties. - * - * @group specificdata - * @since 1.4.0 - */ - @Experimental - def jdbc(url: String, table: String, properties: Properties): DataFrame = { - jdbc(url, table, JDBCRelation.columnPartition(null), properties) - } - - /** - * :: Experimental :: - * Construct a [[DataFrame]] representing the database table accessible via JDBC URL - * url named table. Partitions of the table will be retrieved in parallel based on the parameters - * passed to this function. - * - * @param columnName the name of a column of integral type that will be used for partitioning. - * @param lowerBound the minimum value of `columnName` used to decide partition stride - * @param upperBound the maximum value of `columnName` used to decide partition stride - * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split - * evenly into this many partitions - * @group specificdata - * @since 1.3.0 - */ - @Experimental - def jdbc( - url: String, - table: String, - columnName: String, - lowerBound: Long, - upperBound: Long, - numPartitions: Int): DataFrame = { - jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties()) - } - - /** - * :: Experimental :: - * Construct a [[DataFrame]] representing the database table accessible via JDBC URL - * url named table. Partitions of the table will be retrieved in parallel based on the parameters - * passed to this function. - * - * @param columnName the name of a column of integral type that will be used for partitioning. - * @param lowerBound the minimum value of `columnName` used to decide partition stride - * @param upperBound the maximum value of `columnName` used to decide partition stride - * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split - * evenly into this many partitions - * @param properties connection properties - * @group specificdata - * @since 1.4.0 - */ - @Experimental - def jdbc( - url: String, - table: String, - columnName: String, - lowerBound: Long, - upperBound: Long, - numPartitions: Int, - properties: Properties): DataFrame = { - val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions) - val parts = JDBCRelation.columnPartition(partitioning) - jdbc(url, table, parts, properties) - } - - /** - * :: Experimental :: - * Construct a [[DataFrame]] representing the database table accessible via JDBC URL - * url named table. The theParts parameter gives a list expressions - * suitable for inclusion in WHERE clauses; each one defines one partition - * of the [[DataFrame]]. - * - * @group specificdata - * @since 1.3.0 - */ - @Experimental - def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = { - jdbc(url, table, theParts, new Properties()) - } - - /** - * :: Experimental :: - * Construct a [[DataFrame]] representing the database table accessible via JDBC URL - * url named table using connection properties. The theParts parameter gives a list expressions - * suitable for inclusion in WHERE clauses; each one defines one partition - * of the [[DataFrame]]. - * - * @group specificdata - * @since 1.4.0 - */ - @Experimental - def jdbc( - url: String, - table: String, - theParts: Array[String], - properties: Properties): DataFrame = { - val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) => - JDBCPartition(part, i) : Partition - } - jdbc(url, table, parts, properties) - } - - private def jdbc( - url: String, - table: String, - parts: Array[Partition], - properties: Properties): DataFrame = { - val relation = JDBCRelation(url, table, parts, properties)(this) - baseRelationToDataFrame(relation) - } - /** * Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist * only during the lifetime of this instance of SQLContext. @@ -1372,6 +985,263 @@ class SQLContext(@transient val sparkContext: SparkContext) } } + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + // Deprecated methods + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + + @deprecated("use createDataFrame", "1.3.0") + def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = { + createDataFrame(rowRDD, schema) + } + + @deprecated("use createDataFrame", "1.3.0") + def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = { + createDataFrame(rowRDD, schema) + } + + @deprecated("use createDataFrame", "1.3.0") + def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = { + createDataFrame(rdd, beanClass) + } + + @deprecated("use createDataFrame", "1.3.0") + def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = { + createDataFrame(rdd, beanClass) + } + + /** + * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty + * [[DataFrame]] if no paths are passed in. + * + * @group specificdata + */ + @deprecated("Use read.parquet()", "1.4.0") + @scala.annotation.varargs + def parquetFile(paths: String*): DataFrame = { + if (paths.isEmpty) { + emptyDataFrame + } else if (conf.parquetUseDataSourceApi) { + read.parquet(paths : _*) + } else { + DataFrame(this, parquet.ParquetRelation( + paths.mkString(","), Some(sparkContext.hadoopConfiguration), this)) + } + } + + /** + * Loads a JSON file (one object per line), returning the result as a [[DataFrame]]. + * It goes through the entire dataset once to determine the schema. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonFile(path: String): DataFrame = { + read.json(path) + } + + /** + * Loads a JSON file (one object per line) and applies the given schema, + * returning the result as a [[DataFrame]]. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonFile(path: String, schema: StructType): DataFrame = { + read.schema(schema).json(path) + } + + /** + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonFile(path: String, samplingRatio: Double): DataFrame = { + read.option("samplingRatio", samplingRatio.toString).json(path) + } + + /** + * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a + * [[DataFrame]]. + * It goes through the entire dataset once to determine the schema. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonRDD(json: RDD[String]): DataFrame = read.json(json) + + /** + * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a + * [[DataFrame]]. + * It goes through the entire dataset once to determine the schema. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json) + + /** + * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema, + * returning the result as a [[DataFrame]]. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonRDD(json: RDD[String], schema: StructType): DataFrame = { + read.schema(schema).json(json) + } + + /** + * Loads an JavaRDD storing JSON objects (one object per record) and applies the given + * schema, returning the result as a [[DataFrame]]. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = { + read.schema(schema).json(json) + } + + /** + * Loads an RDD[String] storing JSON objects (one object per record) inferring the + * schema, returning the result as a [[DataFrame]]. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = { + read.option("samplingRatio", samplingRatio.toString).json(json) + } + + /** + * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the + * schema, returning the result as a [[DataFrame]]. + * + * @group specificdata + */ + @deprecated("Use read.json()", "1.4.0") + def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = { + read.option("samplingRatio", samplingRatio.toString).json(json) + } + + /** + * Returns the dataset stored at path as a DataFrame, + * using the default data source configured by spark.sql.sources.default. + * + * @group genericdata + */ + @deprecated("Use read.load(path)", "1.4.0") + def load(path: String): DataFrame = { + read.load(path) + } + + /** + * Returns the dataset stored at path as a DataFrame, using the given data source. + * + * @group genericdata + */ + @deprecated("Use read.format(source).load(path)", "1.4.0") + def load(path: String, source: String): DataFrame = { + read.format(source).load(path) + } + + /** + * (Java-specific) Returns the dataset specified by the given data source and + * a set of options as a DataFrame. + * + * @group genericdata + */ + @deprecated("Use read.format(source).options(options).load()", "1.4.0") + def load(source: String, options: java.util.Map[String, String]): DataFrame = { + read.options(options).format(source).load() + } + + /** + * (Scala-specific) Returns the dataset specified by the given data source and + * a set of options as a DataFrame. + * + * @group genericdata + */ + @deprecated("Use read.format(source).options(options).load()", "1.4.0") + def load(source: String, options: Map[String, String]): DataFrame = { + read.options(options).format(source).load() + } + + /** + * (Java-specific) Returns the dataset specified by the given data source and + * a set of options as a DataFrame, using the given schema as the schema of the DataFrame. + * + * @group genericdata + */ + @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0") + def load(source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame = + { + read.format(source).schema(schema).options(options).load() + } + + /** + * (Scala-specific) Returns the dataset specified by the given data source and + * a set of options as a DataFrame, using the given schema as the schema of the DataFrame. + * + * @group genericdata + */ + @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0") + def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = { + read.format(source).schema(schema).options(options).load() + } + + /** + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table. + * + * @group specificdata + */ + @deprecated("use read.jdbc()", "1.4.0") + def jdbc(url: String, table: String): DataFrame = { + read.jdbc(url, table, new Properties) + } + + /** + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table. Partitions of the table will be retrieved in parallel based on the parameters + * passed to this function. + * + * @param columnName the name of a column of integral type that will be used for partitioning. + * @param lowerBound the minimum value of `columnName` used to decide partition stride + * @param upperBound the maximum value of `columnName` used to decide partition stride + * @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split + * evenly into this many partitions + * @group specificdata + */ + @deprecated("use read.jdbc()", "1.4.0") + def jdbc( + url: String, + table: String, + columnName: String, + lowerBound: Long, + upperBound: Long, + numPartitions: Int): DataFrame = { + read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties) + } + + /** + * Construct a [[DataFrame]] representing the database table accessible via JDBC URL + * url named table. The theParts parameter gives a list expressions + * suitable for inclusion in WHERE clauses; each one defines one partition + * of the [[DataFrame]]. + * + * @group specificdata + */ + @deprecated("use read.jdbc()", "1.4.0") + def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = { + read.jdbc(url, table, theParts, new Properties) + } + + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// + // End of eeprecated methods + //////////////////////////////////////////////////////////////////////////// + //////////////////////////////////////////////////////////////////////////// } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala index 40483d3ec7..95935ba874 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala @@ -29,7 +29,16 @@ import org.apache.spark.sql.catalyst.util.DateUtils import org.apache.spark.sql.types._ import org.apache.spark.sql.sources._ +/** + * Data corresponding to one partition of a JDBCRDD. + */ +private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition { + override def index: Int = idx +} + + private[sql] object JDBCRDD extends Logging { + /** * Maps a JDBC type to a Catalyst type. This function is called only when * the DriverQuirks class corresponding to your database driver returns null. @@ -168,6 +177,7 @@ private[sql] object JDBCRDD extends Logging { DriverManager.getConnection(url, properties) } } + /** * Build and return JDBCRDD from the given information. * @@ -193,18 +203,14 @@ private[sql] object JDBCRDD extends Logging { requiredColumns: Array[String], filters: Array[Filter], parts: Array[Partition]): RDD[Row] = { - - val prunedSchema = pruneSchema(schema, requiredColumns) - - return new - JDBCRDD( - sc, - getConnector(driver, url, properties), - prunedSchema, - fqTable, - requiredColumns, - filters, - parts) + new JDBCRDD( + sc, + getConnector(driver, url, properties), + pruneSchema(schema, requiredColumns), + fqTable, + requiredColumns, + filters, + parts) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala index 93e82549f2..09d6865457 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala @@ -17,26 +17,16 @@ package org.apache.spark.sql.jdbc -import java.sql.DriverManager import java.util.Properties import scala.collection.mutable.ArrayBuffer import org.apache.spark.Partition import org.apache.spark.rdd.RDD -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.sources._ import org.apache.spark.sql.types.StructType -import org.apache.spark.util.Utils - -/** - * Data corresponding to one partition of a JDBCRDD. - */ -private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition { - override def index: Int = idx -} /** * Instructions on how to partition the table among workers. @@ -152,6 +142,8 @@ private[sql] case class JDBCRelation( } override def insert(data: DataFrame, overwrite: Boolean): Unit = { - data.insertIntoJDBC(url, table, overwrite, properties) + data.write + .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append) + .jdbc(url, table, properties) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala new file mode 100644 index 0000000000..cc918c2371 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.jdbc + +import java.sql.{Connection, DriverManager} +import java.util.Properties + +import scala.util.Try + +/** + * Util functions for JDBC tables. + */ +private[sql] object JdbcUtils { + + /** + * Establishes a JDBC connection. + */ + def createConnection(url: String, connectionProperties: Properties): Connection = { + DriverManager.getConnection(url, connectionProperties) + } + + /** + * Returns true if the table already exists in the JDBC database. + */ + def tableExists(conn: Connection, table: String): Boolean = { + // Somewhat hacky, but there isn't a good way to identify whether a table exists for all + // SQL database systems, considering "table" could also include the database name. + Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess + } + + /** + * Drops a table from the JDBC database. + */ + def dropTable(conn: Connection, table: String): Unit = { + conn.prepareStatement(s"DROP TABLE $table").executeUpdate() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala index c099881a01..a61790b847 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala @@ -163,8 +163,8 @@ package object jdbc { table: String, properties: Properties = new Properties()) { val quirks = DriverQuirks.get(url) - var nullTypes: Array[Int] = df.schema.fields.map(field => { - var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2 + val nullTypes: Array[Int] = df.schema.fields.map { field => + val nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2 if (nullType.isEmpty) { field.dataType match { case IntegerType => java.sql.Types.INTEGER @@ -183,7 +183,7 @@ package object jdbc { s"Can't translate null value for field $field") } } else nullType.get - }).toArray + } val rddSchema = df.schema df.foreachPartition { iterator => diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java index c344a9b095..fcb8f5499c 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java @@ -187,14 +187,14 @@ public class JavaApplySchemaSuite implements Serializable { null, "this is another simple string.")); - DataFrame df1 = sqlContext.jsonRDD(jsonRDD); + DataFrame df1 = sqlContext.read().json(jsonRDD); StructType actualSchema1 = df1.schema(); Assert.assertEquals(expectedSchema, actualSchema1); df1.registerTempTable("jsonTable1"); List actual1 = sqlContext.sql("select * from jsonTable1").collectAsList(); Assert.assertEquals(expectedResult, actual1); - DataFrame df2 = sqlContext.jsonRDD(jsonRDD, expectedSchema); + DataFrame df2 = sqlContext.read().schema(expectedSchema).json(jsonRDD); StructType actualSchema2 = df2.schema(); Assert.assertEquals(expectedSchema, actualSchema2); df2.registerTempTable("jsonTable2"); diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java index 6a0bcefe7a..2706e01bd2 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java @@ -67,7 +67,7 @@ public class JavaSaveLoadSuite { jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}"); } JavaRDD rdd = sc.parallelize(jsonObjects); - df = sqlContext.jsonRDD(rdd); + df = sqlContext.read().json(rdd); df.registerTempTable("jsonTable"); } @@ -75,10 +75,8 @@ public class JavaSaveLoadSuite { public void saveAndLoad() { Map options = new HashMap(); options.put("path", path.toString()); - df.save("json", SaveMode.ErrorIfExists, options); - + df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save(); DataFrame loadedDF = sqlContext.read().format("json").options(options).load(); - checkAnswer(loadedDF, df.collectAsList()); } @@ -86,12 +84,12 @@ public class JavaSaveLoadSuite { public void saveAndLoadWithSchema() { Map options = new HashMap(); options.put("path", path.toString()); - df.save("json", SaveMode.ErrorIfExists, options); + df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save(); List fields = new ArrayList(); fields.add(DataTypes.createStructField("b", DataTypes.StringType, true)); StructType schema = DataTypes.createStructType(fields); - DataFrame loadedDF = sqlContext.load("json", schema, options); + DataFrame loadedDF = sqlContext.read().format("json").schema(schema).options(options).load(); checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList()); } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 2abfe7f167..5a7b6f0aac 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -221,22 +221,25 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { } test("Basic API") { - assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect().size === 3) + assert(TestSQLContext.read.jdbc( + urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3) } test("Partitioning via JDBCPartitioningInfo API") { - assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3) - .collect.size === 3) + assert( + TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties) + .collect().length === 3) } test("Partitioning via list-of-where-clauses API") { val parts = Array[String]("THEID < 2", "THEID >= 2") - assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts).collect().size === 3) + assert(TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties) + .collect().length === 3) } test("H2 integral types") { val rows = sql("SELECT * FROM inttypes WHERE A IS NOT NULL").collect() - assert(rows.size === 1) + assert(rows.length === 1) assert(rows(0).getInt(0) === 1) assert(rows(0).getBoolean(1) === false) assert(rows(0).getInt(2) === 3) @@ -246,7 +249,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { test("H2 null entries") { val rows = sql("SELECT * FROM inttypes WHERE A IS NULL").collect() - assert(rows.size === 1) + assert(rows.length === 1) assert(rows(0).isNullAt(0)) assert(rows(0).isNullAt(1)) assert(rows(0).isNullAt(2)) @@ -286,24 +289,28 @@ class JDBCSuite extends FunSuite with BeforeAndAfter { } test("test DATE types") { - val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect() - val cachedRows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().collect() + val rows = TestSQLContext.read.jdbc( + urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect() + val cachedRows = TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties) + .cache().collect() assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) assert(rows(1).getAs[java.sql.Date](1) === null) assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) } test("test DATE types in cache") { - val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect() - TestSQLContext - .jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().registerTempTable("mycached_date") + val rows = + TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect() + TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties) + .cache().registerTempTable("mycached_date") val cachedRows = sql("select * from mycached_date").collect() assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01")) } test("test types for null value") { - val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.NULLTYPES").collect() + val rows = TestSQLContext.read.jdbc( + urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect() assert((0 to 14).forall(i => rows(0).isNullAt(i))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala index 0800eded44..2e4c12f9da 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala @@ -22,7 +22,7 @@ import java.util.Properties import org.scalatest.{BeforeAndAfter, FunSuite} -import org.apache.spark.sql.Row +import org.apache.spark.sql.{SaveMode, Row} import org.apache.spark.sql.test._ import org.apache.spark.sql.types._ @@ -90,64 +90,66 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter { test("Basic CREATE") { val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) - df.createJDBCTable(url, "TEST.BASICCREATETEST", false) - assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count) - assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length) + df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties) + assert(2 == TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count) + assert(2 == + TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length) } test("CREATE with overwrite") { val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3) val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) - df.createJDBCTable(url1, "TEST.DROPTEST", false, properties) - assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count) - assert(3 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) + df.write.jdbc(url1, "TEST.DROPTEST", properties) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count) + assert(3 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) - df2.createJDBCTable(url1, "TEST.DROPTEST", true, properties) - assert(1 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count) - assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) + df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties) + assert(1 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length) } test("CREATE then INSERT to append") { val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) - df.createJDBCTable(url, "TEST.APPENDTEST", false) - df2.insertIntoJDBC(url, "TEST.APPENDTEST", false) - assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count) - assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length) + df.write.jdbc(url, "TEST.APPENDTEST", new Properties) + df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties) + assert(3 == TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).count) + assert(2 == + TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length) } test("CREATE then INSERT to truncate") { val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2) - df.createJDBCTable(url1, "TEST.TRUNCATETEST", false, properties) - df2.insertIntoJDBC(url1, "TEST.TRUNCATETEST", true, properties) - assert(1 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).count) - assert(2 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) + df.write.jdbc(url1, "TEST.TRUNCATETEST", properties) + df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties) + assert(1 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length) } test("Incompatible INSERT to append") { val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2) val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3) - df.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false) + df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties) intercept[org.apache.spark.SparkException] { - df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true) + df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties) } } - + test("INSERT to JDBC Datasource") { TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE") - assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count) - assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } - + test("INSERT to JDBC Datasource with overwrite") { TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE") TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE") - assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count) - assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count) + assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length) } } diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java index 53ddecf579..58fe96adab 100644 --- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java +++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java @@ -81,7 +81,7 @@ public class JavaMetastoreDataSourcesSuite { jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}"); } JavaRDD rdd = sc.parallelize(jsonObjects); - df = sqlContext.jsonRDD(rdd); + df = sqlContext.read().json(rdd); df.registerTempTable("jsonTable"); } @@ -96,7 +96,11 @@ public class JavaMetastoreDataSourcesSuite { public void saveExternalTableAndQueryIt() { Map options = new HashMap(); options.put("path", path.toString()); - df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options); + df.write() + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .options(options) + .saveAsTable("javaSavedTable"); checkAnswer( sqlContext.sql("SELECT * FROM javaSavedTable"), @@ -115,7 +119,11 @@ public class JavaMetastoreDataSourcesSuite { public void saveExternalTableWithSchemaAndQueryIt() { Map options = new HashMap(); options.put("path", path.toString()); - df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options); + df.write() + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .options(options) + .saveAsTable("javaSavedTable"); checkAnswer( sqlContext.sql("SELECT * FROM javaSavedTable"), @@ -138,7 +146,11 @@ public class JavaMetastoreDataSourcesSuite { @Test public void saveTableAndQueryIt() { Map options = new HashMap(); - df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options); + df.write() + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .options(options) + .saveAsTable("javaSavedTable"); checkAnswer( sqlContext.sql("SELECT * FROM javaSavedTable"), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index fc6c3c3503..945596db80 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -162,7 +162,7 @@ class CachedTableSuite extends QueryTest { test("REFRESH TABLE also needs to recache the data (data source tables)") { val tempPath: File = Utils.createTempDir() tempPath.delete() - table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite) + table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString) sql("DROP TABLE IF EXISTS refreshTable") createExternalTable("refreshTable", tempPath.toString, "parquet") checkAnswer( @@ -172,7 +172,7 @@ class CachedTableSuite extends QueryTest { sql("CACHE TABLE refreshTable") assertCached(table("refreshTable")) // Append new data. - table("src").save(tempPath.toString, "parquet", SaveMode.Append) + table("src").write.mode(SaveMode.Append).parquet(tempPath.toString) // We are still using the old data. assertCached(table("refreshTable")) checkAnswer( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 58b0b80c31..30db976a3a 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -409,11 +409,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { val originalDefaultSource = conf.defaultDataSourceName val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - val df = jsonRDD(rdd) + val df = read.json(rdd) conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") // Save the df as a managed table (by not specifiying the path). - df.saveAsTable("savedJsonTable") + df.write.saveAsTable("savedJsonTable") checkAnswer( sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"), @@ -443,11 +443,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { val originalDefaultSource = conf.defaultDataSourceName val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - val df = jsonRDD(rdd) + val df = read.json(rdd) conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") // Save the df as a managed table (by not specifiying the path). - df.saveAsTable("savedJsonTable") + df.write.saveAsTable("savedJsonTable") checkAnswer( sql("SELECT * FROM savedJsonTable"), @@ -455,17 +455,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // Right now, we cannot append to an existing JSON table. intercept[RuntimeException] { - df.saveAsTable("savedJsonTable", SaveMode.Append) + df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable") } // We can overwrite it. - df.saveAsTable("savedJsonTable", SaveMode.Overwrite) + df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable") checkAnswer( sql("SELECT * FROM savedJsonTable"), df.collect()) // When the save mode is Ignore, we will do nothing when the table already exists. - df.select("b").saveAsTable("savedJsonTable", SaveMode.Ignore) + df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable") assert(df.schema === table("savedJsonTable").schema) checkAnswer( sql("SELECT * FROM savedJsonTable"), @@ -479,11 +479,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { // Create an external table by specifying the path. conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") - df.saveAsTable( - "savedJsonTable", - "org.apache.spark.sql.json", - SaveMode.Append, - Map("path" -> tempPath.toString)) + df.write + .format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .option("path", tempPath.toString) + .saveAsTable("savedJsonTable") checkAnswer( sql("SELECT * FROM savedJsonTable"), df.collect()) @@ -501,14 +501,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { val originalDefaultSource = conf.defaultDataSourceName val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - val df = jsonRDD(rdd) + val df = read.json(rdd) conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name") - df.saveAsTable( - "savedJsonTable", - "org.apache.spark.sql.json", - SaveMode.Append, - Map("path" -> tempPath.toString)) + df.write.format("org.apache.spark.sql.json") + .mode(SaveMode.Append) + .option("path", tempPath.toString) + .saveAsTable("savedJsonTable") conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json") createExternalTable("createdJsonTable", tempPath.toString) @@ -566,7 +565,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true") val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) - jsonRDD(rdd).registerTempTable("jt") + read.json(rdd).registerTempTable("jt") sql( """ |create table test_parquet_ctas STORED AS parquET @@ -601,7 +600,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructType( StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil) assert(df1.schema === expectedSchema1) - df1.saveAsTable("arrayInParquet", "parquet", SaveMode.Overwrite) + df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet") val df2 = createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a") @@ -610,10 +609,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil) assert(df2.schema === expectedSchema2) df2.insertInto("arrayInParquet", overwrite = false) - createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a") - .saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls df2.insertInto. - createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a") - .saveAsTable("arrayInParquet", "parquet", SaveMode.Append) + createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append) + .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto. + createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write + .mode(SaveMode.Append).saveAsTable("arrayInParquet") refreshTable("arrayInParquet") checkAnswer( @@ -634,7 +633,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructType( StructField("a", mapType1, nullable = true) :: Nil) assert(df1.schema === expectedSchema1) - df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite) + df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet") val df2 = createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a") @@ -644,10 +643,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { StructField("a", mapType2, nullable = true) :: Nil) assert(df2.schema === expectedSchema2) df2.insertInto("mapInParquet", overwrite = false) - createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a") - .saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls df2.insertInto. - createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a") - .saveAsTable("mapInParquet", "parquet", SaveMode.Append) + createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append) + .saveAsTable("mapInParquet") // This one internally calls df2.insertInto. + createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write + .format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet") refreshTable("mapInParquet") checkAnswer( @@ -711,30 +710,30 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { def createDF(from: Int, to: Int): DataFrame = createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2") - createDF(0, 9).saveAsTable("insertParquet", "parquet") + createDF(0, 9).write.format("parquet").saveAsTable("insertParquet") checkAnswer( sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), (6 to 9).map(i => Row(i, s"str$i"))) intercept[AnalysisException] { - createDF(10, 19).saveAsTable("insertParquet", "parquet") + createDF(10, 19).write.format("parquet").saveAsTable("insertParquet") } - createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append) + createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") checkAnswer( sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"), (6 to 19).map(i => Row(i, s"str$i"))) - createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append) + createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet") checkAnswer( sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"), (6 to 24).map(i => Row(i, s"str$i"))) intercept[AnalysisException] { - createDF(30, 39).saveAsTable("insertParquet") + createDF(30, 39).write.saveAsTable("insertParquet") } - createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append) + createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet") checkAnswer( sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"), (6 to 34).map(i => Row(i, s"str$i"))) @@ -744,11 +743,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach { sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"), (6 to 44).map(i => Row(i, s"str$i"))) - createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite) + createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet") checkAnswer( sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"), (52 to 54).map(i => Row(i, s"str$i"))) - createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore) + createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet") checkAnswer( sql("SELECT p.c1, c2 FROM insertParquet p"), (50 to 59).map(i => Row(i, s"str$i"))) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala index 8ad3627504..3dfa6e72e1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hive.execution import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.hive.test.TestHive.{sparkContext, jsonRDD, sql} +import org.apache.spark.sql.hive.test.TestHive.{read, sparkContext, jsonRDD, sql} import org.apache.spark.sql.hive.test.TestHive.implicits._ case class Nested(a: Int, B: Int) @@ -31,14 +31,14 @@ case class Data(a: Int, B: Int, n: Nested, nestedArray: Seq[Nested]) class HiveResolutionSuite extends HiveComparisonTest { test("SPARK-3698: case insensitive test for nested data") { - jsonRDD(sparkContext.makeRDD( + read.json(sparkContext.makeRDD( """{"a": [{"a": {"a": 1}}]}""" :: Nil)).registerTempTable("nested") // This should be successfully analyzed sql("SELECT a[0].A.A from nested").queryExecution.analyzed } test("SPARK-5278: check ambiguous reference to fields") { - jsonRDD(sparkContext.makeRDD( + read.json(sparkContext.makeRDD( """{"a": [{"b": 1, "B": 2}]}""" :: Nil)).registerTempTable("nested") // there are 2 filed matching field name "b", we should report Ambiguous reference error diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala index dfe73c62c4..ca2c4b4019 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala @@ -535,14 +535,14 @@ class SQLQuerySuite extends QueryTest { test("SPARK-4296 Grouping field with Hive UDF as sub expression") { val rdd = sparkContext.makeRDD( """{"a": "str", "b":"1", "c":"1970-01-01 00:00:00"}""" :: Nil) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") checkAnswer( sql("SELECT concat(a, '-', b), year(c) FROM data GROUP BY concat(a, '-', b), year(c)"), Row("str-1", 1970)) dropTempTable("data") - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") checkAnswer(sql("SELECT year(c) + 1 FROM data GROUP BY year(c) + 1"), Row(1971)) dropTempTable("data") @@ -550,7 +550,7 @@ class SQLQuerySuite extends QueryTest { test("resolve udtf with single alias") { val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""")) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") val df = sql("SELECT explode(a) AS val FROM data") val col = df("val") } @@ -563,7 +563,7 @@ class SQLQuerySuite extends QueryTest { // PreInsertionCasts will actually start to work before ImplicitGenerate and then // generates an invalid query plan. val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 1}]}""")) - jsonRDD(rdd).registerTempTable("data") + read.json(rdd).registerTempTable("data") val originalConf = getConf("spark.sql.hive.convertCTAS", "false") setConf("spark.sql.hive.convertCTAS", "false") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala index a0075f1e44..05d99983b6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala @@ -150,9 +150,9 @@ class ParquetMetastoreSuiteBase extends ParquetPartitioningTest { } val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")) - jsonRDD(rdd1).registerTempTable("jt") + read.json(rdd1).registerTempTable("jt") val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":[$i, null]}""")) - jsonRDD(rdd2).registerTempTable("jt_array") + read.json(rdd2).registerTempTable("jt_array") setConf("spark.sql.hive.convertMetastoreParquet", "true") } @@ -617,16 +617,16 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest { sql("drop table if exists spark_6016_fix") // Create a DataFrame with two partitions. So, the created table will have two parquet files. - val df1 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2)) - df1.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite) + val df1 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i}"""), 2)) + df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") checkAnswer( sql("select * from spark_6016_fix"), (1 to 10).map(i => Row(i)) ) // Create a DataFrame with four partitions. So, the created table will have four parquet files. - val df2 = jsonRDD(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4)) - df2.saveAsTable("spark_6016_fix", "parquet", SaveMode.Overwrite) + val df2 = read.json(sparkContext.parallelize((1 to 10).map(i => s"""{"b":$i}"""), 4)) + df2.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("spark_6016_fix") // For the bug of SPARK-6016, we are caching two outdated footers for df1. Then, // since the new table has four parquet files, we are trying to read new footers from two files // and then merge metadata in footers of these four (two outdated ones and two latest one), @@ -663,7 +663,7 @@ class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase { StructField("a", arrayType1, nullable = true) :: Nil) assert(df.schema === expectedSchema1) - df.saveAsTable("alwaysNullable", "parquet") + df.write.format("parquet").saveAsTable("alwaysNullable") val mapType2 = MapType(IntegerType, IntegerType, valueContainsNull = true) val arrayType2 = ArrayType(IntegerType, containsNull = true) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala index f44b3c521e..9d9b436cab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala @@ -120,10 +120,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("save()/load() - non-partitioned table - ErrorIfExists") { withTempDir { file => intercept[RuntimeException] { - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.ErrorIfExists) + testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).save(file.getCanonicalPath) } } } @@ -233,10 +230,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { test("save()/load() - partitioned table - Ignore") { withTempDir { file => - partitionedTestDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Ignore) + partitionedTestDF.write + .format(dataSourceName).mode(SaveMode.Ignore).save(file.getCanonicalPath) val path = new Path(file.getCanonicalPath) val fs = path.getFileSystem(SparkHadoopUtil.get.conf) @@ -249,11 +244,9 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { } test("saveAsTable()/load() - non-partitioned table - Overwrite") { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - Map("dataSchema" -> dataSchema.json)) + testDF.write.format(dataSourceName).mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .saveAsTable("t") withTable("t") { checkAnswer(table("t"), testDF.collect()) @@ -261,15 +254,8 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { } test("saveAsTable()/load() - non-partitioned table - Append") { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite) - - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append) + testDF.write.format(dataSourceName).mode(SaveMode.Overwrite).saveAsTable("t") + testDF.write.format(dataSourceName).mode(SaveMode.Append).saveAsTable("t") withTable("t") { checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) @@ -281,10 +267,7 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { withTempTable("t") { intercept[AnalysisException] { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.ErrorIfExists) + testDF.write.format(dataSourceName).mode(SaveMode.ErrorIfExists).saveAsTable("t") } } } @@ -293,21 +276,16 @@ class HadoopFsRelationTest extends QueryTest with ParquetTest { Seq.empty[(Int, String)].toDF().registerTempTable("t") withTempTable("t") { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Ignore) - + testDF.write.format(dataSourceName).mode(SaveMode.Ignore).saveAsTable("t") assert(table("t").collect().isEmpty) } } test("saveAsTable()/load() - partitioned table - simple queries") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - Map("dataSchema" -> dataSchema.json)) + partitionedTestDF.write.format(dataSourceName) + .mode(SaveMode.Overwrite) + .option("dataSchema", dataSchema.json) + .saveAsTable("t") withTable("t") { checkQueries(table("t")) @@ -492,11 +470,9 @@ class SimpleTextHadoopFsRelationSuite extends HadoopFsRelationTest { StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchemaWithPartition.json))) + read.format(dataSourceName) + .option("dataSchema", dataSchemaWithPartition.json) + .load(file.getCanonicalPath)) } } } @@ -518,18 +494,16 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { sparkContext .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) .toDF("a", "b", "p1") - .saveAsParquetFile(partitionDir.toString) + .write.parquet(partitionDir.toString) } val dataSchemaWithPartition = StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) checkQueries( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchemaWithPartition.json))) + read.format(dataSourceName) + .option("dataSchema", dataSchemaWithPartition.json) + .load(file.getCanonicalPath)) } } }