[SPARK-18922][SQL][CORE][STREAMING][TESTS] Fix all identified tests failed due to path and resource-not-closed problems on Windows
## What changes were proposed in this pull request?

This PR proposes to fix all the test failures identified by testing with AppVeyor.

**Scala - aborted tests**

```
WindowQuerySuite:
  Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.execution.WindowQuerySuite *** ABORTED *** (156 milliseconds)
  org.apache.spark.sql.AnalysisException: LOAD DATA input path does not exist: C:projectssparksqlhive argetscala-2.11 est-classesdatafilespart_tiny.txt;

OrcSourceSuite:
  Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.orc.OrcSourceSuite *** ABORTED *** (62 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

ParquetMetastoreSuite:
  Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.ParquetMetastoreSuite *** ABORTED *** (4 seconds, 703 milliseconds)
  org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string);

ParquetSourceSuite:
  Exception encountered when attempting to run a suite with class name: org.apache.spark.sql.hive.ParquetSourceSuite *** ABORTED *** (3 seconds, 907 milliseconds)
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-581a6575-454f-4f21-a516-a07f95266143;

KafkaRDDSuite:
  Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.KafkaRDDSuite *** ABORTED *** (5 seconds, 212 milliseconds)
  java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-4722304d-213e-4296-b556-951df1a46807

DirectKafkaStreamSuite:
  Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.DirectKafkaStreamSuite *** ABORTED
*** (7 seconds, 127 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-d0d3eba7-4215-4e10-b40e-bb797e89338e at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1010) ReliableKafkaStreamSuite Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.ReliableKafkaStreamSuite *** ABORTED *** (5 seconds, 498 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-d33e45a0-287e-4bed-acae-ca809a89d888 KafkaStreamSuite: Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.KafkaStreamSuite *** ABORTED *** (2 seconds, 892 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-59c9d169-5a56-4519-9ef0-cefdbd3f2e6c KafkaClusterSuite: Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka.KafkaClusterSuite *** ABORTED *** (1 second, 690 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-3ef402b0-8689-4a60-85ae-e41e274f179d DirectKafkaStreamSuite: Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka010.DirectKafkaStreamSuite *** ABORTED *** (59 seconds, 626 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-426107da-68cf-4d94-b0d6-1f428f1c53f6 KafkaRDDSuite: Exception encountered when attempting to run a suite with class name: org.apache.spark.streaming.kafka010.KafkaRDDSuite *** ABORTED *** (2 minutes, 6 seconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-b9ce7929-5dae-46ab-a0c4-9ef6f58fbc2 ``` **Java - failed tests** ``` Test org.apache.spark.streaming.kafka.JavaKafkaRDDSuite.testKafkaRDD failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-1cee32f4-4390-4321-82c9-e8616b3f0fb0, took 9.61 sec Test 
org.apache.spark.streaming.kafka.JavaKafkaStreamSuite.testKafkaStream failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-f42695dd-242e-4b07-847c-f299b8e4676e, took 11.797 sec Test org.apache.spark.streaming.kafka.JavaDirectKafkaStreamSuite.testKafkaStream failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-85c0d062-78cf-459c-a2dd-7973572101ce, took 1.581 sec Test org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite.testKafkaRDD failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-49eb6b5c-8366-47a6-83f2-80c443c48280, took 17.895 sec org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite.testKafkaStream failed: java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-898cf826-d636-4b1c-a61a-c12a364c02e7, took 8.858 sec ``` **Scala - failed tests** ``` PartitionProviderCompatibilitySuite: - insert overwrite partition of new datasource table overwrites just partition *** FAILED *** (828 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-bb6337b9-4f99-45ab-ad2c-a787ab965c09 - SPARK-18635 special chars in partition values - partition management true *** FAILED *** (5 seconds, 360 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - SPARK-18635 special chars in partition values - partition management false *** FAILED *** (141 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); ``` ``` UtilsSuite: - reading offset bytes of a file (compressed) *** FAILED *** (0 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-ecb2b7d5-db8b-43a7-b268-1bf242b5a491 - reading offset bytes across 
multiple files (compressed) *** FAILED *** (0 milliseconds) java.io.IOException: Failed to delete: C:\projects\spark\target\tmp\spark-25cc47a8-1faa-4da5-8862-cf174df63ce0 ``` ``` StatisticsSuite: - MetastoreRelations fallback to HDFS for size estimation *** FAILED *** (110 milliseconds) org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'csv_table' not found in database 'default'; ``` ``` SQLQuerySuite: - permanent UDTF *** FAILED *** (125 milliseconds) org.apache.spark.sql.AnalysisException: Undefined function: 'udtf_count_temp'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 24 - describe functions - user defined functions *** FAILED *** (125 milliseconds) org.apache.spark.sql.AnalysisException: Undefined function: 'udtf_count'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 7 - CTAS without serde with location *** FAILED *** (16 milliseconds) java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:projectsspark%09arget%09mpspark-ed673d73-edfc-404e-829e-2e2b9725d94e/c1 - derived from Hive query file: drop_database_removes_partition_dirs.q *** FAILED *** (47 milliseconds) java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:projectsspark%09arget%09mpspark-d2ddf08e-699e-45be-9ebd-3dfe619680fe/drop_database_removes_partition_dirs_table - derived from Hive query file: drop_table_removes_partition_dirs.q *** FAILED *** (0 milliseconds) java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:projectsspark%09arget%09mpspark-d2ddf08e-699e-45be-9ebd-3dfe619680fe/drop_table_removes_partition_dirs_table2 - SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH *** FAILED *** (109 milliseconds) 
java.nio.file.InvalidPathException: Illegal char <:> at index 2: /C:/projects/spark/sql/hive/projectsspark arget mpspark-1a122f8c-dfb3-46c4-bab1-f30764baee0e/*part-r* ``` ``` HiveDDLSuite: - drop external tables in default database *** FAILED *** (16 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - add/drop partitions - external table *** FAILED *** (16 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - create/drop database - location without pre-created directory *** FAILED *** (16 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - create/drop database - location with pre-created directory *** FAILED *** (32 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - drop database containing tables - CASCADE *** FAILED *** (94 milliseconds) CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675) - drop an empty database - CASCADE *** FAILED *** (63 milliseconds) CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675) - drop 
database containing tables - RESTRICT *** FAILED *** (47 milliseconds) CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675) - drop an empty database - RESTRICT *** FAILED *** (47 milliseconds) CatalogDatabase(db1,,file:/C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be/db1.db,Map()) did not equal CatalogDatabase(db1,,file:C:/projects/spark/target/tmp/warehouse-d0665ee0-1e39-4805-b471-0b764f7838be\db1.db,Map()) (HiveDDLSuite.scala:675) - CREATE TABLE LIKE an external data source table *** FAILED *** (140 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-c5eba16d-07ae-4186-95bb-21c5811cf888; - CREATE TABLE LIKE an external Hive serde table *** FAILED *** (16 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - desc table for data source table - no user-defined schema *** FAILED *** (125 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-e8bf5bf5-721a-4cbe-9d6 at scala.collection.immutable.List.foreach(List.scala:381)d-5543a8301c1d; ``` ``` MetastoreDataSourcesSuite - CTAS: persisted bucketed data source table *** FAILED *** (16 milliseconds) java.lang.IllegalArgumentException: Can not create a Path from an empty string ``` ``` ShowCreateTableSuite: - simple external hive table *** FAILED *** (0 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); ``` ``` PartitionedTablePerfStatsSuite: - hive table: 
partitioned pruned table reports only selected files *** FAILED *** (313 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: partitioned pruned table reports only selected files *** FAILED *** (219 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-311f45f8-d064-4023-a4bb-e28235bff64d; - hive table: lazy partition pruning reads only necessary partition data *** FAILED *** (203 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: lazy partition pruning reads only necessary partition data *** FAILED *** (187 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-fde874ca-66bd-4d0b-a40f-a043b65bf957; - hive table: lazy partition pruning with file status caching enabled *** FAILED *** (188 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: lazy partition pruning with file status caching enabled *** FAILED *** (187 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-e6d20183-dd68-4145-acbe-4a509849accd; - hive table: file status caching respects refresh table and refreshByPath *** FAILED *** (172 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: file status caching respects refresh table and refreshByPath *** 
FAILED *** (203 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-8b2c9651-2adf-4d58-874f-659007e21463; - hive table: file status cache respects size limit *** FAILED *** (219 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: file status cache respects size limit *** FAILED *** (171 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-7835ab57-cb48-4d2c-bb1d-b46d5a4c47e4; - datasource table: table setup does not scan filesystem *** FAILED *** (266 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-20598d76-c004-42a7-8061-6c56f0eda5e2; - hive table: table setup does not scan filesystem *** FAILED *** (266 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - hive table: num hive client calls does not scale with partition count *** FAILED *** (2 seconds, 281 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: num hive client calls does not scale with partition count *** FAILED *** (2 seconds, 422 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-4cfed321-4d1d-4b48-8d34-5c169afff383; - hive table: files read and cached when filesource partition management is off *** FAILED *** (234 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: 
MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); - datasource table: all partition data cached in memory when partition management is off *** FAILED *** (203 milliseconds) org.apache.spark.sql.AnalysisException: Path does not exist: file:/C:projectsspark arget mpspark-4bcc0398-15c9-4f6a-811e-12d40f3eec12; - SPARK-18700: table loaded only once even when resolved concurrently *** FAILED *** (1 second, 266 milliseconds) org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:java.lang.IllegalArgumentException: Can not create a Path from an empty string); ``` ``` HiveSparkSubmitSuite: - temporary Hive UDF: define a UDF and use it *** FAILED *** (2 seconds, 94 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - permanent Hive UDF: define a UDF and use it *** FAILED *** (281 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - permanent Hive UDF: use a already defined permanent function *** FAILED *** (718 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-8368: includes jars passed in through --jars *** FAILED *** (3 seconds, 521 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-8020: set sql conf in spark conf *** FAILED *** (0 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-8489: MissingRequirementError during 
reflection *** FAILED *** (94 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-9757 Persist Parquet relation with decimal column *** FAILED *** (16 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-11009 fix wrong result of Window function in cluster mode *** FAILED *** (16 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-14244 fix window partition size attribute binding failure *** FAILED *** (78 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - set spark.sql.warehouse.dir *** FAILED *** (16 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - set hive.metastore.warehouse.dir *** FAILED *** (15 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-16901: set javax.jdo.option.ConnectionURL *** FAILED *** (16 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified - SPARK-18360: default table path of tables in default database should depend on the location of default database *** FAILED *** (15 milliseconds) java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "C:\projects\spark"): CreateProcess error=2, The system cannot find the file specified ``` ``` 
UtilsSuite:
- resolveURIs with multiple paths *** FAILED *** (0 milliseconds)
  ".../jar3,file:/C:/pi.py[%23]py.pi,file:/C:/path%..." did not equal ".../jar3,file:/C:/pi.py[#]py.pi,file:/C:/path%..." (UtilsSuite.scala:468)
```

```
CheckpointSuite:
- recovery with file input stream *** FAILED *** (10 seconds, 205 milliseconds)
  The code passed to eventually never returned normally. Attempted 660 times over 10.014272499999999 seconds. Last failure message: Unexpected internal error near index 1 \ ^. (CheckpointSuite.scala:680)
```

## How was this patch tested?

Manually via AppVeyor as below:

**Scala - aborted tests**

```
WindowQuerySuite - all passed

OrcSourceSuite:
- SPARK-18220: read Hive orc table with varchar column *** FAILED *** (4 seconds, 417 milliseconds)
  org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:625)

ParquetMetastoreSuite - all passed
ParquetSourceSuite - all passed
KafkaRDDSuite - all passed
DirectKafkaStreamSuite - all passed
ReliableKafkaStreamSuite - all passed
KafkaStreamSuite - all passed
KafkaClusterSuite - all passed
DirectKafkaStreamSuite - all passed
KafkaRDDSuite - all passed
```

**Java - failed tests**

```
org.apache.spark.streaming.kafka.JavaKafkaRDDSuite - all passed
org.apache.spark.streaming.kafka.JavaDirectKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka.JavaKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka010.JavaDirectKafkaStreamSuite - all passed
org.apache.spark.streaming.kafka010.JavaKafkaRDDSuite - all passed
```

**Scala - failed tests**

```
PartitionProviderCompatibilitySuite:
- insert overwrite partition of new datasource table overwrites just partition (1 second, 953 milliseconds)
- SPARK-18635 special chars in partition values -
partition management true (6 seconds, 31 milliseconds) - SPARK-18635 special chars in partition values - partition management false (4 seconds, 578 milliseconds) ``` ``` UtilsSuite: - reading offset bytes of a file (compressed) (203 milliseconds) - reading offset bytes across multiple files (compressed) (0 milliseconds) ``` ``` StatisticsSuite: - MetastoreRelations fallback to HDFS for size estimation (94 milliseconds) ``` ``` SQLQuerySuite: - permanent UDTF (407 milliseconds) - describe functions - user defined functions (441 milliseconds) - CTAS without serde with location (2 seconds, 831 milliseconds) - derived from Hive query file: drop_database_removes_partition_dirs.q (734 milliseconds) - derived from Hive query file: drop_table_removes_partition_dirs.q (563 milliseconds) - SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH (453 milliseconds) ``` ``` HiveDDLSuite: - drop external tables in default database (3 seconds, 5 milliseconds) - add/drop partitions - external table (2 seconds, 750 milliseconds) - create/drop database - location without pre-created directory (500 milliseconds) - create/drop database - location with pre-created directory (407 milliseconds) - drop database containing tables - CASCADE (453 milliseconds) - drop an empty database - CASCADE (375 milliseconds) - drop database containing tables - RESTRICT (328 milliseconds) - drop an empty database - RESTRICT (391 milliseconds) - CREATE TABLE LIKE an external data source table (953 milliseconds) - CREATE TABLE LIKE an external Hive serde table (3 seconds, 782 milliseconds) - desc table for data source table - no user-defined schema (1 second, 150 milliseconds) ``` ``` MetastoreDataSourcesSuite - CTAS: persisted bucketed data source table (875 milliseconds) ``` ``` ShowCreateTableSuite: - simple external hive table (78 milliseconds) ``` ``` PartitionedTablePerfStatsSuite: - hive table: partitioned pruned table reports only selected files (1 second, 109 milliseconds) - 
datasource table: partitioned pruned table reports only selected files (860 milliseconds) - hive table: lazy partition pruning reads only necessary partition data (859 milliseconds) - datasource table: lazy partition pruning reads only necessary partition data (1 second, 219 milliseconds) - hive table: lazy partition pruning with file status caching enabled (875 milliseconds) - datasource table: lazy partition pruning with file status caching enabled (890 milliseconds) - hive table: file status caching respects refresh table and refreshByPath (922 milliseconds) - datasource table: file status caching respects refresh table and refreshByPath (640 milliseconds) - hive table: file status cache respects size limit (469 milliseconds) - datasource table: file status cache respects size limit (453 milliseconds) - datasource table: table setup does not scan filesystem (328 milliseconds) - hive table: table setup does not scan filesystem (313 milliseconds) - hive table: num hive client calls does not scale with partition count (5 seconds, 431 milliseconds) - datasource table: num hive client calls does not scale with partition count (4 seconds, 79 milliseconds) - hive table: files read and cached when filesource partition management is off (656 milliseconds) - datasource table: all partition data cached in memory when partition management is off (484 milliseconds) - SPARK-18700: table loaded only once even when resolved concurrently (2 seconds, 578 milliseconds) ``` ``` HiveSparkSubmitSuite: - temporary Hive UDF: define a UDF and use it (1 second, 745 milliseconds) - permanent Hive UDF: define a UDF and use it (406 milliseconds) - permanent Hive UDF: use a already defined permanent function (375 milliseconds) - SPARK-8368: includes jars passed in through --jars (391 milliseconds) - SPARK-8020: set sql conf in spark conf (156 milliseconds) - SPARK-8489: MissingRequirementError during reflection (187 milliseconds) - SPARK-9757 Persist Parquet relation with decimal column (157 
milliseconds)
- SPARK-11009 fix wrong result of Window function in cluster mode (156 milliseconds)
- SPARK-14244 fix window partition size attribute binding failure (156 milliseconds)
- set spark.sql.warehouse.dir (172 milliseconds)
- set hive.metastore.warehouse.dir (156 milliseconds)
- SPARK-16901: set javax.jdo.option.ConnectionURL (157 milliseconds)
- SPARK-18360: default table path of tables in default database should depend on the location of default database (172 milliseconds)
```

```
UtilsSuite:
- resolveURIs with multiple paths (0 milliseconds)
```

```
CheckpointSuite:
- recovery with file input stream (4 seconds, 452 milliseconds)
```

Note: after resolving the aborted tests, there is a test failure identified as below:

```
OrcSourceSuite:
- SPARK-18220: read Hive orc table with varchar column *** FAILED *** (4 seconds, 417 milliseconds)
  org.apache.spark.sql.execution.QueryExecutionException: FAILED: Execution Error, return code -101 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask. org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
  at org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$runHive$1.apply(HiveClientImpl.scala:625)
```

This failure does not appear to be caused by the path and resource problems, so this PR does not fix it here.

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16451 from HyukjinKwon/all-path-resource-fixes.
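Many of the mangled paths in the logs above (e.g. `C:projectsspark arget mpspark-...`) are what a Windows path such as `C:\projects\spark\target\tmp\spark-...` becomes once its backslashes are processed as escape characters: `\t` turns into a tab and the remaining backslashes are silently dropped. A minimal sketch of one such escape-processing mechanism, using `java.util.Properties` (the property name `path` is purely illustrative, not something Spark uses):

```java
import java.io.StringReader;
import java.util.Properties;

public class PathMangling {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // The loaded text contains single backslashes: "C:\projects\spark\target\tmp".
        // Properties.load treats "\t" as a tab and drops backslashes before other
        // characters, reproducing the shape seen in the logs above.
        props.load(new StringReader("path=C:\\projects\\spark\\target\\tmp"));
        String value = props.getProperty("path");
        System.out.println(value);  // C:projectsspark<TAB>arget<TAB>mp
    }
}
```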
parent 32286ba68a
commit 4e27578faa
```diff
@@ -1488,10 +1488,11 @@ private[spark] object Utils extends Logging {
   /** Return uncompressed file length of a compressed file. */
   private def getCompressedFileLength(file: File): Long = {
+    var gzInputStream: GZIPInputStream = null
     try {
       // Uncompress .gz file to determine file size.
       var fileSize = 0L
-      val gzInputStream = new GZIPInputStream(new FileInputStream(file))
+      gzInputStream = new GZIPInputStream(new FileInputStream(file))
       val bufSize = 1024
       val buf = new Array[Byte](bufSize)
       var numBytes = ByteStreams.read(gzInputStream, buf, 0, bufSize)
@@ -1504,6 +1505,10 @@ private[spark] object Utils extends Logging {
       case e: Throwable =>
         logError(s"Cannot get file length of ${file}", e)
         throw e
+    } finally {
+      if (gzInputStream != null) {
+        gzInputStream.close()
+      }
     }
   }
```
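The first hunk above fixes a leaked `GZIPInputStream` by hoisting the variable out of the `try` and closing it in a `finally` block. In Java the same guarantee can be expressed with try-with-resources; a minimal sketch of the idea (the method name is illustrative, not Spark's API):

```java
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class CompressedLength {
    // Streams the decompressed bytes and counts them; try-with-resources
    // guarantees the stream is closed even if reading throws.
    static long uncompressedLength(File file) throws IOException {
        long size = 0;
        byte[] buf = new byte[1024];
        try (GZIPInputStream in = new GZIPInputStream(new FileInputStream(file))) {
            int n;
            while ((n = in.read(buf, 0, buf.length)) != -1) {
                size += n;
            }
        }
        return size;
    }

    public static void main(String[] args) throws IOException {
        File f = File.createTempFile("demo", ".gz");
        try (GZIPOutputStream out = new GZIPOutputStream(new FileOutputStream(f))) {
            out.write(new byte[12345]);
        }
        System.out.println(uncompressedLength(f));  // 12345
        f.delete();
    }
}
```

On Windows an unclosed stream keeps the file locked, which is exactly why the later `Failed to delete` errors appear when a stream is leaked.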
```diff
@@ -505,7 +505,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
       s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:$cwd/jar4#jar5,file:$cwd/path%20to/jar6")
     if (Utils.isWindows) {
       assertResolves("""hdfs:/jar1,file:/jar2,jar3,C:\pi.py#py.pi,C:\path to\jar4""",
-        s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py#py.pi,file:/C:/path%20to/jar4")
+        s"hdfs:/jar1,file:/jar2,file:$cwd/jar3,file:/C:/pi.py%23py.pi,file:/C:/path%20to/jar4")
     }
   }
```
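The corrected expectation encodes `#` as `%23`: when `C:\pi.py#py.pi` is a filesystem path, the `#` is part of the file name, so the resolved `file:` URI must percent-encode it rather than leave it as a fragment separator. A small illustration with `java.io.File.toURI()` (the file name here is chosen arbitrarily):

```java
import java.io.File;

public class FragmentEncoding {
    public static void main(String[] args) {
        // '#' in a file name is data, not a URI fragment marker,
        // so File.toURI() percent-encodes it as %23.
        String uri = new File("/tmp/pi.py#py.pi").toURI().toString();
        System.out.println(uri);  // ends with "pi.py%23py.pi"
    }
}
```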
```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.sql.kafka010

-import java.io.File
+import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
@@ -138,10 +138,21 @@ class KafkaTestUtils extends Logging {
     if (server != null) {
       server.shutdown()
       server.awaitShutdown()
       server = null
     }

-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }

     if (zkUtils != null) {
       zkUtils.close()
@@ -374,8 +385,21 @@ class KafkaTestUtils extends Logging {
     def shutdown() {
       factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 }
```
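Both `KafkaTestUtils` hunks apply the same pattern: attempt the recursive deletion, but on Windows downgrade an `IOException` to a warning, because Kafka or ZooKeeper may still hold the directory open. A self-contained sketch of that pattern (the helper names here are made up; Spark's actual `Utils.deleteRecursively` differs):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Comparator;

public class BestEffortDelete {
    static boolean isWindows =
        System.getProperty("os.name").toLowerCase().startsWith("windows");

    // Delete children before parents; fail loudly if anything survives.
    static void deleteRecursively(File dir) throws IOException {
        if (!dir.exists()) return;
        Files.walk(dir.toPath())
             .sorted(Comparator.reverseOrder())
             .forEach(p -> p.toFile().delete());
        if (dir.exists()) throw new IOException("Failed to delete: " + dir);
    }

    // The pattern from the diff: tolerate the failure only on Windows,
    // where lingering file handles make deletion flaky.
    static void bestEffortDelete(File dir) throws IOException {
        try {
            deleteRecursively(dir);
        } catch (IOException e) {
            if (isWindows) {
                System.err.println("WARN: " + e.getMessage());
            } else {
                throw e;  // elsewhere a leaked directory is a real bug
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File dir = Files.createTempDirectory("demo").toFile();
        new File(dir, "data.log").createNewFile();
        bestEffortDelete(dir);
        System.out.println(dir.exists());  // false
    }
}
```

The guard `if Utils.isWindows` in the Scala diff keeps the strict behavior on other platforms, so genuine resource leaks still fail the tests there.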
```diff
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.kafka010

-import java.io.File
+import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
@@ -134,10 +134,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
     if (server != null) {
       server.shutdown()
       server.awaitShutdown()
       server = null
     }

-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }

     if (zkUtils != null) {
       zkUtils.close()
@@ -273,8 +284,21 @@ private[kafka010] class KafkaTestUtils extends Logging {
     def shutdown() {
       factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 }
```
|
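The hunks above apply the same Windows-tolerant deletion pattern in several places: delete recursively, but swallow `IOException` on Windows where another process (ZooKeeper, Kafka) may still hold the files open. A minimal standalone sketch of that pattern — the object name, `deleteQuietlyOnWindows`, and the `println` logging are illustrative stand-ins, not part of the patch, which uses Spark's `Utils` and `logWarning`:

```scala
import java.io.{File, IOException}

object WindowsSafeDelete {
  // Mirrors the idea behind Spark's Utils.isWindows: detect the platform from the OS name.
  private val isWindows =
    sys.props("os.name").toLowerCase.contains("windows")

  // Recursively delete `dir`; on Windows, tolerate an IOException caused by files
  // still held open by another process (e.g. ZooKeeper, see ZOOKEEPER-1844).
  def deleteQuietlyOnWindows(dir: File): Unit = {
    try {
      if (dir.isDirectory) {
        dir.listFiles().foreach(deleteQuietlyOnWindows)
      }
      if (!dir.delete() && dir.exists()) {
        throw new IOException(s"Failed to delete: $dir")
      }
    } catch {
      case e: IOException if isWindows =>
        // Warn and move on instead of aborting the whole test suite.
        println(s"WARN: ${e.getMessage}")
    }
  }
}
```

On non-Windows platforms the exception still propagates, so genuine deletion failures keep failing the tests there.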
@@ -53,7 +53,6 @@ class DirectKafkaStreamSuite
     .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
 
-  private var sc: SparkContext = _
   private var ssc: StreamingContext = _
   private var testDir: File = _
@@ -73,11 +72,7 @@ class DirectKafkaStreamSuite
   after {
     if (ssc != null) {
-      ssc.stop()
-      sc = null
-    }
-    if (sc != null) {
-      sc.stop()
+      ssc.stop(stopSparkContext = true)
     }
     if (testDir != null) {
       Utils.deleteRecursively(testDir)
@@ -372,7 +367,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
@@ -411,7 +406,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()

@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming.kafka
 
-import java.io.File
+import java.io.{File, IOException}
 import java.lang.{Integer => JInt}
 import java.net.InetSocketAddress
 import java.util.{Map => JMap, Properties}
@@ -137,10 +137,21 @@ private[kafka] class KafkaTestUtils extends Logging {
     if (server != null) {
       server.shutdown()
       server.awaitShutdown()
       server = null
     }
 
-    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+    // On Windows, `logDirs` is left open even after Kafka server above is completely shut down
+    // in some cases. It leads to test failures on Windows if the directory deletion failure
+    // throws an exception.
+    brokerConf.logDirs.foreach { f =>
+      try {
+        Utils.deleteRecursively(new File(f))
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+    }
 
     if (zkClient != null) {
       zkClient.close()
@@ -268,8 +279,21 @@ private[kafka] class KafkaTestUtils extends Logging {
     def shutdown() {
       factory.shutdown()
-      Utils.deleteRecursively(snapshotDir)
-      Utils.deleteRecursively(logDir)
+      // The directories are not closed even if the ZooKeeper server is shut down.
+      // Please see ZOOKEEPER-1844, which is fixed in 3.4.6+. It leads to test failures
+      // on Windows if the directory deletion failure throws an exception.
+      try {
+        Utils.deleteRecursively(snapshotDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
+      try {
+        Utils.deleteRecursively(logDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }
 }

@@ -52,7 +52,6 @@ class DirectKafkaStreamSuite
     .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
 
-  private var sc: SparkContext = _
   private var ssc: StreamingContext = _
   private var testDir: File = _
@@ -72,11 +71,7 @@ class DirectKafkaStreamSuite
   after {
     if (ssc != null) {
-      ssc.stop()
-      sc = null
-    }
-    if (sc != null) {
-      sc.stop()
+      ssc.stop(stopSparkContext = true)
     }
     if (testDir != null) {
       Utils.deleteRecursively(testDir)
@@ -276,7 +271,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 10).sum)
     }
@@ -319,7 +314,7 @@ class DirectKafkaStreamSuite
       sendData(i)
     }
 
-    eventually(timeout(10 seconds), interval(50 milliseconds)) {
+    eventually(timeout(20 seconds), interval(50 milliseconds)) {
       assert(DirectKafkaStreamSuite.total.get === (1 to 20).sum)
     }
     ssc.stop()

@@ -80,7 +80,7 @@ class ReliableKafkaStreamSuite extends SparkFunSuite
   after {
     if (ssc != null) {
-      ssc.stop()
+      ssc.stop(stopSparkContext = true)
       ssc = null
     }
   }

@@ -26,6 +26,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
+import scala.util.Try
 
 import org.apache.commons.lang3.StringEscapeUtils
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -222,25 +223,34 @@ case class LoadDataCommand(
     val loadPath =
       if (isLocal) {
         val uri = Utils.resolveURI(path)
-        val filePath = uri.getPath()
-        val exists = if (filePath.contains("*")) {
+        val file = new File(uri.getPath)
+        val exists = if (file.getAbsolutePath.contains("*")) {
           val fileSystem = FileSystems.getDefault
-          val pathPattern = fileSystem.getPath(filePath)
-          val dir = pathPattern.getParent.toString
+          val dir = file.getParentFile.getAbsolutePath
           if (dir.contains("*")) {
             throw new AnalysisException(
               s"LOAD DATA input path allows only filename wildcard: $path")
           }
 
+          // Note that special characters such as "*" on Windows are not allowed as a path.
+          // Calling `WindowsFileSystem.getPath` throws an exception if there are in the path.
+          val dirPath = fileSystem.getPath(dir)
+          val pathPattern = new File(dirPath.toAbsolutePath.toString, file.getName).toURI.getPath
+          val safePathPattern = if (Utils.isWindows) {
+            // On Windows, the pattern should not start with slashes for absolute file paths.
+            pathPattern.stripPrefix("/")
+          } else {
+            pathPattern
+          }
           val files = new File(dir).listFiles()
           if (files == null) {
             false
           } else {
-            val matcher = fileSystem.getPathMatcher("glob:" + pathPattern.toAbsolutePath)
+            val matcher = fileSystem.getPathMatcher("glob:" + safePathPattern)
             files.exists(f => matcher.matches(fileSystem.getPath(f.getAbsolutePath)))
           }
         } else {
-          new File(filePath).exists()
+          new File(file.getAbsolutePath).exists()
         }
         if (!exists) {
           throw new AnalysisException(s"LOAD DATA input path does not exist: $path")

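The rewritten wildcard check above builds a `glob:` pattern for `java.nio.file.PathMatcher` from a URI-style path, stripping the leading slash on Windows so `WindowsFileSystem.getPath` accepts it. A standalone sketch of just the matching step — the sample paths are illustrative, not taken from the patch:

```scala
import java.nio.file.FileSystems

object GlobMatchSketch {
  def main(args: Array[String]): Unit = {
    val fileSystem = FileSystems.getDefault
    // A URI-style pattern such as /tmp/data/part-r-*; on Windows the patch
    // first strips the leading "/", yielding e.g. C:/tmp/data/part-r-*.
    val pattern = "/tmp/data/part-r-*"
    val matcher = fileSystem.getPathMatcher("glob:" + pattern)
    // "*" in a glob does not cross directory separators, so only files
    // directly under /tmp/data whose names start with part-r- match.
    println(matcher.matches(fileSystem.getPath("/tmp/data/part-r-00001")))
    println(matcher.matches(fileSystem.getPath("/tmp/data/part-s-00001")))
  }
}
```

Matching file objects rather than raw strings is what lets the same code path reject a wildcard in the directory component (`$dir*/...`) with the "allows only filename wildcard" error.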
@@ -165,7 +165,7 @@ private[hive] class TestHiveSparkSession(
     System.clearProperty("spark.hostPort")
 
     // For some hive test case which contain ${system:test.tmp.dir}
-    System.setProperty("test.tmp.dir", Utils.createTempDir().getCanonicalPath)
+    System.setProperty("test.tmp.dir", Utils.createTempDir().toURI.getPath)
 
     /** The location of the compiled hive distribution */
     lazy val hiveHome = envVarToFile("HIVE_HOME")

@@ -339,7 +339,13 @@ class HiveSparkSubmitSuite
   private def runSparkSubmit(args: Seq[String]): Unit = {
     val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
     val history = ArrayBuffer.empty[String]
-    val commands = Seq("./bin/spark-submit") ++ args
+    val sparkSubmit = if (Utils.isWindows) {
+      // On Windows, `ProcessBuilder.directory` does not change the current working directory.
+      new File("..\\..\\bin\\spark-submit.cmd").getAbsolutePath
+    } else {
+      "./bin/spark-submit"
+    }
+    val commands = Seq(sparkSubmit) ++ args
     val commandLine = commands.mkString("'", "' '", "'")
 
     val builder = new ProcessBuilder(commands: _*).directory(new File(sparkHome))

@@ -1071,11 +1071,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
   test("CTAS: persisted bucketed data source table") {
     withTempPath { dir =>
       withTable("t") {
-        val path = dir.getCanonicalPath
-
         sql(
           s"""CREATE TABLE t USING PARQUET
-             |OPTIONS (PATH '$path')
+             |OPTIONS (PATH '${dir.toURI}')
              |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
              |AS SELECT 1 AS a, 2 AS b
            """.stripMargin
@@ -1093,11 +1091,9 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
 
     withTempPath { dir =>
       withTable("t") {
-        val path = dir.getCanonicalPath
-
         sql(
           s"""CREATE TABLE t USING PARQUET
-             |OPTIONS (PATH '$path')
+             |OPTIONS (PATH '${dir.toURI}')
              |CLUSTERED BY (a) INTO 2 BUCKETS
              |AS SELECT 1 AS a, 2 AS b
            """.stripMargin

@@ -172,7 +172,7 @@ class PartitionProviderCompatibilitySuite
       withTempDir { dir2 =>
         sql(
           s"""alter table test partition (partCol=1)
-            |set location '${dir2.getAbsolutePath}'""".stripMargin)
+            |set location '${dir2.toURI}'""".stripMargin)
         assert(sql("select * from test").count() == 4)
         sql(
           """insert overwrite table test

@@ -77,7 +77,7 @@ class PartitionedTablePerfStatsSuite
         |create external table $tableName (fieldOne long)
         |partitioned by (partCol1 int, partCol2 int)
         |stored as parquet
-        |location "${dir.getAbsolutePath}"""".stripMargin)
+        |location "${dir.toURI}"""".stripMargin)
       if (repair) {
         spark.sql(s"msck repair table $tableName")
       }
@@ -102,7 +102,7 @@ class PartitionedTablePerfStatsSuite
       spark.sql(s"""
         |create table $tableName (fieldOne long, partCol1 int, partCol2 int)
         |using parquet
-        |options (path "${dir.getAbsolutePath}")
+        |options (path "${dir.toURI}")
         |partitioned by (partCol1, partCol2)""".stripMargin)
       if (repair) {
         spark.sql(s"msck repair table $tableName")

@@ -146,7 +146,7 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         | c1 INT COMMENT 'bla',
         | c2 STRING
         |)
-        |LOCATION '$dir'
+        |LOCATION '${dir.toURI}'
         |TBLPROPERTIES (
         | 'prop1' = 'value1',
         | 'prop2' = 'value2'

@@ -57,7 +57,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
           \"separatorChar\" = \",\",
           \"quoteChar\" = \"\\\"\",
           \"escapeChar\" = \"\\\\\")
-        LOCATION '$tempDir'
+        LOCATION '${tempDir.toURI}'
       """)
 
     spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)

@@ -87,7 +87,7 @@ class HiveDDLSuite
         s"""
            |create table $tabName
            |stored as parquet
-           |location '$tmpDir'
+           |location '${tmpDir.toURI}'
            |as select 1, '3'
          """.stripMargin)
@@ -269,7 +269,7 @@ class HiveDDLSuite
         s"""
            |CREATE EXTERNAL TABLE $externalTab (key INT, value STRING)
            |PARTITIONED BY (ds STRING, hr STRING)
-           |LOCATION '$basePath'
+           |LOCATION '${tmpDir.toURI}'
          """.stripMargin)
 
       // Before data insertion, all the directory are empty
@@ -678,14 +678,10 @@ class HiveDDLSuite
       } else {
         assert(!fs.exists(new Path(tmpDir.toString)))
       }
-      sql(s"CREATE DATABASE $dbName Location '$tmpDir'")
+      sql(s"CREATE DATABASE $dbName Location '${tmpDir.toURI.getPath.stripSuffix("/")}'")
       val db1 = catalog.getDatabaseMetadata(dbName)
-      val dbPath = "file:" + tmpDir
-      assert(db1 == CatalogDatabase(
-        dbName,
-        "",
-        if (dbPath.endsWith(File.separator)) dbPath.dropRight(1) else dbPath,
-        Map.empty))
+      val dbPath = tmpDir.toURI.toString.stripSuffix("/")
+      assert(db1 == CatalogDatabase(dbName, "", dbPath, Map.empty))
       sql("USE db1")
 
       sql(s"CREATE TABLE $tabName as SELECT 1")
@@ -713,10 +709,6 @@ class HiveDDLSuite
     }
   }
 
-  private def appendTrailingSlash(path: String): String = {
-    if (!path.endsWith(File.separator)) path + File.separator else path
-  }
-
   private def dropDatabase(cascade: Boolean, tableExists: Boolean): Unit = {
     val dbName = "db1"
     val dbPath = new Path(spark.sessionState.conf.warehousePath)
@@ -724,7 +716,7 @@ class HiveDDLSuite
 
     sql(s"CREATE DATABASE $dbName")
     val catalog = spark.sessionState.catalog
-    val expectedDBLocation = "file:" + appendTrailingSlash(dbPath.toString) + s"$dbName.db"
+    val expectedDBLocation = s"file:${dbPath.toUri.getPath.stripSuffix("/")}/$dbName.db"
     val db1 = catalog.getDatabaseMetadata(dbName)
     assert(db1 == CatalogDatabase(
       dbName,
@@ -857,7 +849,7 @@ class HiveDDLSuite
         val path = dir.getCanonicalPath
         spark.range(10).select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd)
           .write.format("parquet").save(path)
-        sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '$path')")
+        sql(s"CREATE TABLE $sourceTabName USING parquet OPTIONS (PATH '${dir.toURI}')")
         sql(s"CREATE TABLE $targetTabName LIKE $sourceTabName")
 
         // The source table should be an external data source table
@@ -894,7 +886,7 @@ class HiveDDLSuite
   test("CREATE TABLE LIKE an external Hive serde table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>
-      val basePath = tmpDir.getCanonicalPath
+      val basePath = tmpDir.toURI
       val sourceTabName = "tab1"
       val targetTabName = "tab2"
       withTable(sourceTabName, targetTabName) {
@@ -1112,7 +1104,7 @@ class HiveDDLSuite
     Seq("parquet", "json", "orc").foreach { fileFormat =>
       withTable("t1") {
         withTempPath { dir =>
-          val path = dir.getCanonicalPath
+          val path = dir.toURI.toString
           spark.range(1).write.format(fileFormat).save(path)
           sql(s"CREATE TABLE t1 USING $fileFormat OPTIONS (PATH '$path')")

@@ -126,7 +126,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       s"""
          |CREATE FUNCTION udtf_count_temp
          |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-         |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+         |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
        """.stripMargin)
 
     checkAnswer(
@@ -321,7 +321,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       s"""
          |CREATE FUNCTION udtf_count
         |AS 'org.apache.spark.sql.hive.execution.GenericUDTFCount2'
-         |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").getCanonicalPath()}'
+         |USING JAR '${hiveContext.getHiveFile("TestUDTF.jar").toURI}'
        """.stripMargin)
 
     checkKeywordsExist(sql("describe function udtf_count"),
@@ -644,7 +644,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     withTempDir { dir =>
       val defaultDataSource = sessionState.conf.defaultDataSourceName
 
-      val tempLocation = dir.getCanonicalPath
+      val tempLocation = dir.toURI.getPath.stripSuffix("/")
       sql(s"CREATE TABLE ctas1 LOCATION 'file:$tempLocation/c1'" +
         " AS SELECT key k, value FROM src ORDER BY k, value")
       checkRelation("ctas1", true, defaultDataSource, Some(s"file:$tempLocation/c1"))
@@ -1953,16 +1953,18 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
 
   test("SPARK-17796 Support wildcard character in filename for LOAD DATA LOCAL INPATH") {
     withTempDir { dir =>
+      val path = dir.toURI.toString.stripSuffix("/")
+      val dirPath = dir.getAbsoluteFile
       for (i <- 1 to 3) {
-        Files.write(s"$i", new File(s"$dir/part-r-0000$i"), StandardCharsets.UTF_8)
+        Files.write(s"$i", new File(dirPath, s"part-r-0000$i"), StandardCharsets.UTF_8)
       }
       for (i <- 5 to 7) {
-        Files.write(s"$i", new File(s"$dir/part-s-0000$i"), StandardCharsets.UTF_8)
+        Files.write(s"$i", new File(dirPath, s"part-s-0000$i"), StandardCharsets.UTF_8)
       }
 
       withTable("load_t") {
         sql("CREATE TABLE load_t (a STRING)")
-        sql(s"LOAD DATA LOCAL INPATH '$dir/*part-r*' INTO TABLE load_t")
+        sql(s"LOAD DATA LOCAL INPATH '$path/*part-r*' INTO TABLE load_t")
         checkAnswer(sql("SELECT * FROM load_t"), Seq(Row("1"), Row("2"), Row("3")))
 
         val m = intercept[AnalysisException] {
@@ -1971,7 +1973,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         assert(m.contains("LOAD DATA input path does not exist"))
 
         val m2 = intercept[AnalysisException] {
-          sql(s"LOAD DATA LOCAL INPATH '$dir*/*part*' INTO TABLE load_t")
+          sql(s"LOAD DATA LOCAL INPATH '$path*/*part*' INTO TABLE load_t")
         }.getMessage
         assert(m2.contains("LOAD DATA input path allows only filename wildcard"))
       }

@@ -43,7 +43,7 @@ class WindowQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       | p_retailprice DOUBLE,
       | p_comment STRING)
     """.stripMargin)
-    val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").getCanonicalPath
+    val testData1 = TestHive.getHiveFile("data/files/part_tiny.txt").toURI
     sql(
       s"""
          |LOAD DATA LOCAL INPATH '$testData1' overwrite into table part

@@ -57,7 +57,7 @@ abstract class OrcSuite extends QueryTest with TestHiveSingleton with BeforeAndA
          | stringField STRING
          |)
          |STORED AS ORC
-         |LOCATION '${orcTableAsDir.getCanonicalPath}'
+         |LOCATION '${orcTableAsDir.toURI}'
        """.stripMargin)
 
     sql(
@@ -172,7 +172,7 @@ class OrcSourceSuite extends OrcSuite {
       s"""CREATE TEMPORARY VIEW normal_orc_source
          |USING org.apache.spark.sql.hive.orc
          |OPTIONS (
-         |  PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
          |)
        """.stripMargin)
@@ -180,7 +180,7 @@ class OrcSourceSuite extends OrcSuite {
       s"""CREATE TEMPORARY VIEW normal_orc_as_source
          |USING org.apache.spark.sql.hive.orc
          |OPTIONS (
-         |  PATH '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}'
+         |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
          |)
        """.stripMargin)
   }

@@ -81,7 +81,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       STORED AS
       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${partitionedTableDir.getCanonicalPath}'
+      location '${partitionedTableDir.toURI}'
     """)
 
     sql(s"""
@@ -95,7 +95,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       STORED AS
       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${partitionedTableDirWithKey.getCanonicalPath}'
+      location '${partitionedTableDirWithKey.toURI}'
     """)
 
     sql(s"""
@@ -108,7 +108,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       STORED AS
       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      location '${new File(normalTableDir, "normal").getCanonicalPath}'
+      location '${new File(normalTableDir, "normal").toURI}'
     """)
 
     sql(s"""
@@ -124,7 +124,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       STORED AS
       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      LOCATION '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+      LOCATION '${partitionedTableDirWithComplexTypes.toURI}'
     """)
 
     sql(s"""
@@ -140,7 +140,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
       STORED AS
       INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
       OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
-      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+      LOCATION '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
     """)
 
     sql(
@@ -561,7 +561,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
   test("SPARK-15248: explicitly added partitions should be readable") {
     withTable("test_added_partitions", "test_temp") {
       withTempDir { src =>
-        val partitionDir = new File(src, "partition").getCanonicalPath
+        val partitionDir = new File(src, "partition").toURI
         sql(
           """
             |CREATE TABLE test_added_partitions (a STRING)

@@ -636,7 +636,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDir.getCanonicalPath}'
+        path '${partitionedTableDir.toURI}'
       )
     """)
@@ -644,7 +644,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet_with_key
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDirWithKey.getCanonicalPath}'
+        path '${partitionedTableDirWithKey.toURI}'
       )
     """)
@@ -652,7 +652,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW normal_parquet
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${new File(partitionedTableDir, "p=1").getCanonicalPath}'
+        path '${new File(partitionedTableDir, "p=1").toURI}'
       )
     """)
@@ -660,7 +660,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet_with_key_and_complextypes
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDirWithKeyAndComplexTypes.getCanonicalPath}'
+        path '${partitionedTableDirWithKeyAndComplexTypes.toURI}'
       )
     """)
@@ -668,7 +668,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
       CREATE TEMPORARY VIEW partitioned_parquet_with_complextypes
       USING org.apache.spark.sql.parquet
       OPTIONS (
-        path '${partitionedTableDirWithComplexTypes.getCanonicalPath}'
+        path '${partitionedTableDirWithComplexTypes.toURI}'
       )
     """)
   }
@@ -701,8 +701,6 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
 
   test("SPARK-8811: compatibility with array of struct in Hive") {
     withTempPath { dir =>
-      val path = dir.getCanonicalPath
-
       withTable("array_of_struct") {
         val conf = Seq(
           HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
@@ -712,7 +710,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
         withSQLConf(conf: _*) {
           sql(
             s"""CREATE TABLE array_of_struct
-               |STORED AS PARQUET LOCATION '$path'
+               |STORED AS PARQUET LOCATION '${dir.toURI}'
               |AS SELECT
                |  '1st' AS a,
                |  '2nd' AS b,
@@ -720,7 +718,7 @@ class ParquetSourceSuite extends ParquetPartitioningTest {
             """.stripMargin)
 
           checkAnswer(
-            spark.read.parquet(path),
+            spark.read.parquet(dir.getCanonicalPath),
            Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
         }
       }

@@ -175,6 +175,12 @@ private[streaming] class ReceiverSupervisorImpl(
   }
 
   override protected def onStop(message: String, error: Option[Throwable]) {
+    receivedBlockHandler match {
+      case handler: WriteAheadLogBasedBlockHandler =>
+        // Write ahead log should be closed.
+        handler.stop()
+      case _ =>
+    }
     registeredBlockGenerators.asScala.foreach { _.stop() }
     env.rpcEnv.stop(endpoint)
   }

@@ -17,7 +17,7 @@
 
 package org.apache.spark.streaming
 
-import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -629,7 +629,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
       ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
     val filenames = fileInputDStream.batchTimeToSelectedFiles.synchronized
       { fileInputDStream.batchTimeToSelectedFiles.values.flatten }
-    filenames.map(_.split(File.separator).last.toInt).toSeq.sorted
+    filenames.map(_.split("/").last.toInt).toSeq.sorted
   }
 
   try {
@@ -755,7 +755,15 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
         assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
       }
     } finally {
-      Utils.deleteRecursively(testDir)
+      try {
+        // As the driver shuts down in the middle of processing and the thread above sleeps
+        // for a while, `testDir` can be not closed correctly at this point which causes the
+        // test failure on Windows.
+        Utils.deleteRecursively(testDir)
+      } catch {
+        case e: IOException if Utils.isWindows =>
+          logWarning(e.getMessage)
+      }
     }
   }