Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 59 additions & 1 deletion src/core/compact_object.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {
#include "redis/quicklist.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/tdigest.h"
#include "redis/util.h"
#include "redis/zmalloc.h" // for non-string objects.
#include "redis/zset.h"
Expand Down Expand Up @@ -398,6 +399,12 @@ static_assert(sizeof(CompactObj) == 18);

namespace detail {

size_t MallocUsedTDigest(const td_histogram_t* tdigest) {
size_t size = sizeof(tdigest);
size += (2 * (tdigest->cap * sizeof(double)));
return size;
}

size_t RobjWrapper::MallocUsed(bool slow) const {
if (!inner_obj_)
return 0;
Expand All @@ -418,6 +425,8 @@ size_t RobjWrapper::MallocUsed(bool slow) const {
return MallocUsedZSet(encoding_, inner_obj_);
case OBJ_STREAM:
return slow ? MallocUsedStream((stream*)inner_obj_) : sz_;
case OBJ_TDIGEST:
return MallocUsedTDigest((td_histogram_t*)inner_obj_);

default:
LOG(FATAL) << "Not supported " << type_;
Expand Down Expand Up @@ -477,11 +486,17 @@ size_t RobjWrapper::Size() const {
case OBJ_STREAM:
// Size mean malloc bytes for streams
return sz_;
case OBJ_TDIGEST:
return 0;
default:;
}
return 0;
}

inline void FreeObjTDigest(void* ptr) {
td_free((td_histogram*)ptr);
}

void RobjWrapper::Free(MemoryResource* mr) {
if (!inner_obj_)
return;
Expand Down Expand Up @@ -511,6 +526,9 @@ void RobjWrapper::Free(MemoryResource* mr) {
case OBJ_STREAM:
FreeObjStream(inner_obj_);
break;
case OBJ_TDIGEST:
FreeObjTDigest(inner_obj_);
break;
default:
LOG(FATAL) << "Unknown object type";
break;
Expand Down Expand Up @@ -603,6 +621,9 @@ bool RobjWrapper::DefragIfNeeded(float ratio) {
return do_defrag(DefragSet);
} else if (type() == OBJ_ZSET) {
return do_defrag(DefragZSet);
} else if (type() == OBJ_TDIGEST) {
// TODO implement this
return false;
}
return false;
}
Expand Down Expand Up @@ -826,6 +847,10 @@ size_t CompactObj::Size() const {
DCHECK_EQ(mask_bits_.encoding, NONE_ENC);
raw_size = u_.sbf->current_size();
break;
case TOPK_TAG:
DCHECK_EQ(mask_bits_.encoding, NONE_ENC);
raw_size = 0;
break;
default:
LOG(DFATAL) << "Should not reach " << int(taglen_);
}
Expand Down Expand Up @@ -892,6 +917,10 @@ CompactObjType CompactObj::ObjType() const {
return OBJ_SBF;
}

if (taglen_ == TOPK_TAG) {
return OBJ_TOPK;
}

LOG(FATAL) << "TBD " << int(taglen_);
return kInvalidCompactObjType;
}
Expand Down Expand Up @@ -995,11 +1024,34 @@ void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_f
}
}

void CompactObj::SetTopK(size_t topk, size_t width, size_t depth, double decay) {
TopKeys::Options opts;
size_t total_buckets = 4;
// Heuristic
if (topk > 4) {
total_buckets = topk / 4;
}
opts.buckets = total_buckets;
opts.depth = 4;
// fingerprints = buckets * depth = topk
opts.decay_base = decay;
// We need this so we can set the key. The problem with this is upon cell reset,
// we don't set the key and a query for TopK won't return that key because we never set it.
opts.min_key_count_to_record = 0;
SetMeta(TOPK_TAG);
u_.topk = AllocateMR<TopKeys>(opts);
}

SBF* CompactObj::GetSBF() const {
DCHECK_EQ(SBF_TAG, taglen_);
return u_.sbf;
}

TopKeys* CompactObj::GetTopK() const {
DCHECK_EQ(TOPK_TAG, taglen_);
return u_.topk;
}

void CompactObj::SetString(std::string_view str) {
CHECK(!IsExternal());
mask_bits_.encoding = NONE_ENC;
Expand Down Expand Up @@ -1101,7 +1153,8 @@ bool CompactObj::HasAllocated() const {
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
return false;

DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG || taglen_ == SBF_TAG);
DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG || taglen_ == SBF_TAG ||
taglen_ == TOPK_TAG);
return true;
}

Expand Down Expand Up @@ -1295,6 +1348,8 @@ void CompactObj::Free() {
}
} else if (taglen_ == SBF_TAG) {
DeleteMR<SBF>(u_.sbf);
} else if (taglen_ == TOPK_TAG) {
DeleteMR<TopKeys>(u_.topk);
} else {
LOG(FATAL) << "Unsupported tag " << int(taglen_);
}
Expand Down Expand Up @@ -1327,6 +1382,9 @@ size_t CompactObj::MallocUsed(bool slow) const {
if (taglen_ == SBF_TAG) {
return u_.sbf->MallocUsed();
}
if (taglen_ == TOPK_TAG) {
return 0;
}
LOG(DFATAL) << "should not reach";
return 0;
}
Expand Down
6 changes: 6 additions & 0 deletions src/core/compact_object.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "core/mi_memory_resource.h"
#include "core/small_string.h"
#include "core/string_or_view.h"
#include "core/top_keys.h"

namespace dfly {

Expand Down Expand Up @@ -123,6 +124,7 @@ class CompactObj {
EXTERNAL_TAG = 20,
JSON_TAG = 21,
SBF_TAG = 22,
TOPK_TAG = 23,
};

// String encoding types.
Expand Down Expand Up @@ -311,6 +313,9 @@ class CompactObj {
void SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor);
SBF* GetSBF() const;

void SetTopK(size_t topk, size_t width, size_t depth, double decay);
TopKeys* GetTopK() const;

// dest must have at least Size() bytes available
void GetString(char* dest) const;

Expand Down Expand Up @@ -479,6 +484,7 @@ class CompactObj {
// using 'packed' to reduce alignement of U to 1.
JsonWrapper json_obj __attribute__((packed));
SBF* sbf __attribute__((packed));
TopKeys* topk __attribute__((packed));
int64_t ival __attribute__((packed));
ExternalPtr ext_ptr;

Expand Down
20 changes: 20 additions & 0 deletions src/core/compact_object_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ extern "C" {
#include "redis/intset.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/td_malloc.h"
#include "redis/tdigest.h"
#include "redis/zmalloc.h"
}

Expand Down Expand Up @@ -682,6 +684,24 @@ TEST_F(CompactObjectTest, HuffMan) {
}
}

TEST_F(CompactObjectTest, TDigst) {
// Allocators
ASSERT_EQ(zmalloc, __td_malloc);
ASSERT_EQ(zcalloc, __td_calloc);
ASSERT_EQ(zrealloc, __td_realloc);
ASSERT_EQ(zfree, __td_free);

// Basic usage
td_histogram_t* hist = td_new(10);
cobj_.InitRobj(OBJ_TDIGEST, 0, hist);
ASSERT_EQ(cobj_.GetRobjWrapper()->type(), OBJ_TDIGEST);
ASSERT_EQ(cobj_.RObjPtr(), hist);
ASSERT_EQ(0, hist->unmerged_weight);
ASSERT_EQ(0, hist->merged_weight);
ASSERT_EQ(td_add(hist, 0.0, 1), 0);
cobj_.Reset();
}

static void ascii_pack_naive(const char* ascii, size_t len, uint8_t* bin) {
const char* end = ascii + len;

Expand Down
31 changes: 25 additions & 6 deletions src/core/top_keys.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ TopKeys::TopKeys(Options options)
}
}

void TopKeys::Touch(std::string_view key) {
auto ResetCell = [&](Cell& cell, uint64_t fingerprint) {
void TopKeys::Touch(std::string_view key, size_t incr) {
auto ResetCell = [&](Cell& cell, uint64_t fingerprint, size_t size = 1) {
cell.fingerprint = fingerprint;
cell.count = 1;
cell.count = size;
cell.key.clear();
};

Expand All @@ -36,13 +36,16 @@ void TopKeys::Touch(std::string_view key) {
Cell& cell = GetCell(id, bucket);
if (cell.count == 0) {
// No fingerprint in cell.
ResetCell(cell, fingerprint);
ResetCell(cell, fingerprint, incr);
if (incr > 1) {
cell.key = key;
}
} else if (cell.fingerprint == fingerprint) {
// Same fingerprint, simply increment count.

// We could make sure that, if !cell.key.empty(), then key == cell.key.empty() here. However,
// what do we do in case they are different?
++cell.count;
cell.count += incr;

if (cell.count >= options_.min_key_count_to_record && cell.key.empty()) {
cell.key = key;
Expand All @@ -51,7 +54,12 @@ void TopKeys::Touch(std::string_view key) {
// Different fingerprint, apply exponential decay.
const double rand = absl::Uniform(bitgen_, 0, 1.0);
if (rand < std::pow(options_.decay_base, -static_cast<double>(cell.count))) {
--cell.count;
if (incr != 1 && cell.count < incr) {
incr -= cell.count;
cell.count = 0;
} else {
cell.count -= incr;
}
if (cell.count == 0) {
ResetCell(cell, fingerprint);
}
Expand Down Expand Up @@ -88,4 +96,15 @@ const TopKeys::Cell& TopKeys::GetCell(uint32_t d, uint32_t bucket) const {
return fingerprints_[d * options_.buckets + bucket];
}

void TopKeys::Query(absl::flat_hash_map<std::string_view, bool>* keys) {
for (unsigned array = 0; array < options_.depth; ++array) {
for (unsigned bucket = 0; bucket < options_.buckets; ++bucket) {
const Cell& cell = GetCell(array, bucket);
if (!cell.key.empty() && keys->contains(cell.key)) {
keys->find(cell.key)->second = true;
}
}
}
}

} // end of namespace dfly
14 changes: 12 additions & 2 deletions src/core/top_keys.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <string>
#include <string_view>
#include <vector>

#include "base/random.h"

namespace dfly {
Expand All @@ -32,6 +33,7 @@ namespace dfly {
class TopKeys {
TopKeys(const TopKeys&) = delete;
TopKeys& operator=(const TopKeys&) = delete;

public:
struct Options {
// HeavyKeeper options
Expand All @@ -49,9 +51,17 @@ class TopKeys {

explicit TopKeys(Options options);

void Touch(std::string_view key);
void Touch(std::string_view key, size_t incr = 1);
absl::flat_hash_map<std::string, uint64_t> GetTopKeys() const;

// Checks whether each item in the list exists in the current set of TopKeys
// If a key in keys exists in TopKeys, we set its bool to True
void Query(absl::flat_hash_map<std::string_view, bool>* keys);

Options GetOptions() const {
return options_;
};

private:
// Each cell consists of a key-fingerprint, a count, and potentially the key itself, when it's
// above options_.min_key_count_to_record.
Expand All @@ -66,7 +76,7 @@ class TopKeys {
Options options_;
base::Xoroshiro128p bitgen_;

// fingerprints_'s size is options_.buckets * options_.arrays. Always access fields via GetCell().
// fingerprints_'s size is options_.buckets * options_.depth. Always access fields via GetCell().
std::vector<Cell> fingerprints_;
};

Expand Down
4 changes: 4 additions & 0 deletions src/facade/resp_expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class RespExpr {
: std::nullopt;
}

double GetDouble() const {
return std::get<double>(u);
}

size_t UsedMemory() const {
return 0;
}
Expand Down
4 changes: 2 additions & 2 deletions src/redis/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ else()
set(ZMALLOC_DEPS "")
endif()

add_library(redis_lib crc16.c crc64.c crcspeed.c debug.c intset.c geo.c
add_library(redis_lib crc16.c crc64.c crcspeed.c debug.c intset.c geo.c
geohash.c geohash_helper.c t_zset.c
listpack.c lzf_c.c lzf_d.c sds.c
quicklist.c rax.c redis_aux.c t_stream.c
util.c ziplist.c hyperloglog.c ${ZMALLOC_SRC})
util.c ziplist.c hyperloglog.c tdigest.c ${ZMALLOC_SRC})

cxx_link(redis_lib ${ZMALLOC_DEPS})

Expand Down
2 changes: 1 addition & 1 deletion src/redis/dict.c
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ int _dictExpand(dict *d, unsigned long size, int* malloc_failed)
if (*malloc_failed)
return DICT_ERR;
} else
new_ht_table = zcalloc(newsize*sizeof(dictEntry*));
new_ht_table = zcalloc(newsize, sizeof(dictEntry*));

new_ht_used = 0;

Expand Down
2 changes: 2 additions & 0 deletions src/redis/redis_aux.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
/* redis.h auxiliary definitions */
/* the last one in object.h is OBJ_STREAM and it is 6,
* this will add enough place for Redis types to grow */
#define OBJ_TOPK 13U
#define OBJ_TDIGEST 14U
#define OBJ_JSON 15U
#define OBJ_SBF 16U

Expand Down
21 changes: 21 additions & 0 deletions src/redis/t-digest.LICENSE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# MIT License

Copyright (c) 2019 Bob Rudis

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Loading
Loading