From 3ae6e6775beae8225f8cb7404bd1a2ea961dd339 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Thu, 16 Sep 2021 14:28:47 -0700
Subject: [PATCH] [SPARK-36774][CORE][TESTS] Move SparkSubmitTestUtils to core
 module and use it in SparkSubmitSuite

### What changes were proposed in this pull request?

This PR refactors test code in order to improve the debuggability of `SparkSubmitSuite`.

The `sql/hive` module contains a `SparkSubmitTestUtils` helper class which launches `spark-submit` and captures its output in order to display better error messages when tests fail. This helper is currently used by `HiveSparkSubmitSuite` and `HiveExternalCatalogVersionsSuite`, but isn't used by `SparkSubmitSuite`.

In this PR, I moved `SparkSubmitTestUtils` and `ProcessTestUtils` into the `core` module and updated `SparkSubmitSuite`, `BufferHolderSparkSubmitSuite`, and `WholeStageCodegenSparkSubmitSuite` to use the relocated helper classes. This required me to change `SparkSubmitTestUtils` to make its timeouts configurable and to generalize its method for locating the `spark-submit` binary.

### Why are the changes needed?

Previously, `SparkSubmitSuite` tests would fail with messages like:

```
[info] - launch simple application with spark-submit *** FAILED *** (1 second, 832 milliseconds)
[info]   Process returned with exit code 101. See the log4j logs for more detail. (SparkSubmitSuite.scala:1551)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
```

which require the Spark developer to hunt through log4j logs in order to view the output of the failed `spark-submit` command.

After this change, those tests will fail with detailed error messages that include the text of the failed command plus timestamped logs captured from the failed process:

```
[info] - launch simple application with spark-submit *** FAILED *** (2 seconds, 800 milliseconds)
[info]   spark-submit returned with exit code 101.
[info]   Command line: '/Users/joshrosen/oss-spark/bin/spark-submit' '--class' 'invalidClassName' '--name' 'testApp' '--master' 'local' '--conf' 'spark.ui.enabled=false' '--conf' 'spark.master.rest.enabled=false' 'file:/Users/joshrosen/oss-spark/target/tmp/spark-0a8a0c93-3aaf-435d-9cf3-b97abd318d91/testJar-1631768004882.jar'
[info]
[info]   2021-09-15 21:53:26.041 - stderr> SLF4J: Class path contains multiple SLF4J bindings.
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Found binding in [jar:file:/Users/joshrosen/oss-spark/assembly/target/scala-2.12/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Found binding in [jar:file:/Users/joshrosen/.m2/repository/org/slf4j/slf4j-log4j12/1.7.30/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
[info]   2021-09-15 21:53:26.042 - stderr> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
[info]   2021-09-15 21:53:26.619 - stderr> Error: Failed to load class invalidClassName. (SparkSubmitTestUtils.scala:97)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

I manually ran the affected test suites.
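For reviewers, here is a minimal sketch (not part of the diff) of how a suite consumes the relocated helper. The suite name, main class, and jar path are hypothetical; `SparkSubmitTestUtils`, `defaultSparkSubmitTimeout`, and `runSparkSubmit` match the API introduced below:

```
import org.scalatest.time.Span
import org.scalatest.time.SpanSugar._

import org.apache.spark.deploy.SparkSubmitTestUtils

// Hypothetical suite illustrating the relocated helper's API.
class ExampleSparkSubmitSuite extends SparkSubmitTestUtils {
  // The default timeout (1 minute) is now an overridable member instead of a
  // hardcoded constant, so slow suites can raise it once for all tests:
  override protected val defaultSparkSubmitTimeout: Span = 3.minutes

  test("launch simple application with spark-submit") {
    // spark-submit is now located under spark.test.home, so callers no longer
    // pass a module-relative root like "../..".
    runSparkSubmit(Seq(
      "--class", "org.example.SimpleApp", // hypothetical main class
      "--master", "local",
      "/tmp/example-app.jar"))            // hypothetical application jar
  }
}
```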
Closes #34013 from JoshRosen/SPARK-36774-move-SparkSubmitTestUtils-to-core.

Authored-by: Josh Rosen
Signed-off-by: Josh Rosen
---
 .../org/apache/spark}/ProcessTestUtils.scala  |  2 +-
 .../spark/deploy/SparkSubmitSuite.scala       | 41 +------------------
 .../spark/deploy}/SparkSubmitTestUtils.scala  | 18 ++++----
 .../BufferHolderSparkSubmitSuite.scala        |  8 ++--
 .../WholeStageCodegenSparkSubmitSuite.scala   |  8 ++--
 .../sql/hive/thriftserver/CliSuite.scala      |  2 +-
 .../HiveThriftServer2Suites.scala             |  2 +-
 .../HiveExternalCatalogVersionsSuite.scala    |  6 ++-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala |  5 +++
 9 files changed, 32 insertions(+), 60 deletions(-)
 rename {sql/core/src/test/scala/org/apache/spark/sql/test => core/src/test/scala/org/apache/spark}/ProcessTestUtils.scala (97%)
 rename {sql/hive/src/test/scala/org/apache/spark/sql/hive => core/src/test/scala/org/apache/spark/deploy}/SparkSubmitTestUtils.scala (89%)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala b/core/src/test/scala/org/apache/spark/ProcessTestUtils.scala
similarity index 97%
rename from sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
rename to core/src/test/scala/org/apache/spark/ProcessTestUtils.scala
index df530d8587..e85f5cd31c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/ProcessTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/ProcessTestUtils.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.test
+package org.apache.spark
 
 import java.io.{InputStream, IOException}
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index eade41a0b5..c4e3d6aefc 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -30,11 +30,8 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FSDataInputStream, Path}
 import org.scalatest.BeforeAndAfterEach
-import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
-import org.scalatest.time.Span
-import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
 import org.apache.spark.TestUtils
@@ -107,18 +104,12 @@ trait TestPrematureExit {
 // Note: this suite mixes in ResetSystemProperties because SparkSubmit.main() sets a bunch
 // of properties that needed to be cleared after tests.
 class SparkSubmitSuite
-  extends SparkFunSuite
+  extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
   with ResetSystemProperties
-  with TimeLimits
   with TestPrematureExit {
 
-  import SparkSubmitSuite._
-
-  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
-  implicit val defaultSignaler: Signaler = ThreadSignaler
-
   private val emptyIvySettings = File.createTempFile("ivy", ".xml")
   FileUtils.write(emptyIvySettings, "", StandardCharsets.UTF_8)
 
@@ -1527,36 +1518,6 @@ class SparkSubmitSuite
   }
 }
 
-object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
-
-  // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
-  implicit val defaultSignaler: Signaler = ThreadSignaler
-
-  // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  def runSparkSubmit(args: Seq[String], root: String = "..", timeout: Span = 1.minute): Unit = {
-    val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
-    val sparkSubmitFile = if (Utils.isWindows) {
-      new File(s"$root\\bin\\spark-submit.cmd")
-    } else {
-      new File(s"$root/bin/spark-submit")
-    }
-    val process = Utils.executeCommand(
-      Seq(sparkSubmitFile.getCanonicalPath) ++ args,
-      new File(sparkHome),
-      Map("SPARK_TESTING" -> "1", "SPARK_HOME" -> sparkHome))
-
-    try {
-      val exitCode = failAfter(timeout) { process.waitFor() }
-      if (exitCode != 0) {
-        fail(s"Process returned with exit code $exitCode. See the log4j logs for more detail.")
-      }
-    } finally {
-      // Ensure we still kill the process in case it timed out
-      process.destroy()
-    }
-  }
-}
-
 object JarCreationTest extends Logging {
   def main(args: Array[String]): Unit = {
     TestUtils.configTestLog4j("INFO")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
similarity index 89%
rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
rename to core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
index 889f81b056..2ab2e17df0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/SparkSubmitTestUtils.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitTestUtils.scala
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.hive
+package org.apache.spark.deploy
 
 import java.io.File
 import java.sql.Timestamp
@@ -25,33 +25,35 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
+import org.scalatest.time.Span
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.Utils
 
 trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits {
 
+  protected val defaultSparkSubmitTimeout: Span = 1.minute
+
   // Necessary to make ScalaTest 3.x interrupt a thread on the JVM like ScalaTest 2.2.x
   implicit val defaultSignaler: Signaler = ThreadSignaler
 
   // NOTE: This is an expensive operation in terms of time (10 seconds+). Use sparingly.
-  // This is copied from org.apache.spark.deploy.SparkSubmitSuite
   protected def runSparkSubmit(
       args: Seq[String],
       sparkHomeOpt: Option[String] = None,
+      timeout: Span = defaultSparkSubmitTimeout,
       isSparkTesting: Boolean = true): Unit = {
     val sparkHome = sparkHomeOpt.getOrElse(
       sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!")))
     val history = ArrayBuffer.empty[String]
     val sparkSubmit = if (Utils.isWindows) {
-      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
- new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath + new File(new File(sparkHome, "bin"), "spark-submit.cmd") } else { - "./bin/spark-submit" + new File(new File(sparkHome, "bin"), "spark-submit") } - val commands = Seq(sparkSubmit) ++ args + val commands = Seq(sparkSubmit.getCanonicalPath) ++ args val commandLine = commands.mkString("'", "' '", "'") val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome)) @@ -85,7 +87,7 @@ trait SparkSubmitTestUtils extends SparkFunSuite with TimeLimits { new ProcessOutputCapturer(process.getErrorStream, captureOutput("stderr")).start() try { - val exitCode = failAfter(300.seconds) { process.waitFor() } + val exitCode = failAfter(timeout) { process.waitFor() } if (exitCode != 0) { // include logs in output. Note that logging is async and may not have completed // at the time this exception is raised diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala index 972a832255..fd28a1decd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSparkSubmitSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen import org.scalatest.{Assertions, BeforeAndAfterEach} import org.scalatest.matchers.must.Matchers -import org.apache.spark.{SparkFunSuite, TestUtils} -import org.apache.spark.deploy.SparkSubmitSuite +import org.apache.spark.TestUtils +import org.apache.spark.deploy.SparkSubmitTestUtils import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.util.ResetSystemProperties @@ -29,7 +29,7 @@ import org.apache.spark.util.ResetSystemProperties // A test for growing the buffer holder to nearly 2GB. Due to the heap size limitation of the Spark // unit tests JVM, the actually test code is running as a submit job. 
 class BufferHolderSparkSubmitSuite
-  extends SparkFunSuite
+  extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
   with ResetSystemProperties {
@@ -46,7 +46,7 @@ class BufferHolderSparkSubmitSuite
       "--conf", "spark.master.rest.enabled=false",
       "--conf", "spark.driver.extraJavaOptions=-ea",
       unusedJar.toString)
-    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..")
+    runSparkSubmit(argsForSparkSubmit)
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index ee5e0e09fc..2f626f7769 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -21,8 +21,8 @@ import org.scalatest.{Assertions, BeforeAndAfterEach}
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.{SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.SparkSubmitSuite
+import org.apache.spark.TestUtils
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.functions.{array, col, count, lit}
@@ -31,7 +31,7 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.ResetSystemProperties
 
 // Due to the need to set driver's extraJavaOptions, this test needs to use actual SparkSubmit.
-class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
+class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
   with Matchers
   with BeforeAndAfterEach
   with ResetSystemProperties {
@@ -51,7 +51,7 @@ class WholeStageCodegenSparkSubmitSuite extends SparkFunSuite
       "--conf", "spark.executor.extraJavaOptions=-XX:+UseCompressedOops",
       "--conf", "spark.sql.adaptive.enabled=false",
       unusedJar.toString)
-    SparkSubmitSuite.runSparkSubmit(argsForSparkSubmit, "../..", 3.minutes)
+    runSparkSubmit(argsForSparkSubmit, timeout = 3.minutes)
   }
 }
 
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 7067b65a62..2ef2700163 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -29,12 +29,12 @@ import scala.concurrent.duration._
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.HiveUtils._
 import org.apache.spark.sql.hive.test.HiveTestJars
 import org.apache.spark.sql.internal.StaticSQLConf
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
index 979818979e..8e939a5471 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2Suites.scala
@@ -44,12 +44,12 @@ import org.scalatest.BeforeAndAfterAll
 import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.HiveTestJars
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION
-import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ShutdownHookManager, ThreadUtils, Utils}
 
 object TestData {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
index 250c46063d..7eba0d3ba9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala
@@ -26,8 +26,11 @@ import scala.util.control.NonFatal
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.conf.Configuration
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.{SparkConf, TestUtils}
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.config.MASTER_REST_SERVER_ENABLED
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@@ -52,6 +55,7 @@ import org.apache.spark.util.Utils
 @ExtendedHiveTest
 class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
   import HiveExternalCatalogVersionsSuite._
+  override protected val defaultSparkSubmitTimeout: Span = 5.minutes
   private val wareHousePath = Utils.createTempDir(namePrefix = "warehouse")
   private val tmpDataDir = Utils.createTempDir(namePrefix = "test-data")
   // For local test, you can set `spark.test.cache-dir` to a static value like `/tmp/test-spark`, to
@@ -216,7 +220,7 @@ class HiveExternalCatalogVersionsSuite extends SparkSubmitTestUtils {
         "--conf", s"spark.sql.test.version.index=$index",
         "--driver-java-options", s"-Dderby.system.home=${wareHousePath.getCanonicalPath}",
         tempPyFile.getCanonicalPath)
-      runSparkSubmit(args, Some(sparkHome.getCanonicalPath), false)
+      runSparkSubmit(args, Some(sparkHome.getCanonicalPath), isSparkTesting = false)
     }
 
     tempPyFile.delete()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index d56d7f39e3..90752e70e1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -27,8 +27,11 @@ import org.apache.hadoop.hive.common.FileUtils
 import org.scalatest.Assertions._
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.matchers.must.Matchers
+import org.scalatest.time.Span
+import org.scalatest.time.SpanSugar._
 
 import org.apache.spark._
+import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.UI.UI_ENABLED
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
@@ -54,6 +57,8 @@ class HiveSparkSubmitSuite
   with BeforeAndAfterEach
   with ResetSystemProperties
 {
+  override protected val defaultSparkSubmitTimeout: Span = 5.minutes
+
   override protected val enableAutoThreadAudit = false
 
   override def beforeEach(): Unit = {
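As context for the `ProcessOutputCapturer` import moves above, a minimal sketch of how the relocated utility is consumed; the echoed command is illustrative, while the constructor signature mirrors the usages shown in the `SparkSubmitTestUtils` and `CliSuite` hunks:

```
import org.apache.spark.ProcessTestUtils.ProcessOutputCapturer

object ProcessCaptureExample {
  def main(args: Array[String]): Unit = {
    // Start a child process and stream its stdout line-by-line through a
    // callback, as the spark-submit and thriftserver test suites do.
    val process = new ProcessBuilder("echo", "hello").start()
    new ProcessOutputCapturer(process.getInputStream, line => println(s"stdout> $line")).start()
    process.waitFor()
  }
}
```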