Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Velox fs for ssd cache evictlog file #11495

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 55 additions & 63 deletions velox/common/caching/SsdFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ DEFINE_bool(ssd_verify_write, false, "Read back data after writing to SSD");
namespace facebook::velox::cache {

namespace {

// TODO: Remove this function once we migrate all files to velox fs.
//
// Disable 'copy on write' on the given file. Will throw if failed for any
// reason, including file system not supporting cow feature.
void disableCow(int32_t fd) {
Expand All @@ -66,28 +69,6 @@ void disableCow(int32_t fd) {
#endif // linux
}

// TODO: Remove this function once we migrate all files to velox fs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still write the checkpoint file through the POSIX API? Thanks!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

// Sets the FS_NOCOW_FL flag on the file referred to by 'fd'. The current
// flags are fetched first so that the new flag is OR-ed into them instead of
// replacing them. The return code of each ioctl is checked with
// VELOX_CHECK_EQ, and the check message carries the return code and the errno
// text. The body is compiled only when the 'linux' macro is defined.
void disableFileCow(int32_t fd) {
#ifdef linux
  int flags{0};
  const auto getRc = ioctl(fd, FS_IOC_GETFLAGS, &flags);
  VELOX_CHECK_EQ(
      0,
      getRc,
      "ioctl(FS_IOC_GETFLAGS) failed: {}, {}",
      getRc,
      folly::errnoStr(errno));

  // Write the flags back with 'no copy-on-write' added.
  flags |= FS_NOCOW_FL;
  const auto setRc = ioctl(fd, FS_IOC_SETFLAGS, &flags);
  VELOX_CHECK_EQ(
      0,
      setRc,
      "ioctl(FS_IOC_SETFLAGS, FS_NOCOW_FL) failed: {}, {}",
      setRc,
      folly::errnoStr(errno));
#endif // linux
}

void addEntryToIovecs(AsyncDataCacheEntry& entry, std::vector<iovec>& iovecs) {
if (entry.tinyData() != nullptr) {
iovecs.push_back({entry.tinyData(), static_cast<size_t>(entry.size())});
Expand Down Expand Up @@ -354,7 +335,7 @@ bool SsdFile::growOrEvictLocked() {
}
}

const auto candidates =
auto candidates =
tracker_.findEvictionCandidates(3, numRegions_, regionPins_);
if (candidates.empty()) {
suspended_ = true;
Expand Down Expand Up @@ -676,44 +657,49 @@ bool SsdFile::removeFileEntries(
return true;
}

void SsdFile::logEviction(const std::vector<int32_t>& regions) {
if (checkpointEnabled()) {
const int32_t rc = ::write(
evictLogFd_, regions.data(), regions.size() * sizeof(regions[0]));
if (rc != regions.size() * sizeof(regions[0])) {
checkpointError(rc, "Failed to log eviction");
}
void SsdFile::logEviction(std::vector<int32_t>& regions) {
if (!checkpointEnabled()) {
return;
}
const auto length = regions.size() * sizeof(regions[0]);
const std::vector<iovec> iovecs = {{regions.data(), length}};
try {
evictLogWriteFile_->write(iovecs, 0, static_cast<int64_t>(length));
} catch (const std::exception& e) {
++stats_.writeSsdErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Failed to log eviction: " << e.what();
}
}

void SsdFile::deleteCheckpoint(bool keepLog) {
if (checkpointDeleted_) {
return;
}
if (evictLogFd_ >= 0) {
if (keepLog) {
::lseek(evictLogFd_, 0, SEEK_SET);
::ftruncate(evictLogFd_, 0);
::fsync(evictLogFd_);
} else {
::close(evictLogFd_);
evictLogFd_ = -1;

if (evictLogWriteFile_ != nullptr) {
try {
if (keepLog) {
evictLogWriteFile_->truncate(0);
evictLogWriteFile_->flush();
} else {
evictLogWriteFile_->close();
fs_->remove(getEvictLogFilePath());
evictLogWriteFile_.reset();
}
} catch (const std::exception& e) {
++stats_.deleteCheckpointErrors;
VELOX_SSD_CACHE_LOG(ERROR) << "Error in deleting evictLog: " << e.what();
}
}

checkpointDeleted_ = true;
const auto logPath = getEvictLogFilePath();
int32_t logRc = 0;
if (!keepLog) {
logRc = ::unlink(logPath.c_str());
}
const auto checkpointPath = getCheckpointFilePath();
const auto checkpointRc = ::unlink(checkpointPath.c_str());
if ((logRc != 0) || (checkpointRc != 0)) {
++stats_.deleteCheckpointErrors;
if (checkpointRc != 0) {
VELOX_SSD_CACHE_LOG(ERROR)
<< "Error in deleting log and checkpoint. log: " << logRc
<< " checkpoint: " << checkpointRc;
<< "Error in deleting checkpoint: " << checkpointRc;
}
if (checkpointRc != 0) {
++stats_.deleteCheckpointErrors;
}
}

Expand Down Expand Up @@ -851,8 +837,9 @@ void SsdFile::checkpoint(bool force) {
// NOTE: we shall truncate eviction log after checkpoint file sync
// completes so that we never recover from an old checkpoint file without
// log evictions. The latter might lead to data consistency issues.
checkRc(::ftruncate(evictLogFd_, 0), "Truncate of event log");
checkRc(::fsync(evictLogFd_), "Sync of evict log");
VELOX_CHECK_NOT_NULL(evictLogWriteFile_);
evictLogWriteFile_->truncate(0);
evictLogWriteFile_->flush();

VELOX_SSD_CACHE_LOG(INFO)
<< "Checkpoint persisted with " << entries_.size() << " cache entries";
Expand Down Expand Up @@ -883,18 +870,14 @@ void SsdFile::initializeCheckpoint() {
getCheckpointFilePath());
}
const auto logPath = getEvictLogFilePath();
evictLogFd_ = ::open(logPath.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
if (disableFileCow_) {
disableCow(evictLogFd_);
}
if (evictLogFd_ < 0) {
filesystems::FileOptions evictLogFileOptions;
evictLogFileOptions.shouldThrowOnFileAlreadyExists = false;
try {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider providing a macro in a follow-up:

#define FILE_CALL(fileFunction, errorCallback);

errorCallback expects a std::exception, and both fileFunction and errorCallback could be lambda functions that capture the required execution context.

evictLogWriteFile_ = fs_->openFileForWrite(logPath, evictLogFileOptions);
} catch (std::exception& e) {
++stats_.openLogErrors;
// Failure to open the log at startup is a process terminating error.
VELOX_FAIL(
"Could not open evict log {}, rc {}: {}",
logPath,
evictLogFd_,
folly::errnoStr(errno));
VELOX_FAIL("Could not open evict log {}: {}", logPath, e.what());
}

try {
Expand Down Expand Up @@ -965,6 +948,9 @@ void SsdFile::disableFileCow() {
const std::unordered_map<std::string, std::string> attributes = {
{std::string(LocalWriteFile::Attributes::kNoCow), "true"}};
writeFile_->setAttributes(attributes);
if (evictLogWriteFile_ != nullptr) {
evictLogWriteFile_->setAttributes(attributes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't disable COW for the checkpoint file?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is disabled in a separate path as we haven't switched checkpoint files to velox fs.

}
#endif // linux
}

Expand Down Expand Up @@ -1021,10 +1007,16 @@ void SsdFile::readCheckpoint(std::ifstream& state) {
idMap[id] = StringIdLease(fileIds(), id, name);
}

const auto logSize = ::lseek(evictLogFd_, 0, SEEK_END);
const auto logPath = getEvictLogFilePath();
const auto evictLogReadFile = fs_->openFileForRead(logPath);
const auto logSize = evictLogReadFile->size();
std::vector<uint32_t> evicted(logSize / sizeof(uint32_t));
const auto rc = ::pread(evictLogFd_, evicted.data(), logSize, 0);
VELOX_CHECK_EQ(logSize, rc, "Failed to read eviction log");
try {
evictLogReadFile->pread(0, logSize, evicted.data());
} catch (const std::exception& e) {
++stats_.readCheckpointErrors;
VELOX_FAIL("Failed to read eviction log: {}", e.what());
}
std::unordered_set<uint32_t> evictedMap;
for (auto region : evicted) {
evictedMap.insert(region);
Expand Down
16 changes: 11 additions & 5 deletions velox/common/caching/SsdFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,13 @@ class SsdFile {

/// Returns the checkpoint file path.
std::string getCheckpointFilePath() const {
return fileName_ + kCheckpointExtension;
// Faulty file path needs to be handled manually before we switch checkpoint
// file to Velox filesystem.
const std::string faultyPrefix = "faulty:";
std::string checkpointPath = fileName_ + kCheckpointExtension;
return checkpointPath.find(faultyPrefix) == 0
? checkpointPath.substr(faultyPrefix.size())
: checkpointPath;
}

/// Deletes the backing file. Used in testing.
Expand Down Expand Up @@ -477,7 +483,7 @@ class SsdFile {

// Synchronously logs that 'regions' are no longer valid in a possibly
// existing checkpoint.
void logEviction(const std::vector<int32_t>& regions);
void logEviction(std::vector<int32_t>& regions);

// Computes the checksum of data in cache 'entry'.
uint32_t checksumEntry(const AsyncDataCacheEntry& entry) const;
Expand Down Expand Up @@ -572,6 +578,9 @@ class SsdFile {
// WriteFile for cache data file.
std::unique_ptr<WriteFile> writeFile_;

// WriteFile for evict log file.
std::unique_ptr<WriteFile> evictLogWriteFile_;

// Counters.
SsdCacheStats stats_;

Expand All @@ -585,9 +594,6 @@ class SsdFile {
// Count of bytes written after last checkpoint.
std::atomic<uint64_t> bytesAfterCheckpoint_{0};

// fd for logging evictions.
int32_t evictLogFd_{-1};

// True if there was an error with checkpoint and the checkpoint was deleted.
bool checkpointDeleted_{false};
};
Expand Down
1 change: 1 addition & 0 deletions velox/common/caching/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ target_link_libraries(
PRIVATE
velox_caching
velox_file
velox_file_test_utils
velox_memory
velox_temp_path
Folly::folly
Expand Down
Loading
Loading