Commit graph

41 commits

Author SHA1 Message Date
Gengliang Wang 2eaf058788 [SPARK-25718][SQL] Detect recursive reference in Avro schema and throw exception
## What changes were proposed in this pull request?

Avro schema allows recursive reference, e.g. the schema for linked-list in https://avro.apache.org/docs/1.8.2/spec.html#schema_record
```
{
  "type": "record",
  "name": "LongList",
  "aliases": ["LinkedLongs"],                      // old name for this
  "fields" : [
    {"name": "value", "type": "long"},             // each element has a long
    {"name": "next", "type": ["null", "LongList"]} // optional next element
  ]
}
```

In current Spark SQL, it is impossible to convert the schema as `StructType` . Run `SchemaConverters.toSqlType(avroSchema)` and we will get stack overflow exception.

We should detect the recursive reference and throw exception for it.
## How was this patch tested?

New unit test case.

Closes #22709 from gengliangwang/avroRecursiveRef.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-10-13 14:49:38 +08:00
Gengliang Wang 928d0739c4 [SPARK-25595] Ignore corrupt Avro files if flag IGNORE_CORRUPT_FILES enabled
## What changes were proposed in this pull request?

With flag `IGNORE_CORRUPT_FILES` enabled, schema inference should ignore corrupt Avro files, which is consistent with Parquet and Orc data source.

## How was this patch tested?

Unit test

Closes #22611 from gengliangwang/ignoreCorruptAvro.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-10-03 17:08:55 +08:00
gatorsmile 9bf397c0e4 [SPARK-25592] Setting version to 3.0.0-SNAPSHOT
## What changes were proposed in this pull request?

This patch is to bump the master branch version to 3.0.0-SNAPSHOT.

## How was this patch tested?
N/A

Closes #22606 from gatorsmile/bump3.0.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-10-02 08:48:24 -07:00
Gengliang Wang 950ab79957 [SPARK-24777][SQL] Add write benchmark for AVRO
## What changes were proposed in this pull request?

Refactor `DataSourceWriteBenchmark` and add write benchmark for AVRO.

## How was this patch tested?

Build and run the benchmark.

Closes #22451 from gengliangwang/avroWriteBenchmark.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-09-20 17:41:24 -07:00
gatorsmile bb2f069cf2 [SPARK-25436] Bump master branch version to 2.5.0-SNAPSHOT
## What changes were proposed in this pull request?
In the dev list, we can still discuss whether the next version is 2.5.0 or 3.0.0. Let us first bump the master branch version to `2.5.0-SNAPSHOT`.

## How was this patch tested?
N/A

Closes #22426 from gatorsmile/bumpVersionMaster.

Authored-by: gatorsmile <gatorsmile@gmail.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
2018-09-15 16:24:02 -07:00
Arun Mahadevan 68ec207a32 [SPARK-25260][SQL] Fix namespace handling in SchemaConverters.toAvroType
## What changes were proposed in this pull request?

`toAvroType` converts spark data type to avro schema. It always appends the record name to namespace so its impossible to have an Avro namespace independent of the record name.

When invoked with a spark data type like,

```java
val sparkSchema = StructType(Seq(
    StructField("name", StringType, nullable = false),
    StructField("address", StructType(Seq(
        StructField("city", StringType, nullable = false),
        StructField("state", StringType, nullable = false))),
    nullable = false)))

// map it to an avro schema with record name "employee" and top level namespace "foo.bar",
val avroSchema = SchemaConverters.toAvroType(sparkSchema,  false, "employee", "foo.bar")

// result is
// avroSchema.getName = employee
// avroSchema.getNamespace = foo.bar.employee
// avroSchema.getFullname = foo.bar.employee.employee
```
The patch proposes to fix this so that the result is

```
avroSchema.getName = employee
avroSchema.getNamespace = foo.bar
avroSchema.getFullname = foo.bar.employee
```
## How was this patch tested?

New and existing unit tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #22251 from arunmahadevan/avro-fix.

Authored-by: Arun Mahadevan <arunm@apache.org>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-29 09:25:49 +08:00
Gengliang Wang e3b7bb4132 [SPARK-24811][FOLLOWUP][SQL] Revise package of AvroDataToCatalyst and CatalystDataToAvro
## What changes were proposed in this pull request?

In https://github.com/apache/spark/pull/21838, the class `AvroDataToCatalyst` and `CatalystDataToAvro` were put in package `org.apache.spark.sql`.
They should be moved to package  `org.apache.spark.sql.avro`.
Also optimize imports in Avro module.

## How was this patch tested?

Unit test

Closes #22196 from gengliangwang/avro_revise_package_name.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-23 15:08:46 +08:00
Gengliang Wang ac0174e55a [SPARK-25129][SQL] Make the mapping of com.databricks.spark.avro to built-in module configurable
## What changes were proposed in this pull request?

In https://issues.apache.org/jira/browse/SPARK-24924, the data source provider com.databricks.spark.avro is mapped to the new package org.apache.spark.sql.avro .

As per the discussion in the [Jira](https://issues.apache.org/jira/browse/SPARK-24924) and PR #22119, we should make the mapping configurable.

This PR also improve the error message when data source of Avro/Kafka is not found.

## How was this patch tested?

Unit test

Closes #22133 from gengliangwang/configurable_avro_mapping.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Xiao Li <gatorsmile@gmail.com>
2018-08-21 15:26:24 -07:00
Gengliang Wang 60af2501e1 [SPARK-25160][SQL] Avro: remove sql configuration spark.sql.avro.outputTimestampType
## What changes were proposed in this pull request?

In the PR for supporting logical timestamp types https://github.com/apache/spark/pull/21935, a SQL configuration spark.sql.avro.outputTimestampType is added, so that user can specify the output timestamp precision they want.

With PR https://github.com/apache/spark/pull/21847,  the output file can be written with user specified types.

So there is no need to have such trivial configuration. Otherwise to make it consistent we need to add configuration for all the Catalyst types that can be converted into different Avro types.

This PR also add a test case for user specified output schema with different timestamp types.

## How was this patch tested?

Unit test

Closes #22151 from gengliangwang/removeOutputTimestampType.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: hyukjinkwon <gurwls223@apache.org>
2018-08-20 20:42:27 +08:00
Gengliang Wang ab197308a7
[SPARK-25104][SQL] Avro: Validate user specified output schema
## What changes were proposed in this pull request?

With code changes in https://github.com/apache/spark/pull/21847 , Spark can write out to Avro file as per user provided output schema.

To make it more robust and user friendly, we should validate the Avro schema before tasks launched.

Also we should support output logical decimal type as BYTES (By default we output as FIXED)

## How was this patch tested?

Unit test

Closes #22094 from gengliangwang/AvroSerializerMatch.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-14 04:43:14 +00:00
Gengliang Wang 26775e3c8e [SPARK-25099][SQL][TEST] Generate Avro Binary files in test suite
## What changes were proposed in this pull request?

In PR https://github.com/apache/spark/pull/21984 and https://github.com/apache/spark/pull/21935 , the related test cases are using binary files created by Python scripts.

Generate the binary files in test suite to make it more transparent.  Also we can

Also move the related test cases to a new file `AvroLogicalTypeSuite.scala`.

## How was this patch tested?

Unit test.

Closes #22091 from gengliangwang/logicalType_suite.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-13 20:50:28 +08:00
Gengliang Wang be2238fb50 [SPARK-24774][SQL] Avro: Support logical decimal type
## What changes were proposed in this pull request?

Support Avro logical date type:
https://avro.apache.org/docs/1.8.2/spec.html#Decimal

## How was this patch tested?
Unit test

Closes #22037 from gengliangwang/avro_decimal.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-13 08:29:07 +08:00
Brian Lindblom 0cea9e3cd0
[SPARK-24855][SQL][EXTERNAL] Built-in AVRO support should support specified schema on write
## What changes were proposed in this pull request?

Allows `avroSchema` option to be specified on write, allowing a user to specify a schema in cases where this is required.  A trivial use case is reading in an avro dataset, making some small adjustment to a column or columns and writing out using the same schema.  Implicit schema creation from SQL Struct results in a schema that while for the most part, is functionally similar, is not necessarily compatible.

Allows `fixed` Field type to be utilized for records of specified `avroSchema`

## How was this patch tested?

Unit tests in AvroSuite are extended to test this with enum and fixed types.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes #21847 from lindblombr/specify_schema_on_write.

Lead-authored-by: Brian Lindblom <blindblom@apple.com>
Co-authored-by: DB Tsai <d_tsai@apple.com>
Signed-off-by: DB Tsai <d_tsai@apple.com>
2018-08-10 03:35:29 +00:00
Gengliang Wang 819c4de45a [SPARK-24772][SQL] Avro: support logical date type
## What changes were proposed in this pull request?

Support Avro logical date type:
https://avro.apache.org/docs/1.8.2/spec.html#Date

## How was this patch tested?

Unit test

Closes #21984 from gengliangwang/avro_date.

Authored-by: Gengliang Wang <gengliang.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2018-08-07 17:24:25 +08:00
DB Tsai 273b28404c
[SPARK-24993][SQL] Make Avro Fast Again
## What changes were proposed in this pull request?

When lindblombr at apple developed [SPARK-24855](https://github.com/apache/spark/pull/21847) to support specified schema on write, we found a performance regression in Avro writer for our dataset.

With this PR, the performance is improved, but not as good as Spark 2.3 + the old avro writer. There must be something we miss which we need to investigate further.

Spark 2.4
```
spark git:(master) ./build/mvn -DskipTests clean package
spark git:(master) bin/spark-shell --jars external/avro/target/spark-avro_2.11-2.4.0-SNAPSHOT.jar
```

Spark 2.3 + databricks avro
```
spark git:(branch-2.3) ./build/mvn -DskipTests clean package
spark git:(branch-2.3) bin/spark-shell --packages com.databricks:spark-avro_2.11:4.0.0
```

Current master:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|             2.95621|
| stddev|0.030895815479469294|
|    min|               2.915|
|    max|               3.049|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.31072999999999995|
| stddev|0.054139709842390006|
|    min|               0.259|
|    max|               0.692|
+-------+--------------------+
```

Current master with this PR:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  2.5804300000000002|
| stddev|0.011175600225672079|
|    min|               2.558|
|    max|                2.62|
+-------+--------------------+

+-------+--------------------+
|summary|           readTimes|
+-------+--------------------+
|  count|                 100|
|   mean| 0.29922000000000004|
| stddev|0.058261961532514166|
|    min|               0.251|
|    max|               0.732|
+-------+--------------------+
```

Spark 2.3 + databricks avro:
```
+-------+--------------------+
|summary|          writeTimes|
+-------+--------------------+
|  count|                 100|
|   mean|  1.7730500000000005|
| stddev|0.025199156230863575|
|    min|               1.729|
|    max|               1.833|
+-------+--------------------+

+-------+-------------------+
|summary|          readTimes|
+-------+-------------------+
|  count|                100|
|   mean|            0.29715|
| stddev|0.05685643358850465|
|    min|              0.258|
|    max|              0.718|
+-------+-------------------+
```

The following is the test code to reproduce the result.
```scala
    spark.sqlContext.setConf("spark.sql.avro.compression.codec", "uncompressed")
    val sparkSession = spark
    import sparkSession.implicits._
    val df = spark.sparkContext.range(1, 3000).repartition(1).map { uid =>
      val features = Array.fill(16000)(scala.math.random)
      (uid, scala.math.random, java.util.UUID.randomUUID().toString, java.util.UUID.randomUUID().toString, features)
    }.toDF("uid", "random", "uuid1", "uuid2", "features").cache()
    val size = df.count()

    // Write into ramdisk to rule out the disk IO impact
    val tempSaveDir = s"/Volumes/ramdisk/${java.util.UUID.randomUUID()}/"
    val n = 150
    val writeTimes = new Array[Double](n)
    var i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      df.write
        .format("com.databricks.spark.avro")
        .mode("overwrite")
        .save(tempSaveDir)
      val t2 = System.currentTimeMillis()
      writeTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    df.unpersist()

    // The first 50 runs are for warm-up
    val readTimes = new Array[Double](n)
    i = 0
    while (i < n) {
      val t1 = System.currentTimeMillis()
      val readDF = spark.read.format("com.databricks.spark.avro").load(tempSaveDir)
      assert(readDF.count() == size)
      val t2 = System.currentTimeMillis()
      readTimes(i) = (t2 - t1) / 1000.0
      i += 1
    }

    spark.sparkContext.parallelize(writeTimes.slice(50, 150)).toDF("writeTimes").describe("writeTimes").show()
    spark.sparkContext.parallelize(readTimes.slice(50, 150)).toDF("readTimes").describe("readTimes").show()
```

## How was this patch tested?

Existing tests.

Author: DB Tsai <d_tsai@apple.com>
Author: Brian Lindblom <blindblom@apple.com>

Closes #21952 from dbtsai/avro-performance-fix.
2018-08-03 07:43:54 +00:00
Gengliang Wang f45d60a5a1 [SPARK-25002][SQL] Avro: revise the output record namespace
## What changes were proposed in this pull request?

Currently the output namespace is starting with ".", e.g. `.topLevelRecord`

Although it is valid according to Avro spec, we should remove the starting dot in case of failures when the output Avro file is read by other lib:

https://github.com/linkedin/goavro/pull/96

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21974 from gengliangwang/avro_namespace.
2018-08-03 13:28:44 +08:00
Gengliang Wang 7cf16a7fa4 [SPARK-24773] Avro: support logical timestamp type with different precisions
## What changes were proposed in this pull request?

Support reading/writing Avro logical timestamp type with different precisions
https://avro.apache.org/docs/1.8.2/spec.html#Timestamp+%28millisecond+precision%29

To specify the output timestamp type, use Dataframe option `outputTimestampType`  or SQL config `spark.sql.avro.outputTimestampType`.  The supported values are
* `TIMESTAMP_MICROS`
* `TIMESTAMP_MILLIS`

The default output type is `TIMESTAMP_MICROS`
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21935 from gengliangwang/avro_timestamp.
2018-08-03 08:32:08 +08:00
Stavros Kontopoulos a65736996b [SPARK-14540][CORE] Fix remaining major issues for Scala 2.12 Support
## What changes were proposed in this pull request?
This PR addresses issues 2,3 in this [document](https://docs.google.com/document/d/1fbkjEL878witxVQpOCbjlvOvadHtVjYXeB-2mgzDTvk).

* We modified the closure cleaner to identify closures that are implemented via the LambdaMetaFactory mechanism (serializedLambdas) (issue2).

* We also fix the issue due to scala/bug#11016. There are two options for solving the Unit issue, either add () at the end of the closure or use the trick described in the doc. Otherwise overloading resolution does not work (we are not going to eliminate either of the methods) here. Compiler tries to adapt to Unit and makes these two methods candidates for overloading, when there is polymorphic overloading there is no ambiguity (that is the workaround implemented). This does not look that good but it serves its purpose as we need to support two different uses for method: `addTaskCompletionListener`. One that passes a TaskCompletionListener and one that passes a closure that is wrapped with a TaskCompletionListener later on (issue3).

Note: regarding issue 1 in the doc the plan is:

> Do Nothing. Don’t try to fix this as this is only a problem for Java users who would want to use 2.11 binaries. In that case they can cast to MapFunction to be able to utilize lambdas. In Spark 3.0.0 the API should be simplified so that this issue is removed.

## How was this patch tested?
This was manually tested:
```./dev/change-scala-version.sh 2.12
./build/mvn -DskipTests -Pscala-2.12 clean package
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.serializer.ProactiveClosureSerializationSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.util.ClosureCleanerSuite -Dtest=None
./build/mvn -Pscala-2.12 clean package -DwildcardSuites=org.apache.spark.streaming.DStreamClosureSuite -Dtest=None```

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>

Closes #21930 from skonto/scala2.12-sup.
2018-08-02 09:17:09 -05:00
Maxim Gekk d20c10fdf3 [SPARK-24952][SQL] Support LZMA2 compression by Avro datasource
## What changes were proposed in this pull request?

In the PR, I propose to support `LZMA2` (`XZ`) and `BZIP2` compressions by `AVRO` datasource  in write since the codecs may have better characteristics like compression ratio and speed comparing to already supported `snappy` and `deflate` codecs.

## How was this patch tested?

It was tested manually and by an existing test which was extended to check the `xz` and `bzip2` compressions.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21902 from MaxGekk/avro-xz-bzip2.
2018-07-31 09:12:57 +08:00
hyukjinkwon fca0b8528e [SPARK-24967][SQL] Avro: Use internal.Logging instead for logging
## What changes were proposed in this pull request?

Looks Avro uses direct `getLogger` to create a SLF4J logger. Should better use `internal.Logging` instead.

## How was this patch tested?

Exiting tests.

Author: hyukjinkwon <gurwls223@apache.org>

Closes #21914 from HyukjinKwon/avro-log.
2018-07-30 21:13:08 +08:00
Xiao Li c6a3db2fb6 [SPARK-24924][SQL][FOLLOW-UP] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?
Add one more test case for `com.databricks.spark.avro`.

## How was this patch tested?
N/A

Author: Xiao Li <gatorsmile@gmail.com>

Closes #21906 from gatorsmile/avro.
2018-07-28 13:43:32 +08:00
Maxim Gekk 0a0f68bae6 [SPARK-24881][SQL] New Avro option - compression
## What changes were proposed in this pull request?

In the PR, I added new option for Avro datasource - `compression`. The option allows to specify compression codec for saved Avro files. This option is similar to `compression` option in another datasources like `JSON` and `CSV`.

Also I added the SQL configs `spark.sql.avro.compression.codec` and `spark.sql.avro.deflate.level`. I put the configs into `SQLConf`. If the `compression` option is not specified by an user, the first SQL config is taken into account.

## How was this patch tested?

I added new test which read meta info from written avro files and checks `avro.codec` property.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21837 from MaxGekk/avro-compression.
2018-07-28 00:11:32 +08:00
Gengliang Wang fa09d91925 [SPARK-24919][BUILD] New linter rule for sparkContext.hadoopConfiguration
## What changes were proposed in this pull request?

In most cases, we should use `spark.sessionState.newHadoopConf()` instead of `sparkContext.hadoopConfiguration`, so that the hadoop configurations specified in Spark session
configuration will come into effect.

Add a rule matching `spark.sparkContext.hadoopConfiguration` or `spark.sqlContext.sparkContext.hadoopConfiguration` to prevent the usage.
## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21873 from gengliangwang/linterRule.
2018-07-26 16:50:59 -07:00
Dongjoon Hyun 58353d7f4b [SPARK-24924][SQL] Add mapping for built-in Avro data source
## What changes were proposed in this pull request?

This PR aims to the followings.
1. Like `com.databricks.spark.csv` mapping, we had better map `com.databricks.spark.avro` to built-in Avro data source.
2. Remove incorrect error message, `Please find an Avro package at ...`.

## How was this patch tested?

Pass the newly added tests.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #21878 from dongjoon-hyun/SPARK-24924.
2018-07-26 16:11:03 +08:00
Gengliang Wang c44eb561ec [SPARK-24768][FOLLOWUP][SQL] Avro migration followup: change artifactId to spark-avro
## What changes were proposed in this pull request?
After rethinking on the artifactId, I think it should be `spark-avro` instead of `spark-sql-avro`, which is simpler, and consistent with the previous artifactId. I think we need to change it before Spark 2.4 release.

Also a tiny change: use `spark.sessionState.newHadoopConf()` to get the hadoop configuration, thus the related hadoop configurations in SQLConf will come into effect.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21866 from gengliangwang/avro_followup.
2018-07-25 08:42:45 -07:00
Gengliang Wang 08e315f633 [SPARK-24887][SQL] Avro: use SerializableConfiguration in Spark utils to deduplicate code
## What changes were proposed in this pull request?

To implement the method `buildReader` in `FileFormat`, it is required to serialize the hadoop configuration for executors.

Previous spark-avro uses its own class `SerializableConfiguration` for the serialization. As now it is part of Spark, we can use SerializableConfiguration in Spark util to deduplicate the code.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21846 from gengliangwang/removeSerializableConfiguration.
2018-07-23 08:31:48 -07:00
Gengliang Wang f59de52a2a [SPARK-24883][SQL] Avro: remove implicit class AvroDataFrameWriter/AvroDataFrameReader
## What changes were proposed in this pull request?

As per Reynold's comment: https://github.com/apache/spark/pull/21742#discussion_r203496489

It makes sense to remove the implicit class AvroDataFrameWriter/AvroDataFrameReader, since the Avro package is external module.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21841 from gengliangwang/removeImplicit.
2018-07-23 15:27:33 +08:00
Gengliang Wang 8817c68f50 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

1. Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

2. Add a new function to_avro for converting a column into binary of avro format with the specified schema.

I created #21774 for this, but it failed the build https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7902/

Additional changes In this PR:
1. Add `scalacheck` dependency in pom.xml to resolve the failure.
2. Update the `log4j.properties` to make it consistent with other modules.

## How was this patch tested?

Unit test
Compile with different commands:
```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21838 from gengliangwang/from_and_to_avro.
2018-07-22 17:36:57 -07:00
Maxim Gekk 106880edcd [SPARK-24836][SQL] New option for Avro datasource - ignoreExtension
## What changes were proposed in this pull request?

I propose to add new option for AVRO datasource which should control ignoring of files without `.avro` extension in read. The option has name `ignoreExtension` with default value `true`. If both options `ignoreExtension` and `avro.mapred.ignore.inputs.without.extension` are set, `ignoreExtension` overrides the former one. Here is an example of usage:

```
spark
  .read
  .option("ignoreExtension", false)
  .avro("path to avro files")
```

## How was this patch tested?

I added a test which checks the option directly and a test for checking that new option overrides hadoop's config.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21798 from MaxGekk/avro-ignore-extension.
2018-07-20 20:04:40 -07:00
Gengliang Wang 00b864aa70 [SPARK-24876][SQL] Avro: simplify schema serialization
## What changes were proposed in this pull request?

Previously in the refactoring of Avro Serializer and Deserializer, a new class SerializableSchema is created for serializing the Avro schema:
https://github.com/apache/spark/pull/21762/files#diff-01fea32e6ec6bcf6f34d06282e08705aR37

On second thought, we can use `toString` method for serialization. After that, parse the JSON format schema on executor. This makes the code much simpler.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21829 from gengliangwang/removeSerializableSchema.
2018-07-20 14:57:59 -07:00
Xiao Li 9ad77b3037 Revert "[SPARK-24811][SQL] Avro: add new function from_avro and to_avro"
This reverts commit 244bcff194.
2018-07-20 12:55:38 -07:00
Gengliang Wang 244bcff194 [SPARK-24811][SQL] Avro: add new function from_avro and to_avro
## What changes were proposed in this pull request?

Add a new function from_avro for parsing a binary column of avro format and converting it into its corresponding catalyst value.

Add a new function to_avro for converting a column into binary of avro format with the specified schema.

This PR is in progress. Will add test cases.
## How was this patch tested?

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21774 from gengliangwang/from_and_to_avro.
2018-07-20 09:19:29 -07:00
Maxim Gekk cd5d93c0e4 [SPARK-24854][SQL] Gathering all Avro options into the AvroOptions class
## What changes were proposed in this pull request?

In the PR, I propose to put all `Avro` options in new class `AvroOptions` in the same way as for other datasources `JSON` and `CSV`.

## How was this patch tested?

It was tested by `AvroSuite`

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21810 from MaxGekk/avro-options.
2018-07-19 09:16:16 +08:00
Takuya UESHIN 34cb3b54e9 [SPARK-24386][SPARK-24768][BUILD][FOLLOWUP] Fix lint-java and Scala 2.12 build.
## What changes were proposed in this pull request?

This pr fixes lint-java and Scala 2.12 build.

lint-java:

```
[ERROR] src/test/resources/log4j.properties:[0] (misc) NewlineAtEndOfFile: File does not end with a newline.
```

Scala 2.12 build:

```
[error] /.../sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala:121: overloaded method value addTaskCompletionListener with alternatives:
[error]   (f: org.apache.spark.TaskContext => Unit)org.apache.spark.TaskContext <and>
[error]   (listener: org.apache.spark.util.TaskCompletionListener)org.apache.spark.TaskContext
[error]  cannot be applied to (org.apache.spark.TaskContext => java.util.List[Runnable])
[error]       context.addTaskCompletionListener { ctx =>
[error]               ^
```

## How was this patch tested?

Manually executed lint-java and Scala 2.12 build in my local environment.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes #21801 from ueshin/issues/SPARK-24386_24768/fix_build.
2018-07-18 19:17:18 +08:00
Maxim Gekk ba437fc5c7 [SPARK-24805][SQL] Do not ignore avro files without extensions by default
## What changes were proposed in this pull request?

In the PR, I propose to change default behaviour of AVRO datasource which currently ignores files without `.avro` extension in read by default. This PR sets the default value for `avro.mapred.ignore.inputs.without.extension` to `false` in the case if the parameter is not set by an user.

## How was this patch tested?

Added a test file without extension in AVRO format, and new test for reading the file with and wihout specified schema.

Author: Maxim Gekk <maxim.gekk@databricks.com>
Author: Maxim Gekk <max.gekk@gmail.com>

Closes #21769 from MaxGekk/avro-without-extension.
2018-07-16 14:35:44 -07:00
Maxim Gekk 9f929458fb [SPARK-24810][SQL] Fix paths to test files in AvroSuite
## What changes were proposed in this pull request?

In the PR, I propose to move `testFile()` to the common trait `SQLTestUtilsBase` and wrap test files in `AvroSuite` by the method `testFile()` which returns full paths to test files in the resource folder.

Author: Maxim Gekk <maxim.gekk@databricks.com>

Closes #21773 from MaxGekk/test-file.
2018-07-15 23:01:36 -07:00
Gengliang Wang 9603087638 [SPARK-24800][SQL] Refactor Avro Serializer and Deserializer
## What changes were proposed in this pull request?
Currently the Avro Deserializer converts input Avro format data to `Row`, and then convert the `Row` to `InternalRow`.
While the Avro Serializer converts `InternalRow` to `Row`, and then output Avro format data.
This PR allows direct conversion between `InternalRow` and Avro format data.

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21762 from gengliangwang/avro_io.
2018-07-15 22:06:33 +08:00
Gengliang Wang 3e7dc82960 [SPARK-24776][SQL] Avro unit test: deduplicate code and replace deprecated methods
## What changes were proposed in this pull request?

Improve Avro unit test:
1. use QueryTest/SharedSQLContext/SQLTestUtils, instead of the duplicated test utils.
2. replace deprecated methods

This is a follow up PR for #21760, the PR passes pull request tests but failed in: https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Compile/job/spark-master-compile-maven-hadoop-2.6/7842/

This PR is to fix it.
## How was this patch tested?
Unit test.
Compile with different commands:

```
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.6 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-2.7 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile
./build/mvn --force -DzincPort=3643 -DskipTests -Phadoop-3.1 -Phive-thriftserver -Pkinesis-asl -Pspark-ganglia-lgpl -Pmesos -Pyarn  compile test-compile

```

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21768 from gengliangwang/improve_avro_test.
2018-07-14 21:36:56 -07:00
Xiao Li 3bcb1b4814 Revert "[SPARK-24776][SQL] Avro unit test: use SQLTestUtils and replace deprecated methods"
This reverts commit c1b62e420a.
2018-07-13 10:06:26 -07:00
Gengliang Wang c1b62e420a [SPARK-24776][SQL] Avro unit test: use SQLTestUtils and replace deprecated methods
## What changes were proposed in this pull request?
Improve Avro unit test:
1. use QueryTest/SharedSQLContext/SQLTestUtils, instead of the duplicated test utils.
2. replace deprecated methods

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21760 from gengliangwang/improve_avro_test.
2018-07-13 08:55:46 -07:00
Gengliang Wang 395860a986 [SPARK-24768][SQL] Have a built-in AVRO data source implementation
## What changes were proposed in this pull request?

Apache Avro (https://avro.apache.org) is a popular data serialization format. It is widely used in the Spark and Hadoop ecosystem, especially for Kafka-based data pipelines.  Using the external package https://github.com/databricks/spark-avro, Spark SQL can read and write the avro data. Making spark-Avro built-in can provide a better experience for first-time users of Spark SQL and structured streaming. We expect the built-in Avro data source can further improve the adoption of structured streaming.
The proposal is to inline code from spark-avro package (https://github.com/databricks/spark-avro). The target release is Spark 2.4.

[Built-in AVRO Data Source In Spark 2.4.pdf](https://github.com/apache/spark/files/2181511/Built-in.AVRO.Data.Source.In.Spark.2.4.pdf)

## How was this patch tested?

Unit test

Author: Gengliang Wang <gengliang.wang@databricks.com>

Closes #21742 from gengliangwang/export_avro.
2018-07-12 13:55:25 -07:00