[SPARK-5166][SPARK-5247][SPARK-5258][SQL] API Cleanup / Documentation

Author: Michael Armbrust <michael@databricks.com>

Closes #4642 from marmbrus/docs and squashes the following commits:

d291c34 [Michael Armbrust] python tests
9be66e3 [Michael Armbrust] comments
d56afc2 [Michael Armbrust] fix style
f004747 [Michael Armbrust] fix build
c4a907b [Michael Armbrust] fix tests
42e2b73 [Michael Armbrust] [SQL] Documentation / API Clean-up.
Michael Armbrust 2015-02-17 10:21:17 -08:00
parent c76da36c21
commit c74b07fa94
30 changed files with 484 additions and 406 deletions

View file

@ -361,9 +361,16 @@ object Unidoc {
publish := {},
unidocProjectFilter in(ScalaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, catalyst, streamingFlumeSink, yarn),
inAnyProject -- inProjects(OldDeps.project, repl, examples, tools, streamingFlumeSink, yarn),
unidocProjectFilter in(JavaUnidoc, unidoc) :=
inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, catalyst, streamingFlumeSink, yarn),
inAnyProject -- inProjects(OldDeps.project, repl, bagel, examples, tools, streamingFlumeSink, yarn),
// Skip actual catalyst, but include the subproject.
// Catalyst is not public API and contains quasiquotes which break scaladoc.
unidocAllSources in (ScalaUnidoc, unidoc) := {
(unidocAllSources in (ScalaUnidoc, unidoc)).value
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
},
// Skip class names containing $ and some internal packages in Javadocs
unidocAllSources in (JavaUnidoc, unidoc) := {
@ -376,6 +383,7 @@ object Unidoc {
.map(_.filterNot(_.getCanonicalPath.contains("executor")))
.map(_.filterNot(_.getCanonicalPath.contains("python")))
.map(_.filterNot(_.getCanonicalPath.contains("collection")))
.map(_.filterNot(_.getCanonicalPath.contains("sql/catalyst")))
},
// Javadoc options: create a window title, and group key packages on index page

View file

@ -252,7 +252,7 @@ class SQLContext(object):
>>> schema = StructType([StructField("field1", IntegerType(), False),
... StructField("field2", StringType(), False)])
>>> df = sqlCtx.applySchema(rdd2, schema)
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT * from table1")
>>> df2.collect()
[Row(field1=1, field2=u'row1'),..., Row(field1=3, field2=u'row3')]
@ -405,17 +405,17 @@ class SQLContext(object):
return self.applySchema(data, schema)
def registerRDDAsTable(self, rdd, tableName):
def registerDataFrameAsTable(self, rdd, tableName):
"""Registers the given RDD as a temporary table in the catalog.
Temporary tables exist only during the lifetime of this instance of
SQLContext.
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
"""
if (rdd.__class__ is DataFrame):
df = rdd._jdf
self._ssql_ctx.registerRDDAsTable(df, tableName)
self._ssql_ctx.registerDataFrameAsTable(df, tableName)
else:
raise ValueError("Can only register DataFrame as table")
@ -456,7 +456,7 @@ class SQLContext(object):
... print>>ofn, json
>>> ofn.close()
>>> df1 = sqlCtx.jsonFile(jsonFile)
>>> sqlCtx.registerRDDAsTable(df1, "table1")
>>> sqlCtx.registerDataFrameAsTable(df1, "table1")
>>> df2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table1")
@ -467,7 +467,7 @@ class SQLContext(object):
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> df3 = sqlCtx.jsonFile(jsonFile, df1.schema)
>>> sqlCtx.registerRDDAsTable(df3, "table2")
>>> sqlCtx.registerDataFrameAsTable(df3, "table2")
>>> df4 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table2")
@ -485,7 +485,7 @@ class SQLContext(object):
... StructField("field5",
... ArrayType(IntegerType(), False), True)]), False)])
>>> df5 = sqlCtx.jsonFile(jsonFile, schema)
>>> sqlCtx.registerRDDAsTable(df5, "table3")
>>> sqlCtx.registerDataFrameAsTable(df5, "table3")
>>> df6 = sqlCtx.sql(
... "SELECT field2 AS f1, field3.field5 as f2, "
... "field3.field5[0] as f3 from table3")
@ -509,7 +509,7 @@ class SQLContext(object):
determine the schema.
>>> df1 = sqlCtx.jsonRDD(json)
>>> sqlCtx.registerRDDAsTable(df1, "table1")
>>> sqlCtx.registerDataFrameAsTable(df1, "table1")
>>> df2 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table1")
@ -520,7 +520,7 @@ class SQLContext(object):
Row(f1=None, f2=u'row3', f3=Row(field4=33, field5=[]), f4=None)
>>> df3 = sqlCtx.jsonRDD(json, df1.schema)
>>> sqlCtx.registerRDDAsTable(df3, "table2")
>>> sqlCtx.registerDataFrameAsTable(df3, "table2")
>>> df4 = sqlCtx.sql(
... "SELECT field1 AS f1, field2 as f2, field3 as f3, "
... "field6 as f4 from table2")
@ -538,7 +538,7 @@ class SQLContext(object):
... StructField("field5",
... ArrayType(IntegerType(), False), True)]), False)])
>>> df5 = sqlCtx.jsonRDD(json, schema)
>>> sqlCtx.registerRDDAsTable(df5, "table3")
>>> sqlCtx.registerDataFrameAsTable(df5, "table3")
>>> df6 = sqlCtx.sql(
... "SELECT field2 AS f1, field3.field5 as f2, "
... "field3.field5[0] as f3 from table3")
@ -628,7 +628,7 @@ class SQLContext(object):
def sql(self, sqlQuery):
"""Return a L{DataFrame} representing the result of the given query.
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.sql("SELECT field1 AS f1, field2 as f2 from table1")
>>> df2.collect()
[Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
@ -638,7 +638,7 @@ class SQLContext(object):
def table(self, tableName):
"""Returns the specified table as a L{DataFrame}.
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.table("table1")
>>> sorted(df.collect()) == sorted(df2.collect())
True
@ -653,7 +653,7 @@ class SQLContext(object):
The returned DataFrame has two columns, tableName and isTemporary
(a column with BooleanType indicating if a table is a temporary one or not).
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> df2 = sqlCtx.tables()
>>> df2.filter("tableName = 'table1'").first()
Row(tableName=u'table1', isTemporary=True)
@ -668,7 +668,7 @@ class SQLContext(object):
If `dbName` is not specified, the current database will be used.
>>> sqlCtx.registerRDDAsTable(df, "table1")
>>> sqlCtx.registerDataFrameAsTable(df, "table1")
>>> "table1" in sqlCtx.tableNames()
True
>>> "table1" in sqlCtx.tableNames("db")

View file

@ -1,59 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc;
import org.apache.spark.Partition;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
public class JDBCUtils {
/**
* Construct a DataFrame representing the JDBC table at the database
* specified by url with table name table.
*/
public static DataFrame jdbcRDD(SQLContext sql, String url, String table) {
Partition[] parts = new Partition[1];
parts[0] = new JDBCPartition(null, 0);
return sql.baseRelationToDataFrame(
new JDBCRelation(url, table, parts, sql));
}
/**
* Construct a DataFrame representing the JDBC table at the database
* specified by url with table name table partitioned by parts.
* Here, parts is an array of expressions suitable for insertion into a WHERE
* clause; each one defines one partition.
*/
public static DataFrame jdbcRDD(SQLContext sql, String url, String table, String[] parts) {
Partition[] partitions = new Partition[parts.length];
for (int i = 0; i < parts.length; i++)
partitions[i] = new JDBCPartition(parts[i], i);
return sql.baseRelationToDataFrame(
new JDBCRelation(url, table, partitions, sql));
}
private static JavaJDBCTrampoline trampoline = new JavaJDBCTrampoline();
public static void createJDBCTable(DataFrame rdd, String url, String table, boolean allowExisting) {
trampoline.createJDBCTable(rdd, url, table, allowExisting);
}
public static void insertIntoJDBC(DataFrame rdd, String url, String table, boolean overwrite) {
trampoline.insertIntoJDBC(rdd, url, table, overwrite);
}
}

View file

@ -17,6 +17,9 @@
package org.apache.spark.sql
import java.sql.DriverManager
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
@ -27,6 +30,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils
@ -77,6 +81,12 @@ private[sql] object DataFrame {
* .groupBy(department("name"), "gender")
* .agg(avg(people("salary")), max(people("age")))
* }}}
*
* @groupname basic Basic DataFrame functions
* @groupname dfops Language Integrated Queries
* @groupname rdd RDD Operations
* @groupname output Output Operations
* @groupname action Actions
*/
// TODO: Improve documentation.
@Experimental
@ -102,7 +112,8 @@ trait DataFrame extends RDDApi[Row] with Serializable {
def toSchemaRDD: DataFrame = this
/**
* Returns the object itself. Used to force an implicit conversion from RDD to DataFrame in Scala.
* Returns the object itself.
* @group basic
*/
// This is declared with parentheses to prevent the Scala compiler from treating
// `rdd.toDF("1")` as invoking this toDF and then apply on the returned DataFrame.
@ -116,31 +127,51 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* rdd.toDF // this implicit conversion creates a DataFrame with column name _1 and _2
* rdd.toDF("id", "name") // this creates a DataFrame with column name "id" and "name"
* }}}
* @group basic
*/
@scala.annotation.varargs
def toDF(colNames: String*): DataFrame
/** Returns the schema of this [[DataFrame]]. */
/**
* Returns the schema of this [[DataFrame]].
* @group basic
*/
def schema: StructType
/** Returns all column names and their data types as an array. */
/**
* Returns all column names and their data types as an array.
* @group basic
*/
def dtypes: Array[(String, String)]
/** Returns all column names as an array. */
/**
* Returns all column names as an array.
* @group basic
*/
def columns: Array[String] = schema.fields.map(_.name)
/** Prints the schema to the console in a nice tree format. */
/**
* Prints the schema to the console in a nice tree format.
* @group basic
*/
def printSchema(): Unit
/** Prints the plans (logical and physical) to the console for debugging purpose. */
/**
* Prints the plans (logical and physical) to the console for debugging purpose.
* @group basic
*/
def explain(extended: Boolean): Unit
/** Only prints the physical plan to the console for debugging purpose. */
/**
* Only prints the physical plan to the console for debugging purpose.
* @group basic
*/
def explain(): Unit = explain(extended = false)
/**
* Returns true if the `collect` and `take` methods can be run locally
* (without any Spark executors).
* @group basic
*/
def isLocal: Boolean
@ -154,6 +185,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* 1983 03 0.410516 0.442194
* 1984 04 0.450090 0.483521
* }}}
* @group basic
*/
def show(): Unit
@ -163,6 +195,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Note that cartesian joins are very expensive without an extra filter that can be pushed down.
*
* @param right Right side of the join operation.
* @group dfops
*/
def join(right: DataFrame): DataFrame
@ -174,6 +207,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df1.join(df2, $"df1Key" === $"df2Key")
* df1.join(df2).where($"df1Key" === $"df2Key")
* }}}
* @group dfops
*/
def join(right: DataFrame, joinExprs: Column): DataFrame
@ -194,6 +228,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* @param right Right side of the join.
* @param joinExprs Join expression.
* @param joinType One of: `inner`, `outer`, `left_outer`, `right_outer`, `semijoin`.
* @group dfops
*/
def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
@ -205,6 +240,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.sort($"sortcol")
* df.sort($"sortcol".asc)
* }}}
* @group dfops
*/
@scala.annotation.varargs
def sort(sortCol: String, sortCols: String*): DataFrame
@ -214,6 +250,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.sort($"col1", $"col2".desc)
* }}}
* @group dfops
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): DataFrame
@ -221,6 +258,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] sorted by the given expressions.
* This is an alias of the `sort` function.
* @group dfops
*/
@scala.annotation.varargs
def orderBy(sortCol: String, sortCols: String*): DataFrame
@ -228,27 +266,32 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] sorted by the given expressions.
* This is an alias of the `sort` function.
* @group dfops
*/
@scala.annotation.varargs
def orderBy(sortExprs: Column*): DataFrame
/**
* Selects column based on the column name and return it as a [[Column]].
* @group dfops
*/
def apply(colName: String): Column = col(colName)
/**
* Selects column based on the column name and return it as a [[Column]].
* @group dfops
*/
def col(colName: String): Column
/**
* Returns a new [[DataFrame]] with an alias set.
* @group dfops
*/
def as(alias: String): DataFrame
/**
* (Scala-specific) Returns a new [[DataFrame]] with an alias set.
* @group dfops
*/
def as(alias: Symbol): DataFrame
@ -257,6 +300,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.select($"colA", $"colB" + 1)
* }}}
* @group dfops
*/
@scala.annotation.varargs
def select(cols: Column*): DataFrame
@ -270,6 +314,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.select("colA", "colB")
* df.select($"colA", $"colB")
* }}}
* @group dfops
*/
@scala.annotation.varargs
def select(col: String, cols: String*): DataFrame
@ -281,6 +326,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.selectExpr("colA", "colB as newName", "abs(colC)")
* }}}
* @group dfops
*/
@scala.annotation.varargs
def selectExpr(exprs: String*): DataFrame
@ -293,6 +339,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* peopleDf.where($"age" > 15)
* peopleDf($"age" > 15)
* }}}
* @group dfops
*/
def filter(condition: Column): DataFrame
@ -301,6 +348,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* peopleDf.filter("age > 15")
* }}}
* @group dfops
*/
def filter(conditionExpr: String): DataFrame
@ -312,6 +360,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* peopleDf.where($"age" > 15)
* peopleDf($"age" > 15)
* }}}
* @group dfops
*/
def where(condition: Column): DataFrame
@ -329,6 +378,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* "age" -> "max"
* ))
* }}}
* @group dfops
*/
@scala.annotation.varargs
def groupBy(cols: Column*): GroupedData
@ -350,6 +400,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* "age" -> "max"
* ))
* }}}
* @group dfops
*/
@scala.annotation.varargs
def groupBy(col1: String, cols: String*): GroupedData
@ -366,6 +417,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* "expense" -> "sum"
* )
* }}}
* @group dfops
*/
def agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame = {
groupBy().agg(aggExpr, aggExprs :_*)
@ -378,6 +430,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.agg(Map("age" -> "max", "salary" -> "avg"))
* df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}
* @group dfops
*/
def agg(exprs: Map[String, String]): DataFrame = groupBy().agg(exprs)
@ -388,6 +441,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.agg(Map("age" -> "max", "salary" -> "avg"))
* df.groupBy().agg(Map("age" -> "max", "salary" -> "avg"))
* }}
* @group dfops
*/
def agg(exprs: java.util.Map[String, String]): DataFrame = groupBy().agg(exprs)
@ -398,6 +452,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* df.agg(max($"age"), avg($"salary"))
* df.groupBy().agg(max($"age"), avg($"salary"))
* }}
* @group dfops
*/
@scala.annotation.varargs
def agg(expr: Column, exprs: Column*): DataFrame = groupBy().agg(expr, exprs :_*)
@ -405,24 +460,28 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] by taking the first `n` rows. The difference between this function
* and `head` is that `head` returns an array while `limit` returns a new [[DataFrame]].
* @group dfops
*/
def limit(n: Int): DataFrame
/**
* Returns a new [[DataFrame]] containing union of rows in this frame and another frame.
* This is equivalent to `UNION ALL` in SQL.
* @group dfops
*/
def unionAll(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] containing rows only in both this frame and another frame.
* This is equivalent to `INTERSECT` in SQL.
* @group dfops
*/
def intersect(other: DataFrame): DataFrame
/**
* Returns a new [[DataFrame]] containing rows in this frame but not in another frame.
* This is equivalent to `EXCEPT` in SQL.
* @group dfops
*/
def except(other: DataFrame): DataFrame
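For illustration, a minimal sketch of the three set operations documented in the hunk above (the Num case class and the data are made up; sqlContext stands for an existing SQLContext):

// Illustrative only; relies on createDataFrame(Seq[A <: Product]), which appears later in this commit.
case class Num(n: Int)
val a = sqlContext.createDataFrame(Seq(Num(1), Num(2), Num(3)))
val b = sqlContext.createDataFrame(Seq(Num(2), Num(3), Num(4)))

a.unionAll(b).count()     // 6 rows: like UNION ALL, duplicates are kept
a.intersect(b).collect()  // rows where n is 2 or 3, like INTERSECT
a.except(b).collect()     // the row where n is 1, like EXCEPT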
@ -432,6 +491,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
* @param seed Seed for sampling.
* @group dfops
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Long): DataFrame
@ -440,6 +500,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
*
* @param withReplacement Sample with replacement or not.
* @param fraction Fraction of rows to generate.
* @group dfops
*/
def sample(withReplacement: Boolean, fraction: Double): DataFrame = {
sample(withReplacement, fraction, Utils.random.nextLong)
@ -464,6 +525,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
*
* val bookCountPerWord = allWords.groupBy("word").agg(countDistinct("title"))
* }}}
* @group dfops
*/
def explode[A <: Product : TypeTag](input: Column*)(f: Row => TraversableOnce[A]): DataFrame
@ -476,6 +538,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* {{{
* df.explode("words", "word")(words: String => words.split(" "))
* }}}
* @group dfops
*/
def explode[A, B : TypeTag](
inputColumn: String,
@ -486,11 +549,13 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new [[DataFrame]] by adding a column.
* @group dfops
*/
def withColumn(colName: String, col: Column): DataFrame
/**
* Returns a new [[DataFrame]] with a column renamed.
* @group dfops
*/
def withColumnRenamed(existingName: String, newName: String): DataFrame
@ -511,62 +576,84 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns a new RDD by applying a function to all rows of this DataFrame.
* @group rdd
*/
override def map[R: ClassTag](f: Row => R): RDD[R]
/**
* Returns a new RDD by first applying a function to all rows of this [[DataFrame]],
* and then flattening the results.
* @group rdd
*/
override def flatMap[R: ClassTag](f: Row => TraversableOnce[R]): RDD[R]
/**
* Returns a new RDD by applying a function to each partition of this DataFrame.
* @group rdd
*/
override def mapPartitions[R: ClassTag](f: Iterator[Row] => Iterator[R]): RDD[R]
/**
* Applies a function `f` to all rows.
* @group rdd
*/
override def foreach(f: Row => Unit): Unit
/**
* Applies a function f to each partition of this [[DataFrame]].
* @group rdd
*/
override def foreachPartition(f: Iterator[Row] => Unit): Unit
/**
* Returns the first `n` rows in the [[DataFrame]].
* @group action
*/
override def take(n: Int): Array[Row]
/**
* Returns an array that contains all of [[Row]]s in this [[DataFrame]].
* @group action
*/
override def collect(): Array[Row]
/**
* Returns a Java list that contains all of [[Row]]s in this [[DataFrame]].
* @group action
*/
override def collectAsList(): java.util.List[Row]
/**
* Returns the number of rows in the [[DataFrame]].
* @group action
*/
override def count(): Long
/**
* Returns a new [[DataFrame]] that has exactly `numPartitions` partitions.
* @group rdd
*/
override def repartition(numPartitions: Int): DataFrame
/** Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]]. */
/**
* Returns a new [[DataFrame]] that contains only the unique rows from this [[DataFrame]].
* @group dfops
*/
override def distinct: DataFrame
/**
* @group basic
*/
override def persist(): this.type
/**
* @group basic
*/
override def persist(newLevel: StorageLevel): this.type
/**
* @group basic
*/
override def unpersist(blocking: Boolean): this.type
/////////////////////////////////////////////////////////////////////////////
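A quick sketch contrasting the rdd and action groups annotated above (df stands for any existing DataFrame whose first column is a string; types are spelled out only for clarity):

// Sketch only: RDD-style transformations are lazy, actions run a Spark job.
val names: org.apache.spark.rdd.RDD[String] = df.map(_.getString(0)) // rdd group, lazy
val firstTen: Array[org.apache.spark.sql.Row] = df.take(10)          // action, runs a job
val total: Long = df.count()                                          // action, runs a job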
@ -575,16 +662,19 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* Returns the content of the [[DataFrame]] as an [[RDD]] of [[Row]]s.
* @group rdd
*/
def rdd: RDD[Row]
/**
* Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
* @group rdd
*/
def toJavaRDD: JavaRDD[Row] = rdd.toJavaRDD()
/**
* Returns the content of the [[DataFrame]] as a [[JavaRDD]] of [[Row]]s.
* @group rdd
*/
def javaRDD: JavaRDD[Row] = toJavaRDD
@ -592,7 +682,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Registers this RDD as a temporary table using the given name. The lifetime of this temporary
* table is tied to the [[SQLContext]] that was used to create this DataFrame.
*
* @group schema
* @group basic
*/
def registerTempTable(tableName: String): Unit
@ -600,6 +690,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
* Files that are written out using this method can be read back in as a [[DataFrame]]
* using the `parquetFile` function in [[SQLContext]].
* @group output
*/
def saveAsParquetFile(path: String): Unit
@ -613,6 +704,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(tableName: String): Unit = {
@ -628,6 +720,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(tableName: String, mode: SaveMode): Unit = {
@ -651,6 +744,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
@ -668,6 +762,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
@ -686,6 +781,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
@ -706,6 +802,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
* @group output
*/
@Experimental
def saveAsTable(
@ -719,6 +816,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* Saves the contents of this DataFrame to the given path,
* using the default data source configured by spark.sql.sources.default and
* [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
*/
@Experimental
def save(path: String): Unit = {
@ -729,6 +827,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
* using the default data source configured by spark.sql.sources.default.
* @group output
*/
@Experimental
def save(path: String, mode: SaveMode): Unit = {
@ -740,6 +839,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source,
* using [[SaveMode.ErrorIfExists]] as the save mode.
* @group output
*/
@Experimental
def save(path: String, source: String): Unit = {
@ -750,6 +850,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source and
* [[SaveMode]] specified by mode.
* @group output
*/
@Experimental
def save(path: String, source: String, mode: SaveMode): Unit = {
@ -760,6 +861,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options.
* @group output
*/
@Experimental
def save(
@ -774,6 +876,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* (Scala-specific)
* Saves the contents of this DataFrame based on the given data source,
* [[SaveMode]] specified by mode, and a set of options
* @group output
*/
@Experimental
def save(
@ -784,6 +887,7 @@ trait DataFrame extends RDDApi[Row] with Serializable {
/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
* @group output
*/
@Experimental
def insertInto(tableName: String, overwrite: Boolean): Unit
@ -792,15 +896,46 @@ trait DataFrame extends RDDApi[Row] with Serializable {
* :: Experimental ::
* Adds the rows from this RDD to the specified table.
* Throws an exception if the table already exists.
* @group output
*/
@Experimental
def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
/**
* Returns the content of the [[DataFrame]] as a RDD of JSON strings.
* @group rdd
*/
def toJSON: RDD[String]
////////////////////////////////////////////////////////////////////////////
// JDBC Write Support
////////////////////////////////////////////////////////////////////////////
/**
* Save this RDD to a JDBC database at `url` under the table name `table`.
* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
* If you pass `true` for `allowExisting`, it will drop any table with the
* given name; if you pass `false`, it will throw if the table already
* exists.
* @group output
*/
def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit
/**
* Save this RDD to a JDBC database at `url` under the table name `table`.
* Assumes the table already exists and has a compatible schema. If you
* pass `true` for `overwrite`, it will `TRUNCATE` the table before
* performing the `INSERT`s.
*
* The table must already exist on the database. It must have a schema
* that is compatible with the schema of this RDD; inserting the rows of
* the RDD in order via the simple statement
* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
* @group output
*/
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit
////////////////////////////////////////////////////////////////////////////
// for Python API
////////////////////////////////////////////////////////////////////////////
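A hedged usage sketch of the createJDBCTable and insertIntoJDBC methods documented above (df is any existing DataFrame; the JDBC URL and table name are placeholders, and a matching JDBC driver is assumed to be on the classpath):

// Illustrative only: write a DataFrame out over JDBC.
val url = "jdbc:postgresql://localhost/testdb?user=test"   // placeholder connection string

// Runs CREATE TABLE (dropping any existing table, since allowExisting = true)
// followed by INSERT INTO statements.
df.createJDBCTable(url, "people", allowExisting = true)

// Appends into the existing table; overwrite = true would TRUNCATE it first.
df.insertIntoJDBC(url, "people", overwrite = false)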

View file

@ -18,6 +18,7 @@
package org.apache.spark.sql
import java.io.CharArrayWriter
import java.sql.DriverManager
import scala.language.implicitConversions
import scala.reflect.ClassTag
@ -36,6 +37,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython}
import org.apache.spark.sql.jdbc.JDBCWriteDetails
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{NumericType, StructType}
@ -375,7 +377,7 @@ private[sql] class DataFrameImpl protected[sql](
}
override def registerTempTable(tableName: String): Unit = {
sqlContext.registerRDDAsTable(this, tableName)
sqlContext.registerDataFrameAsTable(this, tableName)
}
override def saveAsParquetFile(path: String): Unit = {
@ -441,6 +443,35 @@ private[sql] class DataFrameImpl protected[sql](
}
}
def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
val conn = DriverManager.getConnection(url)
try {
if (allowExisting) {
val sql = s"DROP TABLE IF EXISTS $table"
conn.prepareStatement(sql).executeUpdate()
}
val schema = JDBCWriteDetails.schemaString(this, url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
} finally {
conn.close()
}
JDBCWriteDetails.saveTable(this, url, table)
}
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
if (overwrite) {
val conn = DriverManager.getConnection(url)
try {
val sql = s"TRUNCATE TABLE $table"
conn.prepareStatement(sql).executeUpdate()
} finally {
conn.close()
}
}
JDBCWriteDetails.saveTable(this, url, table)
}
////////////////////////////////////////////////////////////////////////////
// for Python API
////////////////////////////////////////////////////////////////////////////

View file

@ -20,8 +20,13 @@ package org.apache.spark.sql
import org.apache.spark.annotation.Experimental
/**
* :: Experimental ::
* Holder for experimental methods for the bravest. We make NO guarantee about the stability
* regarding binary compatibility and source compatibility of methods here.
*
* {{{
* sqlContext.experimental.extraStrategies += ...
* }}}
*/
@Experimental
class ExperimentalMethods protected[sql](sqlContext: SQLContext) {

View file

@ -173,6 +173,10 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = err()
def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = err()
override def toJSON: RDD[String] = err()
protected[sql] override def javaToPython: JavaRDD[Array[Byte]] = err()

View file

@ -43,11 +43,16 @@ import org.apache.spark.util.Utils
import org.apache.spark.{Partition, SparkContext}
/**
* The entry point for running relational queries using Spark. Allows the creation of [[DataFrame]]
* objects and the execution of SQL queries.
* The entry point for working with structured data (rows and columns) in Spark. Allows the
* creation of [[DataFrame]] objects as well as the execution of SQL queries.
*
* @groupname ddl_ops Catalog DDL functions
* @groupname userf Spark SQL Functions
* @groupname basic Basic Operations
* @groupname ddl_ops Persistent Catalog DDL
* @groupname cachemgmt Cached Table Management
* @groupname genericdata Generic Data Sources
* @groupname specificdata Specific Data Sources
* @groupname config Configuration
* @groupname dataframes Custom DataFrame Creation
* @groupname Ungrouped Support functions for language integrated queries.
*/
class SQLContext(@transient val sparkContext: SparkContext)
@ -61,24 +66,40 @@ class SQLContext(@transient val sparkContext: SparkContext)
// Note that this is a lazy val so we can override the default value in subclasses.
protected[sql] lazy val conf: SQLConf = new SQLConf
/** Set Spark SQL configuration properties. */
/**
* Set Spark SQL configuration properties.
*
* @group config
*/
def setConf(props: Properties): Unit = conf.setConf(props)
/** Set the given Spark SQL configuration property. */
/**
* Set the given Spark SQL configuration property.
*
* @group config
*/
def setConf(key: String, value: String): Unit = conf.setConf(key, value)
/** Return the value of Spark SQL configuration property for the given key. */
/**
* Return the value of Spark SQL configuration property for the given key.
*
* @group config
*/
def getConf(key: String): String = conf.getConf(key)
/**
* Return the value of Spark SQL configuration property for the given key. If the key is not set
* yet, return `defaultValue`.
*
* @group config
*/
def getConf(key: String, defaultValue: String): String = conf.getConf(key, defaultValue)
/**
* Return all the configuration properties that have been set (i.e. not the default).
* This creates a new copy of the config properties in the form of a Map.
*
* @group config
*/
def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
@ -128,7 +149,9 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
* the query planner for advanced functionalities.
* the query planner for advanced functionality.
*
* @group basic
*/
@Experimental
@transient
@ -137,6 +160,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Returns a [[DataFrame]] with no rows or columns.
*
* @group basic
*/
@Experimental
@transient
@ -167,17 +192,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
* (Integer arg1, String arg2) -> arg2 + arg1),
* DataTypes.StringType);
* }}}
*
* @group basic
*/
@transient
val udf: UDFRegistration = new UDFRegistration(this)
/** Returns true if the table is currently cached in-memory. */
/**
* Returns true if the table is currently cached in-memory.
* @group cachemgmt
*/
def isCached(tableName: String): Boolean = cacheManager.isCached(tableName)
/** Caches the specified table in-memory. */
/**
* Caches the specified table in-memory.
* @group cachemgmt
*/
def cacheTable(tableName: String): Unit = cacheManager.cacheTable(tableName)
/** Removes the specified table from the in-memory cache. */
/**
* Removes the specified table from the in-memory cache.
* @group cachemgmt
*/
def uncacheTable(tableName: String): Unit = cacheManager.uncacheTable(tableName)
// scalastyle:off
@ -186,6 +222,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Scala-specific) Implicit methods available in Scala for converting
* common Scala objects into [[DataFrame]]s.
*
* {{{
* val sqlContext = new SQLContext
* import sqlContext._
* }}}
*
* @group basic
*/
@Experimental
object implicits extends Serializable {
@ -260,7 +303,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Creates a DataFrame from an RDD of case classes.
*
* @group userf
* @group dataframes
*/
@Experimental
def createDataFrame[A <: Product : TypeTag](rdd: RDD[A]): DataFrame = {
@ -274,6 +317,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Creates a DataFrame from a local Seq of Product.
*
* @group dataframes
*/
@Experimental
def createDataFrame[A <: Product : TypeTag](data: Seq[A]): DataFrame = {
@ -285,6 +330,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Convert a [[BaseRelation]] created for external data sources into a [[DataFrame]].
*
* @group dataframes
*/
def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
DataFrame(this, LogicalRelation(baseRelation))
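A minimal end-to-end sketch of the createDataFrame overloads above (the Person case class and its data are invented; sc is an existing SparkContext):

// Sketch: build a DataFrame from an RDD of case classes, then query it with SQL.
case class Person(name: String, age: Int)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.parallelize(Seq(Person("Alice", 30), Person("Bob", 25)))
val df = sqlContext.createDataFrame(people)
df.registerTempTable("people")
sqlContext.sql("SELECT name FROM people WHERE age >= 26").show()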
@ -318,6 +365,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* dataFrame.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
*
* @group dataframes
*/
@DeveloperApi
def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
@ -332,6 +381,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Creates a [[DataFrame]] from an [[JavaRDD]] containing [[Row]]s using the given schema.
* It is important to make sure that the structure of every [[Row]] of the provided RDD matches
* the provided schema. Otherwise, there will be runtime exception.
*
* @group dataframes
*/
@DeveloperApi
def createDataFrame(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
@ -346,6 +397,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* @param rowRDD an JavaRDD of Row
* @param columns names for each column
* @return DataFrame
* @group dataframes
*/
def createDataFrame(rowRDD: JavaRDD[Row], columns: java.util.List[String]): DataFrame = {
createDataFrame(rowRDD.rdd, columns.toSeq)
@ -356,6 +408,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
* @group dataframes
*/
def createDataFrame(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
val attributeSeq = getSchema(beanClass)
@ -383,6 +436,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
* SELECT * queries will return the columns in an undefined order.
* @group dataframes
*/
def createDataFrame(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
createDataFrame(rdd.rdd, beanClass)
@ -416,8 +470,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
* dataFrame.registerTempTable("people")
* sqlContext.sql("select name from people").collect.foreach(println)
* }}}
*
* @group userf
*/
@deprecated("use createDataFrame", "1.3.0")
def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
@ -455,7 +507,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
* [[DataFrame]] if no paths are passed in.
*
* @group userf
* @group specificdata
*/
@scala.annotation.varargs
def parquetFile(paths: String*): DataFrame = {
@ -473,7 +525,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group userf
* @group specificdata
*/
def jsonFile(path: String): DataFrame = jsonFile(path, 1.0)
@ -482,7 +534,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads a JSON file (one object per line) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group userf
* @group specificdata
*/
@Experimental
def jsonFile(path: String, schema: StructType): DataFrame = {
@ -492,6 +544,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* @group specificdata
*/
@Experimental
def jsonFile(path: String, samplingRatio: Double): DataFrame = {
@ -504,10 +557,18 @@ class SQLContext(@transient val sparkContext: SparkContext)
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group userf
* @group specificdata
*/
def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
/**
* Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
* [[DataFrame]].
* It goes through the entire dataset once to determine the schema.
*
* @group specificdata
*/
def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
/**
@ -515,7 +576,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
* returning the result as a [[DataFrame]].
*
* @group userf
* @group specificdata
*/
@Experimental
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
@ -528,6 +589,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
createDataFrame(rowRDD, appliedSchema)
}
/**
* :: Experimental ::
* Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@Experimental
def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
jsonRDD(json.rdd, schema)
@ -535,6 +603,10 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Loads an RDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
@ -546,6 +618,13 @@ class SQLContext(@transient val sparkContext: SparkContext)
createDataFrame(rowRDD, appliedSchema)
}
/**
* :: Experimental ::
* Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
* schema, returning the result as a [[DataFrame]].
*
* @group specificdata
*/
@Experimental
def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
jsonRDD(json.rdd, samplingRatio);
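The jsonFile/jsonRDD variants above infer a schema by scanning the data (or a sample of it) unless one is supplied. A small sketch with made-up records (sc and sqlContext are an existing SparkContext and SQLContext):

// Sketch: infer a schema from an RDD of JSON strings and query the result.
val json = sc.parallelize(Seq(
  """{"name":"Alice","age":30}""",
  """{"name":"Bob","age":25}"""))
val people = sqlContext.jsonRDD(json)    // one full pass over the data to infer the schema
people.printSchema()
people.registerTempTable("json_people")
sqlContext.sql("SELECT name FROM json_people WHERE age > 26").collect()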
@ -555,6 +634,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* Returns the dataset stored at path as a DataFrame,
* using the default data source configured by spark.sql.sources.default.
*
* @group genericdata
*/
@Experimental
def load(path: String): DataFrame = {
@ -565,6 +646,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Returns the dataset stored at path as a DataFrame, using the given data source.
*
* @group genericdata
*/
@Experimental
def load(path: String, source: String): DataFrame = {
@ -575,6 +658,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
*/
@Experimental
def load(source: String, options: java.util.Map[String, String]): DataFrame = {
@ -585,6 +670,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame.
*
* @group genericdata
*/
@Experimental
def load(source: String, options: Map[String, String]): DataFrame = {
@ -596,6 +683,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Java-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
*
* @group genericdata
*/
@Experimental
def load(
@ -609,6 +698,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* :: Experimental ::
* (Scala-specific) Returns the dataset specified by the given data source and
* a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
* @group genericdata
*/
@Experimental
def load(
@ -733,54 +823,70 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* :: Experimental ::
* Construct an RDD representing the database table accessible via JDBC URL
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table.
*
* @group specificdata
*/
@Experimental
def jdbcRDD(url: String, table: String): DataFrame = {
jdbcRDD(url, table, null.asInstanceOf[JDBCPartitioningInfo])
def jdbc(url: String, table: String): DataFrame = {
jdbc(url, table, JDBCRelation.columnPartition(null))
}
/**
* :: Experimental ::
* Construct an RDD representing the database table accessible via JDBC URL
* url named table. The PartitioningInfo parameter
* gives the name of a column of integral type, a number of partitions, and
* advisory minimum and maximum values for the column. The RDD is
* partitioned according to said column.
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. Partitions of the table will be retrieved in parallel based on the parameters
* passed to this function.
*
* @param columnName the name of a column of integral type that will be used for partitioning.
* @param lowerBound the minimum value of `columnName` to retrieve
* @param upperBound the maximum value of `columnName` to retrieve
* @param numPartitions the number of partitions. the range `minValue`-`maxValue` will be split
* evenly into this many partitions
*
* @group specificdata
*/
@Experimental
def jdbcRDD(url: String, table: String, partitioning: JDBCPartitioningInfo):
DataFrame = {
def jdbc(
url: String,
table: String,
columnName: String,
lowerBound: Long,
upperBound: Long,
numPartitions: Int): DataFrame = {
val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
val parts = JDBCRelation.columnPartition(partitioning)
jdbcRDD(url, table, parts)
jdbc(url, table, parts)
}
/**
* :: Experimental ::
* Construct an RDD representing the database table accessible via JDBC URL
* Construct a [[DataFrame]] representing the database table accessible via JDBC URL
* url named table. The theParts parameter gives a list expressions
* suitable for inclusion in WHERE clauses; each one defines one partition
* of the RDD.
* of the [[DataFrame]].
*
* @group specificdata
*/
@Experimental
def jdbcRDD(url: String, table: String, theParts: Array[String]): DataFrame = {
def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
JDBCPartition(part, i) : Partition
}
jdbcRDD(url, table, parts)
jdbc(url, table, parts)
}
private def jdbcRDD(url: String, table: String, parts: Array[Partition]): DataFrame = {
private def jdbc(url: String, table: String, parts: Array[Partition]): DataFrame = {
val relation = JDBCRelation(url, table, parts)(this)
baseRelationToDataFrame(relation)
}
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
* Registers the given [[DataFrame]] as a temporary table in the catalog. Temporary tables exist
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerRDDAsTable(rdd: DataFrame, tableName: String): Unit = {
private[sql] def registerDataFrameAsTable(rdd: DataFrame, tableName: String): Unit = {
catalog.registerTable(Seq(tableName), rdd.logicalPlan)
}
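A hedged sketch of the renamed jdbc readers above (the connection string, table, and column names are placeholders; sqlContext is an existing SQLContext and a JDBC driver is assumed to be available):

// Illustrative only: load a table over JDBC, whole and partitioned on an integral column.
val url = "jdbc:postgresql://localhost/testdb?user=test"   // placeholder

val whole = sqlContext.jdbc(url, "people")

// Four partitions spread over the advisory id range 0 to 10000.
val partitioned = sqlContext.jdbc(url, "people", "id", 0L, 10000L, 4)
partitioned.registerTempTable("people")
sqlContext.sql("SELECT count(*) FROM people").collect()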
@ -790,7 +896,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
*
* @param tableName the name of the table to be unregistered.
*
* @group ddl_ops
* @group basic
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
@ -801,7 +907,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Executes a SQL query using Spark, returning the result as a [[DataFrame]]. The dialect that is
* used for SQL parsing can be configured with 'spark.sql.dialect'.
*
* @group userf
* @group basic
*/
def sql(sqlText: String): DataFrame = {
if (conf.dialect == "sql") {
@ -811,7 +917,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
/** Returns the specified table as a [[DataFrame]]. */
/**
* Returns the specified table as a [[DataFrame]].
*
* @group ddl_ops
*/
def table(tableName: String): DataFrame =
DataFrame(this, catalog.lookupRelation(Seq(tableName)))
@ -819,6 +929,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Returns a [[DataFrame]] containing names of existing tables in the current database.
* The returned DataFrame has two columns, tableName and isTemporary (a Boolean
* indicating if a table is a temporary one or not).
*
* @group ddl_ops
*/
def tables(): DataFrame = {
DataFrame(this, ShowTablesCommand(None))
@ -828,6 +940,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
* Returns a [[DataFrame]] containing names of existing tables in the given database.
* The returned DataFrame has two columns, tableName and isTemporary (a Boolean
* indicating if a table is a temporary one or not).
*
* @group ddl_ops
*/
def tables(databaseName: String): DataFrame = {
DataFrame(this, ShowTablesCommand(Some(databaseName)))
@ -835,6 +949,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Returns the names of tables in the current database as an array.
*
* @group ddl_ops
*/
def tableNames(): Array[String] = {
catalog.getTables(None).map {
@ -844,6 +960,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
/**
* Returns the names of tables in the given database as an array.
*
* @group ddl_ops
*/
def tableNames(databaseName: String): Array[String] = {
catalog.getTables(Some(databaseName)).map {

View file

@ -37,7 +37,7 @@ import org.apache.spark.sql.types.DataType
* df.select( predict(df("score")) )
* }}}
*/
case class UserDefinedFunction(f: AnyRef, dataType: DataType) {
case class UserDefinedFunction protected[sql] (f: AnyRef, dataType: DataType) {
def apply(exprs: Column*): Column = {
Column(ScalaUdf(f, dataType, exprs.map(_.expr)))
@ -58,6 +58,7 @@ private[sql] case class UserDefinedPythonFunction(
accumulator: Accumulator[JList[Array[Byte]]],
dataType: DataType) {
/** Returns a [[Column]] that will evaluate to calling this UDF with the given input. */
def apply(exprs: Column*): Column = {
val udf = PythonUDF(name, command, envVars, pythonIncludes, pythonExec, broadcastVars,
accumulator, dataType, exprs.map(_.expr))

View file

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql
/**
* Contains API classes that are specific to a single language (i.e. Java).
*/
package object api

View file

@ -144,7 +144,7 @@ case class CacheTableCommand(
override def run(sqlContext: SQLContext) = {
plan.foreach { logicalPlan =>
sqlContext.registerRDDAsTable(DataFrame(sqlContext, logicalPlan), tableName)
sqlContext.registerDataFrameAsTable(DataFrame(sqlContext, logicalPlan), tableName)
}
sqlContext.cacheTable(tableName)

View file

@ -32,7 +32,9 @@ import org.apache.spark.sql.types._
*
* Usage:
* {{{
* sql("SELECT key FROM src").debug
* import org.apache.spark.sql.execution.debug._
* sql("SELECT key FROM src").debug()
* dataFrame.typeCheck()
* }}}
*/
package object debug {
@ -144,11 +146,9 @@ package object debug {
}
/**
* :: DeveloperApi ::
* Helper functions for checking that runtime types match a given schema.
*/
@DeveloperApi
object TypeCheck {
private[sql] object TypeCheck {
def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
case (null, _) =>
@ -174,10 +174,8 @@ package object debug {
}
/**
* :: DeveloperApi ::
* Augments [[DataFrame]]s with debug methods.
*/
@DeveloperApi
private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
import TypeCheck._

View file

@ -26,11 +26,11 @@ import org.apache.spark.sql.jdbc.{JDBCPartitioningInfo, JDBCRelation, JDBCPartit
import org.apache.spark.sql.types._
package object jdbc {
object JDBCWriteDetails extends Logging {
private[sql] object JDBCWriteDetails extends Logging {
/**
* Returns a PreparedStatement that inserts a row into table via conn.
*/
private def insertStatement(conn: Connection, table: String, rddSchema: StructType):
def insertStatement(conn: Connection, table: String, rddSchema: StructType):
PreparedStatement = {
val sql = new StringBuilder(s"INSERT INTO $table VALUES (")
var fieldsLeft = rddSchema.fields.length
@ -56,7 +56,7 @@ package object jdbc {
* non-Serializable. Instead, we explicitly close over all variables that
* are used.
*/
private[jdbc] def savePartition(url: String, table: String, iterator: Iterator[Row],
def savePartition(url: String, table: String, iterator: Iterator[Row],
rddSchema: StructType, nullTypes: Array[Int]): Iterator[Byte] = {
val conn = DriverManager.getConnection(url)
var committed = false
@ -117,19 +117,14 @@ package object jdbc {
}
Array[Byte]().iterator
}
}
/**
* Make it so that you can call createJDBCTable and insertIntoJDBC on a DataFrame.
*/
implicit class JDBCDataFrame(rdd: DataFrame) {
/**
* Compute the schema string for this RDD.
*/
private def schemaString(url: String): String = {
def schemaString(df: DataFrame, url: String): String = {
val sb = new StringBuilder()
val quirks = DriverQuirks.get(url)
rdd.schema.fields foreach { field => {
df.schema.fields foreach { field => {
val name = field.name
var typ: String = quirks.getJDBCType(field.dataType)._1
if (typ == null) typ = field.dataType match {
@ -156,9 +151,9 @@ package object jdbc {
/**
* Saves the RDD to the database in a single transaction.
*/
private def saveTable(url: String, table: String) {
def saveTable(df: DataFrame, url: String, table: String) {
val quirks = DriverQuirks.get(url)
var nullTypes: Array[Int] = rdd.schema.fields.map(field => {
var nullTypes: Array[Int] = df.schema.fields.map(field => {
var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
if (nullType.isEmpty) {
field.dataType match {
@ -175,61 +170,16 @@ package object jdbc {
case DateType => java.sql.Types.DATE
case DecimalType.Unlimited => java.sql.Types.DECIMAL
case _ => throw new IllegalArgumentException(
s"Can't translate null value for field $field")
s"Can't translate null value for field $field")
}
} else nullType.get
}).toArray
val rddSchema = rdd.schema
rdd.mapPartitions(iterator => JDBCWriteDetails.savePartition(
url, table, iterator, rddSchema, nullTypes)).collect()
val rddSchema = df.schema
df.foreachPartition { iterator =>
JDBCWriteDetails.savePartition(url, table, iterator, rddSchema, nullTypes)
}
}
/**
* Save this RDD to a JDBC database at `url` under the table name `table`.
* This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
* If you pass `true` for `allowExisting`, it will drop any table with the
* given name; if you pass `false`, it will throw if the table already
* exists.
*/
def createJDBCTable(url: String, table: String, allowExisting: Boolean) {
val conn = DriverManager.getConnection(url)
try {
if (allowExisting) {
val sql = s"DROP TABLE IF EXISTS $table"
conn.prepareStatement(sql).executeUpdate()
}
val schema = schemaString(url)
val sql = s"CREATE TABLE $table ($schema)"
conn.prepareStatement(sql).executeUpdate()
} finally {
conn.close()
}
saveTable(url, table)
}
/**
* Save this RDD to a JDBC database at `url` under the table name `table`.
* Assumes the table already exists and has a compatible schema. If you
* pass `true` for `overwrite`, it will `TRUNCATE` the table before
* performing the `INSERT`s.
*
* The table must already exist on the database. It must have a schema
* that is compatible with the schema of this RDD; inserting the rows of
* the RDD in order via the simple statement
* `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
*/
def insertIntoJDBC(url: String, table: String, overwrite: Boolean) {
if (overwrite) {
val conn = DriverManager.getConnection(url)
try {
val sql = s"TRUNCATE TABLE $table"
conn.prepareStatement(sql).executeUpdate()
} finally {
conn.close()
}
}
saveTable(url, table)
}
} // implicit class JDBCDataFrame
}
} // package object jdbc

View file

@ -55,7 +55,7 @@ import org.apache.spark.{Logging, SerializableWritable, TaskContext}
* Parquet table scan operator. Imports the file that backs the given
* [[org.apache.spark.sql.parquet.ParquetRelation]] as a ``RDD[Row]``.
*/
case class ParquetTableScan(
private[sql] case class ParquetTableScan(
attributes: Seq[Attribute],
relation: ParquetRelation,
columnPruningPred: Seq[Expression])
@ -210,7 +210,7 @@ case class ParquetTableScan(
* (only detected via filename pattern so will not catch all cases).
*/
@DeveloperApi
case class InsertIntoParquetTable(
private[sql] case class InsertIntoParquetTable(
relation: ParquetRelation,
child: SparkPlan,
overwrite: Boolean = false)
View file
@ -34,7 +34,7 @@ import org.apache.spark.util.Utils
* convenient to use tuples rather than special case classes when writing test cases/suites.
* In particular, `Tuple1.apply` can be used to easily wrap a single type/value.
*/
trait ParquetTest {
private[sql] trait ParquetTest {
val sqlContext: SQLContext
import sqlContext.implicits.{localSeqToDataFrameHolder, rddToDataFrameHolder}
@ -121,7 +121,7 @@ trait ParquetTest {
(data: Seq[T], tableName: String)
(f: => Unit): Unit = {
withParquetRDD(data) { rdd =>
sqlContext.registerRDDAsTable(rdd, tableName)
sqlContext.registerDataFrameAsTable(rdd, tableName)
withTempTable(tableName)(f)
}
}
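A hedged sketch of how a suite might use this helper; the helper name `withParquetTable` and the tuple column names `_1`/`_2` are assumptions not shown in this hunk:

    withParquetTable(Seq((1, "a"), (2, "b")), "t") {
      // The data is written to a temporary Parquet file and registered as table `t`
      // only for the duration of this block.
      assert(sqlContext.sql("SELECT _2 FROM t WHERE _1 = 1").collect().head.getString(0) == "a")
    }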
View file
@ -72,7 +72,7 @@ import org.apache.spark.{Logging, Partition => SparkPartition, SerializableWrita
* null or empty string. This is similar to the `hive.exec.default.partition.name` configuration
* in Hive.
*/
class DefaultSource
private[sql] class DefaultSource
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {
@ -147,7 +147,7 @@ private[sql] case class PartitionSpec(partitionColumns: StructType, partitions:
* discovery.
*/
@DeveloperApi
case class ParquetRelation2(
private[sql] case class ParquetRelation2(
paths: Seq[String],
parameters: Map[String, String],
maybeSchema: Option[StructType] = None,
@ -600,7 +600,7 @@ case class ParquetRelation2(
}
}
object ParquetRelation2 {
private[sql] object ParquetRelation2 {
// Whether we should merge schemas collected from all Parquet part-files.
val MERGE_SCHEMA = "mergeSchema"
View file
@ -53,7 +53,7 @@ private[parquet] class NanoTime extends Serializable {
"NanoTime{julianDay=" + julianDay + ", timeOfDayNanos=" + timeOfDayNanos + "}"
}
object NanoTime {
private[sql] object NanoTime {
def fromBinary(bytes: Binary): NanoTime = {
Preconditions.checkArgument(bytes.length() == 12, "Must be 12 bytes")
val buf = bytes.toByteBuffer
View file
@ -374,7 +374,7 @@ private[sql] case class CreateTempTableUsing(
def run(sqlContext: SQLContext) = {
val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
sqlContext.registerRDDAsTable(
sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
Seq.empty
}
@ -390,7 +390,7 @@ private[sql] case class CreateTempTableUsingAsSelect(
def run(sqlContext: SQLContext) = {
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
sqlContext.registerRDDAsTable(
sqlContext.registerDataFrameAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
Seq.empty
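These runnable commands back the data source DDL; a sketch of the SQL that reaches them, with an illustrative provider and path:

    sqlContext.sql(
      """CREATE TEMPORARY TABLE people
        |USING org.apache.spark.sql.parquet
        |OPTIONS (path '/tmp/people.parquet')""".stripMargin)
    sqlContext.sql("SELECT * FROM people").collect()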
View file
@ -1,102 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.jdbc;
import org.junit.*;
import static org.junit.Assert.*;
import java.sql.Connection;
import java.sql.DriverManager;
import org.apache.spark.SparkEnv;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.*;
import org.apache.spark.sql.test.TestSQLContext$;
public class JavaJDBCTest {
static String url = "jdbc:h2:mem:testdb1";
static Connection conn = null;
// This variable will always be null if TestSQLContext is intact when running
// these tests. Some Java tests do not play nicely with others, however;
// they create a SparkContext of their own at startup and stop it at exit.
// This renders TestSQLContext inoperable, meaning we have to do the same
// thing. If this variable is nonnull, that means we allocated a
// SparkContext of our own and that we need to stop it at teardown.
static JavaSparkContext localSparkContext = null;
static SQLContext sql = TestSQLContext$.MODULE$;
@Before
public void beforeTest() throws Exception {
if (SparkEnv.get() == null) { // A previous test destroyed TestSQLContext.
localSparkContext = new JavaSparkContext("local", "JavaAPISuite");
sql = new SQLContext(localSparkContext);
}
Class.forName("org.h2.Driver");
conn = DriverManager.getConnection(url);
conn.prepareStatement("create schema test").executeUpdate();
conn.prepareStatement("create table test.people (name TEXT(32) NOT NULL, theid INTEGER NOT NULL)").executeUpdate();
conn.prepareStatement("insert into test.people values ('fred', 1)").executeUpdate();
conn.prepareStatement("insert into test.people values ('mary', 2)").executeUpdate();
conn.prepareStatement("insert into test.people values ('joe', 3)").executeUpdate();
conn.commit();
}
@After
public void afterTest() throws Exception {
if (localSparkContext != null) {
localSparkContext.stop();
localSparkContext = null;
}
try {
conn.close();
} finally {
conn = null;
}
}
@Test
public void basicTest() throws Exception {
DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE");
Row[] rows = rdd.collect();
assertEquals(rows.length, 3);
}
@Test
public void partitioningTest() throws Exception {
String[] parts = new String[2];
parts[0] = "THEID < 2";
parts[1] = "THEID = 2"; // Deliberately forget about one of them.
DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE", parts);
Row[] rows = rdd.collect();
assertEquals(rows.length, 2);
}
@Test
public void writeTest() throws Exception {
DataFrame rdd = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLE");
JDBCUtils.createJDBCTable(rdd, url, "TEST.PEOPLECOPY", false);
DataFrame rdd2 = JDBCUtils.jdbcRDD(sql, url, "TEST.PEOPLECOPY");
Row[] rows = rdd2.collect();
assertEquals(rows.length, 3);
}
}
View file
@ -164,17 +164,16 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
}
test("Basic API") {
assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE").collect.size == 3)
assert(TestSQLContext.jdbc(url, "TEST.PEOPLE").collect.size == 3)
}
test("Partitioning via JDBCPartitioningInfo API") {
val parts = JDBCPartitioningInfo("THEID", 0, 4, 3)
assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3)
assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", "THEID", 0, 4, 3).collect.size == 3)
}
test("Partitioning via list-of-where-clauses API") {
val parts = Array[String]("THEID < 2", "THEID >= 2")
assert(TestSQLContext.jdbcRDD(url, "TEST.PEOPLE", parts).collect.size == 3)
assert(TestSQLContext.jdbc(url, "TEST.PEOPLE", parts).collect.size == 3)
}
test("H2 integral types") {
View file
@ -57,8 +57,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
val srdd = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
srdd.createJDBCTable(url, "TEST.BASICCREATETEST", false)
assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").count)
assert(2 == TestSQLContext.jdbcRDD(url, "TEST.BASICCREATETEST").collect()(0).length)
assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
}
test("CREATE with overwrite") {
@ -66,12 +66,12 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
val srdd2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
srdd.createJDBCTable(url, "TEST.DROPTEST", false)
assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count)
assert(3 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length)
assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
assert(3 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
srdd2.createJDBCTable(url, "TEST.DROPTEST", true)
assert(1 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").count)
assert(2 == TestSQLContext.jdbcRDD(url, "TEST.DROPTEST").collect()(0).length)
assert(1 == TestSQLContext.jdbc(url, "TEST.DROPTEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.DROPTEST").collect()(0).length)
}
test("CREATE then INSERT to append") {
@ -80,8 +80,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
srdd.createJDBCTable(url, "TEST.APPENDTEST", false)
srdd2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
assert(3 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").count)
assert(2 == TestSQLContext.jdbcRDD(url, "TEST.APPENDTEST").collect()(0).length)
assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
}
test("CREATE then INSERT to truncate") {
@ -90,8 +90,8 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
srdd.createJDBCTable(url, "TEST.TRUNCATETEST", false)
srdd2.insertIntoJDBC(url, "TEST.TRUNCATETEST", true)
assert(1 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").count)
assert(2 == TestSQLContext.jdbcRDD(url, "TEST.TRUNCATETEST").collect()(0).length)
assert(1 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").count)
assert(2 == TestSQLContext.jdbc(url, "TEST.TRUNCATETEST").collect()(0).length)
}
test("Incompatible INSERT to append") {
View file
@ -143,7 +143,7 @@ class MySQLDatabase {
}
test("Basic test") {
val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "tbl")
val rdd = TestSQLContext.jdbc(url(ip, "foo"), "tbl")
val rows = rdd.collect
assert(rows.length == 2)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@ -153,7 +153,7 @@ class MySQLDatabase {
}
test("Numeric types") {
val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
val rdd = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@ -181,7 +181,7 @@ class MySQLDatabase {
}
test("Date types") {
val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
val rdd = TestSQLContext.jdbc(url(ip, "foo"), "dates")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@ -199,7 +199,7 @@ class MySQLDatabase {
}
test("String types") {
val rdd = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
val rdd = TestSQLContext.jdbc(url(ip, "foo"), "strings")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@ -225,9 +225,9 @@ class MySQLDatabase {
}
test("Basic write test") {
val rdd1 = TestSQLContext.jdbcRDD(url(ip, "foo"), "numbers")
val rdd2 = TestSQLContext.jdbcRDD(url(ip, "foo"), "dates")
val rdd3 = TestSQLContext.jdbcRDD(url(ip, "foo"), "strings")
val rdd1 = TestSQLContext.jdbc(url(ip, "foo"), "numbers")
val rdd2 = TestSQLContext.jdbc(url(ip, "foo"), "dates")
val rdd3 = TestSQLContext.jdbc(url(ip, "foo"), "strings")
rdd1.createJDBCTable(url(ip, "foo"), "numberscopy", false)
rdd2.createJDBCTable(url(ip, "foo"), "datescopy", false)
rdd3.createJDBCTable(url(ip, "foo"), "stringscopy", false)
View file
@ -113,7 +113,7 @@ class PostgresDatabase {
}
test("Type mapping for various types") {
val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
val rows = rdd.collect
assert(rows.length == 1)
val types = rows(0).toSeq.map(x => x.getClass.toString)
@ -142,7 +142,7 @@ class PostgresDatabase {
}
test("Basic write test") {
val rdd = TestSQLContext.jdbcRDD(url(db.ip), "public.bar")
val rdd = TestSQLContext.jdbc(url(db.ip), "public.bar")
rdd.createJDBCTable(url(db.ip), "public.barcopy", false)
// Test only that it doesn't bomb out.
}
View file
@ -29,9 +29,9 @@ import org.apache.spark.sql.hive.HiveShim
import org.apache.spark.sql.SQLContext
/**
* Implementation for "describe [extended] table".
*
* :: DeveloperApi ::
*
* Implementation for "describe [extended] table".
*/
@DeveloperApi
case class DescribeHiveTableCommand(
View file
@ -102,6 +102,10 @@ case class AddFile(path: String) extends RunnableCommand {
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class CreateMetastoreDataSource(
tableName: String,
userSpecifiedSchema: Option[StructType],
@ -141,6 +145,10 @@ case class CreateMetastoreDataSource(
}
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
case class CreateMetastoreDataSourceAsSelect(
tableName: String,
provider: String,
View file
@ -15,16 +15,11 @@
* limitations under the License.
*/
package org.apache.spark.sql.jdbc
package org.apache.spark.sql.hive
import org.apache.spark.sql.DataFrame
private[jdbc] class JavaJDBCTrampoline {
def createJDBCTable(rdd: DataFrame, url: String, table: String, allowExisting: Boolean) {
rdd.createJDBCTable(url, table, allowExisting);
}
def insertIntoJDBC(rdd: DataFrame, url: String, table: String, overwrite: Boolean) {
rdd.insertIntoJDBC(url, table, overwrite);
}
}
/**
* Physical execution operators used for running queries against data stored in Hive. These
* are not intended for use by users, but are documented so that it is easier to understand
* the output of EXPLAIN queries.
*/
package object execution
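The operator names documented here appear directly in plan output; a hedged sketch, assuming a HiveContext named `hiveContext` and the usual Hive example table `src`:

    // Prints the physical plan, including Hive execution operators such as HiveTableScan.
    hiveContext.sql("EXPLAIN EXTENDED SELECT key, count(*) FROM src GROUP BY key")
      .collect().foreach(println)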
View file
@ -17,4 +17,14 @@
package org.apache.spark.sql
/**
* Support for running Spark SQL queries using functionality from Apache Hive (does not require an
* existing Hive installation). Supported Hive features include:
* - Using HiveQL to express queries.
* - Reading metadata from the Hive Metastore using HiveSerDes.
* - Hive UDFs, UDAFs, and UDTFs
*
* Users that would like access to this functionality should create a
* [[hive.HiveContext HiveContext]] instead of a [[SQLContext]].
*/
package object hive
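A minimal sketch of the recommended entry point described above, assuming an existing SparkContext named `sc` and a metastore table `src`:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)
    // HiveQL query resolved against the Hive metastore.
    hiveContext.sql("SELECT key, value FROM src LIMIT 10").collect().foreach(println)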
View file
@ -1,56 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.parquet
import java.util.Properties
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category
import org.apache.hadoop.hive.serde2.{SerDeStats, SerDe}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.io.Writable
/**
* A placeholder that allows Spark SQL users to create metastore tables that are stored as
* parquet files. It is only intended to pass the checks that the serde is valid and exists
* when a CREATE TABLE is run. The actual work of decoding will be done by ParquetTableScan
* when "spark.sql.hive.convertMetastoreParquet" is set to true.
*/
@deprecated("No code should depend on FakeParquetHiveSerDe as it is only intended as a " +
"placeholder in the Hive MetaStore", "1.2.0")
class FakeParquetSerDe extends SerDe {
override def getObjectInspector: ObjectInspector = new ObjectInspector {
override def getCategory: Category = Category.PRIMITIVE
override def getTypeName: String = "string"
}
override def deserialize(p1: Writable): AnyRef = throwError
override def initialize(p1: Configuration, p2: Properties): Unit = {}
override def getSerializedClass: Class[_ <: Writable] = throwError
override def getSerDeStats: SerDeStats = throwError
override def serialize(p1: scala.Any, p2: ObjectInspector): Writable = throwError
private def throwError =
sys.error(
"spark.sql.hive.convertMetastoreParquet must be set to true to use FakeParquetSerDe")
}
View file
@ -43,7 +43,9 @@ import org.apache.hadoop.mapred.InputFormat
import org.apache.spark.sql.types.{Decimal, DecimalType}
case class HiveFunctionWrapper(functionClassName: String) extends java.io.Serializable {
private[hive] case class HiveFunctionWrapper(functionClassName: String)
extends java.io.Serializable {
// for Serialization
def this() = this(null)
@ -249,6 +251,9 @@ private[hive] object HiveShim {
def setTblNullFormat(crtTbl: CreateTableDesc, tbl: Table) = {}
}
class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
private[hive] class ShimFileSinkDesc(
var dir: String,
var tableInfo: TableDesc,
var compressed: Boolean)
extends FileSinkDesc(dir, tableInfo, compressed) {
}
View file
@ -56,7 +56,9 @@ import org.apache.spark.sql.types.{Decimal, DecimalType}
*
* @param functionClassName UDF class name
*/
case class HiveFunctionWrapper(var functionClassName: String) extends java.io.Externalizable {
private[hive] case class HiveFunctionWrapper(var functionClassName: String)
extends java.io.Externalizable {
// for Serialization
def this() = this(null)
@ -423,7 +425,10 @@ private[hive] object HiveShim {
* Bug introduced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
* Fix it through a wrapper.
*/
class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)
private[hive] class ShimFileSinkDesc(
var dir: String,
var tableInfo: TableDesc,
var compressed: Boolean)
extends Serializable with Logging {
var compressCodec: String = _
var compressType: String = _