Doubled buckets. Part 2 (stray tcp) #646
This patch fixes the flakiness of the `log_verbosity_2_2_test`. In vshard some logs have a composite structure, for example "Error during recovery: <ERR>", where ERR can change over time and have a different type. In rare situations this can lead to bugs when we use the `wait_log_exactly_once` function, because it ignores the volatile part of the log. Now we fix it by specifying the exact error name which we expect in the `wait_log_exactly_once` of `log_verbosity_2_2_test`. Needed for tarantool#214 NO_DOC=test
Before this patch we woke up the GC service instead of the recovery service in `bucket_recovery_pause`. It could lead to longer test execution times. Now we fix it by changing `garbage_collector_wakeup` to `recovery_wakeup`. Needed for tarantool#214 NO_DOC=test
Before this patch the `bucket_move` and `bucket_wait_transfer` helper functions were used only in `storage_1_1_1_test`. However, in future patches these helpers can also be applicable (e.g. in tarantoolgh-214). This patch moves `bucket_move` and `bucket_wait_transfer` into the `vtest` module so that we can use them in other tests. Needed for tarantool#214 NO_TEST=test NO_DOC=test
In future patches (e.g. tarantoolgh-214) we will need to compare vclocks with different conditions. To avoid code duplication we unify the comparison of vclocks by introducing a general `vclock_compare` function, which allows different comparisons of vclocks via a comparator. We move this function into the `util` vshard module. Needed for tarantool#214 NO_DOC=internal
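One possible shape of such a helper - a minimal sketch, assuming a vclock is a plain table mapping replica id to LSN (only the name `vclock_compare` and the comparator idea come from the commit message):

```lua
-- Returns true if `comparator` holds for every component of vclock1
-- against the matching component of vclock2; missing components are
-- treated as 0.
local function vclock_compare(vclock1, vclock2, comparator)
    for replica_id, lsn in pairs(vclock1) do
        if not comparator(lsn, vclock2[replica_id] or 0) then
            return false
        end
    end
    return true
end

-- Example: component-wise "<=".
assert(vclock_compare({[1] = 5, [2] = 3}, {[1] = 5, [2] = 4},
                      function(a, b) return a <= b end))
```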
Before this patch the recovery service called the functions which determine whether a bucket can be recovered one by one in `recovery_step_by_type`. Since in further patches the number of bucket statuses will grow and the logic of the recovery service will become more complex, this could degrade the readability of the codebase. In order to fix it we decided to: 1) use for-loop based checking of buckets' recoverability instead of one-by-one checks; 2) join the logic of saving recovered bucket ids and of changing the `_bucket` space into one separate function - `recover_bucket_to_state` (see the sketch below). Needed for tarantool#214 NO_TEST=refactoring NO_DOC=refactoring
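A minimal sketch of the described refactoring. The checker predicates below are illustrative assumptions, not the real vshard rules; only the table-of-checkers structure and `recover_bucket_to_state` come from the patch. The fragment assumes the locals of the recovery step (`bucket`, `remote_bucket`, `bucket_id`, `recovered_buckets`):

```lua
-- Maps a target state to a predicate deciding whether the bucket can be
-- recovered into that state. The bodies here are placeholders.
local bucket_recoverability_checkers = {
    ['active'] = function(bucket, remote_bucket)
        return remote_bucket == nil or remote_bucket.status == 'garbage'
    end,
    ['garbage'] = function(bucket, remote_bucket)
        return remote_bucket ~= nil and remote_bucket.status == 'active'
    end,
}

-- One place both records the recovered id and updates the _bucket space.
local function recover_bucket_to_state(recovered_buckets, bucket_id, state)
    box.space._bucket:update({bucket_id}, {{'=', 2, state}})
    local ids = recovered_buckets[state] or {}
    table.insert(ids, bucket_id)
    recovered_buckets[state] = ids
end

-- The for-loop based check replacing the one-by-one branches.
for state, checker in pairs(bucket_recoverability_checkers) do
    if checker(bucket, remote_bucket) then
        recover_bucket_to_state(recovered_buckets, bucket_id, state)
        break
    end
end
```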
In this patch we extract the block of code responsible for sending asynchronous requests and waiting for the corresponding responses from `replicaset_map_call` into a separate function - `replicas_map_call`. It was done because in the next patch - tarantoolgh-214 - we will need a function which performs map-reduce across the masters of the cluster, and `replicas_map_call` can be reused in it. Needed for tarantool#214 NO_TEST=refactoring NO_DOC=refactoring
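A rough sketch of the extracted helper, assuming net.box async calls. Only the name and intent come from the commit message; the body and the `opts` fields are assumptions, and the `except` handling is omitted for brevity:

```lua
local fiber = require('fiber')

local function replicas_map_call(replicas, func, args, opts)
    local futures, results = {}, {}
    local deadline = fiber.clock() + opts.timeout
    -- Fire all the requests without waiting for the responses.
    for id, replica in pairs(replicas) do
        futures[id] = replica.conn:call(func, args, {is_async = true})
    end
    -- Collect the responses within the remaining timeout.
    for id, future in pairs(futures) do
        local res, err = future:wait_result(deadline - fiber.clock())
        if res == nil then
            return nil, err, id
        end
        results[id] = res
    end
    return results
end
```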
Serpentian left a comment:
First part of the review, mostly whining about naming and comments; the only serious one from my point of view is bootstrap.
    -- for the connection explicitly.
    timeout, err, err_id = lreplicaset.wait_masters_connect(
-       replicasets, timeout)
+       replicasets, {timeout = timeout})
Yes, it sounds better, fixed
    -- Parallel call on all masters in the cluster. Fails if couldn't be done
    -- on at least one master.
    --
    local function cluster_map_master_call(replicasets, func, args, opts)
To hell with it, it's not critical; there are more important things worth spending time on. This one is already tested through higher-level functions anyway.
    -- are invalid.
    is_bucket_protected = true,
    -- Flag whether a newly elected master is synchronized with other
    -- replicas in replicaset (that is it has the same vclock). It is
Nit:

> that is it has the same vclock

I'd rather drop that part, it's misleading. We don't wait for the vclock of the instances anymore, just the vclock of the changes in `_bucket`.
    -- replicas in replicaset (that is it has the same vclock). It is
    -- needed to prevent the doubled buckets in the cluster during
    -- rebalancing and recovery process.
    is_bucket_in_sync = false,
Nit: maybe `are_buckets_in_sync` or `is_bucket_space_in_sync`? The current one sounds a little bit awkward.
I like is_bucket_space_in_sync, but it might look bulky just a bit.
Vlad prefers the current name of this variable - #646 (comment).
The motivation is that we already have `is_bucket_protected`. And some other members refer to the bucket space without mentioning "space" explicitly, like `bucket_on_replace` and `bucket_on_truncate`.
    is_bucket_in_sync = false,
    -- Number of transactions over the _bucket space.
    bucket_txns_in_progress_count = 0,
    -- Vclock of the last transaction over the _bucket space.
Nit: should be "vclock after the last transaction". It's not guaranteed that a txn over `_bucket` will have that vclock.
        lfiber.sleep(wait_interval)
    end
    if M.bucket_latest_vclock == nil then
        M.bucket_latest_vclock = box.info.vclock
Let's better initialize it during the first vshard.storage.cfg and add an assertion here that it's not nil. The current approach is way too error-prone.
    local _, err = bucket_modify_wrapper(function()
        box.space._bucket:insert({i, BRECEIVING})
    end, false)
It's not a good idea to allow bootstrapping the cluster without any synchronization. Consider the case:

1. User bootstraps the cluster; buckets are created on only one replicaset.
2. Master is changed in that replicaset. The old master still considers itself a master (due to misconfiguration) and considers itself synchronized, since it did that between steps 1 and 2.
3. User calls bootstrap one more time; it passes, since the new leader doesn't have the buckets created on the old one.
4. The old master considers itself in sync and sends the buckets to another replicaset.
5. We have doubled buckets.

Instead, I propose to make a map call over the masters in cluster_bootstrap and wait there for the masters to be in sync. After that we make a map call one more time and create the buckets (with protection for is_sync). The bootstrap can still fail if the master changes between these two map calls, but it seems we're ok with that for now, since a partial bootstrap is possible anyway if the master changes and the router doesn't know about it yet.
We should drop the must_be_synced from the wrapper.
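A sketch of the proposed two-phase bootstrap. `cluster_map_master_call` is from this patchset, while the `_call` command names here are hypothetical placeholders:

```lua
local function cluster_bootstrap(replicasets, opts)
    -- Phase 1: wait until every current master reports it is in sync.
    local _, err = lreplicaset.cluster_map_master_call(
        replicasets, 'vshard.storage._call',
        {'storage_wait_bucket_sync'}, opts)
    if err then
        return nil, err
    end
    -- Phase 2: create the buckets. Each master re-checks is_sync itself,
    -- so a master switch between the two phases fails the bootstrap
    -- instead of silently producing doubled buckets.
    return lreplicaset.cluster_map_master_call(
        replicasets, 'vshard.storage._call',
        {'bucket_bootstrap_protected'}, opts)
end
```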
    -- committed transactions to disk, and sends the freshest vclock to the
    -- storage that called this function.
    --
    local function storage_wait_vclock_persisted(wait_interval)
Nit: let's mention `_bucket` in the name of the function.
    -- described issue.
-   ok, err = pcall(_bucket.insert, _bucket, {bucket_id, recvg, from})
+   ok, err = bucket_modify_wrapper(function()
+       box.space._bucket:insert({bucket_id, recvg, from})
See, you had pcalled it here, and now you don't. This will lead to lsched.move_end not being called in case of an error, which is a bug.
    if not ok then
        return nil, lerror.make(err)
    end
    elseif b.status ~= recvg then
Serpentian left a comment:
The remaining comments for the "storage: introduce on_master_enable service" commit.
    else
-       box.space._bucket:update({bucket_id}, {{'=', 2, state}})
+       _, err = bucket_modify_wrapper(function()
+           box.space._bucket:update({bucket_id}, {{'=', 2, state}})
Can we move the bucket wrapper part to a separate commit? It's very difficult to parse this commit, imho.
    --
-   local function rebalancer_service_apply_routes_f(service, routes)
+   local function rebalancer_service_apply_routes_f(service, limiter, routes)
+       while storage_check_bucket_is_synced(service, limiter) do
Maybe error instead of waiting? I understand why we wait in services - we cannot kill them. Though we can do that for the functions which are called by these services: rebalancer_service_apply_routes_f, recovery_bucket_stat (you've actually already thrown an error in bucket stat).
If I return an error from rebalancer_service_apply_routes_f, the calling node won't be able to receive it, since the function is invoked via pcall in a detached fiber. I'm considering simply logging the error and waiting for the master to synchronize. A limiter is used to prevent log spam.
-   local service = lservice_info.new('routes_applier')
+   local name = 'routes_applier'
+   local service = lservice_info.new(name)
+   local ratelimit = lratelimit.create{name = name}
Hmm, let's not introduce the limiter in this commit. We already have a ticket for properly limiting the messages in that service; let's not do it in the scope of this patchset, we don't have time for that: #611.
Let's instead just throw an error and that's it, as I proposed above.
    local function master_role_update()
        if this_is_master() and M.is_configured then
            if not M.on_master_enable_fiber and not M.is_bucket_in_sync then
Maybe we can move that fiber creation to master_on_enable and create the fiber there unconditionally? I'm really worried about these checks.
The problem with master_role_update is that it's a function which is called several times even during a single vshard.storage.cfg; that's not the case for master_on_enable.
Alternatively, we should move the stopping of the fiber into master_role_update. We should make them consistent: either start/stop in master_role_update, or start/stop in master_on_enable/disable.
Update: hmm, you already have it there, then why do you need it in master_role_update?
Now we create the on_master_enable fiber in master_on_enable and cancel it in master_on_disable.
We need it in master_role_update because otherwise an upgraded storage would not start the on_master_enable service.
    local function master_on_enable()
        log.info("Stepping up into the master role")
        M.is_master = true
        M.on_master_enable_fiber = lfiber.new(on_master_enable_f)
Let's add assertions for `is_sync == false` and `M.fiber == nil`.
Serpentian left a comment:
Comments for the last 3 commits
    {'id', 'unsigned'},
    {'status', 'string'},
-   {'destination', 'string', is_nullable = true}
+   {'destination', 'string', is_nullable = true},
    ::next_check::
    end
    return replicas_map_call(replicas, func, args,
                             {timeout = timeout, except = opts.except})
The except is useless here. You pass the rs_id, which is used in wait_masters_connect, while replicas_map_call requires a replica_id in except.
        bucket_opts = {generation = opts.generation}
    end
    if not b then
        while M.errinj.ERRINJ_RECEIVE_DELAY do
Nit: let's name it `ERRINJ_FIRST_RECEIVE_DELAY` then, since it's activated only on the first call of the recv - to be consistent with `ERRINJ_LAST_RECEIVE_DELAY`.
        end
    end
    if res_bucket_info then
        log.info('The bucket %s is found on replicaset %s',
Nit: let's log at the higher level, in recovery_step_by_type. Let's try placing all logs in one place and not hide them inside other functions when possible.
    _, err = recover_bucket_to_state(recovered_buckets,
                                     bucket_id, BACTIVE)
    else
        _, err = recover_bucket_to_state(recovered_buckets,
Add an assertion here that `remote_bucket.generation > generation`.
    local vutil = require('vshard.util')

-   local group_config = {{is_sync = false}, {is_sync = true}}
+   local group_config = {{is_sync = false}}
Why? If some tests don't work with sync, then only those should be disabled (of course with a description of why they don't work).
        goto continue
    local generation = bucket.opts.generation
    local _
    if not remote_bucket then
We can majorly simplify the recovery process. Something like this, though it's not debugged:
diff --git a/vshard/storage/init.lua b/vshard/storage/init.lua
index 4f875cb..66f1e39 100644
--- a/vshard/storage/init.lua
+++ b/vshard/storage/init.lua
@@ -94,6 +94,7 @@ if not M then
ERRINJ_LAST_RECEIVE_DELAY = false,
ERRINJ_LAST_SEND_DELAY = false,
ERRINJ_RECEIVE_PARTIALLY = false,
+ ERRINJ_RECEIVE_DELAY = false,
ERRINJ_RECOVERY_PAUSE = false,
ERRINJ_DISCOVERY = false,
ERRINJ_BUCKET_GC_PAUSE = false,
@@ -1005,6 +1006,38 @@ local function recover_bucket_to_state(recovered_buckets, bucket_id, state)
recovered_buckets[state] = ids
end
+local function recovery_search_missed_bucket(bucket_id)
+ local opts = {timeout = consts.RECOVERY_GET_STAT_TIMEOUT,
+ except = M.this_replicaset.id}
+ local res, err, err_id = lreplicaset.cluster_map_master_call(
+ M.replicasets, 'vshard.storage._call',
+ {'recovery_bucket_stat', bucket_id}, opts)
+ if err then
+ err.master_id = err_id
+ return nil, err
+ end
+ local res_id, res_bucket_info
+ for rs_id, bucket_stat in pairs(res) do
+ local bucket_info, err = bucket_stat[1], bucket_stat[2]
+ if err then
+ err.master_id = err_id
+ return nil, err
+ end
+ -- Among all the masters we search those one which has the missed
+ -- remote bucket with higher generation.
+ if bucket_info and (not res_bucket_info or
+ res_bucket_info.generation < bucket_info.generation) then
+ res_id = rs_id
+ res_bucket_info = bucket_info
+ end
+ end
+ if res_bucket_info then
+ log.info('The bucket %s is found on replicaset %s',
+ res_bucket_info.id, res_id)
+ return res_id, res_bucket_info
+ end
+end
+
--
-- Check status of each transferring bucket. Resolve status where
-- possible.
@@ -1012,10 +1045,10 @@ end
local function recovery_step_by_type(type, limiter)
local _bucket = box.space._bucket
local is_step_empty = true
- local recovered = 0
local total = 0
local start_format = 'Starting %s buckets recovery step'
local recovered_buckets = {}
+ local ok
for _, bucket in _bucket.index.status:pairs(type) do
lfiber.testcancel()
total = total + 1
@@ -1073,30 +1106,55 @@ local function recovery_step_by_type(type, limiter)
log.info(start_format, type)
end
lfiber.testcancel()
- for state, checker in pairs(bucket_recoverability_checkers) do
- if checker(bucket, remote_bucket) then
- local _, err = recover_bucket_to_state(recovered_buckets,
- bucket_id, state)
- if err then
- goto continue
+ local generation = bucket.opts.generation
+ if not remote_bucket then
+ err = lerror.make(string.format('No bucket %s on replicaset %s.',
+ bucket_id, peer_id))
+ limiter:log_warn(err, err.message .. ' Scanning all the masters')
+ peer_id, remote_bucket = recovery_search_missed_bucket(bucket_id)
+ if not peer_id and remote_bucket then
+ err = remote_bucket
+ limiter:log_error(err, 'Error during scanning the master ' ..
+ '%s: %s', err.master_id, err)
+ goto continue
+ end
+ assert(not remote_bucket or generation ~= remote_bucket.generation)
+ end
+ if remote_bucket and remote_bucket.generation > generation then
+ ok, err = recover_bucket_to_state(
+ recovered_buckets, bucket_id, BGARBAGE)
+ elseif remote_bucket and remote_bucket.generation < generation then
+ ok, err = recover_bucket_to_state(
+ recovered_buckets, bucket_id, BACTIVE)
+ else
+ assert(not remote_bucket or remote_bucket.generation == generation)
+ for state, checker in pairs(bucket_recoverability_checkers) do
+ if checker(bucket, remote_bucket) then
+ ok, err = recover_bucket_to_state(recovered_buckets,
+ bucket_id, state)
+ if not ok then
+ break
+ end
+ break
end
- recovered = recovered + 1
- break
end
end
- if recovered == 0 and is_step_empty then
- log.info('Bucket %s is %s local and %s on replicaset %s, waiting',
+ if not ok then
+ log.error(err)
+ end
+ if #recovered_buckets and is_step_empty then
+ log.info('Bucket %s is %s local and %s on replicaset %s, waiting',
bucket_id, bucket.status, remote_bucket.status, peer_id)
end
is_step_empty = false
::continue::
end
- if recovered > 0 then
+ if #recovered_buckets then
log.info('Finish bucket recovery step, %d %s buckets are recovered '..
- 'among %d. Recovered buckets: %s', recovered, type, total,
- json_encode(recovered_buckets))
+ 'among %d. Recovered buckets: %s', #recovered_buckets,
+ type, total, json_encode(recovered_buckets))
end
- return total, recovered
+ return total, #recovered_buckets
end
--
@@ -1610,6 +1668,9 @@ local function bucket_recv_xc(bucket_id, from, data, opts)
bucket_opts = {generation = opts.generation}
end
if not b then
+ while M.errinj.ERRINJ_RECEIVE_DELAY do
+ lfiber.sleep(0.01)
+ end
if is_last then
local msg = 'last message is received, but the bucket does not '..
'exist anymore'

In this patch we modify the API of the `wait_masters_connect` function by changing the `timeout` parameter into an `opts` parameter with a `timeout` key. This was done to make it easier to add extra parameters to this function, such as `except` in tarantoolgh-214. Needed for tarantool#214 NO_TEST=refactoring NO_DOC=refactoring
Before this patch the `rebalancer` and `recovery` services could start right after a master switch (by `auto` master detection or manual reconfiguration), before the master had time to sync its vclock with the other replicas in the replicaset. The newly elected master may not have received the `_bucket` space updates yet, and when the recovery service starts on another node, it can mistakenly think that there is no bucket in this replicaset. This issue and others which will be fixed by further patches lead to doubled buckets in the cluster. To fix it we introduce a new storage service - `on_master_enable`. If the master is changed in a replicaset, this service is triggered and waits until the newly elected master syncs its vclock with the other replicas. Other storage services - `rebalancer` and `recovery` - can't start until `on_master_enable` sets `M.is_bucket_in_sync`. Also we change `storage/storage.test`, `storage/recovery.test`, `storage-luatest/log_verbosity_2_2_test` and `router/router.test` so that they wouldn't fail: now the `rebalancer` and `recovery` services don't start immediately after a master switch, which can make some tests flaky. Part of tarantool#214 NO_TEST=bugfix NO_DOC=bugfix
    replicaset_uuid = replicaset_uuid,
    uri_format = uri_format,
    module_unload_functions = module_unload_functions,
+   vclock_compare = vclock_compare,
You must be able to make vclock_lesseq() in storage/init.lua a one-line wrapper doing return vclock_compare(vc1, vc2, function(a, b) return a <= b end). I see no reason to keep two comparison functions when one of them fully covers the other. Hm?
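Restating the suggestion as code, a one-line sketch:

```lua
local function vclock_lesseq(vc1, vc2)
    return vclock_compare(vc1, vc2, function(a, b) return a <= b end)
end
```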
    @@ -419,6 +419,13 @@ local function cluster_bootstrap(g, cfg)
        replicaset_count = replicaset_count + 1
In the commit message of "storage: introduce on_master_enable service":

received the _bucket space updats yes -> received the _bucket space updates yet
that they wouldn't failed -> that they wouldn't fail
    repeat
        lfiber.sleep(0.1)
        local is_master_synced = replicaset:callrw(
            'vshard.storage._call', {'storage_check_is_bucket_sync'}, opts)
    until is_master_synced
Let's not ignore errors, and we must also respect the timeout passed in opts so we don't wait infinitely even when the network is working fine.
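A sketch of the same loop with errors surfaced and the opts timeout respected (the helper names are taken from the quoted test code, the rest is an assumption):

```lua
local fiber = require('fiber')
local deadline = fiber.clock() + opts.timeout
repeat
    fiber.sleep(0.1)
    local is_master_synced, err = replicaset:callrw(
        'vshard.storage._call', {'storage_check_is_bucket_sync'}, opts)
    if err ~= nil then
        return nil, err
    end
    if not is_master_synced and fiber.clock() > deadline then
        return nil, 'timed out waiting for the master to sync'
    end
until is_master_synced
```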
        end
    end

    local function storage_check_is_bucket_sync()
Normally "check" means that the function returns an error when the condition isn't true. This is the difference between check-functions and is-functions: the latter just return true/false, while the former throw or return nil, err when the condition is not true. Given that this function returns no error, it would be more appropriate to call it storage_is_bucket_sync().
But in this exact case I don't think you need this function at all. We have storage_service_info for that. Let's make it return one more key in its result-map, and name it is_master_sync. The name is_bucket_sync, I think, might be too low-level for a not-fully-internal API.
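I.e. something like this sketch - the flag exposed through the existing info endpoint instead of a dedicated `_call` command (the other fields are placeholders):

```lua
local function storage_service_info()
    return {
        -- ...the existing service info fields...
        is_master_sync = M.is_bucket_in_sync,
    }
end
```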
    local err = lerror.vshard(lerror.code.MASTER_NOT_SYNCED)
    limiter:log_error(err, service:set_status_error(
        'Error during the start of %s: %s', service.name, err))
I wouldn't say it is an error; it is a warning at worst. Also, let's not create an error object for it if we don't plan to return it.
Last thing: you mention "during the start" here, but this check is never called just on start. You seem to always call it inside service loops, which means the condition might re-appear potentially long after start. Let's rephrase this so it doesn't mention "start".
    service:set_activity('synced')
    log.info('New master has synchronized with other replicas')
    M.is_bucket_in_sync = true
    log.info('on_master_enable stopped')
I don't think users will know what on_master_enable means. If this is written to the logs, then let's make it more user-friendly, like "Master sync service stopped" or similar. Or even remove this line entirely, because two lines above we already report the result of this service anyway.
    M.on_master_enable_fiber = nil
    if M.on_master_enable_service == service then
        M.on_master_enable_service = nil
    end
You can't nullify on_master_enable_fiber unconditionally for the same reason you can't nullify on_master_enable_service unconditionally: if this fiber was cancelled and another one was started, then you would remove the other fiber's reference here, not yours.
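A guarded-cleanup sketch: drop the fiber reference only if it is still ours, mirroring the existing check for the service object:

```lua
if M.on_master_enable_fiber == lfiber.self() then
    M.on_master_enable_fiber = nil
end
if M.on_master_enable_service == service then
    M.on_master_enable_service = nil
end
```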
    M.is_master = true
    assert(not M.is_bucket_in_sync and not M.on_master_enable_fiber)
    M.on_master_enable_fiber = lfiber.new(on_master_enable_f)
    M.on_master_enable_fiber:name('on_master_enable')
I understand that we named it 'on master enable' because it is started when the master is enabled. But on_... is almost exclusively used by triggers, and the name doesn't say what the service actually does. For example, the GC service and recovery are also started when the master is enabled, but we can't call them 'on master enabled'. Let's please name this service appropriately, like a master_sync service with a master_sync_fiber.
    if M.on_master_enable_fiber then
        M.on_master_enable_fiber:cancel()
        M.on_master_enable_fiber = nil
    end
    M.is_bucket_in_sync = false
It doesn't look right that the start and stop are scattered and duplicated across on-enable, on-disable, and role-update.
Isn't it enough to do this entire logic inside master_role_update(), like all the other master services do? Both the start and stop of this service, plus the flag reset.
    M.rebalancer_worker_count = new_cfg.rebalancer_max_sending
    M.sync_timeout = new_cfg.sync_timeout
    M.current_cfg = new_cfg
+   M.bucket_latest_vclock = box.info.vclock
This doesn't look right. This vclock is documented as "Vclock after the last transaction over the _bucket space", but here that gets violated, because the cfg might be done while there are bucket transactions in progress. I recommend removing this assignment from here, and instead patching storage_wait_vclock_persisted to assign M.bucket_latest_vclock = box.info.vclock, if it was nil, when we see 0 transactions, before returning. Then it will be correct.
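A sketch of that recommendation - the lazy initialization happens at a moment when box.info.vclock is guaranteed to cover the last `_bucket` transaction:

```lua
local function storage_wait_vclock_persisted(wait_interval)
    while M.bucket_txns_in_progress_count > 0 do
        lfiber.sleep(wait_interval)
    end
    -- Zero transactions in progress: box.info.vclock is now a safe upper
    -- bound for the last _bucket change, so the lazy init is correct.
    if M.bucket_latest_vclock == nil then
        M.bucket_latest_vclock = box.info.vclock
    end
    -- ...wait for M.bucket_latest_vclock to be persisted and return...
end
```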
Before this patch the `rebalancer` and `recovery` services could start
right after a master switch (by `auto` master detection or manual
reconfiguration), before the master had time to sync its vclock with
other replicas in the replicaset. It could lead to doubled buckets
according to the "Doubled buckets RFC".
To fix it we introduce a new storage service - `on_master_enable`.
If the master is changed in a replicaset, this service is triggered
and waits until the newly elected master syncs its vclock with other
replicas. Other storage services - `rebalancer` and `recovery` - can't
start until `on_master_enable` sets `M.buckets_are_in_sync`.
Closes #214
NO_TEST=bugfix
NO_DOC=bugfix