diff --git a/pyspark.cmd b/pyspark.cmd
new file mode 100644
index 0000000000..7c26fbbac2
--- /dev/null
+++ b/pyspark.cmd
@@ -0,0 +1,25 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This is the entry point for running PySpark. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+rem The script path is quoted so an install directory containing spaces works;
+rem the outer quotes are the pair that cmd /C strips from its command string.
+cmd /V /E /C ""%~dp0pyspark2.cmd" %*"
diff --git a/pyspark2.cmd b/pyspark2.cmd
new file mode 100644
index 0000000000..f58e349643
--- /dev/null
+++ b/pyspark2.cmd
@@ -0,0 +1,55 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+set SCALA_VERSION=2.9.3
+
+rem Figure out where the Spark framework is installed
+set FWDIR=%~dp0
+
+rem Export this as SPARK_HOME
+set SPARK_HOME=%FWDIR%
+
+rem Test whether the user has built Spark
+if exist "%FWDIR%RELEASE" goto skip_build_test
+set FOUND_JAR=0
+for %%d in ("%FWDIR%assembly\target\scala-%SCALA_VERSION%\spark-assembly*hadoop*.jar") do (
+  set FOUND_JAR=1
+)
+if "%FOUND_JAR%"=="0" (
+  echo Failed to find Spark assembly JAR.
+  echo You need to build Spark with sbt\sbt assembly before running this program.
+  goto exit
+)
+:skip_build_test
+
+rem Load environment variables from conf\spark-env.cmd, if it exists
+if exist "%FWDIR%conf\spark-env.cmd" call "%FWDIR%conf\spark-env.cmd"
+
+rem Figure out which Python to use.
+if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
+
+set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+
+set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
+set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
+
+echo Running %PYSPARK_PYTHON% with PYTHONPATH=%PYTHONPATH%
+
+"%PYSPARK_PYTHON%" %*
+:exit
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 26fbe0f080..e615c1e9b6 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -18,6 +18,7 @@
 import os
 import sys
 import signal
+import platform
 from subprocess import Popen, PIPE
 from threading import Thread
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
@@ -29,12 +30,18 @@ SPARK_HOME = os.environ["SPARK_HOME"]
 def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and SPARK_MEM settings from spark-env.sh
-    command = [os.path.join(SPARK_HOME, "spark-class"), "py4j.GatewayServer",
+    on_windows = platform.system() == "Windows"
+    script = "spark-class.cmd" if on_windows else "spark-class"
+    command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
                "--die-on-broken-pipe", "0"]
-    # Don't send ctrl-c / SIGINT to the Java gateway:
-    def preexec_function():
-        signal.signal(signal.SIGINT, signal.SIG_IGN)
-    proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_function)
+    if not on_windows:
+        # Don't send ctrl-c / SIGINT to the Java gateway:
+        def preexec_func():
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+    else:
+        # preexec_fn not supported on Windows
+        proc = Popen(command, stdout=PIPE, stdin=PIPE)
     # Determine which ephemeral port the server started on:
     port = int(proc.stdout.readline())
     # Create a thread to echo output from the GatewayServer, which is required