[SPARK-22959][PYTHON] Configuration to select the modules for daemon and worker in PySpark

## What changes were proposed in this pull request?

We are currently forced to use `pyspark/daemon.py` and `pyspark/worker.py` in PySpark.

This doesn't allow custom modifications to them (well, maybe we can still do this in a super hacky way, for example, by setting a Python executable that contains the custom modification). Because of this, it's sometimes hard to debug what happens inside Python worker processes, for example.

This is actually related to [SPARK-7721](https://issues.apache.org/jira/browse/SPARK-7721) too, as coverage is somehow unable to detect the coverage of code run after `os.fork`. If we have some custom fixes to force coverage collection, it works fine.

This is also related to [SPARK-20368](https://issues.apache.org/jira/browse/SPARK-20368), which describes Sentry support that (roughly) needs some changes on the worker side.

With this configuration, advanced users will be able to plug in their own workarounds, and we can meet such potential needs in the future.

As an example, let's say I configure `coverage_daemon` as the daemon module and have `coverage_daemon.py` in the Python path:

```python
import os

from pyspark import daemon

if "COVERAGE_PROCESS_START" in os.environ:
    from pyspark.worker import main

    # Wrap the regular worker entry point so that each forked worker
    # starts, stops and saves its own coverage session.
    def _cov_wrapped(*args, **kwargs):
        import coverage
        cov = coverage.coverage(
            config_file=os.environ["COVERAGE_PROCESS_START"])
        cov.start()
        try:
            main(*args, **kwargs)
        finally:
            cov.stop()
            cov.save()
    daemon.worker_main = _cov_wrapped

if __name__ == '__main__':
    daemon.manager()
```

Then I can track coverage on the worker side too.

More importantly, we can leave the main code intact while still allowing such workarounds.
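For instance, wiring this custom daemon up is just a configuration change. A minimal sketch of the usage, assuming `coverage_daemon.py` above is importable on the executors' `PYTHONPATH` (this snippet is illustrative and not part of the patch itself):

```python
from pyspark.sql import SparkSession

# The new configuration only takes effect when 'spark.python.use.daemon'
# is enabled, i.e. on non-Windows platforms.
spark = (
    SparkSession.builder
    .config("spark.python.daemon.module", "coverage_daemon")
    .getOrCreate()
)

# Any Python RDD/UDF work now runs through the custom daemon module.
spark.range(10).rdd.map(lambda x: x).count()
```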

## How was this patch tested?

Manually tested.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #20151 from HyukjinKwon/configuration-daemon-worker.

@@ -34,10 +34,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   import PythonWorkerFactory._

-  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon
-  // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
-  // only works on UNIX-based systems now because it uses signals for child management, so we can
-  // also fall back to launching workers (pyspark/worker.py) directly.
+  // Because forking processes from Java is expensive, we prefer to launch a single Python daemon,
+  // pyspark/daemon.py (by default) and tell it to fork new workers for our tasks. This daemon
+  // currently only works on UNIX-based systems now because it uses signals for child management,
+  // so we can also fall back to launching workers, pyspark/worker.py (by default) directly.
   val useDaemon = {
     val useDaemonEnabled = SparkEnv.get.conf.getBoolean("spark.python.use.daemon", true)
@@ -45,6 +45,28 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
     !System.getProperty("os.name").startsWith("Windows") && useDaemonEnabled
   }

+  // WARN: Both configurations, 'spark.python.daemon.module' and 'spark.python.worker.module' are
+  // for very advanced users and they are experimental. This should be considered
+  // as expert-only option, and shouldn't be used before knowing what it means exactly.
+
+  // This configuration indicates the module to run the daemon to execute its Python workers.
+  val daemonModule = SparkEnv.get.conf.getOption("spark.python.daemon.module").map { value =>
+    logInfo(
+      s"Python daemon module in PySpark is set to [$value] in 'spark.python.daemon.module', " +
+        "using this to start the daemon up. Note that this configuration only has an effect when " +
+        "'spark.python.use.daemon' is enabled and the platform is not Windows.")
+    value
+  }.getOrElse("pyspark.daemon")
+
+  // This configuration indicates the module to run each Python worker.
+  val workerModule = SparkEnv.get.conf.getOption("spark.python.worker.module").map { value =>
+    logInfo(
+      s"Python worker module in PySpark is set to [$value] in 'spark.python.worker.module', " +
+        "using this to start the worker up. Note that this configuration only has an effect when " +
+        "'spark.python.use.daemon' is disabled or the platform is Windows.")
+    value
+  }.getOrElse("pyspark.worker")
+
   var daemon: Process = null
   val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
   var daemonPort: Int = 0
@@ -74,8 +96,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }

   /**
-   * Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
-   * to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
+   * Connect to a worker launched through pyspark/daemon.py (by default), which forks python
+   * processes itself to avoid the high cost of forking from Java. This currently only works
+   * on UNIX-based systems.
    */
   private def createThroughDaemon(): Socket = {
@@ -108,7 +131,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
   }

   /**
-   * Launch a worker by executing worker.py directly and telling it to connect to us.
+   * Launch a worker by executing worker.py (by default) directly and telling it to connect to us.
    */
   private def createSimpleWorker(): Socket = {
     var serverSocket: ServerSocket = null
@@ -116,7 +139,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

       // Create and start the worker
-      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)
@@ -159,7 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       try {
         // Create and start the daemon
-        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
+        val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", daemonModule))
         val workerEnv = pb.environment()
         workerEnv.putAll(envVars.asJava)
         workerEnv.put("PYTHONPATH", pythonPath)