Skip to content

Commit 72bb652

Browse files
author
Satyam Saxena
committed
Make idStore Get and Set Synchronized
1 parent 9dbb85d commit 72bb652

13 files changed

+117
-88
lines changed

ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.PartitionAwareIdGeneratorPerfTest.testGenerate.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
"iterations" : 4,
55
"threads" : 1,
66
"forks" : 3,
7-
"mean_ops" : 433192.4270233689
7+
"mean_ops" : 439867.61740979465
88
}

ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.PartitionAwareIdGeneratorTest.testGenerateWithBenchmark.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"name" : "io.appform.ranger.discovery.bundle.id.PartitionAwareIdGeneratorTest.testGenerateWithBenchmark",
33
"iterations" : 100000,
44
"threads" : 5,
5-
"totalMillis" : 2313,
6-
"avgTime" : 462.6
5+
"totalMillis" : 2722,
6+
"avgTime" : 544.4
77
}

ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.PartitionAwareIdGeneratorTest.testGenerateWithConstraints.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"name" : "io.appform.ranger.discovery.bundle.id.PartitionAwareIdGeneratorTest.testGenerateWithConstraints",
33
"iterations" : 100000,
44
"threads" : 5,
5-
"totalMillis" : 7745,
6-
"avgTime" : 1549.0
5+
"totalMillis" : 7014,
6+
"avgTime" : 1402.8
77
}

ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorPerfTest.testGenerate.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,5 @@
44
"iterations" : 4,
55
"threads" : 1,
66
"forks" : 3,
7-
"mean_ops" : 379648.93091326446
7+
"mean_ops" : 419418.9728131592
88
}

ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorTest.testGenerateWithBenchmark.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"name" : "io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorTest.testGenerateWithBenchmark",
33
"iterations" : 100000,
44
"threads" : 5,
5-
"totalMillis" : 31489,
6-
"avgTime" : 6297.8
5+
"totalMillis" : 3668,
6+
"avgTime" : 733.6
77
}

ranger-discovery-bundle/perf/results/io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorTest.testGenerateWithConstraints.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
"name" : "io.appform.ranger.discovery.bundle.id.WeightedIdGeneratorTest.testGenerateWithConstraints",
33
"iterations" : 100000,
44
"threads" : 5,
5-
"totalMillis" : 105139,
6-
"avgTime" : 21027.8
5+
"totalMillis" : 28683,
6+
"avgTime" : 5736.6
77
}

ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/weighted/DistributedIdGenerator.java

+32-38
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,19 @@
77
import dev.failsafe.Failsafe;
88
import dev.failsafe.FailsafeExecutor;
99
import dev.failsafe.RetryPolicy;
10-
import io.appform.ranger.discovery.bundle.id.Constants;
1110
import io.appform.ranger.discovery.bundle.id.Id;
1211
import io.appform.ranger.discovery.bundle.id.IdGenerator;
1312
import io.appform.ranger.discovery.bundle.id.config.IdGeneratorConfig;
1413
import io.appform.ranger.discovery.bundle.id.constraints.PartitionValidationConstraint;
1514
import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter;
1615
import io.appform.ranger.discovery.bundle.id.formatter.IdFormatters;
17-
import lombok.Getter;
1816
import lombok.extern.slf4j.Slf4j;
1917
import lombok.val;
2018
import org.joda.time.DateTime;
2119
import org.joda.time.format.DateTimeFormat;
2220
import org.joda.time.format.DateTimeFormatter;
2321

2422
import java.security.SecureRandom;
25-
import java.time.Clock;
2623
import java.util.ArrayList;
2724
import java.util.Collections;
2825
import java.util.HashMap;
@@ -31,8 +28,6 @@
3128
import java.util.Objects;
3229
import java.util.Optional;
3330
import java.util.concurrent.ConcurrentHashMap;
34-
import java.util.concurrent.Executors;
35-
import java.util.concurrent.TimeUnit;
3631
import java.util.function.Function;
3732
import java.util.regex.Pattern;
3833

@@ -51,18 +46,16 @@ abstract class DistributedIdGenerator {
5146
private static final List<PartitionValidationConstraint> GLOBAL_CONSTRAINTS = new ArrayList<>();
5247
private static final Map<String, List<PartitionValidationConstraint>> DOMAIN_SPECIFIC_CONSTRAINTS = new HashMap<>();
5348
protected static final int NODE_ID = IdGenerator.getNodeId();
54-
@Getter
55-
private final Map<String, Map<Long, PartitionIdTracker>> idStore = new ConcurrentHashMap<>();
49+
private final Map<String, PartitionIdTracker[]> idStore = new ConcurrentHashMap<>();
5650
protected final IdFormatter idFormatter;
5751
protected final Function<String, Integer> partitionResolver;
5852
protected final IdGeneratorConfig idGeneratorConfig;
59-
private final Clock clock = Clock.systemDefaultZone();
6053
private final Meter retryLimitBreachedMeter;
6154

6255
/* idStore Structure
6356
{
64-
prefix: {
65-
timestamp: {
57+
prefix: [
58+
<timestamp>: {
6659
partitions: [
6760
{
6861
ids: [],
@@ -75,7 +68,7 @@ abstract class DistributedIdGenerator {
7568
],
7669
counter: <int>
7770
}
78-
}
71+
]
7972
}
8073
*/
8174

@@ -93,13 +86,6 @@ protected DistributedIdGenerator(final IdGeneratorConfig idGeneratorConfig,
9386
.handleResultIf(Objects::isNull)
9487
.build();
9588
retrier = Failsafe.with(Collections.singletonList(retryPolicy));
96-
97-
val executorService = Executors.newScheduledThreadPool(1);
98-
executorService.scheduleWithFixedDelay(
99-
this::deleteExpiredKeys,
100-
Constants.ID_DELETION_DELAY_IN_SECONDS,
101-
Constants.ID_DELETION_DELAY_IN_SECONDS,
102-
TimeUnit.SECONDS);
10389
}
10490

10591
protected DistributedIdGenerator(final IdGeneratorConfig idGeneratorConfig,
@@ -143,12 +129,12 @@ public Optional<Id> generate(final String prefix) {
143129
}
144130

145131
public Optional<Id> generateForPartition(final String prefix, final int targetPartitionId) {
146-
val prefixIdMap = idStore.computeIfAbsent(prefix, k -> new ConcurrentHashMap<>());
147132
val currentTimestamp = new DateTime();
148-
val timeKey = getTimeKey(clock.instant().getEpochSecond());
133+
val prefixIdMap = idStore.computeIfAbsent(prefix, k -> new PartitionIdTracker[idGeneratorConfig.getMaxDataBufferTimeInSeconds()]);
134+
val timeKey = getTimeKey(currentTimestamp.getMillis() / 1000);
135+
val partitionTracker = getPartitionTracker(prefixIdMap, currentTimestamp);
149136
val idCounter = generateForAllPartitions(
150-
prefixIdMap.computeIfAbsent(timeKey, key -> new PartitionIdTracker(idGeneratorConfig.getPartitionCount(),
151-
idGeneratorConfig.getIdPoolSize())),
137+
partitionTracker,
152138
prefix,
153139
currentTimestamp,
154140
targetPartitionId);
@@ -171,23 +157,24 @@ private Optional<Integer> generateForAllPartitions(final PartitionIdTracker part
171157
final DateTime timestamp,
172158
final int targetPartitionId) {
173159
val idPool = partitionIdTracker.getPartition(targetPartitionId);
174-
int idIdx = idPool.getUsableIdIndex();
175160
int retryCount = 0;
176161
try {
177-
while (!idPool.isIdPresentAtIndex(idIdx) && retryCount < idGeneratorConfig.getRetryConfig().getIdGenerationRetryCount()) {
162+
while (retryCount < idGeneratorConfig.getRetryConfig().getIdGenerationRetryCount()) {
178163
val counterValue = partitionIdTracker.getIdCounter();
179164
val txnId = String.format("%s%s", prefix, idFormatter.format(timestamp, NODE_ID, counterValue));
180165
val mappedPartitionId = partitionResolver.apply(txnId);
181166
partitionIdTracker.addId(mappedPartitionId, counterValue);
182167
retryCount += 1;
168+
val idOptional = idPool.getNextId();
169+
if (idOptional.isPresent()) {
170+
return idOptional;
171+
}
183172
}
184-
if (idPool.isIdPresentAtIndex(idIdx)) {
185-
return Optional.of(idPool.getId(idIdx));
186-
} else {
187-
retryLimitBreachedMeter.mark();
188-
log.warn("Retry Limit reached - {} - {} - {}", retryCount, idIdx, targetPartitionId);
189-
return Optional.empty();
190-
}
173+
174+
// Retry Limit Breached
175+
retryLimitBreachedMeter.mark();
176+
log.debug("Retry Limit reached - {} - {}", retryCount, targetPartitionId);
177+
return Optional.empty();
191178
} catch (Exception e) {
192179
log.error("Error while generating IDs", e);
193180
return Optional.empty();
@@ -293,16 +280,23 @@ protected boolean validateId(final List<PartitionValidationConstraint> inConstra
293280
return null == failedLocalConstraint;
294281
}
295282

296-
private long getTimeKey(long timeInSeconds) {
297-
// log.warn("{}:{}", timeInSeconds, timeInSeconds % idGeneratorConfig.getMaxDataBufferTimeInSeconds());
298-
return timeInSeconds % idGeneratorConfig.getMaxDataBufferTimeInSeconds();
283+
private int getTimeKey(long timeInSeconds) {
284+
return (int) timeInSeconds % idGeneratorConfig.getMaxDataBufferTimeInSeconds();
299285
}
300286

301-
private synchronized void deleteExpiredKeys() {
302-
val timeThreshold = getTimeKey(clock.instant().getEpochSecond()) - Constants.DELETION_THRESHOLD_IN_SECONDS;
303-
for (val entry : idStore.entrySet()) {
304-
entry.getValue().entrySet().removeIf(partitionIdTrackerEntry -> partitionIdTrackerEntry.getKey() < timeThreshold);
287+
private synchronized PartitionIdTracker getPartitionTracker(PartitionIdTracker[] partitionTrackerList, final DateTime timestamp) {
288+
val timeKey = getTimeKey(timestamp.getMillis() / 1000);
289+
if (timeKey >= partitionTrackerList.length) {
290+
throw new IndexOutOfBoundsException("Key should be less than " + partitionTrackerList.length);
291+
}
292+
if (partitionTrackerList[timeKey] == null) {
293+
partitionTrackerList[timeKey] = new PartitionIdTracker(idGeneratorConfig.getPartitionCount(), idGeneratorConfig.getIdPoolSize(), timestamp);
294+
}
295+
val partitionTracker = partitionTrackerList[timeKey];
296+
if (partitionTracker.getTimestamp().getMillis() / 1000 != timestamp.getMillis() / 1000) {
297+
partitionTracker.reset(timestamp);
305298
}
299+
return partitionTracker;
306300
}
307301

308302
}

ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/weighted/IdPool.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import lombok.val;
44

5+
import java.util.Optional;
56
import java.util.concurrent.atomic.AtomicInteger;
67
import java.util.concurrent.atomic.AtomicIntegerArray;
78

@@ -23,26 +24,32 @@ public IdPool(int size) {
2324
idList = new AtomicIntegerArray(capacity);
2425
}
2526

26-
public boolean isIdPresentAtIndex(int index) {
27-
return lastUsableIdx.get() > index;
28-
}
29-
30-
public int getId(int index) {
27+
private int getId(int index) {
3128
val arrayIdx = index % capacity;
3229
return idList.get(arrayIdx);
3330
}
3431

35-
public void setId(int id) {
32+
public synchronized void setId(int id) {
3633
// Don't set new IDs if the idPool is already full of unused IDs.
37-
if (lastUsableIdx.get() >= nextUsableIdx.get() + capacity) {
34+
if (lastUsableIdx.get() >= nextUsableIdx.get() + capacity - 1) {
3835
return;
3936
}
40-
val arrayIdx = lastUsableIdx.get() % capacity;
37+
val arrayIdx = lastUsableIdx.getAndIncrement() % capacity;
4138
idList.set(arrayIdx, id);
42-
lastUsableIdx.incrementAndGet();
4339
}
4440

45-
public int getUsableIdIndex() {
46-
return nextUsableIdx.getAndIncrement();
41+
public synchronized Optional<Integer> getNextId() {
42+
if (nextUsableIdx.get() < lastUsableIdx.get()) {
43+
val id = getId(nextUsableIdx.get());
44+
nextUsableIdx.getAndIncrement();
45+
return Optional.of(id);
46+
} else {
47+
return Optional.empty();
48+
}
49+
}
50+
51+
public void reset() {
52+
lastUsableIdx.set(0);
53+
nextUsableIdx.set(0);
4754
}
4855
}

ranger-discovery-bundle/src/main/java/io/appform/ranger/discovery/bundle/id/weighted/PartitionIdTracker.java

+24-3
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
package io.appform.ranger.discovery.bundle.id.weighted;
22

3+
import lombok.Getter;
4+
import lombok.extern.slf4j.Slf4j;
5+
import lombok.val;
6+
import org.joda.time.DateTime;
7+
38
import java.util.concurrent.atomic.AtomicInteger;
49

510
import static io.appform.ranger.discovery.bundle.id.Constants.MAX_IDS_PER_SECOND;
611

12+
@Slf4j
713
public class PartitionIdTracker {
8-
// Array to store IdPools for each partition
14+
// Array to store IdPools for each partition
915
private final IdPool[] idPoolList;
1016

11-
// Counter to keep track of the number of IDs created
17+
// Counter to keep track of the number of IDs created
1218
private final AtomicInteger nextIdCounter = new AtomicInteger();
1319

14-
protected PartitionIdTracker(int partitionSize, int idPoolSize) {
20+
@Getter
21+
private DateTime timestamp;
22+
23+
protected PartitionIdTracker(int partitionSize, int idPoolSize, DateTime timestamp) {
24+
this.timestamp = timestamp;
1525
idPoolList = new IdPool[partitionSize];
1626
for (int i=0; i<partitionSize; i+= 1){
1727
idPoolList[i] = new IdPool(idPoolSize);
@@ -41,4 +51,15 @@ public int getIdCounter() throws Exception {
4151
throw new Exception("ID Generation Per Seconds Limit Reached.");
4252
}
4353
}
54+
55+
public synchronized void reset(DateTime timestamp) {
56+
if (this.timestamp.getMillis() / 1000 == timestamp.getMillis() / 1000) {
57+
return;
58+
}
59+
for (val idPool: idPoolList) {
60+
idPool.reset();
61+
}
62+
nextIdCounter.set(0);
63+
this.timestamp = timestamp;
64+
}
4465
}

ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/PartitionAwareIdGeneratorPerfTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ public void setUp() throws IOException {
3434
partitionAwareIdGenerator = new PartitionAwareIdGenerator(
3535
IdGeneratorConfig.builder()
3636
.partitionCount(1024)
37-
.retryConfig(IdGeneratorRetryConfig.builder().idGenerationRetryCount(1024).partitionRetryCount(1024).build())
37+
.idPoolSize(100)
38+
.retryConfig(IdGeneratorRetryConfig.builder().idGenerationRetryCount(4096).partitionRetryCount(4096).build())
3839
.build(),
3940
partitionResolverSupplier,
4041
mock(MetricRegistry.class)

ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/PartitionAwareIdGeneratorTest.java

+15-4
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import java.time.ZoneId;
1919
import java.util.ArrayList;
2020
import java.util.Collections;
21+
import java.util.HashMap;
2122
import java.util.HashSet;
2223
import java.util.List;
2324
import java.util.function.Function;
@@ -30,7 +31,6 @@
3031
/**
3132
* Test for {@link PartitionAwareIdGenerator}
3233
*/
33-
@Slf4j
3434
@SuppressWarnings({"unused", "FieldMayBeFinal"})
3535
class PartitionAwareIdGeneratorTest {
3636
final int numThreads = 5;
@@ -55,7 +55,6 @@ void setup() {
5555
idGeneratorConfig, partitionResolverSupplier, metricRegistry
5656
);
5757
}
58-
// ToDo: Add test for partition distribution spread.
5958

6059
@Test
6160
void testGenerateWithBenchmark() throws IOException {
@@ -70,6 +69,7 @@ void testGenerateWithBenchmark() throws IOException {
7069
this.getClass().getName() + ".testGenerateWithBenchmark");
7170
Assertions.assertEquals(numThreads * iterationCount, allIdsList.size());
7271
checkUniqueIds(allIdsList);
72+
checkDistribution(allIdsList);
7373
}
7474

7575
@Test
@@ -101,6 +101,19 @@ void checkUniqueIds(List<String> allIdsList) {
101101
Assertions.assertEquals(allIdsList.size(), uniqueIds.size());
102102
}
103103

104+
void checkDistribution(List<String> allIdsList) {
105+
val idCountMap = new HashMap<Integer, Integer>();
106+
for (val id: allIdsList) {
107+
val partitionId = partitionResolverSupplier.apply(id);
108+
idCountMap.put(partitionId, idCountMap.getOrDefault(partitionId, 0) + 1);
109+
}
110+
val expectedIdCount = (double) allIdsList.size() / idGeneratorConfig.getPartitionCount();
111+
for (int partitionId=0; partitionId < idGeneratorConfig.getPartitionCount(); partitionId++) {
112+
Assertions.assertTrue(expectedIdCount * 0.8 <= idCountMap.get(partitionId));
113+
Assertions.assertTrue(idCountMap.get(partitionId) <= expectedIdCount * 1.2);
114+
}
115+
}
116+
104117
@Test
105118
void testUniqueIds() {
106119
HashSet<String> allIDs = new HashSet<>();
@@ -109,8 +122,6 @@ void testUniqueIds() {
109122
val txnIdOptional = partitionAwareIdGenerator.generate("P");
110123
val txnId = txnIdOptional.map(Id::getId).orElse(null);
111124
if (allIDs.contains(txnId)) {
112-
log.warn(txnId);
113-
log.warn(String.valueOf(allIDs));
114125
allIdsUnique = false;
115126
} else {
116127
allIDs.add(txnId);

ranger-discovery-bundle/src/test/java/io/appform/ranger/discovery/bundle/id/WeightedIdGeneratorPerfTest.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ public void setUp() throws IOException {
5252
IdGeneratorConfig.builder()
5353
.partitionCount(1024)
5454
.weightedIdConfig(weightedIdConfig)
55-
.retryConfig(IdGeneratorRetryConfig.builder().idGenerationRetryCount(1024).partitionRetryCount(1024).build())
55+
.idPoolSize(100)
56+
.retryConfig(IdGeneratorRetryConfig.builder().idGenerationRetryCount(4096).partitionRetryCount(4096).build())
5657
.build(),
5758
partitionResolverSupplier,
5859
mock(MetricRegistry.class)

0 commit comments

Comments
 (0)