Commit graph

19019 commits

Author SHA1 Message Date
Cheng Lian 7730426cb9 [SPARK-19409][SPARK-17213] Cleanup Parquet workarounds/hacks due to bugs of old Parquet versions
## What changes were proposed in this pull request?

We've already upgraded parquet-mr to 1.8.2. This PR does some further cleanup by removing a workaround of PARQUET-686 and a hack due to PARQUET-363 and PARQUET-278. All three Parquet issues are fixed in parquet-mr 1.8.2.

## How was this patch tested?

Existing unit tests.

Author: Cheng Lian <lian@databricks.com>

Closes #16791 from liancheng/parquet-1.8.2-cleanup.
2017-02-06 09:10:55 +01:00
gatorsmile 65b10ffb38 [SPARK-19279][SQL] Infer Schema for Hive Serde Tables and Block Creating a Hive Table With an Empty Schema
### What changes were proposed in this pull request?
So far, we allow users to create a table with an empty schema: `CREATE TABLE tab1`. This could break many code paths if we enable it. Thus, we should follow Hive to block it.

For Hive serde tables, some serde libraries require the specified schema and record it in the metastore. To get the list, we need to check `hive.serdes.using.metastore.for.schema,` which contains a list of serdes that require user-specified schema. The default values are

- org.apache.hadoop.hive.ql.io.orc.OrcSerde
- org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe
- org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe
- org.apache.hadoop.hive.serde2.dynamic_type.DynamicSerDe
- org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe
- org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe
- org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
- org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

### How was this patch tested?
Added test cases for both Hive and data source tables

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16636 from gatorsmile/fixEmptyTableSchema.
2017-02-06 13:30:07 +08:00
Zheng RuiFeng 317fa75081 [SPARK-19421][ML][PYSPARK] Remove numClasses and numFeatures methods in LinearSVC
## What changes were proposed in this pull request?
Methods `numClasses` and `numFeatures` in LinearSVCModel are already usable by inheriting `JavaClassificationModel`
we should not explicitly add them.

## How was this patch tested?
existing tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #16727 from zhengruifeng/nits_in_linearSVC.
2017-02-05 19:06:51 -08:00
Asher Krim b3e89802ae [SPARK-19247][ML] Save large word2vec models
## What changes were proposed in this pull request?

* save word2vec models as distributed files rather than as one large datum. Backwards compatibility with the previous save format is maintained by checking for the "wordIndex" column
* migrate the fix for loading large models (SPARK-11994) to ml word2vec

## How was this patch tested?

Tested loading the new and old formats locally

srowen yanboliang MLnick

Author: Asher Krim <akrim@hubspot.com>

Closes #16607 from Krimit/saveLargeModels.
2017-02-05 16:14:07 -08:00
actuaryzhang b94f4b6fa6 [SPARK-19452][SPARKR] Fix bug in the name assignment method
## What changes were proposed in this pull request?
The names method fails to check for validity of the assignment values. This can be fixed by calling colnames within names.

## How was this patch tested?
new tests.

Author: actuaryzhang <actuaryzhang10@gmail.com>

Closes #16794 from actuaryzhang/sparkRNames.
2017-02-05 11:37:45 -08:00
Liang-Chi Hsieh 0674e7eb85 [SPARK-19425][SQL] Make ExtractEquiJoinKeys support UDT columns
## What changes were proposed in this pull request?

DataFrame.except doesn't work for UDT columns. It is because `ExtractEquiJoinKeys` will run `Literal.default` against UDT. However, we don't handle UDT in `Literal.default` and an exception will throw like:

    java.lang.RuntimeException: no default for type
    org.apache.spark.ml.linalg.VectorUDT3bfc3ba7
      at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
      at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:117)
      at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:110)

More simple fix is just let `Literal.default` handle UDT by its sql type. So we can use more efficient join type on UDT.

Besides `except`, this also fixes other similar scenarios, so in summary this fixes:

* `except` on two Datasets with UDT
* `intersect` on two Datasets with UDT
* `Join` with the join conditions using `<=>` on UDT columns

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16765 from viirya/df-except-for-udt.
2017-02-04 15:57:56 -08:00
hyukjinkwon 2f3c20bbdd [SPARK-19446][SQL] Remove unused findTightestCommonType in TypeCoercion
## What changes were proposed in this pull request?

This PR proposes to

- remove unused `findTightestCommonType` in `TypeCoercion` as suggested in https://github.com/apache/spark/pull/16777#discussion_r99283834
- rename `findTightestCommonTypeOfTwo ` to `findTightestCommonType`.
- fix comments accordingly

The usage was removed while refactoring/fixing in several JIRAs such as SPARK-16714, SPARK-16735 and SPARK-16646

## How was this patch tested?

Existing tests.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16786 from HyukjinKwon/SPARK-19446.
2017-02-03 22:10:17 -08:00
Reynold Xin 22d4aae8be [SPARK-10063] Follow-up: remove dead code related to an old output committer.
## What changes were proposed in this pull request?
DirectParquetOutputCommitter was removed from Spark as it was deemed unsafe to use. We however still have some code to generate warning. This patch removes those code as well.

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #16796 from rxin/remove-direct.
2017-02-03 21:12:20 -08:00
actuaryzhang 050c20cc90 [SPARK-19386][SPARKR][FOLLOWUP] fix error in vignettes
## What changes were proposed in this pull request?

Current version has error in vignettes:
```
model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
summary(kmeansModel)
```

`kmeansModel` does not exist...

felixcheung wangmiao1981

Author: actuaryzhang <actuaryzhang10@gmail.com>

Closes #16799 from actuaryzhang/sparkRVignettes.
2017-02-03 18:02:10 -08:00
krishnakalyan3 48aafeda7d [SPARK-19386][SPARKR][DOC] Bisecting k-means in SparkR documentation
## What changes were proposed in this pull request?
Update programming guide, example and vignette with Bisecting k-means.

Author: krishnakalyan3 <krishnakalyan3@gmail.com>

Closes #16767 from krishnakalyan3/bisecting-kmeans.
2017-02-03 12:19:47 -08:00
Liang-Chi Hsieh 2f523fa0c9 [SPARK-19244][CORE] Sort MemoryConsumers according to their memory usage when spilling
## What changes were proposed in this pull request?

In `TaskMemoryManager `, when we acquire memory by calling `acquireExecutionMemory` and we can't acquire required memory, we will try to spill other memory consumers.

Currently, we simply iterates the memory consumers in a hash set. Normally each time the consumer will be iterated in the same order.

The first issue is that we might spill additional consumers. For example, if consumer 1 uses 10MB, consumer 2 uses 50MB, then consumer 3 acquires 100MB but we can only get 60MB and spilling is needed. We might spill both consumer 1 and consumer 2. But we actually just need to spill consumer 2 and get the required 100MB.

The second issue is that if we spill consumer 1 in first time spilling. After a while, consumer 1 now uses 5MB. Then consumer 4 may acquire some memory and spilling is needed again. Because we iterate the memory consumers in the same order, we will spill consumer 1 again. So for consumer 1, we will produce many small spilling files.

This patch modifies the way iterating the memory consumers. It sorts the memory consumers by their memory usage. So the consumer using more memory will spill first. Once it is spilled, even it acquires few memory again, in next time spilling happens it will not be the consumers to spill again if there are other consumers using more memory than it.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16603 from viirya/sort-memoryconsumer-when-spill.
2017-02-03 06:14:10 -08:00
Dongjoon Hyun 52d4f61941 [SPARK-18909][SQL] The error messages in ExpressionEncoder.toRow/fromRow are too verbose
## What changes were proposed in this pull request?

In `ExpressionEncoder.toRow` and `fromRow`, we catch the exception and output `treeString` of serializer/deserializer expressions in the error message. However, encoder can be very complex and the serializer/deserializer expressions can be very large trees and blow up the log files(e.g. generate over 500mb logs for this single error message.) As a first attempt, this PR try to use `simpleString` instead.

**BEFORE**

```scala
scala> :paste
// Entering paste mode (ctrl-D to finish)

case class TestCaseClass(value: Int)
import spark.implicits._
Seq(TestCaseClass(1)).toDS().collect()

// Exiting paste mode, now interpreting.

java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
newInstance(class TestCaseClass)
+- assertnotnull(input[0, int, false], - field (class: "scala.Int", name: "value"), - root class: "TestCaseClass")
   +- input[0, int, false]

  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:303)
...
```

**AFTER**

```scala
...
// Exiting paste mode, now interpreting.

java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
newInstance(class TestCaseClass)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:303)
...
```

## How was this patch tested?

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #16701 from dongjoon-hyun/SPARK-18909-EXPR-ERROR.
2017-02-03 20:26:53 +08:00
Sean Owen 20b4ca1402
[BUILD] Close stale PRs
Closes #15736
Closes #16309
Closes #16485
Closes #16502
Closes #16196
Closes #16498
Closes #12380
Closes #16764

Closes #14394
Closes #14204
Closes #14027
Closes #13690
Closes #16279

Author: Sean Owen <sowen@cloudera.com>

Closes #16778 from srowen/CloseStalePRs.
2017-02-03 11:23:44 +00:00
Liang-Chi Hsieh bf493686eb [SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged Parquet schema for filter predicate pushdown
## What changes were proposed in this pull request?

There is a metadata introduced before to mark the optional columns in merged Parquet schema for filter predicate pushdown. As we upgrade to Parquet 1.8.2 which includes the fix for the pushdown of optional columns, we don't need this metadata now.

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #16756 from viirya/remove-optional-metadata.
2017-02-03 11:58:42 +01:00
jinxing c86a57f4d1 [SPARK-19437] Rectify spark executor id in HeartbeatReceiverSuite.
## What changes were proposed in this pull request?

The current code in `HeartbeatReceiverSuite`, executorId is set as below:
```
  private val executorId1 = "executor-1"
  private val executorId2 = "executor-2"
```

The executorId is sent to driver when register as below:

```
test("expire dead hosts should kill executors with replacement (SPARK-8119)")  {
  ...
  fakeSchedulerBackend.driverEndpoint.askSync[Boolean](
      RegisterExecutor(executorId1, dummyExecutorEndpointRef1, "1.2.3.4", 0, Map.empty))
  ...
}
```

Receiving `RegisterExecutor` in `CoarseGrainedSchedulerBackend`, the executorId will be compared with `currentExecutorIdCounter` as below:
```
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls)  =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
  ...
  executorDataMap.put(executorId, data)
  if (currentExecutorIdCounter < executorId.toInt) {
    currentExecutorIdCounter = executorId.toInt
  }
  ...
```

`executorId.toInt` will cause NumberformatException.

This unit test can pass currently because of `askWithRetry`, when catching exception, RPC will call again, thus it will go `if` branch and return true.

**To fix**
Rectify executorId and replace `askWithRetry` with `askSync`, refer to https://github.com/apache/spark/pull/16690
## How was this patch tested?
This fix is for unit test and no need to add another one.(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

Author: jinxing <jinxing@meituan.com>

Closes #16779 from jinxing64/SPARK-19437.
2017-02-02 23:18:16 -08:00
Joseph K. Bradley 1d5d2a9d09 [SPARK-19389][ML][PYTHON][DOC] Minor doc fixes for ML Python Params and LinearSVC
## What changes were proposed in this pull request?

* Removed Since tags in Python Params since they are inherited by other classes
* Fixed doc links for LinearSVC

## How was this patch tested?

* doc tests
* generating docs locally and checking manually

Author: Joseph K. Bradley <joseph@databricks.com>

Closes #16723 from jkbradley/pyparam-fix-doc.
2017-02-02 11:58:46 -08:00
Shixiong Zhu 8303e20c45 [SPARK-19432][CORE] Fix an unexpected failure when connecting timeout
## What changes were proposed in this pull request?

When connecting timeout, `ask` may fail with a confusing message:

```
17/02/01 23:15:19 INFO Worker: Connecting to master ...
java.lang.IllegalArgumentException: requirement failed: TransportClient has not yet been set.
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.rpc.netty.RpcOutboxMessage.onTimeout(Outbox.scala:70)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:232)
        at org.apache.spark.rpc.netty.NettyRpcEnv$$anonfun$ask$1.applyOrElse(NettyRpcEnv.scala:231)
        at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:138)
        at scala.concurrent.Future$$anonfun$onFailure$1.apply(Future.scala:136)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
```

It's better to provide a meaningful message.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16773 from zsxwing/connect-timeout.
2017-02-01 21:39:21 -08:00
Zheng RuiFeng b0985764f0 [SPARK-14352][SQL] approxQuantile should support multi columns
## What changes were proposed in this pull request?

1, add the multi-cols support based on current private api
2, add the multi-cols support to pyspark
## How was this patch tested?

unit tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>
Author: Ruifeng Zheng <ruifengz@foxmail.com>

Closes #12135 from zhengruifeng/quantile4multicols.
2017-02-01 14:11:28 -08:00
jinxing c5fcb7f68b [SPARK-19347] ReceiverSupervisorImpl can add block to ReceiverTracker multiple times because of askWithRetry.
## What changes were proposed in this pull request?

`ReceiverSupervisorImpl` on executor side reports block's meta back to `ReceiverTracker` on driver side. In current code, `askWithRetry` is used. However, for `AddBlock`, `ReceiverTracker` is not idempotent, which may result in messages are processed multiple times.

**To reproduce**:

1. Check if it is the first time receiving `AddBlock` in `ReceiverTracker`, if so sleep long enough(say 200 seconds), thus the first RPC call will be timeout in `askWithRetry`, then `AddBlock` will be resent.
2. Rebuild Spark and run following job:
```
  def streamProcessing(): Unit = {
    val conf = new SparkConf()
      .setAppName("StreamingTest")
      .setMaster(masterUrl)
    val ssc = new StreamingContext(conf, Seconds(200))
    val stream = ssc.socketTextStream("localhost", 1234)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
```
**To fix**:

It makes sense to provide a blocking version `ask` in RpcEndpointRef, as mentioned in SPARK-18113 (https://github.com/apache/spark/pull/16503#event-927953218). Because Netty RPC layer will not drop messages. `askWithRetry` is a leftover from akka days. It imposes restrictions on the caller(e.g. idempotency) and other things that people generally don't pay that much attention to when using it.

## How was this patch tested?
Test manually. The scenario described above doesn't happen with this patch.

Author: jinxing <jinxing@meituan.com>

Closes #16690 from jinxing64/SPARK-19347.
2017-02-01 13:54:37 -08:00
Devaraj K df4a27cc5c [SPARK-19377][WEBUI][CORE] Killed tasks should have the status as KILLED
## What changes were proposed in this pull request?

Copying of the killed status was missing while getting the newTaskInfo object by dropping the unnecessary details to reduce the memory usage. This patch adds the copying of the killed status to newTaskInfo object, this will correct the display of the status from wrong status to KILLED status in Web UI.

## How was this patch tested?

Current behaviour of displaying tasks in stage UI page,

| Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
|143	|10	|0	|SUCCESS	|NODE_LOCAL	|6 / x.xx.x.x stdout stderr|2017/01/25 07:49:27	|0 ms |		|0.0 B / 0		| |0.0 B / 0	|TaskKilled (killed intentionally)|
|156	|11	|0	|SUCCESS	|NODE_LOCAL	|5 / x.xx.x.x stdout stderr|2017/01/25 07:49:27	|0 ms |		|0.0 B / 0		| |0.0 B / 0	|TaskKilled (killed intentionally)|

Web UI display after applying the patch,

| Index | ID | Attempt | Status | Locality Level | Executor ID / Host | Launch Time | Duration | GC Time | Input Size / Records | Write Time | Shuffle Write Size / Records | Errors |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
|143	|10	|0	|KILLED	|NODE_LOCAL	|6 / x.xx.x.x stdout stderr|2017/01/25 07:49:27	|0 ms |		|0.0 B / 0		|  | 0.0 B / 0	| TaskKilled (killed intentionally)|
|156	|11	|0	|KILLED	|NODE_LOCAL	|5 / x.xx.x.x stdout stderr|2017/01/25 07:49:27	|0 ms |		|0.0 B / 0		|  |0.0 B / 0	| TaskKilled (killed intentionally)|

Author: Devaraj K <devaraj@apache.org>

Closes #16725 from devaraj-kavali/SPARK-19377.
2017-02-01 12:55:11 -08:00
hyukjinkwon 5ed397baa7 [SPARK-19296][SQL] Deduplicate url and table in JdbcUtils
## What changes were proposed in this pull request?

This PR deduplicates arguments, `url` and `table` in `JdbcUtils` with `JDBCOptions`.

It avoids to use duplicated arguments, for example, as below:

from

```scala
val jdbcOptions = new JDBCOptions(url, table, map)
JdbcUtils.saveTable(ds, url, table, jdbcOptions)
```

to

```scala
val jdbcOptions = new JDBCOptions(url, table, map)
JdbcUtils.saveTable(ds, jdbcOptions)
```

## How was this patch tested?

Running unit test in `JdbcSuite`/`JDBCWriteSuite`

Building with Scala 2.10 as below:

```
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16753 from HyukjinKwon/SPARK-19296.
2017-02-01 09:43:35 -08:00
Zheng RuiFeng 04ee8cf633
[SPARK-19410][DOC] Fix brokens links in ml-pipeline and ml-tuning
## What changes were proposed in this pull request?
Fix brokens links in ml-pipeline and ml-tuning
`<div data-lang="scala">`  ->   `<div data-lang="scala" markdown="1">`

## How was this patch tested?
manual tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #16754 from zhengruifeng/doc_api_fix.
2017-02-01 13:27:20 +00:00
hyukjinkwon f1a1f2607d
[SPARK-19402][DOCS] Support LaTex inline formula correctly and fix warnings in Scala/Java APIs generation
## What changes were proposed in this pull request?

This PR proposes three things as below:

- Support LaTex inline-formula, `\( ... \)` in Scala API documentation
  It seems currently,

  ```
  \( ... \)
  ```

  are rendered as they are, for example,

  <img width="345" alt="2017-01-30 10 01 13" src="https://cloud.githubusercontent.com/assets/6477701/22423960/ab37d54a-e737-11e6-9196-4f6229c0189c.png">

  It seems mistakenly more backslashes were added.

- Fix warnings Scaladoc/Javadoc generation
  This PR fixes t two types of warnings as below:

  ```
  [warn] .../spark/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala:335: Could not find any member to link for "UnsupportedOperationException".
  [warn]   /**
  [warn]   ^
  ```

  ```
  [warn] .../spark/sql/core/src/main/scala/org/apache/spark/sql/internal/VariableSubstitution.scala:24: Variable var undefined in comment for class VariableSubstitution in class VariableSubstitution
  [warn]  * `${var}`, `${system:var}` and `${env:var}`.
  [warn]      ^
  ```

- Fix Javadoc8 break
  ```
  [error] .../spark/mllib/target/java/org/apache/spark/ml/PredictionModel.java:7: error: reference not found
  [error]  *                       E.g., {link VectorUDT} for vector features.
  [error]                                       ^
  [error] .../spark/mllib/target/java/org/apache/spark/ml/PredictorParams.java:12: error: reference not found
  [error]    *                          E.g., {link VectorUDT} for vector features.
  [error]                                            ^
  [error] .../spark/mllib/target/java/org/apache/spark/ml/Predictor.java:10: error: reference not found
  [error]  *                       E.g., {link VectorUDT} for vector features.
  [error]                                       ^
  [error] .../spark/sql/hive/target/java/org/apache/spark/sql/hive/HiveAnalysis.java:5: error: reference not found
  [error]  * Note that, this rule must be run after {link PreprocessTableInsertion}.
  [error]                                                  ^
  ```

## How was this patch tested?

Manually via `sbt unidoc` and `jeykil build`.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16741 from HyukjinKwon/warn-and-break.
2017-02-01 13:26:16 +00:00
wm624@hotmail.com 9ac05225e8 [SPARK-19319][SPARKR] SparkR Kmeans summary returns error when the cluster size doesn't equal to k
## What changes were proposed in this pull request

When Kmeans using initMode = "random" and some random seed, it is possible the actual cluster size doesn't equal to the configured `k`.

In this case, summary(model) returns error due to the number of cols of coefficient matrix doesn't equal to k.

Example:
>  col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
>   col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
>   col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
>   cols <- as.data.frame(cbind(col1, col2, col3))
>   df <- createDataFrame(cols)
>
>   model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,  initMode = "random", seed = 22222, tol = 1E-5)
>
> summary(model2)
Error in `colnames<-`(`*tmp*`, value = c("col1", "col2", "col3")) :
  length of 'dimnames' [2] not equal to array extent
In addition: Warning message:
In matrix(coefficients, ncol = k) :
  data length [9] is not a sub-multiple or multiple of the number of rows [2]

Fix: Get the actual cluster size in the summary and use it to build the coefficient matrix.
## How was this patch tested?

Add unit tests.

Author: wm624@hotmail.com <wm624@hotmail.com>

Closes #16666 from wangmiao1981/kmeans.
2017-01-31 21:16:37 -08:00
zero323 9063835803 [SPARK-19163][PYTHON][SQL] Delay _judf initialization to the __call__
## What changes were proposed in this pull request?

Defer `UserDefinedFunction._judf` initialization to the first call. This prevents unintended `SparkSession` initialization.  This allows users to define and import UDF without creating a context / session as a side effect.

[SPARK-19163](https://issues.apache.org/jira/browse/SPARK-19163)

## How was this patch tested?

Unit tests.

Author: zero323 <zero323@users.noreply.github.com>

Closes #16536 from zero323/SPARK-19163.
2017-01-31 18:03:39 -08:00
Burak Yavuz 081b7addaf [SPARK-19378][SS] Ensure continuity of stateOperator and eventTime metrics even if there is no new data in trigger
## What changes were proposed in this pull request?

In StructuredStreaming, if a new trigger was skipped because no new data arrived, we suddenly report nothing for the metrics `stateOperator`. We could however easily report the metrics from `lastExecution` to ensure continuity of metrics.

## How was this patch tested?

Regression test in `StreamingQueryStatusAndProgressSuite`

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16716 from brkyvz/state-agg.
2017-01-31 16:52:53 -08:00
Bryan Cutler 57d70d26c8 [SPARK-17161][PYSPARK][ML] Add PySpark-ML JavaWrapper convenience function to create Py4J JavaArrays
## What changes were proposed in this pull request?

Adding convenience function to Python `JavaWrapper` so that it is easy to create a Py4J JavaArray that is compatible with current class constructors that have a Scala `Array` as input so that it is not necessary to have a Java/Python friendly constructor.  The function takes a Java class as input that is used by Py4J to create the Java array of the given class.  As an example, `OneVsRest` has been updated to use this and the alternate constructor is removed.

## How was this patch tested?

Added unit tests for the new convenience function and updated `OneVsRest` doctests which use this to persist the model.

Author: Bryan Cutler <cutlerb@gmail.com>

Closes #14725 from BryanCutler/pyspark-new_java_array-CountVectorizer-SPARK-17161.
2017-01-31 15:42:36 -08:00
actuaryzhang ce112cec4f [SPARK-19395][SPARKR] Convert coefficients in summary to matrix
## What changes were proposed in this pull request?
The `coefficients` component in model summary should be 'matrix' but the underlying structure is indeed list. This affects several models except for 'AFTSurvivalRegressionModel' which has the correct implementation. The fix is to first `unlist` the coefficients returned from the `callJMethod` before converting to matrix. An example illustrates the issues:

```
data(iris)
df <- createDataFrame(iris)
model <- spark.glm(df, Sepal_Length ~ Sepal_Width, family = "gaussian")
s <- summary(model)

> str(s$coefficients)
List of 8
 $ : num 6.53
 $ : num -0.223
 $ : num 0.479
 $ : num 0.155
 $ : num 13.6
 $ : num -1.44
 $ : num 0
 $ : num 0.152
 - attr(*, "dim")= int [1:2] 2 4
 - attr(*, "dimnames")=List of 2
  ..$ : chr [1:2] "(Intercept)" "Sepal_Width"
  ..$ : chr [1:4] "Estimate" "Std. Error" "t value" "Pr(>|t|)"
> s$coefficients[, 2]
$`(Intercept)`
[1] 0.4788963

$Sepal_Width
[1] 0.1550809
```

This  shows that the underlying structure of coefficients is still `list`.

felixcheung wangmiao1981

Author: actuaryzhang <actuaryzhang10@gmail.com>

Closes #16730 from actuaryzhang/sparkRCoef.
2017-01-31 12:20:43 -08:00
Dongjoon Hyun 26a4cba3ff [SPARK-19409][BUILD] Bump parquet version to 1.8.2
## What changes were proposed in this pull request?

According to the discussion on #16281 which tried to upgrade toward Apache Parquet 1.9.0, Apache Spark community prefer to upgrade to 1.8.2 instead of 1.9.0. Now, Apache Parquet 1.8.2 is released officially last week on 26 Jan. We can use 1.8.2 now.

https://lists.apache.org/thread.html/af0c813f1419899289a336d96ec02b3bbeecaea23aa6ef69f435c142%3Cdev.parquet.apache.org%3E

This PR only aims to bump Parquet version to 1.8.2. It didn't touch any other codes.

## How was this patch tested?

Pass the existing tests and also manually by doing `./dev/test-dependencies.sh`.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #16751 from dongjoon-hyun/SPARK-19409.
2017-01-31 11:43:52 +01:00
Felix Cheung be7425e26a [SPARKR][DOCS] update R API doc for subset/extract
## What changes were proposed in this pull request?

With extract `[[` or replace `[[<-`, the parameter `i` is a column index, that needs to be corrected in doc. Also a few minor updates: examples, links.

## How was this patch tested?

manual

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16721 from felixcheung/rsubsetdoc.
2017-01-30 18:47:14 -08:00
gatorsmile f9156d2956 [SPARK-19406][SQL] Fix function to_json to respect user-provided options
### What changes were proposed in this pull request?
Currently, the function `to_json` allows users to provide options for generating JSON. However, it does not pass it to `JacksonGenerator`. Thus, it ignores the user-provided options. This PR is to fix it. Below is an example.

```Scala
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
df.select(to_json($"a", options)).show(false)
```
The current output is like
```
+--------------------------------------+
|structtojson(a)                       |
+--------------------------------------+
|{"_1":"2015-08-26T18:00:00.000-07:00"}|
+--------------------------------------+
```

After the fix, the output is like
```
+-------------------------+
|structtojson(a)          |
+-------------------------+
|{"_1":"26/08/2015 18:00"}|
+-------------------------+
```
### How was this patch tested?
Added test cases for both `from_json` and `to_json`

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16745 from gatorsmile/toJson.
2017-01-30 18:38:14 -08:00
gatorsmile c0eda7e87f [SPARK-19396][DOC] JDBC Options are Case In-sensitive
### What changes were proposed in this pull request?
The case are not sensitive in JDBC options, after the PR https://github.com/apache/spark/pull/15884 is merged to Spark 2.1.

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16734 from gatorsmile/fixDocCaseInsensitive.
2017-01-30 14:05:53 -08:00
zero323 06fbc35549 [SPARK-19403][PYTHON][SQL] Correct pyspark.sql.column.__all__ list.
## What changes were proposed in this pull request?

This removes from the `__all__` list class names that are not defined (visible) in the `pyspark.sql.column`.

## How was this patch tested?

Existing unit tests.

Author: zero323 <zero323@users.noreply.github.com>

Closes #16742 from zero323/SPARK-19403.
2017-01-30 18:01:02 +01:00
Liwei Lin ade075aed4 [SPARK-19385][SQL] During canonicalization, NOT(...(l, r)) should not expect such cases that l.hashcode > r.hashcode
## What changes were proposed in this pull request?

During canonicalization, `NOT(...(l, r))` should not expect such cases that `l.hashcode > r.hashcode`.

Take the rule `case NOT(GreaterThan(l, r)) if l.hashcode > r.hashcode` for example, it should never be matched since `GreaterThan(l, r)` itself would be re-written as `GreaterThan(r, l)` given `l.hashcode > r.hashcode` after canonicalization.

This patch consolidates rules like `case NOT(GreaterThan(l, r)) if l.hashcode > r.hashcode` and `case NOT(GreaterThan(l, r))`.

## How was this patch tested?

This patch expanded the `NOT` test case to cover both cases where:
- `l.hashcode > r.hashcode`
- `l.hashcode < r.hashcode`

Author: Liwei Lin <lwlin7@gmail.com>

Closes #16719 from lw-lin/canonicalize.
2017-01-29 13:00:50 -08:00
Dilip Biswal e2e7b12ce8 [SPARK-18872][SQL][TESTS] New test cases for EXISTS subquery
## What changes were proposed in this pull request?
This PR adds the first set of tests for EXISTS subquery.

File name                        | Brief description
------------------------| -----------------
exists-basic.sql              |Tests EXISTS and NOT EXISTS subqueries with both correlated and local predicates.
exists-within-and-or.sql|Tests EXISTS and NOT EXISTS subqueries embedded in AND or OR expression.

DB2 results are attached here as reference :

[exists-basic-db2.txt](https://github.com/apache/spark/files/733031/exists-basic-db2.txt)
[exists-and-or-db2.txt](https://github.com/apache/spark/files/733030/exists-and-or-db2.txt)

## How was this patch tested?
This patch is adding tests.

Author: Dilip Biswal <dbiswal@us.ibm.com>

Closes #16710 from dilipbiswal/exist-basic.
2017-01-29 12:51:59 -08:00
Wenchen Fan f7c07db852 [SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand
## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/16552 , `CreateHiveTableAsSelectCommand` becomes very similar to `CreateDataSourceTableAsSelectCommand`, and we can further simplify it by only creating table in the table-not-exist branch.

This PR also adds hive provider checking in DataStream reader/writer, which is missed in #16552

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16693 from cloud-fan/minor.
2017-01-28 20:38:03 -08:00
gatorsmile cfcfc92f7b [SPARK-19359][SQL] Revert Clear useless path after rename a partition with upper-case by HiveExternalCatalog
### What changes were proposed in this pull request?

This PR is to revert the changes made in https://github.com/apache/spark/pull/16700. It could cause the data loss after partition rename, because we have a bug in the file renaming.

Not all the OSs have the same behaviors. For example, on mac OS, if we renaming a path from `.../tbl/a=5/b=6` to `.../tbl/A=5/B=6`. The result is `.../tbl/a=5/B=6`. The expected result is `.../tbl/A=5/B=6`. Thus, renaming on mac OS is not recursive. However, the systems used in Jenkin does not have such an issue. Although this PR is not the root cause, it exposes an existing issue on the code `tablePath.getFileSystem(hadoopConf).rename(wrongPath, rightPath)`

---

Hive metastore is not case preserving and keep partition columns with lower case names.

If SparkSQL create a table with upper-case partion name use HiveExternalCatalog, when we rename partition, it first call the HiveClient to renamePartition, which will create a new lower case partition path, then SparkSql rename the lower case path to the upper-case.

while if the renamed partition contains more than one depth partition ,e.g. A=1/B=2, hive renamePartition change to a=1/b=2, then SparkSql rename it to A=1/B=2, but the a=1 still exists in the filesystem, we should also delete it.

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16728 from gatorsmile/revert-pr-16700.
2017-01-28 13:32:30 -08:00
Zheng RuiFeng 42ad93b2c9
[SPARK-19384][ML] forget unpersist input dataset in IsotonicRegression
## What changes were proposed in this pull request?
unpersist the input dataset if `handlePersistence` = true

## How was this patch tested?
existing tests

Author: Zheng RuiFeng <ruifengz@foxmail.com>

Closes #16718 from zhengruifeng/isoReg_unpersisit.
2017-01-28 10:18:47 +00:00
windpiger 1b5ee2003c [SPARK-19359][SQL] clear useless path after rename a partition with upper-case by HiveExternalCatalog
## What changes were proposed in this pull request?

Hive metastore is not case preserving and keep partition columns with lower case names.

If SparkSQL create a table with upper-case partion name use HiveExternalCatalog, when we rename partition, it first call the HiveClient to renamePartition, which will create a new lower case partition path, then SparkSql rename the lower case path to the upper-case.

while if the renamed partition contains more than one depth partition ,e.g. A=1/B=2, hive renamePartition change to a=1/b=2, then SparkSql rename it to A=1/B=2, but the a=1 still exists in the filesystem, we should also delete it.

## How was this patch tested?
unit test added

Author: windpiger <songjun@outlook.com>

Closes #16700 from windpiger/clearUselessPathAfterRenamPartition.
2017-01-27 17:17:17 -08:00
wm624@hotmail.com bb1a1fe05e [SPARK-19336][ML][PYSPARK] LinearSVC Python API
## What changes were proposed in this pull request?

Add Python API for the newly added LinearSVC algorithm.

## How was this patch tested?

Add new doc string test.

Author: wm624@hotmail.com <wm624@hotmail.com>

Closes #16694 from wangmiao1981/ser.
2017-01-27 16:03:53 -08:00
Shixiong Zhu 21aa8c32ba [SPARK-19365][CORE] Optimize RequestMessage serialization
## What changes were proposed in this pull request?

Right now Netty PRC serializes `RequestMessage` using Java serialization, and the size of a single message (e.g., RequestMessage(..., "hello")`) is almost 1KB.

This PR optimizes it by serializing `RequestMessage` manually (eliminate unnecessary information from most messages, e.g., class names of `RequestMessage`, `NettyRpcEndpointRef`, ...), and reduces the above message size to 100+ bytes.

## How was this patch tested?

Jenkins

I did a simple test to measure the improvement:

Before
```
$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6830
2       4353
3       3322
4       3107
5       3235
6       3139
7       3156
8       3166
9       3091
10      3029
```
After:
```
$ bin/spark-shell --master local-cluster[1,4,1024]
...
scala> for (i <- 1 to 10) {
     |   val start = System.nanoTime
     |   val s = sc.parallelize(1 to 1000000, 10 * 1000).count()
     |   val end = System.nanoTime
     |   println(s"$i\t" + ((end - start)/1000/1000))
     | }
1       6431
2       3643
3       2913
4       2679
5       2760
6       2710
7       2747
8       2793
9       2679
10      2651
```

I also captured the TCP packets for this test. Before this patch, the total size of TCP packets is ~1.5GB. After it, it reduces to ~1.2GB.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #16706 from zsxwing/rpc-opt.
2017-01-27 15:07:57 -08:00
Felix Cheung a7ab6f9a8f [SPARK-19324][SPARKR] Spark VJM stdout output is getting dropped in SparkR
## What changes were proposed in this pull request?

This affects mostly running job from the driver in client mode when results are expected to be through stdout (which should be somewhat rare, but possible)

Before:
```
> a <- as.DataFrame(cars)
> b <- group_by(a, "dist")
> c <- count(b)
> sparkR.callJMethod(c$countjc, "explain", TRUE)
NULL
```

After:
```
> a <- as.DataFrame(cars)
> b <- group_by(a, "dist")
> c <- count(b)
> sparkR.callJMethod(c$countjc, "explain", TRUE)
count#11L
NULL
```

Now, `column.explain()` doesn't seem very useful (we can get more extensive output with `DataFrame.explain()`) but there are other more complex examples with calls of `println` in Scala/JVM side, that are getting dropped.

## How was this patch tested?

manual

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16670 from felixcheung/rjvmstdout.
2017-01-27 12:41:35 -08:00
Felix Cheung 385d73848b [SPARK-19333][SPARKR] Add Apache License headers to R files
## What changes were proposed in this pull request?

add header

## How was this patch tested?

Manual run to check vignettes html is created properly

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16709 from felixcheung/rfilelicense.
2017-01-27 10:31:28 -08:00
hyukjinkwon 4e35c5a3d3
[SPARK-12970][DOCS] Fix the example in SturctType APIs for Scala and Java
## What changes were proposed in this pull request?

This PR fixes both,

javadoc8 break

```
[error] .../spark/sql/hive/target/java/org/apache/spark/sql/hive/FindHiveSerdeTable.java:3: error: reference not found
[error]  * Replaces {link SimpleCatalogRelation} with {link MetastoreRelation} if its table provider is hive.
```

and the example in `StructType` as a self-contained example as below:

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val struct =
  StructType(
    StructField("a", IntegerType, true) ::
    StructField("b", LongType, false) ::
    StructField("c", BooleanType, false) :: Nil)

// Extract a single StructField.
val singleField = struct("b")
// singleField: StructField = StructField(b,LongType,false)

// If this struct does not have a field called "d", it throws an exception.
struct("d")
// java.lang.IllegalArgumentException: Field "d" does not exist.
//   ...

// Extract multiple StructFields. Field names are provided in a set.
// A StructType object will be returned.
val twoFields = struct(Set("b", "c"))
// twoFields: StructType =
//   StructType(StructField(b,LongType,false), StructField(c,BooleanType,false))

// Any names without matching fields will throw an exception.
// For the case shown below, an exception is thrown due to "d".
struct(Set("b", "c", "d"))
// java.lang.IllegalArgumentException: Field "d" does not exist.
//    ...
```

```scala
import org.apache.spark.sql._
import org.apache.spark.sql.types._

val innerStruct =
  StructType(
    StructField("f1", IntegerType, true) ::
    StructField("f2", LongType, false) ::
    StructField("f3", BooleanType, false) :: Nil)

val struct = StructType(
  StructField("a", innerStruct, true) :: Nil)

// Create a Row with the schema defined by struct
val row = Row(Row(1, 2, true))
```

Also, now when the column is missing, it throws an exception rather than ignoring.

## How was this patch tested?

Manually via `sbt unidoc`.

- Scaladoc

  <img width="665" alt="2017-01-26 12 54 13" src="https://cloud.githubusercontent.com/assets/6477701/22297905/1245620e-e362-11e6-9e22-43bb8d9871af.png">

- Javadoc

  <img width="722" alt="2017-01-26 12 54 27" src="https://cloud.githubusercontent.com/assets/6477701/22297899/0fd87e0c-e362-11e6-9033-7590bda1aea6.png">

  <img width="702" alt="2017-01-26 12 54 32" src="https://cloud.githubusercontent.com/assets/6477701/22297900/0fe14154-e362-11e6-9882-768381c53163.png">

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16703 from HyukjinKwon/SPARK-12970.
2017-01-27 10:06:54 +00:00
actuaryzhang 4172ff80dd [SPARK-18929][ML] Add Tweedie distribution in GLM
## What changes were proposed in this pull request?
I propose to add the full Tweedie family into the GeneralizedLinearRegression model. The Tweedie family is characterized by a power variance function. Currently supported distributions such as Gaussian, Poisson and Gamma families are a special case of the Tweedie https://en.wikipedia.org/wiki/Tweedie_distribution.

yanboliang srowen sethah

Author: actuaryzhang <actuaryzhang10@gmail.com>
Author: Wayne Zhang <actuaryzhang10@gmail.com>

Closes #16344 from actuaryzhang/tweedie.
2017-01-26 23:01:13 -08:00
Felix Cheung 90817a6cd0 [SPARK-18788][SPARKR] Add API for getNumPartitions
## What changes were proposed in this pull request?

With doc to say this would convert DF into RDD

## How was this patch tested?

unit tests, manual tests

Author: Felix Cheung <felixcheung_m@hotmail.com>

Closes #16668 from felixcheung/rgetnumpartitions.
2017-01-26 21:06:39 -08:00
wm624@hotmail.com c0ba284300 [SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR
## What changes were proposed in this pull request?

Add R wrapper for bisecting Kmeans.

As JIRA is down, I will update title to link with corresponding JIRA later.

## How was this patch tested?

Add new unit tests.

Author: wm624@hotmail.com <wm624@hotmail.com>

Closes #16566 from wangmiao1981/bk.
2017-01-26 21:01:59 -08:00
WeichenXu 1191fe267d [SPARK-18218][ML][MLLIB] Reduce shuffled data size of BlockMatrix multiplication and solve potential OOM and low parallelism usage problem By split middle dimension in matrix multiplication
## What changes were proposed in this pull request?

### The problem in current block matrix mulitiplication

As in JIRA https://issues.apache.org/jira/browse/SPARK-18218 described, block matrix multiplication in spark may cause some problem, suppose we have `M*N` dimensions matrix A multiply `N*P` dimensions matrix B, when N is much larger than M and P, then the following problem may occur:
- when the middle dimension N is too large, it will cause reducer OOM.
- even if OOM do not occur, it will still cause parallism too low.
- when N is much large than M and P, and matrix A and B have many partitions, it may cause too many partition on M and P dimension, it will cause much larger shuffled data size. (I will expain this in detail in the following.)

### Key point of my improvement

In this PR, I introduce `midDimSplitNum` parameter, and improve the algorithm, to resolve this problem.

In order to understand the improvement in this PR, first let me give a simple case to explain how the current mulitiplication works and what cause the problems above:

suppose we have block matrix A, contains 200 blocks (`2 numRowBlocks * 100 numColBlocks`), blocks arranged in 2 rows, 100 cols:
```
A00 A01 A02 ... A0,99
A10 A11 A12 ... A1,99
```
and we have block matrix B, also contains 200 blocks (`100 numRowBlocks * 2 numColBlocks`), blocks arranged in 100 rows, 2 cols:
```
B00    B01
B10    B11
B20    B21
...
B99,0  B99,1
```
Suppose all blocks in the two matrices are dense for now.
Now we call A.multiply(B), suppose the generated `resultPartitioner` contains 2 rowPartitions and 2 colPartitions (can't be more partitions because the result matrix only contains `2 * 2` blocks), the current algorithm will contains two shuffle steps:

**step-1**
Step-1 will generate 4 reducer, I tag them as reducer-00, reducer-01, reducer-10, reducer-11, and shuffle data as following:
```
A00 A01 A02 ... A0,99
B00 B10 B20 ... B99,0    shuffled into reducer-00

A00 A01 A02 ... A0,99
B01 B11 B21 ... B99,1    shuffled into reducer-01

A10 A11 A12 ... A1,99
B00 B10 B20 ... B99,0    shuffled into reducer-10

A10 A11 A12 ... A1,99
B01 B11 B21 ... B99,1    shuffled into reducer-11
```

and the shuffling above is a `cogroup` transform, note that each reducer contains **only one group**.

**step-2**
Step-2 will do an `aggregateByKey` transform on the result of step-1, will also generate 4 reducers, and generate the final result RDD, contains 4 partitions, each partition contains one block.

The main problems are in step-1. Now we have only 4 reducers, but matrix A and B have 400 blocks in total, obviously the reducer number is too small.
and, we can see that, each reducer contains only one group(the group concept in `coGroup` transform), each group contains 200 blocks. This is terrible because we know that `coGroup` transformer will load each group into memory when computing. It is un-extensable in the algorithm level. Suppose matrix A has 10000 cols blocks or more instead of 100? Than each reducer will load 20000 blocks into memory. It will easily cause reducer OOM.

This PR try to resolve the problem described above.
When matrix A with dimension M * N multiply matrix B with dimension N * P, the middle dimension N is the keypoint. If N is large, the current mulitiplication implementation works badly.
In this PR, I introduce a `numMidDimSplits` parameter, represent how many splits it will cut on the middle dimension N.
Still using the example described above, now we set `numMidDimSplits = 10`, now we can generate 40 reducers in **step-1**:

the reducer-ij above now will be splited into 10 reducers: reducer-ij0, reducer-ij1, ... reducer-ij9, each reducer will receive 20 blocks.
now the shuffle works as following:

**reducer-000 to reducer-009**
```
A0,0 A0,10 A0,20 ... A0,90
B0,0 B10,0 B20,0 ... B90,0    shuffled into reducer-000

A0,1 A0,11 A0,21 ... A0,91
B1,0 B11,0 B21,0 ... B91,0    shuffled into reducer-001

A0,2 A0,12 A0,22 ... A0,92
B2,0 B12,0 B22,0 ... B92,0    shuffled into reducer-002

...

A0,9 A0,19 A0,29 ... A0,99
B9,0 B19,0 B29,0 ... B99,0    shuffled into reducer-009
```

**reducer-010 to reducer-019**
```
A0,0 A0,10 A0,20 ... A0,90
B0,1 B10,1 B20,1 ... B90,1    shuffled into reducer-010

A0,1 A0,11 A0,21 ... A0,91
B1,1 B11,1 B21,1 ... B91,1    shuffled into reducer-011

A0,2 A0,12 A0,22 ... A0,92
B2,1 B12,1 B22,1 ... B92,1    shuffled into reducer-012

...

A0,9 A0,19 A0,29 ... A0,99
B9,1 B19,1 B29,1 ... B99,1    shuffled into reducer-019
```

**reducer-100 to reducer-109** and **reducer-110 to reducer-119** is similar to the above, I omit to write them out.

### API for this optimized algorithm

I add a new API as following:
```
  def multiply(
      other: BlockMatrix,
      numMidDimSplits: Int // middle dimension split number, expained above
): BlockMatrix
```

### Shuffled data size analysis (compared under the same parallelism)

The optimization has some subtle influence on the total shuffled data size. Appropriate `numMidDimSplits` will significantly reduce the shuffled data size,
but too large `numMidDimSplits` may increase the shuffled data in reverse. For now I don't want to introduce formula to make thing too complex, I only use a simple case to represent it here:

Suppose we have two same size square matrices X and Y, both have `16 numRowBlocks * 16 numColBlocks`. X and Y are both dense matrix. Now let me analysis the shuffling data size in the following case:

**case 1: X and Y both partitioned in 16 rowPartitions and 16 colPartitions, numMidDimSplits = 1**
ShufflingDataSize = (16 * 16 * (16 + 16) + 16 * 16) blocks = 8448 blocks
parallelism = 16 * 16 * 1 = 256 //use step-1 reducers number as the parallism because it cost most of the computation time in this algorithm.

**case 2: X and Y both partitioned in 8 rowPartitions and 8 colPartitions, numMidDimSplits = 4**
ShufflingDataSize = (8 * 8 * (32 + 32) + 16 * 16 * 4) blocks = 5120 blocks
parallelism = 8 * 8 * 4 = 256 //use step-1 reducers number as the parallism because it cost most of the computation time in this algorithm.

**The two cases above all have parallism = 256**, case 1 `numMidDimSplits = 1` is equivalent with current implementation in mllib, but case 2 shuffling data is 60.6% of case 1, **it shows that under the same parallelism, proper `numMidDimSplits` will significantly reduce the shuffling data size**.

## How was this patch tested?

Test suites added.
Running result:
![blockmatrix](https://cloud.githubusercontent.com/assets/19235986/21600989/5e162cc2-d1bf-11e6-868c-0ec29190b605.png)

Author: WeichenXu <WeichenXu123@outlook.com>

Closes #15730 from WeichenXu123/optim_block_matrix.
2017-01-26 20:10:17 -08:00
Takeshi YAMAMURO 9f523d3192 [SPARK-19338][SQL] Add UDF names in explain
## What changes were proposed in this pull request?
This pr added a variable for a UDF name in `ScalaUDF`.
Then, if the variable filled, `DataFrame#explain` prints the name.

## How was this patch tested?
Added a test in `UDFSuite`.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes #16707 from maropu/SPARK-19338.
2017-01-26 09:50:42 -08:00
Takuya UESHIN 2969fb4370 [SPARK-18936][SQL] Infrastructure for session local timezone support.
## What changes were proposed in this pull request?

As of Spark 2.1, Spark SQL assumes the machine timezone for datetime manipulation, which is bad if users are not in the same timezones as the machines, or if different users have different timezones.

We should introduce a session local timezone setting that is used for execution.

An explicit non-goal is locale handling.

### Semantics

Setting the session local timezone means that the timezone-aware expressions listed below should use the timezone to evaluate values, and also it should be used to convert (cast) between string and timestamp or between timestamp and date.

- `CurrentDate`
- `CurrentBatchTimestamp`
- `Hour`
- `Minute`
- `Second`
- `DateFormatClass`
- `ToUnixTimestamp`
- `UnixTimestamp`
- `FromUnixTime`

and below are implicitly timezone-aware through cast from timestamp to date:

- `DayOfYear`
- `Year`
- `Quarter`
- `Month`
- `DayOfMonth`
- `WeekOfYear`
- `LastDay`
- `NextDay`
- `TruncDate`

For example, if you have timestamp `"2016-01-01 00:00:00"` in `GMT`, the values evaluated by some of timezone-aware expressions are:

```scala
scala> val df = Seq(new java.sql.Timestamp(1451606400000L)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts                 |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2016-01-01 00:00:00|2016                  |1                      |1                           |0       |0         |0         |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
```

whereas setting the session local timezone to `"PST"`, they are:

```scala
scala> spark.conf.set("spark.sql.session.timeZone", "PST")

scala> df.selectExpr("cast(ts as string)", "year(ts)", "month(ts)", "dayofmonth(ts)", "hour(ts)", "minute(ts)", "second(ts)").show(truncate = false)
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|ts                 |year(CAST(ts AS DATE))|month(CAST(ts AS DATE))|dayofmonth(CAST(ts AS DATE))|hour(ts)|minute(ts)|second(ts)|
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
|2015-12-31 16:00:00|2015                  |12                     |31                          |16      |0         |0         |
+-------------------+----------------------+-----------------------+----------------------------+--------+----------+----------+
```

Notice that even if you set the session local timezone, it affects only in `DataFrame` operations, neither in `Dataset` operations, `RDD` operations nor in `ScalaUDF`s. You need to properly handle timezone by yourself.

### Design of the fix

I introduced an analyzer to pass session local timezone to timezone-aware expressions and modified DateTimeUtils to take the timezone argument.

## How was this patch tested?

Existing tests and added tests for timezone aware expressions.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #16308 from ueshin/issues/SPARK-18350.
2017-01-26 11:51:05 +01:00