[SPARK-7280][SQL] Add "drop" column/s on a data frame
Takes a column name/s and returns a new DataFrame that drops a column/s. Author: rakeshchalasani <vnit.rakesh@gmail.com> Closes #5818 from rakeshchalasani/SPARK-7280 and squashes the following commits: ce2ec09 [rakeshchalasani] Minor edit 45c06f1 [rakeshchalasani] Change withColumnRename and format changes f68945a [rakeshchalasani] Minor fix 0b9104d [rakeshchalasani] Drop one column at a time 289afd2 [rakeshchalasani] [SPARK-7280][SQL] Add "drop" column/s on a data frame
This commit is contained in:
parent
149b3ee2da
commit
ee04413935
|
@ -851,15 +851,40 @@ class DataFrame private[sql](
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a new [[DataFrame]] with a column renamed.
|
* Returns a new [[DataFrame]] with a column renamed.
|
||||||
|
* This is a no-op if schema doesn't contain existingName.
|
||||||
* @group dfops
|
* @group dfops
|
||||||
*/
|
*/
|
||||||
def withColumnRenamed(existingName: String, newName: String): DataFrame = {
|
def withColumnRenamed(existingName: String, newName: String): DataFrame = {
|
||||||
val resolver = sqlContext.analyzer.resolver
|
val resolver = sqlContext.analyzer.resolver
|
||||||
|
val shouldRename = schema.exists(f => resolver(f.name, existingName))
|
||||||
|
if (shouldRename) {
|
||||||
val colNames = schema.map { field =>
|
val colNames = schema.map { field =>
|
||||||
val name = field.name
|
val name = field.name
|
||||||
if (resolver(name, existingName)) Column(name).as(newName) else Column(name)
|
if (resolver(name, existingName)) Column(name).as(newName) else Column(name)
|
||||||
}
|
}
|
||||||
select(colNames :_*)
|
select(colNames : _*)
|
||||||
|
} else {
|
||||||
|
this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a new [[DataFrame]] with a column dropped.
|
||||||
|
* This is a no-op if schema doesn't contain column name.
|
||||||
|
* @group dfops
|
||||||
|
*/
|
||||||
|
def drop(colName: String): DataFrame = {
|
||||||
|
val resolver = sqlContext.analyzer.resolver
|
||||||
|
val shouldDrop = schema.exists(f => resolver(f.name, colName))
|
||||||
|
if (shouldDrop) {
|
||||||
|
val colsAfterDrop = schema.filter { field =>
|
||||||
|
val name = field.name
|
||||||
|
!resolver(name, colName)
|
||||||
|
}.map(f => Column(f.name))
|
||||||
|
select(colsAfterDrop : _*)
|
||||||
|
} else {
|
||||||
|
this
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -499,6 +499,22 @@ class DataFrameSuite extends QueryTest {
|
||||||
Row(2) :: Row(3) :: Row(4) :: Nil)
|
Row(2) :: Row(3) :: Row(4) :: Nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("drop column using drop") {
|
||||||
|
val df = testData.drop("key")
|
||||||
|
checkAnswer(
|
||||||
|
df,
|
||||||
|
testData.collect().map(x => Row(x.getString(1))).toSeq)
|
||||||
|
assert(df.schema.map(_.name) === Seq("value"))
|
||||||
|
}
|
||||||
|
|
||||||
|
test("drop unknown column (no-op)") {
|
||||||
|
val df = testData.drop("random")
|
||||||
|
checkAnswer(
|
||||||
|
df,
|
||||||
|
testData.collect().toSeq)
|
||||||
|
assert(df.schema.map(_.name) === Seq("key","value"))
|
||||||
|
}
|
||||||
|
|
||||||
test("withColumnRenamed") {
|
test("withColumnRenamed") {
|
||||||
val df = testData.toDF().withColumn("newCol", col("key") + 1)
|
val df = testData.toDF().withColumn("newCol", col("key") + 1)
|
||||||
.withColumnRenamed("value", "valueRenamed")
|
.withColumnRenamed("value", "valueRenamed")
|
||||||
|
|
Loading…
Reference in a new issue