[SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API.
Author: Reynold Xin <rxin@databricks.com>

Closes #6211 from rxin/mllib-reader and squashes the following commits:

79a2cb9 [Reynold Xin] [SPARK-7654][MLlib] Migrate MLlib to the DataFrame reader/writer API.
parent 1b4e710e5c
commit 161d0b4a41
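Every hunk below makes the same substitution: the deprecated one-shot Parquet helpers (df.saveAsParquetFile(path) to write, sqlContext.parquetFile(path) to read) are replaced by the builder-style DataFrameReader/DataFrameWriter API. As a minimal, self-contained sketch of the before/after usage — the app name, toy data, and output path are illustrative, not taken from the diff:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    // Spark 1.4-era setup; local master only for illustration.
    val sc = new SparkContext(new SparkConf().setAppName("ReaderWriterSketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = sc.parallelize(Seq((1, "a"), (2, "b"))).toDF("key", "value")

    // Before this commit (deprecated one-shot helpers):
    //   df.saveAsParquetFile("/tmp/pairs.parquet")
    //   val loaded = sqlContext.parquetFile("/tmp/pairs.parquet")

    // After this commit (builder-style reader/writer API):
    df.write.parquet("/tmp/pairs.parquet")
    val loaded = sqlContext.read.parquet("/tmp/pairs.parquet")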
@@ -103,7 +103,7 @@ object DatasetExample {
     tmpDir.deleteOnExit()
     val outputDir = new File(tmpDir, "dataset").toString
     println(s"Saving to $outputDir as Parquet file.")
-    df.saveAsParquetFile(outputDir)
+    df.write.parquet(outputDir)
 
     println(s"Loading Parquet file with UDT from $outputDir.")
     val newDataset = sqlContext.read.parquet(outputDir)
@@ -58,7 +58,7 @@ object RDDRelation {
     df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
 
     // Write out an RDD as a parquet file.
-    df.saveAsParquetFile("pair.parquet")
+    df.write.parquet("pair.parquet")
 
     // Read in parquet file. Parquet files are self-describing so the schmema is preserved.
     val parquetFile = sqlContext.read.parquet("pair.parquet")
@@ -140,7 +140,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
 
       // Create Parquet data.
       val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
-      dataRDD.saveAsParquetFile(dataPath(path))
+      dataRDD.write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
@@ -186,7 +186,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
 
       // Create Parquet data.
       val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
-      dataRDD.saveAsParquetFile(dataPath(path))
+      dataRDD.write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): NaiveBayesModel = {
@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {
 
       // Create Parquet data.
       val data = Data(weights, intercept, threshold)
-      sc.parallelize(Seq(data), 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(Seq(data), 1).toDF().write.parquet(Loader.dataPath(path))
     }
 
     /**
@@ -126,7 +126,7 @@ object GaussianMixtureModel extends Loader[GaussianMixtureModel] {
       val dataArray = Array.tabulate(weights.length) { i =>
         Data(weights(i), gaussians(i).mu, gaussians(i).sigma)
       }
-      sc.parallelize(dataArray, 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(dataArray, 1).toDF().write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): GaussianMixtureModel = {
@@ -110,7 +110,7 @@ object KMeansModel extends Loader[KMeansModel] {
       val dataRDD = sc.parallelize(model.clusterCenters.zipWithIndex).map { case (point, id) =>
         Cluster(id, point)
       }.toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): KMeansModel = {
@@ -74,7 +74,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val dataRDD = model.assignments.toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): PowerIterationClusteringModel = {
@@ -86,7 +86,7 @@ object PowerIterationClusteringModel extends Loader[PowerIterationClusteringMode
       assert(formatVersion == thisFormatVersion)
 
       val k = (metadata \ "k").extract[Int]
-      val assignments = sqlContext.parquetFile(Loader.dataPath(path))
+      val assignments = sqlContext.read.parquet(Loader.dataPath(path))
       Loader.checkSchema[PowerIterationClustering.Assignment](assignments.schema)
 
       val assignmentsRDD = assignments.map {
@@ -580,7 +580,7 @@ object Word2VecModel extends Loader[Word2VecModel] {
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       val dataArray = model.toSeq.map { case (w, v) => Data(w, v) }
-      sc.parallelize(dataArray.toSeq, 1).toDF().saveAsParquetFile(Loader.dataPath(path))
+      sc.parallelize(dataArray.toSeq, 1).toDF().write.parquet(Loader.dataPath(path))
     }
   }
 
@@ -281,8 +281,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
       val metadata = compact(render(
         ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
-      model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
-      model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
+      model.userFeatures.toDF("id", "features").write.parquet(userPath(path))
+      model.productFeatures.toDF("id", "features").write.parquet(productPath(path))
     }
 
     def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
@@ -184,7 +184,7 @@ object IsotonicRegressionModel extends Loader[IsotonicRegressionModel] {
 
       sqlContext.createDataFrame(
         boundaries.toSeq.zip(predictions).map { case (b, p) => Data(b, p) }
-      ).saveAsParquetFile(dataPath(path))
+      ).write.parquet(dataPath(path))
     }
 
     def load(sc: SparkContext, path: String): (Array[Double], Array[Double]) = {
@@ -60,7 +60,7 @@ private[regression] object GLMRegressionModel {
       val data = Data(weights, intercept)
       val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF()
       // TODO: repartition with 1 partition after SPARK-5532 gets fixed
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     /**
@@ -223,7 +223,7 @@ object DecisionTreeModel extends Loader[DecisionTreeModel] with Logging {
       val dataRDD: DataFrame = sc.parallelize(nodes)
         .map(NodeData.apply(0, _))
         .toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     def load(sc: SparkContext, path: String, algo: String, numNodes: Int): DecisionTreeModel = {
@@ -414,7 +414,7 @@ private[tree] object TreeEnsembleModel extends Logging {
       val dataRDD = sc.parallelize(model.trees.zipWithIndex).flatMap { case (tree, treeId) =>
         tree.topNode.subtreeIterator.toSeq.map(node => NodeData(treeId, node))
       }.toDF()
-      dataRDD.saveAsParquetFile(Loader.dataPath(path))
+      dataRDD.write.parquet(Loader.dataPath(path))
     }
 
     /**