[SPARK-16271][SQL] Implement Hive's UDFXPathUtil
## What changes were proposed in this pull request? This patch ports Hive's UDFXPathUtil over to Spark, which can be used to implement xpath functionality in Spark in the near future. ## How was this patch tested? Added two new test suites UDFXPathUtilSuite and ReusableStringReaderSuite. They have been ported over from Hive (but rewritten in Scala in order to leverage ScalaTest). Author: petermaxlee <petermaxlee@gmail.com> Closes #13961 from petermaxlee/xpath.
This commit is contained in:
parent
0df5ce1bc1
commit
153c2f9ac1
|
@ -0,0 +1,192 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.sql.catalyst.expressions.xml;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.Reader;
|
||||||
|
import java.io.StringReader;
|
||||||
|
|
||||||
|
import javax.xml.namespace.QName;
|
||||||
|
import javax.xml.xpath.XPath;
|
||||||
|
import javax.xml.xpath.XPathConstants;
|
||||||
|
import javax.xml.xpath.XPathExpression;
|
||||||
|
import javax.xml.xpath.XPathExpressionException;
|
||||||
|
import javax.xml.xpath.XPathFactory;
|
||||||
|
|
||||||
|
import org.w3c.dom.Node;
|
||||||
|
import org.w3c.dom.NodeList;
|
||||||
|
import org.xml.sax.InputSource;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility class for all XPath UDFs. Each UDF instance should keep an instance of this class.
|
||||||
|
*
|
||||||
|
* This is based on Hive's UDFXPathUtil implementation.
|
||||||
|
*/
|
||||||
|
public class UDFXPathUtil {
|
||||||
|
private XPath xpath = XPathFactory.newInstance().newXPath();
|
||||||
|
private ReusableStringReader reader = new ReusableStringReader();
|
||||||
|
private InputSource inputSource = new InputSource(reader);
|
||||||
|
private XPathExpression expression = null;
|
||||||
|
private String oldPath = null;
|
||||||
|
|
||||||
|
public Object eval(String xml, String path, QName qname) {
|
||||||
|
if (xml == null || path == null || qname == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (xml.length() == 0 || path.length() == 0) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!path.equals(oldPath)) {
|
||||||
|
try {
|
||||||
|
expression = xpath.compile(path);
|
||||||
|
} catch (XPathExpressionException e) {
|
||||||
|
expression = null;
|
||||||
|
}
|
||||||
|
oldPath = path;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (expression == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
reader.set(xml);
|
||||||
|
|
||||||
|
try {
|
||||||
|
return expression.evaluate(inputSource, qname);
|
||||||
|
} catch (XPathExpressionException e) {
|
||||||
|
throw new RuntimeException ("Invalid expression '" + oldPath + "'", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public Boolean evalBoolean(String xml, String path) {
|
||||||
|
return (Boolean) eval(xml, path, XPathConstants.BOOLEAN);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String evalString(String xml, String path) {
|
||||||
|
return (String) eval(xml, path, XPathConstants.STRING);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Double evalNumber(String xml, String path) {
|
||||||
|
return (Double) eval(xml, path, XPathConstants.NUMBER);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Node evalNode(String xml, String path) {
|
||||||
|
return (Node) eval(xml, path, XPathConstants.NODE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NodeList evalNodeList(String xml, String path) {
|
||||||
|
return (NodeList) eval(xml, path, XPathConstants.NODESET);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reusable, non-threadsafe version of {@link StringReader}.
|
||||||
|
*/
|
||||||
|
public static class ReusableStringReader extends Reader {
|
||||||
|
|
||||||
|
private String str = null;
|
||||||
|
private int length = -1;
|
||||||
|
private int next = 0;
|
||||||
|
private int mark = 0;
|
||||||
|
|
||||||
|
public ReusableStringReader() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public void set(String s) {
|
||||||
|
this.str = s;
|
||||||
|
this.length = s.length();
|
||||||
|
this.mark = 0;
|
||||||
|
this.next = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Check to make sure that the stream has not been closed */
|
||||||
|
private void ensureOpen() throws IOException {
|
||||||
|
if (str == null)
|
||||||
|
throw new IOException("Stream closed");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read() throws IOException {
|
||||||
|
ensureOpen();
|
||||||
|
if (next >= length)
|
||||||
|
return -1;
|
||||||
|
return str.charAt(next++);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int read(char cbuf[], int off, int len) throws IOException {
|
||||||
|
ensureOpen();
|
||||||
|
if ((off < 0) || (off > cbuf.length) || (len < 0)
|
||||||
|
|| ((off + len) > cbuf.length) || ((off + len) < 0)) {
|
||||||
|
throw new IndexOutOfBoundsException();
|
||||||
|
} else if (len == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
if (next >= length)
|
||||||
|
return -1;
|
||||||
|
int n = Math.min(length - next, len);
|
||||||
|
str.getChars(next, next + n, cbuf, off);
|
||||||
|
next += n;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long skip(long ns) throws IOException {
|
||||||
|
ensureOpen();
|
||||||
|
if (next >= length)
|
||||||
|
return 0;
|
||||||
|
// Bound skip by beginning and end of the source
|
||||||
|
long n = Math.min(length - next, ns);
|
||||||
|
n = Math.max(-next, n);
|
||||||
|
next += n;
|
||||||
|
return n;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean ready() throws IOException {
|
||||||
|
ensureOpen();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean markSupported() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void mark(int readAheadLimit) throws IOException {
|
||||||
|
if (readAheadLimit < 0) {
|
||||||
|
throw new IllegalArgumentException("Read-ahead limit < 0");
|
||||||
|
}
|
||||||
|
ensureOpen();
|
||||||
|
mark = next;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() throws IOException {
|
||||||
|
ensureOpen();
|
||||||
|
next = mark;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() {
|
||||||
|
str = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,103 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.sql.catalyst.expressions.xml
|
||||||
|
|
||||||
|
import java.io.IOException
|
||||||
|
|
||||||
|
import org.apache.spark.SparkFunSuite
|
||||||
|
import org.apache.spark.sql.catalyst.expressions.xml.UDFXPathUtil.ReusableStringReader
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for [[UDFXPathUtil.ReusableStringReader]].
|
||||||
|
*
|
||||||
|
* Loosely based on Hive's TestReusableStringReader.java.
|
||||||
|
*/
|
||||||
|
class ReusableStringReaderSuite extends SparkFunSuite {
|
||||||
|
|
||||||
|
private val fox = "Quick brown fox jumps over the lazy dog."
|
||||||
|
|
||||||
|
test("empty reader") {
|
||||||
|
val reader = new ReusableStringReader
|
||||||
|
|
||||||
|
intercept[IOException] {
|
||||||
|
reader.read()
|
||||||
|
}
|
||||||
|
|
||||||
|
intercept[IOException] {
|
||||||
|
reader.ready()
|
||||||
|
}
|
||||||
|
|
||||||
|
reader.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
test("mark reset") {
|
||||||
|
val reader = new ReusableStringReader
|
||||||
|
|
||||||
|
if (reader.markSupported()) {
|
||||||
|
reader.asInstanceOf[ReusableStringReader].set(fox)
|
||||||
|
assert(reader.ready())
|
||||||
|
|
||||||
|
val cc = new Array[Char](6)
|
||||||
|
var read = reader.read(cc)
|
||||||
|
assert(read == 6)
|
||||||
|
assert("Quick " == new String(cc))
|
||||||
|
|
||||||
|
reader.mark(100)
|
||||||
|
|
||||||
|
read = reader.read(cc)
|
||||||
|
assert(read == 6)
|
||||||
|
assert("brown " == new String(cc))
|
||||||
|
|
||||||
|
reader.reset()
|
||||||
|
read = reader.read(cc)
|
||||||
|
assert(read == 6)
|
||||||
|
assert("brown " == new String(cc))
|
||||||
|
}
|
||||||
|
reader.close()
|
||||||
|
}
|
||||||
|
|
||||||
|
test("skip") {
|
||||||
|
val reader = new ReusableStringReader
|
||||||
|
reader.asInstanceOf[ReusableStringReader].set(fox)
|
||||||
|
|
||||||
|
// skip entire the data:
|
||||||
|
var skipped = reader.skip(fox.length() + 1)
|
||||||
|
assert(fox.length() == skipped)
|
||||||
|
assert(-1 == reader.read())
|
||||||
|
|
||||||
|
reader.asInstanceOf[ReusableStringReader].set(fox) // reset the data
|
||||||
|
val cc = new Array[Char](6)
|
||||||
|
var read = reader.read(cc)
|
||||||
|
assert(read == 6)
|
||||||
|
assert("Quick " == new String(cc))
|
||||||
|
|
||||||
|
// skip some piece of data:
|
||||||
|
skipped = reader.skip(30)
|
||||||
|
assert(skipped == 30)
|
||||||
|
read = reader.read(cc)
|
||||||
|
assert(read == 4)
|
||||||
|
assert("dog." == new String(cc, 0, read))
|
||||||
|
|
||||||
|
// skip when already at EOF:
|
||||||
|
skipped = reader.skip(300)
|
||||||
|
assert(skipped == 0, skipped)
|
||||||
|
assert(reader.read() == -1)
|
||||||
|
|
||||||
|
reader.close()
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,99 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.spark.sql.catalyst.expressions.xml
|
||||||
|
|
||||||
|
import javax.xml.xpath.XPathConstants.STRING
|
||||||
|
|
||||||
|
import org.w3c.dom.Node
|
||||||
|
import org.w3c.dom.NodeList
|
||||||
|
|
||||||
|
import org.apache.spark.SparkFunSuite
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unit tests for [[UDFXPathUtil]]. Loosely based on Hive's TestUDFXPathUtil.java.
|
||||||
|
*/
|
||||||
|
class UDFXPathUtilSuite extends SparkFunSuite {
|
||||||
|
|
||||||
|
private lazy val util = new UDFXPathUtil
|
||||||
|
|
||||||
|
test("illegal arguments") {
|
||||||
|
// null args
|
||||||
|
assert(util.eval(null, "a/text()", STRING) == null)
|
||||||
|
assert(util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", null, STRING) == null)
|
||||||
|
assert(
|
||||||
|
util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/text()", null) == null)
|
||||||
|
|
||||||
|
// empty String args
|
||||||
|
assert(util.eval("", "a/text()", STRING) == null)
|
||||||
|
assert(util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "", STRING) == null)
|
||||||
|
|
||||||
|
// wrong expression:
|
||||||
|
assert(
|
||||||
|
util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/text(", STRING) == null)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("generic eval") {
|
||||||
|
val ret =
|
||||||
|
util.eval("<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/c[2]/text()", STRING)
|
||||||
|
assert(ret == "c2")
|
||||||
|
}
|
||||||
|
|
||||||
|
test("boolean eval") {
|
||||||
|
var ret =
|
||||||
|
util.evalBoolean("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[1]/text()")
|
||||||
|
assert(ret == true)
|
||||||
|
|
||||||
|
ret = util.evalBoolean("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[4]")
|
||||||
|
assert(ret == false)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("string eval") {
|
||||||
|
var ret =
|
||||||
|
util.evalString("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[3]/text()")
|
||||||
|
assert(ret == "b3")
|
||||||
|
|
||||||
|
ret =
|
||||||
|
util.evalString("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[4]/text()")
|
||||||
|
assert(ret == "")
|
||||||
|
|
||||||
|
ret = util.evalString(
|
||||||
|
"<a><b>true</b><b k=\"foo\">FALSE</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[2]/@k")
|
||||||
|
assert(ret == "foo")
|
||||||
|
}
|
||||||
|
|
||||||
|
test("number eval") {
|
||||||
|
var ret =
|
||||||
|
util.evalNumber("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/c[2]")
|
||||||
|
assert(ret == -77.0d)
|
||||||
|
|
||||||
|
ret = util.evalNumber(
|
||||||
|
"<a><b>true</b><b k=\"foo\">FALSE</b><b>b3</b><c>c1</c><c>c2</c></a>", "a/b[2]/@k")
|
||||||
|
assert(ret.isNaN)
|
||||||
|
}
|
||||||
|
|
||||||
|
test("node eval") {
|
||||||
|
val ret = util.evalNode("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/c[2]")
|
||||||
|
assert(ret != null && ret.isInstanceOf[Node])
|
||||||
|
}
|
||||||
|
|
||||||
|
test("node list eval") {
|
||||||
|
val ret = util.evalNodeList("<a><b>true</b><b>false</b><b>b3</b><c>c1</c><c>-77</c></a>", "a/*")
|
||||||
|
assert(ret != null && ret.isInstanceOf[NodeList])
|
||||||
|
assert(ret.asInstanceOf[NodeList].getLength == 5)
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue