2016-03-25 20:37:16 -04:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
|
|
|
|
import sys
|
|
|
|
# Pick the in-memory byte buffer matching the running interpreter: BytesIO on
# Python 3 (Flume event payloads arrive as bytes), StringIO on Python 2 where
# str is the byte type.
# Use sys.version_info (a comparable tuple) rather than a lexical comparison
# of the sys.version string, which misorders double-digit majors ("10" < "3").
if sys.version_info >= (3,):
    from io import BytesIO
else:
    from StringIO import StringIO
|
[SPARK-22313][PYTHON][FOLLOWUP] Explicitly import warnings namespace in flume.py
## What changes were proposed in this pull request?
This PR explicitly imports the missing `warnings` in `flume.py`.
## How was this patch tested?
Manually tested.
```python
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
>>> from pyspark.streaming import flume
>>> flume.FlumeUtils.createStream(None, None, None)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/.../spark/python/pyspark/streaming/flume.py", line 60, in createStream
warnings.warn(
NameError: global name 'warnings' is not defined
```
```python
>>> import warnings
>>> warnings.simplefilter('always', DeprecationWarning)
>>> from pyspark.streaming import flume
>>> flume.FlumeUtils.createStream(None, None, None)
/.../spark/python/pyspark/streaming/flume.py:65: DeprecationWarning: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. See SPARK-22142.
DeprecationWarning)
...
```
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #20110 from HyukjinKwon/SPARK-22313-followup.
2017-12-29 00:46:03 -05:00
|
|
|
import warnings
|
|
|
|
|
2016-03-25 20:37:16 -04:00
|
|
|
from py4j.protocol import Py4JJavaError
|
|
|
|
|
|
|
|
from pyspark.storagelevel import StorageLevel
|
|
|
|
from pyspark.serializers import PairDeserializer, NoOpSerializer, UTF8Deserializer, read_int
|
|
|
|
from pyspark.streaming import DStream
|
|
|
|
|
|
|
|
__all__ = ['FlumeUtils', 'utf8_decoder']
|
|
|
|
|
|
|
|
|
|
|
|
def utf8_decoder(s):
    """Decode a byte string as UTF-8, passing ``None`` through unchanged."""
    return None if s is None else s.decode('utf-8')
|
|
|
|
|
|
|
|
|
|
|
|
class FlumeUtils(object):
    """Helpers for building Spark Streaming DStreams backed by Apache Flume.

    .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
        See SPARK-22142.
    """

    @staticmethod
    def createStream(ssc, hostname, port,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     enableDecompression=False,
                     bodyDecoder=utf8_decoder):
        """
        Create an input stream that pulls events from Flume.

        :param ssc: StreamingContext object
        :param hostname: Hostname of the slave machine to which the flume data will be sent
        :param port: Port of the slave machine to which the flume data will be sent
        :param storageLevel: Storage level to use for storing the received objects
        :param enableDecompression: Should netty server decompress input stream
        :param bodyDecoder: A function used to decode body (default is utf8_decoder)
        :return: A DStream object

        .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
            See SPARK-22142.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. "
            "See SPARK-22142.",
            DeprecationWarning)
        # Translate the Python storage level into its JVM counterpart, then let
        # the Scala-side helper wire up the actual receiver stream.
        java_level = ssc._sc._getJavaStorageLevel(storageLevel)
        java_stream = FlumeUtils._get_helper(ssc._sc).createStream(
            ssc._jssc, hostname, port, java_level, enableDecompression)
        return FlumeUtils._toPythonDStream(ssc, java_stream, bodyDecoder)

    @staticmethod
    def createPollingStream(ssc, addresses,
                            storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                            maxBatchSize=1000,
                            parallelism=5,
                            bodyDecoder=utf8_decoder):
        """
        Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
        This stream will poll the sink for data and will pull events as they are available.

        :param ssc: StreamingContext object
        :param addresses: List of (host, port)s on which the Spark Sink is running.
        :param storageLevel: Storage level to use for storing the received objects
        :param maxBatchSize: The maximum number of events to be pulled from the Spark sink
                             in a single RPC call
        :param parallelism: Number of concurrent requests this stream should send to the sink.
                            Note that having a higher number of requests concurrently being pulled
                            will result in this stream using more threads
        :param bodyDecoder: A function used to decode body (default is utf8_decoder)
        :return: A DStream object

        .. note:: Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0.
            See SPARK-22142.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Flume support is deprecated as of Spark 2.3.0. "
            "See SPARK-22142.",
            DeprecationWarning)
        java_level = ssc._sc._getJavaStorageLevel(storageLevel)
        # The JVM helper takes the endpoints as two parallel host/port lists.
        endpoints = [(host, port) for (host, port) in addresses]
        hosts = [host for (host, _) in endpoints]
        ports = [port for (_, port) in endpoints]
        java_stream = FlumeUtils._get_helper(ssc._sc).createPollingStream(
            ssc._jssc, hosts, ports, java_level, maxBatchSize, parallelism)
        return FlumeUtils._toPythonDStream(ssc, java_stream, bodyDecoder)

    @staticmethod
    def _toPythonDStream(ssc, jstream, bodyDecoder):
        # Wrap the JVM stream as a DStream of raw (header-bytes, body-bytes)
        # pairs, then map each event into a ({header: value}, decoded-body)
        # tuple on the Python side.
        raw_stream = DStream(jstream, ssc, PairDeserializer(NoOpSerializer(), NoOpSerializer()))

        def deserialize_event(event):
            # event[0] carries a count followed by alternating UTF-8 key/value
            # strings; event[1] is the opaque event body.
            buf = BytesIO(event[0]) if sys.version >= "3" else StringIO(event[0])
            text_deser = UTF8Deserializer()
            headers = {}
            for _ in range(read_int(buf)):
                key = text_deser.loads(buf)
                headers[key] = text_deser.loads(buf)
            return (headers, bodyDecoder(event[1]))

        return raw_stream.map(deserialize_event)

    @staticmethod
    def _get_helper(sc):
        # Instantiate the JVM-side FlumeUtilsPythonHelper; if the Flume
        # assembly jar is missing from the classpath the class resolves to a
        # bare 'JavaPackage', so calling it raises the TypeError matched below.
        try:
            return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
        except TypeError as e:
            if str(e) == "'JavaPackage' object is not callable":
                # Print setup guidance for the user before re-raising.
                FlumeUtils._printErrorMsg(sc)
            raise

    @staticmethod
    def _printErrorMsg(sc):
        # Emitted verbatim when the Flume jars are absent; keep the wording
        # stable so users can search for it.
        message = """
________________________________________________________________________________________________

  Spark Streaming's Flume libraries not found in class path. Try one of the following.

  1. Include the Flume library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:%s ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = %s.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...

________________________________________________________________________________________________

"""
        print(message % (sc.version, sc.version))
|