Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce bulk init time and fix OOM (#892) #3808

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import tempfile
import threading
import time
from math import log2
from math import floor, log2
from typing import Any, Callable, List, Optional, Tuple, Type, Union
import torch # usort:skip

Expand Down Expand Up @@ -148,7 +148,8 @@ def __init__(
# Set to False to use cudaMallocManaged
uvm_host_mapped: bool = False,
enable_async_update: bool = True, # whether enable L2/rocksdb write to async background thread
# if > 0, insert all kv pairs to rocksdb at init time, in chunks of *bulk_init_chunk_size* rows
# if > 0, insert all kv pairs to rocksdb at init time, in chunks of *bulk_init_chunk_size* bytes
# number of rows will be decided by bulk_init_chunk_size / size_of_each_row
bulk_init_chunk_size: int = 0,
) -> None:
super(SSDTableBatchedEmbeddingBags, self).__init__()
Expand Down Expand Up @@ -244,7 +245,7 @@ def __init__(
f"{cache_size / 1024.0 / 1024.0 / 1024.0 : .2f}GB, "
f"weights precision: {weights_precision}, "
f"output dtype: {output_dtype}, "
f"chunk size in bulk init: {bulk_init_chunk_size} rows"
f"chunk size in bulk init: {bulk_init_chunk_size} bytes"
)
self.register_buffer(
"lxu_cache_state",
Expand Down Expand Up @@ -762,21 +763,24 @@ def _insert_all_kv(self) -> None:
initailization time.
"""
row_offset = 0
chunk_size = self.bulk_init_chunk_size
row_count = floor(
self.bulk_init_chunk_size
/ (8 + self.max_D * self.weights_precision.as_dtype().itemsize)
)
total_dim0 = 0
for dim0, _ in self.embedding_specs:
total_dim0 += dim0

start_ts = time.time()
chunk_tensor = torch.empty(
chunk_size,
row_count,
self.max_D,
dtype=self.weights_precision.as_dtype(),
device="cuda",
)
cpu_tensor = torch.empty_like(chunk_tensor, device="cpu")
for row_offset in range(0, total_dim0, chunk_size):
actual_dim0 = min(total_dim0 - row_offset, chunk_size)
for row_offset in range(0, total_dim0, row_count):
actual_dim0 = min(total_dim0 - row_offset, row_count)
chunk_tensor.uniform_(
self.ssd_uniform_init_lower, self.ssd_uniform_init_upper
)
Expand All @@ -785,9 +789,12 @@ def _insert_all_kv(self) -> None:
# This code is intentionally not calling through the getter property
# to avoid the lazy initialization thread from joining with itself.
self._ssd_db.set_range_to_storage(rand_val, row_offset, actual_dim0)
self.ssd_db.toggle_compaction(True)
end_ts = time.time()
elapsed = int((end_ts - start_ts) * 1e6)
logging.info(f"TBE bulk initialization took {elapsed:_} us")
logging.info(
f"TBE bulk initialization took {elapsed:_} us, each batch of {row_count} rows, total rows of {total_dim0}"
)

@torch.jit.ignore
def _report_duration(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class EmbeddingRocksDBWrapper : public torch::jit::CustomClassHolder {
return impl_->set_range_to_storage(weights, start, length);
}

void toggle_compaction(bool enable) {
impl_->toggle_compaction(enable);
}

void get(
at::Tensor indices,
at::Tensor weights,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,7 @@ static auto embedding_rocks_db_wrapper =
.def(
"set_range_to_storage",
&EmbeddingRocksDBWrapper::set_range_to_storage)
.def("toggle_compaction", &EmbeddingRocksDBWrapper::toggle_compaction)
.def(
"get",
&EmbeddingRocksDBWrapper::get,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,8 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
options.memtable_prefix_bloom_size_ratio = 0.05;
options.memtable_whole_key_filtering = true;
options.max_background_jobs = num_threads;
options.max_background_flushes = num_threads;
options.disable_auto_compactions = true;
options.env->SetBackgroundThreads(4, rocksdb::Env::HIGH);
options.env->SetBackgroundThreads(1, rocksdb::Env::LOW);

Expand Down Expand Up @@ -549,6 +551,17 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB {
folly::coro::blockingWait(set_kv_db_async(seq_indices, weights, count));
}

void toggle_compaction(bool enable) {
for (auto shard = 0; shard < dbs_.size(); ++shard) {
auto s = dbs_[shard]->SetOptions(
{{"disable_auto_compactions", enable ? "false" : "true"}});
if (!s.ok()) {
LOG(WARNING) << "Failed to toggle compaction to " << enable
<< " for shard " << shard << std::endl;
}
}
}

int64_t get_max_D() {
return max_D_;
}
Expand Down
Loading