Apache Spark - A unified analytics engine for large-scale data processing
Go to file
Jungtaek Lim (HeartSaVioR) e6795cd341 [SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue
### What changes were proposed in this pull request?

In many operations on CompactibleFileStreamLog reads a metadata log file and materializes all entries into memory. As the nature of the compact operation, CompactibleFileStreamLog may have a huge compact log file with bunch of entries included, and for now they're just monotonically increasing, which means the amount of memory to materialize also grows incrementally. This leads pressure on GC.

This patch proposes to streamline the logic on file stream source and sink whenever possible to avoid memory issue. To make this possible we have to break the existing behavior of excluding entries - now the `compactLogs` method is called with all entries, which forces us to materialize all entries into memory. This is hopefully no effect on end users, because only file stream sink has a condition to exclude entries, and the condition has been never true. (DELETE_ACTION has been never set.)

Based on the observation, this patch also changes the existing UT a bit which simulates the situation where "A" file is added, and another batch marks the "A" file as deleted. This situation simply doesn't work with the change, but as I mentioned earlier it hasn't been used. (I'm not sure the UT is from the actual run. I guess not.)

### Why are the changes needed?

The memory issue (OOME) is reported by both JIRA issue and user mailing list.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

* Existing UTs
* Manual test done

The manual test leverages the simple apps which continuously writes the file stream sink metadata log.

bea7680e4c

The test is configured to have a batch metadata log file at 1.9M (10,000 entries) whereas other Spark configuration is set to the default. (compact interval = 10) The app runs as driver, and the heap memory on driver is set to 3g.

> before the patch

<img width="1094" alt="Screen Shot 2020-06-23 at 3 37 44 PM" src="https://user-images.githubusercontent.com/1317309/85375841-d94f3480-b571-11ea-817b-c6b48b34888a.png">

It only ran for 40 mins, with the latest compact batch file size as 1.3G. The process struggled with GC, and after some struggling, it threw OOME.

> after the patch

<img width="1094" alt="Screen Shot 2020-06-23 at 3 53 29 PM" src="https://user-images.githubusercontent.com/1317309/85375901-eff58b80-b571-11ea-837e-30d107f677f9.png">

It sustained 2 hours run (manually stopped as it's expected to run more), with the latest compact batch file size as 2.2G. The actual memory usage didn't even go up to 1.2G, and be cleaned up soon without outstanding GC activity.

Closes #28904 from HeartSaVioR/SPARK-30462.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2020-08-20 02:26:38 -07:00
.github [SPARK-32645][INFRA] Upload unit-tests.log as an artifact 2020-08-19 12:28:36 +09:00
assembly [SPARK-30950][BUILD] Setting version to 3.1.0-SNAPSHOT 2020-02-25 19:44:31 -08:00
bin [SPARK-32227] Fix regression bug in load-spark-env.cmd with Spark 3.0.0 2020-07-30 21:44:49 +09:00
build [SPARK-31041][BUILD] Show Maven errors from within make-distribution.sh 2020-03-11 08:22:02 -05:00
common [SPARK-32559][SQL] Fix the trim logic in UTF8String.toInt/toLong did't handle non-ASCII characters correctly 2020-08-07 05:00:33 +00:00
conf [SPARK-32004][ALL] Drop references to slave 2020-07-13 14:05:33 -07:00
core [SPARK-32658][CORE] Fix PartitionWriterStream partition length overflow 2020-08-20 07:08:30 +00:00
data [SPARK-22666][ML][SQL] Spark datasource for image format 2018-09-05 11:59:00 -07:00
dev [SPARK-32319][PYSPARK] Disallow the use of unused imports 2020-08-08 08:51:57 -07:00
docs [SPARK-32018][FOLLOWUP][DOC] Add migration guide for decimal value overflow in sum aggregation 2020-08-19 11:37:53 +08:00
examples [SPARK-32319][PYSPARK] Disallow the use of unused imports 2020-08-08 08:51:57 -07:00
external [SPARK-32621][SQL] 'path' option can cause issues while inferring schema in CSV/JSON datasources 2020-08-19 16:23:22 +00:00
graphx [SPARK-32398][TESTS][CORE][STREAMING][SQL][ML] Update to scalatest 3.2.0 for Scala 2.13.3+ 2020-07-23 16:20:17 -07:00
hadoop-cloud [SPARK-30950][BUILD] Setting version to 3.1.0-SNAPSHOT 2020-02-25 19:44:31 -08:00
launcher [SPARK-32434][CORE] Support Scala 2.13 in AbstractCommandBuilder and load-spark-env scripts 2020-07-25 08:19:02 -07:00
licenses [SPARK-32435][PYTHON] Remove heapq3 port from Python 3 2020-07-27 20:10:13 +09:00
licenses-binary [SPARK-32435][PYTHON] Remove heapq3 port from Python 3 2020-07-27 20:10:13 +09:00
mllib [SPARK-32310][ML][PYSPARK] ML params default value parity in feature and tuning 2020-08-03 08:50:34 -07:00
mllib-local [SPARK-32398][TESTS][CORE][STREAMING][SQL][ML] Update to scalatest 3.2.0 for Scala 2.13.3+ 2020-07-23 16:20:17 -07:00
project [SPARK-31198][CORE] Use graceful decommissioning as part of dynamic scaling 2020-08-12 17:07:18 -07:00
python [MINOR][DOCS] Add KMeansSummary and InheritableThread to documentation 2020-08-19 14:30:07 +09:00
R [SPARK-32647][INFRA] Report SparkR test results with JUnit reporter 2020-08-18 19:35:15 +09:00
repl [SPARK-31399][CORE][TEST-HADOOP3.2][TEST-JAVA11] Support indylambda Scala closure in ClosureCleaner 2020-05-18 05:32:57 +00:00
resource-managers [SPARK-32657][K8S] Update the log strings we check for & imports in decommission K8s 2020-08-19 18:28:21 -07:00
sbin [SPARK-32004][ALL] Drop references to slave 2020-07-13 14:05:33 -07:00
sql [SPARK-30462][SS] Streamline the logic on file stream source and sink metadata log to avoid memory issue 2020-08-20 02:26:38 -07:00
streaming [SPARK-32651][CORE] Decommission switch configuration should have the highest hierarchy 2020-08-19 06:53:06 +00:00
tools [SPARK-30950][BUILD] Setting version to 3.1.0-SNAPSHOT 2020-02-25 19:44:31 -08:00
.asf.yaml [SPARK-31352] Add .asf.yaml to control Github settings 2020-04-06 09:06:01 -05:00
.gitattributes [SPARK-30653][INFRA][SQL] EOL character enforcement for java/scala/xml/py/R files 2020-01-27 10:20:51 -08:00
.gitignore [SPARK-32179][SPARK-32188][PYTHON][DOCS] Replace and redesign the documentation base 2020-07-27 17:49:21 +09:00
appveyor.yml [SPARK-32647][INFRA] Report SparkR test results with JUnit reporter 2020-08-18 19:35:15 +09:00
CONTRIBUTING.md [MINOR][DOCS] Tighten up some key links to the project and download pages to use HTTPS 2019-05-21 10:56:42 -07:00
LICENSE [SPARK-32435][PYTHON] Remove heapq3 port from Python 3 2020-07-27 20:10:13 +09:00
LICENSE-binary [SPARK-32435][PYTHON] Remove heapq3 port from Python 3 2020-07-27 20:10:13 +09:00
NOTICE [SPARK-29674][CORE] Update dropwizard metrics to 4.1.x for JDK 9+ 2019-11-03 15:13:06 -08:00
NOTICE-binary [SPARK-29674][CORE] Update dropwizard metrics to 4.1.x for JDK 9+ 2019-11-03 15:13:06 -08:00
pom.xml [SPARK-32610][DOCS] Fix the link to metrics.dropwizard.io in monitoring.md to refer the proper version 2020-08-16 12:07:37 -05:00
README.md [MINOR][DOCS] Fix Jenkins build image and link in README.md 2020-01-20 23:08:24 -08:00
scalastyle-config.xml [SPARK-32539][INFRA] Disallow FileSystem.get(Configuration conf) in style check by default 2020-08-06 05:56:59 +00:00

Apache Spark

Spark is a unified analytics engine for large-scale data processing. It provides high-level APIs in Scala, Java, Python, and R, and an optimized engine that supports general computation graphs for data analysis. It also supports a rich set of higher-level tools including Spark SQL for SQL and DataFrames, MLlib for machine learning, GraphX for graph processing, and Structured Streaming for stream processing.

https://spark.apache.org/

Jenkins Build AppVeyor Build PySpark Coverage

Online Documentation

You can find the latest Spark documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.

Building Spark

Spark is built using Apache Maven. To build Spark and its example programs, run:

./build/mvn -DskipTests clean package

(You do not need to do this if you downloaded a pre-built package.)

More detailed documentation is available from the project site, at "Building Spark".

For general development tips, including info on developing Spark using an IDE, see "Useful Developer Tools".

Interactive Scala Shell

The easiest way to start using Spark is through the Scala shell:

./bin/spark-shell

Try the following command, which should return 1,000,000,000:

scala> spark.range(1000 * 1000 * 1000).count()

Interactive Python Shell

Alternatively, if you prefer Python, you can use the Python shell:

./bin/pyspark

And run the following command, which should also return 1,000,000,000:

>>> spark.range(1000 * 1000 * 1000).count()

Example Programs

Spark also comes with several sample programs in the examples directory. To run one of them, use ./bin/run-example <class> [params]. For example:

./bin/run-example SparkPi

will run the Pi example locally.

You can set the MASTER environment variable when running examples to submit examples to a cluster. This can be a mesos:// or spark:// URL, "yarn" to run on YARN, and "local" to run locally with one thread, or "local[N]" to run locally with N threads. You can also use an abbreviated class name if the class is in the examples package. For instance:

MASTER=spark://host:7077 ./bin/run-example SparkPi

Many of the example programs print usage help if no params are given.

Running Tests

Testing first requires building Spark. Once Spark is built, tests can be run using:

./dev/run-tests

Please see the guidance on how to run tests for a module, or individual tests.

There is also a Kubernetes integration test, see resource-managers/kubernetes/integration-tests/README.md

A Note About Hadoop Versions

Spark uses the Hadoop core library to talk to HDFS and other Hadoop-supported storage systems. Because the protocols have changed in different versions of Hadoop, you must build Spark against the same version that your cluster runs.

Please refer to the build documentation at "Specifying the Hadoop Version and Enabling YARN" for detailed guidance on building for a particular distribution of Hadoop, including building for particular Hive and Hive Thriftserver distributions.

Configuration

Please refer to the Configuration Guide in the online documentation for an overview on how to configure Spark.

Contributing

Please review the Contribution to Spark guide for information on how to get started contributing to the project.