diff --git a/fdbclient/ServerKnobs.cpp b/fdbclient/ServerKnobs.cpp
index 8e3ee343374..41aacfb7582 100644
--- a/fdbclient/ServerKnobs.cpp
+++ b/fdbclient/ServerKnobs.cpp
@@ -1118,6 +1118,7 @@ void ServerKnobs::initialize(Randomize randomize, ClientKnobs* clientKnobs, IsSi
 	init( PHYSICAL_SHARD_MOVE_LOG_SEVERITY, 1 );
 	init( FETCH_SHARD_BUFFER_BYTE_LIMIT, 20e6 ); if( randomize && BUGGIFY ) FETCH_SHARD_BUFFER_BYTE_LIMIT = 1;
 	init( FETCH_SHARD_UPDATES_BYTE_LIMIT, 2500000 ); if( randomize && BUGGIFY ) FETCH_SHARD_UPDATES_BYTE_LIMIT = 100;
+	init( SS_ENCODE_ID_IN_AVAILABLE_STATUS, false ); if( isSimulated ) SS_ENCODE_ID_IN_AVAILABLE_STATUS = deterministicRandom()->coinflip();

 	//Wait Failure
 	init( MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS, 250 ); if( randomize && BUGGIFY ) MAX_OUTSTANDING_WAIT_FAILURE_REQUESTS = 2;

diff --git a/fdbclient/include/fdbclient/ServerKnobs.h b/fdbclient/include/fdbclient/ServerKnobs.h
index b51ae4f407b..3e5367a947f 100644
--- a/fdbclient/include/fdbclient/ServerKnobs.h
+++ b/fdbclient/include/fdbclient/ServerKnobs.h
@@ -1157,6 +1157,7 @@ class SWIFT_CXX_IMMORTAL_SINGLETON_TYPE ServerKnobs : public KnobsImpl<ServerKnobs> {
 	int PHYSICAL_SHARD_MOVE_LOG_SEVERITY;
 	int64_t FETCH_SHARD_BUFFER_BYTE_LIMIT;
 	int64_t FETCH_SHARD_UPDATES_BYTE_LIMIT;
+	bool SS_ENCODE_ID_IN_AVAILABLE_STATUS;

diff --git a/fdbclient/include/fdbclient/FDBTypes.h b/fdbclient/include/fdbclient/FDBTypes.h
--- a/fdbclient/include/fdbclient/FDBTypes.h
+++ b/fdbclient/include/fdbclient/FDBTypes.h
@@ ... @@ struct DataMoveMetaData {
 	Optional<BulkLoadTaskState> bulkLoadTaskState; // set if the data move is a bulk load data move
+	Optional<std::unordered_map<std::string, std::string>> dcTeamIds; // dc id -> team id

 	DataMoveMetaData() = default;
 	DataMoveMetaData(UID id, Version version, KeyRange range) : id(id), version(version), priority(0), mode(0) {
@@ -189,6 +190,11 @@ struct DataMoveMetaData {
 	}

 	DataMoveMetaData(UID id) : id(id), version(invalidVersion), priority(0), mode(0) {}

+	// teamIds defaults to empty so callers that do not know team ids yet (e.g. seedShardServers) can
+	// construct the metadata without them.
+	DataMoveMetaData(UID id, KeyRange range, std::unordered_map<std::string, std::string> teamIds = {})
+	  : id(id), version(invalidVersion), priority(0), mode(0) {
+		ranges.push_back(range);
+		dcTeamIds = teamIds;
+	}
+
 	Phase getPhase() const { return static_cast<Phase>(phase); }

 	void setPhase(Phase phase) { this->phase = static_cast<uint8_t>(phase); }

diff --git a/fdbclient/include/fdbclient/StorageServerShard.h b/fdbclient/include/fdbclient/StorageServerShard.h
index cbaf5bb766e..752ea18df49 100644
--- a/fdbclient/include/fdbclient/StorageServerShard.h
+++ b/fdbclient/include/fdbclient/StorageServerShard.h
@@ -95,7 +95,7 @@ struct StorageServerShard {
 	template <class Ar>
 	void serialize(Ar& ar) {
-		serializer(ar, range, version, id, desiredId, shardState, moveInShardId);
+		serializer(ar, range, version, id, desiredId, shardState, moveInShardId, dataMoveId, teamId);
 	}

 	KeyRange range;
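Illustrative sketch (not part of the patch): how a data-distribution-side writer might populate
the new dcTeamIds field before persisting DataMoveMetaData. The function name and the dc/team
values are hypothetical; only DataMoveMetaData, dataMoveKeyFor() and dataMoveValue() come from
the existing code and this change.

    // Hypothetical helper: record per-DC team ids in the data move metadata.
    void sketchPersistDataMove(Transaction& tr, UID dataMoveId, KeyRange range) {
        std::unordered_map<std::string, std::string> teamIds;
        teamIds["dc0"] = "team-0"; // assumed dc id -> team id mapping
        teamIds["dc1"] = "team-7";
        DataMoveMetaData metadata(dataMoveId, range, teamIds);
        tr.set(dataMoveKeyFor(dataMoveId), dataMoveValue(metadata));
    }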
diff --git a/fdbserver/MoveKeys.actor.cpp b/fdbserver/MoveKeys.actor.cpp
index 7bb3462f0db..bc168b73417 100644
--- a/fdbserver/MoveKeys.actor.cpp
+++ b/fdbserver/MoveKeys.actor.cpp
@@ -3404,6 +3404,11 @@ void seedShardServers(Arena& arena, CommitTransactionRef& tr, std::vector<StorageServerInterface> servers)
 	    ... keyServersValue(std::vector<Tag>(), shardId, UID());
 	krmSetPreviouslyEmptyRange(tr, arena, keyServersPrefix, KeyRangeRef(KeyRef(), allKeys.end), ksValue, Value());

+	// Set DataMoveMetaData for the seed data move. The team id will be read by the seed servers.
+	// TODO: Clean up the data move metadata for initial moves.
+	DataMoveMetaData metadata(shardId, allKeys);
+	tr.set(arena, dataMoveKeyFor(shardId), dataMoveValue(metadata));
+
 	for (auto& s : servers) {
 		krmSetPreviouslyEmptyRange(
 		    tr, arena, serverKeysPrefixFor(s.id()), allKeys, serverKeysValue(shardId), serverKeysFalse);
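A possible shape for the TODO above, as a sketch only — the function name, trigger point, and
placement are assumptions, not part of this patch: once the seed servers have read the team id,
the seed data move entry could be deleted.

    // Hypothetical cleanup of the seed DataMoveMetaData entry.
    ACTOR Future<Void> sketchCleanupSeedDataMove(Database cx, UID shardId) {
        state Transaction tr(cx);
        loop {
            try {
                tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
                tr.setOption(FDBTransactionOptions::LOCK_AWARE);
                tr.clear(dataMoveKeyFor(shardId)); // drop the seed data move record
                wait(tr.commit());
                return Void();
            } catch (Error& e) {
                wait(tr.onError(e));
            }
        }
    }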
diff --git a/fdbserver/storageserver.actor.cpp b/fdbserver/storageserver.actor.cpp
index 09905cf89fc..f6a026fb5ee 100644
--- a/fdbserver/storageserver.actor.cpp
+++ b/fdbserver/storageserver.actor.cpp
@@ -397,12 +397,13 @@ struct MoveInShard {
 struct AddingShard : NonCopyable {
 	KeyRange keys;
+	UID dataMoveId;
 	Future<Void> fetchClient; // holds FetchKeys() actor
 	Promise<Void> fetchComplete;
 	Promise<Void> readWrite;
 	DataMovementReason reason;
 	SSBulkLoadMetadata ssBulkLoadMetadata;
-	std::string teamId;
+	std::string teamId = invalidTeamId;

 	// During the Fetching phase, it saves newer mutations whose version is greater or equal to fetchClient's
 	// fetchVersion, while the shard is still busy catching up with fetchClient. It applies these updates after fetching
@@ -432,6 +433,7 @@ struct AddingShard : NonCopyable {
 	AddingShard(StorageServer* server,
 	            KeyRangeRef const& keys,
+	            UID dataMoveId,
 	            DataMovementReason reason,
 	            const SSBulkLoadMetadata& ssBulkLoadMetadata);

@@ -439,7 +441,15 @@ struct AddingShard : NonCopyable {
 	AddingShard(AddingShard* prev, KeyRange const& keys)
 	  : keys(keys), fetchClient(prev->fetchClient), server(prev->server), transferredVersion(prev->transferredVersion),
 	    fetchVersion(prev->fetchVersion), phase(prev->phase), reason(prev->reason),
-	    ssBulkLoadMetadata(prev->ssBulkLoadMetadata) {}
+	    ssBulkLoadMetadata(prev->ssBulkLoadMetadata), teamId(prev->teamId) {
+		// No range split for the bulk load case.
+		ASSERT(!ssBulkLoadMetadata.getConductBulkLoad());
+
+		// The previous adding shard should already have a team id.
+		if (prev->server->shardAware && SERVER_KNOBS->SHARD_ENCODE_LOCATION_METADATA) {
+			ASSERT(teamId != invalidTeamId);
+		}
+	}

 	~AddingShard() {
 		if (!fetchComplete.isSet())
 			fetchComplete.send(Void());
@@ -475,6 +485,7 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
 	uint64_t changeCounter;
 	uint64_t shardId;
 	uint64_t desiredShardId;
+	UID dataMoveId;
 	std::string teamId = invalidTeamId;
 	Version version;

@@ -483,9 +494,11 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
 	static ShardInfo* newReadWrite(KeyRange keys, StorageServer* data) { return new ShardInfo(keys, nullptr, data); }
 	static ShardInfo* newAdding(StorageServer* data,
 	                            KeyRange keys,
+	                            UID dataMoveId,
 	                            DataMovementReason reason,
 	                            const SSBulkLoadMetadata& ssBulkLoadMetadata) {
-		return new ShardInfo(keys, std::make_unique<AddingShard>(data, keys, reason, ssBulkLoadMetadata), nullptr);
+		return new ShardInfo(
+		    keys, std::make_unique<AddingShard>(data, keys, dataMoveId, reason, ssBulkLoadMetadata), nullptr);
 	}
 	static ShardInfo* addingSplitLeft(KeyRange keys, AddingShard* oldShard) {
 		return new ShardInfo(keys, std::make_unique<AddingShard>(oldShard, keys), nullptr);
@@ -493,6 +506,10 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {

 	static ShardInfo* newShard(StorageServer* data, const StorageServerShard& shard);

+	static ShardInfo* newEmptyShard(KeyRange keys, StorageServer* data, Version version) {
+		return new ShardInfo(keys, nullptr, data);
+	}
+
 	static bool canMerge(const ShardInfo* l, const ShardInfo* r) {
 		if (l == nullptr || r == nullptr || l->keys.end != r->keys.begin || l->version == invalidVersion ||
 		    r->version == invalidVersion) {
 			return false;
 		}
@@ -541,16 +558,6 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
 		this->desiredShardId = shard.desiredId;
 	}

-	// Returns true if the current shard is merged with `other`.
-	bool mergeWith(const ShardInfo* other) {
-		if (!canMerge(this, other)) {
-			return false;
-		}
-		this->keys = KeyRangeRef(this->keys.begin, other->range().end);
-		this->version = std::max(this->version, other->getVersion());
-		return true;
-	}
-
 	void validate() const {
 		// TODO: Complete this.
 	}
@@ -567,6 +574,7 @@ class ShardInfo : public ReferenceCounted<ShardInfo>, NonCopyable {
 	std::shared_ptr<MoveInShard> getMoveInShard() const { return moveInShard; }
 	Version getVersion() const { return version; }
 	std::string getTeamId() const { return teamId; }
+	UID getDataMoveId() const { return dataMoveId; }

 	void setChangeCounter(uint64_t shardChangeCounter) { changeCounter = shardChangeCounter; }
 	void setShardId(uint64_t id) { shardId = id; }
@@ -1024,6 +1032,11 @@ struct StorageServer : public IStorageMetricsService<StorageServer> {
 	std::unordered_map<Version, std::vector<KeyRange>>
 	    pendingRemoveRanges; // Pending requests to remove ranges from physical shards
 	std::deque<std::pair<Standalone<StringRef>, Standalone<StringRef>>> constructedData;

+	std::deque<std::pair<UID, KeyRange>>
+	    emptyRangeDataMoves; // Data move ids for data moves that assign an empty range to the storage server. The
+	                         // team id needs to be fetched and updated for these ranges. Assigning empty ranges only
+	                         // happens on an empty cluster or on data loss.
+
 	bool shardAware; // True if the storage server is aware of the physical shards.
 	LocalityData locality; // Storage server's locality information.
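Sketch of how the new emptyRangeDataMoves queue might be drained — an assumption, not part of
this patch: the draining loop, and the (data move id, range) element order reconstructed above,
are hypothetical; getTeamId() is the actor added later in this diff. Each recorded data move id
is resolved to a team id and the available status is re-persisted with it.

    // Hypothetical consumer of StorageServer::emptyRangeDataMoves.
    ACTOR Future<Void> sketchResolveEmptyRangeTeamIds(StorageServer* self) {
        while (!self->emptyRangeDataMoves.empty()) {
            state std::pair<UID, KeyRange> next = self->emptyRangeDataMoves.front();
            self->emptyRangeDataMoves.pop_front();
            // Resolve the owning team id from the data move metadata, then persist it.
            std::string teamId =
                wait(getTeamId(self, self->shards.rangeContaining(next.second.begin)->value().getPtr()));
            setAvailableStatus(self, next.second, true, teamId);
        }
        return Void();
    }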
@@ -7609,8 +7622,8 @@ void removeDataRange(StorageServer* ss,
 	data.erase(range.begin, range.end);
 }

-void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available);
-void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned);
+void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available, const std::string& teamId);
+void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned, UID dataMoveId);
 void updateStorageShard(StorageServer* self, StorageServerShard shard);
 void setRangeBasedBulkLoadStatus(StorageServer* self, KeyRangeRef keys, const SSBulkLoadMetadata& ssBulkLoadMetadata);

@@ -8881,6 +8894,31 @@ ACTOR static Future<Void> processSampleFiles(StorageServer* data,
 	return Void();
 }

+ACTOR Future<std::string> getTeamId(StorageServer* self, ShardInfo* shard) {
+	state Transaction tr(self->cx);
+	loop {
+		try {
+			// Re-apply options and the read version on every attempt; onError() resets the transaction.
+			tr.setOption(FDBTransactionOptions::PRIORITY_SYSTEM_IMMEDIATE);
+			tr.setOption(FDBTransactionOptions::LOCK_AWARE);
+			tr.setVersion(shard->getVersion());
+			Optional<Value> val = wait(tr.get(dataMoveKeyFor(shard->getDataMoveId())));
+			if (!val.present()) {
+				TraceEvent(SevWarnAlways, "DataMoveMetadataNotAvailable").detail("DataMoveId", shard->getDataMoveId());
+				wait(delay(0.01));
+				continue;
+			}
+			DataMoveMetaData metadata = decodeDataMoveValue(val.get());
+			if (!metadata.dcTeamIds.present()) {
+				TraceEvent("TeamIdNotSet").detail("DataMoveId", shard->getDataMoveId()).detail("Range", shard->range());
+				wait(delay(0.01));
+				continue;
+			}
+			return metadata.dcTeamIds.get()[self->locality.describeDcId()];
+		} catch (Error& e) {
+			wait(tr.onError(e));
+		}
+	}
+}
+
 ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard) {
 	state const UID fetchKeysID = deterministicRandom()->randomUniqueID();
 	state TraceInterval interval("FetchKeys");
@@ -9319,8 +9357,11 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard)
 			shard->server->addShard(ShardInfo::newShard(data, rightShard));
 		} else {
 			shard->server->addShard(ShardInfo::addingSplitLeft(KeyRangeRef(keys.begin, blockBegin), shard));
-			shard->server->addShard(ShardInfo::newAdding(
-			    data, KeyRangeRef(blockBegin, keys.end), shard->reason, shard->getSSBulkLoadMetadata()));
+			shard->server->addShard(ShardInfo::newAdding(data,
+			                                             KeyRangeRef(blockBegin, keys.end),
+			                                             anonymousShardId,
+			                                             shard->reason,
+			                                             shard->getSSBulkLoadMetadata()));
 			if (conductBulkLoad) {
 				TraceEvent(bulkLoadVerboseEventSev(), "SSBulkLoadTaskFetchKey", data->thisServerID)
 				    .detail("FKID", interval.pairID)
@@ -9538,9 +9579,11 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard)
 			newShard.setShardState(StorageServerShard::ReadWrite);
 			updateStorageShard(data, newShard);
 		}
-		setAvailableStatus(data,
-		                   keys,
-		                   true); // keys will be available when getLatestVersion()==transferredVersion is durable
+		setAvailableStatus(
+		    data,
+		    keys,
+		    true,
+		    shard->teamId); // keys will be available when getLatestVersion()==transferredVersion is durable

 		// Note that since it receives a pointer to FetchInjectionInfo, the thread does not leave this actor until
 		// this point.
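This diff does not show where AddingShard::teamId is populated before the setAvailableStatus()
call above. One plausible wiring, as a sketch only — the call site and the guarding condition
are assumptions, not part of this patch — is to resolve it from the data move metadata early in
fetchKeys():

    // Inside fetchKeys(), before the shard is marked available (hypothetical placement):
    if (data->shardAware && SERVER_KNOBS->SS_ENCODE_ID_IN_AVAILABLE_STATUS) {
        std::string teamId =
            wait(getTeamId(data, data->shards.rangeContaining(shard->keys.begin)->value().getPtr()));
        shard->teamId = teamId;
    }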
@@ -9589,7 +9632,7 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard)
 			ASSERT(data->data().getLatestVersion() > data->version.get());
 			removeDataRange(
 			    data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, keys);
-			setAvailableStatus(data, keys, false);
+			setAvailableStatus(data, keys, false, invalidTeamId);
 			// Prevent another, overlapping fetchKeys from entering the Fetching phase until
 			// data->data().getLatestVersion() is durable
 			data->newestDirtyVersion.insert(keys, data->data().getLatestVersion());
@@ -9619,10 +9662,12 @@ ACTOR Future<Void> fetchKeys(StorageServer* data, AddingShard* shard)

 AddingShard::AddingShard(StorageServer* server,
                          KeyRangeRef const& keys,
+                         UID dataMoveId,
                          DataMovementReason reason,
                          const SSBulkLoadMetadata& ssBulkLoadMetadata)
-  : keys(keys), server(server), transferredVersion(invalidVersion), fetchVersion(invalidVersion), phase(WaitPrevious),
-    reason(reason), ssBulkLoadMetadata(ssBulkLoadMetadata) {
+  : keys(keys), dataMoveId(dataMoveId), server(server), transferredVersion(invalidVersion),
+    fetchVersion(invalidVersion), phase(WaitPrevious), reason(reason), ssBulkLoadMetadata(ssBulkLoadMetadata) {
+	ASSERT(!server->shardAware || (dataMoveId.isValid() && dataMoveId != anonymousShardId));
 	fetchClient = fetchKeys(server, this);
 }

@@ -10083,7 +10128,7 @@ ACTOR Future<Void> fetchShardApplyUpdates(StorageServer* data,
 	ASSERT(newShard.getShardState() == StorageServerShard::ReadWritePending);
 	newShard.setShardState(StorageServerShard::ReadWrite);
 	updateStorageShard(data, newShard);
-	setAvailableStatus(data, range, true);
+	setAvailableStatus(data, range, true, invalidTeamId);

 	// Wait for the transferredVersion (and therefore the shard data) to be committed and durable.
@@ -10522,7 +10567,7 @@ ShardInfo* ShardInfo::newShard(StorageServer* data, const StorageServerShard& sh
 		// moves. For case 1, the bulkload is available only if the encode_shard_location_metadata is on. Therefore,
 		// the old data moves are never for bulkload. For case 2, fallback happens only if fetchCheckpoint fails,
 		// which is not the case for bulkload, which does not do fetchCheckpoint.
-		res = newAdding(data, shard.range, DataMovementReason::INVALID, SSBulkLoadMetadata());
+		res = newAdding(data, shard.range, shard.dataMoveId, DataMovementReason::INVALID, SSBulkLoadMetadata());
 		break;
 	case StorageServerShard::ReadWritePending:
 		TraceEvent(SevWarnAlways, "CancellingAlmostReadyMoveInShard").detail("StorageServerShard", shard.toString());
@@ -10614,6 +10659,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
 			continue;
 		}

+		// Shards are assigned.
 		auto ranges = data->shards.getAffectedRangesAfterInsertion(shard.range, Reference<ShardInfo>());
 		for (int i = 0; i < ranges.size(); i++) {
 			KeyRangeRef& range = static_cast<KeyRangeRef&>(ranges[i]);
@@ -10623,6 +10669,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
 			if (range == shard.range) {
 				data->addShard(ShardInfo::newShard(data, shard));
 			} else {
+				ASSERT(shard.getShardState() != StorageServerShard::Adding);
 				StorageServerShard rightShard = ranges[i].value->toStorageServerShard();
 				rightShard.range = range;
 				data->addShard(ShardInfo::newShard(data, rightShard));
@@ -10648,6 +10695,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
 		wait(yield());
 	}

+	// The shard map and the available status should be consistent.
 	state int availableLoc;
 	for (availableLoc = 0; availableLoc < availableShards.size(); availableLoc++) {
 		KeyRangeRef shardRange(
@@ -10675,6 +10723,7 @@ ACTOR Future<Void> restoreShards(StorageServer* data,
 		wait(yield());
 	}

+	// The shard map and the assigned status should be consistent.
 	state int assignedLoc;
 	for (assignedLoc = 0; assignedLoc < assignedShards.size(); ++assignedLoc) {
 		KeyRangeRef shardRange(assignedShards[assignedLoc].key.removePrefix(persistShardAssignedKeys.begin),
@@ -10887,7 +10936,8 @@ void changeServerKeys(StorageServer* data,
 				    .detail("End", range.end);
 				changeNewestAvailable.emplace_back(range, latestVersion);
 				data->addShard(ShardInfo::newReadWrite(range, data));
-				setAvailableStatus(data, range, true);
+				// TODO: Read the data move metadata to find the team id.
+				setAvailableStatus(data, range, true, invalidTeamId);
 			} else {
 				auto& shard = data->shards[range.begin];
 				if (!shard->assigned() || shard->range() != range)
@@ -10914,7 +10964,7 @@ void changeServerKeys(StorageServer* data,
 	ranges.clear();
 	for (auto r = removeRanges.begin(); r != removeRanges.end(); ++r) {
 		removeDataRange(data, data->addVersionToMutationLog(data->data().getLatestVersion()), data->shards, *r);
-		setAvailableStatus(data, *r, false);
+		setAvailableStatus(data, *r, false, invalidTeamId);
 	}

 	// Clear the moving-in empty range, and set it available at the latestVersion.
@@ -10927,7 +10977,8 @@ void changeServerKeys(StorageServer* data,
 		           range,
 		           data->updateEagerReads);
 		data->newestAvailableVersion.insert(range, latestVersion);
-		setAvailableStatus(data, range, true);
+		// TODO: Read the team id from the data move metadata.
+		setAvailableStatus(data, range, true, invalidTeamId);
 		++data->counters.kvSystemClearRanges;
 	}
 	validate(data);
@@ -11198,7 +11249,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 			changeNewestAvailable.emplace_back(range, latestVersion);
 			updatedShards.push_back(
 			    StorageServerShard(range, version, desiredId, desiredId, StorageServerShard::ReadWrite));
-			setAvailableStatus(data, range, true);
+			// TODO: Read the team id from the data move metadata.
+			setAvailableStatus(data, range, true, invalidTeamId);
 			// Note: The initial range is available, however, the shard won't be created in the storage engine
 			// until version is committed.
 			data->pendingAddRanges[cVer].emplace_back(desiredId, range);
@@ -11309,7 +11361,8 @@ void changeServerKeysWithPhysicalShards(StorageServer* data,
 		           range,
 		           data->updateEagerReads);
 		data->newestAvailableVersion.insert(range, latestVersion);
-		setAvailableStatus(data, range, true);
+		// TODO: Read the team id from the data move metadata.
+		setAvailableStatus(data, range, true, invalidTeamId);
 		++data->counters.kvSystemClearRanges;
 	}
 	validate(data);
@@ -11481,13 +11534,14 @@ class StorageUpdater {
 		    .detail("DataMoveId", dataMoveId.toString())
 		    .detail("ConductBulkLoad", conductBulkLoad)
 		    .detail("Context", changeServerKeysContextName(context));
+
+		// Add changes in shard assignment to the mutation log. DataMoveId is anonymousShardId when location
+		// metadata is disabled.
+		setAssignedStatus(data, keys, nowAssigned, dataMoveId);
 		if (data->shardAware) {
-			setAssignedStatus(data, keys, nowAssigned);
 			changeServerKeysWithPhysicalShards(
 			    data, keys, dataMoveId, nowAssigned, currentVersion - 1, context, enablePSM, conductBulkLoad);
 		} else {
-			// add changes in shard assignment to the mutation log
-			setAssignedStatus(data, keys, nowAssigned);
 			SSBulkLoadMetadata bulkLoadMetadata(dataMoveId, conductBulkLoad);
 			setRangeBasedBulkLoadStatus(data, keys, bulkLoadMetadata);

@@ -13357,7 +13411,7 @@ void StorageServerDisk::makeNewStorageServerDurable(const bool shardAware) {
 	}
 }

-void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
+void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available, const std::string& teamId) {
 	// ASSERT( self->debug_inApplyUpdate );
 	ASSERT(!keys.empty());

@@ -13370,8 +13424,14 @@ void setAvailableStatus(StorageServer* self, KeyRangeRef keys, bool available) {
 	self->addMutationToMutationLog(mLV,
 	                               MutationRef(MutationRef::ClearRange, availableKeys.begin, availableKeys.end));
 	++self->counters.kvSystemClearRanges;
-	self->addMutationToMutationLog(
-	    mLV, MutationRef(MutationRef::SetValue, availableKeys.begin, available ? "1"_sr : "0"_sr));
+	StringRef availableValue;
+	if (SERVER_KNOBS->SS_ENCODE_ID_IN_AVAILABLE_STATUS) {
+		ASSERT(!available || !teamId.empty());
+		availableValue = available ? StringRef(teamId) : "0"_sr;
+	} else {
+		availableValue = available ? "1"_sr : "0"_sr;
+	}
+	self->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, availableKeys.begin, availableValue));
 	if (keys.end != allKeys.end) {
 		bool endAvailable = self->shards.rangeContaining(keys.end)->value()->isReadWritePending();
 		self->addMutationToMutationLog(
@@ -13415,7 +13475,7 @@ void updateStorageShard(StorageServer* data, StorageServerShard shard) {
 	}
 }

-void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned) {
+void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned, UID dataMoveId) {
 	ASSERT(!keys.empty());
 	Version logV = self->data().getLatestVersion();
 	auto& mLV = self->addVersionToMutationLog(logV);
@@ -13424,6 +13484,13 @@ void setAssignedStatus(StorageServer* self, KeyRangeRef keys, bool nowAssigned)
 	//TraceEvent("SetAssignedStatus", self->thisServerID).detail("Version", mLV.version).detail("RangeBegin", assignedKeys.begin).detail("RangeEnd", assignedKeys.end);
 	self->addMutationToMutationLog(mLV, MutationRef(MutationRef::ClearRange, assignedKeys.begin, assignedKeys.end));
 	++self->counters.kvSystemClearRanges;
+	std::string assignedId;
+	StringRef assignedValue;
+	if (SERVER_KNOBS->SS_ENCODE_ID_IN_AVAILABLE_STATUS) {
+		ASSERT(!nowAssigned || dataMoveId.isValid());
+		// Keep the id string alive while assignedValue points at it; the mutation log deep-copies it below.
+		assignedId = dataMoveId.toString();
+		assignedValue = nowAssigned ? StringRef(assignedId) : "0"_sr;
+	} else {
+		assignedValue = nowAssigned ? "1"_sr : "0"_sr;
+	}
-	self->addMutationToMutationLog(
-	    mLV, MutationRef(MutationRef::SetValue, assignedKeys.begin, nowAssigned ? "1"_sr : "0"_sr));
+	self->addMutationToMutationLog(mLV, MutationRef(MutationRef::SetValue, assignedKeys.begin, assignedValue));
 	if (keys.end != allKeys.end) {
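The persisted availability value is now overloaded: "0" still means unavailable, "1" means
available under the legacy encoding, and with SS_ENCODE_ID_IN_AVAILABLE_STATUS any longer value
is the owning team id. A minimal decode sketch mirroring the restoreDurableState() changes
below (the helper name is hypothetical, not part of the patch):

    // Hypothetical helper: recover (available, teamId) from a persisted value.
    std::pair<bool, std::string> sketchDecodeAvailableValue(ValueRef value) {
        bool available = value != "0"_sr;
        std::string teamId = available ? "UnknownId" : invalidTeamId;
        if (value.size() > 1) {
            teamId = value.toString(); // knob-enabled encoding stores the team id directly
        }
        return { available, teamId };
    }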
@@ -13824,6 +13891,8 @@ ACTOR Future<Void> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 		wait(yield());
 	}

+	// Updates the available shards in the storage server. Available shards might not be assigned to this storage
+	// server; stale data removal and FetchKeys initiation will be handled in later steps.
 	state RangeResult available = fShardAvailable.get();
 	data->bytesRestored += available.logicalSize();
 	state int availableLoc;
@@ -13833,8 +13902,13 @@ ACTOR Future<Void> restoreDurableState(StorageServer* data, IKeyValueStore* stor
 	for (availableLoc = 0; availableLoc < available.size(); availableLoc++) {
 		KeyRangeRef keys(available[availableLoc].key.removePrefix(persistShardAvailableKeys.begin),
 		                 availableLoc + 1 == available.size()
 		                     ? allKeys.end
 		                     : available[availableLoc + 1].key.removePrefix(persistShardAvailableKeys.begin));
 		ASSERT(!keys.empty());

 		bool nowAvailable = available[availableLoc].value != "0"_sr;
+		std::string teamId = nowAvailable ? "UnknownId" : invalidTeamId;
+		if (available[availableLoc].value.size() > 1) {
+			// With SS_ENCODE_ID_IN_AVAILABLE_STATUS, an available range persists its team id instead of "1".
+			teamId = available[availableLoc].value.toString();
+		}
+
 		/*if(nowAvailable)
 		  TraceEvent("AvailableShard", data->thisServerID).detail("RangeBegin", keys.begin).detail("RangeEnd", keys.end);*/
 		data->newestAvailableVersion.insert(keys, nowAvailable ? latestVersion : invalidVersion);
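Counterpart sketch for the assigned-status encoding introduced in setAssignedStatus() above
(hypothetical helper, not part of the patch): when the knob is enabled, the assigned value is
the data move id's string form rather than "1", so recovery code inspecting it would need a
decode step like this.

    // Hypothetical helper: recover (assigned, dataMoveId) from a persisted value.
    std::pair<bool, UID> sketchDecodeAssignedValue(ValueRef value) {
        bool assigned = value != "0"_sr;
        UID dataMoveId; // default UID() means "unknown / legacy encoding"
        if (assigned && value.size() > 1) {
            dataMoveId = UID::fromString(value.toString());
        }
        return { assigned, dataMoveId };
    }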