spark-instrumented-optimizer/python/pyspark/java_gateway.py
Josh Rosen 742c44eae6 Don't send SIGINT to Py4J gateway subprocess.
This addresses SPARK-885, a usability issue where PySpark's
Java gateway process would be killed if the user hit ctrl-c.

Note that SIGINT still won't cancel the running s

This fix is based on http://stackoverflow.com/questions/5045771
2013-08-28 16:39:44 -07:00

60 lines
2.3 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import sys
import signal
from subprocess import Popen, PIPE
from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
# Directory that holds Spark's `run` launcher script (launch_gateway joins
# "run" onto this path). Read once at import time: importing this module
# raises KeyError when the SPARK_HOME environment variable is not set.
SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
    """Start a Py4J GatewayServer JVM and return a JavaGateway connected to it.

    The server is started through Spark's ``run`` script and asked to listen
    on port ``0``; the port it actually bound to is read from the first line
    of its stdout. Everything the server writes to stdout afterwards is
    copied to this process's stderr by a daemon thread.

    Returns:
        A ``JavaGateway`` (created with ``auto_convert=False``) whose JVM view
        has ``spark.api.java.*``, ``spark.api.python.*`` and ``scala.Tuple2``
        imported.

    Raises:
        ValueError: if the first line of the server's output is not a port
            number, e.g. because the process exited during startup.
    """
    # Launch the Py4j gateway using Spark's run command so that we pick up the
    # proper classpath and SPARK_MEM settings from spark-env.sh
    command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
               "--die-on-broken-pipe", "0"]

    # Don't send ctrl-c / SIGINT to the Java gateway: this hook runs in the
    # child process before the command is exec'd and makes it ignore SIGINT.
    def preexec_function():
        signal.signal(signal.SIGINT, signal.SIG_IGN)

    proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_function)

    # Determine which ephemeral port the server started on. If the process
    # died early (readline() returns an empty string at EOF) or printed
    # something else first, fail with a message that says so instead of a
    # bare "invalid literal for int()".
    port_line = proc.stdout.readline()
    try:
        port = int(port_line)
    except ValueError:
        raise ValueError(
            "Java gateway process did not report a port number on startup "
            "(first line of output was %r)" % (port_line,))

    # Create a thread to echo output from the GatewayServer, which is required
    # for Java log output to show up:
    class EchoOutputThread(Thread):
        """Daemon thread that copies every line of `stream` to sys.stderr."""

        def __init__(self, stream):
            Thread.__init__(self)
            # Daemon, so a still-running echo loop never blocks interpreter exit.
            self.daemon = True
            self.stream = stream

        def run(self):
            while True:
                line = self.stream.readline()
                if not line:
                    # Empty read means EOF (the gateway closed its stdout).
                    # Stop here; looping on would spin forever because
                    # readline() keeps returning immediately at EOF.
                    break
                sys.stderr.write(line)

    EchoOutputThread(proc.stdout).start()

    # Connect to the gateway
    gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)

    # Import the classes used by PySpark
    java_import(gateway.jvm, "spark.api.java.*")
    java_import(gateway.jvm, "spark.api.python.*")
    java_import(gateway.jvm, "scala.Tuple2")
    return gateway