[MINOR][DOCS] Fix minor typos in python example code
## What changes were proposed in this pull request? Fix minor typos in the python example code in the streaming programming guide. ## How was this patch tested? N/A Author: Dmitriy Sokolov <silentsokolov@gmail.com> Closes #14805 from silentsokolov/fix-typos.
This commit is contained in:
parent
befab9c1c6
commit
d4eee9932e
|
@ -460,8 +460,10 @@ Elasticsearch ESInputFormat:
|
|||
{% highlight python %}
|
||||
$ SPARK_CLASSPATH=/path/to/elasticsearch-hadoop.jar ./bin/pyspark
|
||||
>>> conf = {"es.resource" : "index/type"} # assume Elasticsearch is running on localhost defaults
|
||||
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",\
|
||||
"org.apache.hadoop.io.NullWritable", "org.elasticsearch.hadoop.mr.LinkedMapWritable", conf=conf)
|
||||
>>> rdd = sc.newAPIHadoopRDD("org.elasticsearch.hadoop.mr.EsInputFormat",
|
||||
"org.apache.hadoop.io.NullWritable",
|
||||
"org.elasticsearch.hadoop.mr.LinkedMapWritable",
|
||||
conf=conf)
|
||||
>>> rdd.first() # the result is a MapWritable that is converted to a Python dict
|
||||
(u'Elasticsearch ID',
|
||||
{u'field1': True,
|
||||
|
@ -797,7 +799,6 @@ def increment_counter(x):
|
|||
rdd.foreach(increment_counter)
|
||||
|
||||
print("Counter value: ", counter)
|
||||
|
||||
{% endhighlight %}
|
||||
</div>
|
||||
|
||||
|
@ -1455,13 +1456,14 @@ The code below shows an accumulator being used to add up the elements of an arra
|
|||
|
||||
{% highlight python %}
|
||||
>>> accum = sc.accumulator(0)
|
||||
>>> accum
|
||||
Accumulator<id=0, value=0>
|
||||
|
||||
>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
|
||||
...
|
||||
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s
|
||||
|
||||
scala> accum.value
|
||||
>>> accum.value
|
||||
10
|
||||
{% endhighlight %}
|
||||
|
||||
|
|
|
@ -1495,16 +1495,15 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_
|
|||
</div>
|
||||
<div data-lang="python" markdown="1">
|
||||
{% highlight python %}
|
||||
|
||||
def getWordBlacklist(sparkContext):
|
||||
if ('wordBlacklist' not in globals()):
|
||||
globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"])
|
||||
return globals()['wordBlacklist']
|
||||
if ("wordBlacklist" not in globals()):
|
||||
globals()["wordBlacklist"] = sparkContext.broadcast(["a", "b", "c"])
|
||||
return globals()["wordBlacklist"]
|
||||
|
||||
def getDroppedWordsCounter(sparkContext):
|
||||
if ('droppedWordsCounter' not in globals()):
|
||||
globals()['droppedWordsCounter'] = sparkContext.accumulator(0)
|
||||
return globals()['droppedWordsCounter']
|
||||
if ("droppedWordsCounter" not in globals()):
|
||||
globals()["droppedWordsCounter"] = sparkContext.accumulator(0)
|
||||
return globals()["droppedWordsCounter"]
|
||||
|
||||
def echo(time, rdd):
|
||||
# Get or register the blacklist Broadcast
|
||||
|
@ -1626,12 +1625,12 @@ See the full [source code]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_
|
|||
|
||||
# Lazily instantiated global instance of SparkSession
|
||||
def getSparkSessionInstance(sparkConf):
|
||||
if ('sparkSessionSingletonInstance' not in globals()):
|
||||
globals()['sparkSessionSingletonInstance'] = SparkSession\
|
||||
if ("sparkSessionSingletonInstance" not in globals()):
|
||||
globals()["sparkSessionSingletonInstance"] = SparkSession \
|
||||
.builder \
|
||||
.config(conf=sparkConf) \
|
||||
.getOrCreate()
|
||||
return globals()['sparkSessionSingletonInstance']
|
||||
return globals()["sparkSessionSingletonInstance"]
|
||||
|
||||
...
|
||||
|
||||
|
@ -1830,7 +1829,7 @@ This behavior is made simple by using `StreamingContext.getOrCreate`. This is us
|
|||
# Function to create and setup a new StreamingContext
|
||||
def functionToCreateContext():
|
||||
sc = SparkContext(...) # new context
|
||||
ssc = new StreamingContext(...)
|
||||
ssc = StreamingContext(...)
|
||||
lines = ssc.socketTextStream(...) # create DStreams
|
||||
...
|
||||
ssc.checkpoint(checkpointDirectory) # set checkpoint directory
|
||||
|
|
|
@ -126,20 +126,20 @@ This `lines` DataFrame represents an unbounded table containing the streaming te
|
|||
# Create DataFrame representing the stream of input lines from connection to localhost:9999
|
||||
lines = spark \
|
||||
.readStream \
|
||||
.format('socket')\
|
||||
.option('host', 'localhost')\
|
||||
.option('port', 9999)\
|
||||
.format("socket") \
|
||||
.option("host", "localhost") \
|
||||
.option("port", 9999) \
|
||||
.load()
|
||||
|
||||
# Split the lines into words
|
||||
words = lines.select(
|
||||
explode(
|
||||
split(lines.value, ' ')
|
||||
).alias('word')
|
||||
split(lines.value, " ")
|
||||
).alias("word")
|
||||
)
|
||||
|
||||
# Generate running word count
|
||||
wordCounts = words.groupBy('word').count()
|
||||
wordCounts = words.groupBy("word").count()
|
||||
{% endhighlight %}
|
||||
|
||||
This `lines` DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have used two built-in SQL functions - split and explode, to split each line into multiple rows with a word each. In addition, we use the function `alias` to name the new column as "word". Finally, we have defined the `wordCounts` DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
|
||||
|
@ -182,8 +182,8 @@ query.awaitTermination();
|
|||
# Start running the query that prints the running counts to the console
|
||||
query = wordCounts \
|
||||
.writeStream \
|
||||
.outputMode('complete')\
|
||||
.format('console')\
|
||||
.outputMode("complete") \
|
||||
.format("console") \
|
||||
.start()
|
||||
|
||||
query.awaitTermination()
|
||||
|
@ -596,7 +596,6 @@ ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API
|
|||
<div data-lang="python" markdown="1">
|
||||
|
||||
{% highlight python %}
|
||||
|
||||
df = ... # streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType }
|
||||
|
||||
# Select the devices which have signal more than 10
|
||||
|
@ -657,7 +656,7 @@ words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String
|
|||
|
||||
# Group the data by window and word and compute the count of each group
|
||||
windowedCounts = words.groupBy(
|
||||
window(words.timestamp, '10 minutes', '5 minutes'),
|
||||
window(words.timestamp, "10 minutes", "5 minutes"),
|
||||
words.word
|
||||
).count()
|
||||
{% endhighlight %}
|
||||
|
|
Loading…
Reference in a new issue