Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -1452,7 +1452,7 @@ int loadSingleAppendOnlyFile(char *filename) {

if (fseek(fp, 0, SEEK_SET) == -1) goto readerr;
rioInitWithFile(&rdb, fp);
if (rdbLoadRio(&rdb, RDBFLAGS_AOF_PREAMBLE, NULL) != C_OK) {
if (rdbLoadRio(&rdb, RDBFLAGS_AOF_PREAMBLE, NULL) != RDB_OK) {
if (old_style)
serverLog(LL_WARNING, "Error reading the RDB preamble of the AOF file %s, AOF loading aborted",
filename);
Expand Down
4 changes: 3 additions & 1 deletion src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,9 @@ void debugCommand(client *c) {
/* The default behavior is to remove the current dataset from
* memory before loading the RDB file, however when MERGE is
* used together with NOFLUSH, we are able to merge two datasets. */
if (flush) emptyData(-1, EMPTYDB_NO_FLAGS, NULL);
if (flush) {
flags |= RDBFLAGS_EMPTY_DATA;
}

protectClient(c);
int ret = rdbLoad(server.rdb_filename, NULL, flags);
Expand Down
4 changes: 1 addition & 3 deletions src/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -13286,16 +13286,14 @@ int VM_RdbLoad(ValkeyModuleCtx *ctx, ValkeyModuleRdbStream *stream, int flags) {
* will prevent COW memory issue. */
if (server.child_type == CHILD_TYPE_RDB) killRDBChild();

emptyData(-1, EMPTYDB_NO_FLAGS, NULL);

/* rdbLoad() can go back to the networking and process network events. If
* VM_RdbLoad() is called inside a command callback, we don't want to
* process the current client. Otherwise, we may free the client or try to
* process next message while we are already in the command callback. */
if (server.current_client) protectClient(server.current_client);

serverAssert(stream->type == VALKEYMODULE_RDB_STREAM_FILE);
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_NONE);
int ret = rdbLoad(stream->data.filename, NULL, RDBFLAGS_EMPTY_DATA);

if (server.current_client) unprotectClient(server.current_client);

Expand Down
48 changes: 32 additions & 16 deletions src/rdb.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ extern int rdbCheckMode;
void rdbCheckError(const char *fmt, ...);
void rdbCheckSetError(const char *fmt, ...);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
void replicationEmptyDbCallback(hashtable *ht);

/* Returns true if the RDB version is valid and accepted, false otherwise. This
* function takes configuration into account. The parameter `is_valkey_magic`
Expand Down Expand Up @@ -3074,8 +3075,10 @@ int rdbLoadRioWithLoadingCtxScopedRdb(rio *rdb, int rdbflags, rdbSaveInfo *rsi,
return retval;
}

/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned.
/* Load an RDB file from the rio stream 'rdb'. We return one of the following:
* - RDB_OK On success
* - RDB_INCOMPATIBLE If the RDB has an invalid signature or version
* - RDB_FAILED in all other failure cases
* The rdb_loading_ctx argument holds the objects into which the rdb will be loaded;
* currently it only allows setting the db object and functionLibCtx into which the data
* will be loaded (in the future it might contain more such objects). */
Expand All @@ -3084,10 +3087,6 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
int type, rdbver;
uint64_t db_size = 0, expires_size = 0;
int should_expand_db = 0;
if (rdb_loading_ctx->dbarray[0] == NULL) {
rdb_loading_ctx->dbarray[0] = createDatabase(0);
}
serverDb *db = rdb_loading_ctx->dbarray[0];
char buf[1024];
int error;
long long empty_keys_skipped = 0;
Expand All @@ -3103,14 +3102,30 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
is_valkey_magic = true;
} else {
serverLog(LL_WARNING, "Wrong signature trying to load DB from file");
return C_ERR;
/* Signal to terminate the rdbLoad without clearing existing data */
return RDB_INCOMPATIBLE;
}
rdbver = atoi(buf + 6);
if (!rdbIsVersionAccepted(rdbver, is_valkey_magic, is_redis_magic)) {
serverLog(LL_WARNING, "Can't handle RDB format version %d", rdbver);
return C_ERR;
return RDB_INCOMPATIBLE;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can look at rdb.tcl / replication.tcl to add integration tests

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed above, but testing this is challenging with the TCL framework. I have a reproducible demo of the fix, and existing tests already cover these code paths. If anyone has ideas for covering an incompatible RDB load during a full sync, I'd be open to suggestions.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will look into adding a change to debug.c that allows us to hit this code path.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also hit this code path when loading an RDB from a file? If so, we can test that instead of a full sync. We can copy a file into the right place and then call DEBUG RELOAD NOSAVE to load it. If the load fails because of a bad signature or a future version, we shouldn't flush the databases. Is this right?

There are some RDB files here, for example one with the hypothetical future RDB version 987: https://github.com/valkey-io/valkey/tree/unstable/tests/assets

}

/* Only empty data if RDBFLAGS_EMPTY_DATA is set */
if (rdbflags & RDBFLAGS_EMPTY_DATA) {
int empty_db_flags = server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
serverLog(LL_NOTICE, "RDB signature and version check passed. Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

/* functionsLibCtx is cleared when we call emptyData, reinitialize here. */
rdb_loading_ctx->functions_lib_ctx = functionsLibCtxGetCurrent();
}

if (rdb_loading_ctx->dbarray[0] == NULL) {
rdb_loading_ctx->dbarray[0] = createDatabase(0);
}
serverDb *db = rdb_loading_ctx->dbarray[0];

/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = LRU_CLOCK();
Expand All @@ -3126,7 +3141,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
if (is_redis_magic && type >= RDB_FOREIGN_TYPE_MIN && type <= RDB_FOREIGN_TYPE_MAX) {
serverLog(LL_WARNING, "Can't handle foreign type or opcode %d in RDB with version %d",
type, rdbver);
return C_ERR;
return RDB_FAILED;
}

/* Handle special types. */
Expand Down Expand Up @@ -3407,7 +3422,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
} else if (error == RDB_LOAD_ERR_UNKNOWN_TYPE) {
sdsfree(key);
serverLog(LL_WARNING, "Unknown type or opcode when loading DB. Unrecoverable error, aborting now.");
return C_ERR;
return RDB_FAILED;
} else {
sdsfree(key);
goto eoferr;
Expand Down Expand Up @@ -3491,7 +3506,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
"got (%llx). Aborting now.",
(unsigned long long)expected, (unsigned long long)cksum);
rdbReportCorruptRDB("RDB CRC error");
return C_ERR;
return RDB_FAILED;
}
}
}
Expand All @@ -3503,7 +3518,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
serverLog(LL_NOTICE, "Done loading RDB, keys loaded: %lld, keys expired: %lld.",
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
}
return C_OK;
return RDB_OK;

/* Unexpected end of file is handled here calling rdbReportReadError():
* this will in turn either abort the server in most cases, or if we are loading
Expand All @@ -3512,7 +3527,7 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
eoferr:
serverLog(LL_WARNING, "Short read or OOM loading DB. Unrecoverable error, aborting now.");
rdbReportReadError("Unexpected EOF reading RDB file");
return C_ERR;
return RDB_FAILED;
}

/* Like rdbLoadRio() but takes a filename instead of a rio stream. The
Expand Down Expand Up @@ -3545,14 +3560,15 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi, int rdbflags) {
retval = rdbLoadRio(&rdb, rdbflags, rsi);

fclose(fp);
stopLoading(retval == C_OK);
stopLoading(retval == RDB_OK);
/* Reclaim the cache backed by rdb */
if (retval == C_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
if (retval == RDB_OK && !(rdbflags & RDBFLAGS_KEEP_CACHE)) {
/* TODO: maybe we could combine the fopen and open into one in the future */
rdb_fd = open(filename, O_RDONLY);
if (rdb_fd >= 0) bioCreateCloseJob(rdb_fd, 0, 1);
}
return (retval == C_OK) ? RDB_OK : RDB_FAILED;

return retval;
}

/* A background saving child (BGSAVE) terminated its work. Handle this.
Expand Down
1 change: 1 addition & 0 deletions src/rdb.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ typedef struct rdbSnapshotOptions {
#define RDBFLAGS_ALLOW_DUP (1 << 2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1 << 3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1 << 4) /* Don't reclaim cache after rdb file is generated */
#define RDBFLAGS_EMPTY_DATA (1 << 5) /* Flush the database after validating magic and rdb version*/

/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */
Expand Down
49 changes: 28 additions & 21 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -1986,8 +1986,8 @@ void replicationSendNewlineToPrimary(void) {
/* Callback used by emptyData() while flushing away old data to load
* the new dataset received by the primary and by discardTempDb()
* after loading succeeded or failed. */
void replicationEmptyDbCallback(hashtable *d) {
UNUSED(d);
void replicationEmptyDbCallback(hashtable *ht) {
UNUSED(ht);
if (server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToPrimary();
}

Expand Down Expand Up @@ -2280,11 +2280,6 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
/* We will soon start loading the RDB from socket, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
}
Expand All @@ -2301,7 +2296,10 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
if (replicationSupportSkipRDBChecksum(conn, 1, *usemark)) rdb.flags |= RIO_FLAG_SKIP_RDB_CHECKSUM;
int loadingFailed = 0;
rdbLoadingCtx loadingCtx = {.dbarray = dbarray, .functions_lib_ctx = functions_lib_ctx};
if (rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION, rsi, &loadingCtx) != C_OK) {
/* If we aren't using the swapdb method, then we want to empty the data before loading the rdb */
int empty_data_flag = server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB ? RDBFLAGS_EMPTY_DATA : RDBFLAGS_NONE;
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION | empty_data_flag, rsi, &loadingCtx);
Comment on lines +2300 to +2301
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this logic look simpler if we use a flags variable and conditionally add the EMPTY_DATA flag to it? I think so...

Suggested change
int empty_data_flag = server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB ? RDBFLAGS_EMPTY_DATA : RDBFLAGS_NONE;
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, RDBFLAGS_REPLICATION | empty_data_flag, rsi, &loadingCtx);
int flags = RDBFLAGS_REPLICATION;
if (server.repl_diskless_load != REPL_DISKLESS_LOAD_SWAPDB) flags |= RDBFLAGS_EMPTY_DATA;
int retval = rdbLoadRioWithLoadingCtxScopedRdb(&rdb, flags, rsi, &loadingCtx);

if (retval != RDB_OK) {
/* RDB loading failed. */
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization DB "
"from socket, check server logs.");
Expand All @@ -2327,9 +2325,14 @@ int replicaLoadPrimaryRDBFromSocket(connection *conn, char *buf, char *eofmark,
disklessLoadDiscardFunctionsLibCtx(temp_functions_lib_ctx);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
/* If we received RDB_INCOMPATIBLE, the old data was preserved */
if (retval == RDB_INCOMPATIBLE) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: RDB version or signature incompatible, old data preserved");
} else {
/* Remove the half-loaded data in case the load failed for other reasons. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bracket is mis-indented.

}

/* Note that there's no point in restarting the AOF on SYNC
Expand Down Expand Up @@ -2410,14 +2413,12 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Empty the databases only after the RDB file is ok, that is, before the RDB file
* is actually loaded, in case we encounter an error and drop the replication stream
* and leave an empty database. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

/* We pass RDBFLAGS_EMPTY_DATA to call emptyData() after validating rdb compatibility
* and before loading the data from the RDB */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
int retval = rdbLoad(server.rdb_filename, rsi, RDBFLAGS_REPLICATION | RDBFLAGS_EMPTY_DATA);

if (retval != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs.");
if (server.rdb_del_sync_files && allPersistenceDisabled()) {
Expand All @@ -2427,9 +2428,15 @@ int replicaLoadPrimaryRDBFromDisk(rdbSaveInfo *rsi) {
bg_unlink(server.rdb_filename);
}

/* If disk-based RDB loading fails, remove the half-loaded dataset. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
/* If RDB failed compatibility check, we did not load the new data set or flush our old data. */
if (retval == RDB_INCOMPATIBLE) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Skipping flush, no new data was loaded.");
} else {
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}


/* Note that there's no point in restarting the AOF on sync failure,
* it'll be restarted when sync succeeds or replica promoted. */
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define RDB_OK 0
#define RDB_NOT_EXIST 1 /* RDB file doesn't exist. */
#define RDB_FAILED 2 /* Failed to load the RDB file. */
#define RDB_INCOMPATIBLE 3 /* RDB version or signature is not compatible */

/* Command doc flags */
#define CMD_DOC_NONE 0
Expand Down