### What changes were proposed in this pull request?
This PR adds a note that `first` and `last` can be non-deterministic in the SQL function docs as well.
This is already documented in `functions.scala`.
### Why are the changes needed?
Some people read only the SQL docs.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Jenkins will test.
Closes#27099 from HyukjinKwon/SPARK-30335.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR implements multiple performance optimizations for `ParquetRowConverter`, achieving some modest constant-factor wins for all fields and larger wins for map and array fields:
- Add `private[this]` to several `val`s (90cebf080a5d3857ea8cf2a89e8e060b8b5a2fbf)
- Keep a `fieldUpdaters` array, saving two `.updater()` calls per field (7318785d350cc924198d7514e40973fd76d54ad5): I suspect that these are often megamorphic calls, so cutting these out seems like it could be a relatively large performance win.
- Only call `currentRow.numFields` once per `start()` call (e05de150813b639929c18af1df09ec718d2d16fc): previously we'd call it once per field and this had a significant enough cost that it was visible during profiling.
- Reuse buffers in array and map converters (c7d1534685fbad5d2280b082f37bed6d75848e76, 6d16f596ef6af9fd8946a062f79d0eeace9e1959): previously we would create a brand-new Scala `ArrayBuffer` for each field read, but this isn't actually necessary because the data is already copied into a fresh array when `end()` constructs a `GenericArrayData`.
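As a rough illustration of the buffer-reuse idea (a minimal sketch, not the actual `ParquetRowConverter` code; the class and method names here are illustrative):
```scala
import scala.collection.mutable.ArrayBuffer

// One ArrayBuffer instance is reused across records instead of allocating a new one per read.
class ReusableArrayConverterSketch {
  private[this] val currentArray = ArrayBuffer.empty[Any]

  def start(): Unit = currentArray.clear()        // reuse the buffer instead of allocating a new one
  def add(value: Any): Unit = currentArray += value
  def end(): Array[Any] = currentArray.toArray    // elements are copied out here, so reuse is safe
}
```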
### Why are the changes needed?
To improve Parquet read performance; this is complementary to #26993's (orthogonal) improvements for nested struct read performance.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing tests, plus manual benchmarking with both synthetic and realistic schemas (similar to the ones in #26993). I've seen ~10%+ improvements in scan performance on certain real-world datasets.
Closes#27089 from JoshRosen/joshrosen/more-ParquetRowConverter-optimizations.
Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR modifies `ParquetRowConverter` to remove unnecessary `InternalRow.copy()` calls for structs that are directly nested in other structs.
### Why are the changes needed?
These changes can significantly improve performance when reading Parquet files that contain deeply-nested structs with many fields.
The `ParquetRowConverter` uses per-field `Converter`s for handling individual fields. Internally, these converters may have mutable state and may return mutable objects. In most cases, each `converter` is only invoked once per Parquet record (this is true for top-level fields, for example). However, arrays and maps may call their child element converters multiple times per Parquet record: in these cases we must be careful to copy any mutable outputs returned by child converters.
In the existing code, `InternalRow`s are copied whenever they are stored into _any_ parent container (not just maps and arrays). This copying can be especially expensive for deeply-nested fields, since a deep copy is performed at every level of nesting.
This PR modifies the code to avoid copies for structs that are directly nested in structs; see inline code comments for an argument for why this is safe.
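As a toy illustration of the argument (a standalone sketch, not Spark's converter code): a reused mutable object must be copied when it is accumulated repeatedly (arrays and maps), but not when it is written exactly once per record into a fixed parent slot (struct directly nested in struct).
```scala
import scala.collection.mutable.ArrayBuffer

class ReusedRow(var value: Int) {
  def copy(): ReusedRow = new ReusedRow(value)
}

val reused = new ReusedRow(0)

// Array/map case: the same instance is stored repeatedly, so each store needs a copy.
val accumulated = ArrayBuffer.empty[ReusedRow]
for (v <- 1 to 3) {
  reused.value = v
  accumulated += reused.copy()
}

// Struct-in-struct case: the value is written once per record and consumed before the
// converter is reused, so no defensive copy is necessary.
reused.value = 42
var parentSlot: ReusedRow = reused
```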
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
**Correctness**: I added new test cases to `ParquetIOSuite` to increase coverage of nested structs, including structs nested in arrays: previously this suite didn't test that case, so we used to lack mutation coverage of this `copy()` code (the suite's tests still passed if I incorrectly removed the `.copy()` in all cases). I also added a test for maps with struct keys and modified the existing "map with struct values" test case include maps with two elements (since the incorrect omission of a `copy()` can only be detected if the map has multiple elements).
**Performance**: I put together a simple local benchmark demonstrating the performance problems:
First, construct a nested schema:
```scala
case class Inner(
    f1: Int,
    f2: Long,
    f3: String,
    f4: Int,
    f5: Long,
    f6: String,
    f7: Int,
    f8: Long,
    f9: String,
    f10: Int
)
case class Wrapper1(inner: Inner)
case class Wrapper2(wrapper1: Wrapper1)
case class Wrapper3(wrapper2: Wrapper2)
```
`Wrapper3`'s schema looks like:
```
root
|-- wrapper2: struct (nullable = true)
| |-- wrapper1: struct (nullable = true)
| | |-- inner: struct (nullable = true)
| | | |-- f1: integer (nullable = true)
| | | |-- f2: long (nullable = true)
| | | |-- f3: string (nullable = true)
| | | |-- f4: integer (nullable = true)
| | | |-- f5: long (nullable = true)
| | | |-- f6: string (nullable = true)
| | | |-- f7: integer (nullable = true)
| | | |-- f8: long (nullable = true)
| | | |-- f9: string (nullable = true)
| | | |-- f10: integer (nullable = true)
```
Next, generate some fake data:
```scala
val data = spark.range(1, 1000 * 1000 * 25, 1, 1).map { i =>
  Wrapper3(Wrapper2(Wrapper1(Inner(
    i.toInt,
    i * 2,
    (i * 3).toString,
    (i * 4).toInt,
    i * 5,
    (i * 6).toString,
    (i * 7).toInt,
    i * 8,
    (i * 9).toString,
    (i * 10).toInt
  ))))
}
data.write.mode("overwrite").parquet("/tmp/parquet-test")
```
I then ran a simple benchmark consisting of
```
spark.read.parquet("/tmp/parquet-test").selectExpr("hash(*)").rdd.count()
```
where the `hash(*)` is designed to force decoding of all Parquet fields but avoids `RowEncoder` costs in the `.rdd.count()` stage.
In the old code, expensive copying takes place at every level of nesting; this is apparent in the following flame graph:
![image](https://user-images.githubusercontent.com/50748/71389014-88a15380-25af-11ea-9537-3e87a2aef179.png)
After this PR's changes, the above toy benchmark runs ~30% faster.
Closes#26993 from JoshRosen/joshrosen/faster-parquet-nested-scan-by-avoiding-copies.
Lead-authored-by: Josh Rosen <rosenville@gmail.com>
Co-authored-by: Josh Rosen <joshrosen@stripe.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR tries to make conflict attribute resolution in `ResolveReferences` more scalable by resolving conflicts in a batch.
### Why are the changes needed?
Currently, the `ResolveReferences` rule resolves conflicting attributes for only a single conflicting plan pair per iteration, which can be inefficient when there are many conflicts.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Covered by existing tests.
Closes#27105 from Ngone51/resolve-conflict-columns-in-batch.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR adds a note that UserDefinedFunction's constructor is private.
### Why are the changes needed?
To match the Scala side, which does not expose the constructor at all.
### Does this PR introduce any user-facing change?
Doc only changes but it declares UserDefinedFunction's constructor is private explicitly.
### How was this patch tested?
Jenkins
Closes#27101 from HyukjinKwon/SPARK-30430.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
PySpark UDF to convert MLlib vectors to dense arrays.
Example:
```
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col

df.select(vector_to_array(col("features")))
```
### Why are the changes needed?
If a PySpark user wants to convert MLlib sparse/dense vectors in a DataFrame into dense arrays, an efficient approach is to do the conversion in the JVM. However, that requires the PySpark user to write Scala code and register it as a UDF, which is often infeasible for a pure Python project.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT.
Closes#26910 from WeichenXu123/vector_to_array.
Authored-by: WeichenXu <weichen.xu@databricks.com>
Signed-off-by: Xiangrui Meng <meng@databricks.com>
### What changes were proposed in this pull request?
This patch fixes flaky tests "master/worker web ui available" & "master/worker web ui available with reverseProxy" in MasterSuite.
Tracking back from the stack trace below,
```
19/12/19 13:48:39.160 dispatcher-event-loop-4 INFO Worker: WorkerWebUI is available at http://localhost:8080/proxy/worker-20191219134839-localhost-36054
19/12/19 13:48:39.296 WorkerUI-52072 WARN JettyUtils: GET /json/ failed: java.lang.NullPointerException
java.lang.NullPointerException
at org.apache.spark.deploy.worker.ui.WorkerPage.renderJson(WorkerPage.scala:39)
at org.apache.spark.ui.WebUI.$anonfun$attachPage$2(WebUI.scala:91)
at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
```
there's a possible race condition in `Dispatcher.registerRpcEndpoint()`:
481fb63f97/core/src/main/scala/org/apache/spark/rpc/netty/Dispatcher.scala (L64-L77)
`getMessageLoop()` initializes a new Inbox for this endpoint, for both DedicatedMessageLoop and SharedMessageLoop, which calls `onStart()` "asynchronously" and "eventually" by posting an `OnStart` message. `onStart()` initializes the UI page instance(s), so the execution of `endpointRefs.put()` and the initialization of the UI page instance(s) are concurrent.
MasterPage and WorkerPage retrieve the endpoint ref and store it as a `val`, assuming the endpoint ref is valid when they are initialized; so in the bad case they can store `null` as the endpoint ref, and it never changes.
481fb63f97/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala (L33-L38)
481fb63f97/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerPage.scala (L35-L41)
This patch breaks the step down into "find the right message loop" and "register the endpoint to the message loop", and ensures the endpoint ref is set before registering the endpoint to the message loop.
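A self-contained sketch of the fixed ordering (the types and executor here are simplified stand-ins, not the actual `Dispatcher` internals):
```scala
import java.util.concurrent.{ConcurrentHashMap, Executors}

final class Endpoint(val name: String)
final class EndpointRef(val name: String)

val endpointRefs = new ConcurrentHashMap[Endpoint, EndpointRef]()
val messageLoop = Executors.newSingleThreadExecutor()   // stand-in for the message loop

def register(endpoint: Endpoint): EndpointRef = {
  val ref = new EndpointRef(endpoint.name)
  endpointRefs.put(endpoint, ref)                        // publish the ref first ...
  messageLoop.submit(new Runnable {                      // ... then register, which may post OnStart
    def run(): Unit = assert(endpointRefs.get(endpoint) != null)  // onStart() now always sees the ref
  })
  ref
}
```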
### Why are the changes needed?
We observed the test failures from Jenkins; below are the links:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/115583/testReport/
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/115700/testReport/
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UTs.
You can also reproduce the bug consistently via adding `Thread.sleep(1000)` just before `endpointRefs.put(endpoint, endpointRef)` in `Dispatcher.registerRpcEndpoint(...)`.
Closes#27010 from HeartSaVioR/SPARK-30313.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
Adding a `LogicalWriteInfo` interface as suggested by cloud-fan in https://github.com/apache/spark/pull/25990#issuecomment-555132991
### Why are the changes needed?
It provides compile-time guarantees where we previously had none, which will make it harder to introduce bugs in the future.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Compiles and passes tests
Closes#26678 from edrevo/add-logical-write-info.
Lead-authored-by: Ximo Guanter <joaquin.guantergonzalbez@telefonica.com>
Co-authored-by: Ximo Guanter
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Currently, Catalyst rewrites a non-correlated EXISTS subquery to a BroadcastNestedLoopJoin, whose performance is not good. Now we rewrite the non-correlated EXISTS subquery to a ScalarSubquery to improve performance.
We rewrite
```
WHERE EXISTS (SELECT A FROM TABLE B WHERE COL1 > 10)
```
to
```
WHERE (SELECT 1 FROM (SELECT A FROM TABLE B WHERE COL1 > 10) LIMIT 1) IS NOT NULL
```
to avoid building a join to evaluate the EXISTS expression.
### Why are the changes needed?
Optimize EXISTS performance.
### Does this PR introduce any user-facing change?
NO
### How was this patch tested?
Manually tested.
Closes#26437 from AngersZhuuuu/SPARK-29800.
Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
1, fix `BaggedPoint.convertToBaggedRDD` when `subsamplingRate < 1.0`
2, reorganize `RandomForest.runWithMetadata` along the way
### Why are the changes needed?
In GBT, instance weights are discarded if `subsamplingRate < 1`:
1, `baggedPoint: BaggedPoint[TreePoint]` is used in the tree growth to find best split;
2, `BaggedPoint[TreePoint]` contains two weights:
```scala
class BaggedPoint[Datum](val datum: Datum, val subsampleCounts: Array[Int], val sampleWeight: Double = 1.0)
class TreePoint(val label: Double, val binnedFeatures: Array[Int], val weight: Double)
```
3, only the var `sampleWeight` in `BaggedPoint` is used, the var `weight` in `TreePoint` is never used in finding splits;
4, The method `BaggedPoint.convertToBaggedRDD` was changed in https://github.com/apache/spark/pull/21632; that change was only for decision trees, so only the following code path was changed:
```
if (numSubsamples == 1 && subsamplingRate == 1.0) {
convertToBaggedRDDWithoutSampling(input, extractSampleWeight)
}
```
5, In https://github.com/apache/spark/pull/25926, I made GBT support weights, but only tested it with the default `subsamplingRate == 1`.
GBT with `subsamplingRate < 1` converts treePoints to baggedPoints via
```scala
convertToBaggedRDDSamplingWithoutReplacement(input, subsamplingRate, numSubsamples, seed)
```
in which the original weights from `weightCol` are discarded and every `sampleWeight` is assigned the default 1.0.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
updated testsuites
Closes#27070 from zhengruifeng/gbt_sampling.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
Make FMClassifier/FMRegressor call the superclass method `extractLabeledPoints`.
### Why are the changes needed?
code reuse
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing tests
Closes#27093 from huaxingao/spark-FM.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
SQLConf doc updated.
### Why are the changes needed?
Some doc comments were not written properly; spaces were missing in many places. This patch updates the docs.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Documentation update.
Closes#27091 from iRakson/SQLConfDoc.
Authored-by: root1 <raksonrakesh@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
use `.ml.Summarizer` instead of `.mllib.MultivariateOnlineSummarizer` to avoid computation of unused metrics
### Why are the changes needed?
to avoid computation of unused metrics
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing testsuites
Closes#27059 from zhengruifeng/pac_summarizer.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Check before caching zippedData (as suggested in https://github.com/apache/spark/pull/26483#issuecomment-569702482).
### Why are the changes needed?
If the `data` is already cached before calling `run` method of `KMeans` then `zippedData.persist()` will hurt the performance. Hence, persisting it conditionally.
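A hedged sketch of the conditional caching (simplified; `data` and `zippedData` follow the PR description, the helper itself is illustrative):
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Persist the derived RDD only when the input is not already cached.
def runWithConditionalCaching[T, U](data: RDD[T], zippedData: RDD[U])(body: => Unit): Unit = {
  val handlePersistence = data.getStorageLevel == StorageLevel.NONE
  if (handlePersistence) zippedData.persist(StorageLevel.MEMORY_AND_DISK)
  try body
  finally if (handlePersistence) zippedData.unpersist()
}
```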
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Manually.
Closes#27052 from amanomer/29823followup.
Authored-by: Aman Omer <amanomer1996@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Revert https://github.com/apache/spark/pull/20433 .
### Why are the changes needed?
According to the SQL standard, the INTERVAL prefix is required:
```
<interval literal> ::=
INTERVAL [ <sign> ] <interval string> <interval qualifier>
<interval string> ::=
<quote> <unquoted interval string> <quote>
```
### Does this PR introduce any user-facing change?
yes, but omitting the INTERVAL prefix is a new feature in 3.0
### How was this patch tested?
existing tests
Closes#27080 from cloud-fan/interval.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
The deserializer assumed that Avro arrays are always of type `GenericData$Array`, which is not the case.
Assuming they are `java.util.List`s is safer and fixes a ClassCastException in some Avro code.
### What changes were proposed in this pull request?
`java.util.List` has all the necessary methods and is implemented by `GenericData$Array`.
### Why are the changes needed?
To prevent the following exception in more complex avro objects:
```
java.lang.ClassCastException: java.util.ArrayList cannot be cast to org.apache.avro.generic.GenericData$Array
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$19(AvroDeserializer.scala:170)
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$newWriter$19$adapted(AvroDeserializer.scala:169)
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1(AvroDeserializer.scala:314)
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$1$adapted(AvroDeserializer.scala:310)
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2(AvroDeserializer.scala:332)
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$getRecordWriter$2$adapted(AvroDeserializer.scala:329)
at org.apache.spark.sql.avro.AvroDeserializer.$anonfun$converter$3(AvroDeserializer.scala:56)
at org.apache.spark.sql.avro.AvroDeserializer.deserialize(AvroDeserializer.scala:70)
```
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
The current tests already exercise this behavior. In essence this patch just changes a type cast to a more basic type, so I expect no functional impact.
Closes#26907 from steven-aerts/spark-30267.
Authored-by: Steven Aerts <steven.aerts@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
Make ```MultilayerPerceptronClassificationModel``` extend ```MultilayerPerceptronParams```
### Why are the changes needed?
Make ```MultilayerPerceptronClassificationModel``` extend ```MultilayerPerceptronParams``` to expose the training params, so user can see these params when calling ```extractParamMap```
### Does this PR introduce any user-facing change?
Yes. The ```MultilayerPerceptronParams``` such as ```seed```, ```maxIter``` ... are available in ```MultilayerPerceptronClassificationModel``` now
### How was this patch tested?
Manually tested ```MultilayerPerceptronClassificationModel.extractParamMap()``` to verify all the new params are there.
Closes#26838 from huaxingao/spark-30144.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
expose predictRaw and predictProbability on Python side
### Why are the changes needed?
to keep parity between scala and python
### Does this PR introduce any user-facing change?
Yes. Expose python ```predictRaw``` and ```predictProbability```
### How was this patch tested?
doctest
Closes#27082 from huaxingao/spark-30358.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
Compound operations, as well as increments and decrements, on primitive fields are not atomic. When a volatile primitive field is incremented or decremented, updates can be lost if threads interleave during the read-modify-write.
Refer: https://wiki.sei.cmu.edu/confluence/display/java/VNA02-J.+Ensure+that+compound+operations+on+shared+variables+are+atomic
### What changes were proposed in this pull request?
Using `AtomicLong` instead of `long`
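A minimal illustration of the problem and the fix (a standalone sketch, not the actual Spark field):
```scala
import java.util.concurrent.atomic.AtomicLong

class CounterSketch {
  @volatile private var unsafeCount: Long = 0L   // `+= 1` is a read-modify-write; updates can be lost
  private val safeCount = new AtomicLong(0L)

  def onEvent(): Unit = {
    unsafeCount += 1              // not atomic, even though the field is volatile
    safeCount.incrementAndGet()   // atomic
  }
}
```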
### Why are the changes needed?
When a volatile primitive field is incremented or decremented, updates can be lost if threads interleave during the read-modify-write.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
All existing UTs pass with the change.
Closes#27071 from ajithme/atomic.
Authored-by: Ajith <ajith2489@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
Remove `executorsPendingToRemove.clear()` from `CoarseGrainedSchedulerBackend.reset()`.
### Why are the changes needed?
Clearing `executorsPendingToRemove` before removing the executors causes all tasks running on those "pending to remove" executors to be counted as failures, which is wrong for the case of `executorsPendingToRemove(execId) = true`.
Besides, `executorsPendingToRemove` is cleaned up within `removeExecutor()` at the end anyway, just like `executorsPendingLossReason`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Added a new test in `TaskSetManagerSuite`.
Closes#27017 from Ngone51/dont-clear-eptr-in-reset.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
1. For `ScanOperation`, if it collects more than one filter, then all filters must be deterministic; a filter can be non-deterministic only if it is the single collected filter.
2. `FileSourceStrategy` should filter out non-deterministic filters, since a partition-related non-deterministic filter would otherwise hit an exception because the expression hasn't been initialized.
### Why are the changes needed?
Strictly follow `CombineFilters`'s behavior, which doesn't allow combining two filters when non-deterministic predicates exist, and avoid hitting an exception for the file source.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Test exists.
Closes#27073 from Ngone51/SPARK-29768-FOLLOWUP.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This bug manifested itself when another stream would potentially make a call
to NioBufferedFileInputStream.read() after it had reached EOF in the wrapped
stream. In that case, the refill() code would clear the output buffer the
first time EOF was found, leaving it in a readable state for subsequent
read() calls. If any of those calls were made, bad data would be returned.
By flipping the buffer before returning, even in the EOF case, you get the
correct behavior in subsequent calls. I picked that approach to avoid keeping
more state in this class, although it means calling the underlying stream
even after EOF (which is fine, but perhaps a little more expensive).
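A simplified Scala sketch of the pattern (the real class is Java; the `read` callback here is an illustrative stand-in for the underlying channel):
```scala
import java.nio.ByteBuffer

// Returns false on EOF; flips the buffer before returning in every case, so the
// buffer never stays in write mode with stale readable bytes.
def refill(buf: ByteBuffer, read: ByteBuffer => Int): Boolean = {
  if (!buf.hasRemaining) {
    buf.clear()
    var n = 0
    while (n == 0) {
      n = read(buf)
    }
    buf.flip()          // previously the EOF branch returned before this flip
    if (n < 0) {
      return false      // EOF: buffer is now empty (position == limit == 0)
    }
  }
  true
}
```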
This showed up (at least) when using encryption, because the commons-crypto
StreamInput class does not track EOF internally, leaving it for the wrapped
stream to behave correctly.
Tested with added unit test + slightly modified test case attached to SPARK-18105.
Closes#27084 from vanzin/SPARK-30225.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Currently, we have a v2 adapter for v1 catalog (`V2SessionCatalog`), all the table/namespace commands can be implemented via v2 APIs.
Usually, a command needs to know which catalog it needs to operate, but different commands have different requirements about what to resolve. A few examples:
- `DROP NAMESPACE`: only need to know the name of the namespace.
- `DESC NAMESPACE`: need to lookup the namespace and get metadata, but is done during execution
- `DROP TABLE`: need to do lookup and make sure it's a table not (temp) view.
- `DESC TABLE`: need to lookup the table and get metadata.
For namespaces, the analyzer only needs to find the catalog and the namespace name. The command can do lookup during execution if needed.
For tables, mostly commands need the analyzer to do lookup.
Note that, table and namespace have a difference: `DESC NAMESPACE testcat` works and describes the root namespace under `testcat`, while `DESC TABLE testcat` fails if there is no table `testcat` under the current catalog. It's because namespaces can be named [], but tables can't. The commands should explicitly specify it needs to operate on namespace or table.
In this Pull Request, we introduce a new framework to resolve v2 commands:
1. parser creates logical plans or commands with `UnresolvedNamespace`/`UnresolvedTable`/`UnresolvedView`/`UnresolvedRelation`. (CREATE TABLE still keeps Seq[String], as it doesn't need to look up relations)
2. analyzer converts
2.1 `UnresolvedNamespace` to `ResolvedNamespace` (contains the catalog and namespace identifier)
2.2 `UnresolvedTable` to `ResolvedTable` (contains catalog, identifier and `Table`)
2.3 `UnresolvedView` to `ResolvedView` (will be added later when we migrate view commands)
2.4 `UnresolvedRelation` to relation.
3. an extra analyzer rule to match commands with `V1Table` and converts them to corresponding v1 commands. This will be added later when we migrate existing commands
4. planner matches commands and converts them to the corresponding physical nodes.
We also introduce brand new v2 commands - the `comment` syntaxes to illustrate how to work with the newly added framework.
```sql
COMMENT ON (DATABASE|SCHEMA|NAMESPACE) ... IS ...
COMMENT ON TABLE ... IS ...
```
Details about the `comment` syntaxes:
In the new design of catalog v2, some properties become reserved, e.g. `location`, `comment`. We are going to disallow setting reserved properties via dbproperties or tblproperties directly, to avoid conflicts with their related sub-clauses or specific commands.
These are the best practices from PostgreSQL and Presto:
https://www.postgresql.org/docs/12/sql-comment.html
https://prestosql.io/docs/current/sql/comment.html
Mostly, the basic ideas of the new framework came from the discussion below with cloud-fan: https://github.com/apache/spark/pull/26847#issuecomment-564510061
### Why are the changes needed?
To make it easier to add new v2 commands, and easier to unify the table relation behavior.
### Does this PR introduce any user-facing change?
yes, add new syntax
### How was this patch tested?
add uts.
Closes#26847 from yaooqinn/SPARK-30214.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Improve the column name and tooltips for the Fair Scheduler Pool table.
### Why are the changes needed?
The 'SchedulingMode' column name should be corrected to 'Scheduling Mode', and tooltips need to be added for Minimum Share, Pool Weight and Scheduling Mode (meaningful tooltips are required for end users to understand these columns).
### Does this PR introduce any user-facing change?
YES
![Screenshot 2020-01-03 at 10 10 47 AM](https://user-images.githubusercontent.com/8948111/71707687-7aee9800-2e11-11ea-93cc-52df0b9114dd.png)
### How was this patch tested?
Manual Testing.
Closes#27047 from 07ARB/SPARK-30384.
Lead-authored-by: 07ARB <ankitrajboudh@gmail.com>
Co-authored-by: Ankitraj <8948111+07ARB@users.noreply.github.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
### What changes were proposed in this pull request?
In the PR, I propose to add the `SuppressWarnings("deprecation")` annotation to Java tests for deprecated Spark SQL APIs.
### Why are the changes needed?
This eliminates the following warnings:
```
sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
Warning:Warning:line (32)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (91)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (100)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (109)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (118)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
sql/core/src/test/java/test/org/apache/spark/sql/Java8DatasetAggregatorSuite.java
Warning:Warning:line (28)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (37)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (46)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (55)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
Warning:Warning:line (64)java: org.apache.spark.sql.expressions.javalang.typed in org.apache.spark.sql.expressions.javalang has been deprecated
sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java
Warning:Warning:line (478)java: json(org.apache.spark.api.java.JavaRDD<java.lang.String>) in org.apache.spark.sql.DataFrameReader has been deprecated
```
and highlights warnings about real problems.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By existing test suites `Java8DatasetAggregatorSuite.java`, `JavaDataFrameSuite.java` and `JavaDatasetAggregatorSuite.java`.
Closes#27081 from MaxGekk/eliminate-warnings-part2.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
In the PR, I propose to throw `AnalysisException` when a removed SQL config is set to non-default value. The following SQL configs removed by #26559 are marked as removed:
1. `spark.sql.fromJsonForceNullableSchema`
2. `spark.sql.legacy.compareDateTimestampInTimestamp`
3. `spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation`
### Why are the changes needed?
To improve user experience with Spark SQL by notifying of removed SQL configs used by users.
### Does this PR introduce any user-facing change?
Yes. Before, the `set` command was silently ignored:
```sql
spark-sql> set spark.sql.fromJsonForceNullableSchema=false;
spark.sql.fromJsonForceNullableSchema false
```
After, an exception is raised:
```sql
spark-sql> set spark.sql.fromJsonForceNullableSchema=false;
Error in query: The SQL config 'spark.sql.fromJsonForceNullableSchema' was removed in the version 3.0.0. It was removed to prevent errors like SPARK-23173 for non-default value.;
```
### How was this patch tested?
Added new tests into `SQLConfSuite` for both cases when removed SQL configs are set to default and non-default values.
Closes#27057 from MaxGekk/remove-sql-configs-followup.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
There is a deadlock between `LiveListenerBus#stop` and `AsyncEventQueue#removeListenerOnError`.
We can reproduce as follows:
1. Post some events to `LiveListenerBus`
2. Call `LiveListenerBus#stop` and hold the synchronized lock of `bus`(5e92301723/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala (L229)), waiting until all the events are processed by listeners, then remove all the queues
3. Event queue would drain out events by posting to its listeners. If a listener is interrupted, it will call `AsyncEventQueue#removeListenerOnError`, inside it will call `bus.removeListener`(7b1b60c758/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala (L207)), trying to acquire synchronized lock of bus, resulting in deadlock
This PR removes the `synchronized` from `LiveListenerBus.stop` because underlying data structures themselves are thread-safe.
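A self-contained sketch of the deadlock shape (simplified stand-ins, not the actual listener-bus classes):
```scala
import java.util.concurrent.CountDownLatch

class BusSketch {
  private val queueDrained = new CountDownLatch(1)

  def stop(): Unit = synchronized {
    queueDrained.await()          // waits for the queue thread while holding the bus lock
  }

  def removeListenerOnError(): Unit = synchronized {
    // needs the same bus lock, so it blocks while stop() is waiting
  }

  def queueThreadBody(): Unit = {
    removeListenerOnError()       // blocks forever once stop() holds the lock ...
    queueDrained.countDown()      // ... so stop() never stops waiting: deadlock
  }
}
```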
### Why are the changes needed?
To fix deadlock.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
New UT.
Closes#26924 from wangshuo128/event-queue-race-condition.
Authored-by: Wang Shuo <wangshuo128@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
### What changes were proposed in this pull request?
The shutdown hook of YarnClientSchedulerBackend prints just "Stopped", which can be improved to "YarnClientSchedulerBackend Stopped" for better understanding.
### Why are the changes needed?
When stopping or gracefully exiting spark-shell/spark-sql with --master yarn, printing only `stopped` is not informative.
### Does this PR introduce any user-facing change?
Yes. Log info message change.
### How was this patch tested?
Manually
Closes#27049 from jobitmathew/imp_stop_message.
Authored-by: Jobit Mathew <jobit.mathew@huawei.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
1. For the interval arithmetic functions, e.g. `add`/`subtract`/`negative`/`multiply`/`divide`, enable overflow check when `ANSI` is on.
2. For `multiply`/`divide`, throw an exception when an overflow happens regardless of whether `ANSI` is on or off.
3. `add`/`subtract`/`negative` stay the same for backward compatibility.
4. `divide` by 0 throws an ArithmeticException whether `ANSI` is on or not, the same as numeric types.
5. These behaviors fit the numeric type operations fully when ANSI is on.
6. These behaviors fit the numeric type operations fully when ANSI is off, except for items 2 and 4. A minimal sketch of the overflow check follows.
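The sketch below uses `java.lang.Math.addExact` and is illustrative only, not the exact `IntervalUtils` implementation:
```scala
// Months field of an interval, as an example: ANSI mode fails fast on overflow,
// legacy mode keeps the silently-wrapping behavior.
def addMonths(a: Int, b: Int, ansiEnabled: Boolean): Int =
  if (ansiEnabled) Math.addExact(a, b)   // throws ArithmeticException on overflow
  else a + b
```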
### Why are the changes needed?
1. bug fix
2. `ANSI` support
### Does this PR introduce any user-facing change?
When `ANSI` is on, interval `add`/`subtract`/`negative`/`multiply`/`divide` will throw an exception if any field overflows.
### How was this patch tested?
add unit tests
Closes#26995 from yaooqinn/SPARK-30341.
Authored-by: Kent Yao <yaooqinn@hotmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Revert https://github.com/apache/spark/pull/26338 , as the syntax is actually the [hive style ALTER COLUMN](https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-ChangeColumnName/Type/Position/Comment).
This PR brings it back and makes it support multi-catalog:
1. renaming is not allowed as `AlterTableAlterColumnStatement` can't do renaming.
2. column name should be multi-part
### Why are the changes needed?
To not break Hive compatibility.
### Does this PR introduce any user-facing change?
no, as the removal was merged in 3.0.
### How was this patch tested?
new parser tests
Closes#27076 from cloud-fan/alter.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Update the CREATE VIEW command to store the current catalog and namespace, instead of the current database, in the view metadata. Also update the analyzer to leverage the catalog and namespace in the view metadata to resolve relations inside views.
Note that, this PR still keeps the way we resolve views, by recursively calling Analyzer. This is necessary because view text may contain CTE, window spec, etc. which needs rules outside of the main resolution batch (e.g. `CTESubstitution`)
### Why are the changes needed?
To resolve relations inside view correctly.
### Does this PR introduce any user-facing change?
Yes, fix a bug. Now tables referred by a view can be resolved correctly even if the current catalog/namespace has been updated.
### How was this patch tested?
a new test
Closes#26923 from cloud-fan/view.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This PR aims to fix the NoSuchElementException thrown when AQE is enabled with an InSubquery expression.
### Why are the changes needed?
Fix exception
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
added new ut
Closes#27068 from JkSelf/fixSubqueryIssue.
Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
This patch is based on #23921 but revised to be simpler, as well as adds UT to test the behavior.
(This patch contains the commit from #23921 to retain credit.)
Spark loads new JARs for `ADD JAR` and `CREATE FUNCTION ... USING JAR` into jar classloader in shared state, and changes current thread's context classloader to jar classloader as many parts of remaining codes rely on current thread's context classloader.
This works as long as further queries run in the same thread and the thread's context classloader is unchanged, but once the context classloader of the current thread is switched back for whatever reason, Spark fails to create an instance of the class for the function.
This bug mostly affects spark-shell, as spark-shell rolls back the current thread's context classloader at every prompt. It may also affect the job-server case, where queries may run in multiple threads.
This patch fixes the issue by switching the context classloader to the classloader which loads the class. The FunctionBuilder created by `makeFunctionBuilder` should have the Class as part of its closure, hence the Class itself can be provided regardless of the current thread's context classloader.
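A hedged sketch of the idea (a generic helper, not the exact Spark code; `udfClass` in the usage comment is illustrative): instantiate the function with the classloader that loaded its class, then restore the previous context classloader.
```scala
def withClassLoaderOf[T](clazz: Class[_])(body: => T): T = {
  val previous = Thread.currentThread().getContextClassLoader
  Thread.currentThread().setContextClassLoader(clazz.getClassLoader)
  try body
  finally Thread.currentThread().setContextClassLoader(previous)
}

// e.g. withClassLoaderOf(udfClass) { udfClass.getConstructor().newInstance() }
```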
### Why are the changes needed?
Without this patch, end users cannot execute Hive UDF using JAR twice in spark-shell.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
New UT.
Closes#27025 from HeartSaVioR/SPARK-26560-revised.
Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Co-authored-by: nivo091 <nivedeeta.singh@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
### What changes were proposed in this pull request?
Change config name from `spark.sql.legacy.typeCoercion.datetimeToString` to `spark.sql.legacy.typeCoercion.datetimeToString.enabled`.
### Why are the changes needed?
To follow the other boolean conf naming convention.
### Does this PR introduce any user-facing change?
No, it's newly added in Spark 3.0.
### How was this patch tested?
Pass Jenkins
Closes#27065 from Ngone51/SPARK-27638-FOLLOWUP.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Change config name from `spark.sql.optimizer.reassignLambdaVariableID` to `spark.sql.optimizer.reassignLambdaVariableID.enabled`.
### Why are the changes needed?
To follow the other boolean conf naming convention.
### Does this PR introduce any user-facing change?
No, it's newly added in Spark 3.0.
### How was this patch tested?
Pass Jenkins.
Closes#27063 from Ngone51/SPARK-27871-FOLLOWUP.
Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Calls of `requireNonStaticConf()` are removed from the `set()` methods in RuntimeConfig because those methods invoke `def set(key: String, value: String): Unit` where `requireNonStaticConf()` is called as well.
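An illustrative sketch of why the extra calls were redundant (not the exact `RuntimeConfig` source): the typed overloads all delegate to the String overload, which already performs the check.
```scala
class RuntimeConfigSketch(staticKeys: Set[String]) {
  def set(key: String, value: Boolean): Unit = set(key, value.toString)
  def set(key: String, value: Long): Unit = set(key, value.toString)
  def set(key: String, value: String): Unit = {
    requireNonStaticConf(key)   // the single place where the check is needed
    // ... store the config value ...
  }
  private def requireNonStaticConf(key: String): Unit =
    require(!staticKeys.contains(key), s"Cannot modify the value of a static config: $key")
}
```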
### Why are the changes needed?
To avoid unnecessary calls of `requireNonStaticConf()`.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
By existing tests from `SQLConfSuite`
Closes#27062 from MaxGekk/call-requireNonStaticConf-once.
Authored-by: Maxim Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Documentation added for refresh resources command in spark-sql.
### Why are the changes needed?
Previously, only the REFRESH TABLE command was documented.
### Does this PR introduce any user-facing change?
Yes. Now users can access documentation for refresh resources command.
### How was this patch tested?
Manually.
Closes#27023 from iRakson/SPARK-30363.
Authored-by: root1 <raksonrakesh@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
There are too many classes placed in the single package "org.apache.spark.sql.kafka010"; these classes can be grouped by purpose.
As a part of change in SPARK-21869 (#26845), we moved out producer related classes to "org.apache.spark.sql.kafka010.producer" and only expose necessary classes/methods to the outside of package. This patch applies the same to consumer related classes.
### Why are the changes needed?
Described above.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UTs.
Closes#26991 from HeartSaVioR/SPARK-30336.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
### What changes were proposed in this pull request?
1, add new foreach-like methods: foreach/foreachNonZero
2, add iterator: iterator/activeIterator/nonZeroIterator
### Why are the changes needed?
see the [ticket](https://issues.apache.org/jira/browse/SPARK-30329) for details
foreach/foreachNonZero: for both convenience and performance (`SparseVector.foreach` should be faster than the current traversal method)
iterator/activeIterator/nonZeroIterator: add the three iterators, so that we can later add/change some implementations based on those iterators on both the ml and mllib sides, to avoid vector conversions.
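A hedged usage sketch (assuming the new methods are visible to user code; some of them may be package-private in the actual release):
```scala
import org.apache.spark.ml.linalg.Vectors

val v = Vectors.sparse(5, Array(1, 3), Array(2.0, 0.0))
// Visits index 1 only: index 3 is an explicitly stored zero, so it is active but not non-zero.
v.foreachNonZero((i, value) => println(s"index $i -> $value"))
```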
### Does this PR introduce any user-facing change?
Yes, new methods are added
### How was this patch tested?
added testsuites
Closes#26982 from zhengruifeng/vector_iter.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
Add `instr.logSumOfWeights` in the algorithms that support `weightCol`.
### Why are the changes needed?
Many algorithms support weightCol now. I think the weight sum is useful info to add to the log.
### Does this PR introduce any user-facing change?
no
### How was this patch tested?
manually tested
Closes#26972 from huaxingao/spark-30321.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
Refactor `RandomForest.findSplits` by applying `aggregateByKey` instead of `groupByKey`
### Why are the changes needed?
The current impl of `RandomForest.findSplits` uses `groupByKey` to collect the non-zero values for each feature, which is quite dangerous memory-wise.
After looking into the subsequent split-finding logic, I found that collecting all non-zero values is not necessary; we only need the weight sums of the distinct values.
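A generic sketch of the idea (not the actual `findSplits` code): aggregate per-value weight sums per feature instead of materializing every non-zero value with `groupByKey`.
```scala
import scala.collection.mutable
import org.apache.spark.rdd.RDD

// Input: (featureIndex, (featureValue, instanceWeight)); output: per-feature weight sums of distinct values.
def weightSumsOfDistinctValues(input: RDD[(Int, (Double, Double))]): RDD[(Int, Map[Double, Double])] = {
  input.aggregateByKey(mutable.Map.empty[Double, Double])(
    (acc, vw) => { acc(vw._1) = acc.getOrElse(vw._1, 0.0) + vw._2; acc },
    (m1, m2) => { m2.foreach { case (v, w) => m1(v) = m1.getOrElse(v, 0.0) + w }; m1 }
  ).mapValues(_.toMap)
}
```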
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
existing testsuites
Closes#27040 from zhengruifeng/rf_opt.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
add getter/setter in Python FM
### Why are the changes needed?
to be consistent with other algorithms
### Does this PR introduce any user-facing change?
Yes.
add getter/setter in Python FMRegressor/FMRegressionModel/FMClassifier/FMClassificationModel
### How was this patch tested?
doctest
Closes#27044 from huaxingao/spark-30378.
Authored-by: Huaxin Gao <huaxing@us.ibm.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
1, expose predictRaw and predictProbability
2, add tests
### Why are the changes needed?
Single-instance prediction is useful outside of Spark, especially for online prediction.
Currently `predict` is exposed, but it is not enough.
### Does this PR introduce any user-facing change?
Yes, new methods are exposed
### How was this patch tested?
added testsuites
Closes#27015 from zhengruifeng/expose_raw_prob.
Authored-by: zhengruifeng <ruifengz@foxmail.com>
Signed-off-by: zhengruifeng <ruifengz@foxmail.com>
### What changes were proposed in this pull request?
This patch proposes to convert only the first few elements of collection accumulators in `LiveEntityHelpers.newAccumulatorInfos`.
### Why are the changes needed?
One Spark job on our cluster uses collection accumulator to collect something and has encountered an exception like:
```
java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Arrays.java:3332)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at java.lang.StringBuilder.append(StringBuilder.java:131)
at java.util.AbstractCollection.toString(AbstractCollection.java:462)
at java.util.Collections$UnmodifiableCollection.toString(Collections.java:1035)
at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2$$anonfun$apply$3.apply(LiveEntity.scala:596)
at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2$$anonfun$apply$3.apply(LiveEntity.scala:596)
at scala.Option.map(Option.scala:146)
at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2.apply(LiveEntity.scala:596)
at org.apache.spark.status.LiveEntityHelpers$$anonfun$newAccumulatorInfos$2.apply(LiveEntity.scala:591)
```
`LiveEntityHelpers.newAccumulatorInfos` converts `AccumulableInfo`s to `v1.AccumulableInfo` by calling `toString` on the accumulator's value. For a collection accumulator, the string representation might take much more memory (for example, a collection accumulator of long values) and cause OOM (in this job, the driver memory is 6g).
The results of `newAccumulatorInfos` are used in the API and UI. For such usage, it also does not make sense to show very long strings of complete collection accumulators.
### Does this PR introduce any user-facing change?
Yes. Collection accumulators now show only the first few elements in the API and UI.
### How was this patch tested?
Unit test.
Manual test. Launched a Spark shell, ran:
```scala
val accum = sc.collectionAccumulator[Long]("Collection Accumulator Example")
sc.range(0, 10000, 1, 1).foreach(x => accum.add(x))
accum.value
```
<img width="2533" alt="Screen Shot 2019-12-30 at 2 03 43 PM" src="https://user-images.githubusercontent.com/68855/71602488-6eb2c400-2b0d-11ea-8725-dba36478198f.png">
Closes#27038 from viirya/partial-collect-accu.
Lead-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Co-authored-by: Liang-Chi Hsieh <liangchi@uber.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
Currently, if a function lookup fails, Spark gives it a second chance by casting decimal-type arguments to double type. But for cases where no decimal type exists, it is meaningless to look up again, and it causes extra cost like unnecessary metastore access. We should throw the exception directly in these cases.
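A hedged sketch of the check (the helper name is illustrative, not the actual lookup code): only build the retry argument types when a decimal argument actually exists.
```scala
import org.apache.spark.sql.types.{DataType, DecimalType, DoubleType}

def retryArgTypes(argTypes: Seq[DataType]): Option[Seq[DataType]] =
  if (argTypes.exists(_.isInstanceOf[DecimalType])) {
    Some(argTypes.map { case _: DecimalType => DoubleType; case t => t })
  } else {
    None  // no decimal argument: fail immediately instead of a second metastore lookup
  }
```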
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Covered by existing tests.
Closes#26994 from wzhfy/avoid_udf_fail_twice.
Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This patch fixes a small bug in the example of the streaming query: the type of observed metrics is a Java Map instead of a Scala Map, so it should be converted before calling `foreach`.
### Why are the changes needed?
Described above.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Ran below query via `spark-shell`:
**Streaming**
```scala
import scala.collection.JavaConverters._
import scala.util.Random
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.asScala.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
println(s"alert! error ratio: $ratio")
}
}
}
def onQueryStarted(event: QueryStartedEvent): Unit = {}
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
})
val rates = spark
.readStream
.format("rate")
.option("rowsPerSecond", 10)
.load
val rand = new Random()
val df = rates.map { row => (row.getLong(1), if (row.getLong(1) % 2 == 0) "error" else null) }.toDF
val ds = df.selectExpr("_1 AS id", "_2 AS error")
// Observe row count (rc) and error row count (erc) in the batch Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("console").start()
```
Closes#27046 from HeartSaVioR/SPARK-29348-FOLLOWUP.
Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>