[SPARK-15898][SQL] DataFrameReader.text should return DataFrame

## What changes were proposed in this pull request?

We want to maintain API compatibility for DataFrameReader.text, so it goes back to returning a DataFrame, and we introduce a new API, DataFrameReader.textFile, which returns Dataset[String].
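
For illustration, a minimal sketch of the resulting API surface (the object name and the use of args(0) as the input path are placeholders, not part of this patch):

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

object TextReaderSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TextReaderSketch").getOrCreate()

    // text() keeps returning a DataFrame: a string column "value",
    // plus partition columns if the input directory is partitioned.
    val df: DataFrame = spark.read.text(args(0))
    df.printSchema()

    // textFile() is the new API: a Dataset[String] holding only the line contents.
    val ds: Dataset[String] = spark.read.textFile(args(0))
    println(ds.count())

    spark.stop()
  }
}
```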

affected PRs:
https://github.com/apache/spark/pull/11731
https://github.com/apache/spark/pull/13104
https://github.com/apache/spark/pull/13184

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #13604 from cloud-fan/revert.
Authored by Wenchen Fan on 2016-06-12 21:36:41 -07:00, committed by Reynold Xin
parent 1f8f2b5c2a
commit e2ab79d5ea
15 changed files with 57 additions and 39 deletions


@@ -364,9 +364,10 @@ parquetFile <- function(x, ...) {
#' Create a SparkDataFrame from a text file.
#'
- #' Loads a text file and returns a SparkDataFrame with a single string column named "value".
- #' If the directory structure of the text files contains partitioning information, those are
- #' ignored in the resulting DataFrame.
+ #' Loads text files and returns a SparkDataFrame whose schema starts with
+ #' a string column named "value", and followed by partitioned columns if
+ #' there are any.
#'
#' Each line in the text file is a new row in the resulting SparkDataFrame.
#'
#' @param path Path of file to read. A vector of multiple paths is allowed.


@@ -126,7 +126,7 @@ public final class JavaHdfsLR {
.appName("JavaHdfsLR")
.getOrCreate();
- JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+ JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[1]);


@@ -82,7 +82,7 @@ public final class JavaPageRank {
// URL neighbor URL
// URL neighbor URL
// ...
- JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+ JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(


@@ -46,7 +46,7 @@ public final class JavaWordCount {
.appName("JavaWordCount")
.getOrCreate();
- JavaRDD<String> lines = spark.read().text(args[0]).javaRDD();
+ JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override


@@ -87,7 +87,7 @@ public class JavaALSExample {
// $example on$
JavaRDD<Rating> ratingsRDD = spark
- .read().text("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
+ .read().textFile("data/mllib/als/sample_movielens_ratings.txt").javaRDD()
.map(new Function<String, Rating>() {
public Rating call(String str) {
return Rating.parseRating(str);


@@ -59,7 +59,7 @@ public class JavaSparkSQL {
System.out.println("=== Data source: RDD ===");
// Load a text file and convert each line to a Java Bean.
String file = "examples/src/main/resources/people.txt";
- JavaRDD<Person> people = spark.read().text(file).javaRDD().map(
+ JavaRDD<Person> people = spark.read().textFile(file).javaRDD().map(
new Function<String, Person>() {
@Override
public Person call(String line) {


@@ -72,7 +72,7 @@ object SparkHdfsLR {
.getOrCreate()
val inputPath = args(0)
- val lines = spark.read.text(inputPath).rdd
+ val lines = spark.read.textFile(inputPath).rdd
val points = lines.map(parsePoint).cache()
val ITERATIONS = args(1).toInt


@@ -71,7 +71,7 @@ object SparkKMeans {
.appName("SparkKMeans")
.getOrCreate()
- val lines = spark.read.text(args(0)).rdd
+ val lines = spark.read.textFile(args(0)).rdd
val data = lines.map(parseVector _).cache()
val K = args(1).toInt
val convergeDist = args(2).toDouble


@@ -56,7 +56,7 @@ object SparkPageRank {
.getOrCreate()
val iters = if (args.length > 1) args(1).toInt else 10
- val lines = spark.read.text(args(0)).rdd
+ val lines = spark.read.textFile(args(0)).rdd
val links = lines.map{ s =>
val parts = s.split("\\s+")
(parts(0), parts(1))


@@ -50,7 +50,7 @@ object ALSExample {
import spark.implicits._
// $example on$
- val ratings = spark.read.text("data/mllib/als/sample_movielens_ratings.txt")
+ val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt")
.map(parseRating)
.toDF()
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))


@@ -33,7 +33,7 @@ object RankingMetricsExample {
import spark.implicits._
// $example on$
// Read in the ratings data
- val ratings = spark.read.text("data/mllib/sample_movielens_data.txt").rdd.map { line =>
+ val ratings = spark.read.textFile("data/mllib/sample_movielens_data.txt").rdd.map { line =>
val fields = line.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble - 2.5)
}.cache()


@@ -291,10 +291,10 @@ class DataFrameReader(object):
@ignore_unicode_prefix
@since(1.6)
def text(self, paths):
- """Loads a text file and returns a :class:`DataFrame` with a single string column named "value".
- If the directory structure of the text files contains partitioning information,
- those are ignored in the resulting DataFrame. To include partitioning information as
- columns, use ``read.format('text').load(...)``.
+ """
+ Loads text files and returns a :class:`DataFrame` whose schema starts with a
+ string column named "value", and followed by partitioned columns if there
+ are any.
Each line in the text file is a new row in the resulting DataFrame.


@@ -450,14 +450,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
}
/**
- * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
- * contains a single string column named "value".
+ * Loads text files and returns a [[DataFrame]] whose schema starts with a string column named
+ * "value", and followed by partitioned columns if there are any.
*
- * If the directory structure of the text files contains partitioning information, those are
- * ignored in the resulting Dataset. To include partitioning information as columns, use
- * `read.format("text").load("...")`.
- *
- * Each line in the text files is a new element in the resulting Dataset. For example:
+ * Each line in the text files is a new row in the resulting DataFrame. For example:
* {{{
* // Scala:
* spark.read.text("/path/to/spark/README.md")
@@ -467,12 +463,33 @@
* }}}
*
* @param paths input path
* @since 1.6.0
*/
+ @scala.annotation.varargs
+ def text(paths: String*): DataFrame = format("text").load(paths : _*)
+ /**
+ * Loads text files and returns a [[Dataset]] of String. The underlying schema of the Dataset
+ * contains a single string column named "value".
+ *
+ * If the directory structure of the text files contains partitioning information, those are
+ * ignored in the resulting Dataset. To include partitioning information as columns, use `text`.
+ *
+ * Each line in the text files is a new element in the resulting Dataset. For example:
+ * {{{
+ * // Scala:
+ * spark.read.textFile("/path/to/spark/README.md")
+ *
+ * // Java:
+ * spark.read().textFile("/path/to/spark/README.md")
+ * }}}
+ *
+ * @param paths input path
+ * @since 2.0.0
+ */
@scala.annotation.varargs
- def text(paths: String*): Dataset[String] = {
- format("text").load(paths : _*).select("value")
- .as[String](sparkSession.implicits.newStringEncoder)
+ def textFile(paths: String*): Dataset[String] = {
+ text(paths : _*).select("value").as[String](sparkSession.implicits.newStringEncoder)
}
///////////////////////////////////////////////////////////////////////////////////////
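
As the added textFile implementation above shows, textFile is defined in terms of text: it selects the "value" column and views it as a Dataset[String]. A rough user-level sketch of that equivalence (the path argument is a placeholder):

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object TextFileEquivalenceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("TextFileEquivalenceSketch").getOrCreate()
    import spark.implicits._

    val path = args(0) // placeholder input path

    // The new API: a Dataset[String] of the file's lines.
    val viaTextFile: Dataset[String] = spark.read.textFile(path)

    // Roughly what textFile does internally: read as a DataFrame,
    // keep only the "value" column, and view it as Dataset[String].
    val viaText: Dataset[String] = spark.read.text(path).select("value").as[String]

    // Both should contain the same lines.
    println(viaTextFile.count() == viaText.count())

    spark.stop()
  }
}
```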


@@ -338,10 +338,10 @@ public class JavaDataFrameSuite {
@Test
public void testTextLoad() {
- Dataset<String> ds1 = spark.read().text(getResource("text-suite.txt"));
+ Dataset<String> ds1 = spark.read().textFile(getResource("text-suite.txt"));
Assert.assertEquals(4L, ds1.count());
- Dataset<String> ds2 = spark.read().text(
+ Dataset<String> ds2 = spark.read().textFile(
getResource("text-suite.txt"),
getResource("text-suite2.txt"));
Assert.assertEquals(5L, ds2.count());


@@ -36,7 +36,7 @@ class TextSuite extends QueryTest with SharedSQLContext {
}
test("SQLContext.read.text() API") {
- verifyFrame(spark.read.text(testFile).toDF())
+ verifyFrame(spark.read.text(testFile))
}
test("SPARK-12562 verify write.text() can handle column name beyond `value`") {
@@ -45,7 +45,7 @@
val tempFile = Utils.createTempDir()
tempFile.delete()
df.write.text(tempFile.getCanonicalPath)
- verifyFrame(spark.read.text(tempFile.getCanonicalPath).toDF())
+ verifyFrame(spark.read.text(tempFile.getCanonicalPath))
Utils.deleteRecursively(tempFile)
}
@@ -64,20 +64,20 @@
}
}
- test("reading partitioned data using read.text()") {
+ test("reading partitioned data using read.textFile()") {
val partitionedData = Thread.currentThread().getContextClassLoader
.getResource("text-partitioned").toString
- val df = spark.read.text(partitionedData)
- val data = df.collect()
+ val ds = spark.read.textFile(partitionedData)
+ val data = ds.collect()
- assert(df.schema == new StructType().add("value", StringType))
+ assert(ds.schema == new StructType().add("value", StringType))
assert(data.length == 2)
}
test("support for partitioned reading") {
test("support for partitioned reading using read.text()") {
val partitionedData = Thread.currentThread().getContextClassLoader
.getResource("text-partitioned").toString
- val df = spark.read.format("text").load(partitionedData)
+ val df = spark.read.text(partitionedData)
val data = df.filter("year = '2015'").select("value").collect()
assert(data(0) == Row("2015-test"))
@@ -94,7 +94,7 @@
testDf.write.option("compression", codecName).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(_.getName.endsWith(s".txt$extension")))
- verifyFrame(spark.read.text(tempDirPath).toDF())
+ verifyFrame(spark.read.text(tempDirPath))
}
val errMsg = intercept[IllegalArgumentException] {
@@ -121,7 +121,7 @@
.options(extraOptions).mode(SaveMode.Overwrite).text(tempDirPath)
val compressedFiles = new File(tempDirPath).listFiles()
assert(compressedFiles.exists(!_.getName.endsWith(".txt.gz")))
- verifyFrame(spark.read.options(extraOptions).text(tempDirPath).toDF())
+ verifyFrame(spark.read.options(extraOptions).text(tempDirPath))
}
}
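
The TextSuite changes above exercise the behavioral split this patch introduces: textFile exposes only the "value" column, while text keeps partition columns. A minimal sketch of that difference, assuming a partitioned input directory laid out like the text-partitioned test resource (for example a year=2015 subdirectory):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructType}

object PartitionedTextSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PartitionedTextSketch").getOrCreate()
    val path = args(0) // e.g. a directory containing year=2015/part-0.txt

    // textFile(): partition columns are not part of the schema, only "value" is.
    val ds = spark.read.textFile(path)
    assert(ds.schema == new StructType().add("value", StringType))

    // text(): the DataFrame keeps the partition column, so rows can be filtered on it.
    val df = spark.read.text(path)
    df.filter("year = '2015'").select("value").show()

    spark.stop()
  }
}
```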