[SPARK-31606][SQL] Reduce the perf regression of vectorized parquet reader caused by datetime rebase

### What changes were proposed in this pull request?

Push the rebase logic to the lower level of the parquet vectorized reader, to make the final code more vectorization-friendly.

### Why are the changes needed?

Parquet vectorized reader is carefully implemented, to make it more likely to be vectorized by the JVM. However, the newly added datetime rebase degrade the performance a lot, as it breaks vectorization, even if the datetime values don't need to rebase (this is very likely as dates before 1582 is rare).

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

Run part of the `DateTimeRebaseBenchmark` locally. The results:
before this patch
```
[info] Load dates from parquet:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] after 1582, vec on, rebase off                     2677           2838         142         37.4          26.8       1.0X
[info] after 1582, vec on, rebase on                      3828           4331         805         26.1          38.3       0.7X
[info] before 1582, vec on, rebase off                    2903           2926          34         34.4          29.0       0.9X
[info] before 1582, vec on, rebase on                     4163           4197          38         24.0          41.6       0.6X

[info] Load timestamps from parquet:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] after 1900, vec on, rebase off                     3537           3627         104         28.3          35.4       1.0X
[info] after 1900, vec on, rebase on                      6891           7010         105         14.5          68.9       0.5X
[info] before 1900, vec on, rebase off                    3692           3770          72         27.1          36.9       1.0X
[info] before 1900, vec on, rebase on                     7588           7610          30         13.2          75.9       0.5X
```

After this patch
```
[info] Load dates from parquet:                  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] after 1582, vec on, rebase off                     2758           2944         197         36.3          27.6       1.0X
[info] after 1582, vec on, rebase on                      2908           2966          51         34.4          29.1       0.9X
[info] before 1582, vec on, rebase off                    2840           2878          37         35.2          28.4       1.0X
[info] before 1582, vec on, rebase on                     3407           3433          24         29.4          34.1       0.8X

[info] Load timestamps from parquet:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
[info] ------------------------------------------------------------------------------------------------------------------------
[info] after 1900, vec on, rebase off                     3861           4003         139         25.9          38.6       1.0X
[info] after 1900, vec on, rebase on                      4194           4283          77         23.8          41.9       0.9X
[info] before 1900, vec on, rebase off                    3849           3937          79         26.0          38.5       1.0X
[info] before 1900, vec on, rebase on                     7512           7546          55         13.3          75.1       0.5X
```

Date type is 30% faster if the values don't need to rebase, 20% faster if need to rebase.
Timestamp type is 60% faster if the values don't need to rebase, no difference if need to rebase.

Closes #28406 from cloud-fan/perf.

Lead-authored-by: Wenchen Fan <wenchen@databricks.com>
Co-authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Wenchen Fan 2020-05-04 15:30:10 +09:00 committed by HyukjinKwon
parent 7ef0b69a92
commit f72220b8ab
8 changed files with 295 additions and 160 deletions

View file

@ -71,6 +71,8 @@ object RebaseDateTime {
-719164, -682945, -646420, -609895, -536845, -500320, -463795,
-390745, -354220, -317695, -244645, -208120, -171595, -141427)
final val lastSwitchJulianDay: Int = julianGregDiffSwitchDay.last
// The first days of Common Era (CE) which is mapped to the '0001-01-01' date in Julian calendar.
private final val julianCommonEraStartDay = julianGregDiffSwitchDay(0)
@ -416,6 +418,8 @@ object RebaseDateTime {
// in the interval: [julianGregDiffSwitchMicros(i), julianGregDiffSwitchMicros(i+1))
private val julianGregRebaseMap = loadRebaseRecords("julian-gregorian-rebase-micros.json")
final val lastSwitchJulianTs: Long = julianGregRebaseMap.values.map(_.switches.last).max
/**
* An optimized version of [[rebaseJulianToGregorianMicros(ZoneId, Long)]]. This method leverages
* the pre-calculated rebasing maps to save calculation. If the rebasing map doesn't contain

View file

@ -2,93 +2,93 @@
Rebasing dates/timestamps in Parquet datasource
================================================================================================
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 21171 21171 0 4.7 211.7 1.0X
before 1582, noop 11036 11036 0 9.1 110.4 1.9X
after 1582, rebase off 34321 34321 0 2.9 343.2 0.6X
after 1582, rebase on 33269 33269 0 3.0 332.7 0.6X
before 1582, rebase off 22016 22016 0 4.5 220.2 1.0X
before 1582, rebase on 23338 23338 0 4.3 233.4 0.9X
after 1582, noop 20073 20073 0 5.0 200.7 1.0X
before 1582, noop 10985 10985 0 9.1 109.9 1.8X
after 1582, rebase off 32245 32245 0 3.1 322.4 0.6X
after 1582, rebase on 31434 31434 0 3.2 314.3 0.6X
before 1582, rebase off 21590 21590 0 4.6 215.9 0.9X
before 1582, rebase on 22963 22963 0 4.4 229.6 0.9X
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off, rebase off 12791 13089 287 7.8 127.9 1.0X
after 1582, vec off, rebase on 13203 13271 81 7.6 132.0 1.0X
after 1582, vec on, rebase off 3709 3764 49 27.0 37.1 3.4X
after 1582, vec on, rebase on 5082 5114 29 19.7 50.8 2.5X
before 1582, vec off, rebase off 13059 13153 87 7.7 130.6 1.0X
before 1582, vec off, rebase on 14211 14236 27 7.0 142.1 0.9X
before 1582, vec on, rebase off 3687 3749 72 27.1 36.9 3.5X
before 1582, vec on, rebase on 5449 5497 56 18.4 54.5 2.3X
after 1582, vec off, rebase off 12815 12858 40 7.8 128.1 1.0X
after 1582, vec off, rebase on 13030 13167 148 7.7 130.3 1.0X
after 1582, vec on, rebase off 3705 3712 6 27.0 37.1 3.5X
after 1582, vec on, rebase on 3788 3791 3 26.4 37.9 3.4X
before 1582, vec off, rebase off 12873 12943 61 7.8 128.7 1.0X
before 1582, vec off, rebase on 14072 14165 80 7.1 140.7 0.9X
before 1582, vec on, rebase off 3694 3708 15 27.1 36.9 3.5X
before 1582, vec on, rebase on 4403 4484 81 22.7 44.0 2.9X
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 2831 2831 0 35.3 28.3 1.0X
before 1582, noop 2816 2816 0 35.5 28.2 1.0X
after 1582, rebase off 15543 15543 0 6.4 155.4 0.2X
after 1582, rebase on 18391 18391 0 5.4 183.9 0.2X
before 1582, rebase off 15747 15747 0 6.4 157.5 0.2X
before 1582, rebase on 18846 18846 0 5.3 188.5 0.2X
after 1900, noop 3032 3032 0 33.0 30.3 1.0X
before 1900, noop 3043 3043 0 32.9 30.4 1.0X
after 1900, rebase off 15634 15634 0 6.4 156.3 0.2X
after 1900, rebase on 18233 18233 0 5.5 182.3 0.2X
before 1900, rebase off 15820 15820 0 6.3 158.2 0.2X
before 1900, rebase on 19921 19921 0 5.0 199.2 0.2X
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off, rebase off 16126 16216 78 6.2 161.3 1.0X
after 1582, vec off, rebase on 18277 18453 165 5.5 182.8 0.9X
after 1582, vec on, rebase off 5030 5067 42 19.9 50.3 3.2X
after 1582, vec on, rebase on 8553 8583 43 11.7 85.5 1.9X
before 1582, vec off, rebase off 15828 15872 39 6.3 158.3 1.0X
before 1582, vec off, rebase on 18899 18959 103 5.3 189.0 0.9X
before 1582, vec on, rebase off 4961 5009 43 20.2 49.6 3.3X
before 1582, vec on, rebase on 9099 9140 40 11.0 91.0 1.8X
after 1900, vec off, rebase off 14987 15008 18 6.7 149.9 1.0X
after 1900, vec off, rebase on 17500 17628 210 5.7 175.0 0.9X
after 1900, vec on, rebase off 5030 5036 7 19.9 50.3 3.0X
after 1900, vec on, rebase on 5066 5109 44 19.7 50.7 3.0X
before 1900, vec off, rebase off 15094 15213 121 6.6 150.9 1.0X
before 1900, vec off, rebase on 18098 18175 101 5.5 181.0 0.8X
before 1900, vec on, rebase off 5008 5012 4 20.0 50.1 3.0X
before 1900, vec on, rebase on 8803 8848 55 11.4 88.0 1.7X
================================================================================================
Rebasing dates/timestamps in ORC datasource
================================================================================================
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save dates to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 21026 21026 0 4.8 210.3 1.0X
before 1582, noop 11040 11040 0 9.1 110.4 1.9X
after 1582 28171 28171 0 3.5 281.7 0.7X
before 1582 18955 18955 0 5.3 189.5 1.1X
after 1582, noop 19593 19593 0 5.1 195.9 1.0X
before 1582, noop 10581 10581 0 9.5 105.8 1.9X
after 1582 27843 27843 0 3.6 278.4 0.7X
before 1582 19435 19435 0 5.1 194.4 1.0X
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load dates from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off 10876 10931 49 9.2 108.8 1.0X
after 1582, vec on 3900 3913 20 25.6 39.0 2.8X
before 1582, vec off 11165 11174 12 9.0 111.6 1.0X
before 1582, vec on 4208 4214 7 23.8 42.1 2.6X
after 1582, vec off 10395 10507 119 9.6 103.9 1.0X
after 1582, vec on 3921 3945 22 25.5 39.2 2.7X
before 1582, vec off 10762 10860 127 9.3 107.6 1.0X
before 1582, vec on 4194 4226 41 23.8 41.9 2.5X
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save timestamps to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 2924 2924 0 34.2 29.2 1.0X
before 1582, noop 2820 2820 0 35.5 28.2 1.0X
after 1582 22228 22228 0 4.5 222.3 0.1X
before 1582 22590 22590 0 4.4 225.9 0.1X
after 1900, noop 3003 3003 0 33.3 30.0 1.0X
before 1900, noop 3016 3016 0 33.2 30.2 1.0X
after 1900 21804 21804 0 4.6 218.0 0.1X
before 1900 23920 23920 0 4.2 239.2 0.1X
OpenJDK 64-Bit Server VM 11.0.6+10-post-Ubuntu-1ubuntu118.04.1 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 11.0.7+10-post-Ubuntu-2ubuntu218.04 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load timestamps from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off 13591 13658 59 7.4 135.9 1.0X
after 1582, vec on 7399 7488 126 13.5 74.0 1.8X
before 1582, vec off 14065 14096 30 7.1 140.7 1.0X
before 1582, vec on 7950 8127 249 12.6 79.5 1.7X
after 1900, vec off 14112 14128 17 7.1 141.1 1.0X
after 1900, vec on 7347 7459 134 13.6 73.5 1.9X
before 1900, vec off 15170 15192 27 6.6 151.7 0.9X
before 1900, vec on 8280 8312 52 12.1 82.8 1.7X

View file

@ -2,93 +2,93 @@
Rebasing dates/timestamps in Parquet datasource
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save dates to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 24114 24114 0 4.1 241.1 1.0X
before 1582, noop 10250 10250 0 9.8 102.5 2.4X
after 1582, rebase off 36672 36672 0 2.7 366.7 0.7X
after 1582, rebase on 37123 37123 0 2.7 371.2 0.6X
before 1582, rebase off 21925 21925 0 4.6 219.2 1.1X
before 1582, rebase on 22341 22341 0 4.5 223.4 1.1X
after 1582, noop 23088 23088 0 4.3 230.9 1.0X
before 1582, noop 10782 10782 0 9.3 107.8 2.1X
after 1582, rebase off 34821 34821 0 2.9 348.2 0.7X
after 1582, rebase on 35040 35040 0 2.9 350.4 0.7X
before 1582, rebase off 22151 22151 0 4.5 221.5 1.0X
before 1582, rebase on 24677 24677 0 4.1 246.8 0.9X
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load dates from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off, rebase off 12456 12601 126 8.0 124.6 1.0X
after 1582, vec off, rebase on 13299 13336 32 7.5 133.0 0.9X
after 1582, vec on, rebase off 3623 3660 40 27.6 36.2 3.4X
after 1582, vec on, rebase on 5160 5177 15 19.4 51.6 2.4X
before 1582, vec off, rebase off 13177 13264 76 7.6 131.8 0.9X
before 1582, vec off, rebase on 14102 14149 46 7.1 141.0 0.9X
before 1582, vec on, rebase off 3649 3670 34 27.4 36.5 3.4X
before 1582, vec on, rebase on 5652 5667 15 17.7 56.5 2.2X
after 1582, vec off, rebase off 13559 13650 79 7.4 135.6 1.0X
after 1582, vec off, rebase on 12942 12973 28 7.7 129.4 1.0X
after 1582, vec on, rebase off 3657 3689 29 27.3 36.6 3.7X
after 1582, vec on, rebase on 3859 3902 53 25.9 38.6 3.5X
before 1582, vec off, rebase off 12588 12607 17 7.9 125.9 1.1X
before 1582, vec off, rebase on 13396 13420 25 7.5 134.0 1.0X
before 1582, vec on, rebase off 3631 3650 19 27.5 36.3 3.7X
before 1582, vec on, rebase on 4706 4755 77 21.3 47.1 2.9X
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save timestamps to parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 2871 2871 0 34.8 28.7 1.0X
before 1582, noop 2753 2753 0 36.3 27.5 1.0X
after 1582, rebase off 15927 15927 0 6.3 159.3 0.2X
after 1582, rebase on 19138 19138 0 5.2 191.4 0.1X
before 1582, rebase off 16137 16137 0 6.2 161.4 0.2X
before 1582, rebase on 19584 19584 0 5.1 195.8 0.1X
after 1900, noop 2681 2681 0 37.3 26.8 1.0X
before 1900, noop 3051 3051 0 32.8 30.5 0.9X
after 1900, rebase off 16901 16901 0 5.9 169.0 0.2X
after 1900, rebase on 19725 19725 0 5.1 197.3 0.1X
before 1900, rebase off 16900 16900 0 5.9 169.0 0.2X
before 1900, rebase on 20381 20381 0 4.9 203.8 0.1X
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load timestamps from parquet: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off, rebase off 14995 15047 47 6.7 150.0 1.0X
after 1582, vec off, rebase on 18111 18146 37 5.5 181.1 0.8X
after 1582, vec on, rebase off 4837 4873 44 20.7 48.4 3.1X
after 1582, vec on, rebase on 9542 9669 111 10.5 95.4 1.6X
before 1582, vec off, rebase off 14993 15090 94 6.7 149.9 1.0X
before 1582, vec off, rebase on 18675 18712 64 5.4 186.7 0.8X
before 1582, vec on, rebase off 4908 4923 15 20.4 49.1 3.1X
before 1582, vec on, rebase on 10128 10148 19 9.9 101.3 1.5X
after 1900, vec off, rebase off 15236 15291 62 6.6 152.4 1.0X
after 1900, vec off, rebase on 17832 18047 187 5.6 178.3 0.9X
after 1900, vec on, rebase off 4875 4901 31 20.5 48.7 3.1X
after 1900, vec on, rebase on 5354 5386 37 18.7 53.5 2.8X
before 1900, vec off, rebase off 15229 15338 108 6.6 152.3 1.0X
before 1900, vec off, rebase on 18626 18668 44 5.4 186.3 0.8X
before 1900, vec on, rebase off 4968 4975 6 20.1 49.7 3.1X
before 1900, vec on, rebase on 9913 9932 16 10.1 99.1 1.5X
================================================================================================
Rebasing dates/timestamps in ORC datasource
================================================================================================
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save dates to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 23977 23977 0 4.2 239.8 1.0X
before 1582, noop 10094 10094 0 9.9 100.9 2.4X
after 1582 33115 33115 0 3.0 331.2 0.7X
before 1582 19430 19430 0 5.1 194.3 1.2X
after 1582, noop 22942 22942 0 4.4 229.4 1.0X
before 1582, noop 11035 11035 0 9.1 110.4 2.1X
after 1582 31341 31341 0 3.2 313.4 0.7X
before 1582 20376 20376 0 4.9 203.8 1.1X
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load dates from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off 10217 10241 21 9.8 102.2 1.0X
after 1582, vec on 3671 3691 31 27.2 36.7 2.8X
before 1582, vec off 10800 10874 114 9.3 108.0 0.9X
before 1582, vec on 4118 4165 74 24.3 41.2 2.5X
after 1582, vec off 10361 10378 29 9.7 103.6 1.0X
after 1582, vec on 3820 3828 11 26.2 38.2 2.7X
before 1582, vec off 10709 10720 13 9.3 107.1 1.0X
before 1582, vec on 4136 4153 15 24.2 41.4 2.5X
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Save timestamps to ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, noop 2691 2691 0 37.2 26.9 1.0X
before 1582, noop 2743 2743 0 36.5 27.4 1.0X
after 1582 21409 21409 0 4.7 214.1 0.1X
before 1582 22554 22554 0 4.4 225.5 0.1X
after 1900, noop 2888 2888 0 34.6 28.9 1.0X
before 1900, noop 2823 2823 0 35.4 28.2 1.0X
after 1900 19790 19790 0 5.1 197.9 0.1X
before 1900 20774 20774 0 4.8 207.7 0.1X
OpenJDK 64-Bit Server VM 1.8.0_242-8u242-b08-0ubuntu3~18.04-b08 on Linux 4.15.0-1063-aws
OpenJDK 64-Bit Server VM 1.8.0_252-8u252-b09-1~18.04-b09 on Linux 4.15.0-1063-aws
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Load timestamps from ORC: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative
------------------------------------------------------------------------------------------------------------------------
after 1582, vec off 14752 14855 103 6.8 147.5 1.0X
after 1582, vec on 8146 8185 34 12.3 81.5 1.8X
before 1582, vec off 15247 15294 46 6.6 152.5 1.0X
before 1582, vec on 8414 8466 52 11.9 84.1 1.8X
after 1900, vec off 14649 14687 38 6.8 146.5 1.0X
after 1900, vec on 7850 7937 130 12.7 78.5 1.9X
before 1900, vec off 15354 15417 108 6.5 153.5 1.0X
before 1900, vec on 8382 8408 22 11.9 83.8 1.7X

View file

@ -423,15 +423,8 @@ public class VectorizedColumnReader {
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (column.dataType() == DataTypes.DateType ) {
if (rebaseDateTime) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putInt(
rowId + i,
RebaseDateTime.rebaseJulianToGregorianDays(dataColumn.readInteger()));
} else {
column.putNull(rowId + i);
}
}
defColumn.readIntegersWithRebase(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
defColumn.readIntegers(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
@ -449,15 +442,8 @@ public class VectorizedColumnReader {
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else if (originalType == OriginalType.TIMESTAMP_MICROS) {
if (rebaseDateTime) {
for (int i = 0; i < num; i++) {
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(
rowId + i,
RebaseDateTime.rebaseJulianToGregorianMicros(dataColumn.readLong()));
} else {
column.putNull(rowId + i);
}
}
defColumn.readLongsWithRebase(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
} else {
defColumn.readLongs(
num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);

View file

@ -22,6 +22,7 @@ import java.nio.ByteOrder;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import org.apache.parquet.column.values.ValuesReader;
@ -81,6 +82,33 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
// A fork of `readIntegers` to rebase the date values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.
@Override
public final void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;
ByteBuffer buffer = getBuffer(requiredBytes);
boolean rebase = false;
for (int i = 0; i < total; i += 1) {
rebase |= buffer.getInt(buffer.position() + i * 4) < RebaseDateTime.lastSwitchJulianDay();
}
if (rebase) {
for (int i = 0; i < total; i += 1) {
c.putInt(rowId + i, RebaseDateTime.rebaseJulianToGregorianDays(buffer.getInt()));
}
} else {
if (buffer.hasArray()) {
int offset = buffer.arrayOffset() + buffer.position();
c.putIntsLittleEndian(rowId, total, buffer.array(), offset);
} else {
for (int i = 0; i < total; i += 1) {
c.putInt(rowId + i, buffer.getInt());
}
}
}
}
@Override
public final void readLongs(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 8;
@ -96,6 +124,33 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
}
}
// A fork of `readLongs` to rebase the timestamp values. For performance reasons, this method
// iterates the values twice: check if we need to rebase first, then go to the optimized branch
// if rebase is not needed.
@Override
public final void readLongsWithRebase(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 8;
ByteBuffer buffer = getBuffer(requiredBytes);
boolean rebase = false;
for (int i = 0; i < total; i += 1) {
rebase |= buffer.getLong(buffer.position() + i * 8) < RebaseDateTime.lastSwitchJulianTs();
}
if (rebase) {
for (int i = 0; i < total; i += 1) {
c.putLong(rowId + i, RebaseDateTime.rebaseJulianToGregorianMicros(buffer.getLong()));
}
} else {
if (buffer.hasArray()) {
int offset = buffer.arrayOffset() + buffer.position();
c.putLongsLittleEndian(rowId, total, buffer.array(), offset);
} else {
for (int i = 0; i < total; i += 1) {
c.putLong(rowId + i, buffer.getLong());
}
}
}
}
@Override
public final void readFloats(int total, WritableColumnVector c, int rowId) {
int requiredBytes = total * 4;

View file

@ -26,6 +26,7 @@ import org.apache.parquet.column.values.bitpacking.Packer;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.spark.sql.catalyst.util.RebaseDateTime;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
import java.io.IOException;
@ -203,6 +204,43 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
// A fork of `readIntegers`, which rebases the date int value (days) before filling
// the Spark column vector.
public void readIntegersWithRebase(
int total,
WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == level) {
data.readIntegersWithRebase(n, c, rowId);
} else {
c.putNulls(rowId, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putInt(rowId + i,
RebaseDateTime.rebaseJulianToGregorianDays(data.readInteger()));
} else {
c.putNull(rowId + i);
}
}
break;
}
rowId += n;
left -= n;
currentCount -= n;
}
}
// TODO: can this code duplication be removed without a perf penalty?
public void readBooleans(
int total,
@ -342,6 +380,43 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
// A fork of `readLongs`, which rebases the timestamp long value (microseconds) before filling
// the Spark column vector.
public void readLongsWithRebase(
int total,
WritableColumnVector c,
int rowId,
int level,
VectorizedValuesReader data) throws IOException {
int left = total;
while (left > 0) {
if (this.currentCount == 0) this.readNextGroup();
int n = Math.min(left, this.currentCount);
switch (mode) {
case RLE:
if (currentValue == level) {
data.readLongsWithRebase(n, c, rowId);
} else {
c.putNulls(rowId, n);
}
break;
case PACKED:
for (int i = 0; i < n; ++i) {
if (currentBuffer[currentBufferIdx++] == level) {
c.putLong(rowId + i,
RebaseDateTime.rebaseJulianToGregorianMicros(data.readLong()));
} else {
c.putNull(rowId + i);
}
}
break;
}
rowId += n;
left -= n;
currentCount -= n;
}
}
public void readFloats(
int total,
WritableColumnVector c,
@ -508,6 +583,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
}
}
@Override
public void readIntegersWithRebase(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}
@Override
public byte readByte() {
throw new UnsupportedOperationException("only readInts is valid.");
@ -523,6 +603,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
throw new UnsupportedOperationException("only readInts is valid.");
}
@Override
public void readLongsWithRebase(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");
}
@Override
public void readBinary(int total, WritableColumnVector c, int rowId) {
throw new UnsupportedOperationException("only readInts is valid.");

View file

@ -40,7 +40,9 @@ public interface VectorizedValuesReader {
void readBooleans(int total, WritableColumnVector c, int rowId);
void readBytes(int total, WritableColumnVector c, int rowId);
void readIntegers(int total, WritableColumnVector c, int rowId);
void readIntegersWithRebase(int total, WritableColumnVector c, int rowId);
void readLongs(int total, WritableColumnVector c, int rowId);
void readLongsWithRebase(int total, WritableColumnVector c, int rowId);
void readFloats(int total, WritableColumnVector c, int rowId);
void readDoubles(int total, WritableColumnVector c, int rowId);
void readBinary(int total, WritableColumnVector c, int rowId);

View file

@ -49,15 +49,15 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
.select($"seconds".cast("timestamp").as("ts"))
}
private def genTsAfter1582(cardinality: Int): DataFrame = {
val start = LocalDateTime.of(1582, 10, 15, 0, 0, 0)
private def genTsAfter1900(cardinality: Int): DataFrame = {
val start = LocalDateTime.of(1900, 1, 31, 0, 0, 0)
val end = LocalDateTime.of(3000, 1, 1, 0, 0, 0)
genTs(cardinality, start, end)
}
private def genTsBefore1582(cardinality: Int): DataFrame = {
private def genTsBefore1900(cardinality: Int): DataFrame = {
val start = LocalDateTime.of(10, 1, 1, 0, 0, 0)
val end = LocalDateTime.of(1580, 1, 1, 0, 0, 0)
val end = LocalDateTime.of(1900, 1, 1, 0, 0, 0)
genTs(cardinality, start, end)
}
@ -71,34 +71,35 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
}
private def genDateAfter1582(cardinality: Int): DataFrame = {
val start = LocalDate.of(1582, 10, 15)
val start = LocalDate.of(1582, 10, 31)
val end = LocalDate.of(3000, 1, 1)
genDate(cardinality, start, end)
}
private def genDateBefore1582(cardinality: Int): DataFrame = {
val start = LocalDate.of(10, 1, 1)
val end = LocalDate.of(1580, 1, 1)
val end = LocalDate.of(1580, 10, 1)
genDate(cardinality, start, end)
}
private def genDF(cardinality: Int, dateTime: String, after1582: Boolean): DataFrame = {
(dateTime, after1582) match {
private def genDF(cardinality: Int, dateTime: String, modernDates: Boolean): DataFrame = {
(dateTime, modernDates) match {
case ("date", true) => genDateAfter1582(cardinality)
case ("date", false) => genDateBefore1582(cardinality)
case ("timestamp", true) => genTsAfter1582(cardinality)
case ("timestamp", false) => genTsBefore1582(cardinality)
case ("timestamp", true) => genTsAfter1900(cardinality)
case ("timestamp", false) => genTsBefore1900(cardinality)
case _ => throw new IllegalArgumentException(
s"cardinality = $cardinality dateTime = $dateTime after1582 = $after1582")
s"cardinality = $cardinality dateTime = $dateTime modernDates = $modernDates")
}
}
private def benchmarkInputs(benchmark: Benchmark, rowsNum: Int, dateTime: String): Unit = {
benchmark.addCase("after 1582, noop", 1) { _ =>
genDF(rowsNum, dateTime, after1582 = true).noop()
val year = if (dateTime == "date") 1582 else 1900
benchmark.addCase(s"after $year, noop", 1) { _ =>
genDF(rowsNum, dateTime, modernDates = true).noop()
}
benchmark.addCase("before 1582, noop", 1) { _ =>
genDF(rowsNum, dateTime, after1582 = false).noop()
benchmark.addCase(s"before $year, noop", 1) { _ =>
genDF(rowsNum, dateTime, modernDates = false).noop()
}
}
@ -107,23 +108,26 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
}
private def caseName(
after1582: Boolean,
modernDates: Boolean,
dateTime: String,
rebase: Option[Boolean] = None,
vec: Option[Boolean] = None): String = {
val period = if (after1582) "after" else "before"
val period = if (modernDates) "after" else "before"
val year = if (dateTime == "date") 1582 else 1900
val vecFlag = vec.map(flagToStr).map(flag => s", vec $flag").getOrElse("")
val rebaseFlag = rebase.map(flagToStr).map(flag => s", rebase $flag").getOrElse("")
s"$period 1582$vecFlag$rebaseFlag"
s"$period $year$vecFlag$rebaseFlag"
}
private def getPath(
basePath: File,
dateTime: String,
after1582: Boolean,
modernDates: Boolean,
rebase: Option[Boolean] = None): String = {
val period = if (after1582) "after" else "before"
val period = if (modernDates) "after" else "before"
val year = if (dateTime == "date") 1582 else 1900
val rebaseFlag = rebase.map(flagToStr).map(flag => s"_$flag").getOrElse("")
basePath.getAbsolutePath + s"/${dateTime}_${period}_1582$rebaseFlag"
basePath.getAbsolutePath + s"/${dateTime}_${period}_$year$rebaseFlag"
}
override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
@ -139,16 +143,16 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
rowsNum,
output = output)
benchmarkInputs(benchmark, rowsNum, dateTime)
Seq(true, false).foreach { after1582 =>
Seq(true, false).foreach { modernDates =>
Seq(false, true).foreach { rebase =>
benchmark.addCase(caseName(after1582, Some(rebase)), 1) { _ =>
benchmark.addCase(caseName(modernDates, dateTime, Some(rebase)), 1) { _ =>
withSQLConf(
SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_WRITE.key -> rebase.toString) {
genDF(rowsNum, dateTime, after1582)
genDF(rowsNum, dateTime, modernDates)
.write
.mode("overwrite")
.format("parquet")
.save(getPath(path, dateTime, after1582, Some(rebase)))
.save(getPath(path, dateTime, modernDates, Some(rebase)))
}
}
}
@ -157,16 +161,15 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
val benchmark2 = new Benchmark(
s"Load ${dateTime}s from parquet", rowsNum, output = output)
Seq(true, false).foreach { after1582 =>
Seq(true, false).foreach { modernDates =>
Seq(false, true).foreach { vec =>
Seq(false, true).foreach { rebase =>
benchmark2.addCase(caseName(after1582, Some(rebase), Some(vec)), 3) { _ =>
withSQLConf(
SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString,
SQLConf.LEGACY_PARQUET_REBASE_DATETIME_IN_READ.key -> rebase.toString) {
val name = caseName(modernDates, dateTime, Some(rebase), Some(vec))
benchmark2.addCase(name, 3) { _ =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vec.toString) {
spark.read
.format("parquet")
.load(getPath(path, dateTime, after1582, Some(rebase)))
.load(getPath(path, dateTime, modernDates, Some(rebase)))
.noop()
}
}
@ -183,13 +186,13 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
Seq("date", "timestamp").foreach { dateTime =>
val benchmark = new Benchmark(s"Save ${dateTime}s to ORC", rowsNum, output = output)
benchmarkInputs(benchmark, rowsNum, dateTime)
Seq(true, false).foreach { after1582 =>
benchmark.addCase(caseName(after1582), 1) { _ =>
genDF(rowsNum, dateTime, after1582)
Seq(true, false).foreach { modernDates =>
benchmark.addCase(caseName(modernDates, dateTime), 1) { _ =>
genDF(rowsNum, dateTime, modernDates)
.write
.mode("overwrite")
.format("orc")
.save(getPath(path, dateTime, after1582))
.save(getPath(path, dateTime, modernDates))
}
}
benchmark.run()
@ -198,14 +201,14 @@ object DateTimeRebaseBenchmark extends SqlBasedBenchmark {
s"Load ${dateTime}s from ORC",
rowsNum,
output = output)
Seq(true, false).foreach { after1582 =>
Seq(true, false).foreach { modernDates =>
Seq(false, true).foreach { vec =>
benchmark2.addCase(caseName(after1582, vec = Some(vec)), 3) { _ =>
benchmark2.addCase(caseName(modernDates, dateTime, vec = Some(vec)), 3) { _ =>
withSQLConf(SQLConf.ORC_VECTORIZED_READER_ENABLED.key -> vec.toString) {
spark
.read
.format("orc")
.load(getPath(path, dateTime, after1582))
.load(getPath(path, dateTime, modernDates))
.noop()
}
}