Commit graph

20493 commits

Author SHA1 Message Date
Sean Owen e17901d6df [SPARK-22049][DOCS] Confusing behavior of from_utc_timestamp and to_utc_timestamp
## What changes were proposed in this pull request?

Clarify behavior of to_utc_timestamp/from_utc_timestamp with an example

## How was this patch tested?

Doc only change / existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19276 from srowen/SPARK-22049.
2017-09-20 20:47:17 +09:00
Sean Owen 2b6ff0cefd [SPARK-22066][BUILD][HOTFIX] Revert scala-maven-plugin to 3.2.2 to work with Maven+zinc again
## What changes were proposed in this pull request?

See https://github.com/apache/spark/pull/19282
Revert scala-maven-plugin to 3.2.2 to work with Maven+zinc again

## How was this patch tested?

Reproduced locally with zinc, and confirmed this removes the problem.

Author: Sean Owen <sowen@cloudera.com>

Closes #19292 from srowen/SPARK-22066.2.
2017-09-20 10:49:06 +01:00
Sean Owen 3d4dd14cd5 [SPARK-22066][BUILD] Update checkstyle to 8.2, enable it, fix violations
## What changes were proposed in this pull request?

Update plugins, including scala-maven-plugin, to latest versions. Update checkstyle to 8.2. Remove bogus checkstyle config and enable it. Fix existing and new Java checkstyle errors.

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19282 from srowen/SPARK-22066.
2017-09-20 10:01:46 +01:00
Burak Yavuz 280ff523f4 [SPARK-21977] SinglePartition optimizations break certain Streaming Stateful Aggregation requirements
## What changes were proposed in this pull request?

This is a bit hard to explain as there are several issues here, I'll try my best. Here are the requirements:
  1. A StructuredStreaming Source that can generate empty RDDs with 0 partitions
  2. A StructuredStreaming query that uses the above source, performs a stateful aggregation
     (mapGroupsWithState, groupBy.count, ...), and coalesce's by 1

The crux of the problem is that when a dataset has a `coalesce(1)` call, it receives a `SinglePartition` partitioning scheme. This scheme satisfies most required distributions used for aggregations such as HashAggregateExec. This causes a world of problems:
  Symptom 1. If the input RDD has 0 partitions, the whole lineage will receive 0 partitions, nothing will be executed, the state store will not create any delta files. When this happens, the next trigger fails, because the StateStore fails to load the delta file for the previous trigger
  Symptom 2. Let's say that there was data. Then in this case, if you stop your stream, and change `coalesce(1)` with `coalesce(2)`, then restart your stream, your stream will fail, because `spark.sql.shuffle.partitions - 1` number of StateStores will fail to find its delta files.

To fix the issues above, we must check that the partitioning of the child of a `StatefulOperator` satisfies:
If the grouping expressions are empty:
  a) AllTuple distribution
  b) Single physical partition
If the grouping expressions are non empty:
  a) Clustered distribution
  b) spark.sql.shuffle.partition # of partitions
whether or not `coalesce(1)` exists in the plan, and whether or not the input RDD for the trigger has any data.

Once you fix the above problem by adding an Exchange to the plan, you come across the following bug:
If you call `coalesce(1).groupBy().count()` on a Streaming DataFrame, and if you have a trigger with no data, `StateStoreRestoreExec` doesn't return the prior state. However, for this specific aggregation, `HashAggregateExec` after the restore returns a (0, 0) row, since we're performing a count, and there is no data. Then this data gets stored in `StateStoreSaveExec` causing the previous counts to be overwritten and lost.

## How was this patch tested?

Regression tests

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #19196 from brkyvz/sa-0.
2017-09-20 00:01:21 -07:00
Marcelo Vanzin c6ff59a230 [SPARK-18838][CORE] Add separate listener queues to LiveListenerBus.
This change modifies the live listener bus so that all listeners are
added to queues; each queue has its own thread to dispatch events,
making it possible to separate slow listeners from other more
performance-sensitive ones.

The public API has not changed - all listeners added with the existing
"addListener" method, which after this change mostly means all
user-defined listeners, end up in a default queue. Internally, there's
an API allowing listeners to be added to specific queues, and that API
is used to separate the internal Spark listeners into 3 categories:
application status listeners (e.g. UI), executor management (e.g. dynamic
allocation), and the event log.

The queueing logic, while abstracted away in a separate class, is kept
as much as possible hidden away from consumers. Aside from choosing their
queue, there's no code change needed to take advantage of queues.

Test coverage relies on existing tests; a few tests had to be tweaked
because they relied on `LiveListenerBus.postToAll` being synchronous,
and the change makes that method asynchronous. Other tests were simplified
not to use the asynchronous LiveListenerBus.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19211 from vanzin/SPARK-18838.
2017-09-20 13:41:29 +08:00
Bryan Cutler 718bbc9390 [SPARK-22067][SQL] ArrowWriter should use position when setting UTF8String ByteBuffer
## What changes were proposed in this pull request?

The ArrowWriter StringWriter was setting Arrow data using a position of 0 instead of the actual position in the ByteBuffer.  This was currently working because of a bug ARROW-1443, and has been fixed as of
Arrow 0.7.0.  Testing with this version revealed the error in ArrowConvertersSuite test string conversion.

## How was this patch tested?

Existing tests, manually verified working with Arrow 0.7.0

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #19284 from BryanCutler/arrow-ArrowWriter-StringWriter-position-SPARK-22067.
2017-09-20 10:51:00 +09:00
aokolnychyi ee13f3e3dc [SPARK-21969][SQL] CommandUtils.updateTableStats should call refreshTable
## What changes were proposed in this pull request?

Tables in the catalog cache are not invalidated once their statistics are updated. As a consequence, existing sessions will use the cached information even though it is not valid anymore. Consider and an example below.

```
// step 1
spark.range(100).write.saveAsTable("tab1")
// step 2
spark.sql("analyze table tab1 compute statistics")
// step 3
spark.sql("explain cost select distinct * from tab1").show(false)
// step 4
spark.range(100).write.mode("append").saveAsTable("tab1")
// step 5
spark.sql("explain cost select distinct * from tab1").show(false)
```

After step 3, the table will be present in the catalog relation cache. Step 4 will correctly update the metadata inside the catalog but will NOT invalidate the cache.

By the way, ``spark.sql("analyze table tab1 compute statistics")`` between step 3 and step 4 would also solve the problem.

## How was this patch tested?

Current and additional unit tests.

Author: aokolnychyi <anton.okolnychyi@sap.com>

Closes #19252 from aokolnychyi/spark-21969.
2017-09-19 14:19:13 -07:00
Huaxin Gao d5aefa83ad [SPARK-21338][SQL] implement isCascadingTruncateTable() method in AggregatedDialect
## What changes were proposed in this pull request?

org.apache.spark.sql.jdbc.JdbcDialect's method:
def isCascadingTruncateTable(): Option[Boolean] = None
is not overriden in org.apache.spark.sql.jdbc.AggregatedDialect class.
Because of this issue, when you add more than one dialect Spark doesn't truncate table because isCascadingTruncateTable always returns default None for Aggregated Dialect.
Will implement isCascadingTruncateTable in AggregatedDialect class in this PR.

## How was this patch tested?

In JDBCSuite, inside test("Aggregated dialects"), will add one line to test AggregatedDialect.isCascadingTruncateTable

Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #19256 from huaxingao/spark-21338.
2017-09-19 09:27:05 -07:00
Yanbo Liang 2f962422a2 [MINOR][ML] Remove unnecessary default value setting for evaluators.
## What changes were proposed in this pull request?
Remove unnecessary default value setting for all evaluators, as we have set them in corresponding _HasXXX_ base classes.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #19262 from yanboliang/evaluation.
2017-09-19 22:22:35 +08:00
jerryshao 8319432af6 [SPARK-21917][CORE][YARN] Supporting adding http(s) resources in yarn mode
## What changes were proposed in this pull request?
In the current Spark, when submitting application on YARN with remote resources `./bin/spark-shell --jars http://central.maven.org/maven2/com/github/swagger-akka-http/swagger-akka-http_2.11/0.10.1/swagger-akka-http_2.11-0.10.1.jar --master yarn-client -v`, Spark will be failed with:

```
java.io.IOException: No FileSystem for scheme: http
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2586)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2593)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2632)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2614)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
	at org.apache.spark.deploy.yarn.Client.copyFileToRemote(Client.scala:354)
	at org.apache.spark.deploy.yarn.Client.org$apache$spark$deploy$yarn$Client$$distribute$1(Client.scala:478)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:600)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11$$anonfun$apply$6.apply(Client.scala:599)
	at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:74)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:599)
	at org.apache.spark.deploy.yarn.Client$$anonfun$prepareLocalResources$11.apply(Client.scala:598)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at org.apache.spark.deploy.yarn.Client.prepareLocalResources(Client.scala:598)
	at org.apache.spark.deploy.yarn.Client.createContainerLaunchContext(Client.scala:848)
	at org.apache.spark.deploy.yarn.Client.submitApplication(Client.scala:173)
```

This is because `YARN#client` assumes resources are on the Hadoop compatible FS. To fix this problem, here propose to download remote http(s) resources to local and add this local downloaded resources to dist cache. This solution has one downside: remote resources are downloaded and uploaded again, but it only restricted to only remote http(s) resources, also the overhead is not so big. The advantages of this solution is that it is simple and the code changes restricts to only `SparkSubmit`.

## How was this patch tested?

Unit test added, also verified in local cluster.

Author: jerryshao <sshao@hortonworks.com>

Closes #19130 from jerryshao/SPARK-21917.
2017-09-19 22:20:05 +08:00
Kent Yao 581200af71 [SPARK-21428][SQL][FOLLOWUP] CliSessionState should point to the actual metastore not a dummy one
## What changes were proposed in this pull request?

While running bin/spark-sql, we will reuse cliSessionState, but the Hive configurations generated here just points to a dummy meta store which actually should be the real one. And the warehouse is determined later in SharedState, HiveClient should respect this config changing in this case too.

## How was this patch tested?
existing ut

cc cloud-fan jiangxb1987

Author: Kent Yao <yaooqinn@hotmail.com>

Closes #19068 from yaooqinn/SPARK-21428-FOLLOWUP.
2017-09-19 19:35:36 +08:00
Taaffy 1bc17a6b8a [SPARK-22052] Incorrect Metric assigned in MetricsReporter.scala
Current implementation for processingRate-total uses wrong metric:
mistakenly uses inputRowsPerSecond instead of processedRowsPerSecond

## What changes were proposed in this pull request?
Adjust processingRate-total from using inputRowsPerSecond to processedRowsPerSecond

## How was this patch tested?

Built spark from source with proposed change and tested output with correct parameter. Before change the csv metrics file for inputRate-total and processingRate-total displayed the same values due to the error. After changing MetricsReporter.scala the processingRate-total csv file displayed the correct metric.
<img width="963" alt="processed rows per second" src="https://user-images.githubusercontent.com/32072374/30554340-82eea12c-9ca4-11e7-8370-8168526ff9a2.png">

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Taaffy <32072374+Taaffy@users.noreply.github.com>

Closes #19268 from Taaffy/patch-1.
2017-09-19 10:20:04 +01:00
Armin 7c92351f43 [MINOR][CORE] Cleanup dead code and duplication in Mem. Management
## What changes were proposed in this pull request?

* Removed the method `org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter#alignToWords`.
It became unused as a result of 85b0a15754
(SPARK-15962) introducing word alignment for unsafe arrays.
* Cleaned up duplicate code in memory management and unsafe sorters
  * The change extracting the exception paths is more than just cosmetics since it def. reduces the size the affected methods compile to

## How was this patch tested?

* Build still passes after removing the method, grepping the codebase for `alignToWords` shows no reference to it anywhere either.
* Dried up code is covered by existing tests.

Author: Armin <me@obrown.io>

Closes #19254 from original-brownbear/cleanup-mem-consumer.
2017-09-19 10:06:32 +01:00
Xianyang Liu a11db942aa [SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record
## What changes were proposed in this pull request?
When Spark persist data to Unsafe memory, we call  the method `MemoryStore.putIteratorAsBytes`, which need synchronize the `memoryManager` for every record write. This implementation is not necessary, we can apply for more memory at a time to reduce unnecessary synchronization.

## How was this patch tested?

Test case (with 1 executor 20 core):
```scala
val start = System.currentTimeMillis()
val data = sc.parallelize(0 until Integer.MAX_VALUE, 100)
      .persist(StorageLevel.OFF_HEAP)
      .count()

println(System.currentTimeMillis() - start)

```

Test result:

before

|  27647  |  29108  |  28591  |  28264  |  27232  |

after

|  26868  |  26358  |  27767  |  26653  |  26693  |

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #19135 from ConeyLiu/memorystore.
2017-09-19 14:51:27 +08:00
Wenchen Fan 10f45b3c84 [SPARK-22047][FLAKY TEST] HiveExternalCatalogVersionsSuite
## What changes were proposed in this pull request?

This PR tries to download Spark for each test run, to make sure each test run is absolutely isolated.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19265 from cloud-fan/test.
2017-09-19 11:53:50 +08:00
alexmnyc 94f7e046a2 [SPARK-22030][CORE] GraphiteSink fails to re-connect to Graphite instances behind an ELB or any other auto-scaled LB
## What changes were proposed in this pull request?

Upgrade codahale metrics library so that Graphite constructor can re-resolve hosts behind a CNAME with re-tried DNS lookups. When Graphite is deployed behind an ELB, ELB may change IP addresses based on auto-scaling needs. Using current approach yields Graphite usage impossible, fixing for that use case

- Upgrade to codahale 3.1.5
- Use new Graphite(host, port) constructor instead of new Graphite(new InetSocketAddress(host, port)) constructor

## How was this patch tested?

The same logic is used for another project that is using the same configuration and code path, and graphite re-connect's behind ELB's are no longer an issue

This are proposed changes for codahale lib - https://github.com/dropwizard/metrics/compare/v3.1.2...v3.1.5#diff-6916c85d2dd08d89fe771c952e3b8512R120. Specifically, b4d246d34e/metrics-graphite/src/main/java/com/codahale/metrics/graphite/Graphite.java (L120)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: alexmnyc <project@alexandermarkham.com>

Closes #19210 from alexmnyc/patch-1.
2017-09-19 10:05:59 +08:00
Kevin Yu c66d64b3df [SPARK-14878][SQL] Trim characters string function support
#### What changes were proposed in this pull request?

This PR enhances the TRIM function support in Spark SQL by allowing the specification
of trim characters set. Below is the SQL syntax :

``` SQL
<trim function> ::= TRIM <left paren> <trim operands> <right paren>
<trim operands> ::= [ [ <trim specification> ] [ <trim character set> ] FROM ] <trim source>
<trim source> ::= <character value expression>
<trim specification> ::=
  LEADING
| TRAILING
| BOTH
<trim character set> ::= <characters value expression>
```
or
``` SQL
LTRIM (source-exp [, trim-exp])
RTRIM (source-exp [, trim-exp])
```

Here are the documentation link of support of this feature by other mainstream databases.
- **Oracle:** [TRIM function](http://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2126.htm#OLADM704)
- **DB2:** [TRIM scalar function](https://www.ibm.com/support/knowledgecenter/en/SSMKHH_10.0.0/com.ibm.etools.mft.doc/ak05270_.htm)
- **MySQL:** [Trim function](http://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_trim)
- **Oracle:** [ltrim](https://docs.oracle.com/cd/B28359_01/olap.111/b28126/dml_functions_2018.htm#OLADM594)
- **DB2:** [ltrim](https://www.ibm.com/support/knowledgecenter/en/SSEPEK_11.0.0/sqlref/src/tpc/db2z_bif_ltrim.html)

This PR is to implement the above enhancement. In the implementation, the design principle is to keep the changes to the minimum. Also, the exiting trim functions (which handles a special case, i.e., trimming space characters) are kept unchanged for performane reasons.
#### How was this patch tested?

The unit test cases are added in the following files:
- UTF8StringSuite.java
- StringExpressionsSuite.scala
- sql/SQLQuerySuite.scala
- StringFunctionsSuite.scala

Author: Kevin Yu <qyu@us.ibm.com>

Closes #12646 from kevinyu98/spark-14878.
2017-09-18 12:12:35 -07:00
Feng Liu 3b049abf10 [SPARK-22003][SQL] support array column in vectorized reader with UDF
## What changes were proposed in this pull request?

The UDF needs to deserialize the `UnsafeRow`. When the column type is Array, the `get` method from the `ColumnVector`, which is used by the vectorized reader, is called, but this method is not implemented.

## How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Feng Liu <fengliu@databricks.com>

Closes #19230 from liufengdb/fix_array_open.
2017-09-18 08:49:32 -07:00
Wenchen Fan 894a7561de [SPARK-22047][TEST] ignore HiveExternalCatalogVersionsSuite
## What changes were proposed in this pull request?

As reported in https://issues.apache.org/jira/browse/SPARK-22047 , HiveExternalCatalogVersionsSuite is failing frequently, let's disable this test suite to unblock other PRs, I'm looking into the root cause.

## How was this patch tested?
N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19264 from cloud-fan/test.
2017-09-18 16:42:08 +08:00
Sital Kedia 1e978b17d6 [SPARK-21113][CORE] Read ahead input stream to amortize disk IO cost …
Profiling some of our big jobs, we see that around 30% of the time is being spent in reading the spill files from disk. In order to amortize the disk IO cost, the idea is to implement a read ahead input stream which asynchronously reads ahead from the underlying input stream when specified amount of data has been read from the current buffer. It does it by maintaining two buffer - active buffer and read ahead buffer. The active buffer contains data which should be returned when a read() call is issued. The read-ahead buffer is used to asynchronously read from the underlying input stream and once the active buffer is exhausted, we flip the two buffers so that we can start reading from the read ahead buffer without being blocked in disk I/O.

## How was this patch tested?

Tested by running a job on the cluster and could see up to 8% CPU improvement.

Author: Sital Kedia <skedia@fb.com>
Author: Shixiong Zhu <zsxwing@gmail.com>
Author: Sital Kedia <sitalkedia@users.noreply.github.com>

Closes #18317 from sitalkedia/read_ahead_buffer.
2017-09-17 23:15:08 -07:00
hyukjinkwon 7c7266208a [SPARK-22043][PYTHON] Improves error message for show_profiles and dump_profiles
## What changes were proposed in this pull request?

This PR proposes to improve error message from:

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1000, in show_profiles
    self.profiler_collector.show_profiles()
AttributeError: 'NoneType' object has no attribute 'show_profiles'
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1005, in dump_profiles
    self.profiler_collector.dump_profiles(path)
AttributeError: 'NoneType' object has no attribute 'dump_profiles'
```

to

```
>>> sc.show_profiles()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1003, in show_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
>>> sc.dump_profiles("/tmp/abc")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File ".../spark/python/pyspark/context.py", line 1012, in dump_profiles
    raise RuntimeError("'spark.python.profile' configuration must be set "
RuntimeError: 'spark.python.profile' configuration must be set to 'true' to enable Python profile.
```

## How was this patch tested?

Unit tests added in `python/pyspark/tests.py` and manual tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #19260 from HyukjinKwon/profile-errors.
2017-09-18 13:20:11 +09:00
Andrew Ash 6308c65f08 [SPARK-21953] Show both memory and disk bytes spilled if either is present
As written now, there must be both memory and disk bytes spilled to show either of them. If there is only one of those types of spill recorded, it will be hidden.

Author: Andrew Ash <andrew@andrewash.com>

Closes #19164 from ash211/patch-3.
2017-09-18 10:42:24 +08:00
Andrew Ray 6adf67dd14 [SPARK-21985][PYSPARK] PairDeserializer is broken for double-zipped RDDs
## What changes were proposed in this pull request?
(edited)
Fixes a bug introduced in #16121

In PairDeserializer convert each batch of keys and values to lists (if they do not have `__len__` already) so that we can check that they are the same size. Normally they already are lists so this should not have a performance impact, but this is needed when repeated `zip`'s are done.

## How was this patch tested?

Additional unit test

Author: Andrew Ray <ray.andrew@gmail.com>

Closes #19226 from aray/SPARK-21985.
2017-09-18 02:46:27 +09:00
Maciej Bryński f4073020ad [SPARK-22032][PYSPARK] Speed up StructType conversion
## What changes were proposed in this pull request?

StructType.fromInternal is calling f.fromInternal(v) for every field.
We can use precalculated information about type to limit the number of function calls. (its calculated once per StructType and used in per record calculations)

Benchmarks (Python profiler)
```
df = spark.range(10000000).selectExpr("id as id0", "id as id1", "id as id2", "id as id3", "id as id4", "id as id5", "id as id6", "id as id7", "id as id8", "id as id9", "struct(id) as s").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
310274584 function calls (300272456 primitive calls) in 1320.684 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 10000000  253.417    0.000  486.991    0.000 types.py:619(<listcomp>)
 30000000  192.272    0.000 1009.986    0.000 types.py:612(fromInternal)
100000000  176.140    0.000  176.140    0.000 types.py:88(fromInternal)
 20000000  156.832    0.000  328.093    0.000 types.py:1471(_create_row)
    14000  107.206    0.008 1237.917    0.088 {built-in method loads}
 20000000   80.176    0.000 1090.162    0.000 types.py:1468(<lambda>)
```

After
```
210274584 function calls (200272456 primitive calls) in 1035.974 seconds

Ordered by: internal time, cumulative time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
 30000000  215.845    0.000  698.748    0.000 types.py:612(fromInternal)
 20000000  165.042    0.000  351.572    0.000 types.py:1471(_create_row)
    14000  116.834    0.008  946.791    0.068 {built-in method loads}
 20000000   87.326    0.000  786.073    0.000 types.py:1468(<lambda>)
 20000000   85.477    0.000  134.607    0.000 types.py:1519(__new__)
 10000000   65.777    0.000  126.712    0.000 types.py:619(<listcomp>)
```

Main difference is types.py:619(<listcomp>) and types.py:88(fromInternal) (which is removed in After)
The number of function calls is 100 million less. And performance is 20% better.

Benchmark (worst case scenario.)

Test
```
df = spark.range(1000000).selectExpr("current_timestamp as id0", "current_timestamp as id1", "current_timestamp as id2", "current_timestamp as id3", "current_timestamp as id4", "current_timestamp as id5", "current_timestamp as id6", "current_timestamp as id7", "current_timestamp as id8", "current_timestamp as id9").cache()
df.count()
df.rdd.map(lambda x: x).count()
```

Before
```
31166064 function calls (31163984 primitive calls) in 150.882 seconds
```

After
```
31166064 function calls (31163984 primitive calls) in 153.220 seconds
```

IMPORTANT:
The benchmark was done on top of https://github.com/apache/spark/pull/19246.
Without https://github.com/apache/spark/pull/19246 the performance improvement will be even greater.

## How was this patch tested?

Existing tests.
Performance benchmark.

Author: Maciej Bryński <maciek-github@brynski.pl>

Closes #19249 from maver1ck/spark_22032.
2017-09-18 02:34:44 +09:00
Armin 73d9067226 [SPARK-21967][CORE] org.apache.spark.unsafe.types.UTF8String#compareTo Should Compare 8 Bytes at a Time for Better Performance
## What changes were proposed in this pull request?

* Using 64 bit unsigned long comparison instead of unsigned int comparison in `org.apache.spark.unsafe.types.UTF8String#compareTo` for better performance.
* Making `IS_LITTLE_ENDIAN` a constant for correctness reasons (shouldn't use a non-constant in `compareTo` implementations and it def. is a constant per JVM)

## How was this patch tested?

Build passes and the functionality is widely covered by existing tests as far as I can see.

Author: Armin <me@obrown.io>

Closes #19180 from original-brownbear/SPARK-21967.
2017-09-16 09:18:13 +01:00
Jose Torres 0bad10d3e3 [SPARK-22017] Take minimum of all watermark execs in StreamExecution.
## What changes were proposed in this pull request?

Take the minimum of all watermark exec nodes as the "real" watermark in StreamExecution, rather than picking one arbitrarily.

## How was this patch tested?

new unit test

Author: Jose Torres <jose@databricks.com>

Closes #19239 from joseph-torres/SPARK-22017.
2017-09-15 21:10:07 -07:00
Wenchen Fan c7307acdad [SPARK-15689][SQL] data source v2 read path
## What changes were proposed in this pull request?

This PR adds the infrastructure for data source v2, and implement features which Spark already have in data source v1, i.e. column pruning, filter push down, catalyst expression filter push down, InternalRow scan, schema inference, data size report. The write path is excluded to avoid making this PR growing too big, and will be added in follow-up PR.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19136 from cloud-fan/data-source-v2.
2017-09-15 22:18:36 +08:00
Travis Hegner 79a4dab629 [SPARK-21958][ML] Word2VecModel save: transform data in the cluster
## What changes were proposed in this pull request?

Change a data transformation while saving a Word2VecModel to happen with distributed data instead of local driver data.

## How was this patch tested?

Unit tests for the ML sub-component still pass.
Running this patch against v2.2.0 in a fully distributed production cluster allows a 4.0G model to save and load correctly, where it would not do so without the patch.

Author: Travis Hegner <thegner@trilliumit.com>

Closes #19191 from travishegner/master.
2017-09-15 15:17:16 +02:00
Wenchen Fan 3c6198c86e [SPARK-21987][SQL] fix a compatibility issue of sql event logs
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/18600 we removed the `metadata` field from `SparkPlanInfo`. This causes a problem when we replay event logs that are generated by older Spark versions.

## How was this patch tested?

a regression test.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19237 from cloud-fan/event.
2017-09-15 00:47:44 -07:00
Yuming Wang 4decedfdbd [SPARK-22002][SQL] Read JDBC table use custom schema support specify partial fields.
## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/18266 add a new feature to support read JDBC table use custom schema, but we must specify all the fields. For simplicity, this PR support  specify partial fields.

## How was this patch tested?
unit tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #19231 from wangyum/SPARK-22002.
2017-09-14 23:35:55 -07:00
zhoukang 22b111ef9d [SPARK-21902][CORE] Print root cause for BlockManager#doPut
## What changes were proposed in this pull request?

As logging below, actually exception will be hidden when removeBlockInternal throw an exception.
`2017-08-31,10:26:57,733 WARN org.apache.spark.storage.BlockManager: Putting block broadcast_110 failed due to an exception
2017-08-31,10:26:57,734 WARN org.apache.spark.broadcast.BroadcastManager: Failed to create a new broadcast in 1 attempts
java.io.IOException: Failed to create local dir in /tmp/blockmgr-5bb5ac1e-c494-434a-ab89-bd1808c6b9ed/2e.
        at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:70)
        at org.apache.spark.storage.DiskStore.remove(DiskStore.scala:115)
        at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1339)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:910)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
        at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:726)
        at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1233)
        at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:122)
        at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:88)
        at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
        at org.apache.spark.broadcast.BroadcastManager$$anonfun$newBroadcast$1.apply$mcVI$sp(BroadcastManager.scala:60)
        at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
        at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:58)
        at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1415)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1002)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:924)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:771)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$submitWaitingChildStages$6.apply(DAGScheduler.scala:770)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.scheduler.DAGScheduler.submitWaitingChildStages(DAGScheduler.scala:770)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1235)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1662)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1620)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1609)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)`

In this pr i will print exception first make troubleshooting more conveniently.
PS:
This one split from [PR-19133](https://github.com/apache/spark/pull/19133)

## How was this patch tested?
Exsist unit test

Author: zhoukang <zhoukang199191@gmail.com>

Closes #19171 from caneGuy/zhoukang/print-rootcause.
2017-09-15 14:03:26 +08:00
Tathagata Das 88661747f5 [SPARK-22018][SQL] Preserve top-level alias metadata when collapsing projects
## What changes were proposed in this pull request?
If there are two projects like as follows.
```
Project [a_with_metadata#27 AS b#26]
+- Project [a#0 AS a_with_metadata#27]
   +- LocalRelation <empty>, [a#0, b#1]
```
Child Project has an output column with a metadata in it, and the parent Project has an alias that implicitly forwards the metadata. So this metadata is visible for higher operators. Upon applying CollapseProject optimizer rule, the metadata is not preserved.
```
Project [a#0 AS b#26]
+- LocalRelation <empty>, [a#0, b#1]
```
This is incorrect, as downstream operators that expect certain metadata (e.g. watermark in structured streaming) to identify certain fields will fail to do so. This PR fixes it by preserving the metadata of top-level aliases.

## How was this patch tested?
New unit test

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #19240 from tdas/SPARK-22018.
2017-09-14 22:32:16 -07:00
goldmedal a28728a9af [SPARK-21513][SQL][FOLLOWUP] Allow UDF to_json support converting MapType to json for PySpark and SparkR
## What changes were proposed in this pull request?
In previous work SPARK-21513, we has allowed `MapType` and `ArrayType` of `MapType`s convert to a json string but only for Scala API. In this follow-up PR, we will make SparkSQL support it for PySpark and SparkR, too. We also fix some little bugs and comments of the previous work in this follow-up PR.

### For PySpark
```
>>> data = [(1, {"name": "Alice"})]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'{"name":"Alice")']
>>> data = [(1, [{"name": "Alice"}, {"name": "Bob"}])]
>>> df = spark.createDataFrame(data, ("key", "value"))
>>> df.select(to_json(df.value).alias("json")).collect()
[Row(json=u'[{"name":"Alice"},{"name":"Bob"}]')]
```
### For SparkR
```
# Converts a map into a JSON object
df2 <- sql("SELECT map('name', 'Bob')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
# Converts an array of maps into a JSON array
df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
df2 <- mutate(df2, people_json = to_json(df2$people))
```
## How was this patch tested?
Add unit test cases.

cc viirya HyukjinKwon

Author: goldmedal <liugs963@gmail.com>

Closes #19223 from goldmedal/SPARK-21513-fp-PySaprkAndSparkR.
2017-09-15 11:53:10 +09:00
Jose Torres 054ddb2f54 [SPARK-21988] Add default stats to StreamingExecutionRelation.
## What changes were proposed in this pull request?

Add default stats to StreamingExecutionRelation.

## How was this patch tested?

existing unit tests and an explain() test to be sure

Author: Jose Torres <jose@databricks.com>

Closes #19212 from joseph-torres/SPARK-21988.
2017-09-14 11:06:25 -07:00
Zhenhua Wang ddd7f5e11d [SPARK-17642][SQL][FOLLOWUP] drop test tables and improve comments
## What changes were proposed in this pull request?

Drop test tables and improve comments.

## How was this patch tested?

Modified existing test.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19213 from wzhfy/useless_comment.
2017-09-14 23:14:21 +08:00
zhoukang 4b88393cb9 [SPARK-21922] Fix duration always updating when task failed but status is still RUN…
…NING

## What changes were proposed in this pull request?
When driver quit abnormally which cause executor shutdown and task metrics can not be sent to driver for updating.In this case the status will always be 'RUNNING' and the duration on history UI will be 'CurrentTime - launchTime' which increase infinitely.
We can fix this time by modify time of event log since this time has gotten when `FSHistoryProvider` fetch event log from File System.
And the result picture is uploaded in [SPARK-21922](https://issues.apache.org/jira/browse/SPARK-21922).
How to reproduce?
(1) Submit a job to spark on yarn
(2) Mock an oom(or other case can make driver quit abnormally)  senario for driver
(3) Make sure executor is running task when driver quitting
(4) Open the history server and checkout result
It is not a corner case since there are many such jobs in our current cluster.

## How was this patch tested?
Deploy historyserver and open a job has this problem.

Author: zhoukang <zhoukang199191@gmail.com>

Closes #19132 from caneGuy/zhoukang/fix-duration.
2017-09-14 20:40:33 +08:00
gatorsmile 4e6fc69014 [SPARK-4131][FOLLOW-UP] Support "Writing data into the filesystem from queries"
## What changes were proposed in this pull request?
This PR is clean the codes in https://github.com/apache/spark/pull/18975

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19225 from gatorsmile/refactorSPARK-4131.
2017-09-14 14:48:04 +08:00
Yanbo Liang c76153cc7d [SPARK-18608][ML][FOLLOWUP] Fix double caching for PySpark OneVsRest.
## What changes were proposed in this pull request?
#19197 fixed double caching for MLlib algorithms, but missed PySpark ```OneVsRest```, this PR fixed it.

## How was this patch tested?
Existing tests.

Author: Yanbo Liang <ybliang8@gmail.com>

Closes #19220 from yanboliang/SPARK-18608.
2017-09-14 14:09:44 +08:00
Zheng RuiFeng 66cb72d7b9 [MINOR][DOC] Add missing call of update() in examples of PeriodicGraphCheckpointer & PeriodicRDDCheckpointer
## What changes were proposed in this pull request?
forgot to call `update()` with `graph1` & `rdd1` in examples for `PeriodicGraphCheckpointer` & `PeriodicRDDCheckpoin`
## How was this patch tested?
existing tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #19198 from zhengruifeng/fix_doc_checkpointer.
2017-09-14 14:04:43 +08:00
Ming Jiang 8d8641f122 [SPARK-21854] Added LogisticRegressionTrainingSummary for MultinomialLogisticRegression in Python API
## What changes were proposed in this pull request?

Added LogisticRegressionTrainingSummary for MultinomialLogisticRegression in Python API

## How was this patch tested?

Added unit test

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Ming Jiang <mjiang@fanatics.com>
Author: Ming Jiang <jmwdpk@gmail.com>
Author: jmwdpk <jmwdpk@gmail.com>

Closes #19185 from jmwdpk/SPARK-21854.
2017-09-14 13:53:28 +08:00
Dilip Biswal dcbb229433 [MINOR][SQL] Only populate type metadata for required types such as CHAR/VARCHAR.
## What changes were proposed in this pull request?
When reading column descriptions from hive catalog, we currently populate the metadata for all types to record the raw hive type string. In terms of processing , we need this additional metadata information for CHAR/VARCHAR types or complex type containing the CHAR/VARCHAR types.

Its a minor cleanup. I haven't created a JIRA for it.

## How was this patch tested?
Test added in HiveMetastoreCatalogSuite

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #19215 from dilipbiswal/column_metadata.
2017-09-13 22:45:44 -07:00
Takeshi Yamamuro 8be7e6bb3c [SPARK-21973][SQL] Add an new option to filter queries in TPC-DS
## What changes were proposed in this pull request?
This pr added a new option to filter TPC-DS queries to run in `TPCDSQueryBenchmark`.
By default, `TPCDSQueryBenchmark` runs all the TPC-DS queries.
This change could enable developers to run some of the TPC-DS queries by this option,
e.g., to run q2, q4, and q6 only:
```
spark-submit --class <this class> --conf spark.sql.tpcds.queryFilter="q2,q4,q6" --jars <spark sql test jar>
```

## How was this patch tested?
Manually checked.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19188 from maropu/RunPartialQueriesInTPCDS.
2017-09-13 21:54:10 -07:00
Yuming Wang 17edfec59d [SPARK-20427][SQL] Read JDBC table use custom schema
## What changes were proposed in this pull request?

Auto generated Oracle schema some times not we expect:

- `number(1)` auto mapped to BooleanType, some times it's not we expect, per [SPARK-20921](https://issues.apache.org/jira/browse/SPARK-20921).
-  `number` auto mapped to Decimal(38,10), It can't read big data, per [SPARK-20427](https://issues.apache.org/jira/browse/SPARK-20427).

This PR fix this issue by custom schema as follows:
```scala
val props = new Properties()
props.put("customSchema", "ID decimal(38, 0), N1 int, N2 boolean")
val dfRead = spark.read.schema(schema).jdbc(jdbcUrl, "tableWithCustomSchema", props)
dfRead.show()
```
or
```sql
CREATE TEMPORARY VIEW tableWithCustomSchema
USING org.apache.spark.sql.jdbc
OPTIONS (url '$jdbcUrl', dbTable 'tableWithCustomSchema', customSchema'ID decimal(38, 0), N1 int, N2 boolean')
```

## How was this patch tested?

unit tests

Author: Yuming Wang <wgyumg@gmail.com>

Closes #18266 from wangyum/SPARK-20427.
2017-09-13 16:34:17 -07:00
Jane Wang 8c7e19a37d [SPARK-4131] Merge HiveTmpFile.scala to SaveAsHiveFile.scala
## What changes were proposed in this pull request?

The code is already merged to master:
https://github.com/apache/spark/pull/18975

This is a following up PR to merge HiveTmpFile.scala to SaveAsHiveFile.

## How was this patch tested?

Build successfully

Author: Jane Wang <janewang@fb.com>

Closes #19221 from janewangfb/merge_savehivefile_hivetmpfile.
2017-09-13 15:12:36 -07:00
donnyzone 21c4450fb2 [SPARK-21980][SQL] References in grouping functions should be indexed with semanticEquals
## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-21980

This PR fixes the issue in ResolveGroupingAnalytics rule, which indexes the column references in grouping functions without considering case sensitive configurations.

The problem can be reproduced by:

`val df = spark.createDataFrame(Seq((1, 1), (2, 1), (2, 2))).toDF("a", "b")
 df.cube("a").agg(grouping("A")).show()`

## How was this patch tested?
unit tests

Author: donnyzone <wellfengzhu@gmail.com>

Closes #19202 from DonnyZone/ResolveGroupingAnalytics.
2017-09-13 10:06:53 -07:00
Armin b6ef1f57bc [SPARK-21970][CORE] Fix Redundant Throws Declarations in Java Codebase
## What changes were proposed in this pull request?

1. Removing all redundant throws declarations from Java codebase.
2. Removing dead code made visible by this from `ShuffleExternalSorter#closeAndGetSpills`

## How was this patch tested?

Build still passes.

Author: Armin <me@obrown.io>

Closes #19182 from original-brownbear/SPARK-21970.
2017-09-13 14:04:26 +01:00
Zheng RuiFeng 0fa5b7cacc [SPARK-21690][ML] one-pass imputer
## What changes were proposed in this pull request?
parallelize the computation of all columns

performance tests:

|numColums| Mean(Old) | Median(Old) | Mean(RDD) | Median(RDD) | Mean(DF) | Median(DF) |
|------|----------|------------|----------|------------|----------|------------|
|1|0.0771394713|0.0658712813|0.080779802|0.048165981499999996|0.10525509870000001|0.0499620203|
|10|0.7234340630999999|0.5954440414|0.0867935197|0.13263428659999998|0.09255724889999999|0.1573943635|
|100|7.3756451568|6.2196631259|0.1911931552|0.8625376817000001|0.5557462431|1.7216837982000002|

## How was this patch tested?
existing tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #18902 from zhengruifeng/parallelize_imputer.
2017-09-13 20:12:21 +08:00
caoxuewen ca00cc70d6 [SPARK-21963][CORE][TEST] Create temp file should be delete after use
## What changes were proposed in this pull request?

After you create a temporary table, you need to delete it, otherwise it will leave a file similar to the file name ‘SPARK194465907929586320484966temp’.

## How was this patch tested?

N / A

Author: caoxuewen <cao.xuewen@zte.com.cn>

Closes #19174 from heary-cao/DeleteTempFile.
2017-09-13 13:01:30 +01:00
Sean Owen 4fbf748bf8 [SPARK-21893][BUILD][STREAMING][WIP] Put Kafka 0.8 behind a profile
## What changes were proposed in this pull request?

Put Kafka 0.8 support behind a kafka-0-8 profile.

## How was this patch tested?

Existing tests, but, until PR builder and Jenkins configs are updated the effect here is to not build or test Kafka 0.8 support at all.

Author: Sean Owen <sowen@cloudera.com>

Closes #19134 from srowen/SPARK-21893.
2017-09-13 10:10:40 +01:00
German Schiavon a1d98c6dcd [SPARK-21982] Set locale to US
## What changes were proposed in this pull request?

In UtilsSuite Locale was set by default to US, but at the moment of using format function it wasn't, taking by default JVM locale which could be different than US making this test fail.

## How was this patch tested?
Unit test (UtilsSuite)

Author: German Schiavon <germanschiavon@gmail.com>

Closes #19205 from Gschiavon/fix/test-locale.
2017-09-13 09:52:45 +01:00