Classical DB

These Problems | Classical DB | Match |
---|---|---|
Expressive Queries | Expressive Queries | ✔ |
Changing Data | Static Data | 🗶 |
Static Queries | Ad-Hoc Queries | ✔ |
Latency: Msec | Latency: Sec/Min | 🗶 |

Publish/Subscribe

These Problems | Pub/Sub | Match |
---|---|---|
Expressive Queries | Filter Queries | 🗶 |
Changing Data | Changing Data | ✔ |
Static Queries | Static Queries | ✔ |
Latency: Msec | Latency: Msec | ✔ |

Start with something familiar

    SELECT A, B, C, ...
    FROM [Query]

Emit each tuple produced by [Query], keeping only columns A, B, C, ...

    FILTER { [Condition] } [Query]

Emit only those tuples produced by [Query] that pass [Condition]

    [Query1] UNION [Query2]

Emit every tuple produced by either [Query1] or [Query2]
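A minimal Python sketch of these three operators, assuming tuples are plain dicts and each input stream is already ordered by a `time` field (both are assumptions made for illustration, not part of the query language). Each operator is a generator, so a tuple is emitted as soon as its input arrives.

```python
import heapq


def select(columns, query):
    """SELECT columns FROM query: keep only the named columns of each tuple."""
    for t in query:
        yield {c: t[c] for c in columns}


def filter_(condition, query):
    """FILTER {condition} query: pass through only tuples that satisfy condition."""
    for t in query:
        if condition(t):
            yield t


def union(query1, query2):
    """query1 UNION query2: emit tuples from either input, merged by time.
    Assumption: both inputs are already sorted on a 'time' field."""
    yield from heapq.merge(query1, query2, key=lambda t: t["time"])


# Illustrative data only, not taken from a real feed.
stock = [
    {"name": "IBM", "price": 90, "volume": 15000, "time": 1},
    {"name": "IBM", "price": 85, "volume": 7000, "time": 2},
    {"name": "Dell", "price": 40, "volume": 11000, "time": 3},
]

big_trades = list(
    select(["name", "price"], filter_(lambda t: t["volume"] > 10000, stock))
)
merged_times = [t["time"] for t in union(stock[:1], stock[1:])]
```

Because the operators compose as ordinary generators, nesting them mirrors the nesting of `SELECT ... FROM FILTER { ... } [Query]`.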

How to fix?

    [Query1] NEXT { [Condition] } [Query2]

Blocking operators are not OK. We need semantics that allow tuples to be emitted sooner.

    [Query1] FOLD { [Condition1], [Condition2], [Agg] } [Query2]

Analogous to...

    [Query1] NEXT { [Condition1] } [Query2]
             NEXT { [Condition1] } [Query2]
             NEXT { [Condition1] } [Query2]
             NEXT { [Condition1] } [Query2]
    ... until [Condition2] fails

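One possible reading of NEXT and FOLD, sketched in Python. This is a batch version over lists for clarity; a streaming engine would instead keep the partially matched left tuples as state and update them per event. The dict layout, the `time`/`start` fields, and the choice to emit after every FOLD iteration are assumptions made for illustration.

```python
def next_op(left, right, condition):
    """[left] NEXT {condition} [right]: pair each left tuple with the first
    later right tuple that satisfies condition(l, r)."""
    out = []
    for l in left:
        for r in right:
            if r["time"] > l["time"] and condition(l, r):
                out.append({**l, "next": r, "time": r["time"]})
                break  # only the first match counts
    return out


def fold_op(left, right, matches, keep_going, agg):
    """[left] FOLD {matches, keep_going, agg} [right]: repeat NEXT {matches},
    folding each match into an accumulator, until keep_going fails."""
    out = []
    for l in left:
        acc = l
        for r in right:
            if r["time"] <= acc["time"] or not matches(acc, r):
                continue  # earlier or unrelated event: skip it
            if not keep_going(acc, r):
                break  # the run ends here
            acc = agg(acc, r)
            out.append(acc)  # assumption: emit after every iteration
    return out


# Illustrative ticks; 'time' is minutes since midnight.
stock = [
    {"name": "IBM", "price": 90, "volume": 15000, "time": 550},
    {"name": "IBM", "price": 85, "volume": 7000, "time": 555},
    {"name": "Dell", "price": 40, "volume": 11000, "time": 557},
    {"name": "IBM", "price": 81, "volume": 8000, "time": 561},
    {"name": "IBM", "price": 91, "volume": 9000, "time": 564},
]
triggers = [
    {"name": e["name"], "min_price": e["price"], "start": e["time"], "time": e["time"]}
    for e in stock
    if e["volume"] > 10000
]
runs = fold_op(
    triggers,
    stock,
    matches=lambda a, r: r["name"] == a["name"],
    keep_going=lambda a, r: r["price"] < a["min_price"],
    agg=lambda a, r: {**a, "min_price": r["price"], "time": r["time"]},
)
long_runs = [a for a in runs if a["time"] - a["start"] >= 10]
upturns = next_op(
    long_runs,
    stock,
    lambda l, r: r["name"] == l["name"] and r["price"] > 1.05 * l["min_price"],
)
```

Here `runs` holds one tuple per step of a falling price run, `long_runs` keeps the runs lasting at least 10 minutes, and `upturns` pairs each of those with the first later tick that rises more than 5% above the run's minimum.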
Model a program by a directed graph

The program accepts an input: A string.

/Hi+!/ ↣ "Hi!" "OHiiiiii!" "Ha!"

... but what if we don't know which edge to take?

The program state is a set of active states

/Ha?i!+/ ↣ "Hi!" "OHai!" "HaHai!" "HiHaH!"

Letter | Start | $S_1$ | $S_2$ | $S_3$ | End |
---|---|---|---|---|---|
(initial) | ✓ | | | | |
H | ✓ | ✓ | | | |
i | ✓ | | ✓ | | |
H | ✓ | ✓ | | | |
a | ✓ | | | ✓ | |
H | ✓ | ✓ | ✓ | | |
! | ✓ | | | | ✓ |

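A small Python sketch of tracking a set of active states for `/Ha?i!+/`. The state names and the transition table are assumptions chosen for illustration and need not match the $S_1$, $S_2$, $S_3$ labelling above. The start state stays active after every letter so that a match may begin anywhere in the input.

```python
import re

# Hypothetical NFA for /Ha?i!+/: (state, letter) -> set of next states.
TRANSITIONS = {
    ("start", "H"): {"seen_H"},
    ("seen_H", "a"): {"seen_Ha"},
    ("seen_H", "i"): {"seen_Hi"},
    ("seen_Ha", "i"): {"seen_Hi"},
    ("seen_Hi", "!"): {"end"},
    ("end", "!"): {"end"},
}


def step(active, letter):
    """Advance every active state on one letter; 'start' is always re-added."""
    following = {"start"}
    for state in active:
        following |= TRANSITIONS.get((state, letter), set())
    return following


def matches(text):
    """Return True if some substring of text matches /Ha?i!+/."""
    active = {"start"}
    for letter in text:
        active = step(active, letter)
        if "end" in active:
            return True
    return False


examples = ["Hi!", "OHai!", "HaHai!", "HiHaH!"]
results = {s: matches(s) for s in examples}
# Cross-check against Python's own regex engine.
agrees = all(matches(s) == bool(re.search(r"Ha?i!+", s)) for s in examples)
```

The work per letter is proportional to the number of active states, which is why the active set, not a single current node, is the program state.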
NDFAs can be compiled down to DFAs
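The standard way to do this is the subset construction, where each DFA state is one reachable set of NDFA states. The sketch below applies it to a hypothetical NDFA for `/Ha?i!+/`; the state names, the transition table, and the use of `None` as a stand-in for any other letter are assumptions made for illustration.

```python
from collections import deque

# Hypothetical NDFA for /Ha?i!+/ (search anywhere in the input).
TRANSITIONS = {
    ("start", "H"): {"seen_H"},
    ("seen_H", "a"): {"seen_Ha"},
    ("seen_H", "i"): {"seen_Hi"},
    ("seen_Ha", "i"): {"seen_Hi"},
    ("seen_Hi", "!"): {"end"},
    ("end", "!"): {"end"},
}
ALPHABET = "Hai!"
SYMBOLS = list(ALPHABET) + [None]  # None stands for any other letter


def nfa_step(active, letter):
    """One NDFA step on a set of active states; 'start' is always active."""
    following = {"start"}
    for state in active:
        following |= TRANSITIONS.get((state, letter), set())
    return frozenset(following)


def compile_dfa():
    """Subset construction: explore every reachable set of NDFA states."""
    start = frozenset({"start"})
    table = {}
    todo = deque([start])
    while todo:
        current = todo.popleft()
        if current in table:
            continue  # already expanded
        table[current] = {}
        for letter in SYMBOLS:
            target = nfa_step(current, letter)
            table[current][letter] = target
            todo.append(target)
    return start, table


def dfa_matches(text, start, table):
    """Run the compiled DFA: exactly one table lookup per letter."""
    current = start
    for letter in text:
        current = table[current][letter if letter in ALPHABET else None]
        if "end" in current:
            return True
    return False


start, table = compile_dfa()
```

After compilation, each input letter costs a single lookup instead of a loop over the active set. The price is a state count that can grow exponentially in the worst case; for this small pattern it stays at five.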

Each node of the NDFA is a relation.

Each transition of the NDFA is a join condition + projection.
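As a rough illustration of this view, the Python sketch below stores one automaton node as a list of rows and applies one edge as a join against the incoming event followed by a projection. The row layout and the `transition` helper are hypothetical.

```python
def transition(rows, event, join_condition, projection):
    """Apply one automaton edge: join the source relation with the incoming
    event, then project each matching pair onto the target relation's schema."""
    return [projection(row, event) for row in rows if join_condition(row, event)]


# Hypothetical contents of a source relation (one row per partial match).
state_a = [
    {"name": "IBM", "min_price": 81},
    {"name": "Dell", "min_price": 40},
]
event = {"name": "IBM", "price": 91}

# Edge condition: same stock, and the price rose more than 5% above the minimum.
state_b = transition(
    state_a,
    event,
    join_condition=lambda row, ev: ev["name"] == row["name"]
    and ev["price"] > 1.05 * row["min_price"],
    projection=lambda row, ev: {
        "name": row["name"],
        "min_price": row["min_price"],
        "price": ev["price"],
    },
)
```

Only the IBM row joins with the event, so the target relation receives a single projected row.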

    SELECT Name, MaxPrice, MinPrice, Price AS FinalPrice
    -- Only consider aggregates spanning 10 minutes or more
    FROM FILTER { dur ≥ 10 min } (
      (
        -- Trigger aggregate when a Stock w/ Volume > 10000 sells
        SELECT Name, Price_1 AS MaxPrice, Price AS MinPrice
        FROM FILTER { Volume > 10000 } Stock
      ) FOLD {
        $2.Name = $.Name,   -- Grouping Condition
        $2.Price < $.Price  -- Continue Condition
      } Stock               -- Fold over any stock
    ) NEXT {
      -- Find the next upturn after a 10 minute descending run
      $2.Name = $1.Name AND $2.Price > 1.05 * $1.MinPrice
    } Stock


    CREATE TABLE A(
      Name_l   STRING,  -- From LHS
      MaxPrice DECIMAL, -- From LHS
      MinPrice DECIMAL, -- From LHS
      Name_r   STRING,  -- From RHS
      Price    DECIMAL, -- From RHS
      Start    INT,     -- From LHS
      End      INT      -- From RHS
    )

    CREATE TABLE B(
      Name     STRING,
      MaxPrice DECIMAL,
      MinPrice DECIMAL,
      Price    DECIMAL,
      Start    INT,
      End      INT
    )

Name | Price | Volume | Time |
---|---|---|---|

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|

Name | Price | Volume | Time |
---|---|---|---|
IBM | 90 | 15,000 | 9:10 |

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|
IBM | 90 | IBM | 90 | | | | |

Name | Price | Volume | Time |
---|---|---|---|
IBM | 90 | 15,000 | 9:10 |
IBM | 85 | 7,000 | 9:15 |

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|
IBM | 90 | IBM | 85 | | | | |

Name | Price | Volume | Time |
---|---|---|---|
IBM | 90 | 15,000 | 9:10 |
IBM | 85 | 7,000 | 9:15 |
Dell | 40 | 11,000 | 9:17 |

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|
IBM | 90 | IBM | 85 | | | | |
Dell | 40 | Dell | 40 | | | | |

Name | Price | Volume | Time |
---|---|---|---|
IBM | 90 | 15,000 | 9:10 |
IBM | 85 | 7,000 | 9:15 |
Dell | 40 | 11,000 | 9:17 |
IBM | 81 | 8,000 | 9:21 |

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|
IBM | 90 | IBM | 81 | IBM | 90 | 81 | |
Dell | 40 | Dell | 40 | | | | |

Name | Price | Volume | Time |
---|---|---|---|
IBM | 90 | 15,000 | 9:10 |
IBM | 85 | 7,000 | 9:15 |
Dell | 40 | 11,000 | 9:17 |
IBM | 81 | 8,000 | 9:21 |
MSFT | 25 | 6,000 | 9:23 |

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|
IBM | 90 | IBM | 81 | IBM | 90 | 81 | |
Dell | 40 | Dell | 40 | | | | |

Name | Price | Volume | Time |
---|---|---|---|
IBM | 90 | 15,000 | 9:10 |
IBM | 85 | 7,000 | 9:15 |
Dell | 40 | 11,000 | 9:17 |
IBM | 81 | 8,000 | 9:21 |
MSFT | 25 | 6,000 | 9:23 |
IBM | 91 | 9,000 | 9:24 |

A.Name_l | A.MinPrice | A.Name_r | A.Price | B.Name | B.MinPrice | B.Price | Emitted |
---|---|---|---|---|---|---|---|
IBM | 90 | IBM | 81 | | | | IBM! |
Dell | 40 | Dell | 40 | | | | |

Simple Solution: Add a delay to event processing to buffer for out-of-order arrival.
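A minimal sketch of such a delay buffer in Python, assuming every event carries a numeric timestamp and that no event arrives more than `delay` time units late. The `DelayBuffer` class and its method names are hypothetical.

```python
import heapq
from itertools import count


class DelayBuffer:
    """Hold events for `delay` time units, then release them in timestamp order."""

    def __init__(self, delay):
        self.delay = delay
        self._heap = []
        self._tiebreak = count()  # keeps heap entries comparable on equal timestamps
        self._latest = None       # largest timestamp seen so far

    def push(self, timestamp, event):
        """Buffer one event and return the events that are now safe to process."""
        heapq.heappush(self._heap, (timestamp, next(self._tiebreak), event))
        if self._latest is None or timestamp > self._latest:
            self._latest = timestamp
        released = []
        while self._heap and self._heap[0][0] <= self._latest - self.delay:
            ts, _, ev = heapq.heappop(self._heap)
            released.append((ts, ev))
        return released

    def flush(self):
        """Release everything still buffered, e.g. at end of stream."""
        released = []
        while self._heap:
            ts, _, ev = heapq.heappop(self._heap)
            released.append((ts, ev))
        return released


buffer = DelayBuffer(delay=5)
ordered = []
# Event "b" (timestamp 11) arrives after event "c" (timestamp 12).
for ts, ev in [(10, "a"), (12, "c"), (11, "b"), (20, "d")]:
    ordered.extend(buffer.push(ts, ev))
ordered.extend(buffer.flush())
```

The trade-off is added latency: every event waits at least `delay` time units before it is processed, and anything arriving later than that is still out of order.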

Mostly Simple Solution: Process each event in parallel across the current state to build a new state, swap in the new state, and repeat.
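The structure of this approach can be sketched in Python as follows. The `advance` rule and the row layout are hypothetical, and Python threads are used only to show the shape of the computation: because of the interpreter lock they would not speed up CPU-bound work.

```python
from concurrent.futures import ThreadPoolExecutor


def advance(row, event):
    """Pure function: the rows that replace `row` once `event` has been seen."""
    if row["name"] != event["name"]:
        return [row]  # unrelated event: row is unchanged
    if event["price"] < row["min_price"]:
        return [{**row, "min_price": event["price"]}]  # run continues
    return []  # run ended: drop the row


def process(state, event, pool):
    """Build a brand-new state from the old one; the old state is never mutated."""
    parts = pool.map(lambda row: advance(row, event), state)
    return [row for part in parts for row in part]


state = [
    {"name": "IBM", "min_price": 90},
    {"name": "Dell", "min_price": 40},
]
events = [
    {"name": "IBM", "price": 85},
    {"name": "IBM", "price": 91},
]

with ThreadPoolExecutor(max_workers=4) as pool:
    for event in events:
        # Swap in the new state only after the whole event has been processed.
        state = process(state, event, pool)
```

Because each row is advanced independently and the old state is read-only, the workers need no locks. The only synchronization point is the swap between events.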

Not so Simple Solution: Add an epoch-based garbage collector to detect when an object falls out of scope.

(Reference counting creates points of contention on every refcount update)

Simple Solution: Index the states to make it easier to discover which states a new event interacts with.
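For an equality join condition such as matching on the stock name, the index can be as simple as a hash map from the join key to the rows holding it. The Python sketch below assumes that key is a `name` field; the `StateIndex` class is hypothetical.

```python
from collections import defaultdict


class StateIndex:
    """Hash index over state rows, keyed on the (assumed) join column 'name'."""

    def __init__(self):
        self._by_name = defaultdict(list)

    def add(self, row):
        self._by_name[row["name"]].append(row)

    def candidates(self, event):
        """Rows that can possibly join with this event, found without a full scan."""
        return self._by_name.get(event["name"], [])


index = StateIndex()
index.add({"name": "IBM", "min_price": 81})
index.add({"name": "Dell", "min_price": 40})
index.add({"name": "IBM", "min_price": 85})

hits = index.candidates({"name": "IBM", "price": 91})
misses = index.candidates({"name": "MSFT", "price": 25})
```

Non-equality parts of a condition, such as the price comparison, would still be checked against each candidate row.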

Simple Solution: Build a dictionary of strings (can be done asynchronously while the event is waiting to be processed).
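A minimal sketch of such a dictionary in Python: each distinct string is assigned a small integer code once, so later comparisons work on integers. The `StringDictionary` class is hypothetical, and the asynchronous part (encoding while the event waits in a queue) is not shown.

```python
class StringDictionary:
    """Map each distinct string to a small integer code and back."""

    def __init__(self):
        self._codes = {}    # string -> code
        self._strings = []  # code -> string

    def encode(self, text):
        code = self._codes.get(text)
        if code is None:
            code = len(self._strings)  # next unused code
            self._codes[text] = code
            self._strings.append(text)
        return code

    def decode(self, code):
        return self._strings[code]


names = StringDictionary()
codes = [names.encode(n) for n in ["IBM", "Dell", "IBM", "MSFT"]]
```

With names encoded this way, a condition like `$2.Name = $1.Name` becomes an integer comparison.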