paper-ParallelPython-Short/sections/scheduler.tex

150 lines
9.8 KiB
TeX
Raw Normal View History

2022-03-13 13:09:20 -04:00
%!TEX root=../main.tex
\systemname is a workflow system, and relies on an internal representation of notebooks that differs significantly from that of Jupyter.
2022-03-13 15:50:37 -04:00
We present the internal representation first, and later discuss how Jupyter notebooks are translated into this model in \Cref{sec:import}.
A \systemname notebook is an ordered list of isolated, atomic units of work called \emph{cells}.
A cell is \emph{isolated} --- \systemname assumes that each can be safely executed in a fresh python interpreter.
To manage inter-cell dependencies, information flow between cells must be explicitly declared.
All communication between cells is mediated by a \emph{scope}, a partial mapping from variable \emph{symbols} to \emph{artifacts} (e.g., python literals).
As a cell runs, it generates artifacts that are serialized and persisted by \systemname; The cell may then assign the artifact to one or more symbols in the scope --- we denote this as a \emph{write} to the symbol.
When a subsequent cell \emph{reads} a symbol, \systemname provides it with the corresponding artifact.
We refer to the set of sybols read (resp., written) by a cell as the cell's read- (resp., write-)set.
We describe both sets together as the cell's dependencies.
A cell is also \emph{atomic} --- \systemname assumes that the cells can be safely executed in any order or even multiple times, so long as the cell's reads return the correct artifacts.
2022-03-13 13:09:20 -04:00
We define the baseline semantics of the notebook by an in-order execution of the cells.
2022-03-13 15:50:37 -04:00
Execution starts with an empty scope; Each cell's writes are recorded in the scope as they happen.
Each cell receives the scope generated by the prior cell; When a cell reads a symbol, it receives the artifact most recently written into the scope.
2022-03-13 13:09:20 -04:00
\begin{example}
...
\end{example}
2022-03-13 15:50:37 -04:00
\paragraph{Dependency Bounds}
Python is a Turing-complete language, and it is impossible to predict the \emph{actual} set of reads and writes that the cell will perform without running it.
Instead, \systemname maintains two collections of dependencies.
The first set, the dependency \emph{bounds}, are an upper bound on the read- and write-sets.
A cell must declare every symbol that it can read or write in its dependency bounds.
We note that we do not expect users to explicitly provide the dependency bounds --- they are computed through best-effort static analysis as described in \Cref{sec:import}.
The second set, the actual dependencies, result from instrumenting the cell's execution.
This is exactly the set of symbols that the cell read or wrote the most recent execution.
2022-03-13 13:09:20 -04:00
\begin{algorithm}
\caption{\texttt{build\_dag}$(\mathcal N)$}
\label{alg:buildDag}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
2022-03-13 15:50:37 -04:00
\Ensure{$\mathcal G$: A labeled edge-list for the dependency DAG}
2022-03-13 13:09:20 -04:00
\State{$\mathcal S \leftarrow \{\}$; $\mathcal G \leftarrow \{\}$}
\For{$c \in \mathcal N$}
2022-03-13 15:50:37 -04:00
\State{$\mathcal G \leftarrow \mathcal G \cup \{\; (c, \mathcal S[x], x)\;|\;x \in c.\texttt{reads}\;\}$}
2022-03-13 13:09:20 -04:00
% \For{$r \in c.\texttt{reads}$}
% \State{$\mathcal G \leftarrow \mathcal G \cup \{\; (c, \mathcal S[r])\;\}$}
% \EndFor
2022-03-13 15:50:37 -04:00
\For{$x \in c.\texttt{writes}$}
\State{$\mathcal S \leftarrow \mathcal S \cup \{\;x \mapsto c\;\}$}
2022-03-13 13:09:20 -04:00
\EndFor
\EndFor
\end{algorithmic}
\end{algorithm}
\paragraph{Scheduling}
2022-03-13 15:50:37 -04:00
Identifying opportunities for parallelism requires converting the provided sequence of cells into a partial order.
\Cref{alg:buildDag} accomplishes this by simulating sequential execution of the workflow to create a directed acyclic graph (DAG) from the workflow's dependency bounds.
2022-03-13 13:09:20 -04:00
The algorithm maintains a virtual scope ($\mathcal S$), a mapping from artifact identifiers to the most recent cell to write the artifact (lines 5-6).
2022-03-13 15:50:37 -04:00
Each artifact read creates an edge from the writing cell to every cell that reads the same artifact.
2022-03-13 13:09:20 -04:00
Once the dependency DAG is available, scheduling proceeds as expected.
2022-03-13 15:50:37 -04:00
All cells with no in-edges in the DAG are initially marked as \textbf{runnable}, and all remaining cells are marked \textbf{waiting}.
Cells are executed by a worker pool; as soon as a cell enters the \textbf{runnable}, the first available worker begins to execute its code.
When the worker finishes executing the cell, it is marked as \textbf{done}.
A cell becomes \textbf{runnable} when all of its in-edges are from cells in the \textbf{done} state.
2022-03-13 13:09:20 -04:00
2022-03-13 15:50:37 -04:00
As an optimization, \systemname only checks to see if a cell $c$ is \textbf{runnable} when another cell with an in-edge to $c$ is marked as \textbf{done}.
As a further optimization, the dependency DAG is pruned when a cell finishes executing.
A cell may not to write every artifact in its dependency bounds.
As noted above, cell execution is instrumented and the actual write-set is collected.
When cell $c$ completes, the dependency DAG is recomputed based on its \emph{actual} write set rather than its write bounds.
Each out-edge $(c, c', x)$ for a symbol $x$ not in the actual write set will be redirected to an earlier cell in the workflow.
This redirection is safe, because we are only interested in whether the cell's read dependencies are available, and not the specific artifact identified by $x$.
2022-03-13 13:09:20 -04:00
2022-03-13 15:50:37 -04:00
\paragraph{Incremental Re-execution}
Users may trigger the re-execution of one or more cells (e.g., to retrieve new input data).
When this happens, the re-executed cells and all of their descendants in the dependency DAG enter the \textbf{waiting} state and execution proceeds mostly as above.
All cells that are not descendants of an updated cell remain in the \textbf{done} state, and their output artifacts are assumed to be re-usable from the cell's prior execution.
During a partial re-execution of the workflow, the cell's actual dependencies from the prior execution are also available.
When computing the dependency DAG, the \emph{actual} read dependencies from the cell's prior execution are used.
This is safe under the assumption that the cell's behavior is deterministic --- that the cell's execution trace is unchanged if it is re-executed with the same inputs.
2022-03-13 13:09:20 -04:00
2022-03-13 15:50:37 -04:00
\paragraph{Bounding Error}
\systemname relies on static analysis to predict bounds for a cell's read- and write-sets.
As we discuss later, static analysis can not achieve 100\% accuracy for such predictions.
Thus, \systemname is designed to address the possibility of a cell reading or writing a symbol that is not in its dependency bounds.
We assume that cells are idempotent, as is typically the case for our target workloads of ETL and model-fitting.
2022-03-13 13:09:20 -04:00
There are four possibilities for prediction inaccuracies:
2022-03-13 15:50:37 -04:00
Cell execution completes without reading an artifact that was in the predicted read-set (over-prediction),
the cell reads an artifact not in the predicted read-set (under-prediction),
2022-03-13 13:09:20 -04:00
and same two possibilities for the write-set.
2022-03-13 15:50:37 -04:00
Over-prediction, whether for reads or writes, may limit opportunities for parallel execution.
2022-03-13 13:09:20 -04:00
However, it does not pose a correctness problem, and by the time it is discovered the cell will have already finished running.
The primary challenge is thus coping with reads and writes that are not in the predicted read- or write-set, respectively.
In either case, the change may add or remove edges to/from the dependency DAG, requiring re-execution of running or even already completed cells.
To streamline updates to the dependency DAG, \systemname caches the virtual input scope ($\mathcal S$) computed for each cell by \Cref{alg:buildDag} (denote by $\mathcal S_c$ the scope emitted by the cell prior to $c$).
2022-03-13 15:50:37 -04:00
When cell $c$ performs an under-predicted read for identifier $r$, an edge $(c, \mathcal S_c[r])$.
2022-03-13 13:09:20 -04:00
There are now two possibilities: either cell $\mathcal S_c[r]$ is marked done or it is not.
In the former case, the dependency has already been satisfied and execution may proceed unimpeded.
In the latter case, execution of cell $c$ is immediately paused until $S_c[r]$ has completed.
\begin{algorithm}
\caption{\texttt{update\_dag}$(\mathcal N, c_0, w)$}
\label{alg:updateDag}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Require{$c_0 \in \mathcal N$: A cell performing an unpredicted write}
\Require{$w$: The identifier of the unpredicted write}
\Ensure{$\forall c \in \mathcal N : \mathcal S_c$: Cached input scopes}
\Ensure{$\mathcal G$: An edge-list for the cell dependency DAG}
\For{$c \in \mathcal N$ s.t. $c > c_0$}
\State{$\mathcal S_c \leftarrow \mathcal S_c - \{\;w \rightarrow *\;\} \cup \{\;w \rightarrow c_0\;\}$}
\If{$w \in c.\texttt{reads}$}
\If{$c.\texttt{done} \vee c.\texttt{running}$}
\State{\texttt{abort}$(\mathcal N, c)$}
\EndIf
\State{$\mathcal G \leftarrow \mathcal G - \{\;c \rightarrow *\;\} \cup \{\; (c, \mathcal S[r])\;|\;r \in c.\texttt{reads}\;\}$}
\EndIf
\If{$w \in c.\texttt{writes}$}
\Return
\EndIf
\EndFor
\end{algorithmic}
\end{algorithm}
\begin{algorithm}
\caption{\texttt{abort}$(\mathcal N, c_0)$}
\label{alg:abort}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Require{$c_0 \in \mathcal N$: A cell to abort}
\If{$c_0$.\texttt{running}}
\State{Abort $c_0$'s execution}
\ElsIf{$c_0$.\texttt{done}}
\State{Clear $c_0$'s results}
\For{$c$ s.t. $(c_0, c) \in \mathcal G$}
\State{\texttt{abort}$(\mathcal N, c)$}
\EndFor
\EndIf
\end{algorithmic}
\end{algorithm}
An unpredicted write from cell $c$ on identifier $r$ may redirect edges in the dependency DAG from another cell to $c$.
The process is summarized in \Cref{alg:updateDag}, which iterates through every cell following $c_0$ in order.
The iteration stops at the first cell (if one exists) to overwrite $w$ (line 7).
The cached scope for each cell is updated (line 2), removing prior writes to $w$ and adding $c_0$'s write.
If another cell reads the unpredicted $w$, the dependency DAG must be updated.
All out-edges from the cell are removed and recomputed (lines 6).
If the cell was already running or has already completed (lines 4-5), its execution is stopped, results are cleared, and any running dependencies are recursively aborted (\Cref{alg:abort}).