[SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correctly into PythonRunner in submit with client mode in spark-submit

## What changes were proposed in this pull request?

In client side before context initialization specifically,  .py file doesn't work in client side before context initialization when the application is a Python file. See below:

```
$ cat /home/spark/tmp.py
def testtest():
    return 1
```

This works:

```
$ cat app.py
import pyspark
pyspark.sql.SparkSession.builder.getOrCreate()
import tmp
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
...
************************1
```

but this doesn't:

```
$ cat app.py
import pyspark
import tmp
pyspark.sql.SparkSession.builder.getOrCreate()
print("************************%s" % tmp.testtest())

$ ./bin/spark-submit --master yarn --deploy-mode client --py-files /home/spark/tmp.py app.py
Traceback (most recent call last):
  File "/home/spark/spark/app.py", line 2, in <module>
    import tmp
ImportError: No module named tmp
```

### How did it happen?

In client mode specifically, the paths are being added into PythonRunner as are:

628c7b5179/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala (L430)

628c7b5179/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala (L49-L88)

The problem here is, .py file shouldn't be added as are since `PYTHONPATH` expects a directory or an archive like zip or egg.

### How does this PR fix?

We shouldn't simply just add its parent directory because other files in the parent directory could also be added into the `PYTHONPATH` in client mode before context initialization.

Therefore, we copy .py files into a temp directory for .py files and add it to `PYTHONPATH`.

## How was this patch tested?

Unit tests are added and manually tested in both standalond and yarn client modes with submit.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21426 from HyukjinKwon/SPARK-24384.
This commit is contained in:
hyukjinkwon 2018-05-30 10:33:34 -07:00 committed by Marcelo Vanzin
parent 1e46f92f95
commit b142157dcc
2 changed files with 33 additions and 11 deletions

View file

@ -19,6 +19,7 @@ package org.apache.spark.deploy
import java.io.File import java.io.File
import java.net.{InetAddress, URI} import java.net.{InetAddress, URI}
import java.nio.file.Files
import scala.collection.JavaConverters._ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.ArrayBuffer
@ -48,7 +49,7 @@ object PythonRunner {
// Format python file paths before adding them to the PYTHONPATH // Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile) val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = formatPaths(pyFiles) val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))
// Launch a Py4J gateway server for the process to connect to; this will let it see our // Launch a Py4J gateway server for the process to connect to; this will let it see our
// Java system properties and such // Java system properties and such
@ -153,4 +154,30 @@ object PythonRunner {
.map { p => formatPath(p, testWindows) } .map { p => formatPath(p, testWindows) }
} }
/**
* Resolves the ".py" files. ".py" file should not be added as is because PYTHONPATH does
* not expect a file. This method creates a temporary directory and puts the ".py" files
* if exist in the given paths.
*/
private def resolvePyFiles(pyFiles: Array[String]): Array[String] = {
lazy val dest = Utils.createTempDir(namePrefix = "localPyFiles")
pyFiles.flatMap { pyFile =>
// In case of client with submit, the python paths should be set before context
// initialization because the context initialization can be done later.
// We will copy the local ".py" files because ".py" file shouldn't be added
// alone but its parent directory in PYTHONPATH. See SPARK-24384.
if (pyFile.endsWith(".py")) {
val source = new File(pyFile)
if (source.exists() && source.isFile && source.canRead) {
Files.copy(source.toPath, new File(dest, source.getName).toPath)
Some(dest.getAbsolutePath)
} else {
// Don't have to add it if it doesn't exist or isn't readable.
None
}
} else {
Some(pyFile)
}
}.distinct
}
} }

View file

@ -271,12 +271,7 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
"PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator), "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
"PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv
val moduleDir = val moduleDir = {
if (clientMode) {
// In client-mode, .py files added with --py-files are not visible in the driver.
// This is something that the launcher library would have to handle.
tempDir
} else {
val subdir = new File(tempDir, "pyModules") val subdir = new File(tempDir, "pyModules")
subdir.mkdir() subdir.mkdir()
subdir subdir