Skip to content

Commit eade07a

Browse files
authored
feat: support lz4 compression method (#4610)
The feature is enabled via a set_method call, but since no code calls it yet, LZ4 compression is still disabled in Dragonfly. Signed-off-by: Roman Gershman <[email protected]>
1 parent ff7a0d5 commit eade07a

File tree

5 files changed

+245
-92
lines changed

5 files changed

+245
-92
lines changed

.clang-tidy

+1
Original file line numberDiff line numberDiff line change
@@ -81,3 +81,4 @@ Checks: >
8181
# modernize-use-nullptr,
8282
# modernize-use-equals-default,
8383
# readability-qualified-auto,
84+
cppcoreguidelines-narrowing-conversions.WarnOnIntegerNarrowingConversion: 'false'

src/core/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ add_library(dfly_core allocation_tracker.cc bloom.cc compact_object.cc dense_set
1010
tx_queue.cc string_set.cc string_map.cc detail/bitpacking.cc)
1111

1212
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules
13-
fibers2 ${SEARCH_LIB} jsonpath OpenSSL::Crypto TRDP::dconv)
13+
fibers2 ${SEARCH_LIB} jsonpath OpenSSL::Crypto TRDP::dconv TRDP::lz4)
1414

1515
add_executable(dash_bench dash_bench.cc)
1616
cxx_link(dash_bench dfly_core redis_test_lib)

src/core/qlist.cc

+137-50
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ extern "C" {
1414
#include <absl/base/optimization.h>
1515
#include <absl/strings/escaping.h>
1616
#include <absl/strings/str_cat.h>
17+
#include <lz4frame.h>
1718

1819
#include "base/logging.h"
1920

@@ -33,12 +34,12 @@ using namespace std;
3334
#define SIZE_ESTIMATE_OVERHEAD 8
3435

3536
/* Minimum listpack size in bytes for attempting compression. */
36-
#define MIN_COMPRESS_BYTES 48
37+
#define MIN_COMPRESS_BYTES 256
3738

3839
/* Minimum size reduction in bytes to store compressed quicklistNode data.
3940
* This also prevents us from storing compression if the compression
4041
* resulted in a larger size than the original data. */
41-
#define MIN_COMPRESS_IMPROVE 8
42+
#define MIN_COMPRESS_IMPROVE 32
4243

4344
/* This macro is used to compress a node.
4445
*
@@ -49,19 +50,22 @@ using namespace std;
4950
*
5051
* If the 'recompress' flag of the node is false, we check whether the node is
5152
* within the range of compress depth before compressing it. */
52-
#define quicklistCompress(_node) \
53-
do { \
54-
if ((_node)->recompress) \
55-
CompressNode((_node)); \
56-
else \
57-
this->Compress(_node); \
53+
#define quicklistCompress(_node) \
54+
do { \
55+
if ((_node)->recompress) \
56+
CompressNode((_node), this->compr_method_); \
57+
else \
58+
this->Compress(_node); \
5859
} while (0)
5960

61+
#define QLIST_NODE_ENCODING_LZ4 3
62+
6063
namespace dfly {
6164

6265
namespace {
6366

6467
static_assert(sizeof(QList) == 32);
68+
static_assert(sizeof(QList::Node) == 40);
6569

6670
enum IterDir : uint8_t { FWD = 1, REV = 0 };
6771

@@ -181,22 +185,13 @@ inline ssize_t NodeSetEntry(QList::Node* node, uint8_t* entry) {
181185
return diff;
182186
}
183187

184-
/* Compress the listpack in 'node' and update encoding details.
185-
* Returns true if listpack compressed successfully.
186-
* Returns false if compression failed or if listpack too small to compress. */
187-
bool CompressNode(QList::Node* node) {
188-
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
189-
DCHECK(!node->dont_compress);
190-
191-
/* validate that the node is neither
192-
* tail nor head (it has prev and next)*/
193-
DCHECK(node->prev && node->next);
194-
195-
node->recompress = 0;
196-
/* Don't bother compressing small values */
197-
if (node->sz < MIN_COMPRESS_BYTES)
198-
return false;
188+
inline quicklistLZF* GetLzf(QList::Node* node) {
189+
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_LZF ||
190+
node->encoding == QLIST_NODE_ENCODING_LZ4);
191+
return (quicklistLZF*)node->entry;
192+
}
199193

194+
bool CompressLZF(QList::Node* node) {
200195
// We allocate LZF_STATE on heap, piggy-backing on the existing allocation.
201196
char* uptr = (char*)zmalloc(sizeof(quicklistLZF) + node->sz + sizeof(LZF_STATE));
202197
quicklistLZF* lzf = (quicklistLZF*)uptr;
@@ -208,10 +203,12 @@ bool CompressNode(QList::Node* node) {
208203
/* lzf_compress aborts/rejects compression if value not compressible. */
209204
DVLOG(2) << "Uncompressable " << node->sz << " vs " << lzf->sz;
210205
zfree(lzf);
211-
206+
QList::stats.bad_compression_attempts++;
212207
return false;
213208
}
214209
DVLOG(2) << "Compressed " << node->sz << " to " << lzf->sz;
210+
QList::stats.compressed_bytes += lzf->sz;
211+
QList::stats.raw_compressed_bytes += node->sz;
215212

216213
lzf = (quicklistLZF*)zrealloc(lzf, sizeof(*lzf) + lzf->sz);
217214
zfree(node->entry);
@@ -220,26 +217,109 @@ bool CompressNode(QList::Node* node) {
220217
return true;
221218
}
222219

223-
ssize_t CompressNodeIfNeeded(QList::Node* node) {
220+
bool CompressLZ4(QList::Node* node) {
221+
LZ4F_cctx* cntx;
222+
LZ4F_errorCode_t code = LZ4F_createCompressionContext(&cntx, LZ4F_VERSION);
223+
CHECK(!LZ4F_isError(code));
224+
225+
LZ4F_preferences_t lz4_pref = LZ4F_INIT_PREFERENCES;
226+
lz4_pref.compressionLevel = -1;
227+
lz4_pref.frameInfo.contentSize = node->sz;
228+
size_t buf_size = LZ4F_compressFrameBound(node->sz, &lz4_pref);
229+
230+
// We reuse quicklistLZF struct for LZ4 metadata.
231+
quicklistLZF* dest = (quicklistLZF*)zmalloc(sizeof(quicklistLZF) + buf_size);
232+
size_t compr_sz = LZ4F_compressFrame_usingCDict(cntx, dest->compressed, buf_size, node->entry,
233+
node->sz, nullptr /* dict */, &lz4_pref);
234+
CHECK(!LZ4F_isError(compr_sz));
235+
236+
code = LZ4F_freeCompressionContext(cntx);
237+
CHECK(!LZ4F_isError(code));
238+
239+
if (compr_sz + MIN_COMPRESS_IMPROVE >= node->sz) {
240+
QList::stats.bad_compression_attempts++;
241+
zfree(dest);
242+
return false;
243+
}
244+
245+
dest->sz = compr_sz;
246+
dest = (quicklistLZF*)zrealloc(dest, sizeof(quicklistLZF) + compr_sz);
247+
QList::stats.compressed_bytes += compr_sz;
248+
QList::stats.raw_compressed_bytes += node->sz;
249+
250+
zfree(node->entry);
251+
node->entry = (unsigned char*)dest;
252+
node->encoding = QLIST_NODE_ENCODING_LZ4;
253+
return true;
254+
}
255+
256+
/* Compress the listpack in 'node' and update encoding details.
257+
* Returns true if listpack compressed successfully.
258+
* Returns false if compression failed or if listpack too small to compress. */
259+
bool CompressNode(QList::Node* node, unsigned method) {
260+
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_RAW);
261+
DCHECK(!node->dont_compress);
262+
263+
/* validate that the node is neither
264+
* tail nor head (it has prev and next)*/
265+
DCHECK(node->prev && node->next);
266+
267+
node->recompress = 0;
268+
/* Don't bother compressing small values */
269+
if (node->sz < MIN_COMPRESS_BYTES)
270+
return false;
271+
272+
QList::stats.compression_attempts++;
273+
if (method == static_cast<unsigned>(QList::LZF)) {
274+
return CompressLZF(node);
275+
}
276+
277+
return CompressLZ4(node);
278+
}
279+
280+
ssize_t CompressNodeIfNeeded(QList::Node* node, unsigned method) {
224281
DCHECK(node);
225-
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW && !node->dont_compress) {
226-
if (CompressNode(node))
227-
return ((quicklistLZF*)node->entry)->sz - node->sz;
282+
if (node->encoding == QUICKLIST_NODE_ENCODING_RAW) {
283+
node->attempted_compress = 1;
284+
if (!node->dont_compress) {
285+
if (CompressNode(node, method))
286+
return ssize_t(GetLzf(node)->sz) - node->sz;
287+
}
228288
}
229289
return 0;
230290
}
231291

232292
/* Uncompress the listpack in 'node' and update encoding details.
233293
* Returns true on successful decode, false on failure to decode. */
234294
bool DecompressNode(bool recompress, QList::Node* node) {
295+
DCHECK(node->encoding == QUICKLIST_NODE_ENCODING_LZF ||
296+
node->encoding == QLIST_NODE_ENCODING_LZ4);
297+
235298
node->recompress = int(recompress);
236299

237300
void* decompressed = zmalloc(node->sz);
238-
quicklistLZF* lzf = (quicklistLZF*)node->entry;
239-
if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) {
240-
/* Someone requested decompress, but we can't decompress. Not good. */
241-
zfree(decompressed);
242-
return false;
301+
quicklistLZF* lzf = GetLzf(node);
302+
QList::stats.decompression_calls++;
303+
QList::stats.compressed_bytes -= lzf->sz;
304+
QList::stats.raw_compressed_bytes -= node->sz;
305+
306+
if (node->encoding == QLIST_NODE_ENCODING_LZ4) {
307+
LZ4F_dctx* dctx = nullptr;
308+
LZ4F_errorCode_t code = LZ4F_createDecompressionContext(&dctx, LZ4F_VERSION);
309+
CHECK(!LZ4F_isError(code));
310+
size_t decompressed_sz = node->sz;
311+
size_t left =
312+
LZ4F_decompress(dctx, decompressed, &decompressed_sz, lzf->compressed, &lzf->sz, nullptr);
313+
CHECK_EQ(left, 0u);
314+
CHECK_EQ(decompressed_sz, node->sz);
315+
LZ4F_freeDecompressionContext(dctx);
316+
} else {
317+
if (lzf_decompress(lzf->compressed, lzf->sz, decompressed, node->sz) == 0) {
318+
LOG(DFATAL) << "Invalid LZF compressed data";
319+
/* Someone requested decompress, but we can't decompress. Not good. */
320+
zfree(decompressed);
321+
return false;
322+
}
243323
}
244324
zfree(lzf);
245325
node->entry = (uint8_t*)decompressed;
@@ -252,19 +332,19 @@ bool DecompressNode(bool recompress, QList::Node* node) {
252332
returns by how much the size of the node has increased.
253333
*/
254334
ssize_t DecompressNodeIfNeeded(bool recompress, QList::Node* node) {
255-
if ((node) && (node)->encoding == QUICKLIST_NODE_ENCODING_LZF) {
256-
size_t compressed_sz = ((quicklistLZF*)node->entry)->sz;
335+
if (node && node->encoding != QUICKLIST_NODE_ENCODING_RAW) {
336+
size_t compressed_sz = GetLzf(node)->sz;
257337
if (DecompressNode(recompress, node)) {
258338
return node->sz - compressed_sz;
259339
}
260340
}
261341
return 0;
262342
}
263343

264-
ssize_t RecompressOnly(QList::Node* node) {
344+
ssize_t RecompressOnly(QList::Node* node, unsigned method) {
265345
if (node->recompress && !node->dont_compress) {
266-
if (CompressNode(node))
267-
return ((quicklistLZF*)node->entry)->sz - node->sz;
346+
if (CompressNode(node, method))
347+
return (GetLzf(node))->sz - node->sz;
268348
}
269349
return 0;
270350
}
@@ -299,11 +379,14 @@ QList::Node* SplitNode(QList::Node* node, int offset, bool after, ssize_t* diff)
299379

300380
} // namespace
301381

382+
__thread QList::Stats QList::stats;
383+
302384
void QList::SetPackedThreshold(unsigned threshold) {
303385
packed_threshold = threshold;
304386
}
305387

306388
QList::QList(int fill, int compress) : fill_(fill), compress_(compress), bookmark_count_(0) {
389+
compr_method_ = 0;
307390
}
308391

309392
QList::QList(QList&& other)
@@ -342,7 +425,11 @@ void QList::Clear() {
342425

343426
while (len_) {
344427
Node* next = current->next;
345-
428+
if (current->encoding != QUICKLIST_NODE_ENCODING_RAW) {
429+
quicklistLZF* lzf = (quicklistLZF*)current->entry;
430+
QList::stats.compressed_bytes -= lzf->sz;
431+
QList::stats.raw_compressed_bytes -= current->sz;
432+
}
346433
zfree(current->entry);
347434
zfree(current);
348435

@@ -587,7 +674,7 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
587674
uint8_t* new_entry = LP_Insert(node->entry, elem, it.zi_, after ? LP_AFTER : LP_BEFORE);
588675
malloc_size_ += NodeSetEntry(node, new_entry);
589676
node->count++;
590-
malloc_size_ += RecompressOnly(node);
677+
malloc_size_ += RecompressOnly(node, compr_method_);
591678
} else {
592679
bool insert_tail = at_tail && after;
593680
bool insert_head = at_head && !after;
@@ -598,17 +685,17 @@ void QList::Insert(Iterator it, std::string_view elem, InsertOpt insert_opt) {
598685
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
599686
malloc_size_ += NodeSetEntry(new_node, LP_Prepend(new_node->entry, elem));
600687
new_node->count++;
601-
malloc_size_ += RecompressOnly(new_node);
602-
malloc_size_ += RecompressOnly(node);
688+
malloc_size_ += RecompressOnly(new_node, compr_method_);
689+
malloc_size_ += RecompressOnly(node, compr_method_);
603690
} else if (insert_head && avail_prev) {
604691
/* If we are: at head, previous has free space, and inserting before:
605692
* - insert entry at tail of previous node. */
606693
auto* new_node = node->prev;
607694
malloc_size_ += DecompressNodeIfNeeded(true, new_node);
608695
malloc_size_ += NodeSetEntry(new_node, LP_Append(new_node->entry, elem));
609696
new_node->count++;
610-
malloc_size_ += RecompressOnly(new_node);
611-
malloc_size_ += RecompressOnly(node);
697+
malloc_size_ += RecompressOnly(new_node, compr_method_);
698+
malloc_size_ += RecompressOnly(node, compr_method_);
612699
} else if (insert_tail || insert_head) {
613700
/* If we are: full, and our prev/next has no available space, then:
614701
* - create new node and attach to qlist */
@@ -732,12 +819,12 @@ void QList::Compress(Node* node) {
732819
reverse = reverse->prev;
733820
}
734821

735-
if (!in_depth && node)
736-
malloc_size_ += CompressNodeIfNeeded(node);
737-
822+
if (!in_depth && node) {
823+
malloc_size_ += CompressNodeIfNeeded(node, this->compr_method_);
824+
}
738825
/* At this point, forward and reverse are one node beyond depth */
739-
malloc_size_ += CompressNodeIfNeeded(forward);
740-
malloc_size_ += CompressNodeIfNeeded(reverse);
826+
malloc_size_ += CompressNodeIfNeeded(forward, this->compr_method_);
827+
malloc_size_ += CompressNodeIfNeeded(reverse, this->compr_method_);
741828
}
742829

743830
/* Attempt to merge listpacks within two nodes on either side of 'center'.
@@ -1063,7 +1150,7 @@ bool QList::Erase(const long start, unsigned count) {
10631150
if (node->count == 0) {
10641151
DelNode(node);
10651152
} else {
1066-
malloc_size_ += RecompressOnly(node);
1153+
malloc_size_ += RecompressOnly(node, compr_method_);
10671154
}
10681155
}
10691156

0 commit comments

Comments
 (0)