[SPARK-2736] PySpark converter and example script for reading Avro files
JIRA: https://issues.apache.org/jira/browse/SPARK-2736

This patch includes:
1. An Avro converter that converts Avro data types to Python. It handles all 3 Avro data mappings (Generic, Specific and Reflect).
2. An example Python script for reading Avro files using AvroKeyInputFormat and the converter.
3. A fix for a classloading issue.

cc @MLnick @JoshRosen @mateiz

Author: Kan Zhang <kzhang@apache.org>

Closes #1916 from kanzhang/SPARK-2736 and squashes the following commits:

02443f8 [Kan Zhang] [SPARK-2736] Adding .avsc files to .rat-excludes
f74e9a9 [Kan Zhang] [SPARK-2736] nit: clazz -> className
82cc505 [Kan Zhang] [SPARK-2736] Update data sample
0be7761 [Kan Zhang] [SPARK-2736] Example pyspark script and data files
c8e5881 [Kan Zhang] [SPARK-2736] Trying to work with all 3 Avro data models
2271a5b [Kan Zhang] [SPARK-2736] Using the right class loader to find Avro classes
536876b [Kan Zhang] [SPARK-2736] Adding Avro to Java converter
parent 3a8b68b735
commit 9422a9b084
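For context on how the pieces fit together: PySpark's Hadoop input methods take the fully qualified name of a converter class, which Spark instantiates on the JVM side to turn each key or value into objects the Python pickler understands. As a minimal sketch only (a hypothetical TextToStringConverter, not part of this patch), a converter implementing org.apache.spark.api.python.Converter could look like this:

    import org.apache.hadoop.io.Text

    import org.apache.spark.api.python.Converter

    // Hypothetical example, not in this patch: convert a Hadoop Text key to a
    // plain String so PySpark can pickle it; anything else passes through unchanged.
    class TextToStringConverter extends Converter[Any, Any] {
      override def convert(obj: Any): Any = obj match {
        case null => null
        case t: Text => t.toString
        case other => other
      }
    }

The AvroWrapperToJavaConverter added below follows the same pattern, but unpacks an entire Avro record into a Java Map.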
@@ -25,6 +25,7 @@ log4j-defaults.properties
 bootstrap-tooltip.js
 jquery-1.11.1.min.js
 sorttable.js
+.*avsc
 .*txt
 .*json
 .*data
@@ -19,6 +19,7 @@ package org.apache.spark.api.python
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 import org.apache.spark.{Logging, SerializableWritable, SparkException}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.io._
@@ -42,7 +43,7 @@ private[python] object Converter extends Logging {
       defaultConverter: Converter[Any, Any]): Converter[Any, Any] = {
     converterClass.map { cc =>
       Try {
-        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
+        val c = Utils.classForName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
         logInfo(s"Loaded converter: $cc")
         c
       } match {
@@ -372,8 +372,8 @@ private[spark] object PythonRDD extends Logging {
       batchSize: Int) = {
     val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
     val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
     val confBroadcasted = sc.sc.broadcast(new SerializableWritable(sc.hadoopConfiguration()))
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
@@ -440,9 +440,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
     } else {
@@ -509,9 +509,9 @@ private[spark] object PythonRDD extends Logging {
       keyClass: String,
       valueClass: String,
       conf: Configuration) = {
-    val kc = Class.forName(keyClass).asInstanceOf[Class[K]]
-    val vc = Class.forName(valueClass).asInstanceOf[Class[V]]
-    val fc = Class.forName(inputFormatClass).asInstanceOf[Class[F]]
+    val kc = Utils.classForName(keyClass).asInstanceOf[Class[K]]
+    val vc = Utils.classForName(valueClass).asInstanceOf[Class[V]]
+    val fc = Utils.classForName(inputFormatClass).asInstanceOf[Class[F]]
     if (path.isDefined) {
       sc.sc.hadoopFile(path.get, fc, kc, vc)
     } else {
@@ -558,7 +558,7 @@ private[spark] object PythonRDD extends Logging {
     for {
       k <- Option(keyClass)
       v <- Option(valueClass)
-    } yield (Class.forName(k), Class.forName(v))
+    } yield (Utils.classForName(k), Utils.classForName(v))
   }
 
   private def getKeyValueConverters(keyConverterClass: String, valueConverterClass: String,
@@ -621,10 +621,10 @@ private[spark] object PythonRDD extends Logging {
     val (kc, vc) = getKeyValueTypes(keyClass, valueClass).getOrElse(
       inferKeyValueTypes(rdd, keyConverterClass, valueConverterClass))
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
-    val codec = Option(compressionCodecClass).map(Class.forName(_).asInstanceOf[Class[C]])
+    val codec = Option(compressionCodecClass).map(Utils.classForName(_).asInstanceOf[Class[C]])
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec)
   }
@@ -653,7 +653,7 @@ private[spark] object PythonRDD extends Logging {
     val mergedConf = getMergedConf(confAsMap, pyRDD.context.hadoopConfiguration)
     val converted = convertRDD(rdd, keyConverterClass, valueConverterClass,
       new JavaToWritableConverter)
-    val fc = Class.forName(outputFormatClass).asInstanceOf[Class[F]]
+    val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]]
     converted.saveAsNewAPIHadoopFile(path, kc, vc, fc, mergedConf)
   }
@@ -146,6 +146,9 @@ private[spark] object Utils extends Logging {
     Try { Class.forName(clazz, false, getContextOrSparkClassLoader) }.isSuccess
   }
 
+  /** Preferred alternative to Class.forName(className) */
+  def classForName(className: String) = Class.forName(className, true, getContextOrSparkClassLoader)
+
   /**
    * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
    */
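The new Utils.classForName helper resolves classes through getContextOrSparkClassLoader, so converter, input format and codec classes shipped with the user's job (for example the examples jar passed via --driver-class-path) can be found even when they are not visible to the class loader that loaded Spark itself, which is what a plain Class.forName call would use. A minimal standalone sketch of the same idea (hypothetical names, not Spark's code):

    object ClassLoading {
      // Prefer the thread's context class loader when one is set; otherwise
      // fall back to the loader that defined this class.
      def classForName(className: String): Class[_] = {
        val loader = Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)
        Class.forName(className, true, loader)  // initialize = true, as in the patch
      }
    }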
examples/src/main/python/avro_inputformat.py (new file, 75 lines)
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import sys

from pyspark import SparkContext

"""
Read data file users.avro in local Spark distro:

$ cd $SPARK_HOME
$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
> examples/src/main/resources/users.avro
{u'favorite_color': None, u'name': u'Alyssa', u'favorite_numbers': [3, 9, 15, 20]}
{u'favorite_color': u'red', u'name': u'Ben', u'favorite_numbers': []}

To read name and favorite_color fields only, specify the following reader schema:

$ cat examples/src/main/resources/user.avsc
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}

$ ./bin/spark-submit --driver-class-path /path/to/example/jar ./examples/src/main/python/avro_inputformat.py \
> examples/src/main/resources/users.avro examples/src/main/resources/user.avsc
{u'favorite_color': None, u'name': u'Alyssa'}
{u'favorite_color': u'red', u'name': u'Ben'}
"""
if __name__ == "__main__":
    if len(sys.argv) != 2 and len(sys.argv) != 3:
        print >> sys.stderr, """
        Usage: avro_inputformat <data_file> [reader_schema_file]

        Run with example jar:
        ./bin/spark-submit --driver-class-path /path/to/example/jar /path/to/examples/avro_inputformat.py <data_file> [reader_schema_file]
        Assumes you have Avro data stored in <data_file>. Reader schema can be optionally specified in [reader_schema_file].
        """
        exit(-1)

    path = sys.argv[1]
    sc = SparkContext(appName="AvroKeyInputFormat")

    conf = None
    if len(sys.argv) == 3:
        schema_rdd = sc.textFile(sys.argv[2], 1).collect()
        conf = {"avro.schema.input.key" : reduce(lambda x, y: x+y, schema_rdd)}

    avro_rdd = sc.newAPIHadoopFile(path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)
    output = avro_rdd.map(lambda x: x[0]).collect()
    for k in output:
        print k
examples/src/main/resources/user.avsc (new file, 8 lines)
@@ -0,0 +1,8 @@
{"namespace": "example.avro",
 "type": "record",
 "name": "User",
 "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_color", "type": ["string", "null"]}
 ]
}
examples/src/main/resources/users.avro (new binary file, not shown)
@@ -0,0 +1,130 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.examples.pythonconverters

import java.util.{Collection => JCollection, Map => JMap}

import scala.collection.JavaConversions._

import org.apache.avro.generic.{GenericFixed, IndexedRecord}
import org.apache.avro.mapred.AvroWrapper
import org.apache.avro.Schema
import org.apache.avro.Schema.Type._

import org.apache.spark.api.python.Converter
import org.apache.spark.SparkException


/**
 * Implementation of [[org.apache.spark.api.python.Converter]] that converts
 * an Avro Record wrapped in an AvroKey (or AvroValue) to a Java Map. It tries
 * to work with all 3 Avro data mappings (Generic, Specific and Reflect).
 */
class AvroWrapperToJavaConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = {
    if (obj == null) {
      return null
    }
    obj.asInstanceOf[AvroWrapper[_]].datum() match {
      case null => null
      case record: IndexedRecord => unpackRecord(record)
      case other => throw new SparkException(
        s"Unsupported top-level Avro data type ${other.getClass.getName}")
    }
  }

  def unpackRecord(obj: Any): JMap[String, Any] = {
    val map = new java.util.HashMap[String, Any]
    obj match {
      case record: IndexedRecord =>
        record.getSchema.getFields.zipWithIndex.foreach { case (f, i) =>
          map.put(f.name, fromAvro(record.get(i), f.schema))
        }
      case other => throw new SparkException(
        s"Unsupported RECORD type ${other.getClass.getName}")
    }
    map
  }

  def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = {
    obj.asInstanceOf[JMap[_, _]].map { case (key, value) =>
      (key.toString, fromAvro(value, schema.getValueType))
    }
  }

  def unpackFixed(obj: Any, schema: Schema): Array[Byte] = {
    unpackBytes(obj.asInstanceOf[GenericFixed].bytes())
  }

  def unpackBytes(obj: Any): Array[Byte] = {
    val bytes: Array[Byte] = obj match {
      case buf: java.nio.ByteBuffer => buf.array()
      case arr: Array[Byte] => arr
      case other => throw new SparkException(
        s"Unknown BYTES type ${other.getClass.getName}")
    }
    val bytearray = new Array[Byte](bytes.length)
    System.arraycopy(bytes, 0, bytearray, 0, bytes.length)
    bytearray
  }

  def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match {
    case c: JCollection[_] =>
      c.map(fromAvro(_, schema.getElementType))
    case arr: Array[_] if arr.getClass.getComponentType.isPrimitive =>
      arr.toSeq
    case arr: Array[_] =>
      arr.map(fromAvro(_, schema.getElementType)).toSeq
    case other => throw new SparkException(
      s"Unknown ARRAY type ${other.getClass.getName}")
  }

  def unpackUnion(obj: Any, schema: Schema): Any = {
    schema.getTypes.toList match {
      case List(s) => fromAvro(obj, s)
      case List(n, s) if n.getType == NULL => fromAvro(obj, s)
      case List(s, n) if n.getType == NULL => fromAvro(obj, s)
      case _ => throw new SparkException(
        "Unions may only consist of a concrete type and null")
    }
  }

  def fromAvro(obj: Any, schema: Schema): Any = {
    if (obj == null) {
      return null
    }
    schema.getType match {
      case UNION => unpackUnion(obj, schema)
      case ARRAY => unpackArray(obj, schema)
      case FIXED => unpackFixed(obj, schema)
      case MAP => unpackMap(obj, schema)
      case BYTES => unpackBytes(obj)
      case RECORD => unpackRecord(obj)
      case STRING => obj.toString
      case ENUM => obj.toString
      case NULL => obj
      case BOOLEAN => obj
      case DOUBLE => obj
      case FLOAT => obj
      case INT => obj
      case LONG => obj
      case other => throw new SparkException(
        s"Unknown Avro schema type ${other.getName}")
    }
  }
}