2016-04-20 13:32:01 -04:00
|
|
|
#
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one or more
|
|
|
|
# contributor license agreements. See the NOTICE file distributed with
|
|
|
|
# this work for additional information regarding copyright ownership.
|
|
|
|
# The ASF licenses this file to You under the Apache License, Version 2.0
|
|
|
|
# (the "License"); you may not use this file except in compliance with
|
|
|
|
# the License. You may obtain a copy of the License at
|
|
|
|
#
|
|
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
#
|
|
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
# See the License for the specific language governing permissions and
|
|
|
|
# limitations under the License.
|
|
|
|
#
|
|
|
|
|
2016-06-14 05:12:29 -04:00
|
|
|
import sys

# Integer type(s) accepted by isinstance() checks across Python versions:
# Python 3 unified int/long into a single int type, while Python 2 keeps both.
# NOTE: the original test was `sys.version >= '3'`, a lexicographic comparison
# of the full interpreter version *string*; sys.version_info is the robust,
# idiomatic way to branch on the major version.
if sys.version_info[0] >= 3:
    intlike = int
else:
    intlike = (int, long)
|
2016-04-20 13:32:01 -04:00
|
|
|
from abc import ABCMeta, abstractmethod
|
|
|
|
|
|
|
|
from pyspark import since
|
2016-04-28 18:22:28 -04:00
|
|
|
from pyspark.rdd import ignore_unicode_prefix
|
2016-04-20 13:32:01 -04:00
|
|
|
|
|
|
|
# Public `from ... import *` surface of this module. Only ContinuousQuery is
# exported; the other classes defined here (ContinuousQueryManager, Trigger,
# ProcessingTime) are reachable via explicit attribute access on the module.
__all__ = ["ContinuousQuery"]
|
class ContinuousQuery(object):
    """
    A handle to a query that is executing continuously in the background as new data arrives.
    All these methods are thread-safe.

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    def __init__(self, jcq):
        # Keep a reference to the py4j handle of the JVM-side ContinuousQuery;
        # every method below simply delegates to it.
        self._jcq = jcq

    @property
    @since(2.0)
    def id(self):
        """The id of the continuous query. This id is unique across all queries that have been
        started in the current process.
        """
        return self._jcq.id()

    @property
    @since(2.0)
    def name(self):
        """The name of the continuous query. This name is unique across all active queries.
        """
        return self._jcq.name()

    @property
    @since(2.0)
    def isActive(self):
        """Whether this continuous query is currently active or not.
        """
        return self._jcq.isActive()

    @since(2.0)
    def awaitTermination(self, timeout=None):
        """Waits for the termination of `this` query, either by :func:`query.stop()` or by an
        exception. If the query has terminated with an exception, then the exception will be thrown.
        If `timeout` is set, it returns whether the query has terminated or not within the
        `timeout` seconds.

        If the query has terminated, then all subsequent calls to this method will either return
        immediately (if the query was terminated by :func:`stop()`), or throw the exception
        immediately (if the query has terminated with exception).

        throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
        """
        # Guard clause: no timeout means block indefinitely on the JVM side.
        if timeout is None:
            return self._jcq.awaitTermination()
        if not isinstance(timeout, (int, float)) or timeout < 0:
            raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
        # The JVM API takes milliseconds; the Python API takes seconds.
        return self._jcq.awaitTermination(int(timeout * 1000))

    @since(2.0)
    def processAllAvailable(self):
        """Blocks until all available data in the source has been processed and committed to the
        sink. This method is intended for testing. Note that in the case of continually arriving
        data, this method may block forever. Additionally, this method is only guaranteed to block
        until data that has been synchronously appended data to a stream source prior to invocation.
        (i.e. `getOffset` must immediately reflect the addition).
        """
        return self._jcq.processAllAvailable()

    @since(2.0)
    def stop(self):
        """Stop this continuous query.
        """
        self._jcq.stop()
|
2016-04-28 18:22:28 -04:00
|
|
|
class ContinuousQueryManager(object):
    """A class to manage all the :class:`ContinuousQuery` ContinuousQueries active
    on a :class:`SQLContext`.

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    def __init__(self, jcqm):
        # py4j handle of the JVM-side ContinuousQueryManager; all methods delegate to it.
        self._jcqm = jcqm

    @property
    @ignore_unicode_prefix
    @since(2.0)
    def active(self):
        """Returns a list of active queries associated with this SQLContext

        >>> cq = df.write.format('memory').queryName('this_query').startStream()
        >>> cqm = spark.streams
        >>> # get the list of active continuous queries
        >>> [q.name for q in cqm.active]
        [u'this_query']
        >>> cq.stop()
        """
        # Wrap each JVM query handle in the Python-side ContinuousQuery facade.
        return [ContinuousQuery(jcq) for jcq in self._jcqm.active()]

    @ignore_unicode_prefix
    @since(2.0)
    def get(self, id):
        """Returns an active query from this SQLContext or throws exception if an active query
        with this name doesn't exist.

        >>> cq = df.write.format('memory').queryName('this_query').startStream()
        >>> cq.name
        u'this_query'
        >>> cq = spark.streams.get(cq.id)
        >>> cq.isActive
        True
        >>> cq = sqlContext.streams.get(cq.id)
        >>> cq.isActive
        True
        >>> cq.stop()
        """
        if not isinstance(id, intlike):
            # BUGFIX: must use %s here. The original "%d" applied to a value
            # already known to be a non-integer raised TypeError from the
            # format operation itself, masking the intended ValueError.
            raise ValueError("The id for the query must be an integer. Got: %s" % id)
        return ContinuousQuery(self._jcqm.get(id))

    @since(2.0)
    def awaitAnyTermination(self, timeout=None):
        """Wait until any of the queries on the associated SQLContext has terminated since the
        creation of the context, or since :func:`resetTerminated()` was called. If any query was
        terminated with an exception, then the exception will be thrown.
        If `timeout` is set, it returns whether the query has terminated or not within the
        `timeout` seconds.

        If a query has terminated, then subsequent calls to :func:`awaitAnyTermination()` will
        either return immediately (if the query was terminated by :func:`query.stop()`),
        or throw the exception immediately (if the query was terminated with exception). Use
        :func:`resetTerminated()` to clear past terminations and wait for new terminations.

        In the case where multiple queries have terminated since :func:`resetTermination()`
        was called, if any query has terminated with exception, then :func:`awaitAnyTermination()`
        will throw any of the exception. For correctly documenting exceptions across multiple
        queries, users need to stop all of them after any of them terminates with exception, and
        then check the `query.exception()` for each query.

        throws :class:`ContinuousQueryException`, if `this` query has terminated with an exception
        """
        if timeout is not None:
            if not isinstance(timeout, (int, float)) or timeout < 0:
                raise ValueError("timeout must be a positive integer or float. Got %s" % timeout)
            # The JVM API takes milliseconds; the Python API takes seconds.
            return self._jcqm.awaitAnyTermination(int(timeout * 1000))
        else:
            return self._jcqm.awaitAnyTermination()

    @since(2.0)
    def resetTerminated(self):
        """Forget about past terminated queries so that :func:`awaitAnyTermination()` can be used
        again to wait for new terminations.

        >>> spark.streams.resetTerminated()
        """
        self._jcqm.resetTerminated()
|
2016-04-20 13:32:01 -04:00
|
|
|
class Trigger(object):
    """Used to indicate how often results should be produced by a :class:`ContinuousQuery`.

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    __metaclass__ = ABCMeta  # Python 2 style ABC declaration; a plain attribute on Python 3.

    @abstractmethod
    def _to_java_trigger(self, sqlContext):
        """Internal method to construct the trigger on the jvm."""
        pass
|
|
|
class ProcessingTime(Trigger):
    """A trigger that runs a query periodically based on the processing time. If `interval` is 0,
    the query will run as fast as possible.

    The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...

    .. note:: Experimental

    .. versionadded:: 2.0
    """

    def __init__(self, interval):
        """
        :param interval: processing-time interval string, e.g. '2 seconds'; must be non-empty.
        :raises ValueError: if `interval` is not a non-empty string.
        """
        # isinstance (rather than the original `type(interval) != str`) also
        # accepts subclasses of str, per the standard type-checking idiom.
        if not isinstance(interval, str) or len(interval.strip()) == 0:
            raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
        self.interval = interval

    def _to_java_trigger(self, sqlContext):
        """Internal method to construct the trigger on the jvm.
        """
        return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
            self.interval)
|
|
def _test():
    """Run this module's doctests against a throwaway SparkSession.

    Requires the SPARK_HOME environment variable (doctest fixtures use paths
    relative to it). Exits the process with -1 on any doctest failure.
    """
    import doctest
    import os
    import tempfile
    from pyspark.sql import SparkSession, SQLContext
    import pyspark.sql.streaming

    os.chdir(os.environ["SPARK_HOME"])

    globs = pyspark.sql.streaming.__dict__.copy()
    try:
        spark = SparkSession.builder.enableHiveSupport().getOrCreate()
    except Exception:
        # Hive support may be unavailable in this build; fall back to a plain
        # session. BUGFIX: the original fallback was
        # `except py4j.protocol.Py4JError: spark = SparkSession(sc)`, but
        # neither `py4j` (never imported) nor `sc` (never defined) exists in
        # this module, so that path could only raise NameError. `py4j` is a
        # third-party package, so we catch Exception at this harness boundary.
        spark = SparkSession.builder.getOrCreate()

    # Objects the doctests reference by name.
    globs['tempfile'] = tempfile
    globs['os'] = os
    globs['spark'] = spark
    globs['sqlContext'] = SQLContext.getOrCreate(spark.sparkContext)
    globs['df'] = \
        globs['spark'].read.format('text').stream('python/test_support/sql/streaming')

    (failure_count, test_count) = doctest.testmod(
        pyspark.sql.streaming, globs=globs,
        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF)
    globs['spark'].stop()
    if failure_count:
        exit(-1)
|
|
# Script entry point: run the module's doctest suite when executed directly.
if __name__ == "__main__":
    _test()