Skip to content

Commit 7dcc2e7

Browse files
kunal642manishgupta88
authored andcommitted
[CARBONDATA-1928] Seperate the properties for timeout and retries for load flow
Currently the property that is used to configure the lock retry count and the interval between retries is common for all the locks. This will be problematic when the user has configured the retries to 10/20 for concurrent loading. This property will be affecting other lock behaviours also, all other locks would have to retry for 10 times too. 1. Change the name of the "carbon.load.metadata.lock.retries" property to "carbon.concurrent.lock.retries" AND "carbon.concurrent.lock.retry.timeout.sec" 2. Introduce a new property for all other locks "carbon.lock.retries" AND "carbon.lock.retry.timeout.sec" This closes apache#1708
1 parent 8a295b5 commit 7dcc2e7

File tree

8 files changed

+86
-24
lines changed

8 files changed

+86
-24
lines changed

core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java

+35-7
Original file line numberDiff line numberDiff line change
@@ -571,25 +571,53 @@ public final class CarbonCommonConstants {
571571
*/
572572
public static final String CARBON_TIMESTAMP_MILLIS = "dd-MM-yyyy HH:mm:ss:SSS";
573573
/**
574-
* NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
574+
* NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK.
575+
*
576+
* Because we want concurrent loads to be completed even if they have to wait for the lock
577+
* therefore taking the default as 100.
578+
*
579+
* Example: Concurrent loads will use this to wait to acquire the table status lock.
575580
*/
576-
public static final int NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT = 3;
581+
public static final int NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT = 100;
577582
/**
578583
* MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK
584+
*
585+
* * Example: Concurrent loads will use this to wait to acquire the table status lock.
579586
*/
580-
public static final int MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT = 5;
587+
public static final int MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT = 1;
581588
/**
582589
* NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK
583590
*/
584591
@CarbonProperty
585-
public static final String NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK =
586-
"carbon.load.metadata.lock.retries";
592+
public static final String NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK =
593+
"carbon.concurrent.lock.retries";
587594
/**
588595
* MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK
589596
*/
590597
@CarbonProperty
591-
public static final String MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK =
592-
"carbon.load.metadata.lock.retry.timeout.sec";
598+
public static final String MAX_TIMEOUT_FOR_CONCURRENT_LOCK =
599+
"carbon.concurrent.lock.retry.timeout.sec";
600+
601+
/**
602+
* NUMBER_OF_TRIES_FOR_CARBON_LOCK
603+
*/
604+
public static final int NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT = 3;
605+
/**
606+
* MAX_TIMEOUT_FOR_CARBON_LOCK
607+
*/
608+
public static final int MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT = 5;
609+
/**
610+
* NUMBER_OF_TRIES_FOR_CARBON_LOCK
611+
*/
612+
@CarbonProperty
613+
public static final String NUMBER_OF_TRIES_FOR_CARBON_LOCK =
614+
"carbon.lock.retries";
615+
/**
616+
* MAX_TIMEOUT_FOR_CARBON_LOCK
617+
*/
618+
@CarbonProperty
619+
public static final String MAX_TIMEOUT_FOR_CARBON_LOCK =
620+
"carbon.lock.retry.timeout.sec";
593621

594622
/**
595623
* compressor for writing/reading carbondata file

core/src/main/java/org/apache/carbondata/core/locks/AbstractCarbonLock.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -50,25 +50,34 @@ public boolean lockWithRetries() {
5050
return false;
5151
}
5252

53+
/**
54+
* API for enabling the locking of file with retries.
55+
*/
56+
public boolean lockWithRetries(int retries, int retryInterval) {
57+
retryCount = retries;
58+
retryTimeout = retryInterval;
59+
return lockWithRetries();
60+
}
61+
5362
/**
5463
* Initializes the retry count and retry timeout.
5564
* This will determine how many times to retry to acquire lock and the retry timeout.
5665
*/
5766
protected void initRetry() {
5867
String retries = CarbonProperties.getInstance()
59-
.getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK);
68+
.getProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK);
6069
try {
6170
retryCount = Integer.parseInt(retries);
6271
} catch (NumberFormatException e) {
63-
retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_LOAD_METADATA_LOCK_DEFAULT;
72+
retryCount = CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CARBON_LOCK_DEFAULT;
6473
}
6574

6675
String maxTimeout = CarbonProperties.getInstance()
67-
.getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK);
76+
.getProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK);
6877
try {
6978
retryTimeout = Integer.parseInt(maxTimeout);
7079
} catch (NumberFormatException e) {
71-
retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK_DEFAULT;
80+
retryTimeout = CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK_DEFAULT;
7281
}
7382

7483
}

core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java

+13
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.carbondata.common.logging.LogService;
2121
import org.apache.carbondata.common.logging.LogServiceFactory;
2222
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
23+
import org.apache.carbondata.core.util.CarbonProperties;
2324

2425
/**
2526
* This class contains all carbon lock utilities
@@ -92,4 +93,16 @@ public static ICarbonLock getLockObject(AbsoluteTableIdentifier identifier, Stri
9293
"Acquire table lock failed after retry, please try after some time");
9394
}
9495

96+
/**
97+
* Get the value for the property. If NumberFormatException is thrown then return default value.
98+
*/
99+
public static int getLockProperty(String property, int defaultValue) {
100+
try {
101+
return Integer.parseInt(CarbonProperties.getInstance()
102+
.getProperty(property));
103+
} catch (NumberFormatException e) {
104+
return defaultValue;
105+
}
106+
}
107+
95108
}

core/src/main/java/org/apache/carbondata/core/locks/ICarbonLock.java

+7
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,13 @@ public interface ICarbonLock {
3636
*/
3737
boolean lockWithRetries();
3838

39+
/**
40+
* This will acquire the lock and if it doesnt get then it will retry after retryInterval.
41+
*
42+
* @return
43+
*/
44+
boolean lockWithRetries(int retryCount, int retryInterval);
45+
3946
/**
4047
* This method will delete the lock file at the specified location.
4148
*

integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionConcurrencyTestCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class DataRetentionConcurrencyTestCase extends QueryTest with BeforeAndAfterAll
3434
private val executorService = Executors.newFixedThreadPool(10)
3535

3636
override def beforeAll {
37-
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK, "1")
37+
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK, "1")
3838
sql("drop table if exists concurrent")
3939
sql(
4040
"create table concurrent (ID int, date String, country String, name " +

integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataretention/DataRetentionTestCase.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class DataRetentionTestCase extends QueryTest with BeforeAndAfterAll {
5454
sql("drop table if exists DataRetentionTable")
5555
sql("drop table if exists retentionlock")
5656

57-
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_LOAD_METADATA_LOCK, "1")
57+
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CARBON_LOCK, "1")
5858
CarbonProperties.getInstance.addProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME, "1")
5959
CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
6060
sql(

integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala

+6-9
Original file line numberDiff line numberDiff line change
@@ -571,8 +571,7 @@ object CarbonDataRDDFactory {
571571
dataFrame: Option[DataFrame],
572572
carbonLoadModel: CarbonLoadModel,
573573
updateModel: Option[UpdateTableModel],
574-
carbonTable: CarbonTable
575-
): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
574+
carbonTable: CarbonTable): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
576575
val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
577576

578577
val updateRdd = dataFrame.get.rdd
@@ -632,8 +631,7 @@ object CarbonDataRDDFactory {
632631
updateModel: Option[UpdateTableModel],
633632
key: String,
634633
taskNo: Long,
635-
iter: Iterator[Row]
636-
): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
634+
iter: Iterator[Row]): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
637635
val rddResult = new updateResultImpl()
638636
val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
639637
val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
@@ -718,8 +716,7 @@ object CarbonDataRDDFactory {
718716
sqlContext: SQLContext,
719717
carbonLoadModel: CarbonLoadModel,
720718
carbonTable: CarbonTable,
721-
operationContext: OperationContext
722-
): Unit = {
719+
operationContext: OperationContext): Unit = {
723720
LOGGER.info(s"compaction need status is" +
724721
s" ${ CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable) }")
725722
if (CarbonDataMergerUtil.checkIfAutoLoadMergingRequired(carbonTable)) {
@@ -796,8 +793,7 @@ object CarbonDataRDDFactory {
796793
carbonLoadModel: CarbonLoadModel,
797794
loadStatus: SegmentStatus,
798795
newEntryLoadStatus: SegmentStatus,
799-
overwriteTable: Boolean
800-
): Boolean = {
796+
overwriteTable: Boolean): Boolean = {
801797
val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
802798
val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
803799
status(0)._2._1
@@ -814,7 +810,8 @@ object CarbonDataRDDFactory {
814810
val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
815811
overwriteTable)
816812
if (!done) {
817-
val errorMessage = "Dataload failed due to failure in table status updation."
813+
val errorMessage = s"Dataload failed due to failure in table status updation for" +
814+
s" ${carbonLoadModel.getTableName}"
818815
LOGGER.audit("Data load is failed for " +
819816
s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
820817
LOGGER.error("Dataload failed due to failure in table status updation.")

processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java

+10-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
4343
import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
4444
import org.apache.carbondata.core.fileoperations.FileWriteOperation;
45+
import org.apache.carbondata.core.locks.CarbonLockUtil;
4546
import org.apache.carbondata.core.locks.ICarbonLock;
4647
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
4748
import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -161,8 +162,14 @@ public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
161162
String tableStatusPath = carbonTablePath.getTableStatusFilePath();
162163
SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
163164
ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
165+
int retryCount = CarbonLockUtil
166+
.getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
167+
CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
168+
int maxTimeout = CarbonLockUtil
169+
.getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
170+
CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
164171
try {
165-
if (carbonLock.lockWithRetries()) {
172+
if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
166173
LOGGER.info(
167174
"Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
168175
+ " for table status updation");
@@ -379,7 +386,8 @@ public static void readAndUpdateLoadProgressInTableMeta(CarbonLoadModel model,
379386
boolean entryAdded =
380387
CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite);
381388
if (!entryAdded) {
382-
throw new IOException("Failed to add entry in table status for " + model.getTableName());
389+
throw new IOException("Dataload failed due to failure in table status updation for "
390+
+ model.getTableName());
383391
}
384392
}
385393

0 commit comments

Comments
 (0)