### What changes were proposed in this pull request?

The changes being proposed increase the accuracy of JDBCRelation's stride calculation, as outlined in https://issues.apache.org/jira/browse/SPARK-34843.

In summary: currently, in JDBCRelation (line 123), the stride size is calculated as follows:

```scala
val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
```

Because truncation happens on both divisions, the stride size can fall short of what it should be, which can lead to a large gap between the provided upper bound and the actual start of the last partition. I'm proposing a different formula that doesn't truncate too early and that maintains accuracy using fixed-point decimals. This helps tremendously with the size of the last partition, an effect that is amplified further if the data is skewed in that direction. In a real-life test, I've seen a 27% increase in performance from this more proper stride alignment.

The reason for fixed-point decimals instead of floating-point decimals is the inaccuracy caused by the limits of what a float can represent. The error may seem small, but it could shift the midpoint a bit, and depending on how granular the data is, that could translate to quite a difference. It's also simply inaccurate, and I'm striving to make the partitioning as accurate as possible, within reason.

Lastly, since the last partition's predicate is determined by how the strides align starting from the lower bound (plus one stride), skew can be introduced that makes the last partition larger than the first. Therefore, after calculating a more precise stride size, I've also introduced logic to move the first partition's predicate (which is an offset from the lower bound) to a position that closely matches the offset of the last partition's predicate (in relation to the upper bound). This makes the first and last partitions more evenly distributed relative to each other and reduces the size of the last (largest) task.

### Why are the changes needed?

The current implementation is inaccurate and can lead to the last task/partition running much longer than the previous tasks. As a result, you can end up with a single node/core running for an extended period while the other nodes/cores sit idle.

### Does this PR introduce _any_ user-facing change?

No. I would suspect some users will simply see a performance increase. As stated above, if we were to run our code on a Spark build with this change implemented, we would suddenly have gotten a 27% increase in performance.

### How was this patch tested?

I've added two new unit tests. I did need to update one existing unit test, but comparing the before and after shows better alignment of the partitioning with the new implementation. Given that the lower partition's predicate is exclusive and the upper's is inclusive, the offset of the lower was 3 days while the offset of the upper was 6 days; that's potentially twice the amount of data in the upper partition (and could be much more, depending on how the user's data is distributed). Other unit tests that use timestamps and two partitions have maintained their midpoint.
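To make the arithmetic concrete, here is a minimal, runnable Scala sketch of the two stride computations and the realignment idea, using the bounds from Example 1 below. The object and variable names are illustrative, not the actual JDBCRelation internals, and the realignment formula is only an approximation of the approach described above.

```scala
// A self-contained sketch, assuming the bounds have already been converted
// to Long (epoch days for a date column). Values reproduce Example 1 below.
object StrideSketch extends App {
  val lowerBound = -14610L   // 1930-01-01 as days since 1970-01-01
  val upperBound = 18627L    // 2020-12-31 as days since 1970-01-01
  val numPartitions = 1000

  // Old formula: both divisions truncate before the subtraction,
  // so the stride falls short of the true partition width.
  val oldStride: Long =
    upperBound / numPartitions - lowerBound / numPartitions            // 32

  // Proposed idea: divide using fixed-point decimals and truncate only
  // once, after the subtraction.
  val preciseStride =
    BigDecimal(upperBound) / numPartitions -
      BigDecimal(lowerBound) / numPartitions                           // 33.237
  val stride = preciseStride.toLong                                    // 33

  // Realignment idea: estimate how many whole strides are lost to
  // truncation across all partitions, then shift the first partition's
  // predicate up by about half of that, so the first and last partitions
  // end up more evenly sized.
  val lostStrides = (preciseStride - stride) * numPartitions / stride
  val alignedLowerBound = lowerBound + ((lostStrides / 2) * stride).toLong

  println(s"old stride = $oldStride, new stride = $stride")
  // The first predicate moves from early February 1930 to roughly
  // June 1930, close to the realigned bound shown in Example 1.
  println(s"first predicate: ${lowerBound + stride} -> ${alignedLowerBound + stride}")
}
```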
### Examples

I've added results with and without the realignment logic to better highlight both improvements this PR brings.

**Example 1:** given the following partition config:

- "lowerBound" -> "1930-01-01"
- "upperBound" -> "2020-12-31"
- "numPartitions" -> 1000

_Old method (exactly what it would be BEFORE this PR):_
- First partition: "PartitionColumn" < '1930-02-02' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2017-07-11'

_Old method, but with the new realignment logic for the first partition:_
- First partition: "PartitionColumn" < '1931-10-14' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2019-03-22'

_New method:_
- First partition: "PartitionColumn" < '1930-02-03' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2020-04-05'

_New method with the new realignment logic for the first partition (exactly what it would be AFTER this PR):_
- First partition: "PartitionColumn" < '1930-06-02' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2020-08-02'

**Example 2:** given the following partition config:

- "lowerBound" -> "1927-04-05"
- "upperBound" -> "2020-10-16"
- "numPartitions" -> 2000

_Old method (exactly what it would be BEFORE this PR):_
- First partition: "PartitionColumn" < '1927-04-21' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2014-10-29'

_Old method, but with the new realignment logic for the first partition:_
- First partition: "PartitionColumn" < '1930-04-07' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2017-10-15'

_New method:_
- First partition: "PartitionColumn" < '1927-04-22' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2020-04-19'

_New method with the new realignment logic for the first partition (exactly what it would be AFTER this PR):_
- First partition: "PartitionColumn" < '1927-07-13' or "PartitionColumn" is null
- Last partition: "PartitionColumn" >= '2020-07-10'

Closes #31965 from hanover-fiste/SPARK-34843.

Authored-by: hanover-fiste <jyarbrough.git@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
# Apache Spark
Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.
## Online Documentation
You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.
## Building Spark
Spark is built using Apache Maven. To build Spark and its example programs, run:
```
./build/mvn -DskipTests clean package
```
(You do not need to do this if you downloaded a pre-built package.)
More detailed documentation is available from the project site, at "Building Spark".
For general development tips, including info on developing Spark using an IDE, see "Useful Developer Tools".
## Interactive Scala Shell
The easiest way to start using Spark is through the Scala shell:
```
./bin/spark-shell
```
Try the following command, which should return 1,000,000,000:
```
scala> spark.range(1000 * 1000 * 1000).count()
```
## Interactive Python Shell
Alternatively, if you prefer Python, you can use the Python shell:
```
./bin/pyspark
```
And run the following command, which should also return 1,000,000,000:
```
>>> spark.range(1000 * 1000 * 1000).count()
```
## Example Programs
Spark also comes with several sample programs in the `examples` directory. To run one of them, use `./bin/run-example <class> [params]`. For example:

```
./bin/run-example SparkPi
```
will run the Pi example locally.
You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a `mesos://` or `spark://` URL, `"yarn"` to run on YARN, `"local"` to run locally with one thread, or `"local[N]"` to run locally with N threads. You can also use an abbreviated class name if the class is in the `examples` package. For instance:

```
MASTER=spark://host:7077 ./bin/run-example SparkPi
```
Many of the example programs print usage help if no params are given.
## Running Tests
Testing first requires building Spark. Once Spark is built, tests can be run using:
```
./dev/run-tests
```
Please see the guidance on how to run tests for a module or individual tests.
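As one illustration, individual suites can also be run through sbt's `testOnly` task; the suite pattern below is hypothetical, and the linked guidance describes the supported forms:

```
./build/sbt "core/testOnly *SparkContextSuite"
```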
There is also a Kubernetes integration test; see `resource-managers/kubernetes/integration-tests/README.md`.
## A Note About Hadoop Versions
Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.
Please refer to the build documentation at "Specifying the Hadoop Version and Enabling YARN" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions.
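As a sketch, a Maven build against a particular Hadoop version with YARN enabled might look like the following; the `-Pyarn` profile and `hadoop.version` property come from the build documentation, and the version number here is only an example:

```
./build/mvn -Pyarn -Dhadoop.version=3.2.0 -DskipTests clean package
```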
## Configuration
Please refer to the Configuration Guide in the online documentation for an overview on how to configure Spark.
## Contributing
Please review the Contribution to Spark guide for information on how to get started contributing to the project.