Commit graph

2889 commits

Author SHA1 Message Date
Vayda, Oleksandr: IT (PRG) bc6ea614ad [SPARK-24348][SQL] "element_at" error fix
## What changes were proposed in this pull request?

### Fixes a `scala.MatchError` in the `element_at` operation - [SPARK-24348](https://issues.apache.org/jira/browse/SPARK-24348)

When calling `element_at` with a wrong first operand type an `AnalysisException` should be thrown instead of `scala.MatchError`

*Example:*
```sql
select element_at('foo', 1)
```

results in:
```
scala.MatchError: StringType (of class org.apache.spark.sql.types.StringType$)
	at org.apache.spark.sql.catalyst.expressions.ElementAt.inputTypes(collectionOperations.scala:1469)
	at org.apache.spark.sql.catalyst.expressions.ExpectsInputTypes$class.checkInputDataTypes(ExpectsInputTypes.scala:44)
	at org.apache.spark.sql.catalyst.expressions.ElementAt.checkInputDataTypes(collectionOperations.scala:1478)
	at org.apache.spark.sql.catalyst.expressions.Expression.resolved$lzycompute(Expression.scala:168)
	at org.apache.spark.sql.catalyst.expressions.Expression.resolved(Expression.scala:168)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:256)
	at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveAliases$$anonfun$org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveAliases$$assignAliases$1$$anonfun$apply$3.applyOrElse(Analyzer.scala:252)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
```

## How was this patch tested?

unit tests

Author: Vayda, Oleksandr: IT (PRG) <Oleksandr.Vayda@barclayscapital.com>

Closes #21395 from wajda/SPARK-24348-element_at-error-fix.
2018-05-22 13:01:07 -07:00
Liang-Chi Hsieh f9f055afa4 [SPARK-24121][SQL] Add API for handling expression code generation
## What changes were proposed in this pull request?

This patch tries to implement this [proposal](https://github.com/apache/spark/pull/19813#issuecomment-354045400) to add an API for handling expression code generation. It should allow us to manipulate how to generate codes for expressions.

In details, this adds an new abstraction `CodeBlock` to `JavaCode`. `CodeBlock` holds the code snippet and inputs for generating actual java code.

For example, in following java code:

```java
  int ${variable} = 1;
  boolean ${isNull} = ${CodeGenerator.defaultValue(BooleanType)};
```

`variable`, `isNull` are two `VariableValue` and `CodeGenerator.defaultValue(BooleanType)` is a string. They are all inputs to this code block and held by `CodeBlock` representing this code.

For codegen, we provide a specified string interpolator `code`, so you can define a code like this:
```scala
  val codeBlock =
    code"""
         |int ${variable} = 1;
         |boolean ${isNull} = ${CodeGenerator.defaultValue(BooleanType)};
        """.stripMargin
  // Generates actual java code.
  codeBlock.toString
```

Because those inputs are held separately in `CodeBlock` before generating code, we can safely manipulate them, e.g., replacing statements to aliased variables, etc..

## How was this patch tested?

Added tests.

Author: Liang-Chi Hsieh <viirya@gmail.com>

Closes #21193 from viirya/SPARK-24121.
2018-05-23 01:50:22 +08:00
Maxim Gekk 8086acc2f6 [SPARK-24244][SQL] Passing only required columns to the CSV parser
## What changes were proposed in this pull request?

uniVocity parser allows to specify only required column names or indexes for [parsing](https://www.univocity.com/pages/parsers-tutorial) like:

```
// Here we select only the columns by their indexes.
// The parser just skips the values in other columns
parserSettings.selectIndexes(4, 0, 1);
CsvParser parser = new CsvParser(parserSettings);
```
In this PR, I propose to extract indexes from required schema and pass them into the CSV parser. Benchmarks show the following improvements in parsing of 1000 columns:

```
Select 100 columns out of 1000: x1.76
Select 1 column out of 1000: x2
```

**Note**: Comparing to current implementation, the changes can return different result for malformed rows in the `DROPMALFORMED` and `FAILFAST` modes if only subset of all columns is requested. To have previous behavior, set `spark.sql.csv.parser.columnPruning.enabled` to `false`.

## How was this patch tested?

It was tested by new test which selects 3 columns out of 15, by existing tests and by new benchmarks.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21296 from MaxGekk/csv-column-pruning.
2018-05-22 22:07:32 +08:00
Marco Gaido d3d1807315 [SPARK-24313][SQL] Fix collection operations' interpreted evaluation for complex types
## What changes were proposed in this pull request?

The interpreted evaluation of several collection operations works only for simple datatypes. For complex data types, for instance, `array_contains` it returns always `false`. The list of the affected functions is `array_contains`, `array_position`, `element_at` and `GetMapValue`.

The PR fixes the behavior for all the datatypes.

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21361 from mgaido91/SPARK-24313.
2018-05-22 21:08:49 +08:00
Kris Mok 952e4d1c83 [SPARK-24321][SQL] Extract common code from Divide/Remainder to a base trait
## What changes were proposed in this pull request?

Extract common code from `Divide`/`Remainder` to a new base trait, `DivModLike`.

Further refactoring to make `Pmod` work with `DivModLike` is to be done as a separate task.

## How was this patch tested?

Existing tests in `ArithmeticExpressionSuite` covers the functionality.

Author: Kris Mok <kris.mok@databricks.com>

Closes #21367 from rednaxelafx/catalyst-divmod.
2018-05-22 19:12:30 +08:00
Wenchen Fan 03e90f65bf [SPARK-24250][SQL] support accessing SQLConf inside tasks
re-submit https://github.com/apache/spark/pull/21299 which broke build.

A few new commits are added to fix the SQLConf problem in `JsonSchemaInference.infer`, and prevent us to access `SQLConf` in DAGScheduler event loop thread.

## What changes were proposed in this pull request?

Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always give you the default conf value. In #21190 we fixed the check and all the places that violate it.

Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like
```
val someConf = conf.getXXX
child.execute().mapPartitions {
  if (someConf == ...) ...
  ...
}
```

However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`, and see how many changes were made in #21190 .

When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.

This PR proposes to allow tasks to access `SQLConf`. The idea is, we can save all the SQL configs to job properties when an SQL execution is triggered. At executor side we rebuild the `SQLConf` from job properties.

## How was this patch tested?

a new test suite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21376 from cloud-fan/config.
2018-05-22 00:19:18 +08:00
Marek Novotny a6e883feb3 [SPARK-23935][SQL] Adding map_entries function
## What changes were proposed in this pull request?

This PR adds `map_entries` function that returns an unordered array of all entries in the given map.

## How was this patch tested?

New tests added into:
- `CollectionExpressionSuite`
- `DataFrameFunctionsSuite`

## CodeGen examples
### Primitive types
```
val df = Seq(Map(1 -> 5, 2 -> 6)).toDF("m")
df.filter('m.isNotNull).select(map_entries('m)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */
/* 044 */         ArrayData project_value_0 = null;
/* 045 */
/* 046 */         final int project_numElements_0 = inputadapter_value_0.numElements();
/* 047 */         final ArrayData project_keys_0 = inputadapter_value_0.keyArray();
/* 048 */         final ArrayData project_values_0 = inputadapter_value_0.valueArray();
/* 049 */
/* 050 */         final long project_size_0 = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 051 */           project_numElements_0,
/* 052 */           32);
/* 053 */         if (project_size_0 > 2147483632) {
/* 054 */           final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 055 */           for (int z = 0; z < project_numElements_0; z++) {
/* 056 */             project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{project_keys_0.getInt(z), project_values_0.getInt(z)});
/* 057 */           }
/* 058 */           project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
/* 059 */
/* 060 */         } else {
/* 061 */           final byte[] project_arrayBytes_0 = new byte[(int)project_size_0];
/* 062 */           UnsafeArrayData project_unsafeArrayData_0 = new UnsafeArrayData();
/* 063 */           Platform.putLong(project_arrayBytes_0, 16, project_numElements_0);
/* 064 */           project_unsafeArrayData_0.pointTo(project_arrayBytes_0, 16, (int)project_size_0);
/* 065 */
/* 066 */           final int project_structsOffset_0 = UnsafeArrayData.calculateHeaderPortionInBytes(project_numElements_0) + project_numElements_0 * 8;
/* 067 */           UnsafeRow project_unsafeRow_0 = new UnsafeRow(2);
/* 068 */           for (int z = 0; z < project_numElements_0; z++) {
/* 069 */             long offset = project_structsOffset_0 + z * 24L;
/* 070 */             project_unsafeArrayData_0.setLong(z, (offset << 32) + 24L);
/* 071 */             project_unsafeRow_0.pointTo(project_arrayBytes_0, 16 + offset, 24);
/* 072 */             project_unsafeRow_0.setInt(0, project_keys_0.getInt(z));
/* 073 */             project_unsafeRow_0.setInt(1, project_values_0.getInt(z));
/* 074 */           }
/* 075 */           project_value_0 = project_unsafeArrayData_0;
/* 076 */
/* 077 */         }
```
### Non-primitive types
```
val df = Seq(Map("a" -> "foo", "b" -> null)).toDF("m")
df.filter('m.isNotNull).select(map_entries('m)).debugCodegen
```
Result:
```
/* 042 */         boolean project_isNull_0 = false;
/* 043 */
/* 044 */         ArrayData project_value_0 = null;
/* 045 */
/* 046 */         final int project_numElements_0 = inputadapter_value_0.numElements();
/* 047 */         final ArrayData project_keys_0 = inputadapter_value_0.keyArray();
/* 048 */         final ArrayData project_values_0 = inputadapter_value_0.valueArray();
/* 049 */
/* 050 */         final Object[] project_internalRowArray_0 = new Object[project_numElements_0];
/* 051 */         for (int z = 0; z < project_numElements_0; z++) {
/* 052 */           project_internalRowArray_0[z] = new org.apache.spark.sql.catalyst.expressions.GenericInternalRow(new Object[]{project_keys_0.getUTF8String(z), project_values_0.getUTF8String(z)});
/* 053 */         }
/* 054 */         project_value_0 = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_internalRowArray_0);
```

Author: Marek Novotny <mn.mikke@gmail.com>

Closes #21236 from mn-mikke/feature/array-api-map_entries-to-master.
2018-05-21 23:14:03 +09:00
Wenchen Fan 000e25ae79 Revert "[SPARK-24250][SQL] support accessing SQLConf inside tasks"
This reverts commit dd37529a8d.
2018-05-20 16:13:42 +08:00
Wenchen Fan dd37529a8d [SPARK-24250][SQL] support accessing SQLConf inside tasks
## What changes were proposed in this pull request?

Previously in #20136 we decided to forbid tasks to access `SQLConf`, because it doesn't work and always give you the default conf value. In #21190 we fixed the check and all the places that violate it.

Currently the pattern of accessing configs at the executor side is: read the configs at the driver side, then access the variables holding the config values in the RDD closure, so that they will be serialized to the executor side. Something like
```
val someConf = conf.getXXX
child.execute().mapPartitions {
  if (someConf == ...) ...
  ...
}
```

However, this pattern is hard to apply if the config needs to be propagated via a long call stack. An example is `DataType.sameType`, and see how many changes were made in #21190 .

When it comes to code generation, it's even worse. I tried it locally and we need to change a ton of files to propagate configs to code generators.

This PR proposes to allow tasks to access `SQLConf`. The idea is, we can save all the SQL configs to job properties when an SQL execution is triggered. At executor side we rebuild the `SQLConf` from job properties.

## How was this patch tested?

a new test suite

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21299 from cloud-fan/config.
2018-05-19 18:51:02 +08:00
Marcelo Vanzin ed7ba7db8f [SPARK-23850][SQL] Add separate config for SQL options redaction.
The old code was relying on a core configuration and extended its
default value to include things that redact desired things in the
app's environment. Instead, add a SQL-specific option for which
options to redact, and apply both the core and SQL-specific rules
when redacting the options in the save command.

This is a little sub-optimal since it adds another config, but it
retains the current default behavior.

While there I also fixed a typo and a couple of minor config API
usage issues in the related redaction option that SQL already had.

Tested with existing unit tests, plus checking the env page on
a shell UI.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #21158 from vanzin/SPARK-23850.
2018-05-18 11:14:22 -07:00
jinxing 8a837bf4f3 [SPARK-24193] create TakeOrderedAndProjectExec only when the limit number is below spark.sql.execution.topKSortFallbackThreshold.
## What changes were proposed in this pull request?

Physical plan of `select colA from t order by colB limit M` is `TakeOrderedAndProject`;
Currently `TakeOrderedAndProject` sorts data in memory, see https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala#L158
We can add a config – if the number of limit (M) is too big, we can sort by disk. Thus memory issue can be resolved.

## How was this patch tested?

Test added

Author: jinxing <jinxing6042@126.com>

Closes #21252 from jinxing64/SPARK-24193.
2018-05-17 22:29:18 +08:00
Marco Gaido 69350aa2f0 [SPARK-23922][SQL] Add arrays_overlap function
## What changes were proposed in this pull request?

The PR adds the function `arrays_overlap`. This function returns `true` if the input arrays contain a non-null common element; if not, it returns `null` if any of the arrays contains a `null` element, `false` otherwise.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21028 from mgaido91/SPARK-23922.
2018-05-17 20:45:32 +08:00
Florent Pépin 3e66350c24 [SPARK-23925][SQL] Add array_repeat collection function
## What changes were proposed in this pull request?

The PR adds a new collection function, array_repeat. As there already was a function repeat with the same signature, with the only difference being the expected return type (String instead of Array), the new function is called array_repeat to distinguish.
The behaviour of the function is based on Presto's one.

The function creates an array containing a given element repeated the requested number of times.

## How was this patch tested?

New unit tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

Author: Florent Pépin <florentpepin.92@gmail.com>
Author: Florent Pépin <florent.pepin14@imperial.ac.uk>

Closes #21208 from pepinoflo/SPARK-23925.
2018-05-17 13:31:14 +09:00
Wenchen Fan 943493b165 Revert "[SPARK-22938][SQL][FOLLOWUP] Assert that SQLConf.get is acces…
…sed only on the driver"

This reverts commit a4206d58e0.

This is from https://github.com/apache/spark/pull/21299 and to ease the review of it.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21341 from cloud-fan/revert.
2018-05-16 22:01:24 +08:00
Jose Torres 3fabbc5762 [SPARK-24040][SS] Support single partition aggregates in continuous processing.
## What changes were proposed in this pull request?

Support aggregates with exactly 1 partition in continuous processing.

A few small tweaks are needed to make this work:

* Replace currentEpoch tracking with an ThreadLocal. This means that current epoch is scoped to a task rather than a node, but I think that's sustainable even once we add shuffle.
* Add a new testing-only flag to disable the UnsupportedOperationChecker whitelist of allowed continuous processing nodes. I think this is preferable to writing a pile of custom logic to enforce that there is in fact only 1 partition; we plan to support multi-partition aggregates before the next Spark release, so we'd just have to tear that logic back out.
* Restart continuous processing queries from the first available uncommitted epoch, rather than one that's guaranteed to be unused. This is required for stateful operators to overwrite partial state from the previous attempt at the epoch, and there was no specific motivation for the original strategy. In another PR before stabilizing the StreamWriter API, we'll need to narrow down and document more precise semantic guarantees for the epoch IDs.
* We need a single-partition ContinuousMemoryStream. The way MemoryStream is constructed means it can't be a text option like it is for rate source, unfortunately.

## How was this patch tested?

new unit tests

Author: Jose Torres <torres.joseph.f+github@gmail.com>

Closes #21239 from jose-torres/withAggr.
2018-05-15 10:25:29 -07:00
maryannxue 80c6d35a3e [SPARK-24035][SQL] SQL syntax for Pivot - fix antlr warning
## What changes were proposed in this pull request?

1. Change antlr rule to fix the warning.
2. Add PIVOT/LATERAL check in AstBuilder with a more meaningful error message.

## How was this patch tested?

1. Add a counter case in `PlanParserSuite.test("lateral view")`

Author: maryannxue <maryann.xue@gmail.com>

Closes #21324 from maryannxue/spark-24035-fix.
2018-05-14 23:34:42 -07:00
Maxim Gekk 8cd83acf40 [SPARK-24027][SQL] Support MapType with StringType for keys as the root type by from_json
## What changes were proposed in this pull request?

Currently, the from_json function support StructType or ArrayType as the root type. The PR allows to specify MapType(StringType, DataType) as the root type additionally to mentioned types. For example:

```scala
import org.apache.spark.sql.types._
val schema = MapType(StringType, IntegerType)
val in = Seq("""{"a": 1, "b": 2, "c": 3}""").toDS()
in.select(from_json($"value", schema, Map[String, String]())).collect()
```
```
res1: Array[org.apache.spark.sql.Row] = Array([Map(a -> 1, b -> 2, c -> 3)])
```

## How was this patch tested?

It was checked by new tests for the map type with integer type and struct type as value types. Also roundtrip tests like from_json(to_json) and to_json(from_json) for MapType are added.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21108 from MaxGekk/from_json-map-type.
2018-05-14 14:05:42 -07:00
Shixiong Zhu c26f673252 [SPARK-24246][SQL] Improve AnalysisException by setting the cause when it's available
## What changes were proposed in this pull request?

If there is an exception, it's better to set it as the cause of AnalysisException since the exception may contain useful debug information.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zsxwing@gmail.com>

Closes #21297 from zsxwing/SPARK-24246.
2018-05-14 11:37:57 -07:00
Cody Allen 32acfa78c6 Improve implicitNotFound message for Encoder
The `implicitNotFound` message for `Encoder` doesn't mention the name of
the type for which it can't find an encoder. Furthermore, it covers up
the fact that `Encoder` is the name of the relevant type class.
Hopefully this new message provides a little more specific type detail
while still giving the general message about which types are supported.

## What changes were proposed in this pull request?

Augment the existing message to mention that it's looking for an `Encoder` and what the type of the encoder is.

For example instead of:

```
Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
```

return this message:

```
Unable to find encoder for type Exception. An implicit Encoder[Exception] is needed to store Exception instances in a Dataset. Primitive types (Int, String, etc) and Product types (ca
se classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
```

## How was this patch tested?

It was tested manually in the Scala REPL, since triggering this in a test would cause a compilation error.

```
scala> implicitly[Encoder[Exception]]
<console>:51: error: Unable to find encoder for type Exception. An implicit Encoder[Exception] is needed to store Exception instances in a Dataset. Primitive types (Int, String, etc) and Product types (ca
se classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
       implicitly[Encoder[Exception]]
                 ^
```

Author: Cody Allen <ceedubs@gmail.com>

Closes #20869 from ceedubs/encoder-implicit-msg.
2018-05-12 14:35:40 -05:00
aditkumar 92f6f52ff0 [MINOR][DOCS] Documenting months_between direction
## What changes were proposed in this pull request?

It's useful to know what relationship between date1 and date2 results in a positive number.

Author: aditkumar <aditkumar@gmail.com>
Author: Adit Kumar <aditkumar@gmail.com>

Closes #20787 from aditkumar/master.
2018-05-11 14:42:23 -05:00
Wenchen Fan a4206d58e0 [SPARK-22938][SQL][FOLLOWUP] Assert that SQLConf.get is accessed only on the driver
## What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/20136 . #20136 didn't really work because in the test, we are using local backend, which shares the driver side `SparkEnv`, so `SparkEnv.get.executorId == SparkContext.DRIVER_IDENTIFIER` doesn't work.

This PR changes the check to `TaskContext.get != null`, and move the check to `SQLConf.get`, and fix all the places that violate this check:
* `InMemoryTableScanExec#createAndDecompressColumn` is executed inside `rdd.map`, we can't access `conf.offHeapColumnVectorEnabled` there. https://github.com/apache/spark/pull/21223 merged
* `DataType#sameType` may be executed in executor side, for things like json schema inference, so we can't call `conf.caseSensitiveAnalysis` there. This contributes to most of the code changes, as we need to add `caseSensitive` parameter to a lot of methods.
* `ParquetFilters` is used in the file scan function, which is executed in executor side, so we can't can't call `conf.parquetFilterPushDownDate` there. https://github.com/apache/spark/pull/21224 merged
* `WindowExec#createBoundOrdering` is called on executor side, so we can't use `conf.sessionLocalTimezone` there. https://github.com/apache/spark/pull/21225 merged
* `JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body. https://github.com/apache/spark/pull/21226 merged

## How was this patch tested?

existing test

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21190 from cloud-fan/minor.
2018-05-11 09:01:40 +08:00
Maxim Gekk f4fed05121 [SPARK-24171] Adding a note for non-deterministic functions
## What changes were proposed in this pull request?

I propose to add a clear statement for functions like `collect_list()` about non-deterministic behavior of such functions. The behavior must be taken into account by user while creating and running queries.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21228 from MaxGekk/deterministic-comments.
2018-05-10 09:44:49 -07:00
Marco Gaido 94d6714482 [SPARK-23907][SQL] Add regr_* functions
## What changes were proposed in this pull request?

The PR introduces regr_slope, regr_intercept, regr_r2, regr_sxx, regr_syy, regr_sxy, regr_avgx, regr_avgy, regr_count.

The implementation of this functions mirrors Hive's one in HIVE-15978.

## How was this patch tested?

added UT (values compared with Hive)

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21054 from mgaido91/SPARK-23907.
2018-05-10 20:38:52 +09:00
Ryan Blue cac9b1dea1 [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
## What changes were proposed in this pull request?

This updates Parquet to 1.10.0 and updates the vectorized path for buffer management changes. Parquet 1.10.0 uses ByteBufferInputStream instead of byte arrays in encoders. This allows Parquet to break allocations into smaller chunks that are better for garbage collection.

## How was this patch tested?

Existing Parquet tests. Running in production at Netflix for about 3 months.

Author: Ryan Blue <blue@apache.org>

Closes #21070 from rdblue/SPARK-23972-update-parquet-to-1.10.0.
2018-05-09 12:27:32 +08:00
Maxim Gekk e3de6ab30d [SPARK-24068] Propagating DataFrameReader's options to Text datasource on schema inferring
## What changes were proposed in this pull request?

While reading CSV or JSON files, DataFrameReader's options are converted to Hadoop's parameters, for example there:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala#L302

but the options are not propagated to Text datasource on schema inferring, for instance:
https://github.com/apache/spark/blob/branch-2.3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala#L184-L188

The PR proposes propagation of user's options to Text datasource on scheme inferring in similar way as user's options are converted to Hadoop parameters if schema is specified.

## How was this patch tested?
The changes were tested manually by using https://github.com/twitter/hadoop-lzo:

```
hadoop-lzo> mvn clean package
hadoop-lzo> ln -s ./target/hadoop-lzo-0.4.21-SNAPSHOT.jar ./hadoop-lzo.jar
```
Create 2 test files in JSON and CSV format and compress them:
```shell
$ cat test.csv
col1|col2
a|1
$ lzop test.csv
$ cat test.json
{"col1":"a","col2":1}
$ lzop test.json
```
Run `spark-shell` with hadoop-lzo:
```
bin/spark-shell --jars ~/hadoop-lzo/hadoop-lzo.jar
```
reading compressed CSV and JSON without schema:
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("inferSchema",true).option("header",true).option("sep","|").csv("test.csv.lzo").show()
+----+----+
|col1|col2|
+----+----+
|   a|   1|
+----+----+
```
```scala
spark.read.option("io.compression.codecs", "com.hadoop.compression.lzo.LzopCodec").option("multiLine", true).json("test.json.lzo").printSchema
root
 |-- col1: string (nullable = true)
 |-- col2: long (nullable = true)
```

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21182 from MaxGekk/text-options.
2018-05-09 08:32:20 +08:00
Yuming Wang 487faf17ab [SPARK-24117][SQL] Unified the getSizePerRow
## What changes were proposed in this pull request?

This pr unified the `getSizePerRow` because `getSizePerRow` is used in many places. For example:

1. [LocalRelation.scala#L80](f70f46d1e5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala (L80))
2. [SizeInBytesOnlyStatsPlanVisitor.scala#L36](76b8b840dd/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/SizeInBytesOnlyStatsPlanVisitor.scala (L36))

## How was this patch tested?
Exist tests

Author: Yuming Wang <yumwang@ebay.com>

Closes #21189 from wangyum/SPARK-24117.
2018-05-08 23:43:02 +08:00
gatorsmile 2f6fe7d679 [SPARK-23094][SPARK-23723][SPARK-23724][SQL][FOLLOW-UP] Support custom encoding for json files
## What changes were proposed in this pull request?
This is to add a test case to check the behaviors when users write json in the specified UTF-16/UTF-32 encoding with multiline off.

## How was this patch tested?
N/A

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21254 from gatorsmile/followupSPARK-23094.
2018-05-08 21:24:35 +08:00
Henry Robinson cd12c5c3ec [SPARK-24128][SQL] Mention configuration option in implicit CROSS JOIN error
## What changes were proposed in this pull request?

Mention `spark.sql.crossJoin.enabled` in error message when an implicit `CROSS JOIN` is detected.

## How was this patch tested?

`CartesianProductSuite` and `JoinSuite`.

Author: Henry Robinson <henry@apache.org>

Closes #21201 from henryr/spark-24128.
2018-05-08 12:21:33 +08:00
Bruce Robbins d83e963724 [SPARK-24043][SQL] Interpreted Predicate should initialize nondeterministic expressions
## What changes were proposed in this pull request?

When creating an InterpretedPredicate instance, initialize any Nondeterministic expressions in the expression tree to avoid java.lang.IllegalArgumentException on later call to eval().

## How was this patch tested?

- sbt SQL tests
- python SQL tests
- new unit test

Author: Bruce Robbins <bersprockets@gmail.com>

Closes #21144 from bersprockets/interpretedpredicate.
2018-05-07 17:54:39 +02:00
Herman van Hovell 4e861db5f1 [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve
## What changes were proposed in this pull request?

`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes.

This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s):

``` scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

spark.time(sql(query))
```
## How was this patch tested?

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes #14083 from hvanhovell/SPARK-16406.
2018-05-07 11:21:22 +02:00
Marco Gaido e35ad3cadd [SPARK-23930][SQL] Add slice function
## What changes were proposed in this pull request?

The PR add the `slice` function. The behavior of the function is based on Presto's one.

The function slices an array according to the requested start index and length.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21040 from mgaido91/SPARK-23930.
2018-05-07 16:57:37 +09:00
Kazuaki Ishizaki 7564a9a706 [SPARK-23921][SQL] Add array_sort function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_sort`. The behavior of the function is based on Presto's one.

The function sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21021 from kiszk/SPARK-23921.
2018-05-07 15:22:23 +09:00
gatorsmile f38ea00e83 [SPARK-24017][SQL] Refactor ExternalCatalog to be an interface
## What changes were proposed in this pull request?
This refactors the external catalog to be an interface. It can be easier for the future work in the catalog federation. After the refactoring, `ExternalCatalog` is much cleaner without mixing the listener event generation logic.

## How was this patch tested?
The existing tests

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21122 from gatorsmile/refactorExternalCatalog.
2018-05-06 20:41:32 -07:00
Tathagata Das 47b5b68528 [SPARK-24157][SS] Enabled no-data batches in MicroBatchExecution for streaming aggregation and deduplication.
## What changes were proposed in this pull request?

This PR enables the MicroBatchExecution to run no-data batches if some SparkPlan requires running another batch to output results based on updated watermark / processing time. In this PR, I have enabled streaming aggregations and streaming deduplicates to automatically run addition batch even if new data is available. See https://issues.apache.org/jira/browse/SPARK-24156 for more context.

Major changes/refactoring done in this PR.
- Refactoring MicroBatchExecution - A major point of confusion in MicroBatchExecution control flow was always (at least to me) was that `populateStartOffsets` internally called `constructNextBatch` which was not obvious from just the name "populateStartOffsets" and made the control flow from the main trigger execution loop very confusing (main loop in `runActivatedStream` called `constructNextBatch` but only if `populateStartOffsets` hadn't already called it). Instead, the refactoring makes it cleaner.
    - `populateStartOffsets` only the updates `availableOffsets` and `committedOffsets`. Does not call `constructNextBatch`.
    - Main loop in `runActivatedStream` calls `constructNextBatch` which returns true or false reflecting whether the next batch is ready for executing. This method is now idempotent; if a batch has already been constructed, then it will always return true until the batch has been executed.
    - If next batch is ready then we call `runBatch` or sleep.
    - That's it.

- Refactoring watermark management logic - This has been refactored out from `MicroBatchExecution` in a separate class to simplify `MicroBatchExecution`.

- New method `shouldRunAnotherBatch` in `IncrementalExecution` - This returns true if there is any stateful operation in the last execution plan that requires another batch for state cleanup, etc. This is used to decide whether to construct a batch or not in `constructNextBatch`.

- Changes to stream testing framework - Many tests used CheckLastBatch to validate answers. This assumed that there will be no more batches after the last set of input has been processed, so the last batch is the one that has output corresponding to the last input. This is not true anymore. To account for that, I made two changes.
    - `CheckNewAnswer` is a new test action that verifies the new rows generated since the last time the answer was checked by `CheckAnswer`, `CheckNewAnswer` or `CheckLastBatch`. This is agnostic to how many batches occurred between the last check and now. To do make this easier, I added a common trait between MemorySink and MemorySinkV2 to abstract out some common methods.
    - `assertNumStateRows` has been updated in the same way to be agnostic to batches while checking what the total rows and how many state rows were updated (sums up updates since the last check).

## How was this patch tested?
- Changes made to existing tests - Tests have been changed in one of the following patterns.
    - Tests where the last input was given again to force another batch to be executed and state cleaned up / output generated, they were simplified by removing the extra input.
    - Tests using aggregation+watermark where CheckLastBatch were replaced with CheckNewAnswer to make them batch agnostic.
- New tests added to check whether the flag works for streaming aggregation and deduplication

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21220 from tdas/SPARK-24157.
2018-05-04 16:35:24 -07:00
maryannxue e3201e165e [SPARK-24035][SQL] SQL syntax for Pivot
## What changes were proposed in this pull request?

Add SQL support for Pivot according to Pivot grammar defined by Oracle (https://docs.oracle.com/database/121/SQLRF/img_text/pivot_clause.htm) with some simplifications, based on our existing functionality and limitations for Pivot at the backend:
1. For pivot_for_clause (https://docs.oracle.com/database/121/SQLRF/img_text/pivot_for_clause.htm), the column list form is not supported, which means the pivot column can only be one single column.
2. For pivot_in_clause (https://docs.oracle.com/database/121/SQLRF/img_text/pivot_in_clause.htm), the sub-query form and "ANY" is not supported (this is only supported by Oracle for XML anyway).
3. For pivot_in_clause, aliases for the constant values are not supported.

The code changes are:
1. Add parser support for Pivot. Note that according to https://docs.oracle.com/database/121/SQLRF/statements_10002.htm#i2076542, Pivot cannot be used together with lateral views in the from clause. This restriction has been implemented in the Parser rule.
2. Infer group-by expressions: group-by expressions are not explicitly specified in SQL Pivot clause and need to be deduced based on this rule: https://docs.oracle.com/database/121/SQLRF/statements_10002.htm#CHDFAFIE, so we have to post-fix it at query analysis stage.
3. Override Pivot.resolved as "false": for the reason mentioned in [2] and the fact that output attributes change after Pivot being replaced by Project or Aggregate, we avoid resolving parent references until after Pivot has been resolved and replaced.
4. Verify aggregate expressions: only aggregate expressions with or without aliases can appear in the first part of the Pivot clause, and this check is performed as analysis stage.

## How was this patch tested?

A new test suite PivotSuite is added.

Author: maryannxue <maryann.xue@gmail.com>

Closes #21187 from maryannxue/spark-24035.
2018-05-03 17:05:02 -07:00
Wenchen Fan 96a50016bb [SPARK-24169][SQL] JsonToStructs should not access SQLConf at executor side
## What changes were proposed in this pull request?

This PR is extracted from #21190 , to make it easier to backport.

`JsonToStructs` can be serialized to executors and evaluate, we should not call `SQLConf.get.getConf(SQLConf.FROM_JSON_FORCE_NULLABLE_SCHEMA)` in the body.

## How was this patch tested?

tested in #21190

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21226 from cloud-fan/minor4.
2018-05-03 23:36:09 +08:00
Wenchen Fan 417ad92502 [SPARK-23715][SQL] the input of to/from_utc_timestamp can not have timezone
## What changes were proposed in this pull request?

`from_utc_timestamp` assumes its input is in UTC timezone and shifts it to the specified timezone. When the timestamp contains timezone(e.g. `2018-03-13T06:18:23+00:00`), Spark breaks the semantic and respect the timezone in the string. This is not what user expects and the result is different from Hive/Impala. `to_utc_timestamp` has the same problem.

More details please refer to the JIRA ticket.

This PR fixes this by returning null if the input timestamp contains timezone.

## How was this patch tested?

new tests

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21169 from cloud-fan/from_utc_timezone.
2018-05-03 19:27:01 +08:00
Kazuaki Ishizaki 5be8aab144 [SPARK-23923][SQL] Add cardinality function
## What changes were proposed in this pull request?

The PR adds the SQL function `cardinality`. The behavior of the function is based on Presto's one.

The function returns the length of the array or map stored in the column as `int` while the Presto version returns the value as `BigInt` (`long` in Spark). The discussions regarding the difference of return type are [here](https://github.com/apache/spark/pull/21031#issuecomment-381284638) and [there](https://github.com/apache/spark/pull/21031#discussion_r181622107).

## How was this patch tested?

Added UTs

Author: Kazuaki Ishizaki <ishizaki@jp.ibm.com>

Closes #21031 from kiszk/SPARK-23923.
2018-05-02 13:53:10 -07:00
Marco Gaido 504c9cfd21 [SPARK-24123][SQL] Fix precision issues in monthsBetween with more than 8 digits
## What changes were proposed in this pull request?

SPARK-23902 introduced the ability to retrieve more than 8 digits in `monthsBetween`. Unfortunately, current implementation can cause precision loss in such a case. This was causing also a flaky UT.

This PR mirrors Hive's implementation in order to avoid precision loss also when more than 8 digits are returned.

## How was this patch tested?

running 10000000 times the flaky UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21196 from mgaido91/SPARK-24123.
2018-05-02 13:49:15 -07:00
Marco Gaido 8dbf56c055 [SPARK-24013][SQL] Remove unneeded compress in ApproximatePercentile
## What changes were proposed in this pull request?

`ApproximatePercentile` contains a workaround logic to compress the samples since at the beginning `QuantileSummaries` was ignoring the compression threshold. This problem was fixed in SPARK-17439, but the workaround logic was not removed. So we are compressing the samples many more times than needed: this could lead to critical performance degradation.

This can create serious performance issues in queries like:
```
select approx_percentile(id, array(0.1)) from range(10000000)
```

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21133 from mgaido91/SPARK-24013.
2018-05-02 11:58:55 -07:00
wangyanlin01 7bbec0dced [SPARK-24061][SS] Add TypedFilter support for continuous processing
## What changes were proposed in this pull request?

Add TypedFilter support for continuous processing application.

## How was this patch tested?

unit tests

Author: wangyanlin01 <wangyanlin01@baidu.com>

Closes #21136 from yanlin-Lynn/SPARK-24061.
2018-05-01 16:22:52 +08:00
Maxim Gekk bd14da6fd5 [SPARK-23094][SPARK-23723][SPARK-23724][SQL] Support custom encoding for json files
## What changes were proposed in this pull request?

I propose new option for JSON datasource which allows to specify encoding (charset) of input and output files. Here is an example of using of the option:

```
spark.read.schema(schema)
  .option("multiline", "true")
  .option("encoding", "UTF-16LE")
  .json(fileName)
```

If the option is not specified, charset auto-detection mechanism is used by default.

The option can be used for saving datasets to jsons. Currently Spark is able to save datasets into json files in `UTF-8` charset only. The changes allow to save data in any supported charset. Here is the approximate list of supported charsets by Oracle Java SE: https://docs.oracle.com/javase/8/docs/technotes/guides/intl/encoding.doc.html . An user can specify the charset of output jsons via the charset option like `.option("charset", "UTF-16BE")`. By default the output charset is still `UTF-8` to keep backward compatibility.

The solution has the following restrictions for per-line mode (`multiline = false`):

- If charset is different from UTF-8, the lineSep option must be specified. The option required because Hadoop LineReader cannot detect the line separator correctly. Here is the ticket for solving the issue: https://issues.apache.org/jira/browse/SPARK-23725

- Encoding with [BOM](https://en.wikipedia.org/wiki/Byte_order_mark) are not supported. For example, the `UTF-16` and `UTF-32` encodings are blacklisted. The problem can be solved by https://github.com/MaxGekk/spark-1/pull/2

## How was this patch tested?

I added the following tests:
- reads an json file in `UTF-16LE` encoding with BOM in `multiline` mode
- read json file by using charset auto detection (`UTF-32BE` with BOM)
- read json file using of user's charset (`UTF-16LE`)
- saving in `UTF-32BE` and read the result by standard library (not by Spark)
- checking that default charset is `UTF-8`
- handling wrong (unsupported) charset

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #20937 from MaxGekk/json-encoding-line-sep.
2018-04-29 11:25:31 +08:00
Marco Gaido ad94e8592b [SPARK-23736][SQL][FOLLOWUP] Error message should contains SQL types
## What changes were proposed in this pull request?

In the error messages we should return the SQL types (like `string` rather than the internal types like `StringType`).

## How was this patch tested?

added UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21181 from mgaido91/SPARK-23736_followup.
2018-04-28 10:47:43 +08:00
gatorsmile ce2f919f8d [SPARK-23799][SQL][FOLLOW-UP] FilterEstimation.evaluateInSet produces wrong stats for STRING
## What changes were proposed in this pull request?
`colStat.min` AND `colStat.max` are empty for string type. Thus, `evaluateInSet` should not return zero when either `colStat.min` or `colStat.max`.

## How was this patch tested?
Added a test case.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #21147 from gatorsmile/cached.
2018-04-26 19:07:13 +08:00
Marco Gaido cd10f9df82 [SPARK-23916][SQL] Add array_join function
## What changes were proposed in this pull request?

The PR adds the SQL function `array_join`. The behavior of the function is based on Presto's one.

The function accepts an `array` of `string` which is to be joined, a `string` which is the delimiter to use between the items of the first argument and optionally a `string` which is used to replace `null` values.

## How was this patch tested?

added UTs

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21011 from mgaido91/SPARK-23916.
2018-04-26 13:37:13 +09:00
Marco Gaido 58c55cb4a6 [SPARK-23902][SQL] Add roundOff flag to months_between
## What changes were proposed in this pull request?

HIVE-15511 introduced the `roundOff` flag in order to disable the rounding to 8 digits which is performed in `months_between`. Since this can be a computational intensive operation, skipping it may improve performances when the rounding is not needed.

## How was this patch tested?

modified existing UT

Author: Marco Gaido <marcogaido91@gmail.com>

Closes #21008 from mgaido91/SPARK-23902.
2018-04-26 12:19:20 +09:00
Wenchen Fan ac4ca7c4dd [SPARK-24012][SQL][TEST][FOLLOWUP] add unit test
## What changes were proposed in this pull request?

a followup of https://github.com/apache/spark/pull/21100

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #21154 from cloud-fan/test.
2018-04-25 13:42:44 -07:00
liutang123 64e8408e6f [SPARK-24012][SQL] Union of map and other compatible column
## What changes were proposed in this pull request?
Union of map and other compatible column result in unresolved operator 'Union; exception

Reproduction
`spark-sql>select map(1,2), 'str' union all select map(1,2,3,null), 1`
Output:
```
Error in query: unresolved operator 'Union;;
'Union
:- Project [map(1, 2) AS map(1, 2)#106, str AS str#107]
:  +- OneRowRelation$
+- Project [map(1, cast(2 as int), 3, cast(null as int)) AS map(1, CAST(2 AS INT), 3, CAST(NULL AS INT))#109, 1 AS 1#108]
   +- OneRowRelation$
```
So, we should cast part of columns to be compatible when appropriate.

## How was this patch tested?
Added a test (query union of map and other columns) to SQLQueryTestSuite's union.sql.

Author: liutang123 <liutang123@yeah.net>

Closes #21100 from liutang123/SPARK-24012.
2018-04-25 18:10:51 +08:00
mn-mikke 5fea17b3be [SPARK-23821][SQL] Collection function: flatten
## What changes were proposed in this pull request?

This PR adds a new collection function that transforms an array of arrays into a single array. The PR comprises:
- An expression for flattening array structure
- Flatten function
- A wrapper for PySpark

## How was this patch tested?

New tests added into:
- CollectionExpressionsSuite
- DataFrameFunctionsSuite

## Codegen examples
### Primitive type
```
val df = Seq(
  Seq(Seq(1, 2), Seq(4, 5)),
  Seq(null, Seq(1))
).toDF("i")
df.filter($"i".isNotNull || $"i".isNull).select(flatten($"i")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             long project_size = UnsafeArrayData.calculateSizeOfUnderlyingByteArray(
/* 064 */               project_numElements,
/* 065 */               4);
/* 066 */             if (project_size > 2147483632) {
/* 067 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 068 */                 project_size + " bytes of data due to exceeding the limit 2147483632" +
/* 069 */                 " bytes for UnsafeArrayData.");
/* 070 */             }
/* 071 */
/* 072 */             byte[] project_array = new byte[(int)project_size];
/* 073 */             UnsafeArrayData project_tempArrayData = new UnsafeArrayData();
/* 074 */             Platform.putLong(project_array, 16, project_numElements);
/* 075 */             project_tempArrayData.pointTo(project_array, 16, (int)project_size);
/* 076 */             int project_counter = 0;
/* 077 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 078 */               ArrayData arr = inputadapter_value.getArray(k);
/* 079 */               for (int l = 0; l < arr.numElements(); l++) {
/* 080 */                 if (arr.isNullAt(l)) {
/* 081 */                   project_tempArrayData.setNullAt(project_counter);
/* 082 */                 } else {
/* 083 */                   project_tempArrayData.setInt(
/* 084 */                     project_counter,
/* 085 */                     arr.getInt(l)
/* 086 */                   );
/* 087 */                 }
/* 088 */                 project_counter++;
/* 089 */               }
/* 090 */             }
/* 091 */             project_value = project_tempArrayData;
/* 092 */
/* 093 */           }
/* 094 */
/* 095 */         }
```
### Non-primitive type
```
val df = Seq(
  Seq(Seq("a", "b"), Seq(null, "d")),
  Seq(null, Seq("a"))
).toDF("s")
df.filter($"s".isNotNull || $"s".isNull).select(flatten($"s")).debugCodegen
```
Result:
```
/* 033 */         boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 034 */         ArrayData inputadapter_value = inputadapter_isNull ?
/* 035 */         null : (inputadapter_row.getArray(0));
/* 036 */
/* 037 */         boolean filter_value = true;
/* 038 */
/* 039 */         if (!(!inputadapter_isNull)) {
/* 040 */           filter_value = inputadapter_isNull;
/* 041 */         }
/* 042 */         if (!filter_value) continue;
/* 043 */
/* 044 */         ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(1);
/* 045 */
/* 046 */         boolean project_isNull = inputadapter_isNull;
/* 047 */         ArrayData project_value = null;
/* 048 */
/* 049 */         if (!inputadapter_isNull) {
/* 050 */           for (int z = 0; !project_isNull && z < inputadapter_value.numElements(); z++) {
/* 051 */             project_isNull |= inputadapter_value.isNullAt(z);
/* 052 */           }
/* 053 */           if (!project_isNull) {
/* 054 */             long project_numElements = 0;
/* 055 */             for (int z = 0; z < inputadapter_value.numElements(); z++) {
/* 056 */               project_numElements += inputadapter_value.getArray(z).numElements();
/* 057 */             }
/* 058 */             if (project_numElements > 2147483632) {
/* 059 */               throw new RuntimeException("Unsuccessful try to flatten an array of arrays with " +
/* 060 */                 project_numElements + " elements due to exceeding the array size limit 2147483632.");
/* 061 */             }
/* 062 */
/* 063 */             Object[] project_arrayObject = new Object[(int)project_numElements];
/* 064 */             int project_counter = 0;
/* 065 */             for (int k = 0; k < inputadapter_value.numElements(); k++) {
/* 066 */               ArrayData arr = inputadapter_value.getArray(k);
/* 067 */               for (int l = 0; l < arr.numElements(); l++) {
/* 068 */                 project_arrayObject[project_counter] = arr.getUTF8String(l);
/* 069 */                 project_counter++;
/* 070 */               }
/* 071 */             }
/* 072 */             project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_arrayObject);
/* 073 */
/* 074 */           }
/* 075 */
/* 076 */         }
```

Author: mn-mikke <mrkAha12346github>

Closes #20938 from mn-mikke/feature/array-api-flatten-to-master.
2018-04-25 11:19:08 +09:00
Takeshi Yamamuro 4926a7c2f0 [SPARK-23589][SQL][FOLLOW-UP] Reuse InternalRow in ExternalMapToCatalyst eval
## What changes were proposed in this pull request?
This pr is a follow-up of #20980 and fixes code to reuse `InternalRow` for converting input keys/values in `ExternalMapToCatalyst` eval.

## How was this patch tested?
Existing tests.

Author: Takeshi Yamamuro <yamamuro@apache.org>

Closes #21137 from maropu/SPARK-23589-FOLLOWUP.
2018-04-24 17:52:05 +02:00