Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace linkedlist for takenbuffer with fixed array #336

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 19 additions & 16 deletions src/hnsw/external_index.c
Original file line number Diff line number Diff line change
Expand Up @@ -626,38 +626,42 @@ void *ldb_wal_index_node_retriever(void *ctxp, unsigned long long id)
page = extra_dirtied_get(ctx->extra_dirted, data_block_no, NULL);
if(page == NULL) {
buf = ReadBufferExtended(ctx->index_rel, MAIN_FORKNUM, data_block_no, RBM_NORMAL, NULL);
#if LANTERNDB_COPYNODES
LockBuffer(buf, BUFFER_LOCK_SHARE);
#endif
page = BufferGetPage(buf);
} else {
idx_page_prelocked = true;
}

nodepage = (HnswIndexTuple *)PageGetItem(page, PageGetItemId(page, tid_data.ip_posid));
#if LANTERNDB_COPYNODES
BufferNode *buffNode;
buffNode = (BufferNode *)palloc(sizeof(BufferNode));
buffNode->buf = (char *)palloc(nodepage->size);
memcpy(buffNode->buf, nodepage->node, nodepage->size);
char *buf_p = (char *)palloc(nodepage->size);
memcpy(buf_p, nodepage->node, nodepage->size);
if(!idx_page_prelocked) {
UnlockReleaseBuffer(buf);
}
dlist_push_tail(&ctx->takenbuffers, &buffNode->node);
return buffNode->buf;
#endif
if(ctx->takenbuffers[ ctx->takenbuffers_next ]) {
pfree(ctx->takenbuffers[ ctx->takenbuffers_next ]);
}

ctx->takenbuffers[ ctx->takenbuffers_next ] = buf_p;
ctx->takenbuffers_next = (ctx->takenbuffers_next + 1) % ctx->takenbuffers_size;
return buf_p;
#else
// if we locked the page, unlock it and only leave a pin on it.
// otherwise, it must must have been locked because we are in the middle of an update and that node
// was affected, so we must leave it locked
if(!idx_page_prelocked) {
// Wrap buf in a linked list node
BufferNode *buffNode;
buffNode = (BufferNode *)palloc(sizeof(BufferNode));
buffNode->buf = buf;

// Add buffNode to list of pinned buffers
dlist_push_tail(&ctx->takenbuffers, &buffNode->node);
LockBuffer(buf, BUFFER_LOCK_UNLOCK);
// Add buffer to the list of pinned buffers
if(ctx->takenbuffers[ ctx->takenbuffers_next ]) {
ReleaseBuffer(ctx->takenbuffers[ ctx->takenbuffers_next ]);
}

ctx->takenbuffers[ ctx->takenbuffers_next ] = buf;
ctx->takenbuffers_next = (ctx->takenbuffers_next + 1) % ctx->takenbuffers_size;
}
#endif

#if PG_VERSION_NUM >= 130000
CheckMem(work_mem,
Expand All @@ -666,7 +670,6 @@ void *ldb_wal_index_node_retriever(void *ctxp, unsigned long long id)
0,
"pinned more tuples during node retrieval than will fit in work_mem, cosider increasing work_mem");
#endif
// fa_cache_insert(&ctx->fa_cache, (uint32)id, nodepage->node);
return nodepage->node;
}

Expand Down
20 changes: 5 additions & 15 deletions src/hnsw/external_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@

#include "external_index_socket.h"
#include "extra_dirtied.h"
#include "fa_cache.h"
#include "hnsw.h"
#include "htab_cache.h"
#include "options.h"
#include "usearch.h"

Expand Down Expand Up @@ -83,29 +81,21 @@ typedef struct HnswIndexTuple

typedef struct
{
HTABCache block_numbers_cache;

Relation index_rel;

// used for inserts
HnswIndexHeaderPage *header_page_under_wal;

ExtraDirtiedBufs *extra_dirted;

FullyAssociativeCache fa_cache;

dlist_head takenbuffers;
} RetrieverCtx;

typedef struct
{
#if LANTERNDB_COPYNODES
char *buf;
char **takenbuffers;
#else
Buffer buf;
Buffer *takenbuffers;
#endif
dlist_node node;
} BufferNode;
uint32 takenbuffers_size;
uint32 takenbuffers_next;
} RetrieverCtx;

typedef struct
{
Expand Down
2 changes: 1 addition & 1 deletion src/hnsw/insert.c
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ bool ldb_aminsert(Relation index,
opts.quantization = usearch_scalar_b1_k;
usearch_scalar = usearch_scalar_b1_k;
}
opts.retriever_ctx = ldb_wal_retriever_area_init(index, hdr);
opts.retriever_ctx = ldb_wal_retriever_area_init(index, hdr, hdr->m);
opts.retriever = ldb_wal_index_node_retriever;
opts.retriever_mut = ldb_wal_index_node_retriever_mut;

Expand Down
60 changes: 21 additions & 39 deletions src/hnsw/retriever.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,48 @@

#include <assert.h>
#include <common/relpath.h>
#include <pg_config.h> // BLCKSZ
#include <storage/bufmgr.h> // Buffer
#include <utils/hsearch.h>
#include <utils/relcache.h>

#include "external_index.h"
#include "htab_cache.h"
#include "insert.h"

RetrieverCtx *ldb_wal_retriever_area_init(Relation index_rel, HnswIndexHeaderPage *header_page_under_wal)
RetrieverCtx *ldb_wal_retriever_area_init(Relation index_rel, HnswIndexHeaderPage *header_page_under_wal, uint32 m)
{
RetrieverCtx *ctx = palloc0(sizeof(RetrieverCtx));
ctx->index_rel = index_rel;
ctx->header_page_under_wal = header_page_under_wal;
ctx->extra_dirted = extra_dirtied_new();

fa_cache_init(&ctx->fa_cache);

dlist_init(&ctx->takenbuffers);

/* fill in a buffer with blockno index information, before spilling it to disk */
ctx->block_numbers_cache = cache_create("BlockNumberCache");
ctx->takenbuffers_size = m * 5;
ctx->takenbuffers_next = 0;
#if LANTERNDB_COPYNODES
ctx->takenbuffers = palloc0(sizeof(char *) * ctx->takenbuffers_size);
#else
ctx->takenbuffers = palloc0(sizeof(Buffer) * ctx->takenbuffers_size);
#endif

return ctx;
}

void ldb_wal_retriever_area_reset(RetrieverCtx *ctx)
{
dlist_mutable_iter miter;
dlist_foreach_modify(miter, &ctx->takenbuffers)
{
BufferNode *node = dlist_container(BufferNode, node, miter.cur);
if(node->buf != InvalidBuffer) {
ReleaseBuffer(node->buf);
for(uint32 i = 0; i < ctx->takenbuffers_size; i++) {
if(ctx->takenbuffers[ i ]) {
#if LANTERNDB_COPYNODES
pfree(ctx->takenbuffers[ i ]);
ctx->takenbuffers[ i ] = NULL;
#else
ReleaseBuffer(ctx->takenbuffers[ i ]);
ctx->takenbuffers[ i ] = 0;
#endif
}
dlist_delete(miter.cur);
pfree(node);
}
dlist_init(&ctx->takenbuffers);

fa_cache_init(&ctx->fa_cache);
ctx->takenbuffers_next = 0;
}

void ldb_wal_retriever_area_fini(RetrieverCtx *ctx)
{
cache_destroy(&ctx->block_numbers_cache);
dlist_mutable_iter miter;
dlist_foreach_modify(miter, &ctx->takenbuffers)
{
BufferNode *node = dlist_container(BufferNode, node, miter.cur);
#if LANTERNDB_COPYNODES
pfree(node->buf);
#else
if(node->buf != InvalidBuffer) {
ReleaseBuffer(node->buf);
}
#endif
dlist_delete(miter.cur);
pfree(node);
}
dlist_init(&ctx->takenbuffers);

ldb_wal_retriever_area_reset(ctx);
pfree(ctx->takenbuffers);
extra_dirtied_free(ctx->extra_dirted);
}
2 changes: 1 addition & 1 deletion src/hnsw/retriever.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

// this area is used to return pointers back to usearch

RetrieverCtx* ldb_wal_retriever_area_init(Relation index_rel, HnswIndexHeaderPage* header_page_under_wal);
RetrieverCtx* ldb_wal_retriever_area_init(Relation index_rel, HnswIndexHeaderPage* header_page_under_wal, uint32 m);
// can be used after each usearch_search to tell the retriever that the pointers given out
// will no longer be used
void ldb_wal_retriever_area_reset(RetrieverCtx* ctx);
Expand Down
5 changes: 3 additions & 2 deletions src/hnsw/scan.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,10 @@ IndexScanDesc ldb_ambeginscan(Relation index, int nkeys, int norderbys)
int dimensions;
usearch_error_t error = NULL;
usearch_init_options_t opts;
RetrieverCtx *retriever_ctx;

(void)CheckExtensionVersions();

RetrieverCtx *retriever_ctx = ldb_wal_retriever_area_init(index, NULL);

scan = RelationGetIndexScan(index, nkeys, norderbys);

// ** initialize usearch data structures and set up external retriever
Expand All @@ -52,6 +51,7 @@ IndexScanDesc ldb_ambeginscan(Relation index, int nkeys, int norderbys)
headerp = (HnswIndexHeaderPage *)PageGetContents(page);
assert(headerp->magicNumber == LDB_WAL_MAGIC_NUMBER);

retriever_ctx = ldb_wal_retriever_area_init(index, NULL, headerp->m);
// Initialize usearch index options based on params stored in our index header
dimensions = headerp->vector_dim;

Expand Down Expand Up @@ -279,6 +279,7 @@ bool ldb_amgettuple(IndexScanDesc scan, ScanDirection dir)
scanstate->labels,
scanstate->distances,
&error);

ldb_wal_retriever_area_reset(scanstate->retriever_ctx);

scanstate->count = num_returned;
Expand Down
Loading