[SPARK-23240][PYTHON] Better error message when extraneous data in pyspark.daemon's stdout

## What changes were proposed in this pull request?

Print a more helpful message when the daemon module's stdout is empty or contains a bad port number.
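To illustrate the failure mode, here is a minimal sketch (not part of the patch) of how stray text on stdout ends up being read as a port number. The object name `StrayStdoutDemo`, the helper `firstInt`, and the sample text are hypothetical; the only real API relied on is `DataInputStream.readInt`, which reads four bytes as a big-endian integer and throws `EOFException` when the stream ends first.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException}
import java.nio.charset.StandardCharsets

// Hypothetical demo object, not part of Spark.
object StrayStdoutDemo {
  // Interpret the first four bytes the way DataInputStream.readInt does
  // (big-endian); return None if fewer than four bytes are available.
  def firstInt(bytes: Array[Byte]): Option[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    try Some(in.readInt())
    catch { case _: EOFException => None }
  }

  def main(args: Array[String]): Unit = {
    // Assumed example: a module prints text before the daemon writes its port.
    val stray = "hello from sitecustomize\n".getBytes(StandardCharsets.US_ASCII)
    // The ASCII bytes of "hell" are decoded as a single Int.
    firstInt(stray).foreach(n => println(f"$n (0x$n%08x)"))
    // An empty stdout yields no port at all.
    println(firstInt(Array.emptyByteArray))
  }
}
```

Printing the value in hex, as the new message does, makes it easier to recognise ASCII text that was misread as a port.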

## How was this patch tested?

Manually recreated the environmental issues that caused the mysterious exceptions at one site. Tested that the expected messages are logged.

Also ran all Scala unit tests.

Author: Bruce Robbins <bersprockets@gmail.com>

Closes #20424 from bersprockets/SPARK-23240_prop2.
Bruce Robbins 2018-02-20 20:26:26 +09:00 committed by hyukjinkwon
parent aadf9535b4
commit 862fa697d8

@@ -17,7 +17,7 @@
 package org.apache.spark.api.python
-import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
+import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, OutputStreamWriter}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
 import java.nio.charset.StandardCharsets
 import java.util.Arrays
@@ -182,7 +182,8 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
+        val command = Arrays.asList(pythonExec, "-m", daemonModule)
+        val pb = new ProcessBuilder(command)
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars.asJava)
         workerEnv.put("PYTHONPATH", pythonPath)
@@ -191,7 +192,29 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
         daemon = pb.start()
         val in = new DataInputStream(daemon.getInputStream)
-        daemonPort = in.readInt()
+        try {
+          daemonPort = in.readInt()
+        } catch {
+          case _: EOFException =>
+            throw new SparkException(s"No port number in $daemonModule's stdout")
+        }
+        // test that the returned port number is within a valid range.
+        // note: this does not cover the case where the port number
+        // is arbitrary data but is also coincidentally within range
+        if (daemonPort < 1 || daemonPort > 0xffff) {
+          val exceptionMessage = f"""
+            |Bad data in $daemonModule's standard output. Invalid port number:
+            | $daemonPort (0x$daemonPort%08x)
+            |Python command to execute the daemon was:
+            | ${command.asScala.mkString(" ")}
+            |Check that you don't have any unexpected modules or libraries in
+            |your PYTHONPATH:
+            | $pythonPath
+            |Also, check if you have a sitecustomize.py module in your python path,
+            |or in your python installation, that is printing to standard output"""
+          throw new SparkException(exceptionMessage.stripMargin)
+        }
         // Redirect daemon stdout and stderr
         redirectStreamsToStderr(in, daemon.getErrorStream)
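The check added in this hunk can be tried outside Spark with a small standalone sketch. It assumes nothing beyond the JDK: `DaemonPortCheck`, `readPort`, and `encode` are hypothetical names, and the sketch returns an `Either` rather than throwing `SparkException` so that it has no Spark dependency. As in the patch, a value that happens to fall within 1 to 0xffff is accepted even if it is really stray data.

```scala
import java.io.{ByteArrayInputStream, DataInputStream, EOFException, InputStream}

// Hypothetical helper, not part of Spark: mirrors the patch's two checks.
object DaemonPortCheck {
  // Read a 4-byte big-endian port from the stream and validate its range.
  // Left carries a description of the problem, Right carries the port.
  def readPort(stdout: InputStream): Either[String, Int] = {
    val in = new DataInputStream(stdout)
    try {
      val port = in.readInt()
      if (port < 1 || port > 0xffff) {
        Left(f"Invalid port number: $port (0x$port%08x)")
      } else {
        Right(port)
      }
    } catch {
      // Fewer than four bytes were available before end of stream.
      case _: EOFException => Left("No port number in stdout")
    }
  }

  // Encode an Int as the four big-endian bytes that readInt expects.
  def encode(n: Int): Array[Byte] =
    Array((n >>> 24).toByte, (n >>> 16).toByte, (n >>> 8).toByte, n.toByte)

  def main(args: Array[String]): Unit = {
    println(readPort(new ByteArrayInputStream(encode(45000))))       // valid port
    println(readPort(new ByteArrayInputStream(Array.emptyByteArray))) // empty stdout
    println(readPort(new ByteArrayInputStream(encode(70000))))       // out of range
  }
}
```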