diff --git a/src/teaching/cse-562/2021sp/checkpoint3.erb b/src/teaching/cse-562/2021sp/checkpoint3.erb new file mode 100644 index 00000000..0cf9df2e --- /dev/null +++ b/src/teaching/cse-562/2021sp/checkpoint3.erb @@ -0,0 +1,330 @@ +--- +title: "CSE-4/562 Database Systems: Checkpoint 3" +--- + +<% +def console(str) + return "#{str}" +end + +prompt = console("$>\\n") +%> + +
+

Checkpoint 3

+ +

+ In this project, you'll extend your SQL runtime to support aggregate queries. It is worth 8 points. +

+ +

Requirements

+ + + +

Grading Rubric

+ +All tests will be run on dedicated hardware equipped with an Intel(R) Core(TM) i5-3210M CPU @ 2.50GHz with a standard 5400 RPM HDD. Queries will have a per-query timeout as listed below. Grading will be based on total runtime for each batch of queries. + +
+
8 randomly-generated queries based on the TPC-H benchmark, with a scale factor of 0.1 (100MB) and templates listed below
+
Under 80 seconds total: 8 of 8 points + leaderboard ranking
+
Under 150 seconds total: 8 of 8 points
+
Under 5 seconds total: 4 of 8 points
+
Under 1 minute per query: 2 of 8 points
+
+ +Note in particular that these queries make extensive use of aggregates, equi-joins, order-by, and limit clauses, which will all need to be supported. + +

UnresolvedFunction

+ +

+ When the Spark SQL Parser encounters something that looks like a function, it doesn't try to interpret it directly. Instead, it'll produce a UnresolvedFunction expression node. You'll need to replace these. +

+ +

+ Like most databases, Spark maintains a "Function Registry", a catalog of all functions and their implementations. All of the "built-in" functions are provided in FunctionRegistry.builtin. Here's a little snippet you can use to replace functions. It doesn't support everything, but will be sufficient for this project. +

+ +
+case UnresolvedFunction(name, arguments, isDistinct, filter, ignoreNulls) =>
+  {
+    val builder = 
+      FunctionRegistry.builtin
+        .lookupFunctionBuilder(name)
+        .getOrElse {
+          throw new RuntimeException(
+            s"Unable to resolve function `${name}`"
+          )
+        }
+    builder(arguments) // returns the replacement expression node.
+  }
+
+ +

+ FunctionRegistry.lookupFunctionBuilder returns a 'builder' function. When called on the arguments of the UnresolvedFunction, the builder function returns an expression that implements the function. For example, looking up "regexp_extract" in the registry returns a function that, when called on two string-typed expressions and a literal integer, will return a RegExpExtract object. +

+ +

Aggregates disguised as Projects

+ +

+ Because the Spark SQL Parser doesn't try to resolve functions, it is incapable of distinguishing between normal functions: +

+  SELECT regexp_extract(A, "a(b+)a", 1) FROM R
+
+and aggregate functions: +
+  SELECT sum(A) FROM R
+
+ Both of these will parse into a LogicalPlan topped with a Project node. +

+ +

+ While not required, you might find it easier to work with the resulting plans if you replace them with actual Aggregate plan nodes. Look for Project nodes with any expression in its projectList that is a subclass of AggregateFunction. +

+ +

Aggregate

+ +An Aggregate is a logical plan node with three fields: +
+
groupingExpressions
+
The GROUP BY attributes. Normally these can be any expression, but for this checkpoint, it will be sufficient to assume that all of these expressions are Attributes
+ +
aggregateExpressions
+
The SELECT expressions. Normally these can be arbitrary arithmetic over aggregates, but for this checkpoint, it will be sufficient to assume that all of these expressions are either an Alias of an AggregateFunction, or an Attribute that also appears in the groupingExpressions field.
+ +
child
+
The input operator.
+
+ + +

AggregateFunctions

+ +

+ AggregateFunctions are unevaluable, because they don't get evaluated on a single row. Instead, there are several methods on an AggregateFunction that describe how to initialize an accumulator (what Spark calls an AggregationBuffer), how to incorporate input rows into it, and how to extract a final result value from the buffer. +

+ +

+ The AggregateFunction can be an instance of either: +

+ For the purposes of this checkpoint, you will need to support SUM, COUNT, AVERAGE, MIN, and MAX, all of which are implemented in Spark as DeclarativeAggregates. +

+ +

DeclarativeAggregates

+

The following methods are relevant:

+
+
aggBufferAttributes
+
The "schema" of the aggregate buffer. Note that these are attributes, and their ExprIds here line up with the Attributes used in the expressions below.
+ +
initialValues
+
A sequence of expressions, one for every attribute in aggBufferAttributes. These are the initial values for the buffer.
+ +
updateExpressions
+
A sequence of expressions, one for every attribute in aggBufferAttributes. Evaluate these expressions on an InternalRow that includes both the aggBufferAttributes and the .output of the Aggregate's child LogicalPlan operator.
+ +
evaluateExpression
+
An expression that, if evaluated on an InternalRow storing the aggregation buffer, will return the result of the aggregate function.
+
+ + +

Example Queries

+ +

TPC-H is a standard database benchmark. The benchmark consists of a dataset generator and 22 standard query templates. This checkpoint uses three queries based on TPC-H Queries 1, 3, 5, 6, 10, 11, 12, and 14. The dataset generator and template values can be found at the TPC-H website, and is run at scaling factor (SF) 0.1. Minor variations in the queries may be made. The queries have been rewritten slightly to make them easier to Analyze.

+ +
Query 1
+
+SELECT
+  LINEITEM.RETURNFLAG,
+  LINEITEM.LINESTATUS,
+  SUM(LINEITEM.QUANTITY) AS SUM_QTY,
+  SUM(LINEITEM.EXTENDEDPRICE) AS SUM_BASE_PRICE, 
+  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS SUM_DISC_PRICE, 
+  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)*(CAST(1.0 as float)+LINEITEM.TAX)) AS SUM_CHARGE, 
+  AVG(LINEITEM.QUANTITY) AS AVG_QTY,
+  AVG(LINEITEM.EXTENDEDPRICE) AS AVG_PRICE,
+  AVG(LINEITEM.DISCOUNT) AS AVG_DISC,
+  COUNT(*) AS COUNT_ORDER
+FROM
+  LINEITEM
+WHERE
+  LINEITEM.SHIPDATE <= DATE '1998-10-01'
+GROUP BY 
+  LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS 
+ORDER BY
+  LINEITEM.RETURNFLAG, LINEITEM.LINESTATUS
+
+ +
Query 3
+
+SELECT
+  LINEITEM.ORDERKEY,
+  SUM(LINEITEM.EXTENDEDPRICE*(CAST(1.0 as float)-LINEITEM.DISCOUNT)) AS REVENUE, 
+  ORDERS.ORDERDATE,
+  ORDERS.SHIPPRIORITY
+FROM
+  CUSTOMER,
+  ORDERS,
+  LINEITEM 
+WHERE
+  CUSTOMER.MKTSEGMENT = 'BUILDING' AND CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
+  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY 
+  AND ORDERS.ORDERDATE < DATE '1995-03-15'
+  AND LINEITEM.SHIPDATE > DATE '1995-03-15'
+GROUP BY LINEITEM.ORDERKEY, ORDERS.ORDERDATE, ORDERS.SHIPPRIORITY 
+ORDER BY REVENUE DESC, ORDERDATE
+LIMIT 10
+
+ +
Query 5
+
+SELECT
+  NATION.NAME,
+  SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE 
+FROM
+  REGION, NATION, CUSTOMER, ORDERS, LINEITEM, SUPPLIER
+WHERE
+  CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
+  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
+  AND LINEITEM.SUPPKEY = SUPPLIER.SUPPKEY
+  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY 
+  AND SUPPLIER.NATIONKEY = NATION.NATIONKEY
+  AND NATION.REGIONKEY = REGION.REGIONKEY
+  AND REGION.NAME = 'ASIA'
+  AND ORDERS.ORDERDATE >= DATE '1994-01-01'
+  AND ORDERS.ORDERDATE < DATE '1995-01-01'
+GROUP BY NATION.NAME
+ORDER BY REVENUE DESC
+
+ +
Query 6
+
+SELECT
+  SUM(LINEITEM.EXTENDEDPRICE*LINEITEM.DISCOUNT) AS REVENUE
+FROM LINEITEM
+WHERE LINEITEM.SHIPDATE >= DATE '1994-01-01'
+  AND LINEITEM.SHIPDATE < DATE '1995-01-01'
+  AND LINEITEM.DISCOUNT > CAST(0.05 AS float) AND LINEITEM.DISCOUNT < CAST(0.07 as float)
+  AND LINEITEM.QUANTITY < CAST(24 AS float)
+
+ +
Query 10
+
+SELECT 
+  CUSTOMER.CUSTKEY, 
+  SUM(LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)) AS REVENUE, 
+  CUSTOMER.ACCTBAL, 
+  NATION.NAME, 
+  CUSTOMER.ADDRESS, 
+  CUSTOMER.PHONE, 
+  CUSTOMER.COMMENT
+FROM 
+  CUSTOMER, ORDERS, LINEITEM, NATION
+WHERE
+  CUSTOMER.CUSTKEY = ORDERS.CUSTKEY
+  AND LINEITEM.ORDERKEY = ORDERS.ORDERKEY
+  AND ORDERS.ORDERDATE >= DATE '1993-10-01'
+  AND ORDERS.ORDERDATE < DATE '1994-01-01'
+  AND LINEITEM.RETURNFLAG = 'R'
+  AND CUSTOMER.NATIONKEY = NATION.NATIONKEY
+GROUP BY 
+  CUSTOMER.CUSTKEY, CUSTOMER.ACCTBAL, CUSTOMER.PHONE, NATION.NAME, CUSTOMER.ADDRESS, CUSTOMER.COMMENT
+ORDER BY REVENUE ASC
+LIMIT 20
+
+ +
Query 11
+
+SELECT PK_V.PARTKEY, 
+       PK_V.VALUE
+FROM (
+  SELECT PS.PARTKEY,
+         SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
+  FROM PARTSUPP PS,
+       SUPPLIER S,
+       NATION N
+  WHERE PS.SUPPKEY = S.SUPPKEY
+    AND S.NATIONKEY = N.NATIONKEY
+    AND N.NAME = 'GERMANY'
+  GROUP BY PS.PARTKEY 
+) PK_V, (
+  SELECT SUM(PS.SUPPLYCOST * CAST(PS.AVAILQTY AS float)) AS VALUE
+  FROM PARTSUPP PS,
+       SUPPLIER S,
+       NATION N
+  WHERE PS.SUPPKEY = S.SUPPKEY
+    AND S.NATIONKEY = N.NATIONKEY
+    AND N.NAME = 'GERMANY'
+) CUTOFF_V
+WHERE PK_V.VALUE > (CUTOFF_V.VALUE * CAST(0.0001 AS double) / CAST(100.0 AS double))
+ORDER BY PK_V.VALUE DESC
+
+ +
Query 12
+
+SELECT  LINEITEM.SHIPMODE, 
+        SUM(CASE WHEN ORDERS.ORDERPRIORITY = '1-URGENT'
+                     OR ORDERS.ORDERPRIORITY = '2-HIGH'
+                   THEN 1
+                   ELSE 0 END) AS HIGH_LINE_COUNT,
+        SUM(CASE WHEN ORDERS.ORDERPRIORITY <> '1-URGENT'
+                     AND ORDERS.ORDERPRIORITY <> '2-HIGH'
+                   THEN 1
+                   ELSE 0 END) AS LOW_LINE_COUNT
+FROM LINEITEM, ORDERS
+WHERE ORDERS.ORDERKEY = LINEITEM.ORDERKEY
+  AND (LINEITEM.SHIPMODE='MAIL' OR LINEITEM.SHIPMODE='SHIP')
+  AND LINEITEM.COMMITDATE < LINEITEM.RECEIPTDATE
+  AND LINEITEM.SHIPDATE < LINEITEM.COMMITDATE
+  AND LINEITEM.RECEIPTDATE >= DATE '1994-01-01'
+  AND LINEITEM.RECEIPTDATE < DATE '1995-01-01'
+GROUP BY LINEITEM.SHIPMODE
+ORDER BY LINEITEM.SHIPMODE
+
+ +
Query 14
+
+SELECT
+  CAST(100.00 AS double) 
+    * PROMO_ONLY 
+    / ALL_REVENUE
+      AS PROMO_REVENUE
+FROM (
+  SELECT
+    SUM(
+      CASE  WHEN PART.TYPE LIKE 'PROMO%'
+            THEN LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
+            ELSE cast(0 as float)
+      END
+    ) AS PROMO_ONLY,
+    SUM(
+      LINEITEM.EXTENDEDPRICE * (CAST(1.0 as float) - LINEITEM.DISCOUNT)
+    ) AS ALL_REVENUE
+  FROM 
+    LINEITEM,
+    PART
+  WHERE
+    LINEITEM.PARTKEY = PART.PARTKEY
+    AND LINEITEM.SHIPDATE >= DATE '1995-09-01'
+    AND LINEITEM.SHIPDATE < DATE '1995-10-01'
+) AGGREGATE
+
+ +
\ No newline at end of file diff --git a/src/teaching/cse-562/2021sp/index.erb b/src/teaching/cse-562/2021sp/index.erb index 3d63ef24..d5876d46 100644 --- a/src/teaching/cse-562/2021sp/index.erb +++ b/src/teaching/cse-562/2021sp/index.erb @@ -117,8 +117,9 @@ schedule: - date: "Apr. 20" topic: "Indexing Review + Checkpoint 4" - date: "Apr. 22" - due: "Checkpoint 3" topic: "Logging + Recovery" + - date: "Apr. 26" + due: "Checkpoint 3" - date: "Apr. 27" topic: "Distributed Commit" - date: "Apr. 29" @@ -187,8 +188,8 @@ In this course, you will learn...