Skip to content

Commit 4000adf

Browse files
authored
fix: do not migrate during connection close (#2570)
* fix: do not migrate during connection close Fixes #2569 Before the change we had a corner case where Dragonfly would call OnPreMigrateThread but would not call CancelOnErrorCb because OnBreakCb has already been called (it resets break_cb_engaged_) On the other hand in OnPostMigrateThread we called RegisterOnErrorCb if breaker_cb_ which resulted in double registration. This change simplifies the logic by removing break_cb_engaged_ flag since CancelOnErrorCb is safe to call if nothing is registered. Moreover, we now skip Migrate flow if a socket is being closed. --------- Signed-off-by: Roman Gershman <[email protected]>
1 parent 6d11f86 commit 4000adf

File tree

9 files changed

+46
-33
lines changed

9 files changed

+46
-33
lines changed

Diff for: src/facade/dragonfly_connection.cc

+6-14
Original file line numberDiff line numberDiff line change
@@ -502,20 +502,15 @@ void Connection::OnShutdown() {
502502
}
503503

504504
void Connection::OnPreMigrateThread() {
505-
// If we migrating to another io_uring we should cancel any pending requests we have.
506-
if (break_cb_engaged_) {
507-
socket_->CancelOnErrorCb();
508-
break_cb_engaged_ = false;
509-
}
505+
CHECK(!cc_->conn_closing);
506+
507+
socket_->CancelOnErrorCb();
510508
}
511509

512510
void Connection::OnPostMigrateThread() {
513511
// Once we migrated, we should rearm OnBreakCb callback.
514512
if (breaker_cb_) {
515-
DCHECK(!break_cb_engaged_);
516-
517513
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
518-
break_cb_engaged_ = true;
519514
}
520515

521516
// Update tl variables
@@ -594,14 +589,11 @@ void Connection::HandleRequests() {
594589
cc_.reset(service_->CreateContext(peer, this));
595590
if (breaker_cb_) {
596591
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
597-
break_cb_engaged_ = true;
598592
}
599593

600594
ConnectionFlow(peer);
601595

602-
if (break_cb_engaged_) {
603-
socket_->CancelOnErrorCb();
604-
}
596+
socket_->CancelOnErrorCb(); // noop if nothing is registered.
605597

606598
cc_.reset();
607599
}
@@ -975,14 +967,13 @@ void Connection::OnBreakCb(int32_t mask) {
975967
<< cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError();
976968

977969
cc_->conn_closing = true;
978-
break_cb_engaged_ = false; // do not attempt to cancel it.
979970

980971
breaker_cb_(mask);
981972
evc_.notify(); // Notify dispatch fiber.
982973
}
983974

984975
void Connection::HandleMigrateRequest() {
985-
if (!migration_request_) {
976+
if (cc_->conn_closing || !migration_request_) {
986977
return;
987978
}
988979

@@ -996,6 +987,7 @@ void Connection::HandleMigrateRequest() {
996987
// We don't support migrating with subscriptions as it would require moving thread local
997988
// handles. We can't check above, as the queue might have contained a subscribe request.
998989
if (cc_->subscriptions == 0) {
990+
stats_->num_migrations++;
999991
migration_request_ = nullptr;
1000992

1001993
DecreaseStatsOnClose();

Diff for: src/facade/dragonfly_connection.h

-1
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,6 @@ class Connection : public util::Connection {
401401
std::string name_;
402402

403403
unsigned parser_error_ = 0;
404-
bool break_cb_engaged_ = false;
405404

406405
BreakerCb breaker_cb_;
407406
std::unique_ptr<Shutdown> shutdown_cb_;

Diff for: src/facade/facade.cc

+2-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
2020

2121
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
2222
// To break this code deliberately if we add/remove a field to this struct.
23-
static_assert(kSizeConnStats == 104u);
23+
static_assert(kSizeConnStats == 112u);
2424

2525
ADD(read_buf_capacity);
2626
ADD(dispatch_queue_entries);
@@ -36,6 +36,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
3636
ADD(num_conns);
3737
ADD(num_replicas);
3838
ADD(num_blocked_clients);
39+
ADD(num_migrations);
3940

4041
return *this;
4142
}

Diff for: src/facade/facade_types.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ struct ConnectionStats {
6161
uint32_t num_conns = 0;
6262
uint32_t num_replicas = 0;
6363
uint32_t num_blocked_clients = 0;
64-
64+
uint64_t num_migrations = 0;
6565
ConnectionStats& operator+=(const ConnectionStats& o);
6666
};
6767

Diff for: src/server/db_slice.cc

+3
Original file line numberDiff line numberDiff line change
@@ -1441,6 +1441,9 @@ void DbSlice::TrackKeys(const facade::Connection::WeakRef& conn, const ArgSlice&
14411441
}
14421442

14431443
void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
1444+
if (client_tracking_map_.empty())
1445+
return;
1446+
14441447
auto it = client_tracking_map_.find(key);
14451448
if (it != client_tracking_map_.end()) {
14461449
// notify all the clients.

Diff for: src/server/server_family.cc

+1
Original file line numberDiff line numberDiff line change
@@ -1859,6 +1859,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
18591859
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
18601860
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
18611861
append("total_net_input_bytes", conn_stats.io_read_bytes);
1862+
append("connection_migrations", conn_stats.num_migrations);
18621863
append("total_net_output_bytes", reply_stats.io_write_bytes);
18631864
append("instantaneous_input_kbps", -1);
18641865
append("instantaneous_output_kbps", -1);

Diff for: tests/dragonfly/eval_test.py

+29
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
from redis import asyncio as aioredis
44
import time
55
import json
6+
import logging
67
import pytest
78
import random
89
import itertools
910
import random
1011
import string
12+
13+
from .instance import DflyInstance
14+
1115
from . import dfly_args, dfly_multi_test_args
1216

1317
DJANGO_CACHEOPS_SCRIPT = """
@@ -310,3 +314,28 @@ async def measure_blocked():
310314
# At least some connection was seen blocked
311315
# Flaky: release build is too fast and never blocks
312316
# assert max_blocked > 0
317+
318+
319+
"""
320+
Tests migrate/close interaction for the connection
321+
Reproduces #2569
322+
"""
323+
324+
325+
@dfly_args({"proactor_threads": "4", "pipeline_squash": 0})
326+
async def test_migrate_close_connection(async_client: aioredis.Redis, df_server: DflyInstance):
327+
sha = await async_client.script_load("return redis.call('GET', KEYS[1])")
328+
329+
async def run():
330+
reader, writer = await asyncio.open_connection("localhost", df_server.port)
331+
332+
# write a EVALSHA that will ask for migration (75% it's on the wrong shard)
333+
writer.write((f"EVALSHA {sha} 1 a\r\n").encode())
334+
await writer.drain()
335+
336+
# disconnect the client connection
337+
writer.close()
338+
await writer.wait_closed()
339+
340+
tasks = [asyncio.create_task(run()) for _ in range(50)]
341+
await asyncio.gather(*tasks)

Diff for: tests/dragonfly/instance.py

+3-15
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
import logging
66
from dataclasses import dataclass
77
from typing import Dict, Optional, List, Union
8-
import os
98
import re
109
import psutil
1110
import itertools
@@ -163,22 +162,13 @@ def stop(self, kill=False):
163162
if proc is None:
164163
return
165164

166-
# if we have log files, it means that we started a process.
167-
# if it died before we could stop it, we should raise an exception
168-
if self.log_files:
169-
exitcode = proc.poll()
170-
if exitcode is not None:
171-
if exitcode != 0:
172-
raise Exception(f"Process exited with code {exitcode}")
173-
return
174-
175165
logging.debug(f"Stopping instance on {self._port}")
176166
try:
177167
if kill:
178168
proc.kill()
179169
else:
180170
proc.terminate()
181-
proc.communicate(timeout=30)
171+
proc.communicate(timeout=15)
182172
except subprocess.TimeoutExpired:
183173
# We need to send SIGUSR1 to DF such that it prints the stacktrace
184174
proc.send_signal(signal.SIGUSR1)
@@ -203,11 +193,9 @@ def _start(self):
203193
self._port = None
204194

205195
all_args = self.format_args(self.args)
206-
arg_str = " ".join(all_args)
207-
bin_path = os.path.realpath(self.params.path)
208-
logging.debug(f"Starting {bin_path} with arguments: {arg_str}")
196+
logging.debug(f"Starting instance with arguments {all_args} from {self.params.path}")
209197

210-
run_cmd = [bin_path, *all_args]
198+
run_cmd = [self.params.path, *all_args]
211199
if self.params.gdb:
212200
run_cmd = ["gdb", "--ex", "r", "--args"] + run_cmd
213201

0 commit comments

Comments
 (0)