## What changes were proposed in this pull request?
There are still some algorithms based on mllib, such as KMeans. For now, many mllib common class (such as: Vector, DenseVector, SparseVector, Matrix, DenseMatrix, SparseMatrix) are not registered in Kryo. So there are some performance issues for those object serialization or deserialization.
Previously dicussed: https://github.com/apache/spark/pull/19586
## How was this patch tested?
New test case.
Author: Xianyang Liu <xianyang.liu@intel.com>
Closes#19661 from ConeyLiu/register_vector.
Continuation of PR#19528 (https://github.com/apache/spark/pull/19529#issuecomment-340252119)
The problem with the maven build in the previous PR was the new tests.... the creation of a spark session outside the tests meant there was more than one spark session around at a time.
I was using the spark session outside the tests so that the tests could share data; I've changed it so that each test creates the data anew.
Author: Nathan Kronenfeld <nicole.oresme@gmail.com>
Author: Nathan Kronenfeld <nkronenfeld@uncharted.software>
Closes#19705 from nkronenfeld/alternative-style-tests-2.
## What changes were proposed in this pull request?
Adds java.nio bufferedPool memory metrics to metrics system which includes both direct and mapped memory.
## How was this patch tested?
Manually tested and checked direct and mapped memory metrics too available in metrics system using Console sink.
Here is the sample console output
application_1509655862825_0016.2.jvm.direct.capacity
value = 19497
application_1509655862825_0016.2.jvm.direct.count
value = 6
application_1509655862825_0016.2.jvm.direct.used
value = 19498
application_1509655862825_0016.2.jvm.mapped.capacity
value = 0
application_1509655862825_0016.2.jvm.mapped.count
value = 0
application_1509655862825_0016.2.jvm.mapped.used
value = 0
Author: Srinivasa Reddy Vundela <vsr@cloudera.com>
Closes#19709 from vundela/SPARK-22483.
This required adding information about StreamBlockId to the store,
which is not available yet via the API. So an internal type was added
until there's a need to expose that information in the API.
The UI only lists RDDs that have cached partitions, and that information
wasn't being correctly captured in the listener, so that's also fixed,
along with some minor (internal) API adjustments so that the UI can
get the correct data.
Because of the way partitions are cached, some optimizations w.r.t. how
often the data is flushed to the store could not be applied to this code;
because of that, some different ways to make the code more performant
were added to the data structures tracking RDD blocks, with the goal of
avoiding expensive copies when lots of blocks are being updated.
Tested with existing and updated unit tests.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19679 from vanzin/SPARK-20647.
## What changes were proposed in this pull request?
Fixing nits in MetricsSystemSuite file
1) Using Sink instead of Source while casting
2) Using meaningful naming for variables, which reflect their usage
## How was this patch tested?
Ran the tests locally and all of them passing
Author: Srinivasa Reddy Vundela <vsr@cloudera.com>
Closes#19699 from vundela/master.
## What changes were proposed in this pull request?
Preliminary changes to get ClosureCleaner to work with Scala 2.12. Makes many usages just work, but not all. This does _not_ resolve the JIRA.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19675 from srowen/SPARK-14540.0.
The executors page is built on top of the REST API, so the page itself
was easy to hook up to the new code.
Some other pages depend on the `ExecutorListener` class that is being
removed, though, so they needed to be modified to use data from the
new store. Fortunately, all they seemed to need is the map of executor
logs, so that was somewhat easy too.
The executor timeline graph required adding some properties to the
ExecutorSummary API type. Instead of following the previous code,
which stored all the listener events in memory, the timeline is
now created based on the data available from the API.
I had to change some of the test golden files because the old code would
return executors in "random" order (since it used a mutable Map instead
of something that returns a sorted list), and the new code returns executors
in id order.
Tested with existing unit tests.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19678 from vanzin/SPARK-20646.
This change modifies the status listener to collect the information
needed to render the envionment page, and populates that page and the
API with information collected by the listener.
Tested with existing and added unit tests.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19677 from vanzin/SPARK-20645.
…alization.
## What changes were proposed in this pull request?
Use non-linear containsKey operation for serialized maps, lookup into underlying map.
## How was this patch tested?
unit tests
Author: Alexander Istomin <istomin@rutarget.ru>
Closes#19553 from Whoosh/SPARK-22330.
There are two somewhat unrelated things going on in this patch, but
both are meant to make integration of individual UI pages later on
much easier.
The first part is some tweaking of the code in the listener so that
it does less updates of the kvstore for data that changes fast; for
example, it avoids writing changes down to the store for every
task-related event, since those can arrive very quickly at times.
Instead, for these kinds of events, it chooses to only flush things
if a certain interval has passed. The interval is based on how often
the current spark-shell code updates the progress bar for jobs, so
that users can get reasonably accurate data.
The code also delays as much as possible hitting the underlying kvstore
when replaying apps in the history server. This is to avoid unnecessary
writes to disk.
The second set of changes prepare the history server and SparkUI for
integrating with the kvstore. A new class, AppStatusStore, is used
for translating between the stored data and the types used in the
UI / API. The SHS now populates a kvstore with data loaded from
event logs when an application UI is requested.
Because this store can hold references to disk-based resources, the
code was modified to retrieve data from the store under a read lock.
This allows the SHS to detect when the store is still being used, and
only update it (e.g. because an updated event log was detected) when
there is no other thread using the store.
This change ended up creating a lot of churn in the ApplicationCache
code, which was cleaned up a lot in the process. I also removed some
metrics which don't make too much sense with the new code.
Tested with existing and added unit tests, and by making sure the SHS
still works on a real cluster.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19582 from vanzin/SPARK-20644.
## What changes were proposed in this pull request?
This PR replaces the old the maximum array size (`Int.MaxValue`) with the new one (`ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH`).
This PR also refactor the code to calculate the new array size to easily understand why we have to use `newSize - 2` for allocating a new array.
## How was this patch tested?
Used the existing test
Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Closes#19650 from kiszk/SPARK-22254.
## What changes were proposed in this pull request?
This proposed patch is about making Spark executor task metrics available as Dropwizard metrics. This is intended to be of aid in monitoring Spark jobs and when drilling down on performance troubleshooting issues.
## How was this patch tested?
Manually tested on a Spark cluster (see JIRA for an example screenshot).
Author: LucaCanali <luca.canali@cern.ch>
Closes#19426 from LucaCanali/SparkTaskMetricsDropWizard.
## What changes were proposed in this pull request?
Using zstd compression for Spark jobs spilling 100s of TBs of data, we could reduce the amount of data written to disk by as much as 50%. This translates to significant latency gain because of reduced disk io operations. There is a degradation CPU time by 2 - 5% because of zstd compression overhead, but for jobs which are bottlenecked by disk IO, this hit can be taken.
## Benchmark
Please note that this benchmark is using real world compute heavy production workload spilling TBs of data to disk
| | zstd performance as compred to LZ4 |
| ------------- | -----:|
| spill/shuffle bytes | -48% |
| cpu time | + 3% |
| cpu reservation time | -40%|
| latency | -40% |
## How was this patch tested?
Tested by running few jobs spilling large amount of data on the cluster and amount of intermediate data written to disk reduced by as much as 50%.
Author: Sital Kedia <skedia@fb.com>
Closes#18805 from sitalkedia/skedia/upstream_zstd.
## What changes were proposed in this pull request?
Handling the NonFatal exceptions while starting the external shuffle service, if there are any NonFatal exceptions it logs and continues without the external shuffle service.
## How was this patch tested?
I verified it manually, it logs the exception and continues to serve without external shuffle service when BindException occurs.
Author: Devaraj K <devaraj@apache.org>
Closes#19396 from devaraj-kavali/SPARK-22172.
## What changes were proposed in this pull request?
PeriodicRDDCheckpointer was already moved out of mllib in Spark-5484
## How was this patch tested?
existing tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19618 from zhengruifeng/checkpointer_doc.
## What changes were proposed in this pull request?
We often see the issue of Spark jobs stuck because the Executor Allocation Manager does not ask for any executor even if there are pending tasks in case dynamic allocation is turned on. Looking at the logic in Executor Allocation Manager, which calculates the running tasks, it can happen that the calculation will be wrong and the number of running tasks can become negative.
## How was this patch tested?
Added unit test
Author: Sital Kedia <skedia@fb.com>
Closes#19580 from sitalkedia/skedia/fix_stuck_job.
## What changes were proposed in this pull request?
In `UnsafeInMemorySorter`, one record may take 32 bytes: 1 `long` for pointer, 1 `long` for key-prefix, and another 2 `long`s as the temporary buffer for radix sort.
In `UnsafeExternalSorter`, we set the `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to be `1024 * 1024 * 1024 / 2`, and hoping the max size of point array to be 8 GB. However this is wrong, `1024 * 1024 * 1024 / 2 * 32` is actually 16 GB, and if we grow the point array before reach this limitation, we may hit the max-page-size error.
Users may see exception like this on large dataset:
```
Caused by: java.lang.IllegalArgumentException: Cannot allocate a page with more than 17179869176 bytes
at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:241)
at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:121)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:374)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:396)
at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:94)
...
```
Setting `DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD` to a smaller number is not enough, users can still set the config to a big number and trigger the too large page size issue. This PR fixes it by explicitly handling the too large page size exception in the sorter and spill.
This PR also change the type of `spark.shuffle.spill.numElementsForceSpillThreshold` to int, because it's only compared with `numRecords`, which is an int. This is an internal conf so we don't have a serious compatibility issue.
## How was this patch tested?
TODO
Author: Wenchen Fan <wenchen@databricks.com>
Closes#18251 from cloud-fan/sort.
## What changes were proposed in this pull request?
When the given closure uses some fields defined in super class, `ClosureCleaner` can't figure them and don't set it properly. Those fields will be in null values.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19556 from viirya/SPARK-22328.
The initial listener code is based on the existing JobProgressListener (and others),
and tries to mimic their behavior as much as possible. The change also includes
some minor code movement so that some types and methods from the initial history
server code code can be reused.
The code introduces a few mutable versions of public API types, used internally,
to make it easier to update information without ugly copy methods, and also to
make certain updates cheaper.
Note the code here is not 100% correct. This is meant as a building ground for
the UI integration in the next milestones. As different parts of the UI are
ported, fixes will be made to the different parts of this code to account
for the needed behavior.
I also added annotations to API types so that Jackson is able to correctly
deserialize options, sequences and maps that store primitive types.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19383 from vanzin/SPARK-20643.
Currently SparkSubmit uses system properties to propagate configuration to
applications. This makes it hard to implement features such as SPARK-11035,
which would allow multiple applications to be started in the same JVM. The
current code would cause the config data from multiple apps to get mixed
up.
This change introduces a new trait, currently internal to Spark, that allows
the app configuration to be passed directly to the application, without
having to use system properties. The current "call main() method" behavior
is maintained as an implementation of this new trait. This will be useful
to allow multiple cluster mode apps to be submitted from the same JVM.
As part of this, SparkSubmit was modified to collect all configuration
directly into a SparkConf instance. Most of the changes are to tests so
they use SparkConf instead of an opaque map.
Tested with existing and added unit tests.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19519 from vanzin/SPARK-21840.
## What changes were proposed in this pull request?
Support unit tests of external code (i.e., applications that use spark) using scalatest that don't want to use FunSuite. SharedSparkContext already supports this, but SharedSQLContext does not.
I've introduced SharedSparkSession as a parent to SharedSQLContext, written in a way that it does support all scalatest styles.
## How was this patch tested?
There are three new unit test suites added that just test using FunSpec, FlatSpec, and WordSpec.
Author: Nathan Kronenfeld <nicole.oresme@gmail.com>
Closes#19529 from nkronenfeld/alternative-style-tests-2.
## What changes were proposed in this pull request?
Prior to this commit getAllBlocks implicitly assumed that the directories
managed by the DiskBlockManager contain only the files corresponding to
valid block IDs. In reality, this assumption was violated during shuffle,
which produces temporary files in the same directory as the resulting
blocks. As a result, calls to getAllBlocks during shuffle were unreliable.
The fix could be made more efficient, but this is probably good enough.
## How was this patch tested?
`DiskBlockManagerSuite`
Author: Sergei Lebedev <s.lebedev@criteo.com>
Closes#19458 from superbobry/block-id-option.
## What changes were proposed in this pull request?
Scala 2.12's `Future` defines two new methods to implement, `transform` and `transformWith`. These can be implemented naturally in Spark's `FutureAction` extension and subclasses, but, only in terms of the new methods that don't exist in Scala 2.11. To support both at the same time, reflection is used to implement these.
## How was this patch tested?
Existing tests.
Author: Sean Owen <sowen@cloudera.com>
Closes#19561 from srowen/SPARK-22322.
In `SparkSubmit`, call `loginUserFromKeytab` before attempting to make RPC calls to the NameNode.
I manually tested this patch by:
1. Confirming that my Spark application failed to launch with the error reported in https://issues.apache.org/jira/browse/SPARK-22319.
2. Applying this patch and confirming that the app no longer fails to launch, even when I have not manually run `kinit` on the host.
Presumably we also want integration tests for secure clusters so that we catch this sort of thing. I'm happy to take a shot at this if it's feasible and someone can point me in the right direction.
Author: Steven Rand <srand@palantir.com>
Closes#19540 from sjrand/SPARK-22319.
Change-Id: Ic306bfe7181107fbcf92f61d75856afcb5b6f761
## What changes were proposed in this pull request?
This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.
## How was this patch tested?
Exisiting tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19517 from ueshin/issues/SPARK-20396/fup2.
## What changes were proposed in this pull request?
Fix java style issues
## How was this patch tested?
Run `./dev/lint-java` locally since it's not run on Jenkins
Author: Andrew Ash <andrew@andrewash.com>
Closes#19486 from ash211/aash/fix-lint-java.
## What changes were proposed in this pull request?
The HTTP Strict-Transport-Security response header (often abbreviated as HSTS) is a security feature that lets a web site tell browsers that it should only be communicated with using HTTPS, instead of using HTTP.
Note: The Strict-Transport-Security header is ignored by the browser when your site is accessed using HTTP; this is because an attacker may intercept HTTP connections and inject the header or remove it. When your site is accessed over HTTPS with no certificate errors, the browser knows your site is HTTPS capable and will honor the Strict-Transport-Security header.
The HTTP X-XSS-Protection response header is a feature of Internet Explorer, Chrome and Safari that stops pages from loading when they detect reflected cross-site scripting (XSS) attacks.
The HTTP X-Content-Type-Options response header is used to protect against MIME sniffing vulnerabilities.
## How was this patch tested?
Checked on my system locally.
<img width="750" alt="screen shot 2017-10-03 at 6 49 20 pm" src="https://user-images.githubusercontent.com/6433184/31127234-eadf7c0c-a86b-11e7-8e5d-f6ea3f97b210.png">
Author: krishna-pandey <krish.pandey21@gmail.com>
Author: Krishna Pandey <krish.pandey21@gmail.com>
Closes#19419 from krishna-pandey/SPARK-22188.
Hive delegation tokens are only needed when the Spark driver has no access
to the kerberos TGT. That happens only in two situations:
- when using a proxy user
- when using cluster mode without a keytab
This change modifies the Hive provider so that it only generates delegation
tokens in those situations, and tweaks the YARN AM so that it makes the proper
user visible to the Hive code when running with keytabs, so that the TGT
can be used instead of a delegation token.
The effect of this change is that now it's possible to initialize multiple,
non-concurrent SparkContext instances in the same JVM. Before, the second
invocation would fail to fetch a new Hive delegation token, which then could
make the second (or third or...) application fail once the token expired.
With this change, the TGT will be used to authenticate to the HMS instead.
This change also avoids polluting the current logged in user's credentials
when launching applications. The credentials are copied only when running
applications as a proxy user. This makes it possible to implement SPARK-11035
later, where multiple threads might be launching applications, and each app
should have its own set of credentials.
Tested by verifying HDFS and Hive access in following scenarios:
- client and cluster mode
- client and cluster mode with proxy user
- client and cluster mode with principal / keytab
- long-running cluster app with principal / keytab
- pyspark app that creates (and stops) multiple SparkContext instances
through its lifetime
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19509 from vanzin/SPARK-22290.
## What changes were proposed in this pull request?
I see that block updates are not logged to the event log.
This makes sense as a default for performance reasons.
However, I find it helpful when trying to get a better understanding of caching for a job to be able to log these updates.
This PR adds a configuration setting `spark.eventLog.blockUpdates` (defaulting to false) which allows block updates to be recorded in the log.
This contribution is original work which is licensed to the Apache Spark project.
## How was this patch tested?
Current and additional unit tests.
Author: Michael Mior <mmior@uwaterloo.ca>
Closes#19263 from michaelmior/log-block-updates.
## What changes were proposed in this pull request?
In the current BlockManager's `getRemoteBytes`, it will call `BlockTransferService#fetchBlockSync` to get remote block. In the `fetchBlockSync`, Spark will allocate a temporary `ByteBuffer` to store the whole fetched block. This will potentially lead to OOM if block size is too big or several blocks are fetched simultaneously in this executor.
So here leveraging the idea of shuffle fetch, to spill the large block to local disk before consumed by upstream code. The behavior is controlled by newly added configuration, if block size is smaller than the threshold, then this block will be persisted in memory; otherwise it will first spill to disk, and then read from disk file.
To achieve this feature, what I did is:
1. Rename `TempShuffleFileManager` to `TempFileManager`, since now it is not only used by shuffle.
2. Add a new `TempFileManager` to manage the files of fetched remote blocks, the files are tracked by weak reference, will be deleted when no use at all.
## How was this patch tested?
This was tested by adding UT, also manual verification in local test to perform GC to clean the files.
Author: jerryshao <sshao@hortonworks.com>
Closes#19476 from jerryshao/SPARK-22062.
## What changes were proposed in this pull request?
Update the config `spark.files.ignoreEmptySplits`, rename it and make it internal.
This is followup of #19464
## How was this patch tested?
Exsiting tests.
Author: Xingbo Jiang <xingbo.jiang@databricks.com>
Closes#19504 from jiangxb1987/partitionsplit.
## What changes were proposed in this pull request?
PR #19294 added support for null's - but spark 2.1 handled other error cases where path argument can be invalid.
Namely:
* empty string
* URI parse exception while creating Path
This is resubmission of PR #19487, which I messed up while updating my repo.
## How was this patch tested?
Enhanced test to cover new support added.
Author: Mridul Muralidharan <mridul@gmail.com>
Closes#19497 from mridulm/master.
## What changes were proposed in this pull request?
Add a flag spark.files.ignoreEmptySplits. When true, methods like that use HadoopRDD and NewHadoopRDD such as SparkContext.textFiles will not create a partition for input splits that are empty.
Author: liulijia <liulijia@meituan.com>
Closes#19464 from liutang123/SPARK-22233.
## What changes were proposed in this pull request?
We only need request `bbos.size - unrollMemoryUsedByThisBlock` after unrolled the block.
## How was this patch tested?
Existing UT.
Author: Xianyang Liu <xianyang.liu@intel.com>
Closes#19316 from ConeyLiu/putIteratorAsBytes.
This change adds a new SQL config key that is equivalent to SparkContext's
"spark.extraListeners", allowing users to register QueryExecutionListener
instances through the Spark configuration system instead of having to
explicitly do it in code.
The code used by SparkContext to implement the feature was refactored into
a helper method in the Utils class, and SQL's ExecutionListenerManager was
modified to use it to initialize listener declared in the configuration.
Unit tests were added to verify all the new functionality.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19309 from vanzin/SPARK-19558.
## What changes were proposed in this pull request?
1. a test reproducing [SPARK-21907](https://issues.apache.org/jira/browse/SPARK-21907)
2. a fix for the root cause of the issue.
`org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill` calls `org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter.reset` which may trigger another spill,
when this happens the `array` member is already de-allocated but still referenced by the code, this causes the nested spill to fail with an NPE in `org.apache.spark.memory.TaskMemoryManager.getPage`.
This patch introduces a reproduction in a test case and a fix, the fix simply sets the in-mem sorter's array member to an empty array before actually performing the allocation. This prevents the spilling code from 'touching' the de-allocated array.
## How was this patch tested?
introduced a new test case: `org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorterSuite#testOOMDuringSpill`.
Author: Eyal Farago <eyal@nrgene.com>
Closes#19181 from eyalfa/SPARK-21907__oom_during_spill.
## What changes were proposed in this pull request?
In a bare metal system with No DNS setup, spark may be configured with SPARK_LOCAL* for IP and host properties.
During a driver failover, in cluster deployment mode. SPARK_LOCAL* should be ignored while restarting on another node and should be picked up from target system's local environment.
## How was this patch tested?
Distributed deployment against a spark standalone cluster of 6 Workers. Tested by killing JVM's running driver and verified the restarted JVMs have right configurations on them.
Author: Prashant Sharma <prashsh1@in.ibm.com>
Author: Prashant Sharma <prashant@apache.org>
Closes#17357 from ScrapCodes/driver-failover-fix.
## What changes were proposed in this pull request?
The number of cores assigned to each executor is configurable. When this is not explicitly set, multiple executors from the same application may be launched on the same worker too.
## How was this patch tested?
N/A
Author: liuxian <liu.xian3@zte.com.cn>
Closes#18711 from 10110346/executorcores.
## What changes were proposed in this pull request?
We should not break the assumption that the length of the allocated byte array is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So we want to use `Integer.MAX_VALUE - 15` instead of `Integer.MAX_VALUE - 8` as the upper bound of an allocated byte array.
cc: srowen gatorsmile
## How was this patch tested?
Since the Spark unit test JVM has less than 1GB heap, here we run the test code as a submit job, so it can run on a JVM has 4GB memory.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Feng Liu <fengliu@databricks.com>
Closes#19460 from liufengdb/fix_array_max.
## What changes were proposed in this pull request?
This PR disables console progress bar feature in non-shell environment by overriding the configuration.
## How was this patch tested?
Manual. Run the following examples with and without `spark.ui.showConsoleProgress` in order to see progress bar on master branch and this PR.
**Scala Shell**
```scala
spark.range(1000000000).map(_ + 1).count
```
**PySpark**
```python
spark.range(10000000).rdd.map(lambda x: len(x)).count()
```
**Spark Submit**
```python
from pyspark.sql import SparkSession
if __name__ == "__main__":
spark = SparkSession.builder.getOrCreate()
spark.range(2000000).rdd.map(lambda row: len(row)).count()
spark.stop()
```
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19061 from dongjoon-hyun/SPARK-21568.
## What changes were proposed in this pull request?
As the detail scenario described in [SPARK-22074](https://issues.apache.org/jira/browse/SPARK-22074), unnecessary resubmitted may cause stage hanging in currently release versions. This patch add a new var in TaskInfo to mark this task killed by other attempt or not.
## How was this patch tested?
Add a new UT `[SPARK-22074] Task killed by other attempt task should not be resubmitted` in TaskSetManagerSuite, this UT recreate the scenario in JIRA description, it failed without the changes in this PR and passed conversely.
Author: Yuanjian Li <xyliyuanjian@gmail.com>
Closes#19287 from xuanyuanking/SPARK-22074.
## What changes were proposed in this pull request?
Prior to this commit BlockId.hashCode and BlockId.equals were defined
in terms of BlockId.name. This allowed the subclasses to be concise and
enforced BlockId.name as a single unique identifier for a block. All
subclasses override BlockId.name with an expression involving an
allocation of StringBuilder and ultimatelly String. This is suboptimal
since it induced unnecessary GC pressure on the dirver, see
BlockManagerMasterEndpoint.
The commit removes the definition of hashCode and equals from the base
class. No other change is necessary since all subclasses are in fact
case classes and therefore have auto-generated hashCode and equals. No
change of behaviour is expected.
Sidenote: you might be wondering, why did the subclasses use the base
implementation and the auto-generated one? Apparently, this behaviour
is documented in the spec. See this SO answer for details
https://stackoverflow.com/a/44990210/262432.
## How was this patch tested?
BlockIdSuite
Author: Sergei Lebedev <s.lebedev@criteo.com>
Closes#19369 from superbobry/blockid-equals-hashcode.
## What changes were proposed in this pull request?
Fix for SPARK-20466, full description of the issue in the JIRA. To summarize, `HadoopRDD` uses a metadata cache to cache `JobConf` objects. The cache uses soft-references, which means the JVM can delete entries from the cache whenever there is GC pressure. `HadoopRDD#getJobConf` had a bug where it would check if the cache contained the `JobConf`, if it did it would get the `JobConf` from the cache and return it. This doesn't work when soft-references are used as the JVM can delete the entry between the existence check and the get call.
## How was this patch tested?
Haven't thought of a good way to test this yet given the issue only occurs sometimes, and happens during high GC pressure. Was thinking of using mocks to verify `#getJobConf` is doing the right thing. I deleted the method `HadoopRDD#containsCachedMetadata` so that we don't hit this issue again.
Author: Sahil Takiar <stakiar@cloudera.com>
Closes#19413 from sahilTakiar/master.
## What changes were proposed in this pull request?
Improve the Spark-Mesos coarse-grained scheduler to consider the preferred locations when dynamic allocation is enabled.
## How was this patch tested?
Added a unittest, and performed manual testing on AWS.
Author: Gene Pang <gene.pang@gmail.com>
Closes#18098 from gpang/mesos_data_locality.
## What changes were proposed in this pull request?
When zinc is running the pwd might be in the root of the project. A quick solution to this is to not go a level up incase we are in the root rather than root/core/. If we are in the root everything works fine, if we are in core add a script which goes and runs the level up
## How was this patch tested?
set -x in the SparkR install scripts.
Author: Holden Karau <holden@us.ibm.com>
Closes#19402 from holdenk/SPARK-22167-sparkr-packaging-issue-allow-zinc.
## What changes were proposed in this pull request?
Spark's RangePartitioner hard codes the number of sampling points per partition to be 20. This is sometimes too low. This ticket makes it configurable, via spark.sql.execution.rangeExchange.sampleSizePerPartition, and raises the default in Spark SQL to be 100.
## How was this patch tested?
Added a pretty sophisticated test based on chi square test ...
Author: Reynold Xin <rxin@databricks.com>
Closes#19387 from rxin/SPARK-22160.
## What changes were proposed in this pull request?
This patch add latest failure reason for task set blacklist.Which can be showed on spark ui and let user know failure reason directly.
Till now , every job which aborted by completed blacklist just show log like below which has no more information:
`Aborting $taskSet because task $indexInTaskSet (partition $partition) cannot run anywhere due to node and executor blacklist. Blacklisting behavior cannot run anywhere due to node and executor blacklist.Blacklisting behavior can be configured via spark.blacklist.*."`
**After modify:**
```
Aborting TaskSet 0.0 because task 0 (partition 0)
cannot run anywhere due to node and executor blacklist.
Most recent failure:
Some(Lost task 0.1 in stage 0.0 (TID 3,xxx, executor 1): java.lang.Exception: Fake error!
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:73)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:305)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
).
Blacklisting behavior can be configured via spark.blacklist.*.
```
## How was this patch tested?
Unit test and manually test.
Author: zhoukang <zhoukang199191@gmail.com>
Closes#19338 from caneGuy/zhoukang/improve-blacklist.
## What changes were proposed in this pull request?
Fix finalizer checkstyle violation by just turning it off; re-disable checkstyle as it won't be run by SBT PR builder. See https://github.com/apache/spark/pull/18887#issuecomment-332580700
## How was this patch tested?
`./dev/lint-java` runs successfully
Author: Sean Owen <sowen@cloudera.com>
Closes#19371 from srowen/HotfixFinalizerCheckstlye.
## What changes were proposed in this pull request?
Currently we use Arrow File format to communicate with Python worker when invoking vectorized UDF but we can use Arrow Stream format.
This pr replaces the Arrow File format with the Arrow Stream format.
## How was this patch tested?
Existing tests.
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#19349 from ueshin/issues/SPARK-22125.
The application listing is still generated from event logs, but is now stored
in a KVStore instance. By default an in-memory store is used, but a new config
allows setting a local disk path to store the data, in which case a LevelDB
store will be created.
The provider stores things internally using the public REST API types; I believe
this is better going forward since it will make it easier to get rid of the
internal history server API which is mostly redundant at this point.
I also added a finalizer to LevelDBIterator, to make sure that resources are
eventually released. This helps when code iterates but does not exhaust the
iterator, thus not triggering the auto-close code.
HistoryServerSuite was modified to not re-start the history server unnecessarily;
this makes the json validation tests run more quickly.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#18887 from vanzin/SPARK-20642.
## What changes were proposed in this pull request?
MemoryStore.evictBlocksToFreeSpace acquires write locks for all the
blocks it intends to evict up front. If there is a failure to evict
blocks (eg., some failure dropping a block to disk), then we have to
release the lock. Otherwise the lock is never released and an executor
trying to get the lock will wait forever.
## How was this patch tested?
Added unit test.
Author: Imran Rashid <irashid@cloudera.com>
Closes#19311 from squito/SPARK-22083.
## What changes were proposed in this pull request?
Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:
- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19307 from srowen/Scala212.
## What changes were proposed in this pull request?
EventLoggingListener use `val in = new BufferedInputStream(fs.open(log))` and will close it if `codec.map(_.compressedInputStream(in)).getOrElse(in)` occurs an exception .
But, if `CompressionCodec.createCodec(new SparkConf, c)` throws an exception, the BufferedInputStream `in` will not be closed anymore.
## How was this patch tested?
exist tests
Author: zuotingbing <zuo.tingbing9@zte.com.cn>
Closes#19277 from zuotingbing/SPARK-22058.
## What changes were proposed in this pull request?
This PR proposes to remove `assume` in `Utils.resolveURIs` and replace `assume` to `assert` in `Utils.resolveURI` in the test cases in `UtilsSuite`.
It looks `Utils.resolveURIs` supports multiple but also single paths as input. So, it looks not meaningful to check if the input has `,`.
For the test for `Utils.resolveURI`, I replaced it to `assert` because it looks taking single path and in order to prevent future mistakes when adding more tests here.
For `assume` in `HiveDDLSuite`, it looks it should be `assert` to test at the last
## How was this patch tested?
Fixed unit tests.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#19332 from HyukjinKwon/SPARK-22093.
## What changes were proposed in this pull request?
Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19266 from srowen/SPARK-22033.
This PR adds vectorized UDFs to the Python API
**Proposed API**
Introduce a flag to turn on vectorization for a defined UDF, for example:
```
pandas_udf(DoubleType())
def plus(a, b)
return a + b
```
or
```
plus = pandas_udf(lambda a, b: a + b, DoubleType())
```
Usage is the same as normal UDFs
0-parameter UDFs
pandas_udf functions can declare an optional `**kwargs` and when evaluated, will contain a key "size" that will give the required length of the output. For example:
```
pandas_udf(LongType())
def f0(**kwargs):
return pd.Series(1).repeat(kwargs["size"])
df.select(f0())
```
Added new unit tests in pyspark.sql that are enabled if pyarrow and Pandas are available.
- [x] Fix support for promoted types with null values
- [ ] Discuss 0-param UDF API (use of kwargs)
- [x] Add tests for chained UDFs
- [ ] Discuss behavior when pyarrow not installed / enabled
- [ ] Cleanup pydoc and add user docs
Author: Bryan Cutler <cutlerb@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Closes#18659 from BryanCutler/arrow-vectorized-udfs-SPARK-21404.
## What changes were proposed in this pull request?
We have to make sure that SerializerManager's private instance of
kryo also uses the right classloader, regardless of the current thread
classloader. In particular, this fixes serde during remote cache
fetches, as those occur in netty threads.
## How was this patch tested?
Manual tests & existing suite via jenkins. I haven't been able to reproduce this is in a unit test, because when a remote RDD partition can be fetched, there is a warning message and then the partition is just recomputed locally. I manually verified the warning message is no longer present.
Author: Imran Rashid <irashid@cloudera.com>
Closes#19280 from squito/SPARK-21928_ser_classloader.
## What changes were proposed in this pull request?
This is a followup work of SPARK-9104 to expose the Netty memory usage to MetricsSystem. Current the shuffle Netty memory usage of `NettyBlockTransferService` will be exposed, if using external shuffle, then the Netty memory usage of `ExternalShuffleClient` and `ExternalShuffleService` will be exposed instead. Currently I don't expose Netty memory usage of `YarnShuffleService`, because `YarnShuffleService` doesn't have `MetricsSystem` itself, and is better to connect to Hadoop's MetricsSystem.
## How was this patch tested?
Manually verified in local cluster.
Author: jerryshao <sshao@hortonworks.com>
Closes#19160 from jerryshao/SPARK-21934.
## What changes were proposed in this pull request?
Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19282 from srowen/SPARK-22066.
This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from other more
performance-sensitive ones.
The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all
user-defined listeners, end up in a default queue. Internally, there's
an API allowing listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into 3 categories:
application status listeners (e.g. UI), executor management (e.g. dynamic
allocation), and the event log.
The queueing logic, while abstracted away in a separate class, is kept
as much as possible hidden away from consumers. Aside from choosing their
queue, there's no code change needed to take advantage of queues.
Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and the change makes that method asynchronous. Other tests were simplified
not to use the asynchronous LiveListenerBus.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19211 from vanzin/SPARK-18838.
## What changes were proposed in this pull request?
In the current Spark, when submitting application on YARN with remote resources `./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`, Spark will be failed with:
```
java.io.IOException: No FileSystem for scheme: http
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354)
at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599)
at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599)
at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598)
at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848)
at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173)
```
This is because `YARN#client` assumes resources are on the Hadoop compatible FS. To fix this problem, here propose to download remote http(s) resources to local and add this local downloaded resources to dist cache. This solution has one downside: remote resources are downloaded and uploaded again, but it only restricted to only remote http(s) resources, also the overhead is not so big. The advantages of this solution is that it is simple and the code changes restricts to only `SparkSubmit`.
## How was this patch tested?
Unit test added, also verified in local cluster.
Author: jerryshao <sshao@hortonworks.com>
Closes#19130 from jerryshao/SPARK-21917.
## What changes were proposed in this pull request?
* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
It became unused as a result of 85b0a15754
(SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
* The change extracting the exception paths is more than just cosmetics since it def. reduces the size the affected methods compile to
## How was this patch tested?
* Build still passes after removing the method, grepping the codebase for `alignToWords` shows no reference to it anywhere either.
* Dried up code is covered by existing tests.
Author: Armin <me@obrown.io>
Closes#19254 from original-brownbear/cleanup-mem-consumer.
## What changes were proposed in this pull request?
When Spark persist data to Unsafe memory, we call the method `MemoryStore.putIteratorAsBytes`, which need synchronize the `memoryManager` for every record write. This implementation is not necessary, we can apply for more memory at a time to reduce unnecessary synchronization.
## How was this patch tested?
Test case (with 1 executor 20 core):
```scala
val start = System.currentTimeMillis()
val data = sc.parallelize(0 until Integer.MAX_VALUE, 100)
.persist(StorageLevel.OFF_HEAP)
.count()
println(System.currentTimeMillis() - start)
```
Test result:
before
| 27647 | 29108 | 28591 | 28264 | 27232 |
after
| 26868 | 26358 | 27767 | 26653 | 26693 |
Author: Xianyang Liu <xianyang.liu@intel.com>
Closes#19135 from ConeyLiu/memorystore.
## What changes were proposed in this pull request?
Upgrade codahale metrics library so that Graphite constructor can re-resolve hosts behind a CNAME with re-tried DNS lookups. When Graphite is deployed behind an ELB, ELB may change IP addresses based on auto-scaling needs. Using current approach yields Graphite usage impossible, fixing for that use case
- Upgrade to codahale 3.1.5
- Use new Graphite(host, port) constructor instead of new Graphite(new InetSocketAddress(host, port)) constructor
## How was this patch tested?
The same logic is used for another project that is using the same configuration and code path, and graphite re-connect's behind ELB's are no longer an issue
This are proposed changes for codahale lib - https://github.com/dropwizard/metrics/compare/v3.1.2...v3.1.5#diff-6916c85d2dd08d89fe771c952e3b8512R120. Specifically, b4d246d34e/metrics-graphite/src/main/java/com/codahale/metrics/graphite/Graphite.java (L120)
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: alexmnyc <project@alexandermarkham.com>
Closes#19210 from alexmnyc/patch-1.
Profiling some of our big jobs, we see that around 30% of the time is being spent in reading the spill files from disk. In order to amortize the disk IO cost, the idea is to implement a read ahead input stream which asynchronously reads ahead from the underlying input stream when specified amount of data has been read from the current buffer. It does it by maintaining two buffer - active buffer and read ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously read from the underlying input stream and once the active buffer is exhausted, we flip the two buffers so that we can start reading from the read ahead buffer without being blocked in disk I/O.
## How was this patch tested?
Tested by running a job on the cluster and could see up to 8% CPU improvement.
Author: Sital Kedia <skedia@fb.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: Sital Kedia <sitalkedia@users.noreply.github.com>
Closes#18317 from sitalkedia/read_ahead_buffer.
As written now, there must be both memory and disk bytes spilled to show either of them. If there is only one of those types of spill recorded, it will be hidden.
Author: Andrew Ash <andrew@andrewash.com>
Closes#19164 from ash211/patch-3.
## What changes were proposed in this pull request?
As logging below, actually exception will be hidden when removeBlockInternal throw an exception.
`2017-08-31,10:26:57,733 WARN org.apache.spark.storage.BlockManager: Putting block broadcast_110 failed due to an exception
2017-08-31,10:26:57,734 WARN org.apache.spark.broadcast.BroadcastManager: Failed to create a new broadcast in 1 attempts
java.io.IOException: Failed to create local dir in /tmp/blockmgr-5bb5ac1e-c494-434a-ab89-bd1808c6b9ed/2e.
at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70)
at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:115)
at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726)
at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233)
at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122)
at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
at org.apache.spark.broadcast.BroadcastManager$$anonfun$newBroadcast$1.apply$mcVI$sp(BroadcastManager.scala:60)
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:58)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1415)
at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1002)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:924)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:771)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:770)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:770)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1235)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1662)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)`
In this pr i will print exception first make troubleshooting more conveniently.
PS:
This one split from [PR-19133](https://github.com/apache/spark/pull/19133)
## How was this patch tested?
Exsist unit test
Author: zhoukang <zhoukang199191@gmail.com>
Closes#19171 from caneGuy/zhoukang/print-rootcause.
…NING
## What changes were proposed in this pull request?
When driver quit abnormally which cause executor shutdown and task metrics can not be sent to driver for updating.In this case the status will always be 'RUNNING' and the duration on history UI will be 'CurrentTime - launchTime' which increase infinitely.
We can fix this time by modify time of event log since this time has gotten when `FSHistoryProvider` fetch event log from File System.
And the result picture is uploaded in [SPARK-21922](https://issues.apache.org/jira/browse/SPARK-21922).
How to reproduce?
(1) Submit a job to spark on yarn
(2) Mock an oom(or other case can make driver quit abnormally) senario for driver
(3) Make sure executor is running task when driver quitting
(4) Open the history server and checkout result
It is not a corner case since there are many such jobs in our current cluster.
## How was this patch tested?
Deploy historyserver and open a job has this problem.
Author: zhoukang <zhoukang199191@gmail.com>
Closes#19132 from caneGuy/zhoukang/fix-duration.
## What changes were proposed in this pull request?
forgot to call `update()` with `graph1` & `rdd1` in examples for `PeriodicGraphCheckpointer` & `PeriodicRDDCheckpoin`
## How was this patch tested?
existing tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes#19198 from zhengruifeng/fix_doc_checkpointer.
## What changes were proposed in this pull request?
1. Removing all redundant throws declarations from Java codebase.
2. Removing dead code made visible by this from `ShuffleExternalSorter#closeAndGetSpills`
## How was this patch tested?
Build still passes.
Author: Armin <me@obrown.io>
Closes#19182 from original-brownbear/SPARK-21970.
## What changes were proposed in this pull request?
After you create a temporary table, you need to delete it, otherwise it will leave a file similar to the file name ‘SPARK194465907929586320484966temp’.
## How was this patch tested?
N / A
Author: caoxuewen <cao.xuewen@zte.com.cn>
Closes#19174 from heary-cao/DeleteTempFile.
## What changes were proposed in this pull request?
In UtilsSuite Locale was set by default to US, but at the moment of using format function it wasn't, taking by default JVM locale which could be different than US making this test fail.
## How was this patch tested?
Unit test (UtilsSuite)
Author: German Schiavon <germanschiavon@gmail.com>
Closes#19205 from Gschiavon/fix/test-locale.
## What changes were proposed in this pull request?
this PR describe remove the import class that are unused.
## How was this patch tested?
N/A
Author: caoxuewen <cao.xuewen@zte.com.cn>
Closes#19131 from heary-cao/unuse_import.
Since ScalaTest 3.0.0, `org.scalatest.concurrent.Timeouts` is deprecated.
This PR replaces the deprecated one with `org.scalatest.concurrent.TimeLimits`.
```scala
-import org.scalatest.concurrent.Timeouts._
+import org.scalatest.concurrent.TimeLimits._
```
Pass the existing test suites.
Author: Dongjoon Hyun <dongjoon@apache.org>
Closes#19150 from dongjoon-hyun/SPARK-21939.
Change-Id: I1a1b07f1b97e51e2263dfb34b7eaaa099b2ded5e
I observed this while running a oozie job trying to connect to hbase via spark.
It look like the creds are not being passed in thehttps://github.com/apache/spark/blob/branch-2.2/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/security/HadoopFSCredentialProvider.scala#L53 for 2.2 release.
More Info as to why it fails on secure grid:
Oozie client gets the necessary tokens the application needs before launching. It passes those tokens along to the oozie launcher job (MR job) which will then actually call the Spark client to launch the spark app and pass the tokens along.
The oozie launcher job cannot get anymore tokens because all it has is tokens ( you can't get tokens with tokens, you need tgt or keytab).
The error here is because the launcher job runs the Spark Client to submit the spark job but the spark client doesn't see that it already has the hdfs tokens so it tries to get more, which ends with the exception.
There was a change with SPARK-19021 to generalize the hdfs credentials provider that changed it so we don't pass the existing credentials into the call to get tokens so it doesn't realize it already has the necessary tokens.
https://issues.apache.org/jira/browse/SPARK-21890
Modified to pass creds to get delegation tokens
Author: Sanket Chintapalli <schintap@yahoo-inc.com>
Closes#19140 from redsanket/SPARK-21890-master.
## What changes were proposed in this pull request?
If no SparkConf is available to Utils.redact, simply don't redact.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19123 from srowen/SPARK-21418.
…build; fix some things that will be warnings or errors in 2.12; restore Scala 2.12 profile infrastructure
## What changes were proposed in this pull request?
This change adds back the infrastructure for a Scala 2.12 build, but does not enable it in the release or Python test scripts.
In order to make that meaningful, it also resolves compile errors that the code hits in 2.12 only, in a way that still works with 2.11.
It also updates dependencies to the earliest minor release of dependencies whose current version does not yet support Scala 2.12. This is in a sense covered by other JIRAs under the main umbrella, but implemented here. The versions below still work with 2.11, and are the _latest_ maintenance release in the _earliest_ viable minor release.
- Scalatest 2.x -> 3.0.3
- Chill 0.8.0 -> 0.8.4
- Clapper 1.0.x -> 1.1.2
- json4s 3.2.x -> 3.4.2
- Jackson 2.6.x -> 2.7.9 (required by json4s)
This change does _not_ fully enable a Scala 2.12 build:
- It will also require dropping support for Kafka before 0.10. Easy enough, just didn't do it yet here
- It will require recreating `SparkILoop` and `Main` for REPL 2.12, which is SPARK-14650. Possible to do here too.
What it does do is make changes that resolve much of the remaining gap without affecting the current 2.11 build.
## How was this patch tested?
Existing tests and build. Manually tested with `./dev/change-scala-version.sh 2.12` to verify it compiles, modulo the exceptions above.
Author: Sean Owen <sowen@cloudera.com>
Closes#18645 from srowen/SPARK-14280.
- SecurityManager complains when auth is enabled but no secret is defined;
SparkSubmit doesn't use the auth functionality of the SecurityManager,
so use a dummy secret to work around the exception.
- Only reset the log4j configuration when Spark was the one initializing
it, otherwise user-defined log configuration may be lost.
Tested with the log config file posted to the bug, on a secured YARN cluster.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19089 from vanzin/SPARK-21728.
## What changes were proposed in this pull request?
`PickleException` is thrown when creating dataframe from python row with empty bytearray
spark.createDataFrame(spark.sql("select unhex('') as xx").rdd.map(lambda x: {"abc": x.xx})).show()
net.razorvine.pickle.PickleException: invalid pickle data for bytearray; expected 1 or 2 args, got 0
at net.razorvine.pickle.objects.ByteArrayConstructor.construct(ByteArrayConstructor.java
...
`ByteArrayConstructor` doesn't deal with empty byte array pickled by Python3.
## How was this patch tested?
Added test.
Author: Liang-Chi Hsieh <viirya@gmail.com>
Closes#19085 from viirya/SPARK-21534.
This patch adds statsd sink to the current metrics system in spark core.
Author: Xiaofeng Lin <xlin@twilio.com>
Closes#9518 from xflin/statsd.
Change-Id: Ib8720e86223d4a650df53f51ceb963cd95b49a44
## What changes were proposed in this pull request?
Fix Java code style so `./dev/lint-java` succeeds
## How was this patch tested?
Run `./dev/lint-java`
Author: Andrew Ash <andrew@andrewash.com>
Closes#19088 from ash211/spark-21875-lint-java.
## What changes were proposed in this pull request?
killExecutor api currently does not allow killing an executor without updating the total number of executors needed. In case of dynamic allocation is turned on and the allocator tries to kill an executor, the scheduler reduces the total number of executors needed ( see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala#L635) which is incorrect because the allocator already takes care of setting the required number of executors itself.
## How was this patch tested?
Ran a job on the cluster and made sure the executor request is correct
Author: Sital Kedia <skedia@fb.com>
Closes#19081 from sitalkedia/skedia/oss_fix_executor_allocation.
## What changes were proposed in this pull request?
`org.apache.spark.deploy.RPackageUtilsSuite`
```
- jars without manifest return false *** FAILED *** (109 milliseconds)
java.io.IOException: Unable to delete file: C:\projects\spark\target\tmp\1500266936418-0\dep1-c.jar
```
`org.apache.spark.deploy.SparkSubmitSuite`
```
- download one file to local *** FAILED *** (16 milliseconds)
java.net.URISyntaxException: Illegal character in authority at index 6: s3a://C:\projects\spark\target\tmp\test2630198944759847458.jar
- download list of files to local *** FAILED *** (0 milliseconds)
java.net.URISyntaxException: Illegal character in authority at index 6: s3a://C:\projects\spark\target\tmp\test2783551769392880031.jar
```
`org.apache.spark.scheduler.ReplayListenerSuite`
```
- Replay compressed inprogress log file succeeding on partial read (156 milliseconds)
Exception encountered when attempting to run a suite with class name:
org.apache.spark.scheduler.ReplayListenerSuite *** ABORTED *** (1 second, 391 milliseconds)
java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-8f3cacd6-faad-4121-b901-ba1bba8025a0
- End-to-end replay *** FAILED *** (62 milliseconds)
java.io.IOException: No FileSystem for scheme: C
- End-to-end replay with compression *** FAILED *** (110 milliseconds)
java.io.IOException: No FileSystem for scheme: C
```
`org.apache.spark.sql.hive.StatisticsSuite`
```
- SPARK-21079 - analyze table with location different than that of individual partitions *** FAILED *** (875 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
- SPARK-21079 - analyze partitioned table with only a subset of partitions visible *** FAILED *** (47 milliseconds)
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);
```
**Note:** this PR does not fix:
`org.apache.spark.deploy.SparkSubmitSuite`
```
- launch simple application with spark-submit with redaction *** FAILED *** (172 milliseconds)
java.util.NoSuchElementException: next on empty iterator
```
I can't reproduce this on my Windows machine but looks appearntly consistently failed on AppVeyor. This one is unclear to me yet and hard to debug so I did not include this one for now.
**Note:** it looks there are more instances but it is hard to identify them partly due to flakiness and partly due to swarming logs and errors. Will probably go one more time if it is fine.
## How was this patch tested?
Manually via AppVeyor:
**Before**
- `org.apache.spark.deploy.RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/8t8ra3lrljuir7q4
- `org.apache.spark.deploy.SparkSubmitSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/taquy84yudjjen64
- `org.apache.spark.scheduler.ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/24omrfn2k0xfa9xq
- `org.apache.spark.sql.hive.StatisticsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/771-windows-fix/job/2079y1plgj76dc9l
**After**
- `org.apache.spark.deploy.RPackageUtilsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/3803dbfn89ne1164
- `org.apache.spark.deploy.SparkSubmitSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/m5l350dp7u9a4xjr
- `org.apache.spark.scheduler.ReplayListenerSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/565vf74pp6bfdk18
- `org.apache.spark.sql.hive.StatisticsSuite`: https://ci.appveyor.com/project/spark-test/spark/build/775-windows-fix/job/qm78tsk8c37jb6s4
Jenkins tests are required and AppVeyor tests will be triggered.
Author: hyukjinkwon <gurwls223@gmail.com>
Closes#18971 from HyukjinKwon/windows-fixes.
## What changes were proposed in this pull request?
Free off -heap memory .
I have checked all the unit tests.
## How was this patch tested?
N/A
Author: liuxian <liu.xian3@zte.com.cn>
Closes#19075 from 10110346/memleak.
## What changes were proposed in this pull request?
Handles the situation where a `FileOutputCommitter.getWorkPath()` returns `null` by downgrading to the supplied `path` argument.
The existing code does an `Option(workPath.toString).getOrElse(path)`, which triggers an NPE in the `toString()` operation if the workPath == null. The code apparently was meant to handle this (hence the getOrElse() clause, but as the NPE has already occurred at that point the else-clause never gets invoked.
## How was this patch tested?
Manually, with some later code review.
Author: Steve Loughran <stevel@hortonworks.com>
Closes#18111 from steveloughran/cloud/SPARK-20886-committer-NPE.
## What changes were proposed in this pull request?
The variable "TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES" comment error, It shouldn't be 2^32-1, should be 2^31-1, That means the maximum value of int.
## How was this patch tested?
Existing test cases
Author: he.qiao <he.qiao17@zte.com.cn>
Closes#19025 from Geek-He/08_23_comments.
This change initializes logging when SparkSubmit runs, using
a configuration that should avoid printing log messages as
much as possible with most configurations, and adds code to
restore the Spark logging system to as close as possible to
its initial state, so the Spark app being run can re-initialize
logging with its own configuration.
With that feature, some duplicate code in SparkSubmit can now
be replaced with the existing methods in the Utils class, which
could not be used before because they initialized logging. As part
of that I also did some minor refactoring, moving methods that
should really belong in DependencyUtils.
The change also shuffles some code in SparkHadoopUtil so that
SparkSubmit can create a Hadoop config like the rest of Spark
code, respecting the user's Spark configuration.
The behavior was verified running spark-shell, pyspark and
normal applications, then verifying the logging behavior,
with and without dependency downloads.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes#19013 from vanzin/SPARK-21728.
## What changes were proposed in this pull request?
Fair Scheduler can be built via one of the following options:
- By setting a `spark.scheduler.allocation.file` property,
- By setting `fairscheduler.xml` into classpath.
These options are checked **in order** and fair-scheduler is built via first found option. If invalid path is found, `FileNotFoundException` will be expected.
This PR aims unit test coverage of these use cases and a minor documentation change has been added for second option(`fairscheduler.xml` into classpath) to inform the users.
Also, this PR was related with #16813 and has been created separately to keep patch content as isolated and to help the reviewers.
## How was this patch tested?
Added new Unit Tests.
Author: erenavsarogullari <erenavsarogullari@gmail.com>
Closes#16992 from erenavsarogullari/SPARK-19662.
## What changes were proposed in this pull request?
With SPARK-10643, Spark supports download resources from remote in client deploy mode. But the implementation overrides variables which representing added resources (like `args.jars`, `args.pyFiles`) to local path, And yarn client leverage this local path to re-upload resources to distributed cache. This is unnecessary to break the semantics of putting resources in a shared FS. So here proposed to fix it.
## How was this patch tested?
This is manually verified with jars, pyFiles in local and remote storage, both in client and cluster mode.
Author: jerryshao <sshao@hortonworks.com>
Closes#18962 from jerryshao/SPARK-21714.
## What changes were proposed in this pull request?
Fix build warnings and Java lint errors. This just helps a bit in evaluating (new) warnings in another PR I have open.
## How was this patch tested?
Existing tests
Author: Sean Owen <sowen@cloudera.com>
Closes#19051 from srowen/JavaWarnings.
## What changes were proposed in this pull request?
Right now, ChunkedByteBuffer#writeFully do not slice bytes first.We observe code in java nio Util#getTemporaryDirectBuffer below:
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf;
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size);
}
If we slice first with a fixed size, we can use buffer cache and only need to allocate at the first write call.
Since we allocate new buffer, we can not control the free time of this buffer.This once cause memory issue in our production cluster.
In this patch, i supply a new api which will slice with fixed size for buffer writing.
## How was this patch tested?
Unit test and test in production.
Author: zhoukang <zhoukang199191@gmail.com>
Author: zhoukang <zhoukang@xiaomi.com>
Closes#18730 from caneGuy/zhoukang/improve-chunkwrite.
Right now the spark shuffle service has a cache for index files. It is based on a # of files cached (spark.shuffle.service.index.cache.entries). This can cause issues if people have a lot of reducers because the size of each entry can fluctuate based on the # of reducers.
We saw an issues with a job that had 170000 reducers and it caused NM with spark shuffle service to use 700-800MB or memory in NM by itself.
We should change this cache to be memory based and only allow a certain memory size used. When I say memory based I mean the cache should have a limit of say 100MB.
https://issues.apache.org/jira/browse/SPARK-21501
Manual Testing with 170000 reducers has been performed with cache loaded up to max 100MB default limit, with each shuffle index file of size 1.3MB. Eviction takes place as soon as the total cache size reaches the 100MB limit and the objects will be ready for garbage collection there by avoiding NM to crash. No notable difference in runtime has been observed.
Author: Sanket Chintapalli <schintap@yahoo-inc.com>
Closes#18940 from redsanket/SPARK-21501.
## What changes were proposed in this pull request?
Add a new listener event when a speculative task is created and notify it to ExecutorAllocationManager for requesting more executor.
## How was this patch tested?
- Added Unittests.
- For the test snippet in the jira:
val n = 100
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
if (index == 1) {
Thread.sleep(Long.MaxValue) // fake long running task(s)
}
it.toList.map(x => index + ", " + x).iterator
}).collect
With this code change, spark indicates 101 jobs are running (99 succeeded, 2 running and 1 is speculative job)
Author: Jane Wang <janewang@fb.com>
Closes#18492 from janewangfb/speculated_task_not_launched.
There're two code in Launcher and SparkSubmit will will explicitly list all the Spark submodules, newly added kvstore module is missing in this two parts, so submitting a minor PR to fix this.
Author: jerryshao <sshao@hortonworks.com>
Closes#19014 from jerryshao/missing-kvstore.