Commit 9c03854

xuchuanyin authored and jackylk committed

[CARBONDATA-1709][DataFrame] Support sort_columns option in dataframe writer
This PR adds SORT_COLUMNS option support in the DataFrame writer. This closes apache#1496
1 parent 63afc00 commit 9c03854
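
For context, here is a minimal sketch of how the new option is used from a Spark application. The schema, data, and table name are illustrative, and it assumes a Spark 2.x session with the CarbonData integration on the classpath:

import org.apache.spark.sql.{SaveMode, SparkSession}

object SortColumnsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sort-columns-demo").getOrCreate()
    import spark.implicits._

    // Illustrative data: two string columns and one int column.
    val df = Seq(("a", "b", 1), ("c", "d", 2)).toDF("c1", "c2", "c3")

    df.write
      .format("carbondata")
      .option("tableName", "demo_table")  // hypothetical table name
      .option("sort_columns", "c1")       // the option added by this commit
      .mode(SaveMode.Overwrite)
      .save()
  }
}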

File tree

3 files changed: +63 -3 lines changed


integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala

+60 -3

@@ -22,7 +22,7 @@ import java.math.BigDecimal
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {

@@ -73,7 +73,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon8")
     sql("DROP TABLE IF EXISTS carbon9")
     sql("DROP TABLE IF EXISTS carbon10")
-
+    sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
+    sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
+    sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
   }
 
 

@@ -236,13 +238,68 @@ test("test the boolean data type"){
       sql("select count(*) from carbon10 where c3 > 500"), Row(500)
     )
     sql("drop table carbon10")
-    assert(! new File(path).exists())
+    assert(!new File(path).exists())
     assert(intercept[AnalysisException](
       sql("select count(*) from carbon10 where c3 > 500"))
       .message
       .contains("not found"))
   }
 
+  private def getSortColumnValue(tableName: String): Array[String] = {
+    val desc = sql(s"desc formatted $tableName")
+    val sortColumnRow = desc.collect.find(r =>
+      r(0).asInstanceOf[String].trim.equalsIgnoreCase("SORT_COLUMNS")
+    )
+    assert(sortColumnRow.isDefined)
+    sortColumnRow.get.get(1).asInstanceOf[String].split(",")
+      .map(_.trim.toLowerCase).filter(_.length > 0)
+  }
+
+  private def getDefaultWriter(tableName: String): DataFrameWriter[Row] = {
+    df2.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .option("single_pass", "false")
+      .option("table_blocksize", "256")
+      .option("compress", "false")
+      .mode(SaveMode.Overwrite)
+  }
+
+  test("test load dataframe with sort_columns not specified," +
+    " by default all string columns will be sort_columns") {
+    // all string column will be sort_columns by default
+    getDefaultWriter("df_write_sort_column_not_specified").save()
+    checkAnswer(
+      sql("select count(*) from df_write_sort_column_not_specified where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_sort_column_not_specified")
+    assert(sortColumnValue.sameElements(Array("c1", "c2")))
+  }
+
+  test("test load dataframe with sort_columns specified") {
+    // only specify c1 as sort_columns
+    getDefaultWriter("df_write_specify_sort_column").option("sort_columns", "c1").save()
+    checkAnswer(
+      sql("select count(*) from df_write_specify_sort_column where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_specify_sort_column")
+    assert(sortColumnValue.sameElements(Array("c1")))
+  }
+
+  test("test load dataframe with sort_columns specified empty") {
+    // specify empty sort_column
+    getDefaultWriter("df_write_empty_sort_column").option("sort_columns", "").save()
+    checkAnswer(
+      sql("select count(*) from df_write_empty_sort_column where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_empty_sort_column")
+    assert(sortColumnValue.isEmpty)
+  }
+
   override def afterAll {
     dropTable
   }
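
The helper getSortColumnValue above recovers the effective sort columns by scanning the output of DESC FORMATTED for the SORT_COLUMNS row. A hedged spark-shell sketch of the same check, assuming the tables created by these tests exist:

// Illustrative only, not part of the diff:
spark.sql("desc formatted df_write_specify_sort_column")
  .collect()
  .find(_.getString(0).trim.equalsIgnoreCase("SORT_COLUMNS"))
  .foreach(row => println(s"SORT_COLUMNS = ${row.getString(1).trim}"))
// expected to print: SORT_COLUMNS = c1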

integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala

+2

@@ -42,6 +42,8 @@ class CarbonOption(options: Map[String, String]) {
 
   def singlePass: Boolean = options.getOrElse("single_pass", "false").toBoolean
 
+  def sortColumns: Option[String] = options.get("sort_columns")
+
   def dictionaryInclude: Option[String] = options.get("dictionary_include")
 
   def dictionaryExclude: Option[String] = options.get("dictionary_exclude")
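
The new accessor follows the pattern CarbonOption already uses for single_pass and the dictionary options: options arrive as a Map[String, String], and an absent key surfaces as None so callers can skip properties the user never set. A standalone sketch of the pattern (class and values are hypothetical):

object OptionAccessorDemo {
  class Options(options: Map[String, String]) {
    def sortColumns: Option[String] = options.get("sort_columns")
  }

  def main(args: Array[String]): Unit = {
    assert(new Options(Map("sort_columns" -> "c1,c2")).sortColumns.contains("c1,c2"))
    assert(new Options(Map.empty).sortColumns.isEmpty)  // unset key -> None
  }
}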

integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala

+1

@@ -168,6 +168,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
     val property = Map(
+      "SORT_COLUMNS" -> options.sortColumns,
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize
