1515#include < cloud/blockstore/libs/rdma/iface/protobuf.h>
1616#include < cloud/blockstore/libs/rdma/iface/protocol.h>
1717
18- #include < cloud/blockstore/libs/diagnostics/critical_events.h>
1918#include < cloud/blockstore/libs/service/context.h>
2019
2120#include < cloud/storage/core/libs/common/backoff_delay_provider.h>
@@ -54,7 +53,6 @@ constexpr TDuration POLL_TIMEOUT = TDuration::Seconds(1);
5453constexpr TDuration RESOLVE_TIMEOUT = TDuration::Seconds(10 );
5554constexpr TDuration MIN_CONNECT_TIMEOUT = TDuration::Seconds(1 );
5655constexpr TDuration FLUSH_TIMEOUT = TDuration::Seconds(10 );
57- constexpr TDuration LOG_THROTTLER_PERIOD = TDuration::Seconds(60 );
5856constexpr TDuration MIN_RECONNECT_DELAY = TDuration::MilliSeconds(10 );
5957constexpr TDuration INSTANT_RECONNECT_DELAY = TDuration::MicroSeconds(1 );
6058
@@ -267,7 +265,7 @@ struct TEndpointCounters
267265 TDynamicCounters::TCounterPtr SendErrors;
268266 TDynamicCounters::TCounterPtr RecvErrors;
269267
270- TDynamicCounters::TCounterPtr UnexpectedCompletions ;
268+ TDynamicCounters::TCounterPtr CompletionErrors ;
271269
272270 void Register (TDynamicCounters& counters)
273271 {
@@ -283,7 +281,7 @@ struct TEndpointCounters
283281 SendErrors = counters.GetCounter (" SendErrors" );
284282 RecvErrors = counters.GetCounter (" RecvErrors" );
285283
286- UnexpectedCompletions = counters.GetCounter (" UnexpectedCompletions " );
284+ CompletionErrors = counters.GetCounter (" CompletionErrors " );
287285 }
288286
289287 void RequestEnqueued ()
@@ -318,6 +316,11 @@ struct TEndpointCounters
318316 SendErrors->Inc ();
319317 }
320318
319+ void SendError ()
320+ {
321+ SendErrors->Inc ();
322+ }
323+
321324 void RecvResponseCompleted ()
322325 {
323326 ActiveRecv->Dec ();
@@ -332,6 +335,11 @@ struct TEndpointCounters
332335 RecvErrors->Inc ();
333336 }
334337
338+ void RecvError ()
339+ {
340+ RecvErrors->Inc ();
341+ }
342+
335343 void RequestAborted ()
336344 {
337345 ActiveRequests->Dec ();
@@ -343,9 +351,9 @@ struct TEndpointCounters
343351 UnknownRequests->Inc ();
344352 }
345353
346- void UnexpectedCompletion ()
354+ void CompletionError ()
347355 {
348- UnexpectedCompletions ->Inc ();
356+ CompletionErrors ->Inc ();
349357 }
350358};
351359
@@ -453,10 +461,6 @@ class TClientEndpoint final
453461 TLog Log;
454462 TReconnect Reconnect;
455463
456- struct {
457- TLogThrottler Unexpected = TLogThrottler(LOG_THROTTLER_PERIOD);
458- } LogThrottler;
459-
460464 // config might be adjusted during initial handshake
461465 TClientConfigPtr OriginalConfig;
462466 TClientConfig Config;
@@ -567,14 +571,13 @@ class TClientEndpoint final
567571private:
568572 // called from CQ thread
569573 void HandleQueuedRequests ();
570- bool IsWorkRequestValid (const TWorkRequestId& id) const ;
571- void HandleFlush (const TWorkRequestId& id) noexcept ;
572574 void SendRequest (TRequestPtr req, TSendWr* send);
573575 void SendRequestCompleted (TSendWr* send, ibv_wc_status status) noexcept ;
574576 void RecvResponse (TRecvWr* recv);
575577 void RecvResponseCompleted (TRecvWr* recv, ibv_wc_status status);
576578 void AbortRequest (TRequestPtr req, ui32 err, const TString& msg) noexcept ;
577579 void FreeRequest (TRequest* creq) noexcept ;
580+ int ValidateCompletion (ibv_wc* wc) noexcept ;
578581 ui64 GetNewReqId () noexcept ;
579582};
580583
@@ -1028,48 +1031,67 @@ bool TClientEndpoint::HandleCompletionEvents()
10281031 return false ;
10291032}
10301033
1031- bool TClientEndpoint::IsWorkRequestValid ( const TWorkRequestId& id) const
1034+ int TClientEndpoint::ValidateCompletion (ibv_wc* wc) noexcept
10321035{
1033- if (id.Magic == SendMagic && id.Index < SendWrs.size ()) {
1034- return true ;
1035- }
1036- if (id.Magic == RecvMagic && id.Index < RecvWrs.size ()) {
1037- return true ;
1038- }
1039- return false ;
1040- }
1036+ auto id = TWorkRequestId (wc->wr_id );
10411037
1042- void TClientEndpoint::HandleFlush (const TWorkRequestId& id) noexcept
1043- {
1044- // flush WRs have opcode=0
10451038 if (id.Magic == SendMagic && id.Index < SendWrs.size ()) {
1046- SendQueue.Push (&SendWrs[id.Index ]);
1047- return ;
1039+ if (wc->status == IBV_WC_WR_FLUSH_ERR) {
1040+ SendQueue.Push (&SendWrs[id.Index ]);
1041+ return -1 ;
1042+ }
1043+
1044+ if (wc->opcode != IBV_WC_SEND) {
1045+ RDMA_ERROR (
1046+ " completion error " << NVerbs::PrintCompletion (wc)
1047+ << " : unexpected opcode" );
1048+
1049+ Counters->CompletionError ();
1050+ SendQueue.Push (&SendWrs[id.Index ]);
1051+ return -1 ;
1052+ }
1053+
1054+ return 0 ;
10481055 }
1056+
10491057 if (id.Magic == RecvMagic && id.Index < RecvWrs.size ()) {
1050- RecvQueue.Push (&RecvWrs[id.Index ]);
1051- return ;
1058+ if (wc->status == IBV_WC_WR_FLUSH_ERR) {
1059+ RecvQueue.Push (&RecvWrs[id.Index ]);
1060+ return -1 ;
1061+ }
1062+
1063+ if (wc->opcode != IBV_WC_RECV) {
1064+ RDMA_ERROR (
1065+ " completion error " << NVerbs::PrintCompletion (wc)
1066+ << " : unexpected opcode" );
1067+
1068+ Counters->CompletionError ();
1069+ RecvQueue.Push (&RecvWrs[id.Index ]);
1070+ return -1 ;
1071+ }
1072+
1073+ return 0 ;
10521074 }
1075+
1076+ RDMA_ERROR (
1077+ " completion error " << NVerbs::PrintCompletion (wc)
1078+ << " : unexpected wr_id" );
1079+
1080+ Counters->CompletionError ();
1081+ return -1 ;
10531082}
10541083
10551084// implements NVerbs::ICompletionHandler
10561085void TClientEndpoint::HandleCompletionEvent (ibv_wc* wc)
10571086{
10581087 auto id = TWorkRequestId (wc->wr_id );
10591088
1060- RDMA_TRACE (NVerbs::GetOpcodeName (wc->opcode ) << " " << id
1061- << " completed with " << NVerbs::GetStatusString (wc->status ));
1062-
1063- if (!IsWorkRequestValid (id)) {
1064- RDMA_ERROR (LogThrottler.Unexpected , Log,
1065- " unexpected completion " << NVerbs::PrintCompletion (wc));
1089+ RDMA_TRACE (
1090+ NVerbs::GetOpcodeName (wc->opcode )
1091+ << " " << id << " completed with "
1092+ << NVerbs::GetStatusString (wc->status ));
10661093
1067- Counters->UnexpectedCompletion ();
1068- return ;
1069- }
1070-
1071- if (wc->status == IBV_WC_WR_FLUSH_ERR) {
1072- HandleFlush (id);
1094+ if (ValidateCompletion (wc)) {
10731095 return ;
10741096 }
10751097
@@ -1083,10 +1105,7 @@ void TClientEndpoint::HandleCompletionEvent(ibv_wc* wc)
10831105 break ;
10841106
10851107 default :
1086- RDMA_ERROR (LogThrottler.Unexpected , Log,
1087- " unexpected completion " << NVerbs::PrintCompletion (wc));
1088-
1089- Counters->UnexpectedCompletion ();
1108+ break ;
10901109 }
10911110}
10921111
@@ -1108,17 +1127,14 @@ void TClientEndpoint::SendRequest(TRequestPtr req, TSendWr* send)
11081127 try {
11091128 Verbs->PostSend (Connection->qp , &send->wr );
11101129 } catch (const TServiceError& e) {
1111- SendQueue.Push (send);
1112-
1113- ReportRdmaError (
1114- TStringBuilder ()
1115- << " SEND " << TWorkRequestId (send->wr .wr_id ) << " : " << e.what ());
1116-
1117- Disconnect ();
1130+ RDMA_ERROR (
1131+ " SEND " << TWorkRequestId (send->wr .wr_id ) << " : " << e.what ());
11181132
1133+ SendQueue.Push (send);
1134+ Counters->SendError ();
11191135 Counters->RequestEnqueued ();
11201136 QueuedRequests.Enqueue (std::move (req));
1121-
1137+ Disconnect ();
11221138 return ;
11231139 }
11241140
@@ -1143,13 +1159,11 @@ void TClientEndpoint::SendRequestCompleted(
11431159 };
11441160
11451161 if (status != IBV_WC_SUCCESS) {
1146- Counters->SendRequestError ();
1147-
1148- ReportRdmaError (
1149- TStringBuilder ()
1150- << " SEND request completed " << TWorkRequestId (send->wr .wr_id )
1151- << " failed: " << NVerbs::GetStatusString (status));
1162+ RDMA_ERROR (
1163+ " SEND " << TWorkRequestId (send->wr .wr_id ) << " : "
1164+ << NVerbs::GetStatusString (status));
11521165
1166+ Counters->SendRequestError ();
11531167 Disconnect ();
11541168 return ;
11551169 }
@@ -1163,21 +1177,26 @@ void TClientEndpoint::SendRequestCompleted(
11631177 req->CallContext ->RequestId );
11641178
11651179 } else if (ActiveRequests.TimedOut (reqId)) {
1166- RDMA_INFO (" SEND " << TWorkRequestId (send->wr .wr_id )
1167- << " : request has timed out before receiving send wc" );
1180+ RDMA_INFO (
1181+ " SEND "
1182+ << TWorkRequestId (send->wr .wr_id )
1183+ << " : request has timed out before receiving send completion" );
11681184
11691185 } else if (ActiveRequests.Completed (reqId)) {
1170- RDMA_INFO (" SEND " << TWorkRequestId (send->wr .wr_id )
1171- << " : request has been completed before receiving send wc" );
1186+ RDMA_INFO (
1187+ " SEND "
1188+ << TWorkRequestId (send->wr .wr_id )
1189+ << " : request has been completed before receiving send completion" );
11721190
11731191 } else if (ActiveRequests.Cancelled (reqId)) {
11741192 RDMA_INFO (
1175- " SEND " << TWorkRequestId (send->wr .wr_id )
1176- << " : request was cancelled before receiving send wc" );
1193+ " SEND "
1194+ << TWorkRequestId (send->wr .wr_id )
1195+ << " : request was cancelled before receiving send completion" );
11771196
11781197 } else {
1179- RDMA_ERROR (" SEND " << TWorkRequestId (send-> wr . wr_id )
1180- << " : request not found" )
1198+ RDMA_ERROR (
1199+ " SEND " << TWorkRequestId (send-> wr . wr_id ) << " : request not found" )
11811200 Counters->UnknownRequest ();
11821201 }
11831202}
@@ -1192,13 +1211,11 @@ void TClientEndpoint::RecvResponse(TRecvWr* recv)
11921211 try {
11931212 Verbs->PostRecv (Connection->qp , &recv->wr );
11941213 } catch (const TServiceError& e) {
1195- RecvQueue.Push (recv);
1196-
1197- ReportRdmaError (
1198- TStringBuilder ()
1199- << " RECV " << TWorkRequestId (recv->wr .wr_id )
1200- << " failed to post receive request: " << e.what ());
1214+ RDMA_ERROR (
1215+ " RECV " << TWorkRequestId (recv->wr .wr_id ) << " : " << e.what ());
12011216
1217+ Counters->RecvError ();
1218+ RecvQueue.Push (recv);
12021219 Disconnect ();
12031220 return ;
12041221 }
@@ -1211,24 +1228,23 @@ void TClientEndpoint::RecvResponseCompleted(
12111228 ibv_wc_status wc_status)
12121229{
12131230 if (wc_status != IBV_WC_SUCCESS) {
1231+ RDMA_ERROR (
1232+ " RECV " << TWorkRequestId (recv->wr .wr_id ) << " : "
1233+ << NVerbs::GetStatusString (wc_status));
1234+
12141235 Counters->RecvResponseError ();
12151236 RecvQueue.Push (recv);
1216-
1217- ReportRdmaError (
1218- TStringBuilder ()
1219- << " RECV " << TWorkRequestId (recv->wr .wr_id )
1220- << " failed: " << NVerbs::GetStatusString (wc_status));
1221-
12221237 Disconnect ();
12231238 return ;
12241239 }
12251240
12261241 auto * msg = recv->Message <TResponseMessage>();
12271242 int version = ParseMessageHeader (msg);
12281243 if (version != RDMA_PROTO_VERSION) {
1229- RDMA_ERROR (" RECV " << TWorkRequestId (recv->wr .wr_id )
1230- << " : incompatible protocol version "
1231- << version << " , expected " << int (RDMA_PROTO_VERSION));
1244+ RDMA_ERROR (
1245+ " RECV " << TWorkRequestId (recv->wr .wr_id )
1246+ << " : incompatible protocol version " << version
1247+ << " , expected " << int (RDMA_PROTO_VERSION));
12321248
12331249 Counters->RecvResponseError ();
12341250 RecvResponse (recv);
@@ -1243,8 +1259,8 @@ void TClientEndpoint::RecvResponseCompleted(
12431259
12441260 auto req = ActiveRequests.Pop (reqId);
12451261 if (!req) {
1246- RDMA_ERROR (" RECV " << TWorkRequestId (recv-> wr . wr_id )
1247- << " : request not found" );
1262+ RDMA_ERROR (
1263+ " RECV " << TWorkRequestId (recv-> wr . wr_id ) << " : request not found" );
12481264
12491265 Counters->UnknownRequest ();
12501266 return ;
@@ -2285,7 +2301,7 @@ void TClient::DumpHtml(IOutputStream& out) const
22852301 TABLEH () { out << " ActiveRecv" ; }
22862302 TABLEH () { out << " SendErrors" ; }
22872303 TABLEH () { out << " RecvErrors" ; }
2288- TABLEH () { out << " UnexpectedCompletions " ; }
2304+ TABLEH () { out << " CompletionErrors " ; }
22892305 }
22902306 TABLER () {
22912307 TABLED () { out << Counters->QueuedRequests ->Val (); }
@@ -2297,7 +2313,7 @@ void TClient::DumpHtml(IOutputStream& out) const
22972313 TABLED () { out << Counters->ActiveRecv ->Val (); }
22982314 TABLED () { out << Counters->SendErrors ->Val (); }
22992315 TABLED () { out << Counters->RecvErrors ->Val (); }
2300- TABLED () { out << Counters->UnexpectedCompletions ->Val (); }
2316+ TABLED () { out << Counters->CompletionErrors ->Val (); }
23012317 }
23022318 }
23032319 }
0 commit comments