[SPARK-36774][CORE][TESTS] Move SparkSubmitTestUtils to core module and use it in SparkSubmitSuite

### What changes were proposed in this pull request?

This PR refactors test code in order to improve the debugability of `SparkSubmitSuite`.

The `sql/hive` module contains a `SparkSubmitTestUtils` helper class which launches `spark-submit` and captures its output in order to display better error messages when tests fail. This helper is currently used by `HiveSparkSubmitSuite` and `HiveExternalCatalogVersionsSuite`, but isn't used by `SparkSubmitSuite`.

In this PR, I moved `SparkSubmitTestUtils` and `ProcessTestUtils` into the `core` module and updated `SparkSubmitSuite`, `BufferHolderSparkSubmitSuite`, and `WholestageCodegenSparkSubmitSuite` to use the relocated helper classes. This required me to change `SparkSubmitTestUtils` to make its timeouts configurable and to generalize its method for locating the `spark-submit` binary.

### Why are the changes needed?

Previously, `SparkSubmitSuite` tests would fail with messages like:

```
[info] - launch simple application with spark-submit *** FAILED *** (1 second, 832 milliseconds)
[info]   Process returned with exit code 101. See the log4j logs for more detail. (SparkSubmitSuite.scala:1551)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
```

which require the Spark developer to hunt in log4j logs in order to view the logs from the failed `spark-submit` command.

After this change, those tests will fail with detailed error messages that include the text of failed command plus timestamped logs captured from the failed proces:

```
[info] - launch simple application with spark-submit *** FAILED *** (2 seconds, 800 milliseconds)
[info]   spark-submit returned with exit code 101.
[info]   Command line: '/Users/joshrosen/oss-spark/bin/spark-submit' '--class' 'invalidClassName' '--name' 'testApp' '--master' 'local' '--conf' 'spark.ui.enabled=false' '--conf' 'spark.master.rest.enabled=false' 'file:/Users/joshrosen/oss-spark/target/tmp/spark-0a8a0c93-3aaf-435d-9cf3-b97abd318d91/testJar-1631768004882.jar'
[info]
[info]   2021-09-15 21:53:26.041 - stderr> SLF4J: Class path contains multiple SLF4J bindings.
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Found binding in [jar:file:/Users/joshrosen/oss-spark/assembly/target/scala-2.12/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Found binding in [jar:file:/Users/joshrosen/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[info]   2021-09-15 21:53:26.619 - stderr> Error: Failed to load class invalidClassName. (SparkSubmitTestUtils.scala:97)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I manually ran the affected test suites.

Closes #34013 from JoshRosen/SPARK-36774-move-SparkSubmitTestUtils-to-core.

Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
This commit is contained in:
Josh Rosen 2021-09-16 14:28:47 -07:00
parent f1f2ec3704
commit 3ae6e6775b
9 changed files with 32 additions and 60 deletions

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.test
package org.apache.spark
import java.io.{InputStream, IOException}

View file

@ -30,11 +30,8 @@ import org.apache.commons.io.FileUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.matchers.must.Matchers
import org.scalatest.matchers.should.Matchers._
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.TestUtils
@ -107,18 +104,12 @@ trait TestPrematureExit {
// Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
// of properties that needed to be cleared after tests.
class SparkSubmitSuite
extends SparkFunSuite
extends SparkSubmitTestUtils
with Matchers
with BeforeAndAfterEach
with ResetSystemProperties
with TimeLimits
with TestPrematureExit {
import SparkSubmitSuite._
// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler
private val emptyIvySettings = File.createTempFile("ivy", ".xml")
FileUtils.write(emptyIvySettings, "<ivysettings />", StandardCharsets.UTF_8)
@ -1527,36 +1518,6 @@ class SparkSubmitSuite
}
}
object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
def runSparkSubmit(args: Seq[String], root: String = "..", timeout: Span = 1.minute): Unit = {
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val sparkSubmitFile = if (Utils.isWindows) {
new File(s"$root\\bin\\spark-submit.cmd")
} else {
new File(s"$root/bin/spark-submit")
}
val process = Utils.executeCommand(
Seq(sparkSubmitFile.getCanonicalPath) ++ args,
new File(sparkHome),
Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
try {
val exitCode = failAfter(timeout) { process.waitFor() }
if (exitCode != 0) {
fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
}
} finally {
// Ensure we still kill the process in case it timed out
process.destroy()
}
}
}
object JarCreationTest extends Logging {
def main(args: Array[String]): Unit = {
TestUtils.configTestLog4j("INFO")

View file

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.spark.sql.hive
package org.apache.spark.deploy
import java.io.File
import java.sql.Timestamp
@ -25,33 +25,35 @@ import scala.collection.mutable.ArrayBuffer
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.Utils
trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
protected val defaultSparkSubmitTimeout: Span = 1.minute
// Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
implicit val defaultSignaler: Signaler = ThreadSignaler
// NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
// This is copied from org.apache.spark.deploy.SparkSubmitSuite
protected def runSparkSubmit(
args: Seq[String],
sparkHomeOpt: Option[String] = None,
timeout: Span = defaultSparkSubmitTimeout,
isSparkTesting: Boolean = true): Unit = {
val sparkHome = sparkHomeOpt.getOrElse(
sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
val history = ArrayBuffer.empty[String]
val sparkSubmit = if (Utils.isWindows) {
// On Windows, `ProcessBuilder.directory` does not change the current working directory.
new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
new File(new File(sparkHome, "bin"), "spark-submit.cmd")
} else {
"./bin/spark-submit"
new File(new File(sparkHome, "bin"), "spark-submit")
}
val commands = Seq(sparkSubmit) ++ args
val commands = Seq(sparkSubmit.getCanonicalPath) ++ args
val commandLine = commands.mkString("'", "' '", "'")
val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))
@ -85,7 +87,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start()
try {
val exitCode = failAfter(300.seconds) { process.waitFor() }
val exitCode = failAfter(timeout) { process.waitFor() }
if (exitCode != 0) {
// include logs in output. Note that logging is async and may not have completed
// at the time this exception is raised

View file

@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen
import org.scalatest.{Assertions, BeforeAndAfterEach}
import org.scalatest.matchers.must.Matchers
import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.deploy.SparkSubmitSuite
import org.apache.spark.TestUtils
import org.apache.spark.deploy.SparkSubmitTestUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.util.ResetSystemProperties
@ -29,7 +29,7 @@ import org.apache.spark.util.ResetSystemProperties
// A test for growing the buffer holder to nearly 2GB. Due to the heap size limitation of the Spark
// unit tests JVM, the actually test code is running as a submit job.
class BufferHolderSparkSubmitSuite
extends SparkFunSuite
extends SparkSubmitTestUtils
with Matchers
with BeforeAndAfterEach
with ResetSystemProperties {
@ -46,7 +46,7 @@ class BufferHolderSparkSubmitSuite
"--conf", "spark.master.rest.enabled=false",
"--conf", "spark.driver.extraJavaOptions=-ea",
unusedJar.toString)
SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
runSparkSubmit(argsForSparkSubmit)
}
}

View file

@ -21,8 +21,8 @@ import org.scalatest.{Assertions, BeforeAndAfterEach}
import org.scalatest.matchers.must.Matchers
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkFunSuite, TestUtils}
import org.apache.spark.deploy.SparkSubmitSuite
import org.apache.spark.TestUtils
import org.apache.spark.deploy.SparkSubmitTestUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
import org.apache.spark.sql.functions.{array, col, count, lit}
@ -31,7 +31,7 @@ import org.apache.spark.unsafe.Platform
import org.apache.spark.util.ResetSystemProperties
// Due to the need to set driver's extraJavaOptions, this test needs to use actual SparkSubmit.
class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
with Matchers
with BeforeAndAfterEach
with ResetSystemProperties {
@ -51,7 +51,7 @@ class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
"--conf", "spark.executor.extraJavaOptions=-XX:+UseCompressedOops",
"--conf", "spark.sql.adaptive.enabled=false",
unusedJar.toString)
SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..", 3.minutes)
runSparkSubmit(argsForSparkSubmit, timeout = 3.minutes)
}
}

View file

@ -29,12 +29,12 @@ import scala.concurrent.duration._
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.SparkFunSuite
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.HiveUtils._
import org.apache.spark.sql.hive.test.HiveTestJars
import org.apache.spark.sql.internal.StaticSQLConf
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ThreadUtils, Utils}
/**

View file

@ -44,12 +44,12 @@ import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually._
import org.apache.spark.{SparkException, SparkFunSuite}
import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.internal.Logging
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.sql.hive.test.HiveTestJars
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
object TestData {

View file

@ -26,8 +26,11 @@ import scala.util.control.NonFatal
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.apache.hadoop.conf.Configuration
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark.{SparkConf, TestUtils}
import org.apache.spark.deploy.SparkSubmitTestUtils
import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@ -52,6 +55,7 @@ import org.apache.spark.util.Utils
@ExtendedHiveTest
class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
import HiveExternalCatalogVersionsSuite._
override protected val defaultSparkSubmitTimeout: Span = 5.minutes
private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
// For local test, you can set `spark.test.cache-dir` to a static value like `/tmp/test-spark`, to
@ -216,7 +220,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
"--conf", s"spark.sql.test.version.index=$index",
"--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
tempPyFile.getCanonicalPath)
runSparkSubmit(args, Some(sparkHome.getCanonicalPath), false)
runSparkSubmit(args, Some(sparkHome.getCanonicalPath), isSparkTesting = false)
}
tempPyFile.delete()

View file

@ -27,8 +27,11 @@ import org.apache.hadoop.hive.common.FileUtils
import org.scalatest.Assertions._
import org.scalatest.BeforeAndAfterEach
import org.scalatest.matchers.must.Matchers
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._
import org.apache.spark._
import org.apache.spark.deploy.SparkSubmitTestUtils
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@ -54,6 +57,8 @@ class HiveSparkSubmitSuite
with BeforeAndAfterEach
with ResetSystemProperties {
override protected val defaultSparkSubmitTimeout: Span = 5.minutes
override protected val enableAutoThreadAudit = false
override def beforeEach(): Unit = {