Isolation and scheduler writeup

%!TEX root=../main.tex
In this section, we discuss the execution of an individual cell.
Recall that in a classical notebook, a cell is run by evaluating its code in the kernel: a single, long-running Python interpreter.
To facilitate parallel execution, as well as incremental updates, \systemname instead isolates cells by executing each in a freshly allocated Python kernel.
We note that isolation is incompatible, at least directly, with classical computational notebooks:
(i) Cells normally communicate through kernel state, precluding a cell executing in one kernel from accessing variables created in another;
(ii) Variables generated by one kernel may need to be accessed after the kernel has exited;
(iii) Isolation comes at an impractically high performance cost.
\tinysection{Communication Model}
First, the runtime must be able to reconstruct the global interpreter state, or at least the subset of it needed to run the cell.
We start with a simplified model where inter-cell communication is explicit --- we discuss converting Jupyter notebooks into this model in \Cref{sec:import}.
Global state is managed by a monitor process, which maintains for each running cell a \emph{scope}: a mapping from variable names to serialized Python objects.
Under serial execution, each cell receives the scope emitted by the preceding cell and records its own updates in it; we refine this model in \Cref{sec:scheduler}.
Concretely, for a variable defined in one cell (the writer) to be used in a subsequent cell (the reader): (i) the writer must explicitly export the variable into the global state, and (ii) the reader must explicitly import the variable from the global state.
\systemname provides setter and getter functions (respectively) on a global state variable for this purpose.
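To make this concrete, the sketch below shows a pair of cells communicating explicitly through a toy, in-process stand-in for the monitor-managed scope; the \texttt{Scope} class and its \texttt{put}/\texttt{get} methods are illustrative placeholders, not \systemname's actual API.
\begin{verbatim}
import pickle

class Scope(dict):
    """Toy stand-in for the monitor-managed scope: a mapping from
    symbol names to serialized artifacts (here, pickled bytes)."""
    def put(self, symbol, value):
        self[symbol] = pickle.dumps(value)   # export: serialize an artifact
    def get(self, symbol):
        return pickle.loads(self[symbol])    # import: deserialize an artifact

state = Scope()

# --- Cell 1 (writer) ---
salaries = [52000, 48000, 61000]
state.put("salaries", salaries)

# --- Cell 2 (reader) ---
salaries = state.get("salaries")
state.put("mean_salary", sum(salaries) / len(salaries))
\end{verbatim}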
\tinysection{State Serialization}
When a state variable is exported, it is serialized by the python interpreter.
We refer to the serialized state as an \emph{artifact}.
The artifact is delivered to a central monitor process, and assigned to a name in the global state.
When a cell imports a symbol from the global state, it contacts the monitor to retrieve the artifact associated with the symbol.
The runtime deserializes the artifact and places it into the kernel-local state.
By default, state is serialized with python's native \texttt{pickle} library, although \systemname can be easily extended with codecs for specialized types that are either unsupported by \texttt{pickle}, or for which it is not efficient:
(i) Python code (e.g., import statements, and function or class definitions) is exported as raw python code and imported with \texttt{eval}.
(ii) Pandas dataframes are exported in parquet format, and are exposed to subsequent cells by the monitor process through Apache Arrow direct access.
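As an illustration of how such codecs could plug in, the sketch below pairs a pickle fallback with a per-type encoder registry; the registry itself and the choice of pandas' parquet support (which requires \texttt{pyarrow} or \texttt{fastparquet}) are our own illustrative choices rather than \systemname's implementation.
\begin{verbatim}
import io
import pickle

# Illustrative codec registry: artifacts are tagged with the codec used
# to encode them, so the importing kernel knows how to decode them.
ENCODERS = {}   # python type -> (codec name, encode function)
DECODERS = {}   # codec name  -> decode function

def register_codec(py_type, name, encode, decode):
    ENCODERS[py_type] = (name, encode)
    DECODERS[name] = decode

def export_artifact(value):
    name, encode = ENCODERS.get(type(value), ("pickle", pickle.dumps))
    return name, encode(value)

def import_artifact(codec_name, payload):
    return DECODERS.get(codec_name, pickle.loads)(payload)

# Example: ship pandas dataframes as parquet instead of pickled objects.
try:
    import pandas as pd

    def _df_encode(df):
        buf = io.BytesIO()
        df.to_parquet(buf)                  # needs pyarrow or fastparquet
        return buf.getvalue()

    def _df_decode(payload):
        return pd.read_parquet(io.BytesIO(payload))

    register_codec(pd.DataFrame, "parquet", _df_encode, _df_decode)
except ImportError:
    pass
\end{verbatim}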
One notable challenge is the need to support transitive dependencies: one function, for example, may rely on a second function.
When the former function's symbol is exported (resp., imported), the latter's must also be exported (resp., imported).
\tinysection{Optimizing Python Startup}
Allocating a fresh Python interpreter for each individual cell poses a significant performance problem.
The interpreter's start-up cost ranges from roughly 600ms on a recent processor with SSDs to multiple seconds on a less powerful computer, and this cost dominates the runtime of many short-lived cells, making it impractical to start kernels on demand.
Instead, \systemname maintains a pool of pre-allocated Python kernels.
When a cell begins executing, its code is shipped to an already running kernel, which executes the code and returns the response to the monitor process.
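A minimal sketch of this idea follows, using \texttt{multiprocessing} worker processes as stand-in kernels; the real kernels, and the protocol between the monitor and the pool, are more involved.
\begin{verbatim}
import multiprocessing as mp

def kernel_main(requests, responses):
    """Stand-in kernel: repeatedly executes cell code shipped to it."""
    while True:
        cell_id, code = requests.get()
        if code is None:                     # shutdown sentinel
            break
        scope = {}
        try:
            exec(code, scope)                # run the cell's code
            responses.put((cell_id, "ok"))
        except Exception as e:
            responses.put((cell_id, repr(e)))

class KernelPool:
    """Pre-allocates kernels so cells never pay interpreter start-up."""
    def __init__(self, size=4):
        self.requests = mp.Queue()
        self.responses = mp.Queue()
        self.workers = [
            mp.Process(target=kernel_main, daemon=True,
                       args=(self.requests, self.responses))
            for _ in range(size)
        ]
        for worker in self.workers:
            worker.start()

    def run_cell(self, cell_id, code):
        self.requests.put((cell_id, code))   # ship to a waiting kernel

    def shutdown(self):
        for _ in self.workers:
            self.requests.put((None, None))
\end{verbatim}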
\tinysection{Serialization Special Cases}
Several classes of state require more than the default pickle-based migration described above:
\begin{itemize}
\item \textbf{Mutable state}: deciding which variables a cell has modified, and therefore must (re-)export, requires instrumenting the cell's execution.
\item \textbf{Side-effecting values} (e.g., open file handles): where possible, the characteristics of the endpoint can be recorded and the handle re-opened by the importing kernel. Such values are rarely pipelined through multiple cells; a conservative fallback is to treat them as unpredicted read/write dependencies (\Cref{sec:scheduler}), which may also offer a path to checkpointing such state.
\item \textbf{Functions and classes}: the defining code must be exported explicitly (or serialized with a library such as \texttt{cloudpickle}), as sketched below. An exported function may introduce chained dependencies that must also be registered in every cell that uses it, and these dependencies may reference cells that appear later in the notebook, extending the misprediction cases discussed in \Cref{sec:scheduler}.
\end{itemize}
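For instance, a function definition might be exported as its source text and re-created in the importing kernel, roughly as sketched below using \texttt{inspect} and \texttt{exec}; a library such as \texttt{cloudpickle} could serialize the function object directly instead.
\begin{verbatim}
import inspect

def export_function(fn):
    """Capture a function's defining source so another kernel can
    re-create it.  (A library such as cloudpickle could instead
    serialize the function object directly.)"""
    return inspect.getsource(fn)

def import_function(name, source):
    """Re-create an exported function in this kernel.  Any chained
    dependencies (globals the function uses) must be imported first."""
    namespace = {}
    exec(source, namespace)       # exec, since a `def` is a statement
    return namespace[name]

# --- exporting kernel ---
def area(r):
    return 3.14159 * r * r

artifact = export_function(area)

# --- importing kernel ---
area = import_function("area", artifact)
assert abs(area(2.0) - 12.56636) < 1e-6
\end{verbatim}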
\paragraph{Interpreter Re-Use}
In our current implementation, the kernel chosen to run a piece of code is selected arbitrarily.
We note several opportunities for future work.
Artifacts created during cell execution or imported from the global state can be cached in the executing kernel; if we know which artifacts a cell will use, we can prioritize kernels that have already loaded those artifacts.
Further, cells that share state already have a mutual dependency, so it may be possible to run them in the same interpreter and avoid serialization entirely.
Interpreter state might also be re-used across cells through fork/join-style process forking.

%!TEX root=../main.tex
\systemname is a workflow system, and relies on an internal representation of notebooks that differs significantly from that of Jupyter.
We present the internal representation first, and later discuss how Jupyter notebooks are translated into this model in \Cref{sec:import}.
A \systemname notebook is an ordered list of isolated, atomic units of work called \emph{cells}.
In this section, we overview \systemname's provenance model and how it is used to support parallel code execution and incremental updates.
A cell is \emph{isolated} --- \systemname assumes that each can be safely executed in a fresh python interpreter.
To manage inter-cell dependencies, information flow between cells must be explicitly declared.
All communication between cells is mediated by a \emph{scope}, a partial mapping from variable \emph{symbols} to \emph{artifacts} (e.g., python literals).
As a cell runs, it generates artifacts that are serialized and persisted by \systemname; the cell may then assign the artifact to one or more symbols in the scope --- we denote this as a \emph{write} to the symbol.
When a subsequent cell \emph{reads} a symbol, \systemname provides it with the corresponding artifact.
We refer to the set of symbols read (resp., written) by a cell as the cell's read- (resp., write-)set, and describe both sets together as the cell's dependencies.
We define the baseline semantics of a notebook as a serial execution of its cells in notebook order: execution starts with an empty scope, each cell's writes are recorded in the scope as they happen, and each read returns the artifact most recently written to the requested symbol.
We define a correct execution in terms of view serializability: any reordering of the cells (e.g., to parallelize execution) that preserves these data flow dependencies is correct.
As a simplification for this preliminary work, we assume that cells are atomic and idempotent: a cell can safely be interrupted, restarted, or executed multiple times, and in any order, so long as its reads return the correct artifacts.
\tinysection{Naive Scheduling}
Let $N$ denote a notebook, a sequence of cells $[c_1, \ldots, c_n]$.
Assume, initially, that for each cell $c_i \in N$ we are given exact read and write sets ($\mathcal R(c_i)$ and $\mathcal W(c_i)$ respectively).
We define the notebook's data dependency graph $(N, D)$ through a set of labelled edges $(c_i, c_j, \ell) \in D$ as follows:
\begin{multline*}
D = \{\; (c_i, c_j, \ell) \;|\; c_i, c_j \in N,\; \ell \in \mathcal R(c_i),\; \ell \in \mathcal W(c_j),\; j < i, \\
\not \exists c_k \in N \text{ s.t. } j < k < i,\; \ell \in \mathcal W(c_k) \;\}
\end{multline*}
That is, an edge labelled $\ell$ connects each cell $c_i$ that reads symbol $\ell$ to the most recent preceding cell $c_j$ that writes $\ell$; we say that $c_i$ \emph{depends on} $c_j$.
Given such a graph, scheduling is trivial.
Denote by $\mathit{state}(c) \in \{ \text{PENDING}, \text{DONE} \}$ the state of a cell (i.e., \text{DONE} after it has completed execution); a cell $c$ can be scheduled for execution when every cell it depends on is \text{DONE}:
$$\forall (c, c_w, \ell) \in D : \mathit{state}(c_w) = \text{DONE}$$
It can be trivially shown that this scheduling approach guarantees conflict equivalence between the execution and notebook orders.
\begin{example}
Consider a three-cell notebook where $c_1$ writes symbol $x$, $c_2$ reads $x$ and writes $y$, and $c_3$ reads $x$ and $y$. The dependency graph contains the edges $(c_2, c_1, x)$, $(c_3, c_1, x)$, and $(c_3, c_2, y)$: $c_2$ and $c_3$ must wait for $c_1$, and $c_3$ must additionally wait for $c_2$. Conversely, if $c_3$ read only $x$, it could be scheduled concurrently with $c_2$ as soon as $c_1$ completes.
\end{example}
\tinysection{Approximating Dependencies}
A cell's read and write sets cannot be known exactly until runtime, but can often be approximated.
In principle, a dependency graph could be created from these approximations and the scheduling could proceed as in the naive approach.
However, when a cell executes and its exact read and write sets become known, the notebook's dependency graph changes.
\systemname's \emph{dependency graph must be dynamic}.
There are four possible classes of prediction error, which we return to when discussing bounding errors below.
Here, we are primarily concerned with ensuring correctness (i.e., serializability) of the notebook execution in the presence of execution errors, even at the cost of performance.
\paragraph{Dependency Bounds}
Python is a Turing-complete language, and it is impossible to predict the \emph{actual} set of reads and writes that the cell will perform without running it.
Instead, \systemname maintains two collections of dependencies.
The first set, the dependency \emph{bounds}, are an upper bound on the read- and write-sets.
A cell must declare every symbol that it can read or write in its dependency bounds.
We note that we do not expect users to explicitly provide the dependency bounds --- they are computed through best-effort static analysis as described in \Cref{sec:import}.
The second set, the actual dependencies, result from instrumenting the cell's execution.
This is exactly the set of symbols that the cell read or wrote during its most recent execution.
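As a rough illustration of how dependency bounds might be computed (the actual analysis is described in \Cref{sec:import}), the sketch below over-approximates a cell's reads and writes from its abstract syntax tree: every name that is loaded is a potential read, and every name that is assigned is a potential write. The \texttt{dependency\_bounds} helper is purely illustrative.
\begin{verbatim}
import ast

def dependency_bounds(cell_code):
    """Conservative read/write bounds for a cell: every name that is
    loaded may be a read; every name that is stored may be a write.
    (The real analysis must also handle attributes, imports, calls
    with side effects, etc.)"""
    reads, writes = set(), set()
    for node in ast.walk(ast.parse(cell_code)):
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Load):
                reads.add(node.id)
            else:                            # Store or Del
                writes.add(node.id)
    return reads, writes

reads, writes = dependency_bounds("y = model.predict(x) if ready else None")
# reads == {'model', 'x', 'ready'};  writes == {'y'}
\end{verbatim}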
\begin{algorithm}
\caption{\texttt{build\_dag}$(\mathcal N)$}
\label{alg:buildDag}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Ensure{$\mathcal G$: A labeled edge-list for the dependency DAG}
\State{$\mathcal S \leftarrow \{\}$; $\mathcal G \leftarrow \{\}$}
\For{$c \in \mathcal N$}
\State{$\mathcal G \leftarrow \mathcal G \cup \{\; (c, \mathcal S[x], x)\;|\;x \in c.\texttt{reads}\;\}$}
\For{$x \in c.\texttt{writes}$}
\State{$\mathcal S \leftarrow \mathcal S \cup \{\;x \mapsto c\;\}$}
\EndFor
\EndFor
\end{algorithmic}
\end{algorithm}
\paragraph{Scheduling}
Identifying opportunities for parallelism requires converting the provided sequence of cells into a partial order.
\Cref{alg:buildDag} accomplishes this by simulating sequential execution of the workflow to create a directed acyclic graph (DAG) from the workflow's dependency bounds.
The algorithm maintains a virtual scope ($\mathcal S$), a mapping from each symbol to the most recent cell to write it (lines 4--5).
Each symbol in a cell's read bounds creates an edge from the reading cell to that symbol's most recent preceding writer (line 3).
Once the dependency DAG is available, scheduling proceeds as expected.
All cells with no dependency edges in the DAG are initially marked as \textbf{runnable}, and all remaining cells are marked \textbf{waiting}.
Cells are executed by a worker pool; as soon as a cell enters the \textbf{runnable} state, the first available worker begins to execute its code.
When the worker finishes executing the cell, the cell is marked as \textbf{done}.
A cell becomes \textbf{runnable} when every cell it depends on (i.e., the target of each of its dependency edges) is in the \textbf{done} state.
As an optimization, \systemname only checks whether a cell $c$ is \textbf{runnable} when a cell that $c$ depends on is marked as \textbf{done}.
As a further optimization, the dependency DAG is pruned when a cell finishes executing.
A cell may not write every symbol in its write bounds.
As noted above, cell execution is instrumented and the actual write-set is collected.
When cell $c$ completes, the dependency DAG is recomputed based on its \emph{actual} write set rather than its write bounds.
Each edge $(c', c, x)$ from a reader $c'$ to $c$, for a symbol $x$ not in the actual write set, is redirected to an earlier cell in the workflow.
This redirection is safe, because we are only interested in whether the cell's read dependencies are available, and not the specific artifact identified by $x$.
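To make the bookkeeping concrete, the following Python sketch restates \Cref{alg:buildDag} and the runnable/waiting/done state machine; the \texttt{Cell} record, its field names, and the elided worker pool are our own illustrative choices rather than \systemname's implementation.
\begin{verbatim}
from dataclasses import dataclass

@dataclass
class Cell:
    name: str
    reads: set                    # read bounds
    writes: set                   # write bounds
    state: str = "waiting"        # waiting / runnable / running / done

def build_dag(notebook):
    """Edges (reader, writer, symbol): the reader depends on the writer."""
    scope, edges = {}, set()
    for cell in notebook:
        for sym in cell.reads:
            if sym in scope:                 # most recent prior writer
                edges.add((cell.name, scope[sym].name, sym))
        for sym in cell.writes:
            scope[sym] = cell
    return edges

def refresh_runnable(notebook, edges):
    """A cell becomes runnable once every cell it depends on is done."""
    by_name = {c.name: c for c in notebook}
    for cell in notebook:
        if cell.state != "waiting":
            continue
        writers = [w for (r, w, _) in edges if r == cell.name]
        if all(by_name[w].state == "done" for w in writers):
            cell.state = "runnable"

# c2 and c3 depend only on c1, so they become runnable together once
# c1 completes, and may then execute in parallel on separate kernels.
nb = [Cell("c1", set(), {"x"}), Cell("c2", {"x"}, {"y"}), Cell("c3", {"x"}, set())]
edges = build_dag(nb)
nb[0].state = "done"
refresh_runnable(nb, edges)
assert {c.name for c in nb if c.state == "runnable"} == {"c2", "c3"}
\end{verbatim}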
\paragraph{Incremental Re-execution}
Users may trigger the re-execution of one or more cells (e.g., to retrieve new input data).
When this happens, the re-executed cells and all of their descendants in the dependency DAG enter the \textbf{waiting} state and execution proceeds mostly as above.
All cells that are not descendants of an updated cell remain in the \textbf{done} state, and their output artifacts are assumed to be re-usable from the cell's prior execution.
During a partial re-execution of the workflow, each cell's actual dependencies from its prior execution are also available.
When computing the dependency DAG, the \emph{actual} read dependencies from the cell's prior execution are used.
This is safe under the assumption that the cell's behavior is deterministic --- that the cell's execution trace is unchanged if it is re-executed with the same inputs.
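A minimal sketch of the invalidation step, reusing the cell and edge representations from the scheduling sketch above; the \texttt{invalidate} helper is illustrative only.
\begin{verbatim}
def invalidate(notebook, edges, updated):
    """Mark re-executed cells and all of their descendants as waiting.
    `updated` is a set of cell names; edges are (reader, writer, symbol)."""
    dirty = set(updated)
    changed = True
    while changed:                           # transitive closure over readers
        changed = False
        for reader, writer, _ in edges:
            if writer in dirty and reader not in dirty:
                dirty.add(reader)
                changed = True
    for cell in notebook:
        if cell.name in dirty:
            cell.state = "waiting"           # must re-run
        # all other cells stay done; their artifacts are reused
\end{verbatim}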
\paragraph{Bounding Error}
\systemname relies on static analysis to predict bounds for a cell's read- and write-sets.
As we discuss later, static analysis cannot achieve 100\% accuracy for such predictions.
Thus, \systemname is designed to address the possibility of a cell reading or writing a symbol that is not in its dependency bounds.
We assume that cells are idempotent, as is typically the case for our target workloads of ETL and model-fitting.
There are four possibilities for prediction inaccuracies:
cell execution completes without reading an artifact that was in the predicted read-set (over-prediction),
the cell reads an artifact not in the predicted read-set (under-prediction),
and the same two possibilities exist for the write-set.
Over-prediction, whether for reads or writes, may limit opportunities for parallel execution.
However, it does not pose a correctness problem, and by the time it is discovered the cell will have already finished running.
In particular, when the dependency bounds are true upper bounds (i.e., there is no under-prediction), prediction errors can degrade performance but never correctness.
The primary challenge is thus coping with reads and writes that are not in the predicted read- or write-set, respectively.
In either case, the change may add or remove edges to/from the dependency DAG, requiring re-execution of running or even already completed cells.
To streamline updates to the dependency DAG, \systemname caches the virtual input scope ($\mathcal S$) computed for each cell by \Cref{alg:buildDag} (denote by $\mathcal S_c$ the scope emitted by the cell prior to $c$).
When cell $c$ performs an under-predicted read of identifier $r$, the edge $(c, \mathcal S_c[r], r)$ is added to the dependency DAG.
There are now two possibilities: either cell $\mathcal S_c[r]$ is marked done or it is not.
In the former case, the dependency has already been satisfied and execution may proceed unimpeded.
In the latter case, execution of cell $c$ is immediately paused until $\mathcal S_c[r]$ has completed.
We note that resources allocated to $c$'s kernel cannot be released while it is paused; with a fixed-size kernel pool, this could lead to starvation if every kernel blocks on an unsatisfied read.
\begin{algorithm}
\caption{\texttt{update\_dag}$(\mathcal N, c_0, w)$}
\label{alg:updateDag}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Require{$c_0 \in \mathcal N$: A cell performing an unpredicted write}
\Require{$w$: The identifier of the unpredicted write}
\Ensure{$\forall c \in \mathcal N : \mathcal S_c$: Cached input scopes}
\Ensure{$\mathcal G$: An edge-list for the cell dependency DAG}
\For{$c \in \mathcal N$ s.t. $c > c_0$}
\State{$\mathcal S_c \leftarrow \mathcal S_c - \{\;w \mapsto *\;\} \cup \{\;w \mapsto c_0\;\}$}
\If{$w \in c.\texttt{reads}$}
\If{$c.\texttt{done} \vee c.\texttt{running}$}
\State{\texttt{abort}$(\mathcal N, c)$}
\EndIf
\State{$\mathcal G \leftarrow \mathcal G - \{\;(c, *, *)\;\} \cup \{\; (c, \mathcal S_c[r], r)\;|\;r \in c.\texttt{reads}\;\}$}
\EndIf
\If{$w \in c.\texttt{writes}$}
\State{\Return}
\EndIf
\EndFor
\end{algorithmic}
\end{algorithm}
\begin{algorithm}
\caption{\texttt{abort}$(\mathcal N, c_0)$}
\label{alg:abort}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Require{$c_0 \in \mathcal N$: A cell to abort}
\If{$c_0$.\texttt{running}}
\State{Abort $c_0$'s execution}
\ElsIf{$c_0$.\texttt{done}}
\State{Clear $c_0$'s results}
\For{$c$ s.t. $(c, c_0, \ell) \in \mathcal G$ for some $\ell$}
\State{\texttt{abort}$(\mathcal N, c)$}
\EndFor
\EndIf
\end{algorithmic}
\end{algorithm}
An unpredicted write of identifier $w$ by cell $c_0$ may redirect dependency edges from later cells to $c_0$.
The process is summarized in \Cref{alg:updateDag}, which iterates through every cell following $c_0$ in order.
The iteration stops at the first cell (if one exists) to overwrite $w$ (lines 9--10).
The cached scope of each cell is updated (line 2), removing prior writes to $w$ and recording $c_0$'s write.
If a later cell reads $w$ (line 3), the dependency DAG must be updated: all of that cell's dependency edges are removed and recomputed from its updated cached scope (line 7).
If the cell was already running or had already completed (lines 4--5), its execution is stopped, its results are cleared, and its dependents are recursively aborted (\Cref{alg:abort}).
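For concreteness, \Cref{alg:updateDag} and \Cref{alg:abort} translate roughly into the Python sketch below, reusing the representations from the earlier sketches; \texttt{cached\_scope} plays the role of the cached $\mathcal S_c$. This is an illustrative rendering, not \systemname's implementation.
\begin{verbatim}
def abort(cell, edges, by_name):
    """Abort a cell and, recursively, every cell that read from it."""
    if cell.state == "running":
        cell.state = "waiting"               # stop execution
    elif cell.state == "done":
        cell.state = "waiting"               # clear cached results
        for reader, writer, _ in list(edges):
            if writer == cell.name:
                abort(by_name[reader], edges, by_name)

def update_dag(notebook, edges, cached_scope, c0, w):
    """Handle an unpredicted write of symbol `w` by cell `c0`: repoint
    later readers of `w` at c0, aborting any that already ran."""
    by_name = {c.name: c for c in notebook}
    started = False
    for cell in notebook:
        if not started:                      # skip cells up to and including c0
            started = cell.name == c0.name
            continue
        cached_scope[cell.name][w] = c0.name # c0 is now the latest writer of w
        if w in cell.reads:
            if cell.state in ("done", "running"):
                abort(cell, edges, by_name)
            # recompute the cell's dependency edges from its cached scope
            edges.difference_update({e for e in edges if e[0] == cell.name})
            edges.update({(cell.name, cached_scope[cell.name][r], r)
                          for r in cell.reads if r in cached_scope[cell.name]})
        if w in cell.writes:
            break                            # a later write of w shadows c0's
\end{verbatim}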