[SPARK-16114][SQL] structured streaming network word count examples
## What changes were proposed in this pull request? Network word count example for structured streaming ## How was this patch tested? Run locally Author: James Thomas <jamesjoethomas@gmail.com> Author: James Thomas <jamesthomas@Jamess-MacBook-Pro.local> Closes #13816 from jjthomas/master.
This commit is contained in:
parent
8a977b0654
commit
3554713a16
|
@ -0,0 +1,82 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.examples.sql.streaming;
|
||||
|
||||
import org.apache.spark.api.java.function.FlatMapFunction;
|
||||
import org.apache.spark.sql.*;
|
||||
import org.apache.spark.sql.streaming.StreamingQuery;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Iterator;
|
||||
|
||||
/**
|
||||
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
|
||||
*
|
||||
* Usage: JavaStructuredNetworkWordCount <hostname> <port>
|
||||
* <hostname> and <port> describe the TCP server that Structured Streaming
|
||||
* would connect to receive data.
|
||||
*
|
||||
* To run this on your local machine, you need to first run a Netcat server
|
||||
* `$ nc -lk 9999`
|
||||
* and then run the example
|
||||
* `$ bin/run-example sql.streaming.JavaStructuredNetworkWordCount
|
||||
* localhost 9999`
|
||||
*/
|
||||
public final class JavaStructuredNetworkWordCount {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
String host = args[0];
|
||||
int port = Integer.parseInt(args[1]);
|
||||
|
||||
SparkSession spark = SparkSession
|
||||
.builder()
|
||||
.appName("JavaStructuredNetworkWordCount")
|
||||
.getOrCreate();
|
||||
|
||||
// Create DataFrame representing the stream of input lines from connection to host:port
|
||||
Dataset<String> lines = spark
|
||||
.readStream()
|
||||
.format("socket")
|
||||
.option("host", host)
|
||||
.option("port", port)
|
||||
.load().as(Encoders.STRING());
|
||||
|
||||
// Split the lines into words
|
||||
Dataset<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
|
||||
@Override
|
||||
public Iterator<String> call(String x) {
|
||||
return Arrays.asList(x.split(" ")).iterator();
|
||||
}
|
||||
}, Encoders.STRING());
|
||||
|
||||
// Generate running word count
|
||||
Dataset<Row> wordCounts = words.groupBy("value").count();
|
||||
|
||||
// Start running the query that prints the running counts to the console
|
||||
StreamingQuery query = wordCounts.writeStream()
|
||||
.outputMode("complete")
|
||||
.format("console")
|
||||
.start();
|
||||
|
||||
query.awaitTermination();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
# contributor license agreements. See the NOTICE file distributed with
|
||||
# this work for additional information regarding copyright ownership.
|
||||
# The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
# (the "License"); you may not use this file except in compliance with
|
||||
# the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
"""
|
||||
Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
|
||||
Usage: structured_network_wordcount.py <hostname> <port>
|
||||
<hostname> and <port> describe the TCP server that Structured Streaming
|
||||
would connect to receive data.
|
||||
|
||||
To run this on your local machine, you need to first run a Netcat server
|
||||
`$ nc -lk 9999`
|
||||
and then run the example
|
||||
`$ bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py
|
||||
localhost 9999`
|
||||
"""
|
||||
from __future__ import print_function
|
||||
|
||||
import sys
|
||||
|
||||
from pyspark.sql import SparkSession
|
||||
from pyspark.sql.functions import explode
|
||||
from pyspark.sql.functions import split
|
||||
|
||||
if __name__ == "__main__":
|
||||
if len(sys.argv) != 3:
|
||||
print("Usage: structured_network_wordcount.py <hostname> <port>", file=sys.stderr)
|
||||
exit(-1)
|
||||
|
||||
host = sys.argv[1]
|
||||
port = int(sys.argv[2])
|
||||
|
||||
spark = SparkSession\
|
||||
.builder\
|
||||
.appName("StructuredNetworkWordCount")\
|
||||
.getOrCreate()
|
||||
|
||||
# Create DataFrame representing the stream of input lines from connection to host:port
|
||||
lines = spark\
|
||||
.readStream\
|
||||
.format('socket')\
|
||||
.option('host', host)\
|
||||
.option('port', port)\
|
||||
.load()
|
||||
|
||||
# Split the lines into words
|
||||
words = lines.select(
|
||||
explode(
|
||||
split(lines.value, ' ')
|
||||
).alias('word')
|
||||
)
|
||||
|
||||
# Generate running word count
|
||||
wordCounts = words.groupBy('word').count()
|
||||
|
||||
# Start running the query that prints the running counts to the console
|
||||
query = wordCounts\
|
||||
.writeStream\
|
||||
.outputMode('complete')\
|
||||
.format('console')\
|
||||
.start()
|
||||
|
||||
query.awaitTermination()
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
// scalastyle:off println
|
||||
package org.apache.spark.examples.sql.streaming
|
||||
|
||||
import org.apache.spark.sql.functions._
|
||||
import org.apache.spark.sql.SparkSession
|
||||
|
||||
/**
|
||||
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
|
||||
*
|
||||
* Usage: StructuredNetworkWordCount <hostname> <port>
|
||||
* <hostname> and <port> describe the TCP server that Structured Streaming
|
||||
* would connect to receive data.
|
||||
*
|
||||
* To run this on your local machine, you need to first run a Netcat server
|
||||
* `$ nc -lk 9999`
|
||||
* and then run the example
|
||||
* `$ bin/run-example sql.streaming.StructuredNetworkWordCount
|
||||
* localhost 9999`
|
||||
*/
|
||||
object StructuredNetworkWordCount {
|
||||
def main(args: Array[String]) {
|
||||
if (args.length < 2) {
|
||||
System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>")
|
||||
System.exit(1)
|
||||
}
|
||||
|
||||
val host = args(0)
|
||||
val port = args(1).toInt
|
||||
|
||||
val spark = SparkSession
|
||||
.builder
|
||||
.appName("StructuredNetworkWordCount")
|
||||
.getOrCreate()
|
||||
|
||||
import spark.implicits._
|
||||
|
||||
// Create DataFrame representing the stream of input lines from connection to host:port
|
||||
val lines = spark.readStream
|
||||
.format("socket")
|
||||
.option("host", host)
|
||||
.option("port", port)
|
||||
.load().as[String]
|
||||
|
||||
// Split the lines into words
|
||||
val words = lines.flatMap(_.split(" "))
|
||||
|
||||
// Generate running word count
|
||||
val wordCounts = words.groupBy("value").count()
|
||||
|
||||
// Start running the query that prints the running counts to the console
|
||||
val query = wordCounts.writeStream
|
||||
.outputMode("complete")
|
||||
.format("console")
|
||||
.start()
|
||||
|
||||
query.awaitTermination()
|
||||
}
|
||||
}
|
||||
// scalastyle:on println
|
Loading…
Reference in a new issue