[SPARK-16734][EXAMPLES][SQL] Revise examples of all language bindings

## What changes were proposed in this pull request?

This PR makes various minor updates to examples of all language bindings to make sure they are consistent with each other. Some typos and missing parts (JDBC example in Scala/Java/Python) are also fixed.
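For reviewers unfamiliar with the docs tooling: the `{% include_example label path %}` tags edited in `sql-programming-guide.md` below render the region of the referenced example file that lies between matching `$example on:label$` / `$example off:label$` comment markers, so renaming a snippet label means touching both the doc tag and the example source. A minimal Scala sketch of that pairing (the `some_label` name and the snippet body are purely illustrative, not part of this change):

```scala
// `spark` is an existing SparkSession, as in the surrounding examples.
// In an example source file (e.g. SQLDataSourceExample.scala), the snippet that a
// doc page pulls in is the code between matching label markers:
// $example on:some_label$
val df = spark.read.json("examples/src/main/resources/people.json")
df.show()
// $example off:some_label$

// The doc page would then reference it with a tag such as:
//   {% include_example some_label scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
```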

## How was this patch tested?

Manually tested.

Author: Cheng Lian <lian@databricks.com>

Closes #14368 from liancheng/revise-examples.
Commit 10e1c0e638 (parent 5184df06b3)
Authored by Cheng Lian on 2016-08-02 15:02:40 +08:00; committed by Wenchen Fan
9 changed files with 137 additions and 117 deletions

docs/sql-programming-guide.md

@@ -132,7 +132,7 @@ from a Hive table, or from [Spark data sources](#data-sources).
 As an example, the following creates a DataFrame based on the content of a JSON file:
-{% include_example create_DataFrames r/RSparkSQLExample.R %}
+{% include_example create_df r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -180,7 +180,7 @@ In addition to simple column references and expressions, DataFrames also have a
 <div data-lang="r" markdown="1">
-{% include_example dataframe_operations r/RSparkSQLExample.R %}
+{% include_example untyped_ops r/RSparkSQLExample.R %}
 For a complete list of the types of operations that can be performed on a DataFrame refer to the [API Documentation](api/R/index.html).
@@ -214,7 +214,7 @@ The `sql` function on a `SparkSession` enables applications to run SQL queries p
 <div data-lang="r" markdown="1">
 The `sql` function enables applications to run SQL queries programmatically and returns the result as a `SparkDataFrame`.
-{% include_example sql_query r/RSparkSQLExample.R %}
+{% include_example run_sql r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -377,7 +377,7 @@ In the simplest form, the default data source (`parquet` unless otherwise config
 <div data-lang="r" markdown="1">
-{% include_example source_parquet r/RSparkSQLExample.R %}
+{% include_example generic_load_save_functions r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -400,13 +400,11 @@ using this syntax.
 </div>
 <div data-lang="python" markdown="1">
 {% include_example manual_load_options python/sql/datasource.py %}
 </div>
 <div data-lang="r" markdown="1">
-{% include_example source_json r/RSparkSQLExample.R %}
+{% include_example manual_load_options r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -425,13 +423,11 @@ file directly with SQL.
 </div>
 <div data-lang="python" markdown="1">
 {% include_example direct_sql python/sql/datasource.py %}
 </div>
 <div data-lang="r" markdown="1">
-{% include_example direct_query r/RSparkSQLExample.R %}
+{% include_example direct_sql r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -523,7 +519,7 @@ Using the data from the above example:
 <div data-lang="r" markdown="1">
-{% include_example load_programmatically r/RSparkSQLExample.R %}
+{% include_example basic_parquet_example r/RSparkSQLExample.R %}
 </div>
@@ -839,7 +835,7 @@ Note that the file that is offered as _a json file_ is not a typical JSON file.
 line must contain a separate, self-contained valid JSON object. As a consequence,
 a regular multi-line JSON file will most often fail.
-{% include_example load_json_file r/RSparkSQLExample.R %}
+{% include_example json_dataset r/RSparkSQLExample.R %}
 </div>
@@ -925,7 +921,7 @@ You may need to grant write privilege to the user who starts the spark applicati
 When working with Hive one must instantiate `SparkSession` with Hive support. This
 adds support for finding tables in the MetaStore and writing queries using HiveQL.
-{% include_example hive_table r/RSparkSQLExample.R %}
+{% include_example spark_hive r/RSparkSQLExample.R %}
 </div>
 </div>
@@ -1067,43 +1063,19 @@ the Data Sources API. The following options are supported:
 <div class="codetabs">
 <div data-lang="scala" markdown="1">
-{% highlight scala %}
-val jdbcDF = spark.read.format("jdbc").options(
-  Map("url" -> "jdbc:postgresql:dbserver",
-  "dbtable" -> "schema.tablename")).load()
-{% endhighlight %}
+{% include_example jdbc_dataset scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala %}
 </div>
 <div data-lang="java" markdown="1">
-{% highlight java %}
-Map<String, String> options = new HashMap<>();
-options.put("url", "jdbc:postgresql:dbserver");
-options.put("dbtable", "schema.tablename");
-Dataset<Row> jdbcDF = spark.read().format("jdbc"). options(options).load();
-{% endhighlight %}
+{% include_example jdbc_dataset java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java %}
 </div>
 <div data-lang="python" markdown="1">
-{% highlight python %}
-df = spark.read.format('jdbc').options(url='jdbc:postgresql:dbserver', dbtable='schema.tablename').load()
-{% endhighlight %}
+{% include_example jdbc_dataset python/sql/datasource.py %}
 </div>
 <div data-lang="r" markdown="1">
-{% include_example jdbc r/RSparkSQLExample.R %}
+{% include_example jdbc_dataset r/RSparkSQLExample.R %}
 </div>
 <div data-lang="sql" markdown="1">

examples/src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java

@@ -25,7 +25,6 @@ import java.util.List;
 // $example on:basic_parquet_example$
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Encoders;
-// import org.apache.spark.sql.Encoders;
 // $example on:schema_merging$
 // $example on:json_dataset$
 import org.apache.spark.sql.Dataset;
@@ -92,7 +91,7 @@ public class JavaSQLDataSourceExample {
 public static void main(String[] args) {
 SparkSession spark = SparkSession
 .builder()
-.appName("Java Spark SQL Data Sources Example")
+.appName("Java Spark SQL data sources example")
 .config("spark.some.config.option", "some-value")
 .getOrCreate();
@@ -100,6 +99,7 @@ public class JavaSQLDataSourceExample {
 runBasicParquetExample(spark);
 runParquetSchemaMergingExample(spark);
 runJsonDatasetExample(spark);
+runJdbcDatasetExample(spark);
 spark.stop();
 }
@@ -183,10 +183,10 @@ public class JavaSQLDataSourceExample {
 // The final schema consists of all 3 columns in the Parquet files together
 // with the partitioning column appeared in the partition directory paths
 // root
 // |-- value: int (nullable = true)
 // |-- square: int (nullable = true)
 // |-- cube: int (nullable = true)
-// |-- key : int (nullable = true)
+// |-- key: int (nullable = true)
 // $example off:schema_merging$
 }
@@ -216,4 +216,15 @@ public class JavaSQLDataSourceExample {
 // $example off:json_dataset$
 }
+private static void runJdbcDatasetExample(SparkSession spark) {
+  // $example on:jdbc_dataset$
+  Dataset<Row> jdbcDF = spark.read()
+    .format("jdbc")
+    .option("url", "jdbc:postgresql:dbserver")
+    .option("dbtable", "schema.tablename")
+    .option("user", "username")
+    .option("password", "password")
+    .load();
+  // $example off:jdbc_dataset$
+}
 }

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java

@@ -88,7 +88,7 @@ public class JavaSparkSQLExample {
 // $example on:init_session$
 SparkSession spark = SparkSession
 .builder()
-.appName("Java Spark SQL Example")
+.appName("Java Spark SQL basic example")
 .config("spark.some.config.option", "some-value")
 .getOrCreate();
 // $example off:init_session$

examples/src/main/python/sql/basic.py

@@ -182,7 +182,7 @@ if __name__ == "__main__":
 # $example on:init_session$
 spark = SparkSession \
 .builder \
-.appName("PythonSQL") \
+.appName("Python Spark SQL basic example") \
 .config("spark.some.config.option", "some-value") \
 .getOrCreate()
 # $example off:init_session$

examples/src/main/python/sql/datasource.py

@@ -92,14 +92,14 @@ def parquet_schema_merging_example(spark):
 # The final schema consists of all 3 columns in the Parquet files together
 # with the partitioning column appeared in the partition directory paths.
 # root
 # |-- double: long (nullable = true)
 # |-- single: long (nullable = true)
 # |-- triple: long (nullable = true)
 # |-- key: integer (nullable = true)
 # $example off:schema_merging$
-def json_dataset_examplg(spark):
+def json_dataset_example(spark):
 # $example on:json_dataset$
 # spark is from the previous example.
 sc = spark.sparkContext
@@ -112,8 +112,8 @@ def json_dataset_examplg(spark):
 # The inferred schema can be visualized using the printSchema() method
 peopleDF.printSchema()
 # root
 # |-- age: long (nullable = true)
 # |-- name: string (nullable = true)
 # Creates a temporary view using the DataFrame
 peopleDF.createOrReplaceTempView("people")
@@ -140,15 +140,29 @@ def json_dataset_examplg(spark):
 # +---------------+----+
 # $example off:json_dataset$
+def jdbc_dataset_example(spark):
+    # $example on:jdbc_dataset$
+    jdbcDF = spark.read \
+        .format("jdbc") \
+        .option("url", "jdbc:postgresql:dbserver") \
+        .option("dbtable", "schema.tablename") \
+        .option("user", "username") \
+        .option("password", "password") \
+        .load()
+    # $example off:jdbc_dataset$
 if __name__ == "__main__":
 spark = SparkSession \
 .builder \
-.appName("PythonSQL") \
+.appName("Python Spark SQL data source example") \
 .getOrCreate()
 basic_datasource_example(spark)
 parquet_example(spark)
 parquet_schema_merging_example(spark)
-json_dataset_examplg(spark)
+json_dataset_example(spark)
+jdbc_dataset_example(spark)
 spark.stop()

examples/src/main/python/sql/hive.py

@@ -38,7 +38,7 @@ if __name__ == "__main__":
 spark = SparkSession \
 .builder \
-.appName("PythonSQL") \
+.appName("Python Spark SQL Hive integration example") \
 .config("spark.sql.warehouse.dir", warehouse_location) \
 .enableHiveSupport() \
 .getOrCreate()

examples/src/main/r/RSparkSQLExample.R

@@ -18,31 +18,43 @@
 library(SparkR)
 # $example on:init_session$
-sparkR.session(appName = "MyApp", sparkConfig = list(spark.executor.memory = "1g"))
+sparkR.session(appName = "MyApp", sparkConfig = list(spark.some.config.option = "some-value"))
 # $example off:init_session$
-# $example on:create_DataFrames$
+# $example on:create_df$
 df <- read.json("examples/src/main/resources/people.json")
 # Displays the content of the DataFrame
 head(df)
+## age name
+## 1 NA Michael
+## 2 30 Andy
+## 3 19 Justin
 # Another method to print the first few rows and optionally truncate the printing of long values
 showDF(df)
-# $example off:create_DataFrames$
+## +----+-------+
+## | age| name|
+## +----+-------+
+## |null|Michael|
+## | 30| Andy|
+## | 19| Justin|
+## +----+-------+
+# $example off:create_df$
-# $example on:dataframe_operations$
+# $example on:untyped_ops$
 # Create the DataFrame
 df <- read.json("examples/src/main/resources/people.json")
 # Show the content of the DataFrame
 head(df)
 ## age name
-## null Michael
-## 30 Andy
-## 19 Justin
+## 1 NA Michael
+## 2 30 Andy
+## 3 19 Justin
 # Print the schema in a tree format
 printSchema(df)
@@ -52,58 +64,58 @@ printSchema(df)
 # Select only the "name" column
 head(select(df, "name"))
 ## name
-## Michael
-## Andy
-## Justin
+## 1 Michael
+## 2 Andy
+## 3 Justin
 # Select everybody, but increment the age by 1
 head(select(df, df$name, df$age + 1))
-## name (age + 1)
-## Michael null
-## Andy 31
-## Justin 20
+## name (age + 1.0)
+## 1 Michael NA
+## 2 Andy 31
+## 3 Justin 20
 # Select people older than 21
 head(where(df, df$age > 21))
 ## age name
-## 30 Andy
+## 1 30 Andy
 # Count people by age
 head(count(groupBy(df, "age")))
 ## age count
-## null 1
-## 19 1
-## 30 1
+## 1 19 1
+## 2 NA 1
+## 3 30 1
-# $example off:dataframe_operations$
+# $example off:untyped_ops$
 # Register this DataFrame as a table.
 createOrReplaceTempView(df, "table")
-# $example on:sql_query$
+# $example on:run_sql$
 df <- sql("SELECT * FROM table")
-# $example off:sql_query$
+# $example off:run_sql$
-# $example on:source_parquet$
+# $example on:generic_load_save_functions$
 df <- read.df("examples/src/main/resources/users.parquet")
 write.df(select(df, "name", "favorite_color"), "namesAndFavColors.parquet")
-# $example off:source_parquet$
+# $example off:generic_load_save_functions$
-# $example on:source_json$
+# $example on:manual_load_options$
 df <- read.df("examples/src/main/resources/people.json", "json")
 namesAndAges <- select(df, "name", "age")
 write.df(namesAndAges, "namesAndAges.parquet", "parquet")
-# $example off:source_json$
+# $example off:manual_load_options$
-# $example on:direct_query$
+# $example on:direct_sql$
 df <- sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
-# $example off:direct_query$
+# $example off:direct_sql$
-# $example on:load_programmatically$
+# $example on:basic_parquet_example$
 df <- read.df("examples/src/main/resources/people.json", "json")
 # SparkDataFrame can be saved as Parquet files, maintaining the schema information.
@@ -117,7 +129,7 @@ parquetFile <- read.parquet("people.parquet")
 createOrReplaceTempView(parquetFile, "parquetFile")
 teenagers <- sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
 head(teenagers)
 ## name
 ## 1 Justin
 # We can also run custom R-UDFs on Spark DataFrames. Here we prefix all the names with "Name:"
@@ -129,7 +141,7 @@ for (teenName in collect(teenNames)$name) {
 ## Name: Michael
 ## Name: Andy
 ## Name: Justin
-# $example off:load_programmatically$
+# $example off:basic_parquet_example$
 # $example on:schema_merging$
@@ -146,18 +158,17 @@ write.df(df2, "data/test_table/key=2", "parquet", "overwrite")
 # Read the partitioned table
 df3 <- read.df("data/test_table", "parquet", mergeSchema = "true")
 printSchema(df3)
 # The final schema consists of all 3 columns in the Parquet files together
-# with the partitioning column appeared in the partition directory paths.
-# root
-# |-- single: double (nullable = true)
-# |-- double: double (nullable = true)
-# |-- triple: double (nullable = true)
-# |-- key : int (nullable = true)
+# with the partitioning column appeared in the partition directory paths
+## root
+## |-- single: double (nullable = true)
+## |-- double: double (nullable = true)
+## |-- triple: double (nullable = true)
+## |-- key: integer (nullable = true)
 # $example off:schema_merging$
-# $example on:load_json_file$
+# $example on:json_dataset$
 # A JSON dataset is pointed to by path.
 # The path can be either a single text file or a directory storing text files.
 path <- "examples/src/main/resources/people.json"
@@ -166,9 +177,9 @@ people <- read.json(path)
 # The inferred schema can be visualized using the printSchema() method.
 printSchema(people)
-# root
-# |-- age: long (nullable = true)
-# |-- name: string (nullable = true)
+## root
+## |-- age: long (nullable = true)
+## |-- name: string (nullable = true)
 # Register this DataFrame as a table.
 createOrReplaceTempView(people, "people")
@@ -176,12 +187,12 @@ createOrReplaceTempView(people, "people")
 # SQL statements can be run by using the sql methods.
 teenagers <- sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
 head(teenagers)
 ## name
 ## 1 Justin
-# $example off:load_json_file$
+# $example off:json_dataset$
-# $example on:hive_table$
+# $example on:spark_hive$
 # enableHiveSupport defaults to TRUE
 sparkR.session(enableHiveSupport = TRUE)
 sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
@@ -189,12 +200,12 @@ sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src
 # Queries can be expressed in HiveQL.
 results <- collect(sql("FROM src SELECT key, value"))
-# $example off:hive_table$
+# $example off:spark_hive$
-# $example on:jdbc$
+# $example on:jdbc_dataset$
 df <- read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", user = "username", password = "password")
-# $example off:jdbc$
+# $example off:jdbc_dataset$
 # Stop the SparkSession now
 sparkR.session.stop()

examples/src/main/scala/org/apache/spark/examples/sql/SQLDataSourceExample.scala

@@ -25,7 +25,7 @@ object SQLDataSourceExample {
 def main(args: Array[String]) {
 val spark = SparkSession
 .builder()
-.appName("Spark SQL Data Soures Example")
+.appName("Spark SQL data sources example")
 .config("spark.some.config.option", "some-value")
 .getOrCreate()
@@ -33,6 +33,7 @@ object SQLDataSourceExample {
 runBasicParquetExample(spark)
 runParquetSchemaMergingExample(spark)
 runJsonDatasetExample(spark)
+runJdbcDatasetExample(spark)
 spark.stop()
 }
@@ -99,10 +100,10 @@ object SQLDataSourceExample {
 // The final schema consists of all 3 columns in the Parquet files together
 // with the partitioning column appeared in the partition directory paths
 // root
 // |-- value: int (nullable = true)
 // |-- square: int (nullable = true)
 // |-- cube: int (nullable = true)
-// |-- key : int (nullable = true)
+// |-- key: int (nullable = true)
 // $example off:schema_merging$
 }
@@ -145,4 +146,15 @@ object SQLDataSourceExample {
 // $example off:json_dataset$
 }
+private def runJdbcDatasetExample(spark: SparkSession): Unit = {
+  // $example on:jdbc_dataset$
+  val jdbcDF = spark.read
+    .format("jdbc")
+    .option("url", "jdbc:postgresql:dbserver")
+    .option("dbtable", "schema.tablename")
+    .option("user", "username")
+    .option("password", "password")
+    .load()
+  // $example off:jdbc_dataset$
+}
 }
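A side note on the Scala snippet above: the inline JDBC examples removed from `sql-programming-guide.md` passed connection settings via `options(Map(...))`, whereas the new `jdbc_dataset` snippets chain individual `option(...)` calls. The two forms are interchangeable; a minimal sketch of the `Map`-based form with the same placeholder connection settings:

```scala
// `spark` is an existing SparkSession, as in the surrounding examples.
// Equivalent to the chained .option(...) calls in the jdbc_dataset example above;
// the URL, table name, and credentials are placeholders, not a real database.
val jdbcDF = spark.read
  .format("jdbc")
  .options(Map(
    "url" -> "jdbc:postgresql:dbserver",
    "dbtable" -> "schema.tablename",
    "user" -> "username",
    "password" -> "password"))
  .load()
```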

examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

@@ -42,7 +42,7 @@ object SparkSQLExample {
 // $example on:init_session$
 val spark = SparkSession
 .builder()
-.appName("Spark SQL Example")
+.appName("Spark SQL basic example")
 .config("spark.some.config.option", "some-value")
 .getOrCreate()