[SPARK-2410][SQL] Merging Hive Thrift/JDBC server

JIRA issue:

- Main: [SPARK-2410](https://issues.apache.org/jira/browse/SPARK-2410)
- Related: [SPARK-2678](https://issues.apache.org/jira/browse/SPARK-2678)

Cherry picked the Hive Thrift/JDBC server from [branch-1.0-jdbc](https://github.com/apache/spark/tree/branch-1.0-jdbc).

(Thanks chenghao-intel for his initial contribution of the Spark SQL CLI.)

TODO

- [x] Use `spark-submit` to launch the server, the CLI and beeline
- [x] Migration guideline draft for Shark users

----

While working on this PR, I hit a bug in `SparkSubmitArguments`: any application option that `SparkSubmitArguments` recognizes is stolen and treated as a `SparkSubmit` option. For example:

```bash
$ spark-submit --class org.apache.hive.beeline.BeeLine spark-internal --help
```

This actually shows usage information of `SparkSubmit` rather than `BeeLine`.

~~Fixed this bug here since the `spark-internal` related stuff also touches `SparkSubmitArguments` and I'd like to avoid conflict.~~

**UPDATE** The bug mentioned above is now tracked by [SPARK-2678](https://issues.apache.org/jira/browse/SPARK-2678). Decided to revert the changes for this bug since it involves more subtle considerations and is worth a separate PR.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1399 from liancheng/thriftserver and squashes the following commits:

090beea [Cheng Lian] Revert changes related to SPARK-2678, decided to move them to another PR
21c6cf4 [Cheng Lian] Updated Spark SQL programming guide docs
fe0af31 [Cheng Lian] Reordered spark-submit options in spark-shell[.cmd]
199e3fb [Cheng Lian] Disabled MIMA for hive-thriftserver
1083e9d [Cheng Lian] Fixed failed test suites
7db82a1 [Cheng Lian] Fixed spark-submit application options handling logic
9cc0f06 [Cheng Lian] Starts beeline with spark-submit
cfcf461 [Cheng Lian] Updated documents and build scripts for the newly added hive-thriftserver profile
061880f [Cheng Lian] Addressed all comments by @pwendell
7755062 [Cheng Lian] Adapts test suites to spark-submit settings
40bafef [Cheng Lian] Fixed more license header issues
e214aab [Cheng Lian] Added missing license headers
b8905ba [Cheng Lian] Fixed minor issues in spark-sql and start-thriftserver.sh
f975d22 [Cheng Lian] Updated docs for Hive compatibility and Shark migration guide draft
3ad4e75 [Cheng Lian] Starts spark-sql shell with spark-submit
a5310d1 [Cheng Lian] Make HiveThriftServer2 play well with spark-submit
61f39f4 [Cheng Lian] Starts Hive Thrift server via spark-submit
2c4c539 [Cheng Lian] Cherry picked the Hive Thrift server
Authored by Cheng Lian on 2014-07-25 12:20:49 -07:00, committed by Michael Armbrust
parent 32bcf9af94
commit 06dc0d2c6b
54 changed files with 1772 additions and 96 deletions

1
.gitignore vendored
View file

@ -57,3 +57,4 @@ metastore_db/
metastore/
warehouse/
TempStatsStore/
sql/hive-thriftserver/test_warehouses

View file

@ -165,6 +165,16 @@
</dependency>
</dependencies>
</profile>
<profile>
<id>hive-thriftserver</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-ganglia-lgpl</id>
<dependencies>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_2.10</artifactId>
<properties>
<sbt.project.name>bagel</sbt.project.name>
<sbt.project.name>bagel</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Bagel</name>

45
bin/beeline Executable file
View file

@ -0,0 +1,45 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
# Find the java binary
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Compute classpath using external script
classpath_output=$($FWDIR/bin/compute-classpath.sh)
if [[ "$?" != "0" ]]; then
echo "$classpath_output"
exit 1
else
CLASSPATH=$classpath_output
fi
CLASS="org.apache.hive.beeline.BeeLine"
exec "$RUNNER" -cp "$CLASSPATH" $CLASS "$@"

View file

@ -52,6 +52,7 @@ if [ -n "$SPARK_PREPEND_CLASSES" ]; then
CLASSPATH="$CLASSPATH:$FWDIR/sql/catalyst/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/core/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/sql/hive-thriftserver/target/scala-$SCALA_VERSION/classes"
CLASSPATH="$CLASSPATH:$FWDIR/yarn/stable/target/scala-$SCALA_VERSION/classes"
fi

View file

@ -46,11 +46,11 @@ function main(){
# (see https://github.com/sbt/sbt/issues/562).
stty -icanon min 1 -echo > /dev/null 2>&1
export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit --class org.apache.spark.repl.Main spark-shell "$@"
fi
}

View file

@ -19,4 +19,4 @@ rem
set SPARK_HOME=%~dp0..
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell %* --class org.apache.spark.repl.Main
cmd /V /E /C %SPARK_HOME%\bin\spark-submit.cmd spark-shell --class org.apache.spark.repl.Main %*

36
bin/spark-sql Executable file
View file

@ -0,0 +1,36 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Shell script for starting the Spark SQL CLI
# Enter posix mode for bash
set -o posix
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/spark-sql [options]"
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi
CLASS="org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<properties>
<sbt.project.name>core</sbt.project.name>
<sbt.project.name>core</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Core</name>

View file

@ -46,6 +46,10 @@ object SparkSubmit {
private val CLUSTER = 2
private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
// A special jar name that indicates the class being run is inside of Spark itself, and therefore
// no user jar is needed.
private val SPARK_INTERNAL = "spark-internal"
// Special primary resource names that represent shells rather than application jars.
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"
@ -257,7 +261,9 @@ object SparkSubmit {
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (clusterManager == YARN && deployMode == CLUSTER) {
childMainClass = "org.apache.spark.deploy.yarn.Client"
childArgs += ("--jar", args.primaryResource)
if (args.primaryResource != SPARK_INTERNAL) {
childArgs += ("--jar", args.primaryResource)
}
childArgs += ("--class", args.mainClass)
if (args.childArgs != null) {
args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
@ -332,7 +338,7 @@ object SparkSubmit {
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
!isShell(primaryResource) && !isPython(primaryResource) && !isInternal(primaryResource)
}
/**
@ -349,6 +355,10 @@ object SparkSubmit {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}
private[spark] def isInternal(primaryResource: String): Boolean = {
primaryResource == SPARK_INTERNAL
}
/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.

View file

@ -204,8 +204,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
/** Fill in values by parsing user options. */
private def parseOpts(opts: Seq[String]): Unit = {
// Delineates parsing of Spark options from parsing of user options.
var inSparkOpts = true
// Delineates parsing of Spark options from parsing of user options.
parse(opts)
def parse(opts: Seq[String]): Unit = opts match {
@ -318,7 +319,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
SparkSubmit.printErrorAndExit(errMessage)
case v =>
primaryResource =
if (!SparkSubmit.isShell(v)) {
if (!SparkSubmit.isShell(v) && !SparkSubmit.isInternal(v)) {
Utils.resolveURI(v).toString
} else {
v

View file

@ -53,7 +53,7 @@ if [[ ! "$@" =~ --package-only ]]; then
-Dusername=$GIT_USERNAME -Dpassword=$GIT_PASSWORD \
-Dmaven.javadoc.skip=true \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Dtag=$GIT_TAG -DautoVersionSubmodules=true \
--batch-mode release:prepare
@ -61,7 +61,7 @@ if [[ ! "$@" =~ --package-only ]]; then
-Darguments="-DskipTests=true -Dmaven.javadoc.skip=true -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 -Dgpg.passphrase=${GPG_PASSPHRASE}" \
-Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Dmaven.javadoc.skip=true \
-Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl\
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl\
release:perform
cd ..
@ -111,10 +111,10 @@ make_binary_release() {
spark-$RELEASE_VERSION-bin-$NAME.tgz.sha
}
make_binary_release "hadoop1" "-Phive -Dhadoop.version=1.0.4"
make_binary_release "cdh4" "-Phive -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4"
make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0"
make_binary_release "hadoop2" \
"-Phive -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
"-Phive -Phive-thriftserver -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -Pyarn.version=2.2.0"
# Copy data
echo "Copying release tarballs"

View file

@ -65,7 +65,7 @@ echo "========================================================================="
# (either resolution or compilation) prompts the user for input either q, r,
# etc to quit or retry. This echo is there to make it not block.
if [ -n "$_RUN_SQL_TESTS" ]; then
echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive" sbt/sbt clean package \
echo -e "q\n" | SBT_MAVEN_PROFILES="$SBT_MAVEN_PROFILES -Phive -Phive-thriftserver" sbt/sbt clean package \
assembly/assembly test | grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
else
echo -e "q\n" | sbt/sbt clean package assembly/assembly test | \

View file

@ -17,7 +17,7 @@
# limitations under the License.
#
echo -e "q\n" | sbt/sbt -Phive scalastyle > scalastyle.txt
echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
# Check style with YARN alpha built too
echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \
>> scalastyle.txt

View file

@ -136,7 +136,7 @@ val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.createSchemaRDD
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
@ -548,7 +548,6 @@ results = hiveContext.hql("FROM src SELECT key, value").collect()
</div>
</div>
# Writing Language-Integrated Relational Queries
**Language-Integrated queries are currently only supported in Scala.**
@ -573,4 +572,199 @@ prefixed with a tick (`'`). Implicit conversions turn these symbols into expres
evaluated by the SQL execution engine. A full list of the functions supported can be found in the
[ScalaDoc](api/scala/index.html#org.apache.spark.sql.SchemaRDD).
<!-- TODO: Include the table of operations here. -->
<!-- TODO: Include the table of operations here. -->
## Running the Thrift JDBC server
The Thrift JDBC server implemented here corresponds to the [`HiveServer2`]
(https://cwiki.apache.org/confluence/display/Hive/Setting+Up+HiveServer2) in Hive 0.12. You can test
the JDBC server with the beeline script that comes with either Spark or Hive 0.12. To use Hive, you
must first run '`sbt/sbt -Phive-thriftserver assembly/assembly`' (or use `-Phive-thriftserver`
for Maven).
To start the JDBC server, run the following in the Spark directory:
./sbin/start-thriftserver.sh
The default port the server listens on is 10000. You may run
`./sbin/start-thriftserver.sh --help` for a complete list of all available
options. Now you can use beeline to test the Thrift JDBC server:
./bin/beeline
Connect to the JDBC server in beeline with:
beeline> !connect jdbc:hive2://localhost:10000
Beeline will ask you for a username and password. In non-secure mode, simply enter the username on
your machine and a blank password. For secure mode, please follow the instructions given in the
[beeline documentation](https://cwiki.apache.org/confluence/display/Hive/HiveServer2+Clients).
Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
You may also use the beeline script that comes with Hive.
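For reference, application code can talk to the server through the standard JDBC API. Below is a minimal smoke-test sketch, not part of this patch, assuming the Hive 0.12 JDBC driver (`org.apache.hive.jdbc.HiveDriver`) and its dependencies are on the classpath and the Thrift JDBC server is running on the default port:

```scala
// Minimal JDBC smoke test (assumes the Hive 0.12 JDBC driver and its dependencies are on the
// classpath, and that the Thrift JDBC server is listening on localhost:10000).
import java.sql.DriverManager

object ThriftServerSmokeTest {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Same URL used by beeline; in non-secure mode the username is the local user and the
    // password is blank.
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10000", System.getProperty("user.name"), "")
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("SHOW TABLES")
      while (rs.next()) {
        println(rs.getString(1))
      }
    } finally {
      conn.close()
    }
  }
}
```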
### Migration Guide for Shark Users
#### Reducer number
In Shark, the default reducer number is 1 and is controlled by the property `mapred.reduce.tasks`. Spark
SQL deprecates this property in favor of a new property `spark.sql.shuffle.partitions`, whose default
value is 200. Users may customize this property via `SET`:
```
SET spark.sql.shuffle.partitions=10;
SELECT page, count(*) c FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;
```
You may also put this property in `hive-site.xml` to override the default value.
For now, the `mapred.reduce.tasks` property is still recognized, and is converted to
`spark.sql.shuffle.partitions` automatically.
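The same property can also be set programmatically on the SQL context. A minimal sketch, assuming an already constructed `HiveContext` named `hiveContext` (illustrative only, not part of this patch):

```scala
// Equivalent to running `SET spark.sql.shuffle.partitions=10;` from the CLI or JDBC server.
// Assumes an existing HiveContext named hiveContext and the logs_last_month_cached table above.
hiveContext.set("spark.sql.shuffle.partitions", "10")

val topPages = hiveContext.hql(
  "SELECT page, count(*) c FROM logs_last_month_cached GROUP BY page ORDER BY c DESC LIMIT 10")
topPages.collect().foreach(println)
```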
#### Caching
The `shark.cache` table property no longer exists, and tables whose names end with `_cached` are no
longer automatically cached. Instead, we provide `CACHE TABLE` and `UNCACHE TABLE` statements to
let users control table caching explicitly:
```
CACHE TABLE logs_last_month;
UNCACHE TABLE logs_last_month;
```
**NOTE** `CACHE TABLE tbl` is lazy: it only marks table `tbl` as "needs to be cached if necessary",
but doesn't actually cache it until a query that touches `tbl` is executed. To force the table to be
cached, you may simply count the table immediately after executing `CACHE TABLE`:
```
CACHE TABLE logs_last_month;
SELECT COUNT(1) FROM logs_last_month;
```
Several caching-related features are not supported yet:
* User defined partition level cache eviction policy
* RDD reloading
* In-memory cache write through policy
### Compatibility with Apache Hive
#### Deploying in Existing Hive Warehouses
The Spark SQL Thrift JDBC server is designed to be "out of the box" compatible with existing Hive
installations. You do not need to modify your existing Hive Metastore or change the data placement
or partitioning of your tables.
#### Supported Hive Features
Spark SQL supports the vast majority of Hive features, such as:
* Hive query statements, including:
* `SELECT`
* `GROUP BY`
* `ORDER BY`
* `CLUSTER BY`
* `SORT BY`
* All Hive operators, including:
* Relational operators (`=`, `⇔`, `==`, `<>`, `<`, `>`, `>=`, `<=`, etc)
* Arithmetic operators (`+`, `-`, `*`, `/`, `%`, etc)
* Logical operators (`AND`, `&&`, `OR`, `||`, etc)
* Complex type constructors
* Mathematical functions (`sign`, `ln`, `cos`, etc)
* String functions (`instr`, `length`, `printf`, etc)
* User defined functions (UDF)
* User defined aggregation functions (UDAF)
* User defined serialization formats (SerDe's)
* Joins
* `JOIN`
* `{LEFT|RIGHT|FULL} OUTER JOIN`
* `LEFT SEMI JOIN`
* `CROSS JOIN`
* Unions
* Sub queries
* `SELECT col FROM ( SELECT a + b AS col from t1) t2`
* Sampling
* Explain
* Partitioned tables
* All Hive DDL Functions, including:
* `CREATE TABLE`
* `CREATE TABLE AS SELECT`
* `ALTER TABLE`
* Most Hive Data types, including:
* `TINYINT`
* `SMALLINT`
* `INT`
* `BIGINT`
* `BOOLEAN`
* `FLOAT`
* `DOUBLE`
* `STRING`
* `BINARY`
* `TIMESTAMP`
* `ARRAY<>`
* `MAP<>`
* `STRUCT<>`
#### Unsupported Hive Functionality
Below is a list of Hive features that we don't support yet. Most of these features are rarely used
in Hive deployments.
**Major Hive Features**
* Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL
doesn't support buckets yet.
**Esoteric Hive Features**
* Tables with partitions using different input formats: In Spark SQL, all table partitions need to
have the same input format.
* Non-equi outer join: For the uncommon use case of using outer joins with non-equi join conditions
(e.g. the condition "`key < 10`"), Spark SQL outputs wrong results for the `NULL` tuple.
* `UNIONTYPE`
* Unique join
* Single query multi insert
* Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at
the moment.
**Hive Input/Output Formats**
* File format for CLI: For results shown back to the CLI, Spark SQL only supports TextOutputFormat.
* Hadoop archive
**Hive Optimizations**
A handful of Hive optimizations are not yet included in Spark. Some of these (such as indexes) are
not necessary due to Spark SQL's in-memory computational model. Others are slotted for future
releases of Spark SQL.
* Block level bitmap indexes and virtual columns (used to build indexes)
* Automatically convert a join to map join: For joining a large table with multiple small tables,
Hive automatically converts the join into a map join. We are adding this auto conversion in the
next release.
* Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you
need to control the degree of parallelism post-shuffle using "SET
spark.sql.shuffle.partitions=[num_tasks];". We are going to add auto-setting of parallelism in the
next release.
* Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still
launches tasks to compute the result.
* Skew data flag: Spark SQL does not follow the skew data flags in Hive.
* `STREAMTABLE` hint in join: Spark SQL does not follow the `STREAMTABLE` hint.
* Merge multiple small files for query results: if the result output contains multiple small files,
Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS
metadata. Spark SQL does not support that.
## Running the Spark SQL CLI
The Spark SQL CLI is a convenient tool to run the Hive metastore service in local mode and execute
queries input from the command line. Note that the Spark SQL CLI cannot talk to the Thrift JDBC server.
To start the Spark SQL CLI, run the following in the Spark directory:
./bin/spark-sql
Configuration of Hive is done by placing your `hive-site.xml` file in `conf/`.
You may run `./bin/spark-sql --help` for a complete list of all available
options.

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-examples_2.10</artifactId>
<properties>
<sbt.project.name>examples</sbt.project.name>
<sbt.project.name>examples</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Examples</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.10</artifactId>
<properties>
<sbt.project.name>streaming-flume</sbt.project.name>
<sbt.project.name>streaming-flume</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Flume</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<properties>
<sbt.project.name>streaming-kafka</sbt.project.name>
<sbt.project.name>streaming-kafka</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Kafka</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-mqtt_2.10</artifactId>
<properties>
<sbt.project.name>streaming-mqtt</sbt.project.name>
<sbt.project.name>streaming-mqtt</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External MQTT</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-twitter_2.10</artifactId>
<properties>
<sbt.project.name>streaming-twitter</sbt.project.name>
<sbt.project.name>streaming-twitter</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External Twitter</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-zeromq_2.10</artifactId>
<properties>
<sbt.project.name>streaming-zeromq</sbt.project.name>
<sbt.project.name>streaming-zeromq</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project External ZeroMQ</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.10</artifactId>
<properties>
<sbt.project.name>graphx</sbt.project.name>
<sbt.project.name>graphx</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project GraphX</name>

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_2.10</artifactId>
<properties>
<sbt.project.name>mllib</sbt.project.name>
<sbt.project.name>mllib</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project ML Library</name>

View file

@ -95,6 +95,7 @@
<module>sql/catalyst</module>
<module>sql/core</module>
<module>sql/hive</module>
<module>sql/hive-thriftserver</module>
<module>repl</module>
<module>assembly</module>
<module>external/twitter</module>
@ -252,9 +253,9 @@
<version>3.3.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.5</version>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.5</version>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>

View file

@ -29,11 +29,11 @@ object BuildCommons {
private val buildLocation = file(".").getAbsoluteFile.getParentFile
val allProjects@Seq(bagel, catalyst, core, graphx, hive, mllib, repl, spark, sql, streaming,
streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) =
Seq("bagel", "catalyst", "core", "graphx", "hive", "mllib", "repl", "spark", "sql",
"streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
"streaming-zeromq").map(ProjectRef(buildLocation, _))
val allProjects@Seq(bagel, catalyst, core, graphx, hive, hiveThriftServer, mllib, repl, spark, sql,
streaming, streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) =
Seq("bagel", "catalyst", "core", "graphx", "hive", "hive-thriftserver", "mllib", "repl",
"spark", "sql", "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt",
"streaming-twitter", "streaming-zeromq").map(ProjectRef(buildLocation, _))
val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) =
Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl")
@ -99,7 +99,7 @@ object SparkBuild extends PomBuild {
Properties.envOrNone("SBT_MAVEN_PROPERTIES") match {
case Some(v) =>
v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.split("=")).foreach(x => System.setProperty(x(0), x(1)))
case _ =>
case _ =>
}
override val userPropertiesMap = System.getProperties.toMap
@ -157,7 +157,7 @@ object SparkBuild extends PomBuild {
/* Enable Mima for all projects except spark, hive, catalyst, sql and repl */
// TODO: Add Sql to mima checks
allProjects.filterNot(y => Seq(spark, sql, hive, catalyst, repl).exists(x => x == y)).
allProjects.filterNot(x => Seq(spark, sql, hive, hiveThriftServer, catalyst, repl).contains(x)).
foreach (x => enable(MimaBuild.mimaSettings(sparkHome, x))(x))
/* Enable Assembly for all assembly projects */

36
sbin/start-thriftserver.sh Executable file
View file

@ -0,0 +1,36 @@
#!/usr/bin/env bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#
# Shell script for starting the Spark SQL Thrift server
# Enter posix mode for bash
set -o posix
# Figure out where Spark is installed
FWDIR="$(cd `dirname $0`/..; pwd)"
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-thriftserver [options]"
$FWDIR/bin/spark-submit --help 2>&1 | grep -v Usage 1>&2
exit 0
fi
CLASS="org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
exec "$FWDIR"/bin/spark-submit --class $CLASS spark-internal $@

View file

@ -32,7 +32,7 @@
<name>Spark Project Catalyst</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>catalyst</sbt.project.name>
<sbt.project.name>catalyst</sbt.project.name>
</properties>
<dependencies>

View file

@ -43,8 +43,7 @@ case class NativeCommand(cmd: String) extends Command {
*/
case class SetCommand(key: Option[String], value: Option[String]) extends Command {
override def output = Seq(
BoundReference(0, AttributeReference("key", StringType, nullable = false)()),
BoundReference(1, AttributeReference("value", StringType, nullable = false)()))
BoundReference(1, AttributeReference("", StringType, nullable = false)()))
}
/**

View file

@ -32,7 +32,7 @@
<name>Spark Project SQL</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>sql</sbt.project.name>
<sbt.project.name>sql</sbt.project.name>
</properties>
<dependencies>

View file

@ -30,12 +30,13 @@ import scala.collection.JavaConverters._
* SQLConf is thread-safe (internally synchronized so safe to be used in multiple threads).
*/
trait SQLConf {
import SQLConf._
/** ************************ Spark SQL Params/Hints ******************* */
// TODO: refactor so that these hints accessors don't pollute the name space of SQLContext?
/** Number of partitions to use for shuffle operators. */
private[spark] def numShufflePartitions: Int = get("spark.sql.shuffle.partitions", "200").toInt
private[spark] def numShufflePartitions: Int = get(SHUFFLE_PARTITIONS, "200").toInt
/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
@ -43,11 +44,10 @@ trait SQLConf {
* effectively disables auto conversion.
* Hive setting: hive.auto.convert.join.noconditionaltask.size.
*/
private[spark] def autoConvertJoinSize: Int =
get("spark.sql.auto.convert.join.size", "10000").toInt
private[spark] def autoConvertJoinSize: Int = get(AUTO_CONVERT_JOIN_SIZE, "10000").toInt
/** A comma-separated list of table names marked to be broadcasted during joins. */
private[spark] def joinBroadcastTables: String = get("spark.sql.join.broadcastTables", "")
private[spark] def joinBroadcastTables: String = get(JOIN_BROADCAST_TABLES, "")
/** ********************** SQLConf functionality methods ************ */
@ -61,7 +61,7 @@ trait SQLConf {
def set(key: String, value: String): Unit = {
require(key != null, "key cannot be null")
require(value != null, s"value cannot be null for ${key}")
require(value != null, s"value cannot be null for $key")
settings.put(key, value)
}
@ -90,3 +90,13 @@ trait SQLConf {
}
}
object SQLConf {
val AUTO_CONVERT_JOIN_SIZE = "spark.sql.auto.convert.join.size"
val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
val JOIN_BROADCAST_TABLES = "spark.sql.join.broadcastTables"
object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
}

View file

@ -17,12 +17,13 @@
package org.apache.spark.sql.execution
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.{Row, SQLConf, SQLContext}
trait Command {
/**
@ -44,28 +45,53 @@ trait Command {
case class SetCommand(
key: Option[String], value: Option[String], output: Seq[Attribute])(
@transient context: SQLContext)
extends LeafNode with Command {
extends LeafNode with Command with Logging {
override protected[sql] lazy val sideEffectResult: Seq[(String, String)] = (key, value) match {
override protected[sql] lazy val sideEffectResult: Seq[String] = (key, value) match {
// Set value for key k.
case (Some(k), Some(v)) =>
context.set(k, v)
Array(k -> v)
if (k == SQLConf.Deprecated.MAPRED_REDUCE_TASKS) {
logWarning(s"Property ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS} is deprecated, " +
s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS} instead.")
context.set(SQLConf.SHUFFLE_PARTITIONS, v)
Array(s"${SQLConf.SHUFFLE_PARTITIONS}=$v")
} else {
context.set(k, v)
Array(s"$k=$v")
}
// Query the value bound to key k.
case (Some(k), _) =>
Array(k -> context.getOption(k).getOrElse("<undefined>"))
// TODO (lian) This is just a workaround to make the Simba ODBC driver work.
// Should remove this once we get the ODBC driver updated.
if (k == "-v") {
val hiveJars = Seq(
"hive-exec-0.12.0.jar",
"hive-service-0.12.0.jar",
"hive-common-0.12.0.jar",
"hive-hwi-0.12.0.jar",
"hive-0.12.0.jar").mkString(":")
Array(
"system:java.class.path=" + hiveJars,
"system:sun.java.command=shark.SharkServer2")
}
else {
Array(s"$k=${context.getOption(k).getOrElse("<undefined>")}")
}
// Query all key-value pairs that are set in the SQLConf of the context.
case (None, None) =>
context.getAll
context.getAll.map { case (k, v) =>
s"$k=$v"
}
case _ =>
throw new IllegalArgumentException()
}
def execute(): RDD[Row] = {
val rows = sideEffectResult.map { case (k, v) => new GenericRow(Array[Any](k, v)) }
val rows = sideEffectResult.map { line => new GenericRow(Array[Any](line)) }
context.sparkContext.parallelize(rows, 1)
}

View file

@ -54,10 +54,10 @@ class SQLConfSuite extends QueryTest {
assert(get(testKey, testVal + "_") == testVal)
assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
sql("set mapred.reduce.tasks=20")
assert(get("mapred.reduce.tasks", "0") == "20")
sql("set mapred.reduce.tasks = 40")
assert(get("mapred.reduce.tasks", "0") == "40")
sql("set some.property=20")
assert(get("some.property", "0") == "20")
sql("set some.property = 40")
assert(get("some.property", "0") == "40")
val key = "spark.sql.key"
val vs = "val0,val_1,val2.3,my_table"
@ -70,4 +70,9 @@ class SQLConfSuite extends QueryTest {
clear()
}
test("deprecated property") {
clear()
sql(s"set ${SQLConf.Deprecated.MAPRED_REDUCE_TASKS}=10")
assert(get(SQLConf.SHUFFLE_PARTITIONS) == "10")
}
}

View file

@ -424,25 +424,25 @@ class SQLQuerySuite extends QueryTest {
sql(s"SET $testKey=$testVal")
checkAnswer(
sql("SET"),
Seq(Seq(testKey, testVal))
Seq(Seq(s"$testKey=$testVal"))
)
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
checkAnswer(
sql("set"),
Seq(
Seq(testKey, testVal),
Seq(testKey + testKey, testVal + testVal))
Seq(s"$testKey=$testVal"),
Seq(s"${testKey + testKey}=${testVal + testVal}"))
)
// "set key"
checkAnswer(
sql(s"SET $testKey"),
Seq(Seq(testKey, testVal))
Seq(Seq(s"$testKey=$testVal"))
)
checkAnswer(
sql(s"SET $nonexistentKey"),
Seq(Seq(nonexistentKey, "<undefined>"))
Seq(Seq(s"$nonexistentKey=<undefined>"))
)
clear()
}

View file

@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-parent</artifactId>
<version>1.1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive-thriftserver_2.10</artifactId>
<packaging>jar</packaging>
<name>Spark Project Hive</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>hive-thriftserver</sbt.project.name>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-cli</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.spark-project.hive</groupId>
<artifactId>hive-beeline</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
<plugins>
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>

View file

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hive.service.cli.thrift.ThriftBinaryCLIService
import org.apache.hive.service.server.{HiveServer2, ServerOptionsProcessor}
import org.apache.spark.sql.Logging
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
/**
* The main entry point for the Spark SQL port of HiveServer2. Starts up a `SparkSQLContext` and a
* `HiveThriftServer2` thrift server.
*/
private[hive] object HiveThriftServer2 extends Logging {
var LOG = LogFactory.getLog(classOf[HiveServer2])
def main(args: Array[String]) {
val optionsProcessor = new ServerOptionsProcessor("HiveThriftServer2")
if (!optionsProcessor.process(args)) {
logger.warn("Error starting HiveThriftServer2 with given arguments")
System.exit(-1)
}
val ss = new SessionState(new HiveConf(classOf[SessionState]))
// Set all properties specified via command line.
val hiveConf: HiveConf = ss.getConf
hiveConf.getAllProperties.toSeq.sortBy(_._1).foreach { case (k, v) =>
logger.debug(s"HiveConf var: $k=$v")
}
SessionState.start(ss)
logger.info("Starting SparkContext")
SparkSQLEnv.init()
SessionState.start(ss)
Runtime.getRuntime.addShutdownHook(
new Thread() {
override def run() {
SparkSQLEnv.sparkContext.stop()
}
}
)
try {
val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)
server.init(hiveConf)
server.start()
logger.info("HiveThriftServer2 started")
} catch {
case e: Exception =>
logger.error("Error starting HiveThriftServer2", e)
System.exit(-1)
}
}
}
private[hive] class HiveThriftServer2(hiveContext: HiveContext)
extends HiveServer2
with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
val sparkSqlCliService = new SparkSQLCLIService(hiveContext)
setSuperField(this, "cliService", sparkSqlCliService)
addService(sparkSqlCliService)
val thriftCliService = new ThriftBinaryCLIService(sparkSqlCliService)
setSuperField(this, "thriftCLIService", thriftCliService)
addService(thriftCliService)
initCompositeService(hiveConf)
}
}

View file

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
private[hive] object ReflectionUtils {
def setSuperField(obj : Object, fieldName: String, fieldValue: Object) {
setAncestorField(obj, 1, fieldName, fieldValue)
}
def setAncestorField(obj: AnyRef, level: Int, fieldName: String, fieldValue: AnyRef) {
val ancestor = Iterator.iterate[Class[_]](obj.getClass)(_.getSuperclass).drop(level).next()
val field = ancestor.getDeclaredField(fieldName)
field.setAccessible(true)
field.set(obj, fieldValue)
}
def getSuperField[T](obj: AnyRef, fieldName: String): T = {
getAncestorField[T](obj, 1, fieldName)
}
def getAncestorField[T](clazz: Object, level: Int, fieldName: String): T = {
val ancestor = Iterator.iterate[Class[_]](clazz.getClass)(_.getSuperclass).drop(level).next()
val field = ancestor.getDeclaredField(fieldName)
field.setAccessible(true)
field.get(clazz).asInstanceOf[T]
}
def invokeStatic(clazz: Class[_], methodName: String, args: (Class[_], AnyRef)*): AnyRef = {
invoke(clazz, null, methodName, args: _*)
}
def invoke(
clazz: Class[_],
obj: AnyRef,
methodName: String,
args: (Class[_], AnyRef)*): AnyRef = {
val (types, values) = args.unzip
val method = clazz.getDeclaredMethod(methodName, types: _*)
method.setAccessible(true)
method.invoke(obj, values.toSeq: _*)
}
}

View file

@ -0,0 +1,344 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import java.io._
import java.util.{ArrayList => JArrayList}
import jline.{ConsoleReader, History}
import org.apache.commons.lang.StringUtils
import org.apache.commons.logging.LogFactory
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.cli.{CliDriver, CliSessionState, OptionsProcessor}
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
import org.apache.hadoop.hive.common.{HiveInterruptCallback, HiveInterruptUtils, LogUtils}
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.processors.{CommandProcessor, CommandProcessorFactory}
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.thrift.transport.TSocket
import org.apache.spark.sql.Logging
private[hive] object SparkSQLCLIDriver {
private var prompt = "spark-sql"
private var continuedPrompt = "".padTo(prompt.length, ' ')
private var transport:TSocket = _
installSignalHandler()
/**
* Install an interrupt callback to cancel all Spark jobs. In Hive's CliDriver#processLine(),
* a signal handler will invoke this registered callback if a Ctrl+C signal is detected while
* a command is being processed by the current thread.
*/
def installSignalHandler() {
HiveInterruptUtils.add(new HiveInterruptCallback {
override def interrupt() {
// Handle remote execution mode
if (SparkSQLEnv.sparkContext != null) {
SparkSQLEnv.sparkContext.cancelAllJobs()
} else {
if (transport != null) {
// Force closing of TCP connection upon session termination
transport.getSocket.close()
}
}
}
})
}
def main(args: Array[String]) {
val oproc = new OptionsProcessor()
if (!oproc.process_stage1(args)) {
System.exit(1)
}
// NOTE: It is critical to do this here so that log4j is reinitialized
// before any of the other core hive classes are loaded
var logInitFailed = false
var logInitDetailMessage: String = null
try {
logInitDetailMessage = LogUtils.initHiveLog4j()
} catch {
case e: LogInitializationException =>
logInitFailed = true
logInitDetailMessage = e.getMessage
}
val sessionState = new CliSessionState(new HiveConf(classOf[SessionState]))
sessionState.in = System.in
try {
sessionState.out = new PrintStream(System.out, true, "UTF-8")
sessionState.info = new PrintStream(System.err, true, "UTF-8")
sessionState.err = new PrintStream(System.err, true, "UTF-8")
} catch {
case e: UnsupportedEncodingException => System.exit(3)
}
if (!oproc.process_stage2(sessionState)) {
System.exit(2)
}
if (!sessionState.getIsSilent) {
if (logInitFailed) System.err.println(logInitDetailMessage)
else SessionState.getConsole.printInfo(logInitDetailMessage)
}
// Set all properties specified via command line.
val conf: HiveConf = sessionState.getConf
sessionState.cmdProperties.entrySet().foreach { item: java.util.Map.Entry[Object, Object] =>
conf.set(item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
sessionState.getOverriddenConfigurations.put(
item.getKey.asInstanceOf[String], item.getValue.asInstanceOf[String])
}
SessionState.start(sessionState)
// Clean up after we exit
Runtime.getRuntime.addShutdownHook(
new Thread() {
override def run() {
SparkSQLEnv.stop()
}
}
)
// "-h" option has been passed, so connect to Hive thrift server.
if (sessionState.getHost != null) {
sessionState.connect()
if (sessionState.isRemoteMode) {
prompt = s"[${sessionState.getHost}:${sessionState.getPort}]" + prompt
continuedPrompt = "".padTo(prompt.length, ' ')
}
}
if (!sessionState.isRemoteMode && !ShimLoader.getHadoopShims.usesJobShell()) {
// Hadoop-20 and above - we need to augment classpath using hiveconf
// components.
// See also: code in ExecDriver.java
var loader = conf.getClassLoader
val auxJars = HiveConf.getVar(conf, HiveConf.ConfVars.HIVEAUXJARS)
if (StringUtils.isNotBlank(auxJars)) {
loader = Utilities.addToClassPath(loader, StringUtils.split(auxJars, ","))
}
conf.setClassLoader(loader)
Thread.currentThread().setContextClassLoader(loader)
}
val cli = new SparkSQLCLIDriver
cli.setHiveVariables(oproc.getHiveVariables)
// TODO work around for set the log output to console, because the HiveContext
// will set the output into an invalid buffer.
sessionState.in = System.in
try {
sessionState.out = new PrintStream(System.out, true, "UTF-8")
sessionState.info = new PrintStream(System.err, true, "UTF-8")
sessionState.err = new PrintStream(System.err, true, "UTF-8")
} catch {
case e: UnsupportedEncodingException => System.exit(3)
}
// Execute -i init files (always in silent mode)
cli.processInitFiles(sessionState)
if (sessionState.execString != null) {
System.exit(cli.processLine(sessionState.execString))
}
try {
if (sessionState.fileName != null) {
System.exit(cli.processFile(sessionState.fileName))
}
} catch {
case e: FileNotFoundException =>
System.err.println(s"Could not open input file for reading. (${e.getMessage})")
System.exit(3)
}
val reader = new ConsoleReader()
reader.setBellEnabled(false)
// reader.setDebug(new PrintWriter(new FileWriter("writer.debug", true)))
CliDriver.getCommandCompletor.foreach((e) => reader.addCompletor(e))
val historyDirectory = System.getProperty("user.home")
try {
if (new File(historyDirectory).exists()) {
val historyFile = historyDirectory + File.separator + ".hivehistory"
reader.setHistory(new History(new File(historyFile)))
} else {
System.err.println("WARNING: Directory for Hive history file: " + historyDirectory +
" does not exist. History will not be available during this session.")
}
} catch {
case e: Exception =>
System.err.println("WARNING: Encountered an error while trying to initialize Hive's " +
"history file. History will not be available during this session.")
System.err.println(e.getMessage)
}
val clientTransportTSocketField = classOf[CliSessionState].getDeclaredField("transport")
clientTransportTSocketField.setAccessible(true)
transport = clientTransportTSocketField.get(sessionState).asInstanceOf[TSocket]
var ret = 0
var prefix = ""
val currentDB = ReflectionUtils.invokeStatic(classOf[CliDriver], "getFormattedDb",
classOf[HiveConf] -> conf, classOf[CliSessionState] -> sessionState)
def promptWithCurrentDB = s"$prompt$currentDB"
def continuedPromptWithDBSpaces = continuedPrompt + ReflectionUtils.invokeStatic(
classOf[CliDriver], "spacesForString", classOf[String] -> currentDB)
var currentPrompt = promptWithCurrentDB
var line = reader.readLine(currentPrompt + "> ")
while (line != null) {
if (prefix.nonEmpty) {
prefix += '\n'
}
if (line.trim().endsWith(";") && !line.trim().endsWith("\\;")) {
line = prefix + line
ret = cli.processLine(line, true)
prefix = ""
currentPrompt = promptWithCurrentDB
} else {
prefix = prefix + line
currentPrompt = continuedPromptWithDBSpaces
}
line = reader.readLine(currentPrompt + "> ")
}
sessionState.close()
System.exit(ret)
}
}
private[hive] class SparkSQLCLIDriver extends CliDriver with Logging {
private val sessionState = SessionState.get().asInstanceOf[CliSessionState]
private val LOG = LogFactory.getLog("CliDriver")
private val console = new SessionState.LogHelper(LOG)
private val conf: Configuration =
if (sessionState != null) sessionState.getConf else new Configuration()
// Force initializing SparkSQLEnv. This is put here but not object SparkSQLCliDriver
// because the Hive unit tests do not go through the main() code path.
if (!sessionState.isRemoteMode) {
SparkSQLEnv.init()
}
override def processCmd(cmd: String): Int = {
val cmd_trimmed: String = cmd.trim()
val tokens: Array[String] = cmd_trimmed.split("\\s+")
val cmd_1: String = cmd_trimmed.substring(tokens(0).length()).trim()
if (cmd_trimmed.toLowerCase.equals("quit") ||
cmd_trimmed.toLowerCase.equals("exit") ||
tokens(0).equalsIgnoreCase("source") ||
cmd_trimmed.startsWith("!") ||
tokens(0).toLowerCase.equals("list") ||
sessionState.isRemoteMode) {
val start = System.currentTimeMillis()
super.processCmd(cmd)
val end = System.currentTimeMillis()
val timeTaken: Double = (end - start) / 1000.0
console.printInfo(s"Time taken: $timeTaken seconds")
0
} else {
var ret = 0
val hconf = conf.asInstanceOf[HiveConf]
val proc: CommandProcessor = CommandProcessorFactory.get(tokens(0), hconf)
if (proc != null) {
if (proc.isInstanceOf[Driver]) {
val driver = new SparkSQLDriver
driver.init()
val out = sessionState.out
val start:Long = System.currentTimeMillis()
if (sessionState.getIsVerbose) {
out.println(cmd)
}
ret = driver.run(cmd).getResponseCode
if (ret != 0) {
driver.close()
return ret
}
val res = new JArrayList[String]()
if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) {
// Print the column names.
Option(driver.getSchema.getFieldSchemas).map { fields =>
out.println(fields.map(_.getName).mkString("\t"))
}
}
try {
while (!out.checkError() && driver.getResults(res)) {
res.foreach(out.println)
res.clear()
}
} catch {
case e:IOException =>
console.printError(
s"""Failed with exception ${e.getClass.getName}: ${e.getMessage}
|${org.apache.hadoop.util.StringUtils.stringifyException(e)}
""".stripMargin)
ret = 1
}
val cret = driver.close()
if (ret == 0) {
ret = cret
}
val end = System.currentTimeMillis()
if (end > start) {
val timeTaken:Double = (end - start) / 1000.0
console.printInfo(s"Time taken: $timeTaken seconds", null)
}
// Destroy the driver to release all the locks.
driver.destroy()
} else {
if (sessionState.getIsVerbose) {
sessionState.out.println(tokens(0) + " " + cmd_1)
}
ret = proc.run(cmd_1).getResponseCode
}
}
ret
}
}
}

View file

@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import java.io.IOException
import java.util.{List => JList}
import javax.security.auth.login.LoginException
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.shims.ShimLoader
import org.apache.hive.service.Service.STATE
import org.apache.hive.service.auth.HiveAuthFactory
import org.apache.hive.service.cli.CLIService
import org.apache.hive.service.{AbstractService, Service, ServiceException}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
private[hive] class SparkSQLCLIService(hiveContext: HiveContext)
extends CLIService
with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
val sparkSqlSessionManager = new SparkSQLSessionManager(hiveContext)
setSuperField(this, "sessionManager", sparkSqlSessionManager)
addService(sparkSqlSessionManager)
try {
HiveAuthFactory.loginFromKeytab(hiveConf)
val serverUserName = ShimLoader.getHadoopShims
.getShortUserName(ShimLoader.getHadoopShims.getUGIForConf(hiveConf))
setSuperField(this, "serverUserName", serverUserName)
} catch {
case e @ (_: IOException | _: LoginException) =>
throw new ServiceException("Unable to login to kerberos with given principal/keytab", e)
}
initCompositeService(hiveConf)
}
}
private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
def initCompositeService(hiveConf: HiveConf) {
// Emulating `CompositeService.init(hiveConf)`
val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
serviceList.foreach(_.init(hiveConf))
// Emulating `AbstractService.init(hiveConf)`
invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
setAncestorField(this, 3, "hiveConf", hiveConf)
invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
}
}

View file

@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import java.util.{ArrayList => JArrayList}
import org.apache.commons.lang.exception.ExceptionUtils
import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema}
import org.apache.hadoop.hive.ql.Driver
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse
import org.apache.spark.sql.Logging
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
private[hive] class SparkSQLDriver(val context: HiveContext = SparkSQLEnv.hiveContext)
extends Driver with Logging {
private var tableSchema: Schema = _
private var hiveResponse: Seq[String] = _
override def init(): Unit = {
}
private def getResultSetSchema(query: context.QueryExecution): Schema = {
val analyzed = query.analyzed
logger.debug(s"Result Schema: ${analyzed.output}")
if (analyzed.output.size == 0) {
new Schema(new FieldSchema("Response code", "string", "") :: Nil, null)
} else {
val fieldSchemas = analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new Schema(fieldSchemas, null)
}
}
override def run(command: String): CommandProcessorResponse = {
val execution = context.executePlan(context.hql(command).logicalPlan)
// TODO unify the error code
try {
hiveResponse = execution.stringResult()
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
} catch {
case cause: Throwable =>
logger.error(s"Failed in [$command]", cause)
new CommandProcessorResponse(-3, ExceptionUtils.getFullStackTrace(cause), null)
}
}
override def close(): Int = {
hiveResponse = null
tableSchema = null
0
}
override def getSchema: Schema = tableSchema
override def getResults(res: JArrayList[String]): Boolean = {
if (hiveResponse == null) {
false
} else {
res.addAll(hiveResponse)
hiveResponse = null
true
}
}
override def destroy() {
super.destroy()
hiveResponse = null
tableSchema = null
}
}
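
A hedged usage sketch (not part of the patch): driving the Hive `Driver` contract implemented above by hand. It assumes `spark.master` is provided (e.g. via `spark-submit` or `-Dspark.master=local`), a working Hive setup, and the `SparkSQLEnv` object shown below.

```scala
package org.apache.spark.sql.hive.thriftserver

import java.util.{ArrayList => JArrayList}

import scala.collection.JavaConversions._

// Hypothetical walkthrough: run one statement through SparkSQLDriver and print its results.
object SparkSQLDriverSketch {
  def main(args: Array[String]): Unit = {
    SparkSQLEnv.init()                      // builds the shared SparkContext + HiveContext
    val driver = new SparkSQLDriver()       // defaults to SparkSQLEnv.hiveContext
    val response = driver.run("SHOW TABLES")
    if (response.getResponseCode == 0) {
      val results = new JArrayList[String]()
      driver.getResults(results)            // drains the buffered string results
      results.foreach(println)
    } else {
      Console.err.println(response.getErrorMessage)
    }
    driver.close()
    SparkSQLEnv.stop()
  }
}
```
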

View file

@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import org.apache.hadoop.hive.ql.session.SessionState
import org.apache.spark.scheduler.{SplitInfo, StatsReportListener}
import org.apache.spark.sql.Logging
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}
/** A singleton object for the master program. The slaves should not access this. */
private[hive] object SparkSQLEnv extends Logging {
logger.debug("Initializing SparkSQLEnv")
var hiveContext: HiveContext = _
var sparkContext: SparkContext = _
def init() {
if (hiveContext == null) {
sparkContext = new SparkContext(new SparkConf()
.setAppName(s"SparkSQL::${java.net.InetAddress.getLocalHost.getHostName}"))
sparkContext.addSparkListener(new StatsReportListener())
hiveContext = new HiveContext(sparkContext) {
@transient override lazy val sessionState = SessionState.get()
@transient override lazy val hiveconf = sessionState.getConf
}
}
}
/** Cleans up and shuts down the Spark SQL environment. */
def stop() {
logger.debug("Shutting down Spark SQL Environment")
// Stop the SparkContext
if (SparkSQLEnv.sparkContext != null) {
sparkContext.stop()
sparkContext = null
hiveContext = null
}
}
}

View file

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.util.concurrent.Executors
import org.apache.commons.logging.Log
import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hive.service.cli.session.SessionManager
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
import org.apache.spark.sql.hive.thriftserver.server.SparkSQLOperationManager
private[hive] class SparkSQLSessionManager(hiveContext: HiveContext)
extends SessionManager
with ReflectedCompositeService {
override def init(hiveConf: HiveConf) {
setSuperField(this, "hiveConf", hiveConf)
val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
getAncestorField[Log](this, 3, "LOG").info(
s"HiveServer2: Async execution pool size $backgroundPoolSize")
val sparkSqlOperationManager = new SparkSQLOperationManager(hiveContext)
setSuperField(this, "operationManager", sparkSqlOperationManager)
addService(sparkSqlOperationManager)
initCompositeService(hiveConf)
}
}

View file

@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver.server
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.math.{random, round}
import java.sql.Timestamp
import java.util.{Map => JMap}
import org.apache.hadoop.hive.common.`type`.HiveDecimal
import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.{ExecuteStatementOperation, Operation, OperationManager}
import org.apache.hive.service.cli.session.HiveSession
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.hive.thriftserver.ReflectionUtils
import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes}
import org.apache.spark.sql.{Logging, SchemaRDD, Row => SparkRow}
/**
* Executes queries using Spark SQL, and maintains a list of handles to active queries.
*/
class SparkSQLOperationManager(hiveContext: HiveContext) extends OperationManager with Logging {
val handleToOperation = ReflectionUtils
.getSuperField[JMap[OperationHandle, Operation]](this, "handleToOperation")
override def newExecuteStatementOperation(
parentSession: HiveSession,
statement: String,
confOverlay: JMap[String, String],
async: Boolean): ExecuteStatementOperation = synchronized {
val operation = new ExecuteStatementOperation(parentSession, statement, confOverlay) {
private var result: SchemaRDD = _
private var iter: Iterator[SparkRow] = _
private var dataTypes: Array[DataType] = _
def close(): Unit = {
// RDDs will be cleaned automatically upon garbage collection.
logger.debug("CLOSING")
}
def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
if (!iter.hasNext) {
new RowSet()
} else {
val maxRows = maxRowsL.toInt // Do you really want a row batch larger than Int Max? No.
var curRow = 0
var rowSet = new ArrayBuffer[Row](maxRows)
while (curRow < maxRows && iter.hasNext) {
val sparkRow = iter.next()
val row = new Row()
var curCol = 0
while (curCol < sparkRow.length) {
dataTypes(curCol) match {
case StringType =>
row.addString(sparkRow(curCol).asInstanceOf[String])
case IntegerType =>
row.addColumnValue(ColumnValue.intValue(sparkRow.getInt(curCol)))
case BooleanType =>
row.addColumnValue(ColumnValue.booleanValue(sparkRow.getBoolean(curCol)))
case DoubleType =>
row.addColumnValue(ColumnValue.doubleValue(sparkRow.getDouble(curCol)))
case FloatType =>
row.addColumnValue(ColumnValue.floatValue(sparkRow.getFloat(curCol)))
case DecimalType =>
val hiveDecimal = sparkRow.get(curCol).asInstanceOf[BigDecimal].bigDecimal
row.addColumnValue(ColumnValue.stringValue(new HiveDecimal(hiveDecimal)))
case LongType =>
row.addColumnValue(ColumnValue.longValue(sparkRow.getLong(curCol)))
case ByteType =>
row.addColumnValue(ColumnValue.byteValue(sparkRow.getByte(curCol)))
case ShortType =>
row.addColumnValue(ColumnValue.intValue(sparkRow.getShort(curCol)))
case TimestampType =>
row.addColumnValue(
ColumnValue.timestampValue(sparkRow.get(curCol).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = result
.queryExecution
.asInstanceOf[HiveContext#QueryExecution]
.toHiveString((sparkRow.get(curCol), dataTypes(curCol)))
row.addColumnValue(ColumnValue.stringValue(hiveString))
}
curCol += 1
}
rowSet += row
curRow += 1
}
new RowSet(rowSet, 0)
}
}
def getResultSetSchema: TableSchema = {
logger.warn(s"Result Schema: ${result.queryExecution.analyzed.output}")
if (result.queryExecution.analyzed.output.size == 0) {
new TableSchema(new FieldSchema("Result", "string", "") :: Nil)
} else {
val schema = result.queryExecution.analyzed.output.map { attr =>
new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "")
}
new TableSchema(schema)
}
}
def run(): Unit = {
logger.info(s"Running query '$statement'")
setState(OperationState.RUNNING)
try {
result = hiveContext.hql(statement)
logger.debug(result.queryExecution.toString())
val groupId = round(random * 1000000).toString
hiveContext.sparkContext.setJobGroup(groupId, statement)
iter = result.queryExecution.toRdd.toLocalIterator
dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
setHasResultSet(true)
} catch {
// We need to catch Throwable here because some failures don't inherit from Exception,
// and HiveServer would otherwise silently swallow them.
case e: Throwable =>
logger.error("Error executing query:",e)
throw new HiveSQLException(e.toString)
}
setState(OperationState.FINISHED)
}
}
handleToOperation.put(operation.getHandle, operation)
operation
}
}
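
The fetch loop above pages through `toLocalIterator`, so each client fetch materializes at most `maxRows` rows instead of collecting the whole result set at once. A standalone sketch of the same batching pattern on a plain RDD (object name, data, and batch size are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: mirrors getNextRowSet's batching over an RDD's local iterator.
object LocalIteratorBatchingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("batch-fetch"))
    val iter = sc.parallelize(1 to 10, 3).toLocalIterator
    val maxRows = 4                                   // analogous to maxRowsL.toInt
    while (iter.hasNext) {
      val batch = new ArrayBuffer[Int](maxRows)
      while (batch.size < maxRows && iter.hasNext) {  // fill one "row set"
        batch += iter.next()
      }
      println(s"fetched batch: ${batch.mkString(", ")}")
    }
    sc.stop()
  }
}
```
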

View file

@ -0,0 +1,5 @@
238val_238
86val_86
311val_311
27val_27
165val_165

View file

@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.sql.hive.test.TestHive
class CliSuite extends FunSuite with BeforeAndAfterAll with TestUtils {
val WAREHOUSE_PATH = TestUtils.getWarehousePath("cli")
val METASTORE_PATH = TestUtils.getMetastorePath("cli")
override def beforeAll() {
val pb = new ProcessBuilder(
"../../bin/spark-sql",
"--master",
"local",
"--hiveconf",
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
"--hiveconf",
"hive.metastore.warehouse.dir=" + WAREHOUSE_PATH)
process = pb.start()
outputWriter = new PrintWriter(process.getOutputStream, true)
inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
waitForOutput(inputReader, "spark-sql>")
}
override def afterAll() {
process.destroy()
process.waitFor()
}
test("simple commands") {
val dataFilePath = getDataFile("data/files/small_kv.txt")
executeQuery("create table hive_test1(key int, val string);")
executeQuery("load data local inpath '" + dataFilePath+ "' overwrite into table hive_test1;")
executeQuery("cache table hive_test1", "Time taken")
}
}

View file

@ -0,0 +1,125 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import scala.collection.JavaConversions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent._
import java.io.{BufferedReader, InputStreamReader}
import java.sql.{Connection, DriverManager, Statement}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.apache.spark.sql.Logging
import org.apache.spark.sql.catalyst.util.getTempFilePath
/**
* Test for the HiveThriftServer2 using JDBC.
*/
class HiveThriftServer2Suite extends FunSuite with BeforeAndAfterAll with TestUtils with Logging {
val WAREHOUSE_PATH = getTempFilePath("warehouse")
val METASTORE_PATH = getTempFilePath("metastore")
val DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"
val TABLE = "test"
// Ideally tests would use a port other than the Hive standard 10000 to avoid conflicts
// when that port is already taken on some machines; for now the default 10000 is used.
val PORT = "10000"
// If verbose is true, the test program will print all outputs coming from the Hive Thrift server.
val VERBOSE = Option(System.getenv("SPARK_SQL_TEST_VERBOSE")).getOrElse("false").toBoolean
Class.forName(DRIVER_NAME)
override def beforeAll() { launchServer() }
override def afterAll() { stopServer() }
private def launchServer(args: Seq[String] = Seq.empty) {
// Fork a new process to start the Hive Thrift server: it is hard to clean up Hive
// resources entirely within the same JVM, so we start a separate process and kill it
// for cleanup.
val defaultArgs = Seq(
"../../sbin/start-thriftserver.sh",
"--master local",
"--hiveconf",
"hive.root.logger=INFO,console",
"--hiveconf",
s"javax.jdo.option.ConnectionURL=jdbc:derby:;databaseName=$METASTORE_PATH;create=true",
"--hiveconf",
s"hive.metastore.warehouse.dir=$WAREHOUSE_PATH")
val pb = new ProcessBuilder(defaultArgs ++ args)
process = pb.start()
inputReader = new BufferedReader(new InputStreamReader(process.getInputStream))
errorReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
waitForOutput(inputReader, "ThriftBinaryCLIService listening on")
// Spawn a thread to read the output from the forked process.
// This is necessary because in some configurations log4j can block if its output to
// stderr is not consumed, eventually blocking the entire test suite.
future {
while (true) {
val stdout = readFrom(inputReader)
val stderr = readFrom(errorReader)
if (VERBOSE && stdout.length > 0) {
println(stdout)
}
if (VERBOSE && stderr.length > 0) {
println(stderr)
}
Thread.sleep(50)
}
}
}
private def stopServer() {
process.destroy()
process.waitFor()
}
test("test query execution against a Hive Thrift server") {
Thread.sleep(5 * 1000)
val dataFilePath = getDataFile("data/files/small_kv.txt")
val stmt = createStatement()
stmt.execute("DROP TABLE IF EXISTS test")
stmt.execute("DROP TABLE IF EXISTS test_cached")
stmt.execute("CREATE TABLE test(key int, val string)")
stmt.execute(s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test")
stmt.execute("CREATE TABLE test_cached as select * from test limit 4")
stmt.execute("CACHE TABLE test_cached")
var rs = stmt.executeQuery("select count(*) from test")
rs.next()
assert(rs.getInt(1) === 5)
rs = stmt.executeQuery("select count(*) from test_cached")
rs.next()
assert(rs.getInt(1) === 4)
stmt.close()
}
def getConnection: Connection = {
val connectURI = s"jdbc:hive2://localhost:$PORT/"
DriverManager.getConnection(connectURI, System.getProperty("user.name"), "")
}
def createStatement(): Statement = getConnection.createStatement()
}
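
For comparison with the suite above, a minimal standalone JDBC client sketch. It assumes a Thrift server already listening on localhost:10000, the Hive JDBC driver on the classpath, and an existing `test` table (the table name is illustrative):

```scala
import java.sql.DriverManager

// Hypothetical client: connect over JDBC and run one query against the Thrift server.
object ThriftServerClientSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val conn = DriverManager.getConnection(
      "jdbc:hive2://localhost:10000/", System.getProperty("user.name"), "")
    val stmt = conn.createStatement()
    val rs = stmt.executeQuery("SELECT COUNT(*) FROM test")
    while (rs.next()) {
      println(s"count = ${rs.getInt(1)}")
    }
    rs.close()
    stmt.close()
    conn.close()
  }
}
```
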

View file

@ -0,0 +1,108 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.hive.thriftserver
import java.io.{BufferedReader, PrintWriter}
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.hadoop.hive.common.LogUtils
import org.apache.hadoop.hive.common.LogUtils.LogInitializationException
object TestUtils {
val timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss")
def getWarehousePath(prefix: String): String = {
System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-warehouse-" +
timestamp.format(new Date)
}
def getMetastorePath(prefix: String): String = {
System.getProperty("user.dir") + "/test_warehouses/" + prefix + "-metastore-" +
timestamp.format(new Date)
}
// Dummy function to trigger initialization of the log4j properties.
def init() { }
// initialize log4j
try {
LogUtils.initHiveLog4j()
} catch {
case e: LogInitializationException => // Ignore the error.
}
}
trait TestUtils {
var process : Process = null
var outputWriter : PrintWriter = null
var inputReader : BufferedReader = null
var errorReader : BufferedReader = null
def executeQuery(
cmd: String, outputMessage: String = "OK", timeout: Long = 15000): String = {
println("Executing: " + cmd + ", expecting output: " + outputMessage)
outputWriter.write(cmd + "\n")
outputWriter.flush()
waitForQuery(timeout, outputMessage)
}
protected def waitForQuery(timeout: Long, message: String): String = {
if (waitForOutput(errorReader, message, timeout)) {
Thread.sleep(500)
readOutput()
} else {
assert(false, "Didn't find \"" + message + "\" in the output:\n" + readOutput())
null
}
}
// Wait for the specified str to appear in the output.
protected def waitForOutput(
reader: BufferedReader, str: String, timeout: Long = 10000): Boolean = {
val startTime = System.currentTimeMillis
var out = ""
while (!out.contains(str) && System.currentTimeMillis < (startTime + timeout)) {
out += readFrom(reader)
}
out.contains(str)
}
// Read stdout output and filter out garbage collection messages.
protected def readOutput(): String = {
val output = readFrom(inputReader)
// Remove GC Messages
val filteredOutput = output.lines.filterNot(x => x.contains("[GC") || x.contains("[Full GC"))
.mkString("\n")
filteredOutput
}
protected def readFrom(reader: BufferedReader): String = {
var out = ""
var c = 0
while (reader.ready) {
c = reader.read()
out += c.asInstanceOf[Char]
}
out
}
protected def getDataFile(name: String) = {
Thread.currentThread().getContextClassLoader.getResource(name)
}
}

View file

@ -32,7 +32,7 @@
<name>Spark Project Hive</name>
<url>http://spark.apache.org/</url>
<properties>
<sbt.project.name>hive</sbt.project.name>
<sbt.project.name>hive</sbt.project.name>
</properties>
<dependencies>

View file

@ -255,7 +255,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
Seq(StringType, IntegerType, LongType, DoubleType, FloatType, BooleanType, ByteType,
ShortType, DecimalType, TimestampType, BinaryType)
protected def toHiveString(a: (Any, DataType)): String = a match {
protected[sql] def toHiveString(a: (Any, DataType)): String = a match {
case (struct: Row, StructType(fields)) =>
struct.zip(fields).map {
case (v, t) => s""""${t.name}":${toHiveStructString(v, t.dataType)}"""

View file

@ -416,10 +416,10 @@ class HiveQuerySuite extends HiveComparisonTest {
hql(s"set $testKey=$testVal")
assert(get(testKey, testVal + "_") == testVal)
hql("set mapred.reduce.tasks=20")
assert(get("mapred.reduce.tasks", "0") == "20")
hql("set mapred.reduce.tasks = 40")
assert(get("mapred.reduce.tasks", "0") == "40")
hql("set some.property=20")
assert(get("some.property", "0") == "20")
hql("set some.property = 40")
assert(get("some.property", "0") == "40")
hql(s"set $testKey=$testVal")
assert(get(testKey, "0") == testVal)
@ -433,63 +433,61 @@ class HiveQuerySuite extends HiveComparisonTest {
val testKey = "spark.sql.key.usedfortestonly"
val testVal = "test.val.0"
val nonexistentKey = "nonexistent"
def collectResults(rdd: SchemaRDD): Set[(String, String)] =
rdd.collect().map { case Row(key: String, value: String) => key -> value }.toSet
clear()
// "set" itself returns all config variables currently specified in SQLConf.
assert(hql("SET").collect().size == 0)
assertResult(Set(testKey -> testVal)) {
collectResults(hql(s"SET $testKey=$testVal"))
assertResult(Array(s"$testKey=$testVal")) {
hql(s"SET $testKey=$testVal").collect().map(_.getString(0))
}
assert(hiveconf.get(testKey, "") == testVal)
assertResult(Set(testKey -> testVal)) {
collectResults(hql("SET"))
assertResult(Array(s"$testKey=$testVal")) {
hql(s"SET $testKey=$testVal").collect().map(_.getString(0))
}
hql(s"SET ${testKey + testKey}=${testVal + testVal}")
assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
collectResults(hql("SET"))
assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) {
hql(s"SET").collect().map(_.getString(0))
}
// "set key"
assertResult(Set(testKey -> testVal)) {
collectResults(hql(s"SET $testKey"))
assertResult(Array(s"$testKey=$testVal")) {
hql(s"SET $testKey").collect().map(_.getString(0))
}
assertResult(Set(nonexistentKey -> "<undefined>")) {
collectResults(hql(s"SET $nonexistentKey"))
assertResult(Array(s"$nonexistentKey=<undefined>")) {
hql(s"SET $nonexistentKey").collect().map(_.getString(0))
}
// Assert that sql() should have the same effects as hql() by repeating the above using sql().
clear()
assert(sql("SET").collect().size == 0)
assertResult(Set(testKey -> testVal)) {
collectResults(sql(s"SET $testKey=$testVal"))
assertResult(Array(s"$testKey=$testVal")) {
sql(s"SET $testKey=$testVal").collect().map(_.getString(0))
}
assert(hiveconf.get(testKey, "") == testVal)
assertResult(Set(testKey -> testVal)) {
collectResults(sql("SET"))
assertResult(Array(s"$testKey=$testVal")) {
sql("SET").collect().map(_.getString(0))
}
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
collectResults(sql("SET"))
assertResult(Array(s"$testKey=$testVal", s"${testKey + testKey}=${testVal + testVal}")) {
sql("SET").collect().map(_.getString(0))
}
assertResult(Set(testKey -> testVal)) {
collectResults(sql(s"SET $testKey"))
assertResult(Array(s"$testKey=$testVal")) {
sql(s"SET $testKey").collect().map(_.getString(0))
}
assertResult(Set(nonexistentKey -> "<undefined>")) {
collectResults(sql(s"SET $nonexistentKey"))
assertResult(Array(s"$nonexistentKey=<undefined>")) {
sql(s"SET $nonexistentKey").collect().map(_.getString(0))
}
clear()
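
The diff above changes the expected shape of `SET` output: each returned row now carries a single string column formatted as `key=value` rather than a (key, value) pair. A small hedged sketch of that contract against `TestHive` (the key and value are arbitrary, and the exact output is only what the updated assertions expect):

```scala
import org.apache.spark.sql.hive.test.TestHive._

// Illustrative check of the new SET output format.
object SetOutputSketch {
  def main(args: Array[String]): Unit = {
    val rows = hql("SET spark.sql.key.usedfortestonly=test.val.0").collect()
    // Expected: a single row whose only column is "spark.sql.key.usedfortestonly=test.val.0"
    rows.map(_.getString(0)).foreach(println)
  }
}
```
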

View file

@ -28,7 +28,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<properties>
<sbt.project.name>streaming</sbt.project.name>
<sbt.project.name>streaming</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Streaming</name>

View file

@ -27,7 +27,7 @@
<groupId>org.apache.spark</groupId>
<artifactId>spark-tools_2.10</artifactId>
<properties>
<sbt.project.name>tools</sbt.project.name>
<sbt.project.name>tools</sbt.project.name>
</properties>
<packaging>jar</packaging>
<name>Spark Project Tools</name>

View file

@ -24,7 +24,7 @@
<relativePath>../pom.xml</relativePath>
</parent>
<properties>
<sbt.project.name>yarn-alpha</sbt.project.name>
<sbt.project.name>yarn-alpha</sbt.project.name>
</properties>
<groupId>org.apache.spark</groupId>

View file

@ -29,7 +29,7 @@
<packaging>pom</packaging>
<name>Spark Project YARN Parent POM</name>
<properties>
<sbt.project.name>yarn</sbt.project.name>
<sbt.project.name>yarn</sbt.project.name>
</properties>
<dependencies>

View file

@ -24,7 +24,7 @@
<relativePath>../pom.xml</relativePath>
</parent>
<properties>
<sbt.project.name>yarn-stable</sbt.project.name>
<sbt.project.name>yarn-stable</sbt.project.name>
</properties>
<groupId>org.apache.spark</groupId>