## What changes were proposed in this pull request?
Don't use internal Spark logging in user examples, because users shouldn't / can't use it directly anyway. These examples already use println in some cases. Note that the usage in StreamingExamples is on purpose.
## How was this patch tested?
N/A
Closes#24649 from srowen/ExampleLog.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This PR excludes 'hive-thriftserver' in modules to test for hadoop3.2 for now as well
## How was this patch tested?
Manually tested via `run-tests.py`
Closes#24644 from HyukjinKwon/SPARK-27402.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This pr makes it support collect statistics when CTAS(create a data source table using the result of a query).
## How was this patch tested?
unit tests and manual tests:
```sql
bin/spark-sql --conf spark.sql.statistics.size.autoUpdate.enabled=true -S
spark-sql> CREATE TABLE spark_27694 USING parquet AS SELECT 'a', 'b';
spark-sql> DESC FORMATTED spark_27694;
a string NULL
b string NULL
# Detailed Table Information
Database default
Table spark_27694
Owner root
Created Time Mon May 13 19:45:33 GMT-07:00 2019
Last Access Wed Dec 31 17:00:00 GMT-07:00 1969
Created By Spark 3.0.0-SNAPSHOT
Type MANAGED
Provider parquet
Statistics 561 bytes
Location file:/user/hive/warehouse/spark_27694
Serde Library org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
InputFormat org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
OutputFormat org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
```
Closes#24596 from wangyum/SPARK-27694.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Add a SQL config property for the default v2 catalog.
Existing tests for regressions.
Closes#24594 from rdblue/SPARK-27693-add-default-catalog-config.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This pr removes duplicate declaration of plugin `org.apache.maven.plugins:maven-antrun-plugin`:
```
[WARNING] Some problems were encountered while building the effective model for org.apache.spark:spark-network-yarn_2.12🫙3.0.0-SNAPSHOT
[WARNING] 'build.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-antrun-plugin line 177, column 15
[WARNING]
[WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
[WARNING]
[WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
[WARNING]
```
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/105523/consoleFull
## How was this patch tested?
Existing test
Closes#24641 from wangyum/SPARK-27610.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Both look added as of 2.0 (see SPARK-12541 and SPARK-12706). I referred existing docs and examples in other API docs.
## How was this patch tested?
Manually built the documentation and, by running examples, by running `DESCRIBE FUNCTION EXTENDED`.
Closes#24642 from HyukjinKwon/SPARK-27771.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Spark on k8s supports config for specifying the executor cpu requests
(spark.kubernetes.executor.request.cores) but a similar config is missing
for the driver. Instead, currently `spark.driver.cores` value is used for integer value.
Although `pod spec` can have `cpu` for the fine-grained control like the following, this PR proposes additional configuration `spark.kubernetes.driver.request.cores` for driver request cores.
```
resources:
requests:
memory: "64Mi"
cpu: "250m"
```
## How was this patch tested?
Unit tests
Closes#24630 from arunmahadevan/SPARK-27754.
Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull requesst?
If we set `hive.exec.stagingdir=.test-staging\tmp`,
But the staging directory is still `.hive-staging` on Windows OS.
Reasons for failure:
Test code:
```
val path = new Path("C:\\test\\hivetable")
println("path.toString: " + path.toString)
println("path.toUri.getPath: " + path.toUri.getPath)
```
Output:
```
path.toString: C:/test/hivetable
path.toUri.getPath: /C:/test/hivetable
```
We can see that `path.toUri.getPath` has one more separator than `path.toString`, and the separator is ' / ', not ' \ '
So `stagingPathName.stripPrefix(inputPathName).stripPrefix(File.separator).startsWith(".")` will return false
## How was this patch tested?
1. Existed tests
2. Manual testing on Windows OS
Closes#24446 from 10110346/stagingdir.
Authored-by: liuxian <liu.xian3@zte.com.cn>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
This PR aims to update `zstd-jni` library to `1.4.0-1` which improves the `level 1 compression speed` performance by 6% in most scenarios. The following is the full release note.
- https://github.com/facebook/zstd/releases/tag/v1.4.0
## How was this patch tested?
Pass the Jenkins.
Closes#24632 from dongjoon-hyun/SPARK-27755.
Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Currently, in `ParquetFilters` and `OrcFilters`, if the child predicate of `Or` operator can't be entirely pushed down, the predicates will be thrown away.
In fact, the conjunctive predicates under `Or` operators can be partially pushed down.
For example, says `a` and `b` are convertible, while `c` can't be pushed down, the predicate
`a or (b and c)`
can be converted as
`(a or b) and (a or c)`
We can still push down `(a or b)`.
We can't push down disjunctive predicates only when one of its children is not partially convertible.
This PR also improve the filter pushing down logic in `DataSourceV2Strategy`. With partial filter push down in `Or` operator, the result of `pushedFilters()` might not exist in the mapping `translatedFilterToExpr`. To fix it, this PR changes the mapping `translatedFilterToExpr` as leaf filter expression to `sources.filter`, and later on rebuild the whole expression with the mapping.
## How was this patch tested?
Unit test
Closes#24598 from gengliangwang/pushdownDisjunctivePredicates.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This PR upgrades lz4-java from 1.5.1 to 1.6.0. Lz4-java is available at https://github.com/lz4/lz4-java.
Changes from 1.5.1:
- Upgraded LZ4 to 1.9.1. Updated the JNI bindings, except for the one for Linux/i386. Decompression speed is improved on amd64.
- Deprecated use of LZ4FastDecompressor of a native instance because the corresponding C API function is deprecated. See the release note of LZ4 1.9.0 for details. Updated javadoc accordingly.
- Changed the module name from org.lz4.lz4-java to org.lz4.java to avoid using - in the module name. (severn-everett, Oliver Eikemeier, Rei Odaira)
- Enabled build with Java 11. Note that the distribution is still built with Java 7. (Rei Odaira)
## How was this patch tested?
Existing tests.
Closes#24629 from kiszk/SPARK-27752.
Authored-by: Kazuaki Ishizaki <ishizaki@jp.ibm.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Currently we have an analyzer rule, which resolves the output columns of data source v2 writing plans, to make sure the schema of input query is compatible with the table.
However, not all data sources need this check. For example, the `NoopDataSource` doesn't care about the schema of input query at all.
This PR introduces a new table capability: ACCEPT_ANY_SCHEMA. If a table reports this capability, we skip resolving output columns for it during write.
Note that, we already skip resolving output columns for `NoopDataSource` because it implements `SupportsSaveMode`. However, `SupportsSaveMode` is a hack and will be removed soon.
## How was this patch tested?
new test cases
Closes#24469 from cloud-fan/schema-check.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
Some APIs in Structured Streaming requires the user to specify an interval. Right now these APIs don't accept upper-case strings.
This PR adds a new method `fromCaseInsensitiveString` to `CalendarInterval` to support paring upper-case strings, and fixes all APIs that need to parse an interval string.
## How was this patch tested?
The new unit test.
Closes#24619 from zsxwing/SPARK-27735.
Authored-by: Shixiong Zhu <zsxwing@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
removed the unused "UnsafeKeyValueSorter.java" file
## How was this patch tested?
Ran Compilation and UT locally.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#24622 from shivusondur/jira27722.
Authored-by: shivusondur <shivusondur@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
This is a followup of https://github.com/apache/spark/pull/20365 .
#20365 fixed this problem when the hint node is a root node. This PR fixes this problem for all the cases.
## How was this patch tested?
a new test
Closes#24580 from cloud-fan/bug.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
When we upgraded the built-in Hive to 2.3.4, the current `hive-thriftserver` module is not compatible, such as these Hive changes:
1. [HIVE-12442](https://issues.apache.org/jira/browse/HIVE-12442) HiveServer2: Refactor/repackage HiveServer2's Thrift code so that it can be used in the tasks
2. [HIVE-12237](https://issues.apache.org/jira/browse/HIVE-12237) Use slf4j as logging facade
3. [HIVE-13169](https://issues.apache.org/jira/browse/HIVE-13169) HiveServer2: Support delegation token based connection when using http transport
So this PR moves the incompatible code to `sql/hive-thriftserver/v1.2.1` and copies it to `sql/hive-thriftserver/v2.3.4` for the next code review.
## How was this patch tested?
manual tests:
```
diff -urNa sql/hive-thriftserver/v1.2.1 sql/hive-thriftserver/v2.3.4
```
Closes#24282 from wangyum/SPARK-27354.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
In the existing code, a broadcast execution timeout for the Future only causes a query failure, but the job running with the broadcast and the computation in the Future are not canceled. This wastes resources and slows down the other jobs. This PR tries to cancel both the running job and the running hashed relation construction thread.
## How was this patch tested?
Add new test suite `BroadcastExchangeExec`
Closes#24595 from jiangxb1987/SPARK-20774.
Authored-by: Xingbo Jiang <xingbo.jiang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
Kafka related Spark parameters has to start with `spark.kafka.` and not with `spark.sql.`. Because of this I've renamed `spark.sql.kafkaConsumerCache.capacity`.
Since Kafka consumer caching is not documented I've added this also.
## How was this patch tested?
Existing + added unit test.
```
cd docs
SKIP_API=1 jekyll build
```
and manual webpage check.
Closes#24590 from gaborgsomogyi/SPARK-27687.
Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
This feature allows proxy servers to identify the actual request user
using a request parameter, and performs access control checks against
that user instead of the authenticated user. Impersonation is only
allowed if the authenticated user is configured as an admin.
The request parameter used ("doAs") matches the one currently used by
Knox, but it should be easy to change / customize if different proxy
servers use a different way of identifying the original user.
Tested with updated unit tests and also with a live server behind Knox.
Closes#24582 from vanzin/SPARK-27678.
Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
This replaces use of collection classes like `MutableList` and `ArrayStack` with workalikes that are available in 2.12, as they will be removed in 2.13. It also removes use of `.to[Collection]` as its uses was superfluous anyway. Removing `collection.breakOut` will have to wait until 2.13
## How was this patch tested?
Existing tests
Closes#24586 from srowen/SPARK-27682.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
`RecordBinaryComparator`, `UnsafeExternalRowSorter` and `UnsafeKeyValueSorter` now locates in catalyst, which should be moved to core, as they're used only in physical plan.
## How was this patch tested?
exist tests.
Closes#24607 from xianyinxin/SPARK-27713.
Authored-by: xy_xin <xianyin.xxy@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
`StructuredSessionization` comment contains duplicate 'add', I think it should be changed.
## How was this patch tested?
Exists UT.
Closes#24589 from beliefer/remove-duplicate-add-in-comment.
Lead-authored-by: gengjiaan <gengjiaan@360.cn>
Co-authored-by: Jiaan Geng <beliefer@163.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
This adds a v2 implementation for CTAS queries
* Update the SQL parser to parse CREATE queries using multi-part identifiers
* Update `CheckAnalysis` to validate partitioning references with the CTAS query schema
* Add `CreateTableAsSelect` v2 logical plan and `CreateTableAsSelectExec` v2 physical plan
* Update create conversion from `CreateTableAsSelectStatement` to support the new v2 logical plan
* Update `DataSourceV2Strategy` to convert v2 CTAS logical plan to the new physical plan
* Add `findNestedField` to `StructType` to support reference validation
## How was this patch tested?
We have been running these changes in production for several months. Also:
* Add a test suite `CreateTablePartitioningValidationSuite` for new analysis checks
* Add a test suite for v2 SQL, `DataSourceV2SQLSuite`
* Update catalyst `DDLParserSuite` to use multi-part identifiers (`Seq[String]`)
* Add test cases to `PlanResolutionSuite` for v2 CTAS: known catalog and v2 source implementation
Closes#24570 from rdblue/SPARK-24923-add-v2-ctas.
Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
We should remove materialized view first otherwise(note that Hive 3.1 could reproduce this issue):
```scala
Cause: org.apache.derby.shared.common.error.DerbySQLIntegrityConstraintViolationException: DELETE on table 'TBLS' caused a violation of foreign key constraint 'MV_TABLES_USED_FK2' for key (4). The statement has been rolled back.
at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.generateCsSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.wrapInSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.TransactionResourceImpl.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.ConnectionChild.handleException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedStatement.executeStatement(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedPreparedStatement.executeBatchElement(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedStatement.executeLargeBatch(Unknown Source)
```
## How was this patch tested?
Existing test
Closes#24592 from wangyum/SPARK-27690.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This removes usage of `Traversable`, which is removed in Scala 2.13. This is mostly an internal change, except for the change in the `SparkConf.setAll` method. See additional comments below.
## How was this patch tested?
Existing tests.
Closes#24584 from srowen/SPARK-27680.
Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
The Pull Request to add datatables to stage page SPARK-21809 got merged. The search functionality in those datatables being a great improvement for searching through a large number of tasks, also performs search over the raw data rather than the formatted data displayed in the tables. It would be great if the search can happen for the formatted data as well.
## What changes were proposed in this pull request?
Added code to enable searching over displayed data in tables e.g. searching on "165.7 MiB" or "0.3 ms" will now return the search results. Also, earlier we were missing search for two columns in the task table "Shuffle Read Bytes" as well as "Shuffle Remote Reads", which I have added here.
## How was this patch tested?
Manual Tests
Closes#24419 from pgandhi999/SPARK-25719.
Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Add in GPU and generic resource type allocation to the executors.
Note this is part of a bigger feature for gpu-aware scheduling and is just how the executor find the resources. The general flow :
- users ask for a certain set of resources, for instance number of gpus - each cluster manager has a specific way to do this.
- cluster manager allocates a container or set of resources (standalone mode)
- When spark launches the executor in that container, the executor either has to be told what resources it has or it has to auto discover them.
- Executor has to register with Driver and tell the driver the set of resources it has so the scheduler can use that to schedule tasks that requires a certain amount of each of those resources
In this pr I added configs and arguments to the executor to be able discover resources. The argument to the executor is intended to be used by standalone mode or other cluster managers that don't have isolation so that it can assign specific resources to specific executors in case there are multiple executors on a node. The argument is a file contains JSON Array of ResourceInformation objects.
The discovery script is meant to be used in an isolated environment where the executor only sees the resources it should use.
Note that there will be follow on PRs to add other parts like the scheduler part. See the epic high level jira: https://issues.apache.org/jira/browse/SPARK-24615
## How was this patch tested?
Added unit tests and manually tested.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#24406 from tgravescs/gpu-sched-executor-clean.
Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
## What changes were proposed in this pull request?
The below example works with both Mysql and Hive, however not with spark.
```
mysql> select * from date_test where date_col >= '2000-1-1';
+------------+
| date_col |
+------------+
| 2000-01-01 |
+------------+
```
The reason is that Spark casts both sides to String type during date and string comparison for partial date support. Please find more details in https://issues.apache.org/jira/browse/SPARK-8420.
Based on some tests, the behavior of Date and String comparison in Hive and Mysql:
Hive: Cast to Date, partial date is not supported
Mysql: Cast to Date, certain "partial date" is supported by defining certain date string parse rules. Check out str_to_datetime in https://github.com/mysql/mysql-server/blob/5.5/sql-common/my_time.c
As below date patterns have been supported, the PR is to cast string to date when comparing string and date:
```
`yyyy`
`yyyy-[m]m`
`yyyy-[m]m-[d]d`
`yyyy-[m]m-[d]d `
`yyyy-[m]m-[d]d *`
`yyyy-[m]m-[d]dT*
```
## How was this patch tested?
UT has been added
Closes#24567 from pengbo/SPARK-27638.
Authored-by: mingbo.pb <mingbo.pb@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
When a null in a nested field in struct, casting from the struct throws error, currently.
```scala
scala> sql("select cast(struct(1, null) as struct<a:int,b:int>)").show
scala.MatchError: NullType (of class org.apache.spark.sql.types.NullType$)
at org.apache.spark.sql.catalyst.expressions.Cast.castToInt(Cast.scala:447)
at org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:635)
at org.apache.spark.sql.catalyst.expressions.Cast.$anonfun$castStruct$1(Cast.scala:603)
```
Similarly, inline table, which casts null in nested field under the hood, also throws an error.
```scala
scala> sql("select * FROM VALUES (('a', (10, null))), (('b', (10, 50))), (('c', null)) AS tab(x, y)").show
org.apache.spark.sql.AnalysisException: failed to evaluate expression named_struct('col1', 10, 'col2', NULL): NullType (of class org.apache.spark.sql.t
ypes.NullType$); line 1 pos 14
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
at org.apache.spark.sql.catalyst.analysis.ResolveInlineTables.$anonfun$convert$6(ResolveInlineTables.scala:106)
```
This fixes the issue.
## How was this patch tested?
Added tests.
Closes#24576 from viirya/cast-null.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This pr fix hadoop-3.2 test issues(except the `hive-thriftserver` module):
1. Add `hive.metastore.schema.verification` and `datanucleus.schema.autoCreateAll` to HiveConf.
2. hadoop-3.2 support access the Hive metastore from 0.12 to 2.2
After [SPARK-27176](https://issues.apache.org/jira/browse/SPARK-27176) and this PR, we upgraded the built-in Hive to 2.3 when enabling the Hadoop 3.2+ profile. This upgrade fixes the following issues:
- [HIVE-6727](https://issues.apache.org/jira/browse/HIVE-6727): Table level stats for external tables are set incorrectly.
- [HIVE-15653](https://issues.apache.org/jira/browse/HIVE-15653): Some ALTER TABLE commands drop table stats.
- [SPARK-12014](https://issues.apache.org/jira/browse/SPARK-12014): Spark SQL query containing semicolon is broken in Beeline.
- [SPARK-25193](https://issues.apache.org/jira/browse/SPARK-25193): insert overwrite doesn't throw exception when drop old data fails.
- [SPARK-25919](https://issues.apache.org/jira/browse/SPARK-25919): Date value corrupts when tables are "ParquetHiveSerDe" formatted and target table is Partitioned.
- [SPARK-26332](https://issues.apache.org/jira/browse/SPARK-26332): Spark sql write orc table on viewFS throws exception.
- [SPARK-26437](https://issues.apache.org/jira/browse/SPARK-26437): Decimal data becomes bigint to query, unable to query.
## How was this patch tested?
This pr test Spark’s Hadoop 3.2 profile on jenkins and #24591 test Spark’s Hadoop 2.7 profile on jenkins
This PR close#24591Closes#24391 from wangyum/SPARK-27402.
Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
## What changes were proposed in this pull request?
This PR goes to add `max_by()` and `min_by()` SQL aggregate functions.
Quoting from the [Presto docs](https://prestodb.github.io/docs/current/functions/aggregate.html#max_by)
> max_by(x, y) → [same as x]
> Returns the value of x associated with the maximum value of y over all input values.
`min_by()` works similarly.
## How was this patch tested?
Added tests.
Closes#24557 from viirya/SPARK-27653.
Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
Currently,thread number of broadcast-exchange thread pool is fixed and keepAliveSeconds is also fixed as 60s.
```
object BroadcastExchangeExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}
/**
* Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
* are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
*/
def newDaemonCachedThreadPool(
prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
val threadFactory = namedThreadFactory(prefix)
val threadPool = new ThreadPoolExecutor(
maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
keepAliveSeconds,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable],
threadFactory)
threadPool.allowCoreThreadTimeOut(true)
threadPool
}
```
But some times, if the Thead object do not GC quickly it may caused server(driver) OOM. In such case,we need to make this thread pool configurable.
A case has described in https://issues.apache.org/jira/browse/SPARK-26601
## How was this patch tested?
UT
Closes#23670 from caneGuy/zhoukang/make-broadcat-config.
Authored-by: zhoukang <zhoukang199191@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Remove the dead code of Spark Repl in Scala 2.11. We only support Scala 2.12
## How was this patch tested?
N/A
Closes#24588 from gatorsmile/removeDeadCode.
Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
In File source V1, the statistics of `HadoopFsRelation` is `compressionFactor * sizeInBytesOfAllFiles`.
To follow it, we can implement the interface SupportsReportStatistics in FileScan and report the same statistics.
## How was this patch tested?
Unit test
Closes#24571 from gengliangwang/stats.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
[SPARK-27343](https://issues.apache.org/jira/projects/SPARK/issues/SPARK-27343)
Based on the previous PR: https://github.com/apache/spark/pull/24270
Extracting parameters , building the objects of ConfigEntry.
For example:
for the parameter "spark.kafka.producer.cache.timeout",we build
```
private[kafka010] val PRODUCER_CACHE_TIMEOUT =
ConfigBuilder("spark.kafka.producer.cache.timeout")
.doc("The expire time to remove the unused producers.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("10m")
```
Closes#24574 from hehuiyuan/hehuiyuan-patch-9.
Authored-by: hehuiyuan <hehuiyuan@ZBMAC-C02WD3K5H.local>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
To move DS v2 API to the catalyst module, we can't refer to an internal class (`MutableColumnarRow`) in `ColumnarBatch`.
This PR creates a read-only version of `MutableColumnarRow`, and use it in `ColumnarBatch`.
close https://github.com/apache/spark/pull/24546
## How was this patch tested?
existing tests
Closes#24581 from cloud-fan/mutable-row.
Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
added the missing config for structured streaming when using file source.
from the code we have
```
/**
* Maximum age of a file that can be found in this directory, before it is ignored. For the
* first batch all files will be considered valid. If `latestFirst` is set to `true` and
* `maxFilesPerTrigger` is set, then this parameter will be ignored, because old files that are
* valid, and should be processed, may be ignored. Please refer to SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest file, and not the
* timestamp of the current system. That this means if the last file has timestamp 1000, and the
* current system time is 2000, and max age is 200, the system will purge files older than
* 800 (rather than 1800) from the internal state.
*
* Default to a week.
*/
val maxFileAgeMs: Long =
Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "7d"))
```
which is not documented.
also the file processing order was not mentioned but in the code we specifically select the file list based on file mtime:
```scala
private val fileSortOrder = if (sourceOptions.latestFirst) {
logWarning(
"""'latestFirst' is true. New files will be processed first, which may affect the watermark
|value. In addition, 'maxFileAge' will be ignored.""".stripMargin)
implicitly[Ordering[Long]].reverse
} else {
implicitly[Ordering[Long]]
}
val files = allFiles.sortBy(_.getModificationTime)(fileSortOrder).map { status =>
(status.getPath.toUri.toString, status.getModificationTime)
}
```
---------
![Screen Shot 2019-05-07 at 5 55 01 PM](https://user-images.githubusercontent.com/1124115/57335683-5a8b0400-70f1-11e9-98c8-99f173872842.png)
---------
![Screen Shot 2019-05-07 at 5 54 55 PM](https://user-images.githubusercontent.com/1124115/57335684-5a8b0400-70f1-11e9-996a-4bb1639e3d6b.png)
Closes#24548 from linehrr/master.
Lead-authored-by: ryne.yang <ryne.yang@acuityads.com>
Co-authored-by: Ryne Yang <ryne.yang@acuityads.com>
Co-authored-by: linehrr <linehrr@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
This patch fixes a bug where `--supervised` Spark jobs would retry multiple times whenever an agent would crash, come back, and re-register even when those jobs had already relaunched on a different agent.
That is:
```
- supervised driver is running on agent1
- agent1 crashes
- driver is relaunched on another agent as `<task-id>-retry-1`
- agent1 comes back online and re-registers with scheduler
- spark relaunches the same job as `<task-id>-retry-2`
- now there are two jobs running simultaneously
```
This is because when an agent would come back and re-register it would send a status update `TASK_FAILED` for its old driver-task. Previous logic would indiscriminately remove the `submissionId` from Zookeeper's `launchedDrivers` node and add it to `retryList` node. Then, when a new offer came in, it would relaunch another `-retry-` task even though one was previously running.
For example logs, scroll to bottom
## How was this patch tested?
- Added a unit test to simulate behavior described above
- Tested manually on a DC/OS cluster by
```
- launching a --supervised spark job
- dcos node ssh <to the agent with the running spark-driver>
- systemctl stop dcos-mesos-slave
- docker kill <driver-container-id>
- [ wait until spark job is relaunched ]
- systemctl start dcos-mesos-slave
- [ observe spark driver is not relaunched as `-retry-2` ]
```
Log snippets included below. Notice the `-retry-1` task is running when status update for the old task comes in afterward:
```
19/01/15 19:21:38 TRACE MesosClusterScheduler: Received offers from Mesos:
... [offers] ...
19/01/15 19:21:39 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2532 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001"
...
19/01/15 19:21:42 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_STARTING message=''
19/01/15 19:21:43 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_RUNNING message=''
...
19/01/15 19:29:12 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_LOST message='health check timed out' reason=REASON_SLAVE_REMOVED
...
19/01/15 19:31:12 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2681 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-1"
...
19/01/15 19:31:15 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_STARTING message=''
19/01/15 19:31:16 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-1 state=TASK_RUNNING message=''
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Unreachable agent re-reregistered'
...
19/01/15 19:33:45 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001 state=TASK_FAILED message='Abnormal executor termination: unknown container' reason=REASON_EXECUTOR_TERMINATED
19/01/15 19:33:45 ERROR MesosClusterScheduler: Unable to find driver with driver-20190115192138-0001 in status update
...
19/01/15 19:33:47 TRACE MesosClusterScheduler: Using offer 5d421001-0630-4214-9ecb-d5838a2ec149-O2729 to launch driver driver-20190115192138-0001 with taskId: value: "driver-20190115192138-0001-retry-2"
...
19/01/15 19:33:50 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_STARTING message=''
19/01/15 19:33:51 INFO MesosClusterScheduler: Received status update: taskId=driver-20190115192138-0001-retry-2 state=TASK_RUNNING message=''
```
Closes#24276 from samvantran/SPARK-27347-duplicate-retries.
Authored-by: Sam Tran <stran@mesosphere.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
For the below three thread configuration items applied to both driver and executor,
spark.rpc.io.serverThreads
spark.rpc.io.clientThreads
spark.rpc.netty.dispatcher.numThreads,
we separate them to driver specifics and executor specifics.
spark.driver.rpc.io.serverThreads < - > spark.executor.rpc.io.serverThreads
spark.driver.rpc.io.clientThreads < - > spark.executor.rpc.io.clientThreads
spark.driver.rpc.netty.dispatcher.numThreads < - > spark.executor.rpc.netty.dispatcher.numThreads
Spark reads these specifics first and fall back to the common configurations.
## How was this patch tested?
We ran the SimpleMap app without shuffle for benchmark purpose to test Spark's scalability in HPC with omini-path NIC which has higher bandwidth than normal ethernet NIC.
Spark's base version is 2.4.0.
Spark ran in the Standalone mode. Driver was in a standalone node.
After the separation, the performance is improved a lot in 256 nodes and 512 nodes. see below test results of SimpleMapTask before and after the enhancement. You can view the tables in the [JIRA](https://issues.apache.org/jira/browse/SPARK-26632) too.
ds: spark.driver.rpc.io.serverThreads
dc: spark.driver.rpc.io.clientThreads
dd: spark.driver.rpc.netty.dispatcher.numThreads
ed: spark.executor.rpc.netty.dispatcher.numThreads
time: Overall Time (s)
old time: Overall Time without Separation (s)
**Before:**
nodes | ds | dc | dd | ed | time
-- |-- | -- | -- | -- | --
128 nodes | 8 | 8 | 8 | 8 | 108
256 nodes | 8 | 8 | 8 | 8 | 196
512 nodes | 8 | 8 | 8 | 8 | 377
**After:**
nodes | ds | dc | dd | ed | time | improvement
-- | -- | -- | -- | -- | -- | --
128 nodes | 15 | 15 | 10 | 30 | 107 | 0.9%
256 nodes | 12 | 15 | 10 | 30 | 159 | 18.8%
512 nodes | 12 | 15 | 10 | 30 | 283 | 24.9%
Closes#23560 from zjf2012/thread_conf_separation.
Authored-by: jiafu.zhang@intel.com <jiafu.zhang@intel.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
## What changes were proposed in this pull request?
We should add since info to all expressions.
SPARK-7886 Rand / Randn
af3746ce0d RLike, Like (I manually checked that it exists from 1.0.0)
SPARK-8262 Split
SPARK-8256 RegExpReplace
SPARK-8255 RegExpExtract
9aadcffabd Coalesce / IsNull / IsNotNull (I manually checked that it exists from 1.0.0)
SPARK-14541 IfNull / NullIf / Nvl / Nvl2
SPARK-9080 IsNaN
SPARK-9168 NaNvl
## How was this patch tested?
N/A
Closes#24579 from HyukjinKwon/SPARK-27673.
Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
## What changes were proposed in this pull request?
- the accumulator warning is too verbose
- when a test fails with schema mismatch, you never see the error message / exception
Closes#24549 from ericl/test-nits.
Lead-authored-by: Eric Liang <ekl@databricks.com>
Co-authored-by: Eric Liang <ekhliang@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
If a type is annotated, `ScalaReflection` can fail if the datatype is an `Option`, a `Seq`, a `Map` and other similar types. This is because it assumes we are dealing with `TypeRef`, while types with annotations are `AnnotatedType`.
The PR deals with the case the annotation is present.
## How was this patch tested?
added UT
Closes#24564 from mgaido91/SPARK-27625.
Authored-by: Marco Gaido <marcogaido91@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
We updated our website a long time ago to describe Spark as the unified analytics engine, which is also how Spark is described in the community now. But our README and docs page still use the same description from 2011 ... This patch updates them.
The patch also updates the README example to use more modern APIs, and refer to Structured Streaming rather than Spark Streaming.
Closes#24573 from rxin/consistent-message.
Authored-by: Reynold Xin <rxin@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
Test steps to reproduce:
1) bin/spark-shell
```
val dataset = spark.createDataFrame(Seq(
(0L, 1L, 1.0),
(1L,2L,1.0),
(3L, 4L,1.0),
(4L,0L,0.1))).toDF("src", "dst", "weight")
val model = new PowerIterationClustering().
setMaxIter(10).
setInitMode("degree").
setWeightCol("weight")
val prediction = model.assignClusters(dataset).select("id", "cluster")
```
2) Open storage tab of the UI. We can see many RDD block cached, even after running the PIC.
In this PR, basically materializes the new graph before unpersisting the old ones.
## How was this patch tested?
Manually tested and existing UTs.
Before patch:
![Screenshot from 2019-05-06 02-53-45](https://user-images.githubusercontent.com/23054875/57201033-daf61b80-6fb0-11e9-97ff-7534909ce2d3.png)
After patch:
![Screenshot from 2019-05-06 03-41-04](https://user-images.githubusercontent.com/23054875/57201043-07aa3300-6fb1-11e9-855b-f63ee18ea371.png)
Closes#24531 from shahidki31/SPARK-27636.
Authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
## What changes were proposed in this pull request?
Added method 'meanAveragePrecisionAt' k to RankingMetrics.
This branch is rebased with squashed commits from https://github.com/apache/spark/pull/24458
## How was this patch tested?
Added code in the existing test RankingMetricsSuite.
Please review http://spark.apache.org/contributing.html before opening a pull request.
Closes#24543 from qb-tarushg/SPARK-27540-REBASE.
Authored-by: qb-tarushg <tarush.grover@quantumblack.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
…rtBasedAggregate
Normally, the aggregate operations that are invoked for an aggregation buffer for User Defined Aggregate Functions(UDAF) follow the order like initialize(), update(), eval() OR initialize(), merge(), eval(). However, after a certain threshold configurable by spark.sql.objectHashAggregate.sortBased.fallbackThreshold is reached, ObjectHashAggregate falls back to SortBasedAggregator which invokes the merge or update operation without calling initialize() on the aggregate buffer.
## What changes were proposed in this pull request?
The fix here is to initialize aggregate buffers again when fallback to SortBasedAggregate operator happens.
## How was this patch tested?
The patch was tested as part of [SPARK-24935](https://issues.apache.org/jira/browse/SPARK-24935) as documented in PR https://github.com/apache/spark/pull/23778.
Closes#24149 from pgandhi999/SPARK-27207.
Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
## What changes were proposed in this pull request?
### Background:
The data source option `pathGlobFilter` is introduced for Binary file format: https://github.com/apache/spark/pull/24354 , which can be used for filtering file names, e.g. reading `.png` files only while there is `.json` files in the same directory.
### Proposal:
Make the option `pathGlobFilter` as a general option for all file sources. The path filtering should happen in the path globbing on Driver.
### Motivation:
Filtering the file path names in file scan tasks on executors is kind of ugly.
### Impact:
1. The splitting of file partitions will be more balanced.
2. The metrics of file scan will be more accurate.
3. Users can use the option for reading other file sources.
## How was this patch tested?
Unit tests
Closes#24518 from gengliangwang/globFilter.
Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
## What changes were proposed in this pull request?
The mothod `populateStartOffsets` exists a inappropriate identifier `secondLatestBatchId`.
I think `secondLatestBatchId = latestBatchId - 1` and `offsetLog.get(latestBatchId - 1)` is a offset.
So I change the identifier as follows:
`secondLatestOffsets = offsetLog.get(latestBatchId - 1)`
## How was this patch tested?
Exists UT.
Closes#24550 from beliefer/fix-inappropriate-identifier.
Authored-by: gengjiaan <gengjiaan@360.cn>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>