Skip to content

Extensions to log server disk queue (made in the context of version vector/unicast) #11777

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: version-vector-disk-queue
Choose a base branch
from
167 changes: 154 additions & 13 deletions fdbserver/TLogServer.actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,19 @@ struct TLogQueueEntryRef {
UID id;
Version version;
Version knownCommittedVersion;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;
StringRef messages;
TLogQueueEntryRef() : version(0), knownCommittedVersion(0) {}
TLogQueueEntryRef() : version(0), knownCommittedVersion(0), prevVersion(0) {}
TLogQueueEntryRef(Arena& a, TLogQueueEntryRef const& from)
: id(from.id), version(from.version), knownCommittedVersion(from.knownCommittedVersion),
messages(a, from.messages) {}
messages(a, from.messages), prevVersion(from.prevVersion), tLogLocIds(from.tLogLocIds) {}

// To change this serialization, ProtocolVersion::TLogQueueEntryRef must be updated, and downgrades need to be
// considered
template <class Ar>
void serialize(Ar& ar) {
serializer(ar, version, messages, knownCommittedVersion, id);
serializer(ar, version, messages, knownCommittedVersion, prevVersion, tLogLocIds, id);
}
size_t expectedSize() const { return messages.expectedSize(); }
};
Expand All @@ -73,8 +75,11 @@ struct AlternativeTLogQueueEntryRef {
Version version;
Version knownCommittedVersion;
std::vector<TagsAndMessage>* alternativeMessages;
Version prevVersion;
std::vector<uint16_t> tLogLocIds;

AlternativeTLogQueueEntryRef() : version(0), knownCommittedVersion(0), alternativeMessages(nullptr) {}
AlternativeTLogQueueEntryRef()
: version(0), knownCommittedVersion(0), alternativeMessages(nullptr), prevVersion(0) {}

template <class Ar>
void serialize(Ar& ar) {
Expand All @@ -84,7 +89,7 @@ struct AlternativeTLogQueueEntryRef {
for (auto& msg : *alternativeMessages) {
ar.serializeBytes(msg.message);
}
serializer(ar, knownCommittedVersion, id);
serializer(ar, knownCommittedVersion, prevVersion, tLogLocIds, id);
}

uint32_t expectedSize() const {
Expand Down Expand Up @@ -139,6 +144,7 @@ struct TLogQueue final : public IClosable {
queue->close();
delete this;
}
// Forwards to the wrapped disk queue and reports the location it would read from next.
IDiskQueue::location getNextReadLocation() {
    IDiskQueue::location nextLocation = queue->getNextReadLocation();
    return nextLocation;
}

private:
IDiskQueue* queue;
Expand Down Expand Up @@ -217,6 +223,8 @@ static const KeyRangeRef persistTxsTagsKeys = KeyRangeRef("TxsTags/"_sr, "TxsTag
static const KeyRange persistTagMessagesKeys = prefixRange("TagMsg/"_sr);
static const KeyRange persistTagMessageRefsKeys = prefixRange("TagMsgRef/"_sr);
static const KeyRange persistTagPoppedKeys = prefixRange("TagPop/"_sr);
static const KeyRef persistUnicastRecoveryLocationKey = KeyRef("UnicastRecoveryLocation"_sr);
static const KeyRef persistSpillTargetLogDataIdKey = KeyRef("SpillTargetLogDataId"_sr);

static const KeyRef persistEncryptionAtRestModeKey = "encryptionAtRestMode"_sr;

Expand Down Expand Up @@ -731,6 +739,10 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
tLogData->persistentData->clear(KeyRangeRef(msgRefKey, strinc(msgRefKey)));
Key poppedKey = logIdKey.withPrefix(persistTagPoppedKeys.begin);
tLogData->persistentData->clear(KeyRangeRef(poppedKey, strinc(poppedKey)));
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistUnicastRecoveryLocationKey)));
tLogData->persistentData->clear(singleKeyRange(logIdKey.withPrefix(persistSpillTargetLogDataIdKey)));
}
}

for (auto it = peekTracker.begin(); it != peekTracker.end(); ++it) {
Expand Down Expand Up @@ -778,6 +790,12 @@ struct LogData : NonCopyable, public ReferenceCounted<LogData> {
stoppedPromise.send(Void());
}
}

// Trims "unknownCommittedVersions" from its back end: entries are removed one at a
// time for as long as the back entry's version is at or below "upToVersion".
// Trimming stops at the first back entry with a larger version, or once the
// container is empty. Entries closer to the front are never inspected.
void purgeUnknownCommittedVersions(Version upToVersion) {
    for (;;) {
        if (unknownCommittedVersions.empty()) {
            break; // nothing left to trim
        }
        if (!(unknownCommittedVersions.back().version <= upToVersion)) {
            break; // back entry is newer than the cutoff; keep it and everything ahead of it
        }
        unknownCommittedVersions.pop_back();
    }
}
};

template <class T>
Expand Down Expand Up @@ -965,10 +983,24 @@ ACTOR Future<Void> popDiskQueue(TLogData* self, Reference<LogData> logData) {
IDiskQueue::location minLocation = 0;
Version minVersion = 0;
auto locationIter = logData->versionLocation.lower_bound(logData->persistentDataVersion);
// If version vector is enabled then we need to preserve all versions from "knownCommittedVersion"
// onwards (for recovery purpose). Adjust the iterator position accordingly.
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST &&
logData->knownCommittedVersion < logData->persistentDataVersion) {
Copy link
Contributor

@dlambrig dlambrig Apr 25, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I hit this assert: ASSERT_WE_THINK(minVersion <= logData->knownCommittedVersion);

There is a corner case when we call popDiskQueue right after having recovered.

In that case lower_bound(logData->persistentDataVersion) is the first version in the current epoch >= V-1. So minVersion is set to V. Because V > KCV this triggers the assert.

I think we want to change the operator < to <= in the conditional, i.e. logData->knownCommittedVersion <= logData->persistentDataVersion. That will select the highest version <= V in the current epoch at line 990. In the case I hit, there is no such version in the current epoch, so locationIter == logData->versionLocation.end(), minVersion stays at 0, and the assert is satisfied. This causes no pops to happen at this time, which seems to be the right behavior (there is nothing to pop in that epoch yet).

locationIter = logData->versionLocation.lastLessOrEqual(logData->knownCommittedVersion);
}
if (locationIter != logData->versionLocation.end()) {
minLocation = locationIter->value.first;
minVersion = locationIter->key;
}
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
ASSERT_WE_THINK(minVersion <= logData->knownCommittedVersion);
/*
for (auto it : self->id_data) {
ASSERT_WE_THINK(minVersion <= it.second->knownCommittedVersion);
}
*/
}
logData->minPoppedTagVersion = std::numeric_limits<Version>::max();

for (int tagLocality = 0; tagLocality < logData->tag_data.size(); tagLocality++) {
Expand Down Expand Up @@ -1108,6 +1140,56 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
KeyValueRef(persistRecoveryLocationKey, BinaryWriter::toValue(locationIter->value.first, Unversioned())));
}

state Version unicastRelevantMinimumVersion = std::numeric_limits<Version>::max();
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
// Track the position of the known committed version of "logData" - that is
// the position we will need to start reading the diskQueue from, in order to
// build "LogData::unknownCommittedVersions", in case of a restart.
state Reference<LogData> unicastRelevantLogData = logData;

// Track the position of the known committed version of the relevant log server
// from the previous epoch's log system too - if the system is currently
// going through recovery and the current log system is not yet committed
// to the coordinated state then the previous epoch's log system will become
// the current log system in case the current recovery fails, so we will
// need to consider log servers from the previous epoch too.
// auto const& dbInfo = self->dbInfo->get();
for (auto& it : self->id_data) {
// @note there is a scenario where "dbInfo.priorCommittedLogServers"
// becomes empty even though recovery is in progress - work around this
// case by checking the known committed versions of all LogData structures.
// @todo find why "dbInfo.priorCommittedLogServers" is becoming empty
// in that case, and then enable this code.
// if (std::find(dbInfo.priorCommittedLogServers.begin(), dbInfo.priorCommittedLogServers.end(),
// it.second->logId) != dbInfo.priorCommittedLogServers.end()) {
if (it.second->logId != unicastRelevantLogData->logId &&
(it.second->versionLocation.lastLessOrEqual(it.second->knownCommittedVersion) !=
it.second->versionLocation.end())) {

if (it.second->knownCommittedVersion <= unicastRelevantLogData->knownCommittedVersion) {
unicastRelevantLogData = it.second;
}
}
}

if (unicastRelevantLogData) {
auto kcvLocationIter =
unicastRelevantLogData->versionLocation.lastLessOrEqual(unicastRelevantLogData->knownCommittedVersion);
if (kcvLocationIter != unicastRelevantLogData->versionLocation.end()) {
self->persistentData->set(
KeyValueRef(persistUnicastRecoveryLocationKey,
BinaryWriter::toValue(kcvLocationIter->value.first, Unversioned())));
}

self->persistentData->set(
KeyValueRef(persistSpillTargetLogDataIdKey, BinaryWriter::toValue(logData->logId, Unversioned())));

unicastRelevantMinimumVersion =
unicastRelevantLogData
->knownCommittedVersion; // we don't want to purge versions from diskqueue from this version onwards
}
}

self->persistentData->set(
KeyValueRef(BinaryWriter::toValue(logData->logId, Unversioned()).withPrefix(persistCurrentVersionKeys.begin),
BinaryWriter::toValue(newPersistentDataVersion, Unversioned())));
Expand Down Expand Up @@ -1186,7 +1268,9 @@ ACTOR Future<Void> updatePersistentData(TLogData* self, Reference<LogData> logDa
}
if (minVersion != std::numeric_limits<Version>::max()) {
self->persistentQueue->forgetBefore(
newPersistentDataVersion,
(!SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the main branch, the first parameter to forgetBefore on line 1189 is minVersion, not newPersistentDataVersion. minVersion is calculated above. Should minVersion be used here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

? newPersistentDataVersion
: std::min(minVersion, unicastRelevantMinimumVersion)),
logData); // SOMEDAY: this can cause a slow task (~0.5ms), presumably from erasing too many versions.
// Should we limit the number of versions cleared at a time?
}
Expand Down Expand Up @@ -2430,6 +2514,8 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = req.messages;
qe.id = logData->logId;
qe.prevVersion = req.seqPrevVersion;
qe.tLogLocIds = req.tLogLocIds;
Copy link
Contributor

@dlambrig dlambrig May 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we write the new disk queue entry fields (prevVersion and tLogLocIds) even when version vector is disabled, which makes sense because the disk format should be the same with and without the feature enabled. However, we do not populate tLogLocIds in the call to push() when version vector is disabled. If the version vector knob is changed from disabled->enabled, recovery will trigger; if during recovery we encounter a version whose tLogLocIds is empty, will that version not be recovered when it should have been? Should we populate tLogLocIds the way we do with prevVersion?

self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand All @@ -2439,11 +2525,10 @@ ACTOR Future<Void> tLogCommit(TLogData* self,
// Notifies the commitQueue actor to commit persistentQueue, and also unblocks tLogPeekMessages actors
logData->version.set(req.version);
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
ASSERT(req.tLogCount == req.tLogLocIds.size());
logData->unknownCommittedVersions.emplace_front(req.version, req.seqPrevVersion, req.tLogLocIds);
while (!logData->unknownCommittedVersions.empty() &&
logData->unknownCommittedVersions.back().version <= req.knownCommittedVersion) {
logData->unknownCommittedVersions.pop_back();
}
// Purge versions from "unknownCommittedVersions" list till "req.knownCommittedVersion".
logData->purgeUnknownCommittedVersions(req.knownCommittedVersion);
} else {
ASSERT(req.prevVersion == req.seqPrevVersion); // @todo remove this assert later
}
Expand Down Expand Up @@ -3058,6 +3143,7 @@ ACTOR Future<Void> pullAsyncData(TLogData* self,
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down Expand Up @@ -3213,10 +3299,13 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
state Future<RangeResult> fRecoverCounts = storage->readRange(persistRecoveryCountKeys);
state Future<RangeResult> fProtocolVersions = storage->readRange(persistProtocolVersionKeys);
state Future<RangeResult> fTLogSpillTypes = storage->readRange(persistTLogSpillTypeKeys);
state Future<Optional<Value>> fUnicastRecoveryLocation = storage->readValue(persistUnicastRecoveryLocationKey);
state Future<Optional<Value>> fSpillTargetLogDataId = storage->readValue(persistSpillTargetLogDataIdKey);

// FIXME: metadata in queue?

wait(waitForAll(std::vector{ fFormat, fRecoveryLocation, fEncryptionAtRestMode }));
wait(waitForAll(std::vector{
fFormat, fRecoveryLocation, fEncryptionAtRestMode, fUnicastRecoveryLocation, fSpillTargetLogDataId }));
wait(waitForAll(std::vector{ fVers,
fKnownCommitted,
fLocality,
Expand Down Expand Up @@ -3290,10 +3379,31 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
BinaryReader::fromStringRef<Version>(it.value, Unversioned());
}

state IDiskQueue::location minimumRecoveryLocation = 0;
state IDiskQueue::location minimumRecoveryLocation =
0; // the position to start reading the disk queue from (on recovery)
state IDiskQueue::location nonUnicastRecoveryLocation =
0; // the position to start reading the disk queue from (on recovery) when version vector/unicast is disabled
if (fRecoveryLocation.get().present()) {
minimumRecoveryLocation =
nonUnicastRecoveryLocation =
BinaryReader::fromStringRef<IDiskQueue::location>(fRecoveryLocation.get().get(), Unversioned());

// Initialize "minimumRecoveryLocation".
minimumRecoveryLocation = nonUnicastRecoveryLocation;
}

state IDiskQueue::location unicastRecoveryLocation =
0; // the position to start reading the disk queue from (on recovery) when version vector/unicast is enabled
// @note versions in the position range (unicastRecoveryLocation, nonUnicastRecoveryLocation]
// will be read in order to build "LogData::unknownCommittedVersions" (needed by the unicast
// recovery algorithm) only.
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && fUnicastRecoveryLocation.get().present()) {
unicastRecoveryLocation =
BinaryReader::fromStringRef<IDiskQueue::location>(fUnicastRecoveryLocation.get().get(), Unversioned());

// Update "minimumRecoveryLocation".
if (unicastRecoveryLocation < minimumRecoveryLocation) {
minimumRecoveryLocation = unicastRecoveryLocation;
}
}

state int idx = 0;
Expand Down Expand Up @@ -3403,6 +3513,10 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
bool recoveryFinished = wait(self->persistentQueue->initializeRecovery(minimumRecoveryLocation));
if (recoveryFinished)
throw end_of_stream();
// Check if the version to be read is to be used to build "LogData::unknownCommittedVersions" only.
state bool buildUnknownCommittedOnly =
(SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST &&
(self->persistentQueue->getNextReadLocation() < nonUnicastRecoveryLocation));
loop {
if (allRemoved.isReady()) {
CODE_PROBE(true, "all tlogs removed during queue recovery");
Expand All @@ -3425,11 +3539,32 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
// logData->version.get());

if (logData) {
if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST && buildUnknownCommittedOnly) {
logData->knownCommittedVersion =
std::max(logData->knownCommittedVersion, qe.knownCommittedVersion);
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);
// Purge versions from "unknownCommittedVersions" list till the "knownCommittedVersion".
logData->purgeUnknownCommittedVersions(logData->knownCommittedVersion);
if (buildUnknownCommittedOnly) {
buildUnknownCommittedOnly =
(self->persistentQueue->getNextReadLocation() < nonUnicastRecoveryLocation);
}
continue;
}

if (!self->spillOrder.size() || self->spillOrder.back() != qe.id) {
self->spillOrder.push_back(qe.id);
}

logData->knownCommittedVersion =
std::max(logData->knownCommittedVersion, qe.knownCommittedVersion);

if (SERVER_KNOBS->ENABLE_VERSION_VECTOR_TLOG_UNICAST) {
logData->unknownCommittedVersions.emplace_front(qe.version, qe.prevVersion, qe.tLogLocIds);
// Purge versions from "unknownCommittedVersions" list till the "knownCommittedVersion".
logData->purgeUnknownCommittedVersions(logData->knownCommittedVersion);
}

if (qe.version > logData->version.get()) {
commitMessages(self, logData, qe.version, qe.arena(), qe.messages);
logData->version.set(qe.version);
Expand Down Expand Up @@ -3458,6 +3593,11 @@ ACTOR Future<Void> restorePersistentState(TLogData* self,
ASSERT_WE_THINK(qe.version == logData->version.get());
}
}

if (buildUnknownCommittedOnly) {
buildUnknownCommittedOnly =
(self->persistentQueue->getNextReadLocation() < nonUnicastRecoveryLocation);
}
}
when(wait(allRemoved)) {
throw worker_removed();
Expand Down Expand Up @@ -3707,6 +3847,7 @@ ACTOR Future<Void> tLogStart(TLogData* self, InitializeTLogRequest req, Locality
qe.knownCommittedVersion = logData->knownCommittedVersion;
qe.messages = StringRef();
qe.id = logData->logId;
qe.prevVersion = 0;
self->persistentQueue->push(qe, logData);

self->diskQueueCommitBytes += qe.expectedSize();
Expand Down