%!TEX root=../main.tex
\systemname is a workflow system, and relies on an internal representation of notebooks that differs significantly from that of Jupyter.
We present the internal representation first, and discuss how Jupyter notebooks are translated into this model in \Cref{sec:import}.
A \systemname notebook is an ordered list of atomic units of work called \emph{cells}.
Communication between cells is mediated by a \emph{scope}, a partial mapping from identifiers (e.g., variable names) to \emph{artifacts} (e.g., Python literals).
We define the baseline semantics of the notebook by an in-order execution of the cells.
Each cell\ldots
(i) reads zero or more identifiers from the scope emitted by the preceding cell, and
(ii) writes zero or more identifier/artifact mappings, extending the preceding cell's scope.
We refer to the set of identifiers read (resp., written) by a cell as the cell's read- (resp., write-)set.
The scope emitted by a cell consists of all mappings written by the cell, together with the mappings from the preceding scope whose identifiers are not in the cell's write-set.
\begin{example}
...
\end{example}
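
The baseline in-order semantics can be illustrated with a minimal Python sketch.
It is not \systemname's implementation: the names \texttt{run\_cell} and \texttt{run\_notebook}, and the representation of a cell as a pair of a read-set and a function, are assumptions made for illustration only.

```python
from typing import Any, Callable, Dict, List, Set, Tuple

Scope = Dict[str, Any]
# Hypothetical cell representation: (read-set, body); the body returns its writes.
CellSpec = Tuple[Set[str], Callable[[Scope], Scope]]


def run_cell(scope_in: Scope, reads: Set[str], body: Callable[[Scope], Scope]) -> Scope:
    """Run one cell against the scope emitted by the preceding cell."""
    # (i) The cell only sees the identifiers in its read-set.
    visible = {name: scope_in[name] for name in reads if name in scope_in}
    # (ii) The cell produces zero or more identifier/artifact mappings.
    written = body(visible)
    # Emitted scope: the cell's writes, plus prior mappings it did not overwrite.
    return {**scope_in, **written}


def run_notebook(cells: List[CellSpec]) -> Scope:
    """In-order execution: each cell extends the scope of its predecessor."""
    scope: Scope = {}
    for reads, body in cells:
        scope = run_cell(scope, reads, body)
    return scope


cells: List[CellSpec] = [
    (set(), lambda s: {"x": 1, "y": 2}),
    ({"x"}, lambda s: {"y": s["x"] + 10}),          # overwrites y
    ({"x", "y"}, lambda s: {"z": s["x"] + s["y"]}),  # sees the new y
]
final_scope = run_notebook(cells)
```

In this sketch the second cell's write to \texttt{y} shadows the first cell's, so the third cell observes the updated value.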
\paragraph{Approximate Dependencies}
A full dependency analysis of Python is intractable\OK{Cite}.
Even a bounded analysis is impractical; for example, libraries with global state can introduce hidden dependencies.
In short, obtaining exact read- and write-sets is impossible without actually running the cell's code, which is not desirable.
As a result, \systemname computes two versions of each cell's read- and write-sets: best-effort \emph{predicted} sets obtained through static analysis (see \Cref{sec:import}), and \emph{actual} sets obtained once the cell is executed.
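
To make the notion of a predicted set concrete, the following sketch derives coarse read- and write-sets from a cell's source using Python's standard \texttt{ast} module.
It is an illustrative assumption, not the analysis described in \Cref{sec:import}: the function name \texttt{predict\_sets} is hypothetical, and the sketch ignores statement order, attribute access, and function-local scoping.

```python
import ast
import builtins


def predict_sets(source: str):
    """Best-effort (reads, writes) for one cell, based on names only."""
    reads, writes = set(), set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Name):
            if isinstance(node.ctx, ast.Load):
                reads.add(node.id)       # name is loaded somewhere in the cell
            else:
                writes.add(node.id)      # name is assigned or deleted
        elif isinstance(node, (ast.FunctionDef, ast.ClassDef)):
            writes.add(node.name)        # def/class binds a name
        elif isinstance(node, (ast.Import, ast.ImportFrom)):
            for alias in node.names:
                # `import a.b` binds `a`; `import a as b` binds `b`.
                writes.add((alias.asname or alias.name).split(".")[0])
    # Builtins such as `print` are not artifacts from the notebook scope.
    reads -= set(dir(builtins))
    return reads, writes


cell_source = "total = sum(values) + offset\nprint(total)\n"
predicted_reads, predicted_writes = predict_sets(cell_source)
```

Because the sketch ignores statement order, \texttt{total} appears in both sets even though the cell defines it before using it; this is an instance of read overprediction.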
\begin{algorithm}
\caption{\texttt{build\_dag}$(\mathcal N)$}
\label{alg:buildDag}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Ensure{$\mathcal G$: An edge-list for the cell dependency DAG}
\State{$\mathcal S \leftarrow \{\}$; $\mathcal G \leftarrow \{\}$}
\For{$c \in \mathcal N$}
\State{$\mathcal G \leftarrow \mathcal G \cup \{\; (c, \mathcal S[r])\;|\;r \in c.\texttt{reads}\;\}$}
% \For{$r \in c.\texttt{reads}$}
% \State{$\mathcal G \leftarrow \mathcal G \cup \{\; (c, \mathcal S[r])\;\}$}
% \EndFor
\For{$w \in c.\texttt{writes}$}
\State{$\mathcal S \leftarrow (\mathcal S - \{\;w \mapsto *\;\}) \cup \{\;w \mapsto c\;\}$}
\EndFor
\EndFor
\end{algorithmic}
\end{algorithm}
\paragraph{Scheduling}
Scheduling the workflow requires converting the provided sequence of cells into a partial order.
\Cref{alg:buildDag} accomplishes this by simulating sequential execution of the workflow to create a dependency DAG.
The algorithm maintains a virtual scope ($\mathcal S$), a mapping from each artifact identifier to the most recent cell to write that artifact (lines 4--5).
Each read creates an edge from the reading cell to the cell that most recently wrote the artifact being read.
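
A minimal Python rendering of \Cref{alg:buildDag} is sketched below.
The \texttt{Cell} class, the use of list indices as cell identities, and the decision to add no edge for a read with no preceding writer are assumptions of the sketch; it also returns the per-cell input scopes so that they can be cached.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass
class Cell:
    """Hypothetical cell record holding (predicted) read- and write-sets."""
    reads: Set[str]
    writes: Set[str]


def build_dag(cells: List[Cell]) -> Tuple[Set[Tuple[int, int]], List[Dict[str, int]]]:
    scope: Dict[str, int] = {}            # identifier -> index of latest writer
    edges: Set[Tuple[int, int]] = set()   # (reader, writer) pairs
    input_scopes: List[Dict[str, int]] = []
    for i, cell in enumerate(cells):
        input_scopes.append(dict(scope))  # snapshot of the scope seen by cell i
        for r in cell.reads:
            if r in scope:                # assumption: unbound reads add no edge
                edges.add((i, scope[r]))
        for w in cell.writes:
            scope[w] = i                  # cell i is now the latest writer of w
    return edges, input_scopes


notebook = [
    Cell(reads=set(), writes={"df"}),          # 0: load
    Cell(reads={"df"}, writes={"df"}),         # 1: clean (overwrites df)
    Cell(reads={"df"}, writes={"summary"}),    # 2: stats
    Cell(reads={"df"}, writes={"fig"}),        # 3: plot
]
edges, input_scopes = build_dag(notebook)
```

Here cells 2 and 3 both depend only on cell 1, so they may run concurrently once cell 1 is done.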
Once the dependency DAG is available, scheduling proceeds as expected.
All cells with no out-edges in the DAG are scheduled initially.
When a cell $c$ finishes running, it is marked as done and all cells with an out-edge to $c$ are inspected to see if they are runnable.
If all of a cell's out-edges are to cells marked done, the cell is marked as runnable and scheduled.
This process continues until all cells are marked done.
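
The scheduling loop described above can be simulated sequentially as follows.
This is a sketch under stated assumptions: the function name \texttt{schedule} is hypothetical, cells are identified by integer index, edges are (reader, writer) pairs, and "running" a cell is replaced by appending it to an order list rather than dispatching it to a worker.

```python
from collections import deque
from typing import Dict, List, Set, Tuple


def schedule(num_cells: int, edges: Set[Tuple[int, int]]) -> List[int]:
    """Return one order in which cells become runnable and complete."""
    deps: Dict[int, Set[int]] = {c: set() for c in range(num_cells)}
    dependents: Dict[int, Set[int]] = {c: set() for c in range(num_cells)}
    for reader, writer in edges:
        deps[reader].add(writer)        # out-edge of the reader
        dependents[writer].add(reader)  # cells with an out-edge to the writer

    # Cells with no out-edges are scheduled initially.
    ready = deque(c for c in range(num_cells) if not deps[c])
    done: Set[int] = set()
    order: List[int] = []
    while ready:
        c = ready.popleft()
        order.append(c)                 # stand-in for executing the cell
        done.add(c)
        # Inspect every cell with an out-edge to c.
        for d in sorted(dependents[c]):
            if deps[d] <= done:         # all out-edges point at done cells
                ready.append(d)
    return order


order = schedule(5, {(1, 0), (2, 1), (3, 1)})
```

A cell is enqueued exactly once, at the moment its last dependency completes, so the loop terminates once every cell is marked done.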
\paragraph{Prediction Error}
When the workflow is first loaded, \systemname relies on predicted read- and write-sets for each cell.
As we show, our best-effort dependency predictor has high accuracy.
However, when it fails, we need to be able to recover the workflow to a sane state.
We assume that cells are idempotent, as is typically the case for our target workloads of ETL and model-fitting\footnote{
Users can still explicitly mark cells as being non-idempotent (e.g., for a cell that deploys a model into production).
Such cells are conservatively executed serially by blocking on all preceding cells.
}.
There are four possible kinds of prediction error:
cell execution completes without reading an artifact that was in the predicted read-set (overprediction),
the cell reads an artifact not in the predicted read-set (underprediction),
and the same two possibilities for the write-set.
Overprediction, whether for reads or writes, may limit opportunities for parallel execution.
However, it does not pose a correctness problem, and by the time it is discovered the cell will have already finished running.
The primary challenge is thus coping with reads and writes that are not in the predicted read- or write-set, respectively.
In either case, the change may add or remove edges to/from the dependency DAG, requiring re-execution of running or even already completed cells.
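
The four cases reduce to two set differences, applied once to the read-sets and once to the write-sets.
The helper name \texttt{prediction\_errors} and the example identifiers below are hypothetical.

```python
from typing import Dict, Set


def prediction_errors(predicted: Set[str], actual: Set[str]) -> Dict[str, Set[str]]:
    """Compare a predicted set against the set observed at execution time."""
    return {
        "over": predicted - actual,    # predicted but never touched: lost parallelism only
        "under": actual - predicted,   # touched but not predicted: DAG may need repair
    }


read_errors = prediction_errors(predicted={"df", "cfg"}, actual={"df", "model"})
write_errors = prediction_errors(predicted={"out"}, actual={"out"})
```

Only a non-empty \texttt{under} set requires the recovery steps discussed next.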
To streamline updates to the dependency DAG, \systemname caches the virtual input scope ($\mathcal S$) computed for each cell by \Cref{alg:buildDag}; we denote by $\mathcal S_c$ the scope emitted by the cell preceding $c$.
When cell $c$ performs an underpredicted read of identifier $r$, an edge $(c, \mathcal S_c[r])$ is added to the dependency DAG.
There are now two possibilities: either cell $\mathcal S_c[r]$ is marked done or it is not.
In the former case, the dependency has already been satisfied and execution may proceed unimpeded.
In the latter case, execution of cell $c$ is immediately paused until $\mathcal S_c[r]$ has completed.
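
The handling of an underpredicted read can be sketched as a single function.
The name \texttt{on\_unpredicted\_read}, the string return values, and the treatment of an identifier with no writer in the cached scope are assumptions of the sketch.

```python
from typing import Dict, List, Set, Tuple


def on_unpredicted_read(
    c: int,
    r: str,
    input_scopes: List[Dict[str, int]],
    edges: Set[Tuple[int, int]],
    done: Set[int],
) -> str:
    """React to cell c reading identifier r outside its predicted read-set."""
    writer = input_scopes[c].get(r)
    if writer is None:
        # Assumption: no preceding cell writes r, so there is nothing to wait for.
        return "proceed"
    edges.add((c, writer))              # record the newly discovered dependency
    # If the writer already finished, the dependency is satisfied;
    # otherwise c must pause until the writer completes.
    return "proceed" if writer in done else "pause"


input_scopes = [{}, {"df": 0}, {"df": 0, "model": 1}]
edges = {(1, 0)}
first = on_unpredicted_read(2, "df", input_scopes, edges, done={0})
second = on_unpredicted_read(2, "model", input_scopes, edges, done={0})
```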
\begin{algorithm}
\caption{\texttt{update\_dag}$(\mathcal N, c_0, w)$}
\label{alg:updateDag}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Require{$c_0 \in \mathcal N$: A cell performing an unpredicted write}
\Require{$w$: The identifier of the unpredicted write}
\Ensure{$\forall c \in \mathcal N : \mathcal S_c$: Cached input scopes}
\Ensure{$\mathcal G$: An edge-list for the cell dependency DAG}
\For{$c \in \mathcal N$ s.t. $c > c_0$}
\State{$\mathcal S_c \leftarrow \mathcal S_c - \{\;w \rightarrow *\;\} \cup \{\;w \rightarrow c_0\;\}$}
\If{$w \in c.\texttt{reads}$}
\If{$c.\texttt{done} \vee c.\texttt{running}$}
\State{\texttt{abort}$(\mathcal N, c)$}
\EndIf
\State{$\mathcal G \leftarrow \mathcal G - \{\;c \rightarrow *\;\} \cup \{\; (c, \mathcal S_c[r])\;|\;r \in c.\texttt{reads}\;\}$}
\EndIf
\If{$w \in c.\texttt{writes}$}
\Return
\EndIf
\EndFor
\end{algorithmic}
\end{algorithm}
\begin{algorithm}
\caption{\texttt{abort}$(\mathcal N, c_0)$}
\label{alg:abort}
\begin{algorithmic}[1]
\Require{$\mathcal N$: A sequence of cells}
\Require{$c_0 \in \mathcal N$: A cell to abort}
\If{$c_0$.\texttt{running}}
\State{Abort $c_0$'s execution}
\ElsIf{$c_0$.\texttt{done}}
\State{Clear $c_0$'s results}
\For{$c$ s.t. $(c, c_0) \in \mathcal G$}
\State{\texttt{abort}$(\mathcal N, c)$}
\EndFor
\EndIf
\end{algorithmic}
\end{algorithm}
An unpredicted write from cell $c_0$ to identifier $w$ may redirect edges in the dependency DAG from another cell to $c_0$.
The process is summarized in \Cref{alg:updateDag}, which iterates through every cell following $c_0$ in order.
The iteration stops at the first cell (if one exists) to overwrite $w$ (line 7).
The cached scope for each cell is updated (line 2), removing prior writes to $w$ and adding $c_0$'s write.
If another cell reads the unpredicted $w$, the dependency DAG must be updated.
All out-edges from the cell are removed and recomputed (line 6).
If the cell was already running or has already completed (lines 4--5), its execution is stopped, its results are cleared, and any cells that depend on it are recursively aborted (\Cref{alg:abort}).
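
The recovery procedure for an unpredicted write can be sketched in Python as follows.
The \texttt{Cell} class, its \texttt{state} field, and the use of list indices as cell identities are assumptions of the sketch.
A further assumption is that \texttt{abort} propagates to the cells that read from the aborted cell, that is, along edges whose second component is the aborted cell.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass
class Cell:
    reads: Set[str]
    writes: Set[str]
    state: str = "pending"   # hypothetical states: pending | running | done


def abort(cells: List[Cell], edges: Set[Tuple[int, int]], c0: int) -> None:
    if cells[c0].state == "running":
        cells[c0].state = "pending"        # stop the execution
    elif cells[c0].state == "done":
        cells[c0].state = "pending"        # clear the results
        for reader, writer in list(edges):
            if writer == c0:               # reader consumed c0's stale output
                abort(cells, edges, reader)


def update_dag(cells, scopes: List[Dict[str, int]], edges, c0: int, w: str) -> None:
    for c in range(c0 + 1, len(cells)):
        scopes[c][w] = c0                  # c0 is now the latest writer of w
        if w in cells[c].reads:
            if cells[c].state in ("done", "running"):
                abort(cells, edges, c)
            # Drop c's out-edges and recompute them from its cached scope.
            edges.difference_update({e for e in edges if e[0] == c})
            edges.update((c, scopes[c][r]) for r in cells[c].reads if r in scopes[c])
        if w in cells[c].writes:
            return                         # later cells see this cell's write


cells = [
    Cell(set(), {"df"}, "done"),
    Cell({"df"}, {"model"}, "running"),    # performs an unpredicted write to df
    Cell({"df"}, {"summary"}, "done"),
    Cell({"summary"}, set(), "done"),
    Cell(set(), {"df"}),
    Cell({"df"}, set()),
]
scopes = [
    {}, {"df": 0}, {"df": 0, "model": 1},
    {"df": 0, "model": 1, "summary": 2},
    {"df": 0, "model": 1, "summary": 2},
    {"df": 4, "model": 1, "summary": 2},
]
edges = {(1, 0), (2, 0), (3, 2), (5, 4)}
update_dag(cells, scopes, edges, c0=1, w="df")
```

In this scenario cell 2 is redirected to depend on cell 1 and is reset together with its reader, cell 3, while cell 5 is untouched because cell 4 overwrites \texttt{df} first.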