Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class HashLookupRecorder implements UsedLookupsRecorder {

public HashLookupRecorder(final PlanBEnv env,
final HashLookupDb hashLookupDb) {
this.usedLookupsDb = new UsedLookupsDb(env, hashLookupDb.getName() + "_used");
this.usedLookupsDb = new UsedLookupsDb(env, hashLookupDb.getName() + "-HashLookupDb");
this.hashLookupDb = hashLookupDb;
}

Expand All @@ -23,7 +23,7 @@ public void recordUsed(final LmdbWriter writer, final ByteBuffer byteBuffer) {
@Override
public void deleteUnused(final Txn<ByteBuffer> readTxn, final LmdbWriter writer) {
hashLookupDb.forEachHash(readTxn, hash -> {
if (usedLookupsDb.isUnused(readTxn, hash)) {
if (usedLookupsDb.isUnused(writer.getWriteTxn(), hash)) {
hashLookupDb.deleteByHash(writer.getWriteTxn(), hash);
writer.tryCommit();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ public class UidLookupDb {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(UidLookupDb.class);

private static final ByteBuffer MAX_ID_KEY = ByteBuffer.allocateDirect(1);

static {
MAX_ID_KEY.put((byte) 0);
MAX_ID_KEY.flip();
}

private final String name;
private final ByteBuffers byteBuffers;
private final Dbi<ByteBuffer> keyToUidDbi;
Expand All @@ -47,26 +54,19 @@ public String getName() {
private long readMaxId(final Txn<ByteBuffer> txn) {
long id = 0;
try {
id = byteBuffers.useByte((byte) 0, keyByteBuffer -> {
final ByteBuffer valueBuffer = infoDbi.get(txn, keyByteBuffer);
if (valueBuffer != null) {
return valueBuffer.getLong();
} else {
return 0L;
}
});

final ByteBuffer valueBuffer = infoDbi.get(txn, MAX_ID_KEY);
if (valueBuffer != null) {
id = valueBuffer.getLong();
}
} catch (final Exception e) {
LOGGER.debug(e::getMessage, e);
}
return id;
}

private void writeMaxId(final Txn<ByteBuffer> txn, final long id) {
byteBuffers.useByte((byte) 0, keyByteBuffer -> {
byteBuffers.useLong(id, valueByteBuffer -> {
infoDbi.put(txn, keyByteBuffer, valueByteBuffer);
});
byteBuffers.useLong(id, valueByteBuffer -> {
infoDbi.put(txn, MAX_ID_KEY, valueByteBuffer);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class UidLookupRecorder implements UsedLookupsRecorder {

public UidLookupRecorder(final PlanBEnv env,
final UidLookupDb uidLookupDb) {
this.usedLookupsDb = new UsedLookupsDb(env, uidLookupDb.getName() + "_used");
this.usedLookupsDb = new UsedLookupsDb(env, uidLookupDb.getName() + "-UidLookupDb");
this.uidLookupDb = uidLookupDb;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class UsedLookupsDb {

public UsedLookupsDb(final PlanBEnv env,
final String name) {
dbi = env.openDbi(name, DbiFlags.MDB_CREATE);
dbi = env.openDbi(name + "-used", DbiFlags.MDB_CREATE);
}

public void record(final LmdbWriter writer, final ByteBuffer key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public void write(final Txn<ByteBuffer> txn, final KeyPrefix key, final Consumer
ValSerdeUtil.write(key.getVal(), byteBuffers, valueByteBuffer -> {
if (valueByteBuffer.remaining() > USE_HASH_LOOKUP_THRESHOLD) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
hashLookupDb.put(txn, slice, idByteBuffer -> {
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
hashLookupDb.put(txn, valueSlice, idByteBuffer -> {
byteBuffers.use(idByteBuffer.remaining() + 1, prefixedBuffer -> {
// Add the variable type prefix to the lookup id.
prefixedBuffer.put(VariableValType.HASH_LOOKUP.getPrimitiveValue());
Expand All @@ -82,8 +82,8 @@ public void write(final Txn<ByteBuffer> txn, final KeyPrefix key, final Consumer
});
} else if (valueByteBuffer.remaining() > uidLookupThreshold) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
uidLookupDb.put(txn, slice, idByteBuffer -> {
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
uidLookupDb.put(txn, valueSlice, idByteBuffer -> {
byteBuffers.use(idByteBuffer.remaining() + 1, prefixedBuffer -> {
// Add the variable type prefix to the lookup id.
prefixedBuffer.put(VariableValType.UID_LOOKUP.getPrimitiveValue());
Expand Down Expand Up @@ -111,8 +111,8 @@ public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
return ValSerdeUtil.write(key.getVal(), byteBuffers, valueByteBuffer -> {
if (valueByteBuffer.remaining() > USE_HASH_LOOKUP_THRESHOLD) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
return hashLookupDb.get(txn, slice, optionalIdByteBuffer ->
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
return hashLookupDb.get(txn, valueSlice, optionalIdByteBuffer ->
optionalIdByteBuffer
.map(idByteBuffer ->
byteBuffers.use(idByteBuffer.remaining() + 1,
Expand All @@ -126,8 +126,8 @@ public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
.orElse(null));
} else if (valueByteBuffer.remaining() > uidLookupThreshold) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
return uidLookupDb.get(txn, slice, optionalIdByteBuffer ->
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
return uidLookupDb.get(txn, valueSlice, optionalIdByteBuffer ->
optionalIdByteBuffer
.map(idByteBuffer ->
byteBuffers.use(idByteBuffer.remaining() + 1,
Expand All @@ -146,7 +146,7 @@ public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
}, prefix, suffix);
}

private ByteBuffer getName(final ByteBuffer byteBuffer) {
private ByteBuffer getValueSlice(final ByteBuffer byteBuffer) {
return byteBuffer.slice(1, byteBuffer.remaining() - 1);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

import java.time.Instant;
import java.util.Objects;

@JsonPropertyOrder({"prefix", "time"})
@JsonInclude(Include.NON_NULL)
Expand All @@ -34,6 +35,20 @@ public Instant getTime() {
return time;
}

@Override
public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}
final TemporalKey that = (TemporalKey) o;
return Objects.equals(prefix, that.prefix) && Objects.equals(time, that.time);
}

@Override
public int hashCode() {
return Objects.hash(prefix, time);
}

@Override
public String toString() {
return prefix.toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package stroom.planb.impl.serde.temporalkey;

import stroom.bytebuffer.ByteBufferUtils;
import stroom.bytebuffer.impl6.ByteBuffers;
import stroom.planb.impl.db.Db;
import stroom.planb.impl.db.HashLookupDb;
Expand All @@ -14,17 +15,22 @@
import stroom.planb.impl.serde.val.ValSerdeUtil.Addition;
import stroom.planb.impl.serde.val.VariableValType;
import stroom.query.language.functions.Val;
import stroom.util.logging.LambdaLogger;
import stroom.util.logging.LambdaLoggerFactory;

import org.lmdbjava.Txn;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;

public class VariableKeySerde implements TemporalKeySerde {

private static final LambdaLogger LOGGER = LambdaLoggerFactory.getLogger(VariableKeySerde.class);

private static final int USE_HASH_LOOKUP_THRESHOLD = Db.MAX_KEY_LENGTH;

private final int uidLookupThreshold;
Expand All @@ -46,37 +52,47 @@ public VariableKeySerde(final UidLookupDb uidLookupDb,

@Override
public TemporalKey read(final Txn<ByteBuffer> txn, final ByteBuffer byteBuffer) {
// Slice off the end to get the effective time.
final ByteBuffer timeSlice = byteBuffer.slice(byteBuffer.remaining() - timeSerde.getSize(),
timeSerde.getSize());
final Instant time = timeSerde.read(timeSlice);

// Slice off the name.
final ByteBuffer nameSlice = getPrefix(byteBuffer);

// Read the variable type.
final VariableValType valType = VariableValType.PRIMITIVE_VALUE_CONVERTER.fromPrimitiveValue(nameSlice.get());
final Val val = switch (valType) {
case DIRECT -> {
// Read direct.
yield ValSerdeUtil.read(nameSlice);
}
case UID_LOOKUP -> {
// Read via UI lookup.
final ByteBuffer valueByteBuffer = uidLookupDb.getValue(txn, nameSlice);
yield ValSerdeUtil.read(valueByteBuffer);
}
case HASH_LOOKUP -> {
// Read via hash lookup.
final ByteBuffer valueByteBuffer = hashLookupDb.getValue(txn, nameSlice);
yield ValSerdeUtil.read(valueByteBuffer);
}
};

return new TemporalKey(KeyPrefix.create(val), time);
try {
// Make sure the buffer we are reading contains what we expect it to.
// TODO : We can remove this code when we have diagnosed the issue.
checkBuffer(byteBuffer.duplicate());

// Slice off the end to get the effective time.
final ByteBuffer timeSlice = byteBuffer.slice(byteBuffer.remaining() - timeSerde.getSize(),
timeSerde.getSize());
final Instant time = timeSerde.read(timeSlice);

// Slice off the name.
final ByteBuffer nameSlice = getPrefixSlice(byteBuffer);

// Read the variable type.
final VariableValType valType =
VariableValType.PRIMITIVE_VALUE_CONVERTER.fromPrimitiveValue(nameSlice.get());
final Val val = switch (valType) {
case DIRECT -> {
// Read direct.
yield ValSerdeUtil.read(nameSlice);
}
case UID_LOOKUP -> {
// Read via UI lookup.
final ByteBuffer valueByteBuffer = uidLookupDb.getValue(txn, nameSlice);
yield ValSerdeUtil.read(valueByteBuffer);
}
case HASH_LOOKUP -> {
// Read via hash lookup.
final ByteBuffer valueByteBuffer = hashLookupDb.getValue(txn, nameSlice);
yield ValSerdeUtil.read(valueByteBuffer);
}
};

return new TemporalKey(KeyPrefix.create(val), time);
} catch (final RuntimeException e) {
throw new RuntimeException("Unexpected " + e.getMessage() + " reading " +
Arrays.toString(ByteBufferUtils.getBytes(byteBuffer)), e);
}
}

private ByteBuffer getPrefix(final ByteBuffer byteBuffer) {
private ByteBuffer getPrefixSlice(final ByteBuffer byteBuffer) {
// Slice off the name.
return byteBuffer.slice(0, byteBuffer.remaining() - timeSerde.getSize());
}
Expand All @@ -89,40 +105,58 @@ public void write(final Txn<ByteBuffer> txn, final TemporalKey key, final Consum
ValSerdeUtil.write(key.getPrefix().getVal(), byteBuffers, valueByteBuffer -> {
if (valueByteBuffer.remaining() > USE_HASH_LOOKUP_THRESHOLD) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
hashLookupDb.put(txn, slice, idByteBuffer -> {
byteBuffers.use(idByteBuffer.remaining() + 1 + timeSerde.getSize(), prefixedBuffer -> {
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
hashLookupDb.put(txn, valueSlice, idByteBuffer -> {
byteBuffers.use(1 + idByteBuffer.remaining() + timeSerde.getSize(), prefixedBuffer -> {
// Add the variable type prefix to the lookup id.
prefixedBuffer.put(VariableValType.HASH_LOOKUP.getPrimitiveValue());
prefixedBuffer.put(idByteBuffer);
timeSerde.write(prefixedBuffer, key.getTime());
prefixedBuffer.flip();
consumer.accept(prefixedBuffer);
consumer.accept(checkBuffer(prefixedBuffer));
});
return null;
});
} else if (valueByteBuffer.remaining() > uidLookupThreshold) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
uidLookupDb.put(txn, slice, idByteBuffer -> {
byteBuffers.use(idByteBuffer.remaining() + 1 + timeSerde.getSize(), prefixedBuffer -> {
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
uidLookupDb.put(txn, valueSlice, idByteBuffer -> {
byteBuffers.use(1 + idByteBuffer.remaining() + timeSerde.getSize(), prefixedBuffer -> {
// Add the variable type prefix to the lookup id.
prefixedBuffer.put(VariableValType.UID_LOOKUP.getPrimitiveValue());
prefixedBuffer.put(idByteBuffer);
timeSerde.write(prefixedBuffer, key.getTime());
prefixedBuffer.flip();
consumer.accept(prefixedBuffer);
consumer.accept(checkBuffer(prefixedBuffer));
});
return null;
});
} else {
// We have already added the direct variable prefix so just use the byte buffer.
consumer.accept(valueByteBuffer);
consumer.accept(checkBuffer(valueByteBuffer));
}
return null;
}, prefix, suffix);
}

private ByteBuffer checkBuffer(final ByteBuffer bb) {
// TODO : We can remove this code when we have diagnosed the issue.
if (bb.remaining() > 0) {
if (bb.get(0) == VariableValType.UID_LOOKUP.getPrimitiveValue()) {
if (bb.remaining() - 1 - timeSerde.getSize() > 8) {
try {
throw new IllegalStateException("Unexpected lookup value: " +
Arrays.toString(ByteBufferUtils.toBytes(bb.duplicate())));
} catch (final RuntimeException e) {
LOGGER.error(e::getMessage, e);
throw e;
}
}
}
}
return bb;
}

@Override
public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
final TemporalKey key,
Expand All @@ -133,8 +167,8 @@ public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
return ValSerdeUtil.write(key.getPrefix().getVal(), byteBuffers, valueByteBuffer -> {
if (valueByteBuffer.remaining() > USE_HASH_LOOKUP_THRESHOLD) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
return hashLookupDb.get(txn, slice, optionalIdByteBuffer ->
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
return hashLookupDb.get(txn, valueSlice, optionalIdByteBuffer ->
optionalIdByteBuffer
.map(idByteBuffer ->
byteBuffers.use(idByteBuffer.remaining() + 1 + timeSerde.getSize(),
Expand All @@ -149,8 +183,8 @@ public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
.orElse(null));
} else if (valueByteBuffer.remaining() > uidLookupThreshold) {
// We are going to store as a lookup so take off the variable type prefix.
final ByteBuffer slice = getName(valueByteBuffer);
return uidLookupDb.get(txn, slice, optionalIdByteBuffer ->
final ByteBuffer valueSlice = getValueSlice(valueByteBuffer);
return uidLookupDb.get(txn, valueSlice, optionalIdByteBuffer ->
optionalIdByteBuffer
.map(idByteBuffer ->
byteBuffers.use(idByteBuffer.remaining() + 1 + timeSerde.getSize(),
Expand All @@ -170,7 +204,7 @@ public <R> R toBufferForGet(final Txn<ByteBuffer> txn,
}, prefix, suffix);
}

private ByteBuffer getName(final ByteBuffer byteBuffer) {
private ByteBuffer getValueSlice(final ByteBuffer byteBuffer) {
return byteBuffer.slice(1, byteBuffer.remaining() - 1 - timeSerde.getSize());
}

Expand All @@ -185,6 +219,6 @@ public boolean usesLookup(final ByteBuffer byteBuffer) {
public UsedLookupsRecorder getUsedLookupsRecorder(final PlanBEnv env) {
return new UsedLookupsRecorderProxy(
new VariableUsedLookupsRecorder(env, uidLookupDb, hashLookupDb),
this::getPrefix);
this::getPrefixSlice);
}
}
Loading