From c9897eabc355ffdbc2890a2afff1c073a9066d2a Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Wed, 10 Sep 2025 23:34:55 +0000 Subject: [PATCH 01/11] Only call emptyData after RDB validation Signed-off-by: Venkat Pamulapati --- src/rdb.c | 21 +++++++++++++++++++-- src/rdb.h | 2 ++ src/replication.c | 31 +++++++++++++++++-------------- src/server.h | 1 + 4 files changed, 39 insertions(+), 16 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index d3585f92a0..d097702223 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3074,6 +3074,15 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, return retval; } +//ven Temporary +/* Callback used by emptyData() while flushing away old data to load + * the new dataset received by the primary and by discardTempDb() + * after loading succeeded or failed. */ +void replicationEmptyDbCallback(hashtable *d) { + UNUSED(d); + if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); +} + /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned. * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, @@ -3103,12 +3112,20 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin is_valkey_magic = true; } else { serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); - return C_ERR; + // Return this error so we know to terminate the sync gracefully without emptyData() + return RDB_INCOMPATIBLE; } rdbver = atoi(buf + 6); if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) { serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver); - return C_ERR; + return RDB_INCOMPATIBLE; + } + + // Only empty data if empty data flag is set + if (rdbflags & RDBFLAGS_EMPTY_DATA) { + int empty_db_flags = server.repl_replica_lazy_flush ? 
EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + serverLog(LL_NOTICE, "RDB compatability check complete, flushing data %d", rdbver); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } /* Key-specific attributes, set by opcodes before the key type. */ diff --git a/src/rdb.h b/src/rdb.h index 0b76b1a416..fe947db7f8 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -176,6 +176,7 @@ typedef struct rdbSnapshotOptions { #define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/ #define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/ #define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */ +#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/ /* When rdbLoadObject() returns NULL, the err flag is * set to hold the type of error that occurred */ @@ -223,5 +224,6 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); ssize_t rdbSaveFunctions(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); int saveSnapshotToConnectionSockets(rdbSnapshotOptions options); +void replicationEmptyDbCallback(hashtable *d); #endif diff --git a/src/replication.c b/src/replication.c index 4ee619ebc1..2bf45e963a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1983,13 +1983,6 @@ void replicationSendNewlineToPrimary(void) { } } -/* Callback used by emptyData() while flushing away old data to load - * the new dataset received by the primary and by discardTempDb() - * after loading succeeded or failed. 
*/ -void replicationEmptyDbCallback(hashtable *d) { - UNUSED(d); - if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); -} /* Once we have a link with the primary and the synchronization was * performed, this function materializes the primary client we store @@ -2281,8 +2274,9 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); + //(ven) we move this logic to later /* Even though we are on-empty-db and the database is empty, we still call emptyData. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + // serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); dbarray = server.db; @@ -2410,14 +2404,17 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); + //(ven) we want to call this from within the rioload function /* Empty the databases only after the RDB file is ok, that is, before the RDB file * is actually loaded, in case we encounter an error and drop the replication stream * and leave an empty database. 
*/ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + // serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + // emptyData(-1, empty_db_flags, replicationEmptyDbCallback); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); - if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) { + int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA); + + if (retval != RDB_OK) { serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization " "DB from disk, check server logs."); if (server.rdb_del_sync_files && allPersistenceDisabled()) { @@ -2427,9 +2424,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { bg_unlink(server.rdb_filename); } - /* If disk-based RDB loading fails, remove the half-loaded dataset. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + //(ven) new case for handling when the rdbs were incompatible, so no data was loaded + if (retval == RDB_INCOMPATIBLE) { + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded."); + } else { + /* If disk-based RDB loading fails, remove the half-loaded dataset. */ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + } + /* Note that there's no point in restarting the AOF on sync failure, * it'll be restarted when sync succeeds or replica promoted. */ diff --git a/src/server.h b/src/server.h index 23c8511cfa..a956ca936e 100644 --- a/src/server.h +++ b/src/server.h @@ -349,6 +349,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define RDB_OK 0 #define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */ #define RDB_FAILED 2 /* Failed to load the RDB file. 
*/ +#define RDB_INCOMPATIBLE 3/* RDB is incompatible with this version */ /* Command doc flags */ #define CMD_DOC_NONE 0 From f937c88e74d32a33f9a40f9260fb60128676a4ad Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Thu, 11 Sep 2025 17:40:33 +0000 Subject: [PATCH 02/11] move callback to server.h Signed-off-by: Venkat Pamulapati --- src/rdb.c | 10 +--------- src/replication.c | 19 ++++++++++--------- src/server.h | 1 + 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index d097702223..2bba0da262 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3074,14 +3074,6 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, return retval; } -//ven Temporary -/* Callback used by emptyData() while flushing away old data to load - * the new dataset received by the primary and by discardTempDb() - * after loading succeeded or failed. */ -void replicationEmptyDbCallback(hashtable *d) { - UNUSED(d); - if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); -} /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned. @@ -3124,7 +3116,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin // Only empty data if empty data flag is set if (rdbflags & RDBFLAGS_EMPTY_DATA) { int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; - serverLog(LL_NOTICE, "RDB compatability check complete, flushing data %d", rdbver); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB compatability check passed. 
Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); } diff --git a/src/replication.c b/src/replication.c index 2bf45e963a..0aebb09f7c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1983,6 +1983,13 @@ void replicationSendNewlineToPrimary(void) { } } +/* Callback used by emptyData() while flushing away old data to load + * the new dataset received by the primary and by discardTempDb() + * after loading succeeded or failed. */ +void replicationEmptyDbCallback(hashtable *d) { + UNUSED(d); + if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); +} /* Once we have a link with the primary and the synchronization was * performed, this function materializes the primary client we store @@ -2274,7 +2281,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); - //(ven) we move this logic to later /* Even though we are on-empty-db and the database is empty, we still call emptyData. */ // serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); @@ -2404,13 +2410,8 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); - //(ven) we want to call this from within the rioload function - /* Empty the databases only after the RDB file is ok, that is, before the RDB file - * is actually loaded, in case we encounter an error and drop the replication stream - * and leave an empty database. 
*/ - // serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - // emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - + /* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatbility + * and before loading the data from the RDB */ serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA); @@ -2424,7 +2425,7 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { bg_unlink(server.rdb_filename); } - //(ven) new case for handling when the rdbs were incompatible, so no data was loaded + /* If RDB failed compatability check, we did not load the new data set or flush our old data. */ if (retval == RDB_INCOMPATIBLE) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded."); } else { diff --git a/src/server.h b/src/server.h index a956ca936e..860630dbf4 100644 --- a/src/server.h +++ b/src/server.h @@ -3082,6 +3082,7 @@ void unblockClientWaitingReplicas(client *c); int replicationCountAcksByOffset(long long offset); int replicationCountAOFAcksByOffset(long long offset); void replicationSendNewlineToPrimary(void); +void replicationEmptyDbCallback(hashtable *d); long long replicationGetReplicaOffset(void); char *replicationGetReplicaName(client *c); long long getPsyncInitialOffset(void); From 011ca4d2b71a0002b630bfbe6584dd6d6f39337e Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Thu, 11 Sep 2025 18:50:43 +0000 Subject: [PATCH 03/11] initialize contexts after clearing, fixes failed test Signed-off-by: Venkat Pamulapati --- src/rdb.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 2bba0da262..84015a90e9 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3085,10 +3085,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int type, rdbver; uint64_t db_size = 0, expires_size = 0; int should_expand_db = 0; - 
if (rdb_loading_ctx->dbarray[0] == NULL) { - rdb_loading_ctx->dbarray[0] = createDatabase(0); - } - serverDb *db = rdb_loading_ctx->dbarray[0]; + char buf[1024]; int error; long long empty_keys_skipped = 0; @@ -3118,8 +3115,16 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB compatability check passed. Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + + // Reinitialize rdbloadingcontext + rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent(); } + if (rdb_loading_ctx->dbarray[0] == NULL) { + rdb_loading_ctx->dbarray[0] = createDatabase(0); + } + serverDb *db = rdb_loading_ctx->dbarray[0]; + /* Key-specific attributes, set by opcodes before the key type. */ long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_clock = LRU_CLOCK(); From d81b62cd342bd4527678c8d44e37e09eaa5f59ac Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Thu, 11 Sep 2025 19:06:37 +0000 Subject: [PATCH 04/11] Uncomment Signed-off-by: Venkat Pamulapati --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 0aebb09f7c..43b058e621 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2282,7 +2282,7 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, replicationAttachToNewPrimary(); /* Even though we are on-empty-db and the database is empty, we still call emptyData. 
*/ - // serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); dbarray = server.db; From 58b2ae2fd2d5e4e5d07861aa7e243be7f90f3f0e Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Thu, 11 Sep 2025 21:40:45 +0000 Subject: [PATCH 05/11] Adding support to other rdbLoad calls Signed-off-by: Venkat Pamulapati --- src/debug.c | 4 +++- src/module.c | 4 +--- src/replication.c | 26 +++++++++++++++----------- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/debug.c b/src/debug.c index 56dbdfbb20..8236b6ea29 100644 --- a/src/debug.c +++ b/src/debug.c @@ -585,7 +585,9 @@ void debugCommand(client *c) { /* The default behavior is to remove the current dataset from * memory before loading the RDB file, however when MERGE is * used together with NOFLUSH, we are able to merge two datasets. */ - if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL); + if (flush) { + flags = flags | RDBFLAGS_EMPTY_DATA; + } protectClient(c); int ret = rdbLoad(server.rdb_filename, NULL, flags); diff --git a/src/module.c b/src/module.c index 18a477cd3e..27ef18f3db 100644 --- a/src/module.c +++ b/src/module.c @@ -13286,8 +13286,6 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { * will prevent COW memory issue. */ if (server.child_type == CHILD_TYPE_RDB) killRDBChild(); - emptyData(-1, EMPTYDB_NO_FLAGS, NULL); - /* rdbLoad() can go back to the networking and process network events. If * VM_RdbLoad() is called inside a command callback, we don't want to * process the current client. 
Otherwise, we may free the client or try to @@ -13295,7 +13293,7 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) { if (server.current_client) protectClient(server.current_client); serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE); - int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE); + int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA); if (server.current_client) unprotectClient(server.current_client); diff --git a/src/replication.c b/src/replication.c index 43b058e621..109ddec2b0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2280,13 +2280,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, /* We will soon start loading the RDB from socket, the replication history is changed, * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); - - /* Even though we are on-empty-db and the database is empty, we still call emptyData. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - - dbarray = server.db; - functions_lib_ctx = functionsLibCtxGetCurrent(); } rioInitWithConn(&rdb, conn, server.repl_transfer_size); @@ -2300,12 +2293,19 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; int loadingFailed = 0; + int dataFlushed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; - if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) { + + // If we aren't using the swapdb method, then we want to empty the data before loading the rdb + int empty_data_flag = server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB ? 
RDBFLAGS_EMPTY_DATA : RDBFLAGS_NONE; + int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION | empty_data_flag, rsi, &loadingCtx); + if (retval != C_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); loadingFailed = 1; + // If we received RDB_INCOMPATIBLE, the old data was preserved + dataFlushed = retval != RDB_INCOMPATIBLE; } else if (*usemark) { /* Verify the end mark is correct. */ if (!rioRead(&rdb, buf, RDB_EOF_MARK_SIZE) || memcmp(buf, eofmark, RDB_EOF_MARK_SIZE) != 0) { @@ -2327,9 +2327,13 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); } else { - /* Remove the half-loaded data in case we started with an empty replica. */ - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); - emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + if (!dataFlushed) { + /* Remove the half-loaded data in case we started with an empty replica. 
*/ + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); + emptyData(-1, empty_db_flags, replicationEmptyDbCallback); + } else { + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: No data was loaded, skipping discard step"); + } } /* Note that there's no point in restarting the AOF on SYNC From 15047493975c7716da17a50c730b92e8ae3b7392 Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Thu, 11 Sep 2025 21:40:45 +0000 Subject: [PATCH 06/11] Adding support to other rdbLoad calls Signed-off-by: Venkat Pamulapati --- src/replication.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/replication.c b/src/replication.c index 109ddec2b0..6660db011c 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2280,6 +2280,8 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, /* We will soon start loading the RDB from socket, the replication history is changed, * we must discard the cached primary structure and force resync of sub-replicas. */ replicationAttachToNewPrimary(); + dbarray = server.db; + functions_lib_ctx = functionsLibCtxGetCurrent(); } rioInitWithConn(&rdb, conn, server.repl_transfer_size); From 5d7a369e87411fdff7472187bd897ad26c89b514 Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Fri, 12 Sep 2025 10:50:50 +0000 Subject: [PATCH 07/11] Clean up + fix return code Signed-off-by: Venkat Pamulapati --- src/debug.c | 2 +- src/rdb.c | 15 ++++-- src/replication.c | 17 +++---- .../integration/cross-version-replication.tcl | 49 +++++++++++++++++++ 4 files changed, 67 insertions(+), 16 deletions(-) diff --git a/src/debug.c b/src/debug.c index 8236b6ea29..4b508ee372 100644 --- a/src/debug.c +++ b/src/debug.c @@ -586,7 +586,7 @@ void debugCommand(client *c) { * memory before loading the RDB file, however when MERGE is * used together with NOFLUSH, we are able to merge two datasets. 
*/ if (flush) { - flags = flags | RDBFLAGS_EMPTY_DATA; + flags |= RDBFLAGS_EMPTY_DATA; } protectClient(c); diff --git a/src/rdb.c b/src/rdb.c index 84015a90e9..3f1acfb493 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3074,7 +3074,6 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, return retval; } - /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned. * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, @@ -3085,7 +3084,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin int type, rdbver; uint64_t db_size = 0, expires_size = 0; int should_expand_db = 0; - char buf[1024]; int error; long long empty_keys_skipped = 0; @@ -3101,7 +3099,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin is_valkey_magic = true; } else { serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); - // Return this error so we know to terminate the sync gracefully without emptyData() + /* Signal to terminate gracefully without clearing existing data */ return RDB_INCOMPATIBLE; } rdbver = atoi(buf + 6); @@ -3116,7 +3114,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB compatability check passed. Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - // Reinitialize rdbloadingcontext + /* functionsLibCtx is cleared when we call emptyData, reinitialize here. */ rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent(); } @@ -3566,7 +3564,14 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { rdb_fd = open(filename, O_RDONLY); if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1); } - return (retval == C_OK) ? 
RDB_OK : RDB_FAILED; + + if (retval == C_OK) { + return RDB_OK; + } else if (retval == RDB_INCOMPATIBLE) { + return RDB_INCOMPATIBLE; + } else { + return RDB_FAILED; + } } /* A background saving child (BGSAVE) terminated its work. Handle this. diff --git a/src/replication.c b/src/replication.c index 6660db011c..bf01c1d42a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2295,10 +2295,8 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading); if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM; int loadingFailed = 0; - int dataFlushed = 0; rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx}; - - // If we aren't using the swapdb method, then we want to empty the data before loading the rdb + /* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */ int empty_data_flag = server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB ? RDBFLAGS_EMPTY_DATA : RDBFLAGS_NONE; int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION | empty_data_flag, rsi, &loadingCtx); if (retval != C_OK) { @@ -2306,8 +2304,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); loadingFailed = 1; - // If we received RDB_INCOMPATIBLE, the old data was preserved - dataFlushed = retval != RDB_INCOMPATIBLE; } else if (*usemark) { /* Verify the end mark is correct. 
*/ if (!rioRead(&rdb, buf, RDB_EOF_MARK_SIZE) || memcmp(buf, eofmark, RDB_EOF_MARK_SIZE) != 0) { @@ -2329,13 +2325,14 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx); serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background"); } else { - if (!dataFlushed) { - /* Remove the half-loaded data in case we started with an empty replica. */ + /* If we received RDB_INCOMPATIBLE, the old data was preserved */ + if (retval == RDB_INCOMPATIBLE) { + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: No data was loaded, skipping discard step"); + } else { + /* Remove the half-loaded data in case the load failed for other reasons. */ serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); - } else { - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: No data was loaded, skipping discard step"); - } + } } /* Note that there's no point in restarting the AOF on SYNC diff --git a/tests/integration/cross-version-replication.tcl b/tests/integration/cross-version-replication.tcl index 47329ee5aa..1e3abf6988 100644 --- a/tests/integration/cross-version-replication.tcl +++ b/tests/integration/cross-version-replication.tcl @@ -32,3 +32,52 @@ start_server {tags {"repl needs:other-server external:skip compatible-redis"} st } } } + +proc log_file_matches {log pattern} { + set fp [open $log r] + set content [read $fp] + close $fp + string match $pattern $content +} + +start_server {tags {"repl external:skip"}} { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + set replica_log [srv 0 stdout] + + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + test "Replica data gets flushed during successful full sync" { + # Force full sync + $master config set repl-backlog-size 1 + + # Populate 
replica with initial data + $replica set existing_key "original_value" + assert_equal [$replica dbsize] 1 + + # Populate master with different data + $master set master_key "master_value" + + # Start replication - should succeed and flush replica data + $replica replicaof $master_host $master_port + + # Wait for successful sync + wait_for_condition 50 100 { + [log_file_matches $replica_log "*PRIMARY <-> REPLICA sync: RDB compatability check passed. Flushing old data*"] + } else { + puts "Replica log contents:" + puts [exec cat $replica_log] + fail "Expected successful RDB sync not detected" + } + + # Verify replica data was flushed and replaced with master data + assert_equal [$replica get existing_key] "" + assert_equal [$replica get master_key] "master_value" + assert_equal [$replica dbsize] 1 + } + } +} \ No newline at end of file From e0f95e190f61eea8d55f9c88f02a9fcdd0a39fc2 Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Fri, 12 Sep 2025 10:55:05 +0000 Subject: [PATCH 08/11] Remove redundant test Signed-off-by: Venkat Pamulapati --- .../integration/cross-version-replication.tcl | 49 ------------------- 1 file changed, 49 deletions(-) diff --git a/tests/integration/cross-version-replication.tcl b/tests/integration/cross-version-replication.tcl index 1e3abf6988..47329ee5aa 100644 --- a/tests/integration/cross-version-replication.tcl +++ b/tests/integration/cross-version-replication.tcl @@ -32,52 +32,3 @@ start_server {tags {"repl needs:other-server external:skip compatible-redis"} st } } } - -proc log_file_matches {log pattern} { - set fp [open $log r] - set content [read $fp] - close $fp - string match $pattern $content -} - -start_server {tags {"repl external:skip"}} { - set replica [srv 0 client] - set replica_host [srv 0 host] - set replica_port [srv 0 port] - set replica_log [srv 0 stdout] - - start_server {} { - set master [srv 0 client] - set master_host [srv 0 host] - set master_port [srv 0 port] - - test "Replica data gets flushed during 
successful full sync" { - # Force full sync - $master config set repl-backlog-size 1 - - # Populate replica with initial data - $replica set existing_key "original_value" - assert_equal [$replica dbsize] 1 - - # Populate master with different data - $master set master_key "master_value" - - # Start replication - should succeed and flush replica data - $replica replicaof $master_host $master_port - - # Wait for successful sync - wait_for_condition 50 100 { - [log_file_matches $replica_log "*PRIMARY <-> REPLICA sync: RDB compatability check passed. Flushing old data*"] - } else { - puts "Replica log contents:" - puts [exec cat $replica_log] - fail "Expected successful RDB sync not detected" - } - - # Verify replica data was flushed and replaced with master data - assert_equal [$replica get existing_key] "" - assert_equal [$replica get master_key] "master_value" - assert_equal [$replica dbsize] 1 - } - } -} \ No newline at end of file From 744e03b2d19ccbada58a69d35422c218409cd770 Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Fri, 12 Sep 2025 22:37:55 +0000 Subject: [PATCH 09/11] Refactor return types on rdbLoadRioWithLoadingContext Signed-off-by: Venkat Pamulapati --- src/aof.c | 2 +- src/rdb.c | 33 +++++++++++++++------------------ src/replication.c | 2 +- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/aof.c b/src/aof.c index b88c60ffe3..d5b48cfe45 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1452,7 +1452,7 @@ int loadSingleAppendOnlyFile(char *filename) { if (fseek(fp, 0, SEEK_SET) == -1) goto readerr; rioInitWithFile(&rdb, fp); - if (rdbLoadRio(&rdb, RDBFLAGS_AOF_PREAMBLE, NULL) != C_OK) { + if (rdbLoadRio(&rdb, RDBFLAGS_AOF_PREAMBLE, NULL) != RDB_OK) { if (old_style) serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted", filename); diff --git a/src/rdb.c b/src/rdb.c index 3f1acfb493..34d6826087 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3074,8 +3074,11 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio 
*rdb, int rdbflags, rdbSaveInfo *rsi, return retval; } -/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, - * otherwise C_ERR is returned. +/* Load an RDB file from the rio stream 'rdb'. We return one of the following: + * - RDB_OK On success + * - RDB_INCOMPATIBLE If the RDB is has an invalid signature or version + * - RDB_FAILED + * if the RDB is has an invalid signature or version we return * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, * currently it only allow to set db object and functionLibCtx to which the data * will be loaded (in the future it might contains more such objects). */ @@ -3108,10 +3111,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin return RDB_INCOMPATIBLE; } - // Only empty data if empty data flag is set + /* Only empty data if RDBFLAGS_EMPTY_DATA is set */ if (rdbflags & RDBFLAGS_EMPTY_DATA) { int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB compatability check passed. Flushing old data"); + serverLog(LL_NOTICE, "RDB compatability check passed. Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); /* functionsLibCtx is cleared when we call emptyData, reinitialize here. */ @@ -3138,7 +3141,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) { serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d", type, rdbver); - return C_ERR; + return RDB_FAILED; } /* Handle special types. */ @@ -3419,7 +3422,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin } else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) { sdsfree(key); serverLog(LL_WARNING, "Unknown type or opcode when loading DB. 
Unrecoverable error, aborting now."); - return C_ERR; + return RDB_FAILED; } else { sdsfree(key); goto eoferr; @@ -3503,7 +3506,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin "got (%llx). Aborting now.", (unsigned long long)expected, (unsigned long long)cksum); rdbReportCorruptRDB("RDB CRC error"); - return C_ERR; + return RDB_FAILED; } } } @@ -3515,7 +3518,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.", server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired); } - return C_OK; + return RDB_OK; /* Unexpected end of file is handled here calling rdbReportReadError(): * this will in turn either abort the server in most cases, or if we are loading @@ -3524,7 +3527,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin eoferr: serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbReportReadError("Unexpected EOF reading RDB file"); - return C_ERR; + return RDB_FAILED; } /* Like rdbLoadRio() but takes a filename instead of a rio stream. The @@ -3557,21 +3560,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) { retval = rdbLoadRio(&rdb, rdbflags, rsi); fclose(fp); - stopLoading(retval == C_OK); + stopLoading(retval == RDB_OK); /* Reclaim the cache backed by rdb */ - if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { + if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) { /* TODO: maybe we could combine the fopen and open into one in the future */ rdb_fd = open(filename, O_RDONLY); if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1); } - if (retval == C_OK) { - return RDB_OK; - } else if (retval == RDB_INCOMPATIBLE) { - return RDB_INCOMPATIBLE; - } else { - return RDB_FAILED; - } + return retval; } /* A background saving child (BGSAVE) terminated its work. Handle this. 
diff --git a/src/replication.c b/src/replication.c index bf01c1d42a..c49dfa6703 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2299,7 +2299,7 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, /* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */ int empty_data_flag = server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB ? RDBFLAGS_EMPTY_DATA : RDBFLAGS_NONE; int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION | empty_data_flag, rsi, &loadingCtx); - if (retval != C_OK) { + if (retval != RDB_OK) { /* RDB loading failed. */ serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB " "from socket, check server logs."); From cd70684b0ba04eebb2ad2b20d891808edb98f583 Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Mon, 15 Sep 2025 21:14:48 +0000 Subject: [PATCH 10/11] Updated comments/logging messages Signed-off-by: Venkat Pamulapati --- src/rdb.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 34d6826087..edd750a32a 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3102,7 +3102,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin is_valkey_magic = true; } else { serverLog(LL_WARNING, "Wrong signature trying to load DB from file"); - /* Signal to terminate gracefully without clearing existing data */ + /* Signal to terminate the rdbLoad without clearing existing data */ return RDB_INCOMPATIBLE; } rdbver = atoi(buf + 6); @@ -3114,7 +3114,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin /* Only empty data if RDBFLAGS_EMPTY_DATA is set */ if (rdbflags & RDBFLAGS_EMPTY_DATA) { int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; - serverLog(LL_NOTICE, "RDB compatability check passed. Flushing old data"); + serverLog(LL_NOTICE, "RDB signature and version check passed. 
Flushing old data"); emptyData(-1, empty_db_flags, replicationEmptyDbCallback); /* functionsLibCtx is cleared when we call emptyData, reinitialize here. */ From 2d5a96ca85f889c1e5974c2a1bdc27a6305d072d Mon Sep 17 00:00:00 2001 From: Venkat Pamulapati Date: Mon, 22 Sep 2025 23:58:03 +0000 Subject: [PATCH 11/11] Updated comments and removed unnecessary declaration in server.h Signed-off-by: Venkat Pamulapati --- src/rdb.c | 6 +++--- src/rdb.h | 1 - src/replication.c | 10 +++++----- src/server.h | 3 +-- 4 files changed, 9 insertions(+), 11 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index edd750a32a..0950ea7918 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -75,6 +75,7 @@ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); +void replicationEmptyDbCallback(hashtable *ht); /* Returns true if the RDB version is valid and accepted, false otherwise. This * function takes configuration into account. The parameter `is_valkey_magic` @@ -3076,9 +3077,8 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi, /* Load an RDB file from the rio stream 'rdb'. We return one of the following: * - RDB_OK On success - * - RDB_INCOMPATIBLE If the RDB is has an invalid signature or version - * - RDB_FAILED - * if the RDB is has an invalid signature or version we return + * - RDB_INCOMPATIBLE If the RDB has an invalid signature or version + * - RDB_FAILED in all other failure cases * The rdb_loading_ctx argument holds objects to which the rdb will be loaded to, * currently it only allow to set db object and functionLibCtx to which the data * will be loaded (in the future it might contains more such objects).
*/ diff --git a/src/rdb.h b/src/rdb.h index fe947db7f8..4c9a32a462 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -224,6 +224,5 @@ int rdbSaveRio(int req, rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); ssize_t rdbSaveFunctions(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); int saveSnapshotToConnectionSockets(rdbSnapshotOptions options); -void replicationEmptyDbCallback(hashtable *d); #endif diff --git a/src/replication.c b/src/replication.c index c49dfa6703..a02d6be57d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1986,8 +1986,8 @@ void replicationSendNewlineToPrimary(void) { /* Callback used by emptyData() while flushing away old data to load * the new dataset received by the primary and by discardTempDb() * after loading succeeded or failed. */ -void replicationEmptyDbCallback(hashtable *d) { - UNUSED(d); +void replicationEmptyDbCallback(hashtable *ht) { + UNUSED(ht); if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary(); } @@ -2327,7 +2327,7 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark, } else { /* If we received RDB_INCOMPATIBLE, the old data was preserved */ if (retval == RDB_INCOMPATIBLE) { - serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: No data was loaded, skipping discard step"); + serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved"); } else { /* Remove the half-loaded data in case the load failed for other reasons. */ serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data"); @@ -2413,7 +2413,7 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { * we must discard the cached primary structure and force resync of sub-replicas. 
*/ replicationAttachToNewPrimary(); - /* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatbility + /* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility * and before loading the data from the RDB */ serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory"); int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA); @@ -2428,7 +2428,7 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) { bg_unlink(server.rdb_filename); } - /* If RDB failed compatability check, we did not load the new data set or flush our old data. */ + /* If RDB failed compatibility check, we did not load the new data set or flush our old data. */ if (retval == RDB_INCOMPATIBLE) { serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded."); } else { diff --git a/src/server.h b/src/server.h index 860630dbf4..3a8054ad36 100644 --- a/src/server.h +++ b/src/server.h @@ -349,7 +349,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define RDB_OK 0 #define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */ #define RDB_FAILED 2 /* Failed to load the RDB file. */ -#define RDB_INCOMPATIBLE 3/* RDB is incompatible with this version */ +#define RDB_INCOMPATIBLE 3 /* RDB version or signature is not compatible */ /* Command doc flags */ #define CMD_DOC_NONE 0 @@ -3082,7 +3082,6 @@ void unblockClientWaitingReplicas(client *c); int replicationCountAcksByOffset(long long offset); int replicationCountAOFAcksByOffset(long long offset); void replicationSendNewlineToPrimary(void); -void replicationEmptyDbCallback(hashtable *d); long long replicationGetReplicaOffset(void); char *replicationGetReplicaName(client *c); long long getPsyncInitialOffset(void);