[SQL] SPARK-1333 First draft of java API
WIP: Some work remains...
* [x] Hive support
* [x] Tests
* [x] Update docs

Feedback welcome!

Author: Michael Armbrust <michael@databricks.com>

Closes #248 from marmbrus/javaSchemaRDD and squashes the following commits:

b393913 [Michael Armbrust] @srowen 's java style suggestions.
f531eb1 [Michael Armbrust] Address matei's comments.
33a1b1a [Michael Armbrust] Ignore JavaHiveSuite.
822f626 [Michael Armbrust] improve docs.
ab91750 [Michael Armbrust] Improve Java SQL API: * Change JavaRow => Row * Add support for querying RDDs of JavaBeans * Docs * Tests * Hive support
0b859c8 [Michael Armbrust] First draft of java API.
Parent: c1ea3afb51
Commit: b8f534196f
@@ -8,6 +8,10 @@ title: Spark SQL Programming Guide
 {:toc}

 # Overview

+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
 Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
 Spark. At the core of this component is a new type of RDD,
 [SchemaRDD](api/sql/core/index.html#org.apache.spark.sql.SchemaRDD). SchemaRDDs are composed
@@ -18,11 +22,27 @@ file, or by running HiveQL against data stored in [Apache Hive](http://hive.apac

 **All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell.**

+</div>
+
+<div data-lang="java" markdown="1">
+Spark SQL allows relational queries expressed in SQL, HiveQL, or Scala to be executed using
+Spark. At the core of this component is a new type of RDD,
+[JavaSchemaRDD](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSchemaRDD). JavaSchemaRDDs are composed of
+[Row](api/sql/catalyst/index.html#org.apache.spark.sql.api.java.Row) objects along with
+a schema that describes the data types of each column in the row. A JavaSchemaRDD is similar to a table
+in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, parquet
+file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/).
+</div>
+</div>

 ***************************************************************************************************

 # Getting Started

-The entry point into all relational functionallity in Spark is the
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+The entry point into all relational functionality in Spark is the
 [SQLContext](api/sql/core/index.html#org.apache.spark.sql.SQLContext) class, or one of its
 descendants. To create a basic SQLContext, all you need is a SparkContext.
@@ -34,8 +54,30 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
 {% endhighlight %}

+</div>
+
+<div data-lang="java" markdown="1">
+
+The entry point into all relational functionality in Spark is the
+[JavaSQLContext](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one
+of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext.
+
+{% highlight java %}
+JavaSparkContext ctx = ...; // An existing JavaSparkContext.
+JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);
+{% endhighlight %}
+
+</div>
+
+</div>

 ## Running SQL on RDDs

-One type of table that is supported by Spark SQL is an RDD of Scala case classetees. The case class
+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+
+One type of table that is supported by Spark SQL is an RDD of Scala case classes. The case class
 defines the schema of the table. The names of the arguments to the case class are read using
 reflection and become the names of the columns. Case classes can also be nested or contain complex
 types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be
@@ -60,7 +102,83 @@ val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
 {% endhighlight %}

-**Note that Spark SQL currently uses a very basic SQL parser, and the keywords are case sensitive.**
+</div>
+
+<div data-lang="java" markdown="1">
+
+One type of table that is supported by Spark SQL is an RDD of [JavaBeans](http://stackoverflow.com/questions/3295496/what-is-a-javabean-exactly). The BeanInfo
+defines the schema of the table. Currently, Spark SQL does not support JavaBeans that contain
+nested or complex types such as Lists or Arrays. You can create a JavaBean by creating a
+class that implements Serializable and has public getters and setters for all of its fields.
+
+{% highlight java %}
+
+public static class Person implements Serializable {
+  private String name;
+  private int age;
+
+  public String getName() {
+    return name;
+  }
+
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  public int getAge() {
+    return age;
+  }
+
+  public void setAge(int age) {
+    this.age = age;
+  }
+}
+
+{% endhighlight %}
+
+A schema can be applied to an existing RDD by calling `applySchema` and providing the Class object
+for the JavaBean.
+
+{% highlight java %}
+JavaSparkContext ctx = ...; // An existing JavaSparkContext.
+JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx);
+
+// Load a text file and convert each line to a JavaBean.
+JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
+  new Function<String, Person>() {
+    public Person call(String line) throws Exception {
+      String[] parts = line.split(",");
+
+      Person person = new Person();
+      person.setName(parts[0]);
+      person.setAge(Integer.parseInt(parts[1].trim()));
+
+      return person;
+    }
+  });
+
+// Apply a schema to an RDD of JavaBeans and register it as a table.
+JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+schemaPeople.registerAsTable("people");
+
+// SQL can be run over RDDs that have been registered as tables.
+JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+
+// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
+// The columns of a row in the result can be accessed by ordinal.
+List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
+  public String call(Row row) {
+    return "Name: " + row.getString(0);
+  }
+}).collect();
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
+**Note that Spark SQL currently uses a very basic SQL parser.**
+Users that want a more complete dialect of SQL should look at the HiveQL support provided by
+`HiveContext`.
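
For instance (an editorial sketch, not part of the original guide; `people` is the table registered above, and `hql` is the HiveQL entry point described in the Hive section below):

{% highlight scala %}
// The basic parser: per the note this paragraph replaces, the SQL keywords were
// case sensitive at this point, so upper-case keywords are the safe form.
val teenagers = sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// With a HiveContext, the same query can instead go through hql(...),
// which accepts a much more complete dialect:
// hql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
{% endhighlight %}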
@@ -70,17 +188,21 @@ Parquet is a columnar format that is supported by many other data processing sys
 provides support for both reading and writing parquet files that automatically preserves the schema
 of the original data. Using the data from the above example:

+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+
 {% highlight scala %}
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._

-val people: RDD[Person] // An RDD of case class objects, from the previous example.
+val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

 // The RDD is implicitly converted to a SchemaRDD, allowing it to be stored using parquet.
 people.saveAsParquetFile("people.parquet")

 // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
-// The result of loading a parquet file is also a SchemaRDD.
+// The result of loading a parquet file is also a JavaSchemaRDD.
 val parquetFile = sqlContext.parquetFile("people.parquet")

 // Parquet files can also be registered as tables and then used in SQL statements.
@@ -89,15 +211,43 @@ val teenagers = sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"
 teenagers.collect().foreach(println)
 {% endhighlight %}

+</div>
+
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+
+JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.
+
+// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
+schemaPeople.saveAsParquetFile("people.parquet");
+
+// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
+// The result of loading a parquet file is also a JavaSchemaRDD.
+JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+
+// Parquet files can also be registered as tables and then used in SQL statements.
+parquetFile.registerAsTable("parquetFile");
+JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
+
+{% endhighlight %}
+
+</div>
+
+</div>
+
 ## Writing Language-Integrated Relational Queries

+**Language-Integrated queries are currently only supported in Scala.**
+
 Spark SQL also supports a domain specific language for writing queries. Once again,
 using the data from the above examples:

 {% highlight scala %}
 val sqlContext = new org.apache.spark.sql.SQLContext(sc)
 import sqlContext._
-val people: RDD[Person] // An RDD of case class objects, from the first example.
+val people: RDD[Person] = ... // An RDD of case class objects, from the first example.

 // The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
 val teenagers = people.where('age >= 10).where('age <= 19).select('name)
@@ -114,14 +264,17 @@ evaluated by the SQL execution engine. A full list of the functions supported c

 Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
 However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
-In order to use Hive you must first run '`sbt/sbt hive/assembly`'. This command builds a new assembly
-jar that includes Hive. When this jar is present, Spark will use the Hive
-assembly instead of the normal Spark assembly. Note that this Hive assembly jar must also be present
+In order to use Hive you must first run '`SPARK_HIVE=true sbt/sbt assembly/assembly`'. This command builds a new assembly
+jar that includes Hive. Note that this Hive assembly jar must also be present
 on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries
 (SerDes) in order to access data stored in Hive.

 Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.

+<div class="codetabs">
+
+<div data-lang="scala" markdown="1">
+
 When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and
 adds support for finding tables in the MetaStore and writing queries using HiveQL. Users who do
 not have an existing Hive deployment can also experiment with the `LocalHiveContext`,
@@ -135,9 +288,34 @@ val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
 // Importing the SQL context gives access to all the public SQL functions and implicit conversions.
 import hiveContext._

-sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
-sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
+hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
+hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

 // Queries are expressed in HiveQL
-sql("SELECT key, value FROM src").collect().foreach(println)
+hql("FROM src SELECT key, value").collect().foreach(println)
 {% endhighlight %}

+</div>
+
+<div data-lang="java" markdown="1">
+
+When working with Hive one must construct a `JavaHiveContext`, which inherits from `JavaSQLContext`, and
+adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to
+the `sql` method a `JavaHiveContext` also provides an `hql` method, which allows queries to be
+expressed in HiveQL.
+
+{% highlight java %}
+JavaSparkContext ctx = ...; // An existing JavaSparkContext.
+JavaHiveContext hiveCtx = new org.apache.spark.sql.hive.api.java.JavaHiveContext(ctx);
+
+hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)");
+hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src");
+
+// Queries are expressed in HiveQL.
+List<Row> results = hiveCtx.hql("FROM src SELECT key, value").collect();
+
+{% endhighlight %}
+
+</div>
+
+</div>

@@ -0,0 +1,99 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.sql;

import java.io.Serializable;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;

public class JavaSparkSQL {
  public static class Person implements Serializable {
    private String name;
    private int age;

    public String getName() {
      return name;
    }

    public void setName(String name) {
      this.name = name;
    }

    public int getAge() {
      return age;
    }

    public void setAge(int age) {
      this.age = age;
    }
  }

  public static void main(String[] args) throws Exception {
    JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL",
      System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class));
    JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

    // Load a text file and convert each line to a Java Bean.
    JavaRDD<Person> people = ctx.textFile("examples/src/main/resources/people.txt").map(
      new Function<String, Person>() {
        public Person call(String line) throws Exception {
          String[] parts = line.split(",");

          Person person = new Person();
          person.setName(parts[0]);
          person.setAge(Integer.parseInt(parts[1].trim()));

          return person;
        }
      });

    // Apply a schema to an RDD of Java Beans and register it as a table.
    JavaSchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
    schemaPeople.registerAsTable("people");

    // SQL can be run over RDDs that have been registered as tables.
    JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

    // The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
    // The columns of a row in the result can be accessed by ordinal.
    List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
      public String call(Row row) {
        return "Name: " + row.getString(0);
      }
    }).collect();

    // JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
    schemaPeople.saveAsParquetFile("people.parquet");

    // Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
    // The result of loading a parquet file is also a JavaSchemaRDD.
    JavaSchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");

    // Parquet files can also be registered as tables and then used in SQL statements.
    parquetFile.registerAsTable("parquetFile");
    JavaSchemaRDD teenagers2 = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
  }
}
@@ -17,13 +17,13 @@

 package org.apache.spark.sql

-import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}

 /**
  * <span class="badge" style="float: right; background-color: darkblue;">ALPHA COMPONENT</span>
@@ -92,23 +92,10 @@ import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
  */
 class SchemaRDD(
     @transient val sqlContext: SQLContext,
-    @transient val logicalPlan: LogicalPlan)
-  extends RDD[Row](sqlContext.sparkContext, Nil) {
+    @transient protected[spark] val logicalPlan: LogicalPlan)
+  extends RDD[Row](sqlContext.sparkContext, Nil) with SchemaRDDLike {

-  /**
-   * A lazily computed query execution workflow.  All other RDD operations are passed
-   * through to the RDD that is produced by this workflow.
-   *
-   * We want this to be lazy because invoking the whole query optimization pipeline can be
-   * expensive.
-   */
-  @transient
-  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
-
-  override def toString =
-    s"""${super.toString}
-       |== Query Plan ==
-       |${queryExecution.executedPlan}""".stripMargin.trim
+  def baseSchemaRDD = this

   // =========================================================================================
   // RDD functions: Copy the internal row representation so we present immutable data to users.
@@ -312,31 +299,12 @@ class SchemaRDD(
       sqlContext,
       InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))

-  /**
-   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema.  Files that
-   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
-   * function.
-   *
-   * @group schema
-   */
-  def saveAsParquetFile(path: String): Unit = {
-    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
-  }
-
-  /**
-   * Registers this RDD as a temporary table using the given name.  The lifetime of this temporary
-   * table is tied to the [[SQLContext]] that was used to create this SchemaRDD.
-   *
-   * @group schema
-   */
-  def registerAsTable(tableName: String): Unit = {
-    sqlContext.registerRDDAsTable(this, tableName)
-  }
-
   /**
    * Returns this RDD as a SchemaRDD.
    * @group schema
    */
   def toSchemaRDD = this

   /** FOR INTERNAL USE ONLY */
   def analyze = sqlContext.analyzer(logicalPlan)
 }
@@ -0,0 +1,66 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.plans.logical._

/**
 * Contains functions that are shared between all SchemaRDD types (i.e., Scala, Java)
 */
trait SchemaRDDLike {
  @transient val sqlContext: SQLContext
  @transient protected[spark] val logicalPlan: LogicalPlan

  private[sql] def baseSchemaRDD: SchemaRDD

  /**
   * A lazily computed query execution workflow.  All other RDD operations are passed
   * through to the RDD that is produced by this workflow.
   *
   * We want this to be lazy because invoking the whole query optimization pipeline can be
   * expensive.
   */
  @transient
  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)

  override def toString =
    s"""${super.toString}
       |== Query Plan ==
       |${queryExecution.executedPlan}""".stripMargin.trim

  /**
   * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema.  Files that
   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
   * function.
   *
   * @group schema
   */
  def saveAsParquetFile(path: String): Unit = {
    sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
  }

  /**
   * Registers this RDD as a temporary table using the given name.  The lifetime of this temporary
   * table is tied to the [[SQLContext]] that was used to create this SchemaRDD.
   *
   * @group schema
   */
  def registerAsTable(tableName: String): Unit = {
    sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
  }
}
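
The shape of this refactoring, recapped (a sketch drawn entirely from the pieces of this diff, not new code): each concrete SchemaRDD flavor only has to say what its canonical Scala `SchemaRDD` is, and the shared operations above are written once against `baseSchemaRDD`.

// Scala: the RDD is its own base.
//   class SchemaRDD(...) extends RDD[Row](...) with SchemaRDDLike {
//     def baseSchemaRDD = this
//   }
// Java: the wrapper builds, and delegates to, a Scala SchemaRDD.
//   class JavaSchemaRDD(...) extends JavaRDDLike[Row, JavaRDD[Row]] with SchemaRDDLike {
//     private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan)
//   }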

@@ -0,0 +1,100 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.api.java

import java.beans.{Introspector, PropertyDescriptor}

import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}

/**
 * The entry point for executing Spark SQL queries from a Java program.
 */
class JavaSQLContext(sparkContext: JavaSparkContext) {

  val sqlContext = new SQLContext(sparkContext.sc)

  /**
   * Executes a query expressed in SQL, returning the result as a JavaSchemaRDD.
   */
  def sql(sqlQuery: String): JavaSchemaRDD = {
    val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This only
    // generates the RDD lineage for DML queries, but does not perform any execution.
    result.queryExecution.toRdd
    result
  }

  /**
   * Applies a schema to an RDD of Java Beans.
   */
  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
    val beanInfo = Introspector.getBeanInfo(beanClass)

    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
    val schema = fields.map { property =>
      val dataType = property.getPropertyType match {
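        // Only String and the primitive types below are handled; a bean property of any other
        // type (e.g. a List, an Array, or a nested bean) falls through this match and fails
        // with a scala.MatchError, in line with the guide's note that nested or complex
        // JavaBean types are not yet supported.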
        case c: Class[_] if c == classOf[java.lang.String] => StringType
        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
        case c: Class[_] if c == java.lang.Long.TYPE => LongType
        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
      }

      AttributeReference(property.getName, dataType, true)()
    }

    val className = beanClass.getCanonicalName
    val rowRdd = rdd.rdd.mapPartitions { iter =>
      // BeanInfo is not serializable so we must rediscover it remotely for each partition.
      val localBeanInfo = Introspector.getBeanInfo(Class.forName(className))
      val extractors =
        localBeanInfo.getPropertyDescriptors.filterNot(_.getName == "class").map(_.getReadMethod)

      iter.map { row =>
        new GenericRow(extractors.map(e => e.invoke(row)).toArray[Any]): ScalaRow
      }
    }
    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
  }

  /**
   * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
   */
  def parquetFile(path: String): JavaSchemaRDD =
    new JavaSchemaRDD(sqlContext, ParquetRelation("ParquetFile", path))

  /**
   * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
   * during the lifetime of this instance of SQLContext.
   */
  def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
    sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
  }
}
@@ -0,0 +1,48 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.api.java

import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.rdd.RDD

/**
 * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query.  In addition to
 * standard RDD operations, a JavaSchemaRDD can also be registered as a table in the JavaSQLContext
 * that was used to create it.  Registering a JavaSchemaRDD allows its contents to be queried in
 * future SQL statements.
 *
 * @groupname schema SchemaRDD Functions
 * @groupprio schema -1
 * @groupname Ungrouped Base RDD Functions
 */
class JavaSchemaRDD(
    @transient val sqlContext: SQLContext,
    @transient protected[spark] val logicalPlan: LogicalPlan)
  extends JavaRDDLike[Row, JavaRDD[Row]]
  with SchemaRDDLike {

  private[sql] val baseSchemaRDD = new SchemaRDD(sqlContext, logicalPlan)

  override val classTag = scala.reflect.classTag[Row]

  override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
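
  // The RDD that JavaRDDLike operations delegate to: each Scala Row produced by the
  // underlying SchemaRDD is wrapped in the Java-friendly Row class from this package.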
  val rdd = baseSchemaRDD.map(new Row(_))
}
@@ -0,0 +1,93 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.api.java

import org.apache.spark.sql.catalyst.expressions.{Row => ScalaRow}

/**
 * A result row from a Spark SQL query.
 */
class Row(row: ScalaRow) extends Serializable {

  /** Returns the number of columns present in this Row. */
  def length: Int = row.length

  /** Returns the value of column `i`. */
  def get(i: Int): Any =
    row(i)

  /** Returns true if value at column `i` is NULL. */
  def isNullAt(i: Int) = get(i) == null

  /**
   * Returns the value of column `i` as an int.  This function will throw an exception if the
   * value at `i` is not an integer, or if it is null.
   */
  def getInt(i: Int): Int =
    row.getInt(i)

  /**
   * Returns the value of column `i` as a long.  This function will throw an exception if the
   * value at `i` is not a long, or if it is null.
   */
  def getLong(i: Int): Long =
    row.getLong(i)

  /**
   * Returns the value of column `i` as a double.  This function will throw an exception if the
   * value at `i` is not a double, or if it is null.
   */
  def getDouble(i: Int): Double =
    row.getDouble(i)

  /**
   * Returns the value of column `i` as a bool.  This function will throw an exception if the
   * value at `i` is not a boolean, or if it is null.
   */
  def getBoolean(i: Int): Boolean =
    row.getBoolean(i)

  /**
   * Returns the value of column `i` as a short.  This function will throw an exception if the
   * value at `i` is not a short, or if it is null.
   */
  def getShort(i: Int): Short =
    row.getShort(i)

  /**
   * Returns the value of column `i` as a byte.  This function will throw an exception if the
   * value at `i` is not a byte, or if it is null.
   */
  def getByte(i: Int): Byte =
    row.getByte(i)

  /**
   * Returns the value of column `i` as a float.  This function will throw an exception if the
   * value at `i` is not a float, or if it is null.
   */
  def getFloat(i: Int): Float =
    row.getFloat(i)

  /**
   * Returns the value of column `i` as a String.  This function will throw an exception if the
   * value at `i` is not a String.
   */
  def getString(i: Int): String =
    row.getString(i)
}
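
A quick illustration of the wrapper (a sketch, not part of the commit; the two-column row is invented for the example):

import org.apache.spark.sql.catalyst.expressions.GenericRow
import org.apache.spark.sql.api.java.Row

// A hypothetical two-column catalyst row: (name: String, age: Int or NULL).
val scalaRow = new GenericRow(Array[Any]("Michael", null))
val row = new Row(scalaRow)

row.length        // 2
row.getString(0)  // "Michael"
row.isNullAt(1)   // true -- check before calling a primitive getter like getInt,
                  // which throws on a NULL value.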

@@ -0,0 +1,53 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.api.java

import scala.beans.BeanProperty

import org.scalatest.FunSuite

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.test.TestSQLContext

// Implicits
import scala.collection.JavaConversions._

class PersonBean extends Serializable {
  @BeanProperty
  var name: String = _

  @BeanProperty
  var age: Int = _
}

class JavaSQLSuite extends FunSuite {
  val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
  val javaSqlCtx = new JavaSQLContext(javaCtx)

  test("schema from JavaBeans") {
    val person = new PersonBean
    person.setName("Michael")
    person.setAge(29)

    val rdd = javaCtx.parallelize(person :: Nil)
    val schemaRDD = javaSqlCtx.applySchema(rdd, classOf[PersonBean])

    schemaRDD.registerAsTable("people")
    javaSqlCtx.sql("SELECT * FROM people").collect()
  }
}
@@ -71,6 +71,18 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   override def executePlan(plan: LogicalPlan): this.QueryExecution =
     new this.QueryExecution { val logical = plan }

+  /**
+   * Executes a query expressed in HiveQL using Spark, returning the result as a SchemaRDD.
+   */
+  def hql(hqlQuery: String): SchemaRDD = {
+    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
+    // We force query optimization to happen right away instead of letting it happen lazily like
+    // when using the query DSL.  This is so DDL commands behave as expected.  This only
+    // generates the RDD lineage for DML queries, but does not perform any execution.
+    result.queryExecution.toRdd
+    result
+  }
+
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   @transient
   protected val outputBuffer = new java.io.OutputStream {
@@ -0,0 +1,42 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.api.java

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.api.java.{JavaSQLContext, JavaSchemaRDD}
import org.apache.spark.sql.hive.{HiveContext, HiveQl}

/**
 * The entry point for executing Spark SQL queries from a Java program.
 */
class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(sparkContext) {

  override val sqlContext = new HiveContext(sparkContext)

  /**
   * Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
   */
  def hql(hqlQuery: String): JavaSchemaRDD = {
    val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This only
    // generates the RDD lineage for DML queries, but does not perform any execution.
    result.queryExecution.toRdd
    result
  }
}
@@ -0,0 +1,41 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.hive.api.java

import org.scalatest.FunSuite

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.test.TestSQLContext
import org.apache.spark.sql.hive.TestHive

// Implicits
import scala.collection.JavaConversions._

class JavaHiveSQLSuite extends FunSuite {
  ignore("SELECT * FROM src") {
    val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)
    // There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
    val javaSqlCtx = new JavaHiveContext(javaCtx) {
      override val sqlContext = TestHive
    }

    assert(
      javaSqlCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
        TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
  }
}