### What changes were proposed in this pull request?
With the original benchmark, where the timestamp values are valid for the new parser, the result is:
```
[info] Running benchmark: Read dates and timestamps
[info] Running case: timestamp strings
[info] Stopped after 3 iterations, 5781 ms
[info] Running case: parse timestamps from Dataset[String]
[info] Stopped after 3 iterations, 44764 ms
[info] Running case: infer timestamps from Dataset[String]
[info] Stopped after 3 iterations, 93764 ms
[info] Running case: from_json(timestamp)
[info] Stopped after 3 iterations, 59021 ms
```
When we modify the benchmark to
```scala
def timestampStr: Dataset[String] = {
  spark.range(0, rowsNum, 1, 1).mapPartitions { iter =>
    iter.map(i => s"""{"timestamp":"1970-01-01T01:02:03.${i % 100}"}""")
  }.select($"value".as("timestamp")).as[String]
}

readBench.addCase("timestamp strings", numIters) { _ =>
  timestampStr.noop()
}

readBench.addCase("parse timestamps from Dataset[String]", numIters) { _ =>
  spark.read.schema(tsSchema).json(timestampStr).noop()
}

readBench.addCase("infer timestamps from Dataset[String]", numIters) { _ =>
  spark.read.json(timestampStr).noop()
}
```
where the timestamp values are invalid for the new parser, causing a fallback to the legacy (Spark 2.4) parser, the result is:
```
[info] Running benchmark: Read dates and timestamps
[info] Running case: timestamp strings
[info] Stopped after 3 iterations, 5623 ms
[info] Running case: parse timestamps from Dataset[String]
[info] Stopped after 3 iterations, 506637 ms
[info] Running case: infer timestamps from Dataset[String]
[info] Stopped after 3 iterations, 509076 ms
```
This is about a 10x performance regression.
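For illustration, here is a minimal standalone `java.time` sketch of why the short fractions produced by `i % 100` are rejected; this assumes the strict `SSS` semantics of the JDK formatter and is not Spark's exact internal code path:
```scala
import java.time.LocalDateTime
import java.time.format.{DateTimeFormatter, DateTimeParseException}

// Strict pattern: `SSS` demands exactly three fraction digits.
val strict = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS")

// `i % 100` yields one- or two-digit fractions such as ".5" or ".42".
try {
  LocalDateTime.parse("1970-01-01T01:02:03.5", strict)
} catch {
  case e: DateTimeParseException =>
    // Each such per-record failure is what sends parsing down the legacy path.
    println(s"rejected by the new parser: ${e.getMessage}")
}
```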
BUT if we modify the timestamp pattern to `....HH:mm:ss[.SSS][XXX]`, which makes all timestamp values valid for the new parser and prevents the fallback, the result is:
```
[info] Running benchmark: Read dates and timestamps
[info] Running case: timestamp strings
[info] Stopped after 3 iterations, 5623 ms
[info] Running case: parse timestamps from Dataset[String]
[info] Stopped after 3 iterations, 506637 ms
[info] Running case: infer timestamps from Dataset[String]
[info] Stopped after 3 iterations, 509076 ms
```
### Why are the changes needed?
Fix the performance regression shown above.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New tests were added.
Closes #28181 from yaooqinn/SPARK-31414.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
In this PR, I propose to replace `.collect()`, `.count()` and `.foreach(_ => ())` in SQL benchmarks with the `NoOp` datasource. I added an implicit class to `SqlBasedBenchmark` with a `.noop()` method, which can be used in benchmarks like `ds.noop()`; the call unfolds to `ds.write.format("noop").mode(Overwrite).save()`.
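As a sketch of the helper (names follow the description above; treat the exact shape inside `SqlBasedBenchmark` as an assumption):
```scala
import org.apache.spark.sql.{Dataset, SaveMode}

trait SqlBasedBenchmark {
  implicit class DatasetToBenchmark(ds: Dataset[_]) {
    // Fully materializes the rows in the `noop` sink without converting
    // them to external types or transferring them to the driver.
    def noop(): Unit = {
      ds.write.format("noop").mode(SaveMode.Overwrite).save()
    }
  }
}
```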
### Why are the changes needed?
To avoid the additional overhead that `collect()` (and other actions) introduces. For example, `.collect()` has to convert values to external types and pull the data to the driver, which can hide actual performance regressions or improvements of the benchmarked operations.
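For example, in a benchmark case the replacement looks like this (a sketch; `tsSchema` and `timestampStr` are the definitions from the benchmark quoted earlier):
```scala
// Before: the action adds its own overhead; `.collect()` converts every row
// to external types and pulls the data to the driver.
spark.read.schema(tsSchema).json(timestampStr).collect()

// After: the noop sink consumes rows on the executors and discards them,
// so the measurement reflects the read/parse work alone.
spark.read.schema(tsSchema).json(timestampStr).noop()
```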
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Re-run all modified benchmarks using Amazon EC2.
| Item | Description |
| ---- | ----|
| Region | us-west-2 (Oregon) |
| Instance | r3.xlarge (spot instance) |
| AMI | ami-06f2f779464715dc5 (ubuntu/images/hvm-ssd/ubuntu-bionic-18.04-amd64-server-20190722.1) |
| Java | OpenJDK8/10 |
- Run `TPCDSQueryBenchmark` using the instructions from PR #26049:
```
# Build Spark 2.4; `spark-tpcds-datagen` needs it. (JDK8)
$ git clone https://github.com/apache/spark.git -b branch-2.4 --depth 1 spark-2.4
$ cd spark-2.4
$ export SPARK_HOME=$PWD
$ ./build/mvn clean package -DskipTests
$ cd ..
# Generate data. (JDK8)
$ git clone git@github.com:maropu/spark-tpcds-datagen.git
$ cd spark-tpcds-datagen/
$ build/mvn clean package
$ mkdir -p /data/tpcds
$ ./bin/dsdgen --output-location /data/tpcds/s1  # This needs Spark 2.4
```
- Other benchmarks were run by this script:
```
#!/usr/bin/env python3
import os
from sparktestsupport.shellutils import run_cmd
benchmarks = [
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.AggregateBenchmark'],
    ['avro/test', 'org.apache.spark.sql.execution.benchmark.AvroReadBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.DataSourceReadBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.DateTimeBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.ExtractBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.FilterPushdownBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.InExpressionBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.IntervalBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.JoinBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.MakeDateTimeBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.MiscBenchmark'],
    ['hive/test', 'org.apache.spark.sql.execution.benchmark.ObjectHashAggregateExecBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.OrcNestedSchemaPruningBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.OrcV2NestedSchemaPruningBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.ParquetNestedSchemaPruningBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.RangeBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.UDFBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.WideSchemaBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.benchmark.WideTableBenchmark'],
    ['hive/test', 'org.apache.spark.sql.hive.orc.OrcReadBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.datasources.csv.CSVBenchmark'],
    ['sql/test', 'org.apache.spark.sql.execution.datasources.json.JsonBenchmark']
]
print('Set SPARK_GENERATE_BENCHMARK_FILES=1')
os.environ['SPARK_GENERATE_BENCHMARK_FILES'] = '1'
for b in benchmarks:
    print("Run benchmark: %s" % b[1])
    run_cmd(['build/sbt', '%s:runMain %s' % (b[0], b[1])])
```
Closes #27078 from MaxGekk/noop-in-benchmarks.
Lead-authored-by: Maxim Gekk <max.gekk@gmail.com>
Co-authored-by: Maxim Gekk <maxim.gekk@databricks.com>
Co-authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>