[SPARK-15942][REPL] Unblock :reset command in REPL.

## What changes were proposed in this pull
(Paste from JIRA issue.)
As a follow up for SPARK-15697, I have following semantics for `:reset` command.
On `:reset` we forget all that user has done but not the initialization of spark. To avoid confusion or make it more clear, we show the message `spark` and `sc` are not erased, infact they are in same state as they were left by previous operations done by the user.
While doing above, somewhere I felt that this is not usually what reset means. But an accidental shutdown of a cluster can be very costly, so may be in that sense this is less surprising and still useful.

## How was this patch tested?

Manually, by calling `:reset` command, by both altering the state of SparkContext and creating some local variables.

Author: Prashant Sharma <prashant@apache.org>
Author: Prashant Sharma <prashsh1@in.ibm.com>

Closes #13661 from ScrapCodes/repl-reset-command.
This commit is contained in:
Prashant Sharma 2016-06-19 20:12:00 +01:00 committed by Sean Owen
parent 001a589603
commit 1b3a9b966a
2 changed files with 16 additions and 3 deletions

View file

@ -36,7 +36,11 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
def initializeSpark() {
intp.beQuietDuring {
processLine("""
@transient val spark = org.apache.spark.repl.Main.createSparkSession()
@transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) {
org.apache.spark.repl.Main.sparkSession
} else {
org.apache.spark.repl.Main.createSparkSession()
}
@transient val sc = {
val _sc = spark.sparkContext
_sc.uiWebUrl.foreach(webUrl => println(s"Spark context Web UI available at ${webUrl}"))
@ -50,6 +54,7 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
processLine("import spark.implicits._")
processLine("import spark.sql")
processLine("import org.apache.spark.sql.functions._")
replayCommandStack = Nil // remove above commands from session history.
}
}
@ -70,7 +75,8 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
echo("Type :help for more information.")
}
private val blockedCommands = Set[String]("reset")
/** Add repl commands that needs to be blocked. e.g. reset */
private val blockedCommands = Set[String]()
/** Standard commands */
lazy val sparkStandardCommands: List[SparkILoop.this.LoopCommand] =
@ -88,6 +94,12 @@ class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter)
initializeSpark()
super.loadFiles(settings)
}
override def resetCommand(line: String): Unit = {
super.resetCommand(line)
initializeSpark()
echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.")
}
}
object SparkILoop {

View file

@ -49,7 +49,8 @@ class ReplSuite extends SparkFunSuite {
val oldExecutorClasspath = System.getProperty(CONF_EXECUTOR_CLASSPATH)
System.setProperty(CONF_EXECUTOR_CLASSPATH, classpath)
Main.sparkContext = null
Main.sparkSession = null // causes recreation of SparkContext for each test.
Main.conf.set("spark.master", master)
Main.doMain(Array("-classpath", classpath), new SparkILoop(in, new PrintWriter(out)))