Skip to content

Add Distributed ID Generator #59

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 823635.7718335792
"mean_ops" : 819668.8832917466
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
"iterations" : 4,
"threads" : 1,
"forks" : 3,
"mean_ops" : 655010.8023043653
"mean_ops" : 664604.8239805239
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"name" : "io.appform.ranger.discovery.bundle.id.PartitionAwareNonceGeneratorTest.testGenerateWithBenchmark",
"iterations" : 100000,
"threads" : 5,
"totalMillis" : 8204,
"avgTime" : 1640.8
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package io.appform.ranger.discovery.bundle.id;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import lombok.val;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;

class CircularQueue {
private static final String QUEUE_FULL_METRIC_STRING = "idGenerator.queueFull.forPrefix.";
private static final String UNUSED_INDICES_METRIC_STRING = "idGenerator.unusedIds.forPrefix.";

private final Meter queueFullMeter;
private final Meter unusedDataMeter;

/** List of data for the specific queue */
private final AtomicIntegerArray queue;

/** Size of the queue */
private final int size;

/** Pointer to track the first usable index. Helps to determine the next usable data. */
private final AtomicInteger firstIdx = new AtomicInteger(0);

/** Pointer to track index of last usable index. Helps to determine which index the next data should go in. */
private final AtomicInteger lastIdx = new AtomicInteger(0);


public CircularQueue(int size, final MetricRegistry metricRegistry, final String namespace) {
this.size = size;
this.queue = new AtomicIntegerArray(size);
this.queueFullMeter = metricRegistry.meter(QUEUE_FULL_METRIC_STRING + namespace);
this.unusedDataMeter = metricRegistry.meter(UNUSED_INDICES_METRIC_STRING + namespace);
}

public synchronized void setId(int id) {
// Don't store new data if the queue is already full of unused data.
if (lastIdx.get() >= firstIdx.get() + size - 1) {
queueFullMeter.mark();
return;
}
val arrayIdx = lastIdx.get() % size;
queue.set(arrayIdx, id);
lastIdx.getAndIncrement();
}

private int getId(int index) {
val arrayIdx = index % size;
return queue.get(arrayIdx);
}

public synchronized Optional<Integer> getNextId() {
if (firstIdx.get() < lastIdx.get()) {
val id = getId(firstIdx.getAndIncrement());
return Optional.of(id);
} else {
return Optional.empty();
}
}

public void reset() {
val unusedIds = lastIdx.get() - firstIdx.get();
unusedDataMeter.mark(unusedIds);
lastIdx.set(0);
firstIdx.set(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@
public class Constants {
public static final int MAX_ID_PER_MS = 1000;
public static final int MAX_NUM_NODES = 10000;
public static final int MAX_IDS_PER_SECOND = 1_000_000;
public static final int DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS = 60;
public static final int DEFAULT_PARTITION_RETRY_COUNT = 100;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ public class GenerationResult {
NonceInfo nonceInfo;
IdValidationState state;
Domain domain;
String namespace;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class IdGenerator {
private static final IdGeneratorBase baseGenerator = new DefaultIdGenerator();

public static void initialize(int node) {
baseGenerator.setNodeId(node);
baseGenerator.setNODE_ID(node);
}

public static synchronized void cleanUp() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.appform.ranger.discovery.bundle.id;

import com.codahale.metrics.MetricRegistry;

import java.util.Optional;

public class IdPool {
/** List of IDs for the specific IdPool */
private final CircularQueue queue;

public IdPool(int size, final MetricRegistry metricRegistry, final String namespace) {
this.queue = new CircularQueue(size, metricRegistry, namespace);
}

public void setId(int id) {
queue.setId(id);
}

public Optional<Integer> getNextId() {
return queue.getNextId();
}

public void reset() {
queue.reset();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.appform.ranger.discovery.bundle.id;

import io.appform.ranger.discovery.bundle.id.formatter.IdFormatter;
import io.appform.ranger.discovery.bundle.id.generator.IdGeneratorBase;
import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import lombok.val;
import org.joda.time.DateTime;

@Slf4j
@UtilityClass
public class IdUtils {

public Id getIdFromNonceInfo(final NonceInfo idInfo, final String namespace, final IdFormatter idFormatter) {
val dateTime = new DateTime(idInfo.getTime());
val id = String.format("%s%s", namespace, idFormatter.format(dateTime, IdGeneratorBase.getNODE_ID(), idInfo.getExponent()));
return Id.builder()
.id(id)
.exponent(idInfo.getExponent())
.generatedDate(dateTime.toDate())
.node(IdGeneratorBase.getNODE_ID())
.build();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package io.appform.ranger.discovery.bundle.id;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Preconditions;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import lombok.val;

import java.time.Instant;
import java.util.concurrent.atomic.AtomicInteger;

import static io.appform.ranger.discovery.bundle.id.Constants.MAX_IDS_PER_SECOND;

/**
* Tracks generated IDs and pointers for generating next IDs for a partition.
*/
@Slf4j
public class PartitionIdTracker {
/** Array to store IdPools for each partition */
private final IdPool[] idPoolList;

/** Counter to keep track of the number of IDs created */
private final AtomicInteger nextIdCounter = new AtomicInteger();

@Getter
private Instant instant;

private final Meter generatedIdCountMeter;

public PartitionIdTracker(final int partitionSize,
final int idPoolSize,
final Instant instant,
final MetricRegistry metricRegistry,
final String namespace) {
this.instant = instant;
idPoolList = new IdPool[partitionSize];
for (int i=0; i<partitionSize; i+= 1){
idPoolList[i] = new IdPool(idPoolSize, metricRegistry, namespace);
}
this.generatedIdCountMeter = metricRegistry.meter("generatedIdCount.forPrefix." + namespace);
}

/** Method to get the IdPool for a specific partition */
public IdPool getPartition(final int partitionId) {
Preconditions.checkArgument(partitionId < idPoolList.length, "Invalid partitionId " + partitionId + " for IdPool of size " + idPoolList.length);
return idPoolList[partitionId];
}

public void addId(final int partitionId, final NonceInfo idInfo) {
Preconditions.checkArgument(partitionId < idPoolList.length, "Invalid partitionId " + partitionId + " for IdPool of size " + idPoolList.length);
if (instant.getEpochSecond() == idInfo.getTime() / 1000) {
idPoolList[partitionId].setId(idInfo.getExponent());
}
}

public NonceInfo getNonceInfo() {
Preconditions.checkArgument(nextIdCounter.get() < MAX_IDS_PER_SECOND, "ID Generation Per Second Limit Reached.");
val timeInSeconds = instant.getEpochSecond();
return new NonceInfo(nextIdCounter.getAndIncrement(), timeInSeconds * 1000L);
}

public synchronized void reset(final Instant instant) {
// Do not reset if the time (in seconds) hasn't changed
if (this.instant.getEpochSecond() == instant.getEpochSecond()) {
return;
}
// Reset all ID Pools because they contain IDs for an expired timestamp
for (val idPool: idPoolList) {
idPool.reset();
}
generatedIdCountMeter.mark(nextIdCounter.get());
nextIdCounter.set(0);
this.instant = instant;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.appform.ranger.discovery.bundle.id.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class DefaultNamespaceConfig {
/** Size of pre-generated id buffer. Each partition will have separate IdPool, each of size equal to this value. */
@NotNull
@Min(2)
private Integer idPoolSizePerPartition;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.appform.ranger.discovery.bundle.id.config;

import com.fasterxml.jackson.annotation.JsonIgnore;
import io.appform.ranger.discovery.bundle.id.Constants;
import io.dropwizard.validation.ValidationMethod;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.Valid;
import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;


@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class IdGeneratorConfig {

@NotNull
@Valid
private DefaultNamespaceConfig defaultNamespaceConfig;

@Builder.Default
@Valid
private Set<NamespaceConfig> namespaceConfig = Collections.emptySet();

@NotNull
@Min(1)
private int partitionCount;

/** Buffer time in seconds to pre-generate IDs for.
* This allows us to generate unique IDs even if the clock gets skewed. */
@Min(1)
@Max(300)
@Builder.Default
private int dataStorageLimitInSeconds = Constants.DEFAULT_DATA_STORAGE_TIME_LIMIT_IN_SECONDS;

@ValidationMethod(message = "Namespaces should be unique")
@JsonIgnore
public boolean areNamespacesUnique() {
Set<String> namespaces = namespaceConfig.stream()
.map(NamespaceConfig::getNamespace)
.collect(Collectors.toSet());
return namespaceConfig.size() == namespaces.size();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.appform.ranger.discovery.bundle.id.config;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class NamespaceConfig {
@NotNull
private String namespace;

/** Size of pre-generated id buffer. Value from DefaultNamespaceConfig will be used if this is null */
@Min(2)
private Integer idPoolSizePerBucket;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,11 @@ public Base36IdFormatter(IdFormatter idFormatter) {
this.idFormatter = idFormatter;
}

@Override
public IdParserType getType() {
throw new UnsupportedOperationException();
}

@Override
public String format(final DateTime dateTime,
final int nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ public class DefaultIdFormatter implements IdFormatter {
private static final Pattern PATTERN = Pattern.compile("(.*)([0-9]{15})([0-9]{4})([0-9]{3})");
private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormat.forPattern("yyMMddHHmmssSSS");

@Override
public IdParserType getType() {
return IdParserType.DEFAULT;
}

@Override
public String format(final DateTime dateTime,
final int nodeId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

public interface IdFormatter {

IdParserType getType();

String format(final DateTime dateTime,
final int nodeId,
final int randomNonce);
Expand Down
Loading