[SPARK-5154] [PySpark] [Streaming] Kafka streaming support in Python
This PR brings the Python API for the Spark Streaming Kafka data source.
```
class KafkaUtils(__builtin__.object)
| Static methods defined here:
|
| createStream(ssc, zkQuorum, groupId, topics, storageLevel=StorageLevel(True, True, False, False,
2), keyDecoder=<function utf8_decoder>, valueDecoder=<function utf8_decoder>)
| Create an input stream that pulls messages from a Kafka Broker.
|
| :param ssc: StreamingContext object
| :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
| :param groupId: The group id for this consumer.
| :param topics: Dict of (topic_name -> numPartitions) to consume.
| Each partition is consumed in its own thread.
| :param storageLevel: RDD storage level.
| :param keyDecoder: A function used to decode key
| :param valueDecoder: A function used to decode value
| :return: A DStream object
```
Run the example:
```
bin/spark-submit --driver-class-path external/kafka-assembly/target/scala-*/spark-streaming-kafka-assembly-*.jar examples/src/main/python/streaming/kafka_wordcount.py localhost:2181 test
```
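For reference, the example boils down to a driver like the following (a minimal sketch in the spirit of `kafka_wordcount.py`, not the shipped file verbatim; it assumes a local Zookeeper at `localhost:2181` and a topic named `test`):
```
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)  # 1-second batch interval

# One receiver thread for the "test" topic; keys and values are UTF-8 decoded by default.
kvs = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer", {"test": 1})
counts = kvs.map(lambda kv: kv[1]) \
    .flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()

ssc.start()
ssc.awaitTermination()
```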
Author: Davies Liu <davies@databricks.com>
Author: Tathagata Das <tdas@databricks.com>
Closes #3715 from davies/kafka and squashes the following commits:
d93bfe0 [Davies Liu] Update make-distribution.sh
4280d04 [Davies Liu] address comments
e6d0427 [Davies Liu] Merge branch 'master' of github.com:apache/spark into kafka
f257071 [Davies Liu] add tests for null in RDD
23b039a [Davies Liu] address comments
9af51c4 [Davies Liu] Merge branch 'kafka' of github.com:davies/spark into kafka
a74da87 [Davies Liu] address comments
dc1eed0 [Davies Liu] Update kafka_wordcount.py
31e2317 [Davies Liu] Update kafka_wordcount.py
370ba61 [Davies Liu] Update kafka.py
97386b3 [Davies Liu] address comment
2c567a5 [Davies Liu] update logging and comment
33730d1 [Davies Liu] Merge branch 'master' of github.com:apache/spark into kafka
adeeb38 [Davies Liu] Merge pull request #3 from tdas/kafka-python-api
aea8953 [Tathagata Das] Kafka-assembly for Python API
eea16a7 [Davies Liu] refactor
f6ce899 [Davies Liu] add example and fix bugs
98c8d17 [Davies Liu] fix python style
5697a01 [Davies Liu] bypass decoder in scala
048dbe6 [Davies Liu] fix python style
75d485e [Davies Liu] add mqtt
07923c4 [Davies Liu] support kafka in Python
2015-02-02 22:16:27 -05:00
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import warnings

from py4j.protocol import Py4JJavaError

from pyspark.rdd import RDD
from pyspark.storagelevel import StorageLevel
from pyspark.serializers import AutoBatchedSerializer, PickleSerializer, PairDeserializer, \
    NoOpSerializer
from pyspark.streaming import DStream
from pyspark.streaming.dstream import TransformedDStream
from pyspark.streaming.util import TransformFunction

__all__ = ['Broker', 'KafkaMessageAndMetadata', 'KafkaUtils', 'OffsetRange',
           'TopicAndPartition', 'utf8_decoder']


def utf8_decoder(s):
    """ Decode the unicode as UTF-8 """
    if s is None:
        return None
    return s.decode('utf-8')


class KafkaUtils(object):

    @staticmethod
    def createStream(ssc, zkQuorum, groupId, topics, kafkaParams=None,
                     storageLevel=StorageLevel.MEMORY_AND_DISK_2,
                     keyDecoder=utf8_decoder, valueDecoder=utf8_decoder):
        """
        Create an input stream that pulls messages from a Kafka Broker.

        :param ssc: StreamingContext object
        :param zkQuorum: Zookeeper quorum (hostname:port,hostname:port,..).
        :param groupId: The group id for this consumer.
        :param topics: Dict of (topic_name -> numPartitions) to consume.
                       Each partition is consumed in its own thread.
        :param kafkaParams: Additional params for Kafka
        :param storageLevel: RDD storage level.
        :param keyDecoder: A function used to decode key (default is utf8_decoder)
        :param valueDecoder: A function used to decode value (default is utf8_decoder)
        :return: A DStream object

        .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
            See SPARK-21893.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        if kafkaParams is None:
            kafkaParams = dict()
        kafkaParams.update({
            "zookeeper.connect": zkQuorum,
            "group.id": groupId,
            "zookeeper.connection.timeout.ms": "10000",
        })
        if not isinstance(topics, dict):
            raise TypeError("topics should be dict")
        jlevel = ssc._sc._getJavaStorageLevel(storageLevel)
        helper = KafkaUtils._get_helper(ssc._sc)
        jstream = helper.createStream(ssc._jssc, kafkaParams, topics, jlevel)
        ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
        stream = DStream(jstream, ssc, ser)
        return stream.map(lambda k_v: (keyDecoder(k_v[0]), valueDecoder(k_v[1])))
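
    # Usage sketch for createStream (not part of the original file; assumes a running
    # StreamingContext ``ssc`` and a Zookeeper quorum at localhost:2181; the extra
    # kafkaParams entry and the raw byte decoders are illustrative):
    #
    #   raw = KafkaUtils.createStream(
    #       ssc, "localhost:2181", "my-group", {"test": 2},
    #       kafkaParams={"auto.offset.reset": "smallest"},
    #       keyDecoder=lambda b: b, valueDecoder=lambda b: b)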

    @staticmethod
    def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
                           keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                           messageHandler=None):
        """
        Create an input stream that directly pulls messages from a Kafka Broker and specific
        offset.

        This is not a receiver based Kafka input stream, it directly pulls the message from Kafka
        in each batch duration and processed without storing.

        This does not use Zookeeper to store offsets. The consumed offsets are tracked
        by the stream itself. For interoperability with Kafka monitoring tools that depend on
        Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
        You can access the offsets used in each batch from the generated RDDs (see

        To recover from driver failures, you have to enable checkpointing in the StreamingContext.
        The information on consumed offset can be recovered from the checkpoint.
        See the programming guide for details (constraints, etc.).

        :param ssc: StreamingContext object.
        :param topics: list of topic_name to consume.
        :param kafkaParams: Additional params for Kafka.
        :param fromOffsets: Per-topic/partition Kafka offsets defining the (inclusive) starting
                            point of the stream (a dictionary mapping `TopicAndPartition` to
                            integers).
        :param keyDecoder: A function used to decode key (default is utf8_decoder).
        :param valueDecoder: A function used to decode value (default is utf8_decoder).
        :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can access
                               meta using messageHandler (default is None).
        :return: A DStream object

        .. note:: Experimental

        .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
            See SPARK-21893.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        if fromOffsets is None:
            fromOffsets = dict()
        if not isinstance(topics, list):
            raise TypeError("topics should be list")
        if not isinstance(kafkaParams, dict):
            raise TypeError("kafkaParams should be dict")

        def funcWithoutMessageHandler(k_v):
            return (keyDecoder(k_v[0]), valueDecoder(k_v[1]))

        def funcWithMessageHandler(m):
            m._set_key_decoder(keyDecoder)
            m._set_value_decoder(valueDecoder)
            return messageHandler(m)

        helper = KafkaUtils._get_helper(ssc._sc)

        jfromOffsets = dict([(k._jTopicAndPartition(helper),
                              v) for (k, v) in fromOffsets.items()])
        if messageHandler is None:
            ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
            func = funcWithoutMessageHandler
            jstream = helper.createDirectStreamWithoutMessageHandler(
                ssc._jssc, kafkaParams, set(topics), jfromOffsets)
        else:
            ser = AutoBatchedSerializer(PickleSerializer())
            func = funcWithMessageHandler
            jstream = helper.createDirectStreamWithMessageHandler(
                ssc._jssc, kafkaParams, set(topics), jfromOffsets)

        stream = DStream(jstream, ssc, ser).map(func)
        return KafkaDStream(stream._jdstream, ssc, stream._jrdd_deserializer)
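
    # Usage sketch for createDirectStream (not part of the original file; assumes a
    # StreamingContext ``ssc`` and a broker at localhost:9092; the starting offset 0
    # for partition 0 of topic "test" is illustrative):
    #
    #   offsets = {TopicAndPartition("test", 0): 0}
    #   directKvs = KafkaUtils.createDirectStream(
    #       ssc, ["test"], {"metadata.broker.list": "localhost:9092"}, fromOffsets=offsets)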

    @staticmethod
    def createRDD(sc, kafkaParams, offsetRanges, leaders=None,
                  keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
                  messageHandler=None):
        """
        Create an RDD from Kafka using offset ranges for each topic and partition.

        :param sc: SparkContext object
        :param kafkaParams: Additional params for Kafka
        :param offsetRanges: list of offsetRange to specify topic:partition:[start, end) to consume
        :param leaders: Kafka brokers for each TopicAndPartition in offsetRanges. May be an empty
            map, in which case leaders will be looked up on the driver.
        :param keyDecoder: A function used to decode key (default is utf8_decoder)
        :param valueDecoder: A function used to decode value (default is utf8_decoder)
        :param messageHandler: A function used to convert KafkaMessageAndMetadata. You can access
                               meta using messageHandler (default is None).
        :return: An RDD object

        .. note:: Experimental

        .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
            See SPARK-21893.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        if leaders is None:
            leaders = dict()
        if not isinstance(kafkaParams, dict):
            raise TypeError("kafkaParams should be dict")
        if not isinstance(offsetRanges, list):
            raise TypeError("offsetRanges should be list")

        def funcWithoutMessageHandler(k_v):
            return (keyDecoder(k_v[0]), valueDecoder(k_v[1]))

        def funcWithMessageHandler(m):
            m._set_key_decoder(keyDecoder)
            m._set_value_decoder(valueDecoder)
            return messageHandler(m)

        helper = KafkaUtils._get_helper(sc)

        joffsetRanges = [o._jOffsetRange(helper) for o in offsetRanges]
        jleaders = dict([(k._jTopicAndPartition(helper),
                          v._jBroker(helper)) for (k, v) in leaders.items()])
        if messageHandler is None:
            jrdd = helper.createRDDWithoutMessageHandler(
                sc._jsc, kafkaParams, joffsetRanges, jleaders)
            ser = PairDeserializer(NoOpSerializer(), NoOpSerializer())
            rdd = RDD(jrdd, sc, ser).map(funcWithoutMessageHandler)
        else:
            jrdd = helper.createRDDWithMessageHandler(
                sc._jsc, kafkaParams, joffsetRanges, jleaders)
            rdd = RDD(jrdd, sc).map(funcWithMessageHandler)

        return KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
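
    # Usage sketch for createRDD (not part of the original file; assumes a SparkContext
    # ``sc`` and a broker at localhost:9092; the half-open range [0, 100) on partition 0
    # of topic "test" is illustrative):
    #
    #   ranges = [OffsetRange("test", 0, 0, 100)]
    #   rdd = KafkaUtils.createRDD(sc, {"metadata.broker.list": "localhost:9092"}, ranges)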

    @staticmethod
    def _get_helper(sc):
        try:
            return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
        except TypeError as e:
            if str(e) == "'JavaPackage' object is not callable":
                KafkaUtils._printErrorMsg(sc)
            raise

    @staticmethod
    def _printErrorMsg(sc):
        print("""
________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies within the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:%s ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = %s.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________

""" % (sc.version, sc.version))


class OffsetRange(object):
    """
    Represents a range of offsets from a single Kafka TopicAndPartition.

    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, topic, partition, fromOffset, untilOffset):
        """
        Create an OffsetRange to represent a range of offsets.

        :param topic: Kafka topic name.
        :param partition: Kafka partition id.
        :param fromOffset: Inclusive starting offset.
        :param untilOffset: Exclusive ending offset.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        self.topic = topic
        self.partition = partition
        self.fromOffset = fromOffset
        self.untilOffset = untilOffset

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return (self.topic == other.topic
                    and self.partition == other.partition
                    and self.fromOffset == other.fromOffset
                    and self.untilOffset == other.untilOffset)
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __str__(self):
        return "OffsetRange(topic: %s, partition: %d, range: [%d -> %d])" \
            % (self.topic, self.partition, self.fromOffset, self.untilOffset)

    def _jOffsetRange(self, helper):
        return helper.createOffsetRange(self.topic, self.partition, self.fromOffset,
                                        self.untilOffset)
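
# A quick sketch of OffsetRange semantics (illustrative values; not part of the
# original file): untilOffset is exclusive, so the range below covers offsets
# 0 through 99 of partition 0.
#
#   r = OffsetRange("test", 0, 0, 100)
#   str(r)  # "OffsetRange(topic: test, partition: 0, range: [0 -> 100])"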
|
2015-04-28 02:48:02 -04:00
|
|
|
|
|
|
|
|
|
|
|
class TopicAndPartition(object):
|
|
|
|
"""
|
2017-02-17 10:10:29 -05:00
|
|
|
Represents a specific topic and partition for Kafka.
|
2017-09-13 05:10:40 -04:00
|
|
|
|
    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, topic, partition):
        """
        Create a Python TopicAndPartition to map to the Java related object.

        :param topic: Kafka topic name.
        :param partition: Kafka partition id.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        self._topic = topic
        self._partition = partition

    def _jTopicAndPartition(self, helper):
        return helper.createTopicAndPartition(self._topic, self._partition)

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return (self._topic == other._topic
                    and self._partition == other._partition)
        else:
            return False

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return (self._topic, self._partition).__hash__()

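Since `TopicAndPartition` defines `__eq__` and `__hash__`, it can serve as a dict key, which is how `createDirectStream` takes its starting offsets. A minimal sketch, assuming a `StreamingContext` `ssc` and the same illustrative broker and topic as above:
```
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

kafkaParams = {"metadata.broker.list": "localhost:9092"}
# resume partition 0 of "test" from offset 42 instead of the default position
fromOffsets = {TopicAndPartition("test", 0): 42}
stream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams, fromOffsets)
```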
class Broker(object):
    """
    Represent the host and port info for a Kafka broker.

    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, host, port):
        """
        Create a Python Broker to map to the Java related object.

        :param host: Broker's hostname.
        :param port: Broker's port.
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        self._host = host
        self._port = port

    def _jBroker(self, helper):
        return helper.createBroker(self._host, self._port)

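`Broker` shows up only in the optional `leaders` argument of `KafkaUtils.createRDD`, which lets the caller name each partition's leader up front instead of having it discovered from metadata. A sketch under the same illustrative assumptions (`sc`, broker at `localhost:9092`, topic `test`):
```
from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition, Broker

kafkaParams = {"metadata.broker.list": "localhost:9092"}
offsetRanges = [OffsetRange("test", 0, 0, 100)]
# map each partition to its known leader so createRDD can skip the lookup
leaders = {TopicAndPartition("test", 0): Broker("localhost", 9092)}
rdd = KafkaUtils.createRDD(sc, kafkaParams, offsetRanges, leaders)
```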
class KafkaRDD(RDD):
    """
    A Python wrapper of KafkaRDD, providing additional offset information on top of a normal RDD.

    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, jrdd, ctx, jrdd_deserializer):
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        RDD.__init__(self, jrdd, ctx, jrdd_deserializer)

    def offsetRanges(self):
        """
        Get the OffsetRanges of this specific KafkaRDD.

        :return: A list of OffsetRange
        """
        helper = KafkaUtils._get_helper(self.ctx)
        joffsetRanges = helper.offsetRangesOfKafkaRDD(self._jrdd.rdd())
        ranges = [OffsetRange(o.topic(), o.partition(), o.fromOffset(), o.untilOffset())
                  for o in joffsetRanges]
        return ranges

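Reading the ranges back is straightforward; a short sketch with the same assumed `sc`, `kafkaParams`, and topic as the earlier examples:
```
from pyspark.streaming.kafka import KafkaUtils, OffsetRange

rdd = KafkaUtils.createRDD(sc, kafkaParams, [OffsetRange("test", 0, 0, 100)])
for o in rdd.offsetRanges():
    # each o is an OffsetRange as defined earlier in this file
    print("%s[%d]: %d -> %d" % (o.topic, o.partition, o.fromOffset, o.untilOffset))
```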
class KafkaDStream(DStream):
    """
    A Python wrapper of KafkaDStream.

    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, jdstream, ssc, jrdd_deserializer):
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        DStream.__init__(self, jdstream, ssc, jrdd_deserializer)

    def foreachRDD(self, func):
        """
        Apply a function to each RDD in this DStream.
        """
        if func.__code__.co_argcount == 1:
            old_func = func
            func = lambda r, rdd: old_func(rdd)
        jfunc = TransformFunction(self._sc, func, self._jrdd_deserializer) \
            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
        api = self._ssc._jvm.PythonDStream
        api.callForeachRDD(self._jdstream, jfunc)

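The `rdd_wrapper` call above is what makes offset tracking work from the callback: the incoming RDD is rebuilt as a `KafkaRDD`, so `offsetRanges()` is available inside `func`. A sketch, where `stream` is assumed to come from `createDirectStream` and `save_offsets` is a hypothetical user function:
```
def track(time, rdd):
    # rdd arrives as a KafkaRDD here, so its offset metadata is accessible
    for o in rdd.offsetRanges():
        save_offsets(time, o.topic, o.partition, o.untilOffset)

stream.foreachRDD(track)
```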
    def transform(self, func):
        """
        Return a new DStream in which each RDD is generated by applying a function
        on each RDD of this DStream.

        `func` can take one argument of `rdd`, or two arguments of (`time`, `rdd`).
        """
        if func.__code__.co_argcount == 1:
            oldfunc = func
            func = lambda t, rdd: oldfunc(rdd)
        assert func.__code__.co_argcount == 2, "func should take one or two arguments"

        return KafkaTransformedDStream(self, func)

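Both arities are accepted, as the check above shows; a brief sketch, with `stream` assumed to be a `KafkaDStream` of (key, value) pairs:
```
# one-argument form: func receives only the KafkaRDD
values = stream.transform(lambda rdd: rdd.map(lambda kv: kv[1]))

# two-argument form: func also receives the batch time
stamped = stream.transform(lambda t, rdd: rdd.map(lambda kv: (t, kv[1])))
```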
class KafkaTransformedDStream(TransformedDStream):
    """
    Kafka-specific wrapper of TransformedDStream to transform on a Kafka RDD.

    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, prev, func):
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        TransformedDStream.__init__(self, prev, func)

    @property
    def _jdstream(self):
        if self._jdstream_val is not None:
            return self._jdstream_val

        jfunc = TransformFunction(self._sc, self.func, self.prev._jrdd_deserializer) \
            .rdd_wrapper(lambda jrdd, ctx, ser: KafkaRDD(jrdd, ctx, ser))
        dstream = self._sc._jvm.PythonTransformedDStream(self.prev._jdstream.dstream(), jfunc)
        self._jdstream_val = dstream.asJavaDStream()
        return self._jdstream_val

class KafkaMessageAndMetadata(object):
    """
    Kafka message and metadata information, including topic, partition, offset and message.

    .. note:: Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0.
        See SPARK-21893.
    """

    def __init__(self, topic, partition, offset, key, message):
        """
        Python wrapper of Kafka MessageAndMetadata.

        :param topic: topic name of this Kafka message
        :param partition: partition id of this Kafka message
        :param offset: offset of this Kafka message in the specific partition
        :param key: key payload of this Kafka message; can be null if this Kafka message
            has no key specified, and the returned data is an undecoded bytearray
        :param message: actual message payload of this Kafka message; the returned data is
            an undecoded bytearray
        """
        warnings.warn(
            "Deprecated in 2.3.0. Kafka 0.8 support is deprecated as of Spark 2.3.0. "
            "See SPARK-21893.",
            DeprecationWarning)
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self._rawKey = key
        self._rawMessage = message
        self._keyDecoder = utf8_decoder
        self._valueDecoder = utf8_decoder

    def __str__(self):
        return "KafkaMessageAndMetadata(topic: %s, partition: %d, offset: %d, key and message...)" \
            % (self.topic, self.partition, self.offset)

    def __repr__(self):
        return self.__str__()

    def __reduce__(self):
        return (KafkaMessageAndMetadata,
                (self.topic, self.partition, self.offset, self._rawKey, self._rawMessage))

    def _set_key_decoder(self, decoder):
        self._keyDecoder = decoder

    def _set_value_decoder(self, decoder):
        self._valueDecoder = decoder

    @property
    def key(self):
        return self._keyDecoder(self._rawKey)

    @property
    def message(self):
        return self._valueDecoder(self._rawMessage)
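When a `messageHandler` is passed to `createDirectStream` or `createRDD`, each record reaches user code as one of these objects rather than as a (key, message) pair. A sketch under the same illustrative assumptions (`ssc`, `kafkaParams`, topic `test`) as the earlier examples:
```
def with_metadata(m):
    # m is a KafkaMessageAndMetadata; keep the offset next to the decoded payload
    return (m.topic, m.partition, m.offset, m.message)

stream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams,
                                       messageHandler=with_metadata)
```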