
Commit 5936e7f

QiangCai authored and jackylk committed on Oct 30, 2017
[CARBONDATA-1628][Streaming] Refactor LoadTableCommand to reuse code for streaming ingest in the future

Refactor LoadTableCommand so its option resolution and load-model building can be reused for streaming ingest in the future. This closes apache#1439
1 parent 4d70a21 commit 5936e7f

File tree

2 files changed: +452 -312 lines changed
 
DataLoadingUtil.scala (new file)

@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.util
+
+import scala.collection.immutable
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.TableOptionConstant
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.ValidateUtil
+
+/**
+ * utility object for data loading
+ */
+object DataLoadingUtil {
+
+  /**
+   * get data loading options and initialise default values
+   */
+  def getDataLoadingOptions(
+      carbonProperty: CarbonProperties,
+      options: immutable.Map[String, String]): mutable.Map[String, String] = {
+    val optionsFinal = scala.collection.mutable.Map[String, String]()
+    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
+    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
+    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
+    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
+    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
+    optionsFinal.put("columndict", options.getOrElse("columndict", null))
+
+    optionsFinal.put(
+      "serialization_null_format",
+      options.getOrElse("serialization_null_format", "\\N"))
+
+    optionsFinal.put(
+      "bad_records_logger_enable",
+      options.getOrElse(
+        "bad_records_logger_enable",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
+
+    val badRecordActionValue = carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
+      CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
+
+    optionsFinal.put(
+      "bad_records_action",
+      options.getOrElse(
+        "bad_records_action",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
+          badRecordActionValue)))
+
+    optionsFinal.put(
+      "is_empty_data_bad_record",
+      options.getOrElse(
+        "is_empty_data_bad_record",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
+
+    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
+
+    optionsFinal.put(
+      "complex_delimiter_level_1",
+      options.getOrElse("complex_delimiter_level_1", "\\$"))
+
+    optionsFinal.put(
+      "complex_delimiter_level_2",
+      options.getOrElse("complex_delimiter_level_2", "\\:"))
+
+    optionsFinal.put(
+      "dateformat",
+      options.getOrElse(
+        "dateformat",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
+          CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
+
+    optionsFinal.put(
+      "global_sort_partitions",
+      options.getOrElse(
+        "global_sort_partitions",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS,
+          null)))
+
+    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
+
+    optionsFinal.put(
+      "batch_sort_size_inmb",
+      options.getOrElse(
+        "batch_sort_size_inmb",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
+          carbonProperty.getProperty(
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
+            CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
+
+    optionsFinal.put(
+      "bad_record_path",
+      options.getOrElse(
+        "bad_record_path",
+        carbonProperty.getProperty(
+          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
+          carbonProperty.getProperty(
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC,
+            CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
+
+    val useOnePass = options.getOrElse(
+      "single_pass",
+      carbonProperty.getProperty(
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
+        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
+      case "true" =>
+        true
+      case "false" =>
+        // when single_pass = false and either all_dictionary_path
+        // or columndict is configured, then do not allow the load
+        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
+            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
+          throw new MalformedCarbonCommandException(
+            "Can not use all_dictionary_path or columndict without single_pass.")
+        } else {
+          false
+        }
+      case illegal =>
+        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [$illegal] " +
+                     "Please set it as 'true' or 'false'")
+        false
+    }
+    optionsFinal.put("single_pass", useOnePass.toString)
+    optionsFinal
+  }
+
+  /**
+   * check whether to use the default value or not
+   */
+  private def checkDefaultValue(value: String, default: String) = {
+    if (StringUtils.isEmpty(value)) {
+      default
+    } else {
+      value
+    }
+  }
+
+  /**
+   * build CarbonLoadModel for data loading
+   */
+  def buildCarbonLoadModel(
+      table: CarbonTable,
+      carbonProperty: CarbonProperties,
+      options: immutable.Map[String, String],
+      optionsFinal: mutable.Map[String, String],
+      carbonLoadModel: CarbonLoadModel): Unit = {
+    carbonLoadModel.setTableName(table.getFactTableName)
+    carbonLoadModel.setDatabaseName(table.getDatabaseName)
+    carbonLoadModel.setStorePath(table.getStorePath)
+    carbonLoadModel.setTableName(table.getFactTableName)
+    val dataLoadSchema = new CarbonDataLoadSchema(table)
+    // Need to fill dimension relation
+    carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
+    val sort_scope = optionsFinal("sort_scope")
+    val single_pass = optionsFinal("single_pass")
+    val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
+    val bad_records_action = optionsFinal("bad_records_action")
+    val bad_record_path = optionsFinal("bad_record_path")
+    val global_sort_partitions = optionsFinal("global_sort_partitions")
+    val dateFormat = optionsFinal("dateformat")
+    val delimeter = optionsFinal("delimiter")
+    val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
+    val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
+    val all_dictionary_path = optionsFinal("all_dictionary_path")
+    val column_dict = optionsFinal("columndict")
+    ValidateUtil.validateDateFormat(dateFormat, table, table.getFactTableName)
+    ValidateUtil.validateSortScope(table, sort_scope)
+
+    if (bad_records_logger_enable.toBoolean ||
+        LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
+      if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
+        sys.error("Invalid bad records location.")
+      }
+    }
+    carbonLoadModel.setBadRecordsLocation(bad_record_path)
+
+    ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
+    carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
+    carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
+    carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
+
+    // if there isn't a file header in the csv file and the load sql doesn't provide
+    // the FILEHEADER option, we should use the table schema to generate the file header.
+    var fileHeader = optionsFinal("fileheader")
+    val headerOption = options.get("header")
+    if (headerOption.isDefined) {
+      // whether the csv file has a file header
+      // the default value is true
+      val header = try {
+        headerOption.get.toBoolean
+      } catch {
+        case ex: IllegalArgumentException =>
+          throw new MalformedCarbonCommandException(
+            "'header' option should be either 'true' or 'false'. " + ex.getMessage)
+      }
+      if (header) {
+        if (fileHeader.nonEmpty) {
+          throw new MalformedCarbonCommandException(
+            "When 'header' option is true, 'fileheader' option is not required.")
+        }
+      } else {
+        if (fileHeader.isEmpty) {
+          fileHeader = table.getCreateOrderColumn(table.getFactTableName)
+            .asScala.map(_.getColName).mkString(",")
+        }
+      }
+    }
+
+    carbonLoadModel.setDateFormat(dateFormat)
+    carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+      CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
+
+    carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
+      CarbonCommonConstants.CARBON_DATE_FORMAT,
+      CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
+
+    carbonLoadModel.setSerializationNullFormat(
+      TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
+      optionsFinal("serialization_null_format"))
+
+    carbonLoadModel.setBadRecordsLoggerEnable(
+      TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
+
+    carbonLoadModel.setBadRecordsAction(
+      TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
+
+    carbonLoadModel.setIsEmptyDataBadRecord(
+      DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
+      optionsFinal("is_empty_data_bad_record"))
+
+    carbonLoadModel.setSortScope(sort_scope)
+    carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
+    carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
+    carbonLoadModel.setUseOnePass(single_pass.toBoolean)
+
+    if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
+        complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
+        delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
+      sys.error(s"Field Delimiter & Complex types delimiter are same")
+    } else {
+      carbonLoadModel.setComplexDelimiterLevel1(
+        CarbonUtil.delimiterConverter(complex_delimeter_level1))
+      carbonLoadModel.setComplexDelimiterLevel2(
+        CarbonUtil.delimiterConverter(complex_delimeter_level2))
+    }
+    // set local dictionary path, and dictionary file extension
+    carbonLoadModel.setAllDictPath(all_dictionary_path)
+    carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
+    carbonLoadModel.setCsvHeader(fileHeader)
+    carbonLoadModel.setColDictFilePath(column_dict)
+    carbonLoadModel.setDirectLoad(true)
+    carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
+
+    val validatedMaxColumns = CommonUtil.validateMaxColumns(
+      carbonLoadModel.getCsvHeaderColumns,
+      optionsFinal("maxcolumns"))
+
+    carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
+    if (null == carbonLoadModel.getLoadMetadataDetails) {
+      CommonUtil.readLoadMetadataDetails(carbonLoadModel)
+    }
+  }
+}
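
The two new entry points are meant to be called in sequence: getDataLoadingOptions resolves user options against CarbonProperties and built-in defaults, and buildCarbonLoadModel fills a CarbonLoadModel from the result. A minimal caller sketch follows (hypothetical wiring, not part of this commit; DataLoadingUtilExample and the "LOCAL_SORT" value are assumptions). Note that getDataLoadingOptions does not populate "sort_scope", which buildCarbonLoadModel reads, so the caller must put it first, as LoadTableCommand does from the table properties.

// Hypothetical usage sketch -- not code from this commit.
import scala.collection.immutable

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
import org.apache.carbondata.spark.util.DataLoadingUtil

object DataLoadingUtilExample {
  // Builds a CarbonLoadModel the same way LoadTableCommand now does.
  def prepareLoadModel(
      table: CarbonTable,
      userOptions: immutable.Map[String, String]): CarbonLoadModel = {
    val carbonProperty = CarbonProperties.getInstance()
    // Resolve user options against system properties and hard-coded defaults.
    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, userOptions)
    // buildCarbonLoadModel reads optionsFinal("sort_scope"), which
    // getDataLoadingOptions does not set; the caller must supply it
    // ("LOCAL_SORT" is an assumed example value here).
    optionsFinal.put("sort_scope", "LOCAL_SORT")
    val carbonLoadModel = new CarbonLoadModel()
    DataLoadingUtil.buildCarbonLoadModel(
      table, carbonProperty, userOptions, optionsFinal, carbonLoadModel)
    carbonLoadModel
  }
}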

integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/LoadTableCommand.scala

+152 -312
@@ -27,7 +27,6 @@ import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataPro
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
-import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -39,14 +38,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format
 import org.apache.carbondata.processing.exception.DataLoadingException
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.exception.NoRetryException
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
-import org.apache.carbondata.processing.util.TableOptionConstant
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
-import org.apache.carbondata.spark.load.ValidateUtil
 import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, DictionaryLoadModel}
-import org.apache.carbondata.spark.util.{CommonUtil, GlobalDictionaryUtil}
+import org.apache.carbondata.spark.util.{CommonUtil, DataLoadingUtil, GlobalDictionaryUtil}
 
 case class LoadTableCommand(
     databaseNameOp: Option[String],
@@ -60,89 +56,6 @@ case class LoadTableCommand(
     updateModel: Option[UpdateTableModel] = None)
   extends RunnableCommand with DataProcessCommand {
 
-  private def getFinalOptions(carbonProperty: CarbonProperties):
-  scala.collection.mutable.Map[String, String] = {
-    val optionsFinal = scala.collection.mutable.Map[String, String]()
-    optionsFinal.put("delimiter", options.getOrElse("delimiter", ","))
-    optionsFinal.put("quotechar", options.getOrElse("quotechar", "\""))
-    optionsFinal.put("fileheader", options.getOrElse("fileheader", ""))
-    optionsFinal.put("escapechar", options.getOrElse("escapechar", "\\"))
-    optionsFinal.put("commentchar", options.getOrElse("commentchar", "#"))
-    optionsFinal.put("columndict", options.getOrElse("columndict", null))
-    optionsFinal
-      .put("serialization_null_format", options.getOrElse("serialization_null_format", "\\N"))
-    optionsFinal.put("bad_records_logger_enable", options.getOrElse("bad_records_logger_enable",
-      carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_LOGGER_ENABLE_DEFAULT)))
-    val badRecordActionValue = carbonProperty
-      .getProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION,
-        CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION_DEFAULT)
-    optionsFinal.put("bad_records_action", options.getOrElse("bad_records_action", carbonProperty
-      .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-        badRecordActionValue)))
-    optionsFinal
-      .put("is_empty_data_bad_record", options.getOrElse("is_empty_data_bad_record", carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-          CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT)))
-    optionsFinal.put("all_dictionary_path", options.getOrElse("all_dictionary_path", ""))
-    optionsFinal
-      .put("complex_delimiter_level_1", options.getOrElse("complex_delimiter_level_1", "\\$"))
-    optionsFinal
-      .put("complex_delimiter_level_2", options.getOrElse("complex_delimiter_level_2", "\\:"))
-    optionsFinal.put("dateformat", options.getOrElse("dateformat",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)))
-
-    optionsFinal.put("global_sort_partitions", options.getOrElse("global_sort_partitions",
-      carbonProperty
-        .getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_GLOBAL_SORT_PARTITIONS, null)))
-
-    optionsFinal.put("maxcolumns", options.getOrElse("maxcolumns", null))
-
-    optionsFinal.put("batch_sort_size_inmb", options.getOrElse("batch_sort_size_inmb",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BATCH_SORT_SIZE_INMB,
-        carbonProperty.getProperty(CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB,
-          CarbonCommonConstants.LOAD_BATCH_SORT_SIZE_INMB_DEFAULT))))
-
-    optionsFinal.put("bad_record_path", options.getOrElse("bad_record_path",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
-        carbonProperty.getProperty(CarbonCommonConstants.CARBON_BADRECORDS_LOC,
-          CarbonCommonConstants.CARBON_BADRECORDS_LOC_DEFAULT_VAL))))
-
-    val useOnePass = options.getOrElse("single_pass",
-      carbonProperty.getProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS,
-        CarbonLoadOptionConstants.CARBON_OPTIONS_SINGLE_PASS_DEFAULT)).trim.toLowerCase match {
-      case "true" =>
-        true
-      case "false" =>
-        // when single_pass = false and if either alldictionarypath
-        // or columnDict is configured the do not allow load
-        if (StringUtils.isNotEmpty(optionsFinal("all_dictionary_path")) ||
-            StringUtils.isNotEmpty(optionsFinal("columndict"))) {
-          throw new MalformedCarbonCommandException(
-            "Can not use all_dictionary_path or columndict without single_pass.")
-        } else {
-          false
-        }
-      case illegal =>
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-        LOGGER.error(s"Can't use single_pass, because illegal syntax found: [" + illegal + "] " +
-                     "Please set it as 'true' or 'false'")
-        false
-    }
-    optionsFinal.put("single_pass", useOnePass.toString)
-    optionsFinal
-  }
-
-  private def checkDefaultValue(value: String, default: String) = {
-    if (StringUtils.isEmpty(value)) {
-      default
-    } else {
-      value
-    }
-  }
-
   override def run(sparkSession: SparkSession): Seq[Row] = {
     processData(sparkSession)
   }
@@ -158,7 +71,6 @@ case class LoadTableCommand(
     }
 
     val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(Option(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation]
     if (relation == null) {
@@ -172,7 +84,7 @@
 
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
-    val optionsFinal = getFinalOptions(carbonProperty)
+    val optionsFinal = DataLoadingUtil.getDataLoadingOptions(carbonProperty, options)
 
     val tableProperties = relation.tableMeta.carbonTable.getTableInfo
       .getFactTable.getTableProperties
@@ -183,133 +95,26 @@
           CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))))
 
     try {
+      val table = relation.tableMeta.carbonTable
+      val carbonLoadModel = new CarbonLoadModel()
       val factPath = if (dataFrame.isDefined) {
         ""
       } else {
         FileUtils.getPaths(
           CarbonUtil.checkAndAppendHDFSUrl(factPathFromUser))
       }
-      val carbonLoadModel = new CarbonLoadModel()
-      carbonLoadModel.setTableName(relation.tableMeta.carbonTableIdentifier.getTableName)
-      carbonLoadModel.setDatabaseName(relation.tableMeta.carbonTableIdentifier.getDatabaseName)
-      carbonLoadModel.setStorePath(relation.tableMeta.carbonTable.getStorePath)
-
-      val table = relation.tableMeta.carbonTable
-      carbonLoadModel.setTableName(table.getFactTableName)
-      val dataLoadSchema = new CarbonDataLoadSchema(table)
-      // Need to fill dimension relation
-      carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
-
-      val partitionLocation = relation.tableMeta.storePath + "/partition/" +
-        relation.tableMeta.carbonTableIdentifier.getDatabaseName + "/" +
-        relation.tableMeta.carbonTableIdentifier.getTableName + "/"
-      val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
-      val sort_scope = optionsFinal("sort_scope")
-      val single_pass = optionsFinal("single_pass")
-      val bad_records_logger_enable = optionsFinal("bad_records_logger_enable")
-      val bad_records_action = optionsFinal("bad_records_action")
-      val bad_record_path = optionsFinal("bad_record_path")
-      val global_sort_partitions = optionsFinal("global_sort_partitions")
-      val dateFormat = optionsFinal("dateformat")
-      val delimeter = optionsFinal("delimiter")
-      val complex_delimeter_level1 = optionsFinal("complex_delimiter_level_1")
-      val complex_delimeter_level2 = optionsFinal("complex_delimiter_level_2")
-      val all_dictionary_path = optionsFinal("all_dictionary_path")
-      val column_dict = optionsFinal("columndict")
-      ValidateUtil.validateDateFormat(dateFormat, table, tableName)
-      ValidateUtil.validateSortScope(table, sort_scope)
-
-      if (bad_records_logger_enable.toBoolean ||
-          LoggerAction.REDIRECT.name().equalsIgnoreCase(bad_records_action)) {
-        if (!CarbonUtil.isValidBadStorePath(bad_record_path)) {
-          sys.error("Invalid bad records location.")
-        }
-      }
-      carbonLoadModel.setBadRecordsLocation(bad_record_path)
-
-      ValidateUtil.validateGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setEscapeChar(checkDefaultValue(optionsFinal("escapechar"), "\\"))
-      carbonLoadModel.setQuoteChar(checkDefaultValue(optionsFinal("quotechar"), "\""))
-      carbonLoadModel.setCommentChar(checkDefaultValue(optionsFinal("commentchar"), "#"))
-
-      // if there isn't file header in csv file and load sql doesn't provide FILEHEADER option,
-      // we should use table schema to generate file header.
-      var fileHeader = optionsFinal("fileheader")
-      val headerOption = options.get("header")
-      if (headerOption.isDefined) {
-        // whether the csv file has file header
-        // the default value is true
-        val header = try {
-          headerOption.get.toBoolean
-        } catch {
-          case ex: IllegalArgumentException =>
-            throw new MalformedCarbonCommandException(
-              "'header' option should be either 'true' or 'false'. " + ex.getMessage)
-        }
-        if (header) {
-          if (fileHeader.nonEmpty) {
-            throw new MalformedCarbonCommandException(
-              "When 'header' option is true, 'fileheader' option is not required.")
-          }
-        } else {
-          if (fileHeader.isEmpty) {
-            fileHeader = table.getCreateOrderColumn(table.getFactTableName)
-              .asScala.map(_.getColName).mkString(",")
-          }
-        }
-      }
-
-      carbonLoadModel.setDateFormat(dateFormat)
-      carbonLoadModel.setDefaultTimestampFormat(carbonProperty.getProperty(
-        CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
-        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT))
-      carbonLoadModel.setDefaultDateFormat(carbonProperty.getProperty(
-        CarbonCommonConstants.CARBON_DATE_FORMAT,
-        CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT))
-      carbonLoadModel
-        .setSerializationNullFormat(
-          TableOptionConstant.SERIALIZATION_NULL_FORMAT.getName + "," +
-          optionsFinal("serialization_null_format"))
-      carbonLoadModel
-        .setBadRecordsLoggerEnable(
-          TableOptionConstant.BAD_RECORDS_LOGGER_ENABLE.getName + "," + bad_records_logger_enable)
-      carbonLoadModel
-        .setBadRecordsAction(
-          TableOptionConstant.BAD_RECORDS_ACTION.getName + "," + bad_records_action)
-      carbonLoadModel
-        .setIsEmptyDataBadRecord(
-          DataLoadProcessorConstants.IS_EMPTY_DATA_BAD_RECORD + "," +
-          optionsFinal("is_empty_data_bad_record"))
-      carbonLoadModel.setSortScope(sort_scope)
-      carbonLoadModel.setBatchSortSizeInMb(optionsFinal("batch_sort_size_inmb"))
-      carbonLoadModel.setGlobalSortPartitions(global_sort_partitions)
-      carbonLoadModel.setUseOnePass(single_pass.toBoolean)
-      if (delimeter.equalsIgnoreCase(complex_delimeter_level1) ||
-          complex_delimeter_level1.equalsIgnoreCase(complex_delimeter_level2) ||
-          delimeter.equalsIgnoreCase(complex_delimeter_level2)) {
-        sys.error(s"Field Delimiter & Complex types delimiter are same")
-      } else {
-        carbonLoadModel.setComplexDelimiterLevel1(
-          CarbonUtil.delimiterConverter(complex_delimeter_level1))
-        carbonLoadModel.setComplexDelimiterLevel2(
-          CarbonUtil.delimiterConverter(complex_delimeter_level2))
-      }
-      // set local dictionary path, and dictionary file extension
-      carbonLoadModel.setAllDictPath(all_dictionary_path)
-
-      val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      try {
+      carbonLoadModel.setFactFilePath(factPath)
+      DataLoadingUtil.buildCarbonLoadModel(
+        table,
+        carbonProperty,
+        options,
+        optionsFinal,
+        carbonLoadModel
+      )
+
+      try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
-        carbonLoadModel.setFactFilePath(factPath)
-        carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
-        carbonLoadModel.setCsvHeader(fileHeader)
-        carbonLoadModel.setColDictFilePath(column_dict)
-        carbonLoadModel.setDirectLoad(true)
-        carbonLoadModel.setCsvHeaderColumns(CommonUtil.getCsvHeaderColumns(carbonLoadModel))
-        val validatedMaxColumns = CommonUtil.validateMaxColumns(carbonLoadModel.getCsvHeaderColumns,
-          optionsFinal("maxcolumns"))
-        carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = updateTableMetadata
         val storePath = relation.tableMeta.storePath
         // add the start entry for the new load in the table status file
@@ -320,11 +125,9 @@
         if (isOverwriteTable) {
           LOGGER.info(s"Overwrite of carbon table with $dbName.$tableName is in progress")
         }
-        if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel)
-        }
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
-            StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
+            StringUtils.isEmpty(carbonLoadModel.getColDictFilePath) &&
+            StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           LOGGER.audit(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
           carbonLoadModel.setUseOnePass(false)
@@ -337,111 +140,21 @@
         if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
           FileFactory.mkdirs(metadataDirectoryPath, fileType)
         }
+        val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
+        val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
         if (carbonLoadModel.getUseOnePass) {
-          val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-          val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-            .getCarbonTableIdentifier
-          val carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(storePath, carbonTableIdentifier)
-          val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
-          val dimensions = carbonTable.getDimensionByTableName(
-            carbonTable.getFactTableName).asScala.toArray
-          val colDictFilePath = carbonLoadModel.getColDictFilePath
-          if (!StringUtils.isEmpty(colDictFilePath)) {
-            carbonLoadModel.initPredefDictMap()
-            // generate predefined dictionary
-            GlobalDictionaryUtil
-              .generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-                dimensions, carbonLoadModel, sparkSession.sqlContext, storePath, dictFolderPath)
-          }
-          if (!StringUtils.isEmpty(all_dictionary_path)) {
-            carbonLoadModel.initPredefDictMap()
-            GlobalDictionaryUtil
-              .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
-                carbonLoadModel,
-                storePath,
-                carbonTableIdentifier,
-                dictFolderPath,
-                dimensions,
-                all_dictionary_path)
-          }
-          // dictionaryServerClient dictionary generator
-          val dictionaryServerPort = carbonProperty
-            .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
-              CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-          val sparkDriverHost = sparkSession.sqlContext.sparkContext.
-            getConf.get("spark.driver.host")
-          carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-          // start dictionary server when use one pass load and dimension with DICTIONARY
-          // encoding is present.
-          val allDimensions = table.getAllDimensions.asScala.toList
-          val createDictionary = allDimensions.exists {
-            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
-                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
-          }
-          val server: Option[DictionaryServer] = if (createDictionary) {
-            val dictionaryServer = DictionaryServer
-              .getInstance(dictionaryServerPort.toInt, carbonTable)
-            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
-            sparkSession.sparkContext.addSparkListener(new SparkListener() {
-              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
-                dictionaryServer.shutdown()
-              }
-            })
-            Some(dictionaryServer)
-          } else {
-            None
-          }
-          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          loadDataUsingOnePass(
+            sparkSession,
+            carbonProperty,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             columnar,
-            partitionStatus,
-            server,
-            isOverwriteTable,
-            dataFrame,
-            updateModel)
+            partitionStatus)
         } else {
-          val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
-            val fields = dataFrame.get.schema.fields
-            import org.apache.spark.sql.functions.udf
-            // extracting only segment from tupleId
-            val getSegIdUDF = udf((tupleId: String) =>
-              CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
-            // getting all fields except tupleId field as it is not required in the value
-            var otherFields = fields.toSeq
-              .filter(field => !field.name
-                .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-              .map(field => new Column(field.name))
-
-            // extract tupleId field which will be used as a key
-            val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
-              .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
-              as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
-            // use dataFrameWithoutTupleId as dictionaryDataFrame
-            val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
-            otherFields = otherFields :+ segIdColumn
-            // use dataFrameWithTupleId as loadDataFrame
-            val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
-            (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
-          } else {
-            (dataFrame, dataFrame)
-          }
-
-          GlobalDictionaryUtil.generateGlobalDictionary(
-            sparkSession.sqlContext,
-            carbonLoadModel,
-            relation.tableMeta.storePath,
-            dictionaryDataFrame)
-          CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+          loadData(
+            sparkSession,
             carbonLoadModel,
-            relation.tableMeta.storePath,
             columnar,
-            partitionStatus,
-            None,
-            isOverwriteTable,
-            loadDataFrame,
-            updateModel)
+            partitionStatus)
         }
       } catch {
         case CausedBy(ex: NoRetryException) =>
@@ -454,6 +167,9 @@
     } finally {
       // Once the data load is successful delete the unwanted partition files
      try {
+        val partitionLocation = table.getStorePath + "/partition/" +
+          table.getDatabaseName + "/" +
+          table.getFactTableName + "/"
         val fileType = FileFactory.getFileType(partitionLocation)
         if (FileFactory.isFileExist(partitionLocation, fileType)) {
           val file = FileFactory
@@ -480,6 +196,130 @@
     Seq.empty
   }
 
+  private def loadDataUsingOnePass(
+      sparkSession: SparkSession,
+      carbonProperty: CarbonProperties,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: String): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
+      .getCarbonTableIdentifier
+    val carbonTablePath = CarbonStorePath
+      .getCarbonTablePath(carbonLoadModel.getStorePath, carbonTableIdentifier)
+    val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
+    val dimensions = carbonTable.getDimensionByTableName(
+      carbonTable.getFactTableName).asScala.toArray
+    val colDictFilePath = carbonLoadModel.getColDictFilePath
+    if (!StringUtils.isEmpty(colDictFilePath)) {
+      carbonLoadModel.initPredefDictMap()
+      // generate predefined dictionary
+      GlobalDictionaryUtil.generatePredefinedColDictionary(
+        colDictFilePath,
+        carbonTableIdentifier,
+        dimensions,
+        carbonLoadModel,
+        sparkSession.sqlContext,
+        carbonLoadModel.getStorePath,
+        dictFolderPath)
+    }
+    if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
+      carbonLoadModel.initPredefDictMap()
+      GlobalDictionaryUtil
+        .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
+          carbonLoadModel,
+          carbonLoadModel.getStorePath,
+          carbonTableIdentifier,
+          dictFolderPath,
+          dimensions,
+          carbonLoadModel.getAllDictPath)
+    }
+    // dictionaryServerClient dictionary generator
+    val dictionaryServerPort = carbonProperty
+      .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
+        CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
+    val sparkDriverHost = sparkSession.sqlContext.sparkContext.
+      getConf.get("spark.driver.host")
+    carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
+    // start dictionary server when use one pass load and dimension with DICTIONARY
+    // encoding is present.
+    val allDimensions =
+      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getAllDimensions.asScala.toList
+    val createDictionary = allDimensions.exists {
+      carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                         !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+    }
+    val server: Option[DictionaryServer] = if (createDictionary) {
+      val dictionaryServer = DictionaryServer
+        .getInstance(dictionaryServerPort.toInt, carbonTable)
+      carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+      sparkSession.sparkContext.addSparkListener(new SparkListener() {
+        override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+          dictionaryServer.shutdown()
+        }
+      })
+      Some(dictionaryServer)
+    } else {
+      None
+    }
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      columnar,
+      partitionStatus,
+      server,
+      isOverwriteTable,
+      dataFrame,
+      updateModel)
+  }
+
+  private def loadData(
+      sparkSession: SparkSession,
+      carbonLoadModel: CarbonLoadModel,
+      columnar: Boolean,
+      partitionStatus: String): Unit = {
+    val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
+      val fields = dataFrame.get.schema.fields
+      import org.apache.spark.sql.functions.udf
+      // extracting only segment from tupleId
+      val getSegIdUDF = udf((tupleId: String) =>
+        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
+      // getting all fields except tupleId field as it is not required in the value
+      var otherFields = fields.toSeq
+        .filter(field => !field.name
+          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
+        .map(field => new Column(field.name))
+
+      // extract tupleId field which will be used as a key
+      val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
+        .quotedString(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))).
+        as(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_SEGMENTID)
+      // use dataFrameWithoutTupleId as dictionaryDataFrame
+      val dataFrameWithoutTupleId = dataFrame.get.select(otherFields: _*)
+      otherFields = otherFields :+ segIdColumn
+      // use dataFrameWithTupleId as loadDataFrame
+      val dataFrameWithTupleId = dataFrame.get.select(otherFields: _*)
+      (Some(dataFrameWithoutTupleId), Some(dataFrameWithTupleId))
+    } else {
+      (dataFrame, dataFrame)
+    }
+
+    GlobalDictionaryUtil.generateGlobalDictionary(
+      sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      dictionaryDataFrame)
+    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+      carbonLoadModel,
+      carbonLoadModel.getStorePath,
+      columnar,
+      partitionStatus,
+      None,
+      isOverwriteTable,
+      loadDataFrame,
+      updateModel)
+  }
+
   private def updateTableMetadata(
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,