Commit graph

2601 commits

Marco Gaido b3d8fc3dc4 [SPARK-22226][SQL] splitExpression can create too many method calls in the outer class
## What changes were proposed in this pull request?

SPARK-18016 introduced `NestedClass` to prevent the many methods generated by `splitExpressions` from contributing to the outer class' constant pool and making it grow too much. Unfortunately, although their definitions are stored in the `NestedClass`, they are all invoked from the outer class, and each method invocation adds two entries to its constant pool: a `Methodref` and a `Utf8` entry (you can easily check this by compiling a simple sample class with `janinoc` and looking at its Constant Pool). This limits the scalability of the solution for very large methods which are split into a lot of small ones. This means that currently we are generating classes like this one:

```
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
...
  public UnsafeRow apply(InternalRow i) {
     rowWriter.zeroOutNullBytes();
     apply_0(i);
     apply_1(i);
...
    nestedClassInstance.apply_862(i);
    nestedClassInstance.apply_863(i);
...
    nestedClassInstance1.apply_1612(i);
    nestedClassInstance1.apply_1613(i);
...
  }
...
  private class NestedClass {
    private void apply_862(InternalRow i) { ... }
    private void apply_863(InternalRow i) { ... }
...
  }
  private class NestedClass1 {
    private void apply_1612(InternalRow i) { ... }
    private void apply_1613(InternalRow i) { ... }
...
  }
}
```

This PR reduces the constant pool size of the outer class by adding a new method to each nested class which invokes all the small methods generated by `splitExpression` in that nested class. In this way the outer class contains only one method invocation per nested class, reducing the constant pool entries caused by method invocations by orders of magnitude. This means that after the patch the generated code becomes:

```
class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
...
  public UnsafeRow apply(InternalRow i) {
     rowWriter.zeroOutNullBytes();
     apply_0(i);
     apply_1(i);
     ...
     nestedClassInstance.apply(i);
     nestedClassInstance1.apply(i);
     ...
  }
...
  private class NestedClass {
    private void apply_862(InternalRow i) { ... }
    private void apply_863(InternalRow i) { ... }
...
    private void apply(InternalRow i) {
      apply_862(i);
      apply_863(i);
      ...
    }
  }
  private class NestedClass1 {
    private void apply_1612(InternalRow i) { ... }
    private void apply_1613(InternalRow i) { ... }
...
    private void apply(InternalRow i) {
      apply_1612(i);
      apply_1613(i);
      ...
    }
  }
}
```

## How was this patch tested?

Added UT and existing UTs

Author: Marco Gaido <mgaido@hortonworks.com>
Author: Marco Gaido <marcogaido91@gmail.com>

Closes #19480 from mgaido91/SPARK-22226.
2017-10-27 13:43:09 -07:00
gatorsmile 36b826f5d1 [TRIVIAL][SQL] Code cleaning in ResolveReferences
## What changes were proposed in this pull request?
This PR cleans up the related code, mainly based on today's code review on https://github.com/apache/spark/pull/19559

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19585 from gatorsmile/trivialFixes.
2017-10-27 07:52:10 -07:00
Jose Torres 8e9863531b [SPARK-22366] Support ignoring missing files
## What changes were proposed in this pull request?

Add a flag "spark.sql.files.ignoreMissingFiles" to parallel the existing flag "spark.sql.files.ignoreCorruptFiles".
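
For reference, a minimal usage sketch (the input path is illustrative; an active `spark` session is assumed):

```scala
// When enabled, files that disappear between planning and execution are skipped
// instead of failing the query, mirroring spark.sql.files.ignoreCorruptFiles.
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

val df = spark.read.parquet("/data/events")   // illustrative directory
df.count()   // missing part-files are ignored rather than raising an error
```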

## How was this patch tested?

new unit test

Author: Jose Torres <jose@databricks.com>

Closes #19581 from joseph-torres/SPARK-22366.
2017-10-26 16:55:30 -07:00
Nathan Kronenfeld 592cfeab9c [SPARK-22308] Support alternative unit testing styles in external applications
## What changes were proposed in this pull request?
Support unit tests of external code (i.e., applications that use Spark) written with scalatest styles other than FunSuite. SharedSparkContext already supports this, but SharedSQLContext does not.

I've introduced SharedSparkSession as a parent of SharedSQLContext, written so that it supports all scalatest styles.
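
For illustration, a FunSpec-style suite built on the new trait might look like the sketch below; the package of `SharedSparkSession` and the availability of a `spark` field are assumed to mirror `SharedSQLContext`:

```scala
import org.scalatest.FunSpec
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical external-application test: FunSpec style instead of FunSuite.
class MyAppSpec extends FunSpec with SharedSparkSession {
  describe("an application that uses Spark") {
    it("can run queries against the shared session") {
      assert(spark.range(0, 10).count() === 10)
    }
  }
}
```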

## How was this patch tested?
There are three new unit test suites added that just test using FunSpec, FlatSpec, and WordSpec.

Author: Nathan Kronenfeld <nicole.oresme@gmail.com>

Closes #19529 from nkronenfeld/alternative-style-tests-2.
2017-10-26 00:29:49 -07:00
Ruben Berenguel Montoro 427359f077 [SPARK-13947][SQL] The error message from using an invalid column reference is not clear
## What changes were proposed in this pull request?

Rewrote the error message for clarity. Added extra information in the case of an attribute name collision, hinting the user to double-check whether they are referencing two different tables.

## How was this patch tested?

No functional changes, only the final message has changed. It has been tested manually against the situation proposed in the JIRA ticket. Automated tests in the repository pass.

This PR is original work from me and I license this work to the Spark project

Author: Ruben Berenguel Montoro <ruben@mostlymaths.net>
Author: Ruben Berenguel Montoro <ruben@dreamattic.com>
Author: Ruben Berenguel <ruben@mostlymaths.net>

Closes #17100 from rberenguel/SPARK-13947-error-message.
2017-10-24 23:02:11 -07:00
Marco Gaido 3f5ba968c5 [SPARK-22301][SQL] Add rule to Optimizer for In with not-nullable value and empty list
## What changes were proposed in this pull request?

For performance reasons, we should resolve an `In` operation on an empty list to `false` in the optimization phase, as discussed in #19522.
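
For illustration, a minimal sketch of a query the new rule can fold at optimization time (assuming a non-nullable column and an active `spark` session):

```scala
import org.apache.spark.sql.functions.col

// `id` from range() is non-nullable, and isin() with no arguments builds an In
// with an empty list, so the optimizer can now rewrite the filter to `false`.
val df = spark.range(10).where(col("id").isin())
df.explain(true)   // the optimized plan should collapse to an empty relation
```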

## How was this patch tested?
Added UT

cc gatorsmile

Author: Marco Gaido <marcogaido91@gmail.com>
Author: Marco Gaido <mgaido@hortonworks.com>

Closes #19523 from mgaido91/SPARK-22301.
2017-10-24 09:11:52 -07:00
Zhenhua Wang f6290aea24 [SPARK-22285][SQL] Change implementation of ApproxCountDistinctForIntervals to TypedImperativeAggregate
## What changes were proposed in this pull request?

The current implementation of `ApproxCountDistinctForIntervals` is `ImperativeAggregate`. The number of `aggBufferAttributes` is the total number of words in the hllppHelper array. Each hllppHelper has 52 words under the default relativeSD.

Since this aggregate function is used in equi-height histogram generation, and the number of buckets in a histogram is usually in the hundreds, the number of `aggBufferAttributes` can easily reach tens of thousands or even more.

This leads to a huge method in codegen and causes error:
```
org.codehaus.janino.JaninoRuntimeException: Code of method "apply(Lorg/apache/spark/sql/catalyst/InternalRow;)Lorg/apache/spark/sql/catalyst/expressions/UnsafeRow;" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB.
```
Besides, huge generated methods also result in performance regression.

In this PR, we change its implementation to `TypedImperativeAggregate`. After the fix, `ApproxCountDistinctForIntervals` can deal with thousands of endpoints without throwing a codegen error, and performance improves from `20 sec` to `2 sec` in a test case with 500 endpoints.

## How was this patch tested?

Test by an added test case and existing tests.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19506 from wzhfy/change_forIntervals_typedAgg.
2017-10-23 23:02:36 +01:00
gatorsmile d8cada8d1d [SPARK-20331][SQL][FOLLOW-UP] Add a SQLConf for enhanced Hive partition pruning predicate pushdown
## What changes were proposed in this pull request?
This is a follow-up PR of https://github.com/apache/spark/pull/17633.

This PR is to add a conf `spark.sql.hive.advancedPartitionPredicatePushdown.enabled`, which can be used to turn the enhancement off.
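
A minimal sketch of turning the enhancement off (assuming an active `spark` session):

```scala
// Disable the enhanced Hive partition-pruning predicate pushdown, e.g. if the
// metastore cannot handle the more complex pushed-down filters.
spark.conf.set("spark.sql.hive.advancedPartitionPredicatePushdown.enabled", "false")
```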

## How was this patch tested?
Add a test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19547 from gatorsmile/Spark20331FollowUp.
2017-10-21 10:05:45 -07:00
Zhenhua Wang d9f286d261 [SPARK-22326][SQL] Remove unnecessary hashCode and equals methods
## What changes were proposed in this pull request?

Plan equality should be computed by `canonicalized`, so we can remove unnecessary `hashCode` and `equals` methods.

## How was this patch tested?

Existing tests.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19539 from wzhfy/remove_equals.
2017-10-20 20:58:55 -07:00
Takuya UESHIN b8624b06e5 [SPARK-20396][SQL][PYSPARK][FOLLOW-UP] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This is a follow-up of #18732.
This PR modifies the `GroupedData.apply()` method to convert a pandas udf to a grouped udf implicitly.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #19517 from ueshin/issues/SPARK-20396/fup2.
2017-10-20 12:44:30 -07:00
CenYuhai 16c9cc68c5 [SPARK-21055][SQL] replace grouping__id with grouping_id()
## What changes were proposed in this pull request?
Spark does not support `grouping__id`; it has `grouping_id()` instead. This is inconvenient for Hive users migrating to Spark SQL, so this PR rewrites `grouping__id` to `grouping_id()`, meaning Hive users need not alter their scripts.
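
For illustration, a Hive-style query that should now work unchanged (the table and columns are made up):

```scala
// grouping__id is rewritten to grouping_id(), so existing Hive scripts keep working.
spark.sql("""
  SELECT city, car_model, grouping__id, count(*) AS cnt
  FROM sales
  GROUP BY city, car_model WITH ROLLUP
""").show()
```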

## How was this patch tested?

Tested with SQLQuerySuite.scala

Author: CenYuhai <yuhai.cen@ele.me>

Closes #18270 from cenyuhai/SPARK-21055.
2017-10-20 09:27:39 -07:00
maryannxue 72561ecf4b [SPARK-22266][SQL] The same aggregate function was evaluated multiple times
## What changes were proposed in this pull request?

To let an aggregate function that appears multiple times in an Aggregate be evaluated only once, we need to deduplicate the aggregate expressions. The original code tried to use a "distinct" call to get a set of aggregate expressions, but that did not work, since "distinct" does not compare semantic equality. And even if it did, further work would be needed when rewriting the result expressions.
In this PR, I changed the "set" to a map from the semantic identity of an aggregate expression to the expression itself. Thus, later on, when rewriting result expressions (i.e., output expressions), the aggregate expression references can be fixed up.
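
For illustration, a query whose physical aggregate should now carry a single `max(a)` even though it appears in several result expressions (column names are made up):

```scala
import org.apache.spark.sql.functions.expr

val df = spark.range(100).selectExpr("id % 10 AS k", "id AS a")
// max(a) appears in three result expressions but should be evaluated only once.
df.groupBy("k")
  .agg(expr("max(a)"), expr("max(a) + 1"), expr("max(a) * 2"))
  .explain()
```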

## How was this patch tested?

Added a new test in SQLQuerySuite

Author: maryannxue <maryann.xue@gmail.com>

Closes #19488 from maryannxue/spark-22266.
2017-10-18 20:59:40 +08:00
Huaxin Gao 28f9f3f225 [SPARK-22271][SQL] mean overflows and returns null for some decimal variables
## What changes were proposed in this pull request?

In Average.scala, it has
```
  override lazy val evaluateExpression = child.dataType match {
    case DecimalType.Fixed(p, s) =>
      // increase the precision and scale to prevent precision loss
      val dt = DecimalType.bounded(p + 14, s + 4)
      Cast(Cast(sum, dt) / Cast(count, dt), resultType)
    case _ =>
      Cast(sum, resultType) / Cast(count, resultType)
  }

  def setChild (newchild: Expression) = {
    child = newchild
  }

```
It is possible that `Cast(Cast(sum, dt) / Cast(count, dt), resultType)` will make the precision of the decimal number bigger than 38, and this causes overflow. Since count is an integer and doesn't need a scale, I will cast it using `DecimalType.bounded(38, 0)`.
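
For illustration, a sketch of the kind of query affected; the exact precision/scale and values that triggered the overflow are not reproduced here, this only shows an `avg` over a wide decimal column that previously could come back `null`:

```scala
import spark.implicits._

// A wide decimal column; before the fix, the widened intermediate type used by
// avg() could exceed precision 38 for some inputs and the result became null.
val df = Seq("12345678901234567890123456.78", "98765432109876543210987654.32")
  .toDF("s")
  .selectExpr("CAST(s AS DECIMAL(38, 2)) AS d")
df.selectExpr("avg(d)").show()
```
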
## How was this patch tested?
Added a test case in DataFrameSuite.


Author: Huaxin Gao <huaxing@us.ibm.com>

Closes #19496 from huaxingao/spark-22271.
2017-10-17 12:50:41 -07:00
Burak Yavuz e8547ffb49 [SPARK-22238] Fix plan resolution bug caused by EnsureStatefulOpPartitioning
## What changes were proposed in this pull request?

In EnsureStatefulOpPartitioning, we check that the inputRDD to a SparkPlan has the expected partitioning for Streaming Stateful Operators. The problem is that we are not allowed to access this information during planning.
The reason we added that check was that CoalesceExec could actually create RDDs with 0 partitions. We should fix it so that when CoalesceExec says there is a SinglePartition, there is in fact an inputRDD with 1 partition instead of 0.

## How was this patch tested?

Regression test in StreamingQuerySuite

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #19467 from brkyvz/stateful-op.
2017-10-14 17:39:15 -07:00
Wenchen Fan 3823dc88d3 [SPARK-22252][SQL][FOLLOWUP] Command should not be a LeafNode
## What changes were proposed in this pull request?

This is a minor follow-up of #19474.

#19474 partially reverted #18064 but accidentally introduced a behavior change. `Command` extended `LogicalPlan` before #18064, but #19474 made it extend `LeafNode`. This is an internal behavior change: now `Command` subclasses can't define children, and they have to implement the `computeStatistic` method.

This PR fixes this by making `Command` extend `LogicalPlan` again.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19493 from cloud-fan/minor.
2017-10-13 10:49:48 -07:00
Dongjoon Hyun 6412ea1759 [SPARK-21247][SQL] Type comparison should respect case-sensitive SQL conf
## What changes were proposed in this pull request?

This is an effort to reduce the difference between Hive and Spark. Spark supports case sensitivity in columns. In particular, for struct types, with `spark.sql.caseSensitive=true`, the following is supported.

```scala
scala> sql("select named_struct('a', 1, 'A', 2).a").show
+--------------------------+
|named_struct(a, 1, A, 2).a|
+--------------------------+
|                         1|
+--------------------------+

scala> sql("select named_struct('a', 1, 'A', 2).A").show
+--------------------------+
|named_struct(a, 1, A, 2).A|
+--------------------------+
|                         2|
+--------------------------+
```

And vice versa, with `spark.sql.caseSensitive=false`, the following is supported.
```scala
scala> sql("select named_struct('a', 1).A, named_struct('A', 1).a").show
+--------------------+--------------------+
|named_struct(a, 1).A|named_struct(A, 1).a|
+--------------------+--------------------+
|                   1|                   1|
+--------------------+--------------------+
```

However, types are considered different. For example, SET operations fail.
```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. struct<A:int> <> struct<a:int> at the first column of the second table;;
'Union
:- Project [named_struct(a, 1) AS named_struct(a, 1)#57]
:  +- OneRowRelation$
+- Project [named_struct(A, 2) AS named_struct(A, 2)#58]
   +- OneRowRelation$
```

This PR aims to support case-insensitive type equality. For example, the above set operation succeeds when `spark.sql.caseSensitive=false`.

```scala
scala> sql("SELECT named_struct('a',1) union all (select named_struct('A',2))").show
+------------------+
|named_struct(a, 1)|
+------------------+
|               [1]|
|               [2]|
+------------------+
```

## How was this patch tested?

Pass Jenkins with a newly added test case.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #18460 from dongjoon-hyun/SPARK-21247.
2017-10-14 00:35:12 +08:00
Wang Gengliang 2f00a71a87 [SPARK-22257][SQL] Reserve all non-deterministic expressions in ExpressionSet
## What changes were proposed in this pull request?

Non-deterministic expressions should be considered as not contained in the `ExpressionSet`.
This is consistent with how we define `semanticEquals` between two expressions.
Otherwise, combining expressions will remove non-deterministic expressions which should be preserved.
E.g.
Combine filters of
```scala
testRelation.where(Rand(0) > 0.1).where(Rand(0) > 0.1)
```
should result in
```scala
testRelation.where(Rand(0) > 0.1 && Rand(0) > 0.1)
```

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #19475 from gengliangwang/non-deterministic-expressionSet.
2017-10-12 22:45:19 -07:00
Wang Gengliang 3ff766f61a [SPARK-22263][SQL] Refactor deterministic as lazy value
## What changes were proposed in this pull request?

The method `deterministic` is frequently called in the optimizer.
Refactor `deterministic` as a lazy value to avoid redundant computation.
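
A simplified sketch of the idea (this is not the actual `Expression` trait, just the caching pattern):

```scala
// `deterministic` recursively inspects all children, so recomputing it on every
// optimizer call is wasteful; a lazy val computes it once per expression instance.
trait SimpleExpr {
  def children: Seq[SimpleExpr]

  // Before: def deterministic: Boolean = children.forall(_.deterministic)
  lazy val deterministic: Boolean = children.forall(_.deterministic)
}
```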

## How was this patch tested?
Simple benchmark over TPC-DS queries, measuring run time from query string to optimized plan (20 continuous runs, taking the average of the last 5 results):
Before changes: 12601 ms
After changes: 11993 ms
This is a 4.8% performance improvement.

Also run test with Unit test.

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #19478 from gengliangwang/deterministicAsLazyVal.
2017-10-12 18:47:16 -07:00
Steve Loughran 9104add4c7 [SPARK-22217][SQL] ParquetFileFormat to support arbitrary OutputCommitters
## What changes were proposed in this pull request?

`ParquetFileFormat` to relax its requirement of output committer class from `org.apache.parquet.hadoop.ParquetOutputCommitter` or subclass thereof (and so implicitly Hadoop `FileOutputCommitter`) to any committer implementing `org.apache.hadoop.mapreduce.OutputCommitter`

This enables output committers which don't write to the filesystem the way `FileOutputCommitter` does to save parquet data from a dataframe: at present you cannot do this.

Before using a committer which isn't a subclass of `ParquetOutputCommitter`, it checks whether the context has requested summary metadata by setting `parquet.enable.summary-metadata`. If that is true and the committer class isn't a Parquet committer, it raises a RuntimeException with an error message.

(It could downgrade, of course, but raising an exception makes it clear there won't be a summary. It also makes the behaviour testable.)

Note that `SQLConf` already states that any `OutputCommitter` can be used, but that typically it's a subclass of ParquetOutputCommitter. That's not currently true. This patch makes the code consistent with the docs, adding tests to verify.
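
A hedged sketch of how a non-Parquet committer might be wired in; the committer class is hypothetical, and `spark.sql.parquet.output.committer.class` is assumed to remain the relevant key:

```scala
// Any org.apache.hadoop.mapreduce.OutputCommitter implementation should now be
// accepted, as long as summary metadata is not requested at the same time.
spark.conf.set(
  "spark.sql.parquet.output.committer.class",
  "com.example.CloudNativeCommitter")                  // hypothetical committer
spark.conf.set("parquet.enable.summary-metadata", "false")

spark.range(10).write.parquet("s3a://bucket/table")    // illustrative destination
```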

## How was this patch tested?

The patch includes a test suite, `ParquetCommitterSuite`, with a new committer, `MarkingFileOutputCommitter` which extends `FileOutputCommitter` and writes a marker file in the destination directory. The presence of the marker file can be used to verify the new committer was used. The tests then try the combinations of Parquet committer summary/no-summary and marking committer summary/no-summary.

| committer | summary | outcome |
|-----------|---------|---------|
| parquet   | true    | success |
| parquet   | false   | success |
| marking   | false   | success with marker |
| marking   | true    | exception |

All tests are happy.

Author: Steve Loughran <stevel@hortonworks.com>

Closes #19448 from steveloughran/cloud/SPARK-22217-committer.
2017-10-13 08:40:26 +09:00
Wenchen Fan 274f0efefa [SPARK-22252][SQL] FileFormatWriter should respect the input query schema
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/18064, we allowed `RunnableCommand` to have children in order to fix some UI issues. Then we made `InsertIntoXXX` commands take the input `query` as a child, when we do the actual writing, we just pass the physical plan to the writer(`FileFormatWriter.write`).

However, this is problematic. In Spark SQL, the optimizer and planner are allowed to change the schema names a little bit. E.g. the `ColumnPruning` rule will remove no-op `Project`s, like `Project("A", Scan("a"))`, and thus change the output schema from `<A: int>` to `<a: int>`. When it comes to writing, especially for self-describing data formats like Parquet, we may write the wrong schema to the file and cause null values at the read path.

Fortunately, in https://github.com/apache/spark/pull/18450, we decided to allow nested execution, so one query can map to multiple executions in the UI. This lifts the major restriction in #18604, and now we don't have to take the input `query` as a child of `InsertIntoXXX` commands.

So the fix is simple: this PR partially reverts #18064 and makes `InsertIntoXXX` commands leaf nodes again.

## How was this patch tested?

new regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19474 from cloud-fan/bug.
2017-10-12 20:20:44 +08:00
Zhenhua Wang 655f6f86f8 [SPARK-22208][SQL] Improve percentile_approx by not rounding up targetError and starting from index 0
## What changes were proposed in this pull request?

Currently percentile_approx never returns the first element when the percentile is in (relativeError, 1/N], where relativeError defaults to 1/10000 and N is the total number of elements. But ideally, percentiles in [0, 1/N] should all return the first element as the answer.

For example, given input data 1 to 10, if a user queries 10% (or even less) percentile, it should return 1, because the first value 1 already reaches 10%. Currently it returns 2.

Based on the paper, targetError should not be rounded up, and the search index should start from 0 instead of 1. By following the paper, we should be able to fix the cases mentioned above.
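
For illustration, the example above expressed as a query (values 1 to 10, 10th percentile; an active `spark` session is assumed):

```scala
// After the fix this should return 1; previously it returned 2.
spark.range(1, 11)
  .selectExpr("percentile_approx(id, 0.1) AS p10")
  .show()
```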

## How was this patch tested?

Added a new test case and fixed existing test cases.

Author: Zhenhua Wang <wzh_zju@163.com>

Closes #19438 from wzhfy/improve_percentile_approx.
2017-10-11 00:16:12 -07:00
Kazuaki Ishizaki 76fb173dd6 [SPARK-21751][SQL] CodeGenerator.splitExpressions counts code size more precisely
## What changes were proposed in this pull request?

Currently, `CodeGenerator.splitExpressions` splits statements into methods if the total length of the statements is more than 1024 characters. The length may include comments or empty lines.

This PR excludes comments and empty lines from the length, to reduce the number of generated methods in a class, by using the `CodeFormatter.stripExtraNewLinesAndComments()` method.

## How was this patch tested?

Existing tests

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #18966 from kiszk/SPARK-21751.
2017-10-10 20:29:02 -07:00
Marcelo Vanzin bd4eb9ce57 [SPARK-19558][SQL] Add config key to register QueryExecutionListeners automatically.
This change adds a new SQL config key that is equivalent to SparkContext's
"spark.extraListeners", allowing users to register QueryExecutionListener
instances through the Spark configuration system instead of having to
explicitly do it in code.

The code used by SparkContext to implement the feature was refactored into
a helper method in the Utils class, and SQL's ExecutionListenerManager was
modified to use it to initialize listeners declared in the configuration.

Unit tests were added to verify all the new functionality.
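
A sketch of the configuration-based registration; the key name `spark.sql.queryExecutionListeners` and the listener class are assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession

// com.example.AuditListener is a hypothetical user-provided QueryExecutionListener.
val spark = SparkSession.builder()
  .appName("listener-demo")
  .config("spark.sql.queryExecutionListeners", "com.example.AuditListener")
  .getOrCreate()
```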

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19309 from vanzin/SPARK-19558.
2017-10-10 15:50:37 -07:00
Li Jin bfc7e1fe1a [SPARK-20396][SQL][PYSPARK] groupby().apply() with pandas udf
## What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on `pandas.DataFrame` -> `pandas.DataFrame`.

Static schema
-------------------
```
schema = df.schema

@pandas_udf(schema)
def normalize(df):
    df = df.assign(v1=(df.v1 - df.v1.mean()) / df.v1.std())
    return df

df.groupBy('id').apply(normalize)
```
Dynamic schema
-----------------------
**This use case is removed from the PR and we will discuss this as a follow up. See discussion https://github.com/apache/spark/pull/18732#pullrequestreview-66583248**

Another example to use pd.DataFrame dtypes as output schema of the udf:

```
sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
      ret = # Some transformation on the input pd.DataFrame
      return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)
```
In the interactive use case, users usually have a sample pd.DataFrame to test the function `foo` in their notebook. Being able to use `foo(sample_df).dtypes` frees users from specifying the output schema of `foo`.

Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

## How was this patch tested?
* Added GroupbyApplyTest

Author: Li Jin <ice.xelloss@gmail.com>
Author: Takuya UESHIN <ueshin@databricks.com>
Author: Bryan Cutler <cutlerb@gmail.com>

Closes #18732 from icexelloss/groupby-apply-SPARK-20396.
2017-10-11 07:32:01 +09:00
gatorsmile 633ffd816d rename the file. 2017-10-10 11:01:02 -07:00
Feng Liu bebd2e1ce1 [SPARK-22222][CORE] Fix the ARRAY_MAX in BufferHolder and add a test
## What changes were proposed in this pull request?

We should not break the assumption that the length of the allocated byte array is word rounded:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java#L170
So we want to use `Integer.MAX_VALUE - 15` instead of `Integer.MAX_VALUE - 8` as the upper bound of an allocated byte array.

cc: srowen gatorsmile
## How was this patch tested?

Since the Spark unit test JVM has less than 1GB of heap, here we run the test code as a submit job, so it can run on a JVM with 4GB of memory.


Author: Feng Liu <fengliu@databricks.com>

Closes #19460 from liufengdb/fix_array_max.
2017-10-09 21:34:37 -07:00
Ryan Blue 155ab6347e [SPARK-22170][SQL] Reduce memory consumption in broadcast joins.
## What changes were proposed in this pull request?

This updates the broadcast join code path to lazily decompress pages and
iterate through UnsafeRows to prevent all rows from being held in memory
while the broadcast table is being built.

## How was this patch tested?

Existing tests.

Author: Ryan Blue <blue@apache.org>

Closes #19394 from rdblue/broadcast-driver-memory.
2017-10-09 15:22:41 -07:00
Liang-Chi Hsieh debcbec749 [SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query
## What changes were proposed in this pull request?

`monotonically_increasing_id` doesn't work in Structured Streaming. We should throw an exception if a streaming query uses it.
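
For illustration, a streaming query of the kind that should now fail at analysis time (the socket source settings are only for demonstration):

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Expected to throw an AnalysisException: the function is not supported
// in streaming queries.
lines.withColumn("rowId", monotonically_increasing_id())
  .writeStream
  .format("console")
  .start()
```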

## How was this patch tested?

Added test.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19336 from viirya/SPARK-21947.
2017-10-06 13:10:04 -07:00
Xingbo Jiang 08b204fd2c [SPARK-22214][SQL] Refactor the list hive partitions code
## What changes were proposed in this pull request?

In this PR we make a few changes to the list hive partitions code, to make the code more extensible.
The following changes are made:
1. In `HiveClientImpl.getPartitions()`, call `client.getPartitions` instead of `shim.getAllPartitions` when `spec` is empty;
2. In `HiveTableScanExec`, we previously always called `listPartitionsByFilter` if the config `metastorePartitionPruning` is enabled, but we'd better call `listPartitions` if `partitionPruningPred` is empty;
3. We should use sessionCatalog instead of SharedState.externalCatalog in `HiveTableScanExec`.

## How was this patch tested?

Tested by existing test cases since this is code refactor, no regression or behavior change is expected.

Author: Xingbo Jiang <xingbo.jiang@databricks.com>

Closes #19444 from jiangxb1987/hivePartitions.
2017-10-06 12:53:35 -07:00
Wenchen Fan bb035f1ee5 [SPARK-22169][SQL] support byte length literal as identifier
## What changes were proposed in this pull request?

By definition the table name in Spark can be something like `123x`, `25a`, etc., with exceptions for literals like `12L`, `23BD`, etc. However, Spark SQL has a special byte length literal, which prevents users from using digits followed by `b`, `k`, `m`, or `g` as identifiers.

The byte length literal is not a standard SQL literal and is only used in the `tableSample` parser rule. This PR moves the parsing of byte length literals from the lexer to the parser, so that users can use such names as identifiers.
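
For illustration, a sketch of an identifier that previously could not be parsed (the table name is made up):

```scala
// `100k` used to be lexed as a byte-length literal; it can now be used as a name.
spark.sql("CREATE TABLE 100k(id INT) USING parquet")
spark.sql("SELECT id FROM 100k").show()
```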

## How was this patch tested?

regression test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #19392 from cloud-fan/parser-bug.
2017-10-04 13:13:51 -07:00
Takeshi Yamamuro 4a779bdac3 [SPARK-21871][SQL] Check actual bytecode size when compiling generated code
## What changes were proposed in this pull request?
This PR adds code to check the actual bytecode size when compiling generated code. In #18810, we added code to give up code compilation and fall back to interpreted execution in `SparkPlan` if the number of lines of a generated function goes over `maxLinesPerFunction`. But we already have code to collect metrics for compiled bytecode size in the `CodeGenerator` object, so we can easily reuse it for this purpose.

## How was this patch tested?
Added tests in `WholeStageCodegenSuite`.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #19083 from maropu/SPARK-21871.
2017-10-04 10:08:24 -07:00
Jose Torres 3099c574c5 [SPARK-22136][SS] Implement stream-stream outer joins.
## What changes were proposed in this pull request?

Allow one-sided outer joins between two streams when a watermark is defined.
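
A hedged sketch of such a join; the sources, column names, and time bounds are illustrative only, and both sides carry watermarks so old state can be dropped:

```scala
import org.apache.spark.sql.functions.expr

val impressions = spark.readStream.format("rate").load()
  .selectExpr("value AS adId", "timestamp AS impressionTime")
  .withWatermark("impressionTime", "10 minutes")

val clicks = spark.readStream.format("rate").load()
  .selectExpr("value AS adId", "timestamp AS clickTime")
  .withWatermark("clickTime", "20 minutes")

// Left outer join: impressions with no matching click within an hour are
// eventually emitted with null click columns once the watermark passes.
val joined = impressions.as("i").join(
  clicks.as("c"),
  expr("i.adId = c.adId AND " +
    "c.clickTime BETWEEN i.impressionTime AND i.impressionTime + interval 1 hour"),
  "leftOuter")
```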

## How was this patch tested?

new unit tests

Author: Jose Torres <jose@databricks.com>

Closes #19327 from joseph-torres/outerjoin.
2017-10-03 21:42:51 -07:00
gatorsmile 5f69433453 [SPARK-22171][SQL] Describe Table Extended Failed when Table Owner is Empty
## What changes were proposed in this pull request?

Users could hit a `java.lang.NullPointerException` when a table was created by Hive and the table owner retrieved from the Hive metastore is `null`. `DESC EXTENDED` failed with the error:

> SQLExecutionException: java.lang.NullPointerException at scala.collection.immutable.StringOps$.length$extension(StringOps.scala:47) at scala.collection.immutable.StringOps.length(StringOps.scala:47) at scala.collection.IndexedSeqOptimized$class.isEmpty(IndexedSeqOptimized.scala:27) at scala.collection.immutable.StringOps.isEmpty(StringOps.scala:29) at scala.collection.TraversableOnce$class.nonEmpty(TraversableOnce.scala:111) at scala.collection.immutable.StringOps.nonEmpty(StringOps.scala:29) at org.apache.spark.sql.catalyst.catalog.CatalogTable.toLinkedHashMap(interface.scala:300) at org.apache.spark.sql.execution.command.DescribeTableCommand.describeFormattedTableInfo(tables.scala:565) at org.apache.spark.sql.execution.command.DescribeTableCommand.run(tables.scala:543) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:66) at

## How was this patch tested?
Added a unit test case

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19395 from gatorsmile/desc.
2017-10-03 21:27:58 -07:00
Reynold Xin 4c5158eec9 [SPARK-21644][SQL] LocalLimit.maxRows is defined incorrectly
## What changes were proposed in this pull request?
The definition of `maxRows` in `LocalLimit` operator was simply wrong. This patch introduces a new `maxRowsPerPartition` method and uses that in pruning. The patch also adds more documentation on why we need local limit vs global limit.

Note that this has never previously been a bug because of the way the code is structured, but future use of maxRows could lead to bugs.

## How was this patch tested?
Should be covered by existing test cases.

Closes #18851

Author: gatorsmile <gatorsmile@gmail.com>
Author: Reynold Xin <rxin@databricks.com>

Closes #19393 from gatorsmile/pr-18851.
2017-10-03 12:38:13 -07:00
gatorsmile 530fe68329 [SPARK-21904][SQL] Rename tempTables to tempViews in SessionCatalog
### What changes were proposed in this pull request?
`tempTables` is not right. To be consistent, we need to rename the internal variable names/comments to tempViews in SessionCatalog too.

### How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #19117 from gatorsmile/renameTempTablesToTempViews.
2017-09-29 19:35:32 -07:00
Wang Gengliang 0fa4dbe4f4 [SPARK-22141][FOLLOWUP][SQL] Add comments for the order of batches
## What changes were proposed in this pull request?
Add comments specifying the position of the batch "Check Cartesian Products", as rxin suggested in https://github.com/apache/spark/pull/19362.

## How was this patch tested?
Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #19379 from gengliangwang/SPARK-22141-followup.
2017-09-28 23:23:30 -07:00
Reynold Xin 323806e68f [SPARK-22160][SQL] Make sample points per partition (in range partitioner) configurable and bump the default value up to 100
## What changes were proposed in this pull request?
Spark's RangePartitioner hard-codes the number of sampling points per partition at 20. This is sometimes too low. This ticket makes it configurable via spark.sql.execution.rangeExchange.sampleSizePerPartition and raises the default in Spark SQL to 100.
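
A minimal sketch of raising the sample size further (the output path is illustrative):

```scala
// More sample points per partition give the range partitioner better split points
// on skewed data, at the cost of a slightly larger sample collected on the driver.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "200")
spark.range(0, 1000000).sort("id").write.parquet("/tmp/sorted")
```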

## How was this patch tested?
Added a pretty sophisticated test based on chi square test ...

Author: Reynold Xin <rxin@databricks.com>

Closes #19387 from rxin/SPARK-22160.
2017-09-28 21:07:12 -07:00
Reynold Xin d29d1e8799 [SPARK-22159][SQL] Make config names consistently end with "enabled".
## What changes were proposed in this pull request?
spark.sql.execution.arrow.enable and spark.sql.codegen.aggregate.map.twolevel.enable -> enabled

## How was this patch tested?
N/A

Author: Reynold Xin <rxin@databricks.com>

Closes #19384 from rxin/SPARK-22159.
2017-09-28 15:59:05 -07:00
Wang Gengliang 9c5935d00b [SPARK-22141][SQL] Propagate empty relation before checking Cartesian products
## What changes were proposed in this pull request?

When inferring constraints from children, Join's condition can be simplified as None.
For example,
```
val testRelation = LocalRelation('a.int)
val x = testRelation.as("x")
val y = testRelation.where($"a" === 2 && !($"a" === 2)).as("y")
x.join(y).where($"x.a" === $"y.a")
```
The plan will become
```
Join Inner
:- LocalRelation <empty>, [a#23]
+- LocalRelation <empty>, [a#224]
```
And the Cartesian products check will throw an exception for the above plan.

Propagating the empty relation before checking Cartesian products resolves the issue.

## How was this patch tested?

Unit test

Author: Wang Gengliang <ltnwgl@gmail.com>

Closes #19362 from gengliangwang/MoveCheckCartesianProducts.
2017-09-27 12:44:10 +02:00
Juliusz Sompolski f21f6ce998 [SPARK-22103][FOLLOWUP] Rename addExtraCode to addInnerClass
## What changes were proposed in this pull request?

Address PR comments that appeared post-merge: rename `addExtraCode` to `addInnerClass`,
and stop counting the size of the inner class toward the size of the outer class.

## How was this patch tested?

YOLO.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #19353 from juliuszsompolski/SPARK-22103followup.
2017-09-26 10:04:34 -07:00
Juliusz Sompolski 038b185736 [SPARK-22103] Move HashAggregateExec parent consume to a separate function in codegen
## What changes were proposed in this pull request?

HashAggregateExec codegen uses two paths for fast hash table and a generic one.
It generates code paths for iterating over both, and both code paths generate the consume code of the parent operator, resulting in that code being expanded twice.
This leads to a long generated function that might be an issue for the compiler (see e.g. SPARK-21603).
I propose to remove the double expansion by generating the consume code in a helper function that can just be called from both iterating loops.

An issue with separating the `consume` code into a helper function was that a number of places relied on being in the scope of an enclosing `produce` loop and, e.g., used `continue` to jump out.
I replaced such code flows with nested scopes. The compiler should handle this code the same way, while removing the dependence on assumptions outside of `consume`'s own scope.

## How was this patch tested?

Existing test coverage.

Author: Juliusz Sompolski <julek@databricks.com>

Closes #19324 from juliuszsompolski/aggrconsumecodegen.
2017-09-25 12:50:25 -07:00
Zhenhua Wang 365a29bdbf [SPARK-22100][SQL] Make percentile_approx support date/timestamp type and change the output type to be the same as input type
## What changes were proposed in this pull request?

The `percentile_approx` function previously accepted numeric-type input and produced double-type results.

But since all numeric types, date and timestamp types are represented as numerics internally, `percentile_approx` can support them easily.

After this PR, it supports date type, timestamp type and numeric types as input types. The result type is also changed to be the same as the input type, which is more reasonable for percentiles.

This change is also required when we generate equi-height histograms for these types.
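
For illustration, a sketch with a date column (the values are made up); the result column should now carry the input's date type rather than double:

```scala
import java.sql.Date
import spark.implicits._

val df = Seq(
  Date.valueOf("2017-01-01"),
  Date.valueOf("2017-06-01"),
  Date.valueOf("2017-12-31")).toDF("d")

// The median is returned as a date, matching the input type.
df.selectExpr("percentile_approx(d, 0.5) AS median_date").printSchema()
```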

## How was this patch tested?

Added a new test and modified some existing tests.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #19321 from wzhfy/approx_percentile_support_types.
2017-09-25 09:28:42 -07:00
Sean Owen 576c43fb42 [SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes
## What changes were proposed in this pull request?

Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:

- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19307 from srowen/Scala212.
2017-09-24 09:40:13 +01:00
Kevin Yu 4a8c9e29bc [SPARK-22110][SQL][DOCUMENTATION] Add usage and improve documentation with arguments and examples for trim function
## What changes were proposed in this pull request?

This PR proposes to enhance the documentation for `trim` functions in the function description session.

- Add more `usage`, `arguments` and `examples` for the trim function
- Adjust space in the `usage` session

After the changes, the trim function documentation will look like this:

- `trim`

```trim(str) - Removes the leading and trailing space characters from str.

trim(BOTH trimStr FROM str) - Remove the leading and trailing trimStr characters from str

trim(LEADING trimStr FROM str) - Remove the leading trimStr characters from str

trim(TRAILING trimStr FROM str) - Remove the trailing trimStr characters from str

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
BOTH, FROM - these are keywords to specify trimming string characters from both ends of the string
LEADING, FROM - these are keywords to specify trimming string characters from the left end of the string
TRAILING, FROM - these are keywords to specify trimming string characters from the right end of the string
Examples:

> SELECT trim('    SparkSQL   ');
 SparkSQL
> SELECT trim('SL', 'SSparkSQLS');
 parkSQ
> SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
 parkSQ
> SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
 parkSQLS
> SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
 SSparkSQ
```

- `ltrim`

```ltrim

ltrim(str) - Removes the leading space characters from str.

ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:

> SELECT ltrim('    SparkSQL   ');
 SparkSQL
> SELECT ltrim('Sp', 'SSparkSQLS');
 arkSQLS
```

- `rtrim`
```rtrim

rtrim(str) - Removes the trailing space characters from str.

rtrim(trimStr, str) - Removes the trailing string which contains the characters from the trim string from the str

Arguments:

str - a string expression
trimStr - the trim string characters to trim, the default value is a single space
Examples:

> SELECT rtrim('    SparkSQL   ');
 SparkSQL
> SELECT rtrim('LQSa', 'SSparkSQLS');
 SSpark
```

This is the trim characters function jira: [trim function](https://issues.apache.org/jira/browse/SPARK-14878)

## How was this patch tested?

Manually tested
```
spark-sql> describe function extended trim;
17/09/22 17:03:04 INFO CodeGenerator: Code generated in 153.026533 ms
Function: trim
Class: org.apache.spark.sql.catalyst.expressions.StringTrim
Usage:
    trim(str) - Removes the leading and trailing space characters from `str`.

    trim(BOTH trimStr FROM str) - Remove the leading and trailing `trimStr` characters from `str`

    trim(LEADING trimStr FROM str) - Remove the leading `trimStr` characters from `str`

    trim(TRAILING trimStr FROM str) - Remove the trailing `trimStr` characters from `str`

Extended Usage:
    Arguments:
      * str - a string expression
      * trimStr - the trim string characters to trim, the default value is a single space
      * BOTH, FROM - these are keywords to specify trimming string characters from both ends of
          the string
      * LEADING, FROM - these are keywords to specify trimming string characters from the left
          end of the string
      * TRAILING, FROM - these are keywords to specify trimming string characters from the right
          end of the string

    Examples:
      > SELECT trim('    SparkSQL   ');
       SparkSQL
      > SELECT trim('SL', 'SSparkSQLS');
       parkSQ
      > SELECT trim(BOTH 'SL' FROM 'SSparkSQLS');
       parkSQ
      > SELECT trim(LEADING 'SL' FROM 'SSparkSQLS');
       parkSQLS
      > SELECT trim(TRAILING 'SL' FROM 'SSparkSQLS');
       SSparkSQ
```
```
spark-sql> describe function extended ltrim;
Function: ltrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimLeft
Usage:
    ltrim(str) - Removes the leading space characters from `str`.

    ltrim(trimStr, str) - Removes the leading string contains the characters from the trim string

Extended Usage:
    Arguments:
      * str - a string expression
      * trimStr - the trim string characters to trim, the default value is a single space

    Examples:
      > SELECT ltrim('    SparkSQL   ');
       SparkSQL
      > SELECT ltrim('Sp', 'SSparkSQLS');
       arkSQLS

```

```
spark-sql> describe function extended rtrim;
Function: rtrim
Class: org.apache.spark.sql.catalyst.expressions.StringTrimRight
Usage:
    rtrim(str) - Removes the trailing space characters from `str`.

    rtrim(trimStr, str) - Removes the trailing string which contains the characters from the trim string from the `str`

Extended Usage:
    Arguments:
      * str - a string expression
      * trimStr - the trim string characters to trim, the default value is a single space

    Examples:
      > SELECT rtrim('    SparkSQL   ');
       SparkSQL
      > SELECT rtrim('LQSa', 'SSparkSQLS');
       SSpark

```

Author: Kevin Yu <qyu@us.ibm.com>

Closes #19329 from kevinyu98/spark-14878-5.
2017-09-23 10:27:40 -07:00
Sean Owen 50ada2a4d3 [SPARK-22033][CORE] BufferHolder, other size checks should account for the specific VM array size limitations
## What changes were proposed in this pull request?

Try to avoid allocating an array bigger than Integer.MAX_VALUE - 8, which is the actual max size on some JVMs, in several places

## How was this patch tested?

Existing tests

Author: Sean Owen <sowen@cloudera.com>

Closes #19266 from srowen/SPARK-22033.
2017-09-23 15:40:59 +01:00
maryannxue 5960686e79 [SPARK-21998][SQL] SortMergeJoinExec did not calculate its outputOrdering correctly during physical planning
## What changes were proposed in this pull request?

Right now the calculation of SortMergeJoinExec's outputOrdering relies on the fact that its children have already been sorted on the join keys, while this is often not true until EnsureRequirements has been applied. So we end up not getting the correct outputOrdering during the physical planning stage, before Sort nodes are added to the children.

For example, J = {A join B on key1 = key2}
1. if A is NOT ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
2. if A is ordered on key1 ASC, J's outputOrdering should include "key1 ASC"
3. if A is ordered on key1 ASC, with sameOrderExp=c1, J's outputOrdering should include "key1 ASC, sameOrderExp=c1"

So to fix this I changed the behavior of `getKeyOrdering(keys, childOutputOrdering)` to:
1. If the childOutputOrdering satisfies (is a superset of) the required child ordering => childOutputOrdering
2. Otherwise => required child ordering

In addition, I organized the logic for deciding the relationship between two orderings into SparkPlan, so that it can be reused by EnsureRequirements and SortMergeJoinExec, and potentially other classes.

## How was this patch tested?

Added new test cases.
Passed all integration tests.

Author: maryannxue <maryann.xue@gmail.com>

Closes #19281 from maryannxue/spark-21998.
2017-09-21 23:54:16 -07:00
Tathagata Das f32a842505 [SPARK-22053][SS] Stream-stream inner join in Append Mode
## What changes were proposed in this pull request?

#### Architecture
This PR implements stream-stream inner join using a two-way symmetric hash join. At a high level, we want to do the following.

1. For each stream, we maintain the past rows as state in State Store.
  - For each joining key, there can be multiple rows that have been received.
  - So, we have to effectively maintain a key-to-list-of-values multimap as state for each stream.
2. In each batch, for each input row in each stream
  - Look up the other stream's state to see if there are matching rows, and output them if they satisfy the joining condition.
  - Add the input row to the corresponding stream's state.
  - If the data has a timestamp/window column with a watermark, then we will use that to calculate the threshold for keys that are required to be buffered for future matches, and drop the rest from the state.

Cleaning up old, unnecessary state rows depends completely on whether a watermark has been defined and what the join conditions are. We definitely want to support state cleanup for two types of queries that are likely to be common:

- Queries to time range conditions - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND leftTime > rightTime - INTERVAL 8 MINUTES AND leftTime < rightTime + INTERVAL 1 HOUR`
- Queries with windows as the matching key - E.g. `SELECT * FROM leftTable, rightTable ON leftKey = rightKey AND window(leftTime, "1 hour") = window(rightTime, "1 hour")` (pseudo-SQL)

#### Implementation
The stream-stream join is primarily implemented in three classes
- `StreamingSymmetricHashJoinExec` implements the above symmetric join algorithm.
- `SymmetricsHashJoinStateManagers` manages the streaming state for the join. This essentially is a fault-tolerant key-to-list-of-values multimap built on the StateStore APIs. `StreamingSymmetricHashJoinExec` instantiates two such managers, one for each join side.
- `StreamingSymmetricHashJoinExecHelper` is a helper class to extract threshold for the state based on the join conditions and the event watermark.

Refer to the scaladocs class for more implementation details.

Besides the implementation of the stream-stream inner join SparkPlan, some additional changes are:
- Allowed inner join in append mode in UnsupportedOperationChecker
- Prevented stream-stream join on an empty batch dataframe to be collapsed by the optimizer

## How was this patch tested?
- New tests in StreamingJoinSuite
- Updated tests in UnsupportedOperationSuite

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #19271 from tdas/SPARK-22053.
2017-09-21 15:39:07 -07:00
Liang-Chi Hsieh 9cac249fd5 [SPARK-22088][SQL] Incorrect scalastyle comment causes wrong styles in stringExpressions
## What changes were proposed in this pull request?

There is an incorrect `scalastyle:on` comment in `stringExpressions.scala` which makes the line size limit check ineffective in the file. There are many lines of code and comments which are more than 100 chars.

## How was this patch tested?

Code style change only.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19305 from viirya/fix-wrong-style.
2017-09-21 11:51:00 -07:00
Liang-Chi Hsieh 1270e71753 [SPARK-22086][DOCS] Add expression description for CASE WHEN
## What changes were proposed in this pull request?

Among the SQL conditional expressions, only CASE WHEN lacks an expression description. This patch fills the gap.

## How was this patch tested?

Only documentation change.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #19304 from viirya/casewhen-doc.
2017-09-21 22:45:06 +09:00
Zhenhua Wang 1d1a09be9f [SPARK-17997][SQL] Add an aggregation function for counting distinct values for multiple intervals
## What changes were proposed in this pull request?

This work is a part of [SPARK-17074](https://issues.apache.org/jira/browse/SPARK-17074) to compute equi-height histograms. An equi-height histogram is an array of bins. A bin consists of two endpoints which form an interval of values, plus the ndv (number of distinct values) in that interval.

This PR creates a new aggregate function that, given an array of endpoints, counts distinct values (ndv) in the intervals between those endpoints.

This PR also refactors `HyperLogLogPlusPlus` by extracting a helper class `HyperLogLogPlusPlusHelper`, where the underlying HLLPP algorithm locates.

## How was this patch tested?

Add new test cases.

Author: Zhenhua Wang <wangzhenhua@huawei.com>

Closes #15544 from wzhfy/countIntervals.
2017-09-21 21:43:02 +08:00