Commit graph

19944 commits

Author SHA1 Message Date
lianhuiwang 8b5b2e272f [SPARK-20986][SQL] Reset table's statistics after PruneFileSourcePartitions rule.
## What changes were proposed in this pull request?
After PruneFileSourcePartitions rule, It needs reset table's statistics because PruneFileSourcePartitions can filter some unnecessary partitions. So the statistics need to be changed.

## How was this patch tested?
add unit test.

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #18205 from lianhuiwang/SPARK-20986.
2017-06-14 09:57:56 +08:00
jerryshao 9eb095243b [SPARK-12552][CORE] Correctly count the driver resource when recovering from failure for Master
Currently in Standalone HA mode, the resource usage of driver is not correctly counted in Master when recovering from failure, this will lead to some unexpected behaviors like negative value in UI.

So here fix this to also count the driver's resource usage.

Also changing the recovered app's state to `RUNNING` when fully recovered. Previously it will always be WAITING even fully recovered.

andrewor14 please help to review, thanks a lot.

Author: jerryshao <sshao@hortonworks.com>

Closes #10506 from jerryshao/SPARK-12552.
2017-06-14 08:12:15 +08:00
liuxian 7ba8bf288d [SPARK-21016][CORE] Improve code fault tolerance for converting string to number
## What changes were proposed in this pull request?
When converting `string` to `number`(int, long or double),  if the string has a space before or after,will lead to unnecessary mistakes.

## How was this patch tested?
unit test

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18238 from 10110346/lx-wip-0608.
2017-06-13 10:12:28 -07:00
Liang-Chi Hsieh bcf3643f94 [SPARK-21051][SQL] Add hash map metrics to aggregate
## What changes were proposed in this pull request?

This adds the average hash map probe metrics to hash aggregate.

`BytesToBytesMap` already has API to get the metrics, this PR adds an API to `UnsafeFixedWidthAggregationMap` to access it.

Preparing a test for this metrics seems tricky, because we don't know what collision keys are. For now, the test case generates random data large enough to have desired probe.

TODO in later PR: add hash map metrics to join.

## How was this patch tested?

Added test to SQLMetricsSuite.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #18258 from viirya/SPARK-20953.
2017-06-13 10:10:35 -07:00
DjvuLee b36ce2a246 [SPARK-21064][CORE][TEST] Fix the default value bug in NettyBlockTransferServiceSuite
## What changes were proposed in this pull request?

The default value for `spark.port.maxRetries` is 100,
but we use 10 in the suite file.
So we change it to 100 to avoid test failure.

## How was this patch tested?
No test

Author: DjvuLee <lihu@bytedance.com>

Closes #18280 from djvulee/NettyTestBug.
2017-06-13 15:56:03 +01:00
guoxiaolong b7304f2559 [SPARK-21060][WEB-UI] Css style about paging function is error in the executor page. Css style about paging function is error in the executor page. It is different of history server ui paging function css style.
## What changes were proposed in this pull request?

Css style about paging function is error in the executor page. It is different of history server ui paging function css style.

**But their style should be consistent**. There are three reasons.

1. The first reason: 'Previous', 'Next' and number should be the button format.

2. The second reason: when you are on the first page, 'Previous' and '1' should be gray and can not be clicked.
![1](https://user-images.githubusercontent.com/26266482/27026667-1fe745ee-4f91-11e7-8b34-150819d22bd3.png)

3. The third reason: when you are on the last page, 'Previous' and 'Max number' should be gray and can not be clicked.
![2](https://user-images.githubusercontent.com/26266482/27026811-9d8d6fa0-4f91-11e7-8b51-7816c3feb381.png)

before fix:
![fix_before](https://user-images.githubusercontent.com/26266482/27026428-47ec5c56-4f90-11e7-9dd5-d52c22d7bd36.png)

after fix:
![fix_after](https://user-images.githubusercontent.com/26266482/27026439-50d17072-4f90-11e7-8405-6f81da5ab32c.png)

The style of history server ui:
![history](https://user-images.githubusercontent.com/26266482/27026528-9c90f780-4f90-11e7-91e6-90d32651fe03.png)

## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: guoxiaolong <guo.xiaolong1@zte.com.cn>
Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>
Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn>

Closes #18275 from guoxiaolongzte/SPARK-21060.
2017-06-13 15:38:11 +01:00
Rishabh Bhardwaj 9b2c877bec [SPARK-21039][SPARK CORE] Use treeAggregate instead of aggregate in DataFrame.stat.bloomFilter
## What changes were proposed in this pull request?
To use treeAggregate instead of aggregate in DataFrame.stat.bloomFilter to parallelize the operation of merging the bloom filters
(Please fill in changes proposed in this fix)

## How was this patch tested?
unit tests passed
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Rishabh Bhardwaj <rbnext29@gmail.com>
Author: Rishabh Bhardwaj <admin@rishabh.local>
Author: Rishabh Bhardwaj <r0b00ko@rishabh.Dlink>
Author: Rishabh Bhardwaj <admin@Admins-MacBook-Pro.local>
Author: Rishabh Bhardwaj <r0b00ko@rishabh.local>

Closes #18263 from rishabhbhardwaj/SPARK-21039.
2017-06-13 15:09:12 +01:00
liuxian 2aaed0a4db [SPARK-21006][TESTS][FOLLOW-UP] Some Worker's RpcEnv is leaked in WorkerSuite
## What changes were proposed in this pull request?

Create rpcEnv and run later needs shutdown. as #18226

## How was this patch tested?
unit test

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18259 from 10110346/wip-lx-0610.
2017-06-13 12:29:50 +01:00
Sean Owen 7b7c85ede3 [SPARK-20920][SQL] ForkJoinPool pools are leaked when writing hive tables with many partitions
## What changes were proposed in this pull request?

Don't leave thread pool running from AlterTableRecoverPartitionsCommand DDL command

## How was this patch tested?

Existing tests.

Author: Sean Owen <sowen@cloudera.com>

Closes #18216 from srowen/SPARK-20920.
2017-06-13 10:48:07 +01:00
Felix Cheung 278ba7a2c6 [TEST][SPARKR][CORE] Fix broken SparkSubmitSuite
## What changes were proposed in this pull request?

Fix test file path. This is broken in #18264 and undetected since R-only changes don't build core and subsequent post-commit with the change built fine (again because it wasn't building core)

actually appveyor builds everything but it's not running scala suites ...

## How was this patch tested?

jenkins
srowen gatorsmile

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #18283 from felixcheung/rsubmitsuite.
2017-06-12 22:08:49 -07:00
Dongjoon Hyun 2639c3ed03 [SPARK-19910][SQL] stack should not reject NULL values due to type mismatch
## What changes were proposed in this pull request?

Since `stack` function generates a table with nullable columns, it should allow mixed null values.

```scala
scala> sql("select stack(3, 1, 2, 3)").printSchema
root
 |-- col0: integer (nullable = true)

scala> sql("select stack(3, 1, 2, null)").printSchema
org.apache.spark.sql.AnalysisException: cannot resolve 'stack(3, 1, 2, NULL)' due to data type mismatch: Argument 1 (IntegerType) != Argument 3 (NullType); line 1 pos 7;
```

## How was this patch tested?

Pass the Jenkins with a new test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #17251 from dongjoon-hyun/SPARK-19910.
2017-06-12 21:18:43 -07:00
Wenchen Fan fc0e6944a5 Revert "[SPARK-21046][SQL] simplify the array offset and length in ColumnVector"
This reverts commit 22dd65f58e.
2017-06-13 09:15:14 +08:00
Shixiong Zhu 74a432d3a3 [SPARK-20979][SS] Add RateSource to generate values for tests and benchmark
## What changes were proposed in this pull request?

This PR adds RateSource for Structured Streaming so that the user can use it to generate data for tests and benchmark easily.

This source generates increment long values with timestamps. Each generated row has two columns: a timestamp column for the generated time and an auto increment long column starting with 0L.

It supports the following options:
- `rowsPerSecond` (e.g. 100, default: 1): How many rows should be generated per second.
- `rampUpTime` (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes `rowsPerSecond`. Using finer granularities than seconds will be truncated to integer seconds.
- `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. The source will try its best to reach `rowsPerSecond`, but the query may be resource constrained, and `numPartitions` can be tweaked to help reach the desired speed.

Here is a simple example that prints 10 rows per seconds:
```
    spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()
      .writeStream
      .format("console")
      .start()
```

The idea came from marmbrus and he did the initial work.

## How was this patch tested?

The added tests.

Author: Shixiong Zhu <shixiong@databricks.com>
Author: Michael Armbrust <michael@databricks.com>

Closes #18199 from zsxwing/rate.
2017-06-12 14:58:08 -07:00
Joseph K. Bradley ff318c0d2f [SPARK-21050][ML] Word2vec persistence overflow bug fix
## What changes were proposed in this pull request?

The method calculateNumberOfPartitions() uses Int, not Long (unlike the MLlib version), so it is very easily to have an overflow in calculating the number of partitions for ML persistence.

This modifies the calculations to use Long.

## How was this patch tested?

New unit test.  I verified that the test fails before this patch.

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #18265 from jkbradley/word2vec-save-fix.
2017-06-12 14:27:57 -07:00
Reynold Xin b1436c7496 [SPARK-21059][SQL] LikeSimplification can NPE on null pattern
## What changes were proposed in this pull request?
This patch fixes a bug that can cause NullPointerException in LikeSimplification, when the pattern for like is null.

## How was this patch tested?
Added a new unit test case in LikeSimplificationSuite.

Author: Reynold Xin <rxin@databricks.com>

Closes #18273 from rxin/SPARK-21059.
2017-06-12 14:07:51 -07:00
Dongjoon Hyun 32818d9b37 [SPARK-20345][SQL] Fix STS error handling logic on HiveSQLException
## What changes were proposed in this pull request?

[SPARK-5100](343d3bfafd) added Spark Thrift Server(STS) UI and the following logic to handle exceptions on case `Throwable`.

```scala
HiveThriftServer2.listener.onStatementError(
  statementId, e.getMessage, SparkUtils.exceptionString(e))
```

However, there occurred a missed case after implementing [SPARK-6964](eb19d3f75c)'s `Support Cancellation in the Thrift Server` by adding case `HiveSQLException` before case `Throwable`.

```scala
case e: HiveSQLException =>
  if (getStatus().getState() == OperationState.CANCELED) {
    return
  } else {
    setState(OperationState.ERROR)
    throw e
  }
  // Actually do need to catch Throwable as some failures don't inherit from Exception and
  // HiveServer will silently swallow them.
case e: Throwable =>
  val currentState = getStatus().getState()
  logError(s"Error executing query, currentState $currentState, ", e)
  setState(OperationState.ERROR)
  HiveThriftServer2.listener.onStatementError(
    statementId, e.getMessage, SparkUtils.exceptionString(e))
  throw new HiveSQLException(e.toString)
```

Logically, we had better add `HiveThriftServer2.listener.onStatementError` on case `HiveSQLException`, too.

## How was this patch tested?

N/A

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #17643 from dongjoon-hyun/SPARK-20345.
2017-06-12 14:05:03 -07:00
aokolnychyi ca4e960aec [SPARK-17914][SQL] Fix parsing of timestamp strings with nanoseconds
The PR contains a tiny change to fix the way Spark parses string literals into timestamps. Currently, some timestamps that contain nanoseconds are corrupted during the conversion from internal UTF8Strings into the internal representation of timestamps.

Consider the following example:
```
spark.sql("SELECT cast('2015-01-02 00:00:00.000000001' as TIMESTAMP)").show(false)
+------------------------------------------------+
|CAST(2015-01-02 00:00:00.000000001 AS TIMESTAMP)|
+------------------------------------------------+
|2015-01-02 00:00:00.000001                      |
+------------------------------------------------+
```

The fix was tested with existing tests. Also, there is a new test to cover cases that did not work previously.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #18252 from aokolnychyi/spark-17914.
2017-06-12 13:06:14 -07:00
Wenchen Fan 22dd65f58e [SPARK-21046][SQL] simplify the array offset and length in ColumnVector
## What changes were proposed in this pull request?

Currently when a `ColumnVector` stores array type elements, we will use 2 arrays for lengths and offsets and implement them individually in on-heap and off-heap column vector.

In this PR, we use one array to represent both offsets and lengths, so that we can treat it as `ColumnVector` and all the logic can go to the base class `ColumnVector`

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #18260 from cloud-fan/put.
2017-06-13 00:12:34 +08:00
Dongjoon Hyun a92e095e70 [SPARK-21041][SQL] SparkSession.range should be consistent with SparkContext.range
## What changes were proposed in this pull request?

This PR fixes the inconsistency in `SparkSession.range`.

**BEFORE**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array(9223372036854775804, 9223372036854775805, 9223372036854775806)
```

**AFTER**
```scala
scala> spark.range(java.lang.Long.MAX_VALUE - 3, java.lang.Long.MIN_VALUE + 2, 1).collect
res2: Array[Long] = Array()
```

## How was this patch tested?

Pass the Jenkins with newly added test cases.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18257 from dongjoon-hyun/SPARK-21041.
2017-06-12 20:58:27 +08:00
Ziyue Huang e6eb02df15 [DOCS] Fix error: ambiguous reference to overloaded definition
## What changes were proposed in this pull request?

`df.groupBy.count()` should be `df.groupBy().count()` , otherwise there is an error :

ambiguous reference to overloaded definition, both method groupBy in class Dataset of type (col1: String, cols: String*) and method groupBy in class Dataset of type (cols: org.apache.spark.sql.Column*)

## How was this patch tested?

```scala
val df = spark.readStream.schema(...).json(...)
val dfCounts = df.groupBy().count()
```

Author: Ziyue Huang <zyhuang94@gmail.com>

Closes #18272 from ZiyueHuang/master.
2017-06-12 10:59:33 +01:00
liuxian d140918093 [SPARK-20665][SQL][FOLLOW-UP] Move test case to MathExpressionsSuite
## What changes were proposed in this pull request?

 add test case to MathExpressionsSuite as #17906

## How was this patch tested?

unit test cases

Author: liuxian <liu.xian3@zte.com.cn>

Closes #18082 from 10110346/wip-lx-0524.
2017-06-11 22:29:09 -07:00
Josh Rosen 3476390c6e [SPARK-20715] Store MapStatuses only in MapOutputTracker, not ShuffleMapStage
## What changes were proposed in this pull request?

This PR refactors `ShuffleMapStage` and `MapOutputTracker` in order to simplify the management of `MapStatuses`, reduce driver memory consumption, and remove a potential source of scheduler correctness bugs.

### Background

In Spark there are currently two places where MapStatuses are tracked:

- The `MapOutputTracker` maintains an `Array[MapStatus]` storing a single location for each map output. This mapping is used by the `DAGScheduler` for determining reduce-task locality preferences (when locality-aware reduce task scheduling is enabled) and is also used to serve map output locations to executors / tasks.
- Each `ShuffleMapStage` also contains a mapping of `Array[List[MapStatus]]` which holds the complete set of locations where each map output could be available. This mapping is used to determine which map tasks need to be run when constructing `TaskSets` for the stage.

This duplication adds complexity and creates the potential for certain types of correctness bugs.  Bad things can happen if these two copies of the map output locations get out of sync. For instance, if the `MapOutputTracker` is missing locations for a map output but `ShuffleMapStage` believes that locations are available then tasks will fail with `MetadataFetchFailedException` but `ShuffleMapStage` will not be updated to reflect the missing map outputs, leading to situations where the stage will be reattempted (because downstream stages experienced fetch failures) but no task sets will be launched (because `ShuffleMapStage` thinks all maps are available).

I observed this behavior in a real-world deployment. I'm still not quite sure how the state got out of sync in the first place, but we can completely avoid this class of bug if we eliminate the duplicate state.

### Why we only need to track a single location for each map output

I think that storing an `Array[List[MapStatus]]` in `ShuffleMapStage` is unnecessary.

First, note that this adds memory/object bloat to the driver we need one extra `List` per task. If you have millions of tasks across all stages then this can add up to be a significant amount of resources.

Secondly, I believe that it's extremely uncommon that these lists will ever contain more than one entry. It's not impossible, but is very unlikely given the conditions which must occur for that to happen:

- In normal operation (no task failures) we'll only run each task once and thus will have at most one output.
- If speculation is enabled then it's possible that we'll have multiple attempts of a task. The TaskSetManager will [kill duplicate attempts of a task](04901dd03a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (L717)) after a task finishes successfully, reducing the likelihood that both the original and speculated task will successfully register map outputs.
- There is a [comment in `TaskSetManager`](04901dd03a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (L113)) which suggests that running tasks are not killed if a task set becomes a zombie. However:
  - If the task set becomes a zombie due to the job being cancelled then it doesn't matter whether we record map outputs.
  - If the task set became a zombie because of a stage failure (e.g. the map stage itself had a fetch failure from an upstream match stage) then I believe that the "failedEpoch" will be updated which may cause map outputs from still-running tasks to [be ignored](04901dd03a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala (L1213)). (I'm not 100% sure on this point, though).
- Even if you _do_ manage to record multiple map outputs for a stage, only a single map output is reported to / tracked by the MapOutputTracker. The only situation where the additional output locations could actually be read or used would be if a task experienced a `FetchFailure` exception. The most likely cause of a `FetchFailure` exception is an executor lost, which will have most likely caused the loss of several map tasks' output, so saving on potential re-execution of a single map task isn't a huge win if we're going to have to recompute several other lost map outputs from other tasks which ran on that lost executor. Also note that the re-population of MapOutputTracker state from state in the ShuffleMapTask only happens after the reduce stage has failed; the additional location doesn't help to prevent FetchFailures but, instead, can only reduce the amount of work when recomputing missing parent stages.

Given this, this patch chooses to do away with tracking multiple locations for map outputs and instead stores only a single location. This change removes the main distinction between the `ShuffleMapTask` and `MapOutputTracker`'s copies of this state, paving the way for storing it only in the `MapOutputTracker`.

### Overview of other changes

- Significantly simplified the cache / lock management inside of the `MapOutputTrackerMaster`:
  - The old code had several parallel `HashMap`s which had to be guarded by maps of `Object`s which were used as locks. This code was somewhat complicated to follow.
  - The new code uses a new `ShuffleStatus` class to group together all of the state associated with a particular shuffle, including cached serialized map statuses, significantly simplifying the logic.
- Moved more code out of the shared `MapOutputTracker` abstract base class and into the `MapOutputTrackerMaster` and `MapOutputTrackerWorker` subclasses. This makes it easier to reason about which functionality needs to be supported only on the driver or executor.
- Removed a bunch of code from the `DAGScheduler` which was used to synchronize information from the `MapOutputTracker` to `ShuffleMapStage`.
- Added comments to clarify the role of `MapOutputTrackerMaster`'s `epoch` in invalidating executor-side shuffle map output caches.

I will comment on these changes via inline GitHub review comments.

/cc hvanhovell and rxin (whom I discussed this with offline), tgravescs (who recently worked on caching of serialized MapOutputStatuses), and kayousterhout and markhamstra (for scheduler changes).

## How was this patch tested?

Existing tests. I purposely avoided making interface / API which would require significant updates or modifications to test code.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #17955 from JoshRosen/map-output-tracker-rewrite.
2017-06-11 18:34:12 -07:00
Michal Senkyr f48273c13c [SPARK-18891][SQL] Support for specific Java List subtypes
## What changes were proposed in this pull request?

Add support for specific Java `List` subtypes in deserialization as well as a generic implicit encoder.

All `List` subtypes are supported by using either the size-specifying constructor (one `int` parameter) or the default constructor.

Interfaces/abstract classes use the following implementations:

* `java.util.List`, `java.util.AbstractList` or `java.util.AbstractSequentialList` => `java.util.ArrayList`

## How was this patch tested?

```bash
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> val jlist = new java.util.LinkedList[Int]; jlist.add(1)
jlist: java.util.LinkedList[Int] = [1]
res0: Boolean = true

scala> Seq(jlist).toDS().map(_.element()).collect()
res1: Array[Int] = Array(1)
```

Author: Michal Senkyr <mike.senkyr@gmail.com>

Closes #18009 from michalsenkyr/dataset-java-lists.
2017-06-12 08:53:23 +08:00
Michal Senkyr 0538f3b0ae [SPARK-18891][SQL] Support for Scala Map collection types
## What changes were proposed in this pull request?

Add support for arbitrary Scala `Map` types in deserialization as well as a generic implicit encoder.

Used the builder approach as in #16541 to construct any provided `Map` type upon deserialization.

Please note that this PR also adds (ignored) tests for issue [SPARK-19104 CompileException with Map and Case Class in Spark 2.1.0](https://issues.apache.org/jira/browse/SPARK-19104) but doesn't solve it.

Added support for Java Maps in codegen code (encoders will be added in a different PR) with the following default implementations for interfaces/abstract classes:

* `java.util.Map`, `java.util.AbstractMap` => `java.util.HashMap`
* `java.util.SortedMap`, `java.util.NavigableMap` => `java.util.TreeMap`
* `java.util.concurrent.ConcurrentMap` => `java.util.concurrent.ConcurrentHashMap`
* `java.util.concurrent.ConcurrentNavigableMap` => `java.util.concurrent.ConcurrentSkipListMap`

Resulting codegen for `Seq(Map(1 -> 2)).toDS().map(identity).queryExecution.debug.codegen`:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjectsToMap_loopIsNull1;
/* 010 */   private int CollectObjectsToMap_loopValue0;
/* 011 */   private boolean CollectObjectsToMap_loopIsNull3;
/* 012 */   private int CollectObjectsToMap_loopValue2;
/* 013 */   private UnsafeRow deserializetoobject_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 016 */   private scala.collection.immutable.Map mapelements_argValue;
/* 017 */   private UnsafeRow mapelements_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 020 */   private UnsafeRow serializefromobject_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter1;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */     wholestagecodegen_init_0();
/* 034 */     wholestagecodegen_init_1();
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   private void wholestagecodegen_init_0() {
/* 039 */     inputadapter_input = inputs[0];
/* 040 */
/* 041 */     deserializetoobject_result = new UnsafeRow(1);
/* 042 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 043 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 044 */
/* 045 */     mapelements_result = new UnsafeRow(1);
/* 046 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 047 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 048 */     serializefromobject_result = new UnsafeRow(1);
/* 049 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 050 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 051 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 052 */
/* 053 */   }
/* 054 */
/* 055 */   private void wholestagecodegen_init_1() {
/* 056 */     this.serializefromobject_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 057 */
/* 058 */   }
/* 059 */
/* 060 */   protected void processNext() throws java.io.IOException {
/* 061 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 062 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 063 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 064 */       MapData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getMap(0));
/* 065 */
/* 066 */       boolean deserializetoobject_isNull1 = true;
/* 067 */       ArrayData deserializetoobject_value1 = null;
/* 068 */       if (!inputadapter_isNull) {
/* 069 */         deserializetoobject_isNull1 = false;
/* 070 */         if (!deserializetoobject_isNull1) {
/* 071 */           Object deserializetoobject_funcResult = null;
/* 072 */           deserializetoobject_funcResult = inputadapter_value.keyArray();
/* 073 */           if (deserializetoobject_funcResult == null) {
/* 074 */             deserializetoobject_isNull1 = true;
/* 075 */           } else {
/* 076 */             deserializetoobject_value1 = (ArrayData) deserializetoobject_funcResult;
/* 077 */           }
/* 078 */
/* 079 */         }
/* 080 */         deserializetoobject_isNull1 = deserializetoobject_value1 == null;
/* 081 */       }
/* 082 */
/* 083 */       boolean deserializetoobject_isNull3 = true;
/* 084 */       ArrayData deserializetoobject_value3 = null;
/* 085 */       if (!inputadapter_isNull) {
/* 086 */         deserializetoobject_isNull3 = false;
/* 087 */         if (!deserializetoobject_isNull3) {
/* 088 */           Object deserializetoobject_funcResult1 = null;
/* 089 */           deserializetoobject_funcResult1 = inputadapter_value.valueArray();
/* 090 */           if (deserializetoobject_funcResult1 == null) {
/* 091 */             deserializetoobject_isNull3 = true;
/* 092 */           } else {
/* 093 */             deserializetoobject_value3 = (ArrayData) deserializetoobject_funcResult1;
/* 094 */           }
/* 095 */
/* 096 */         }
/* 097 */         deserializetoobject_isNull3 = deserializetoobject_value3 == null;
/* 098 */       }
/* 099 */       scala.collection.immutable.Map deserializetoobject_value = null;
/* 100 */
/* 101 */       if ((deserializetoobject_isNull1 && !deserializetoobject_isNull3) ||
/* 102 */         (!deserializetoobject_isNull1 && deserializetoobject_isNull3)) {
/* 103 */         throw new RuntimeException("Invalid state: Inconsistent nullability of key-value");
/* 104 */       }
/* 105 */
/* 106 */       if (!deserializetoobject_isNull1) {
/* 107 */         if (deserializetoobject_value1.numElements() != deserializetoobject_value3.numElements()) {
/* 108 */           throw new RuntimeException("Invalid state: Inconsistent lengths of key-value arrays");
/* 109 */         }
/* 110 */         int deserializetoobject_dataLength = deserializetoobject_value1.numElements();
/* 111 */
/* 112 */         scala.collection.mutable.Builder CollectObjectsToMap_builderValue5 = scala.collection.immutable.Map$.MODULE$.newBuilder();
/* 113 */         CollectObjectsToMap_builderValue5.sizeHint(deserializetoobject_dataLength);
/* 114 */
/* 115 */         int deserializetoobject_loopIndex = 0;
/* 116 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 117 */           CollectObjectsToMap_loopValue0 = (int) (deserializetoobject_value1.getInt(deserializetoobject_loopIndex));
/* 118 */           CollectObjectsToMap_loopValue2 = (int) (deserializetoobject_value3.getInt(deserializetoobject_loopIndex));
/* 119 */           CollectObjectsToMap_loopIsNull1 = deserializetoobject_value1.isNullAt(deserializetoobject_loopIndex);
/* 120 */           CollectObjectsToMap_loopIsNull3 = deserializetoobject_value3.isNullAt(deserializetoobject_loopIndex);
/* 121 */
/* 122 */           if (CollectObjectsToMap_loopIsNull1) {
/* 123 */             throw new RuntimeException("Found null in map key!");
/* 124 */           }
/* 125 */
/* 126 */           scala.Tuple2 CollectObjectsToMap_loopValue4;
/* 127 */
/* 128 */           if (CollectObjectsToMap_loopIsNull3) {
/* 129 */             CollectObjectsToMap_loopValue4 = new scala.Tuple2(CollectObjectsToMap_loopValue0, null);
/* 130 */           } else {
/* 131 */             CollectObjectsToMap_loopValue4 = new scala.Tuple2(CollectObjectsToMap_loopValue0, CollectObjectsToMap_loopValue2);
/* 132 */           }
/* 133 */
/* 134 */           CollectObjectsToMap_builderValue5.$plus$eq(CollectObjectsToMap_loopValue4);
/* 135 */
/* 136 */           deserializetoobject_loopIndex += 1;
/* 137 */         }
/* 138 */
/* 139 */         deserializetoobject_value = (scala.collection.immutable.Map) CollectObjectsToMap_builderValue5.result();
/* 140 */       }
/* 141 */
/* 142 */       boolean mapelements_isNull = true;
/* 143 */       scala.collection.immutable.Map mapelements_value = null;
/* 144 */       if (!false) {
/* 145 */         mapelements_argValue = deserializetoobject_value;
/* 146 */
/* 147 */         mapelements_isNull = false;
/* 148 */         if (!mapelements_isNull) {
/* 149 */           Object mapelements_funcResult = null;
/* 150 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 151 */           if (mapelements_funcResult == null) {
/* 152 */             mapelements_isNull = true;
/* 153 */           } else {
/* 154 */             mapelements_value = (scala.collection.immutable.Map) mapelements_funcResult;
/* 155 */           }
/* 156 */
/* 157 */         }
/* 158 */         mapelements_isNull = mapelements_value == null;
/* 159 */       }
/* 160 */
/* 161 */       MapData serializefromobject_value = null;
/* 162 */       if (!mapelements_isNull) {
/* 163 */         final int serializefromobject_length = mapelements_value.size();
/* 164 */         final Object[] serializefromobject_convertedKeys = new Object[serializefromobject_length];
/* 165 */         final Object[] serializefromobject_convertedValues = new Object[serializefromobject_length];
/* 166 */         int serializefromobject_index = 0;
/* 167 */         final scala.collection.Iterator serializefromobject_entries = mapelements_value.iterator();
/* 168 */         while(serializefromobject_entries.hasNext()) {
/* 169 */           final scala.Tuple2 serializefromobject_entry = (scala.Tuple2) serializefromobject_entries.next();
/* 170 */           int ExternalMapToCatalyst_key1 = (Integer) serializefromobject_entry._1();
/* 171 */           int ExternalMapToCatalyst_value1 = (Integer) serializefromobject_entry._2();
/* 172 */
/* 173 */           boolean ExternalMapToCatalyst_value_isNull1 = false;
/* 174 */
/* 175 */           if (false) {
/* 176 */             throw new RuntimeException("Cannot use null as map key!");
/* 177 */           } else {
/* 178 */             serializefromobject_convertedKeys[serializefromobject_index] = (Integer) ExternalMapToCatalyst_key1;
/* 179 */           }
/* 180 */
/* 181 */           if (false) {
/* 182 */             serializefromobject_convertedValues[serializefromobject_index] = null;
/* 183 */           } else {
/* 184 */             serializefromobject_convertedValues[serializefromobject_index] = (Integer) ExternalMapToCatalyst_value1;
/* 185 */           }
/* 186 */
/* 187 */           serializefromobject_index++;
/* 188 */         }
/* 189 */
/* 190 */         serializefromobject_value = new org.apache.spark.sql.catalyst.util.ArrayBasedMapData(new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedKeys), new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedValues));
/* 191 */       }
/* 192 */       serializefromobject_holder.reset();
/* 193 */
/* 194 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 195 */
/* 196 */       if (mapelements_isNull) {
/* 197 */         serializefromobject_rowWriter.setNullAt(0);
/* 198 */       } else {
/* 199 */         // Remember the current cursor so that we can calculate how many bytes are
/* 200 */         // written later.
/* 201 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 202 */
/* 203 */         if (serializefromobject_value instanceof UnsafeMapData) {
/* 204 */           final int serializefromobject_sizeInBytes = ((UnsafeMapData) serializefromobject_value).getSizeInBytes();
/* 205 */           // grow the global buffer before writing data.
/* 206 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 207 */           ((UnsafeMapData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 208 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 209 */
/* 210 */         } else {
/* 211 */           final ArrayData serializefromobject_keys = serializefromobject_value.keyArray();
/* 212 */           final ArrayData serializefromobject_values = serializefromobject_value.valueArray();
/* 213 */
/* 214 */           // preserve 8 bytes to write the key array numBytes later.
/* 215 */           serializefromobject_holder.grow(8);
/* 216 */           serializefromobject_holder.cursor += 8;
/* 217 */
/* 218 */           // Remember the current cursor so that we can write numBytes of key array later.
/* 219 */           final int serializefromobject_tmpCursor1 = serializefromobject_holder.cursor;
/* 220 */
/* 221 */           if (serializefromobject_keys instanceof UnsafeArrayData) {
/* 222 */             final int serializefromobject_sizeInBytes1 = ((UnsafeArrayData) serializefromobject_keys).getSizeInBytes();
/* 223 */             // grow the global buffer before writing data.
/* 224 */             serializefromobject_holder.grow(serializefromobject_sizeInBytes1);
/* 225 */             ((UnsafeArrayData) serializefromobject_keys).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 226 */             serializefromobject_holder.cursor += serializefromobject_sizeInBytes1;
/* 227 */
/* 228 */           } else {
/* 229 */             final int serializefromobject_numElements = serializefromobject_keys.numElements();
/* 230 */             serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 231 */
/* 232 */             for (int serializefromobject_index1 = 0; serializefromobject_index1 < serializefromobject_numElements; serializefromobject_index1++) {
/* 233 */               if (serializefromobject_keys.isNullAt(serializefromobject_index1)) {
/* 234 */                 serializefromobject_arrayWriter.setNullInt(serializefromobject_index1);
/* 235 */               } else {
/* 236 */                 final int serializefromobject_element = serializefromobject_keys.getInt(serializefromobject_index1);
/* 237 */                 serializefromobject_arrayWriter.write(serializefromobject_index1, serializefromobject_element);
/* 238 */               }
/* 239 */             }
/* 240 */           }
/* 241 */
/* 242 */           // Write the numBytes of key array into the first 8 bytes.
/* 243 */           Platform.putLong(serializefromobject_holder.buffer, serializefromobject_tmpCursor1 - 8, serializefromobject_holder.cursor - serializefromobject_tmpCursor1);
/* 244 */
/* 245 */           if (serializefromobject_values instanceof UnsafeArrayData) {
/* 246 */             final int serializefromobject_sizeInBytes2 = ((UnsafeArrayData) serializefromobject_values).getSizeInBytes();
/* 247 */             // grow the global buffer before writing data.
/* 248 */             serializefromobject_holder.grow(serializefromobject_sizeInBytes2);
/* 249 */             ((UnsafeArrayData) serializefromobject_values).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 250 */             serializefromobject_holder.cursor += serializefromobject_sizeInBytes2;
/* 251 */
/* 252 */           } else {
/* 253 */             final int serializefromobject_numElements1 = serializefromobject_values.numElements();
/* 254 */             serializefromobject_arrayWriter1.initialize(serializefromobject_holder, serializefromobject_numElements1, 4);
/* 255 */
/* 256 */             for (int serializefromobject_index2 = 0; serializefromobject_index2 < serializefromobject_numElements1; serializefromobject_index2++) {
/* 257 */               if (serializefromobject_values.isNullAt(serializefromobject_index2)) {
/* 258 */                 serializefromobject_arrayWriter1.setNullInt(serializefromobject_index2);
/* 259 */               } else {
/* 260 */                 final int serializefromobject_element1 = serializefromobject_values.getInt(serializefromobject_index2);
/* 261 */                 serializefromobject_arrayWriter1.write(serializefromobject_index2, serializefromobject_element1);
/* 262 */               }
/* 263 */             }
/* 264 */           }
/* 265 */
/* 266 */         }
/* 267 */
/* 268 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 269 */       }
/* 270 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 271 */       append(serializefromobject_result);
/* 272 */       if (shouldStop()) return;
/* 273 */     }
/* 274 */   }
/* 275 */ }
```

Codegen for `java.util.Map`:

```
/* 001 */ public Object generate(Object[] references) {
/* 002 */   return new GeneratedIterator(references);
/* 003 */ }
/* 004 */
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private boolean CollectObjectsToMap_loopIsNull1;
/* 010 */   private int CollectObjectsToMap_loopValue0;
/* 011 */   private boolean CollectObjectsToMap_loopIsNull3;
/* 012 */   private int CollectObjectsToMap_loopValue2;
/* 013 */   private UnsafeRow deserializetoobject_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 016 */   private java.util.HashMap mapelements_argValue;
/* 017 */   private UnsafeRow mapelements_result;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 019 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 020 */   private UnsafeRow serializefromobject_result;
/* 021 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 022 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 023 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter;
/* 024 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter serializefromobject_arrayWriter1;
/* 025 */
/* 026 */   public GeneratedIterator(Object[] references) {
/* 027 */     this.references = references;
/* 028 */   }
/* 029 */
/* 030 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 031 */     partitionIndex = index;
/* 032 */     this.inputs = inputs;
/* 033 */     wholestagecodegen_init_0();
/* 034 */     wholestagecodegen_init_1();
/* 035 */
/* 036 */   }
/* 037 */
/* 038 */   private void wholestagecodegen_init_0() {
/* 039 */     inputadapter_input = inputs[0];
/* 040 */
/* 041 */     deserializetoobject_result = new UnsafeRow(1);
/* 042 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 32);
/* 043 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 044 */
/* 045 */     mapelements_result = new UnsafeRow(1);
/* 046 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 32);
/* 047 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 048 */     serializefromobject_result = new UnsafeRow(1);
/* 049 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 32);
/* 050 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 051 */     this.serializefromobject_arrayWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 052 */
/* 053 */   }
/* 054 */
/* 055 */   private void wholestagecodegen_init_1() {
/* 056 */     this.serializefromobject_arrayWriter1 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeArrayWriter();
/* 057 */
/* 058 */   }
/* 059 */
/* 060 */   protected void processNext() throws java.io.IOException {
/* 061 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 062 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 063 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 064 */       MapData inputadapter_value = inputadapter_isNull ? null : (inputadapter_row.getMap(0));
/* 065 */
/* 066 */       boolean deserializetoobject_isNull1 = true;
/* 067 */       ArrayData deserializetoobject_value1 = null;
/* 068 */       if (!inputadapter_isNull) {
/* 069 */         deserializetoobject_isNull1 = false;
/* 070 */         if (!deserializetoobject_isNull1) {
/* 071 */           Object deserializetoobject_funcResult = null;
/* 072 */           deserializetoobject_funcResult = inputadapter_value.keyArray();
/* 073 */           if (deserializetoobject_funcResult == null) {
/* 074 */             deserializetoobject_isNull1 = true;
/* 075 */           } else {
/* 076 */             deserializetoobject_value1 = (ArrayData) deserializetoobject_funcResult;
/* 077 */           }
/* 078 */
/* 079 */         }
/* 080 */         deserializetoobject_isNull1 = deserializetoobject_value1 == null;
/* 081 */       }
/* 082 */
/* 083 */       boolean deserializetoobject_isNull3 = true;
/* 084 */       ArrayData deserializetoobject_value3 = null;
/* 085 */       if (!inputadapter_isNull) {
/* 086 */         deserializetoobject_isNull3 = false;
/* 087 */         if (!deserializetoobject_isNull3) {
/* 088 */           Object deserializetoobject_funcResult1 = null;
/* 089 */           deserializetoobject_funcResult1 = inputadapter_value.valueArray();
/* 090 */           if (deserializetoobject_funcResult1 == null) {
/* 091 */             deserializetoobject_isNull3 = true;
/* 092 */           } else {
/* 093 */             deserializetoobject_value3 = (ArrayData) deserializetoobject_funcResult1;
/* 094 */           }
/* 095 */
/* 096 */         }
/* 097 */         deserializetoobject_isNull3 = deserializetoobject_value3 == null;
/* 098 */       }
/* 099 */       java.util.HashMap deserializetoobject_value = null;
/* 100 */
/* 101 */       if ((deserializetoobject_isNull1 && !deserializetoobject_isNull3) ||
/* 102 */         (!deserializetoobject_isNull1 && deserializetoobject_isNull3)) {
/* 103 */         throw new RuntimeException("Invalid state: Inconsistent nullability of key-value");
/* 104 */       }
/* 105 */
/* 106 */       if (!deserializetoobject_isNull1) {
/* 107 */         if (deserializetoobject_value1.numElements() != deserializetoobject_value3.numElements()) {
/* 108 */           throw new RuntimeException("Invalid state: Inconsistent lengths of key-value arrays");
/* 109 */         }
/* 110 */         int deserializetoobject_dataLength = deserializetoobject_value1.numElements();
/* 111 */         java.util.Map CollectObjectsToMap_builderValue5 = new java.util.HashMap(deserializetoobject_dataLength);
/* 112 */
/* 113 */         int deserializetoobject_loopIndex = 0;
/* 114 */         while (deserializetoobject_loopIndex < deserializetoobject_dataLength) {
/* 115 */           CollectObjectsToMap_loopValue0 = (int) (deserializetoobject_value1.getInt(deserializetoobject_loopIndex));
/* 116 */           CollectObjectsToMap_loopValue2 = (int) (deserializetoobject_value3.getInt(deserializetoobject_loopIndex));
/* 117 */           CollectObjectsToMap_loopIsNull1 = deserializetoobject_value1.isNullAt(deserializetoobject_loopIndex);
/* 118 */           CollectObjectsToMap_loopIsNull3 = deserializetoobject_value3.isNullAt(deserializetoobject_loopIndex);
/* 119 */
/* 120 */           if (CollectObjectsToMap_loopIsNull1) {
/* 121 */             throw new RuntimeException("Found null in map key!");
/* 122 */           }
/* 123 */
/* 124 */           CollectObjectsToMap_builderValue5.put(CollectObjectsToMap_loopValue0, CollectObjectsToMap_loopValue2);
/* 125 */
/* 126 */           deserializetoobject_loopIndex += 1;
/* 127 */         }
/* 128 */
/* 129 */         deserializetoobject_value = (java.util.HashMap) CollectObjectsToMap_builderValue5;
/* 130 */       }
/* 131 */
/* 132 */       boolean mapelements_isNull = true;
/* 133 */       java.util.HashMap mapelements_value = null;
/* 134 */       if (!false) {
/* 135 */         mapelements_argValue = deserializetoobject_value;
/* 136 */
/* 137 */         mapelements_isNull = false;
/* 138 */         if (!mapelements_isNull) {
/* 139 */           Object mapelements_funcResult = null;
/* 140 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 141 */           if (mapelements_funcResult == null) {
/* 142 */             mapelements_isNull = true;
/* 143 */           } else {
/* 144 */             mapelements_value = (java.util.HashMap) mapelements_funcResult;
/* 145 */           }
/* 146 */
/* 147 */         }
/* 148 */         mapelements_isNull = mapelements_value == null;
/* 149 */       }
/* 150 */
/* 151 */       MapData serializefromobject_value = null;
/* 152 */       if (!mapelements_isNull) {
/* 153 */         final int serializefromobject_length = mapelements_value.size();
/* 154 */         final Object[] serializefromobject_convertedKeys = new Object[serializefromobject_length];
/* 155 */         final Object[] serializefromobject_convertedValues = new Object[serializefromobject_length];
/* 156 */         int serializefromobject_index = 0;
/* 157 */         final java.util.Iterator serializefromobject_entries = mapelements_value.entrySet().iterator();
/* 158 */         while(serializefromobject_entries.hasNext()) {
/* 159 */           final java.util.Map$Entry serializefromobject_entry = (java.util.Map$Entry) serializefromobject_entries.next();
/* 160 */           int ExternalMapToCatalyst_key1 = (Integer) serializefromobject_entry.getKey();
/* 161 */           int ExternalMapToCatalyst_value1 = (Integer) serializefromobject_entry.getValue();
/* 162 */
/* 163 */           boolean ExternalMapToCatalyst_value_isNull1 = false;
/* 164 */
/* 165 */           if (false) {
/* 166 */             throw new RuntimeException("Cannot use null as map key!");
/* 167 */           } else {
/* 168 */             serializefromobject_convertedKeys[serializefromobject_index] = (Integer) ExternalMapToCatalyst_key1;
/* 169 */           }
/* 170 */
/* 171 */           if (false) {
/* 172 */             serializefromobject_convertedValues[serializefromobject_index] = null;
/* 173 */           } else {
/* 174 */             serializefromobject_convertedValues[serializefromobject_index] = (Integer) ExternalMapToCatalyst_value1;
/* 175 */           }
/* 176 */
/* 177 */           serializefromobject_index++;
/* 178 */         }
/* 179 */
/* 180 */         serializefromobject_value = new org.apache.spark.sql.catalyst.util.ArrayBasedMapData(new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedKeys), new org.apache.spark.sql.catalyst.util.GenericArrayData(serializefromobject_convertedValues));
/* 181 */       }
/* 182 */       serializefromobject_holder.reset();
/* 183 */
/* 184 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 185 */
/* 186 */       if (mapelements_isNull) {
/* 187 */         serializefromobject_rowWriter.setNullAt(0);
/* 188 */       } else {
/* 189 */         // Remember the current cursor so that we can calculate how many bytes are
/* 190 */         // written later.
/* 191 */         final int serializefromobject_tmpCursor = serializefromobject_holder.cursor;
/* 192 */
/* 193 */         if (serializefromobject_value instanceof UnsafeMapData) {
/* 194 */           final int serializefromobject_sizeInBytes = ((UnsafeMapData) serializefromobject_value).getSizeInBytes();
/* 195 */           // grow the global buffer before writing data.
/* 196 */           serializefromobject_holder.grow(serializefromobject_sizeInBytes);
/* 197 */           ((UnsafeMapData) serializefromobject_value).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 198 */           serializefromobject_holder.cursor += serializefromobject_sizeInBytes;
/* 199 */
/* 200 */         } else {
/* 201 */           final ArrayData serializefromobject_keys = serializefromobject_value.keyArray();
/* 202 */           final ArrayData serializefromobject_values = serializefromobject_value.valueArray();
/* 203 */
/* 204 */           // preserve 8 bytes to write the key array numBytes later.
/* 205 */           serializefromobject_holder.grow(8);
/* 206 */           serializefromobject_holder.cursor += 8;
/* 207 */
/* 208 */           // Remember the current cursor so that we can write numBytes of key array later.
/* 209 */           final int serializefromobject_tmpCursor1 = serializefromobject_holder.cursor;
/* 210 */
/* 211 */           if (serializefromobject_keys instanceof UnsafeArrayData) {
/* 212 */             final int serializefromobject_sizeInBytes1 = ((UnsafeArrayData) serializefromobject_keys).getSizeInBytes();
/* 213 */             // grow the global buffer before writing data.
/* 214 */             serializefromobject_holder.grow(serializefromobject_sizeInBytes1);
/* 215 */             ((UnsafeArrayData) serializefromobject_keys).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 216 */             serializefromobject_holder.cursor += serializefromobject_sizeInBytes1;
/* 217 */
/* 218 */           } else {
/* 219 */             final int serializefromobject_numElements = serializefromobject_keys.numElements();
/* 220 */             serializefromobject_arrayWriter.initialize(serializefromobject_holder, serializefromobject_numElements, 4);
/* 221 */
/* 222 */             for (int serializefromobject_index1 = 0; serializefromobject_index1 < serializefromobject_numElements; serializefromobject_index1++) {
/* 223 */               if (serializefromobject_keys.isNullAt(serializefromobject_index1)) {
/* 224 */                 serializefromobject_arrayWriter.setNullInt(serializefromobject_index1);
/* 225 */               } else {
/* 226 */                 final int serializefromobject_element = serializefromobject_keys.getInt(serializefromobject_index1);
/* 227 */                 serializefromobject_arrayWriter.write(serializefromobject_index1, serializefromobject_element);
/* 228 */               }
/* 229 */             }
/* 230 */           }
/* 231 */
/* 232 */           // Write the numBytes of key array into the first 8 bytes.
/* 233 */           Platform.putLong(serializefromobject_holder.buffer, serializefromobject_tmpCursor1 - 8, serializefromobject_holder.cursor - serializefromobject_tmpCursor1);
/* 234 */
/* 235 */           if (serializefromobject_values instanceof UnsafeArrayData) {
/* 236 */             final int serializefromobject_sizeInBytes2 = ((UnsafeArrayData) serializefromobject_values).getSizeInBytes();
/* 237 */             // grow the global buffer before writing data.
/* 238 */             serializefromobject_holder.grow(serializefromobject_sizeInBytes2);
/* 239 */             ((UnsafeArrayData) serializefromobject_values).writeToMemory(serializefromobject_holder.buffer, serializefromobject_holder.cursor);
/* 240 */             serializefromobject_holder.cursor += serializefromobject_sizeInBytes2;
/* 241 */
/* 242 */           } else {
/* 243 */             final int serializefromobject_numElements1 = serializefromobject_values.numElements();
/* 244 */             serializefromobject_arrayWriter1.initialize(serializefromobject_holder, serializefromobject_numElements1, 4);
/* 245 */
/* 246 */             for (int serializefromobject_index2 = 0; serializefromobject_index2 < serializefromobject_numElements1; serializefromobject_index2++) {
/* 247 */               if (serializefromobject_values.isNullAt(serializefromobject_index2)) {
/* 248 */                 serializefromobject_arrayWriter1.setNullInt(serializefromobject_index2);
/* 249 */               } else {
/* 250 */                 final int serializefromobject_element1 = serializefromobject_values.getInt(serializefromobject_index2);
/* 251 */                 serializefromobject_arrayWriter1.write(serializefromobject_index2, serializefromobject_element1);
/* 252 */               }
/* 253 */             }
/* 254 */           }
/* 255 */
/* 256 */         }
/* 257 */
/* 258 */         serializefromobject_rowWriter.setOffsetAndSize(0, serializefromobject_tmpCursor, serializefromobject_holder.cursor - serializefromobject_tmpCursor);
/* 259 */       }
/* 260 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 261 */       append(serializefromobject_result);
/* 262 */       if (shouldStop()) return;
/* 263 */     }
/* 264 */   }
/* 265 */ }
```

## How was this patch tested?

```
build/mvn -DskipTests clean package && dev/run-tests
```

Additionally in Spark shell:

```
scala> Seq(collection.mutable.HashMap(1 -> 2, 2 -> 3)).toDS().map(_ += (3 -> 4)).collect()
res0: Array[scala.collection.mutable.HashMap[Int,Int]] = Array(Map(2 -> 3, 1 -> 2, 3 -> 4))
```

Author: Michal Senkyr <mike.senkyr@gmail.com>
Author: Michal Šenkýř <mike.senkyr@gmail.com>

Closes #16986 from michalsenkyr/dataset-map-builder.
2017-06-12 08:47:01 +08:00
Zhenhua Wang a7c61c100b [SPARK-21031][SQL] Add alterTableStats to store spark's stats and let alterTable keep existing stats
## What changes were proposed in this pull request?

Currently, hive's stats are read into `CatalogStatistics`, while spark's stats are also persisted through `CatalogStatistics`. As a result, hive's stats can be unexpectedly propagated into spark' stats.

For example, for a catalog table, we read stats from hive, e.g. "totalSize" and put it into `CatalogStatistics`. Then, by using "ALTER TABLE" command, we will store the stats in `CatalogStatistics` into metastore as spark's stats (because we don't know whether it's from spark or not). But spark's stats should be only generated by "ANALYZE" command. This is unexpected from this command.

Secondly, now that we have spark's stats in metastore, after inserting new data, although hive updated "totalSize" in metastore, we still cannot get the right `sizeInBytes` in `CatalogStatistics`, because we respect spark's stats (should not exist) over hive's stats.

A running example is shown in [JIRA](https://issues.apache.org/jira/browse/SPARK-21031).

To fix this, we add a new method `alterTableStats` to store spark's stats, and let `alterTable` keep existing stats.

## How was this patch tested?

Added new tests.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #18248 from wzhfy/separateHiveStats.
2017-06-12 08:23:04 +08:00
sujithjay 3a840048ed Fixed typo in sql.functions
## What changes were proposed in this pull request?

I fixed a typo in the Scaladoc for the method `def struct(cols: Column*): Column`. 'retained' was misspelt as 'remained'.

## How was this patch tested?
Before:

Creates a new struct column.
   If the input column is a column in a `DataFrame`, or a derived column expression
   that is named (i.e. aliased), its name would be **remained** as the StructField's name,
   otherwise, the newly generated StructField's name would be auto generated as
   `col` with a suffix `index + 1`, i.e. col1, col2, col3, ...

After:

   Creates a new struct column.
   If the input column is a column in a `DataFrame`, or a derived column expression
   that is named (i.e. aliased), its name would be **retained** as the StructField's name,
   otherwise, the newly generated StructField's name would be auto generated as
   `col` with a suffix `index + 1`, i.e. col1, col2, col3, ...

Author: sujithjay <sujith@logistimo.com>

Closes #18254 from sujithjay/fix-typo.
2017-06-11 18:23:57 +01:00
Felix Cheung 9f4ff95524 [SPARK-20877][SPARKR][FOLLOWUP] clean up after test move
## What changes were proposed in this pull request?

clean up after big test move

## How was this patch tested?

unit tests, jenkins

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #18267 from felixcheung/rtestset2.
2017-06-11 03:00:44 -07:00
Yuming Wang 823f1eef58 [SPARK-13933][BUILD] Update hadoop-2.7 profile's curator version to 2.7.1
## What changes were proposed in this pull request?

Update hadoop-2.7 profile's curator version to 2.7.1, more see [SPARK-13933](https://issues.apache.org/jira/browse/SPARK-13933).

## How was this patch tested?

manual tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #18247 from wangyum/SPARK-13933.
2017-06-11 10:05:47 +01:00
hyukjinkwon eb3ea3a083 [SPARK-20935][STREAMING] Always close WriteAheadLog and make it idempotent
## What changes were proposed in this pull request?

This PR proposes to stop `ReceiverTracker` to close `WriteAheadLog` whenever it is and make `WriteAheadLog` and its implementations idempotent.

## How was this patch tested?

Added a test in `WriteAheadLogSuite`. Note that  the added test looks passing even if it closes twice (namely even without the changes in `FileBasedWriteAheadLog` and `BatchedWriteAheadLog`. It looks both are already idempotent but this is a rather sanity check.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18224 from HyukjinKwon/streaming-closing.
2017-06-11 09:54:57 +01:00
Michael Gummelt 8da3f7041a [SPARK-21000][MESOS] Add Mesos labels support to the Spark Dispatcher
## What changes were proposed in this pull request?

Add Mesos labels support to the Spark Dispatcher

## How was this patch tested?

unit tests

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #18220 from mgummelt/SPARK-21000-dispatcher-labels.
2017-06-11 09:49:39 +01:00
Felix Cheung dc4c351837 [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN
## What changes were proposed in this pull request?

Move all existing tests to non-installed directory so that it will never run by installing SparkR package

For a follow-up PR:
- remove all skip_on_cran() calls in tests
- clean up test timer
- improve or change basic tests that do run on CRAN (if anyone has suggestion)

It looks like `R CMD build pkg` will still put pkg\tests (ie. the full tests) into the source package but `R CMD INSTALL` on such source package does not install these tests (and so `R CMD check` does not run them)

## How was this patch tested?

- [x] unit tests, Jenkins
- [x] AppVeyor
- [x] make a source package, install it, `R CMD check` it - verify the full tests are not installed or run

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #18264 from felixcheung/rtestset.
2017-06-11 00:00:33 -07:00
liuxian 5301a19a0e [SPARK-20620][TEST] Improve some unit tests for NullExpressionsSuite and TypeCoercionSuite
## What changes were proposed in this pull request?
add more  datatype for some unit tests

## How was this patch tested?
unit tests

Author: liuxian <liu.xian3@zte.com.cn>

Closes #17880 from 10110346/wip_lx_0506.
2017-06-10 10:42:23 -07:00
Xiao Li 8e96acf71c [SPARK-20211][SQL] Fix the Precision and Scale of Decimal Values when the Input is BigDecimal between -1.0 and 1.0
### What changes were proposed in this pull request?
The precision and scale of decimal values are wrong when the input is BigDecimal between -1.0 and 1.0.

The BigDecimal's precision is the digit count starts from the leftmost nonzero digit based on the [JAVA's BigDecimal definition](https://docs.oracle.com/javase/7/docs/api/java/math/BigDecimal.html). However, our Decimal decision follows the database decimal standard, which is the total number of digits, including both to the left and the right of the decimal point. Thus, this PR is to fix the issue by doing the conversion.

Before this PR, the following queries failed:
```SQL
select 1 > 0.0001
select floor(0.0001)
select ceil(0.0001)
```

### How was this patch tested?
Added test cases.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18244 from gatorsmile/bigdecimal.
2017-06-10 10:28:14 -07:00
Reynold Xin b78e3849b2 [SPARK-21042][SQL] Document Dataset.union is resolution by position
## What changes were proposed in this pull request?
Document Dataset.union is resolution by position, not by name, since this has been a confusing point for a lot of users.

## How was this patch tested?
N/A - doc only change.

Author: Reynold Xin <rxin@databricks.com>

Closes #18256 from rxin/SPARK-21042.
2017-06-09 18:29:33 -07:00
Xiao Li 571635488d [SPARK-20918][SQL] Use FunctionIdentifier as function identifiers in FunctionRegistry
### What changes were proposed in this pull request?
Currently, the unquoted string of a function identifier is being used as the function identifier in the function registry. This could cause the incorrect the behavior when users use `.` in the function names. This PR is to take the `FunctionIdentifier` as the identifier in the function registry.

- Add one new function `createOrReplaceTempFunction` to `FunctionRegistry`
```Scala
final def createOrReplaceTempFunction(name: String, builder: FunctionBuilder): Unit
```

### How was this patch tested?
Add extra test cases to verify the inclusive bug fixes.

Author: Xiao Li <gatorsmile@gmail.com>
Author: gatorsmile <gatorsmile@gmail.com>

Closes #18142 from gatorsmile/fuctionRegistry.
2017-06-09 10:16:30 -07:00
guoxiaolong 82faacd791 [SPARK-20997][CORE] driver-cores' standalone or Mesos or YARN in Cluster deploy mode only.
## What changes were proposed in this pull request?

'--driver-cores'  standalone or Mesos or YARN in Cluster deploy mode only.So  The description of spark-submit about it is not very accurate.

## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: guoxiaolong <guo.xiaolong1@zte.com.cn>
Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>
Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn>

Closes #18241 from guoxiaolongzte/SPARK-20997.
2017-06-09 14:26:54 +01:00
junzhi lu 6491cbf065 Fix bug in JavaRegressionMetricsExample.
the original code cant visit the last element of the"parts" array.
so the v[v.length–1] always equals 0

## What changes were proposed in this pull request?
change the recycle range from (1 to parts.length-1) to (1 to parts.length)

## How was this patch tested?

debug it in eclipse (´〜`*) zzz.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: junzhi lu <452756565@qq.com>

Closes #18237 from masterwugui/patch-1.
2017-06-09 10:49:04 +01:00
Corey Woodfield 033839559e Fixed broken link
## What changes were proposed in this pull request?

I fixed some incorrect formatting on a link in the docs

## How was this patch tested?

I looked at the markdown preview before and after, and the link was fixed

Before:
<img width="593" alt="screen shot 2017-06-08 at 6 37 32 pm" src="https://user-images.githubusercontent.com/17733030/26956272-a62cd558-4c79-11e7-862f-9d0e0184b18a.png">
After:
<img width="587" alt="screen shot 2017-06-08 at 6 37 44 pm" src="https://user-images.githubusercontent.com/17733030/26956276-b1135ef6-4c79-11e7-8028-84d19c392fda.png">

Author: Corey Woodfield <coreywoodfield@gmail.com>

Closes #18246 from coreywoodfield/master.
2017-06-09 10:24:49 +01:00
guoxiaolong bdcd6e4c68 [SPARK-20995][CORE] Spark-env.sh.template' should add 'YARN_CONF_DIR' configuration instructions.
## What changes were proposed in this pull request?

Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster.
These configs are used to write to HDFS and connect to the YARN ResourceManager. The
configuration contained in this directory will be distributed to the YARN cluster so that all
containers used by the application use the same configuration.

Sometimes, `HADOOP_CONF_DIR` is set to the hdfs configuration file path. So, YARN_CONF_DIR should be set to the yarn configuration file path.

My project configuration item of 'spark-env.sh ' is as follows:
![1](https://cloud.githubusercontent.com/assets/26266482/26819987/d4acb814-4ad3-11e7-8458-a21aea57a53d.png)

'HADOOP_CONF_DIR' configuration file path. List the relevant documents below:
![3](https://cloud.githubusercontent.com/assets/26266482/26820116/47b6b9fe-4ad4-11e7-8131-fe07c8d8bc21.png)

'YARN_CONF_DIR' configuration file path. List the relevant documents below:
![2](https://cloud.githubusercontent.com/assets/26266482/26820078/274ad79a-4ad4-11e7-83d4-ff359dbb397c.png)

So, 'Spark-env.sh.template' should add 'YARN_CONF_DIR' configuration instructions.

## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: guoxiaolong <guo.xiaolong1@zte.com.cn>
Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>
Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn>

Closes #18212 from guoxiaolongzte/SPARK-20995.
2017-06-09 09:26:30 +01:00
Joseph K. Bradley 5a3371883a [SPARK-14408][CORE] Changed RDD.treeAggregate to use fold instead of reduce
## What changes were proposed in this pull request?

Previously, `RDD.treeAggregate` used `reduceByKey` and `reduce` in its implementation, neither of which technically allows the `seq`/`combOps` to modify and return their first arguments.

This PR uses `foldByKey` and `fold` instead and notes that `aggregate` and `treeAggregate` are semantically identical in the Scala doc.

Note that this had some test failures by unknown reasons. This was actually fixed in e3554605b3.

The root cause was, the `zeroValue` now becomes `AFTAggregator` and it compares `totalCnt` (where the value is actually 0). It starts merging one by one and it keeps returning `this` where `totalCnt` is 0. So, this looks not the bug in the current change.

This is now fixed in the commit. So, this should pass the tests.

## How was this patch tested?

Test case added in `RDDSuite`.

Closes #12217

Author: Joseph K. Bradley <joseph@databricks.com>
Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18198 from HyukjinKwon/SPARK-14408.
2017-06-09 08:53:18 +01:00
Josh Rosen 2a23cdd078 [SPARK-20863] Add metrics/instrumentation to LiveListenerBus
## What changes were proposed in this pull request?

This patch adds Coda Hale metrics for instrumenting the `LiveListenerBus` in order to track the number of events received, dropped, and processed. In addition, it adds per-SparkListener-subclass timers to track message processing time. This is useful for identifying when slow third-party SparkListeners cause performance bottlenecks.

See the new `LiveListenerBusMetrics` for a complete description of the new metrics.

## How was this patch tested?

New tests in SparkListenerSuite, including a test to ensure proper counting of dropped listener events.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #18083 from JoshRosen/listener-bus-metrics.
2017-06-08 18:08:25 -07:00
Dongjoon Hyun 6e95897e88 [SPARK-20954][SQL] DESCRIBE [EXTENDED] result should be compatible with previous Spark
## What changes were proposed in this pull request?

After [SPARK-20067](https://issues.apache.org/jira/browse/SPARK-20067), `DESCRIBE` and `DESCRIBE EXTENDED` shows the following result. This is incompatible with Spark 2.1.1. This PR removes the column header line in case of those command.

**MASTER** and **BRANCH-2.2**
```scala
scala> sql("desc t").show(false)
+----------+---------+-------+
|col_name  |data_type|comment|
+----------+---------+-------+
|# col_name|data_type|comment|
|a         |int      |null   |
+----------+---------+-------+
```

**SPARK 2.1.1** and **this PR**
```scala
scala> sql("desc t").show(false)
+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|a       |int      |null   |
+--------+---------+-------+
```

## How was this patch tested?

Pass the Jenkins with the updated test suites.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18203 from dongjoon-hyun/SPARK-20954.
2017-06-08 16:46:56 -07:00
Xiao Li 1a527bde49 [SPARK-20976][SQL] Unify Error Messages for FAILFAST mode
### What changes were proposed in this pull request?
Before 2.2, we indicate the job was terminated because of `FAILFAST` mode.
```
Malformed line in FAILFAST mode: {"a":{, b:3}
```
If possible, we should keep it. This PR is to unify the error messages.

### How was this patch tested?
Modified the existing messages.

Author: Xiao Li <gatorsmile@gmail.com>

Closes #18196 from gatorsmile/messFailFast.
2017-06-08 12:10:31 -07:00
Mark Grover 55b8cfe6e6 [SPARK-19185][DSTREAM] Make Kafka consumer cache configurable
## What changes were proposed in this pull request?

Add a new property `spark.streaming.kafka.consumer.cache.enabled` that allows users to enable or disable the cache for Kafka consumers. This property can be especially handy in cases where issues like SPARK-19185 get hit, for which there isn't a solution committed yet. By default, the cache is still on, so this change doesn't change any out-of-box behavior.

## How was this patch tested?
Running unit tests

Author: Mark Grover <mark@apache.org>
Author: Mark Grover <grover.markgrover@gmail.com>

Closes #18234 from markgrover/spark-19185.
2017-06-08 09:55:43 -07:00
hyukjinkwon b771fed73f [INFRA] Close stale PRs
# What changes were proposed in this pull request?

This PR proposes to close stale PRs, mostly the same instances with https://github.com/apache/spark/pull/18017

Closes #11459
Closes #13833
Closes #13720
Closes #12506
Closes #12456
Closes #12252
Closes #17689
Closes #17791
Closes #18163
Closes #17640
Closes #17926
Closes #18163
Closes #12506
Closes #18044
Closes #14036
Closes #15831
Closes #14461
Closes #17638
Closes #18222

Added:
Closes #18045
Closes #18061
Closes #18010
Closes #18041
Closes #18124
Closes #18130
Closes #12217

Added:
Closes #16291
Closes #17480
Closes #14995

Added:
Closes #12835
Closes #17141

## How was this patch tested?

N/A

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #18223 from HyukjinKwon/close-stale-prs.
2017-06-08 11:14:42 +01:00
10087686 9be7945861 [SPARK-21006][TESTS] Create rpcEnv and run later needs shutdown and awaitTermination
Signed-off-by: 10087686 <wang.jiaochunzte.com.cn>

## What changes were proposed in this pull request?
When  run test("port conflict") case, we need run anotherEnv.shutdown() and anotherEnv.awaitTermination() for free resource.
(Please fill in changes proposed in this fix)

## How was this patch tested?
run RpcEnvSuit.scala Utest
(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: 10087686 <wang.jiaochun@zte.com.cn>

Closes #18226 from wangjiaochun/master.
2017-06-08 10:58:09 +01:00
Sean Owen 847efe1265 [SPARK-20914][DOCS] Javadoc contains code that is invalid
## What changes were proposed in this pull request?

Fix Java, Scala Dataset examples in scaladoc, which didn't compile.

## How was this patch tested?

Existing compilation/test

Author: Sean Owen <sowen@cloudera.com>

Closes #18215 from srowen/SPARK-20914.
2017-06-08 10:56:23 +01:00
guoxiaolong 0ca69c4ccf [SPARK-20966][WEB-UI][SQL] Table data is not sorted by startTime time desc, time is not formatted and redundant code in JDBC/ODBC Server page.
## What changes were proposed in this pull request?

1. Question 1 : Table data is not sorted by startTime time desc in JDBC/ODBC Server page.

fix before :
![2](https://cloud.githubusercontent.com/assets/26266482/26718483/bf4a0fa8-47b3-11e7-9a27-dc6a67165b16.png)

fix after :
![21](https://cloud.githubusercontent.com/assets/26266482/26718544/eb7376c8-47b3-11e7-9117-1bc68dfec92c.png)

2. Question 2 :  time is not formatted in JDBC/ODBC Server page.

fix before :
![1](https://cloud.githubusercontent.com/assets/26266482/26718573/0497d86a-47b4-11e7-945b-582aaa103949.png)

fix after :
![11](https://cloud.githubusercontent.com/assets/26266482/26718602/21371ad0-47b4-11e7-9587-c5114d10ab2c.png)

3. Question 3 :  Redundant code in the ThriftServerSessionPage.scala.
The function of 'generateSessionStatsTable'  has not been used

## How was this patch tested?

manual tests

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: guoxiaolong <guo.xiaolong1@zte.com.cn>
Author: 郭小龙 10207633 <guo.xiaolong1@zte.com.cn>
Author: guoxiaolongzte <guo.xiaolong1@zte.com.cn>

Closes #18186 from guoxiaolongzte/SPARK-20966.
2017-06-07 10:18:40 +01:00
Dongjoon Hyun 3218505a0b [MINOR][DOC] Update deprecation notes on Python/Hadoop/Scala.
## What changes were proposed in this pull request?

We had better update the deprecation notes about Python 2.6, Hadoop (before 2.6.5) and Scala 2.10 in [2.2.0-RC4](http://people.apache.org/~pwendell/spark-releases/spark-2.2.0-rc4-docs/) documentation. Since this is a doc only update, I think we can update the doc during publishing.

**BEFORE (2.2.0-RC4)**
    ![before](https://cloud.githubusercontent.com/assets/9700541/26799758/aea0dc06-49eb-11e7-8ca3-ed8ce1cc6147.png)

**AFTER**
    ![after](https://cloud.githubusercontent.com/assets/9700541/26799761/b3fef818-49eb-11e7-83c5-334f0e4768ed.png)

## How was this patch tested?

Manual.
```
SKIP_API=1 jekyll build
```

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18207 from dongjoon-hyun/minor_doc_deprecation.
2017-06-07 08:50:36 +01:00
Bogdan Raducanu cb83ca1433 [SPARK-20854][TESTS] Removing duplicate test case
## What changes were proposed in this pull request?

Removed a duplicate case in "SPARK-20854: select hint syntax with expressions"

## How was this patch tested?
Existing tests.

Author: Bogdan Raducanu <bogdan@databricks.com>

Closes #18217 from bogdanrdc/SPARK-20854-2.
2017-06-06 22:51:10 -07:00