@@ -114,6 +114,75 @@ bool ElemCompare(const quicklistEntry& entry, string_view elem) {
114
114
return elem == an.Piece ();
115
115
}
116
116
117
+ using FFResult = pair<PrimeKey, unsigned >; // key, argument index.
118
+
119
+ struct ShardFFResult {
120
+ PrimeKey key;
121
+ ShardId sid = kInvalidSid ;
122
+ };
123
+
124
+ OpResult<ShardFFResult> FindFirst (Transaction* trans) {
125
+ VLOG (2 ) << " FindFirst::Find " << trans->DebugId ();
126
+
127
+ // Holds Find results: (iterator to a found key, and its index in the passed arguments).
128
+ // See DbSlice::FindFirst for more details.
129
+ // spans all the shards for now.
130
+ std::vector<OpResult<FFResult>> find_res (trans->shard_set ()->size ());
131
+ fill (find_res.begin (), find_res.end (), OpStatus::KEY_NOTFOUND);
132
+
133
+ auto cb = [&find_res](auto * t, EngineShard* shard) {
134
+ auto args = t->ShardArgsInShard (shard->shard_id ());
135
+ OpResult<pair<PrimeIterator, unsigned >> ff_res =
136
+ shard->db_slice ().FindFirst (t->db_index (), args);
137
+
138
+ if (ff_res) {
139
+ FFResult ff_result (ff_res->first ->first .AsRef (), ff_res->second );
140
+ find_res[shard->shard_id ()] = move (ff_result);
141
+ } else {
142
+ find_res[shard->shard_id ()] = ff_res.status ();
143
+ }
144
+ return OpStatus::OK;
145
+ };
146
+
147
+ trans->Execute (move (cb), false );
148
+
149
+ uint32_t min_arg_indx = UINT32_MAX;
150
+
151
+ ShardFFResult shard_result;
152
+
153
+ for (size_t sid = 0 ; sid < find_res.size (); ++sid) {
154
+ const auto & fr = find_res[sid];
155
+ auto status = fr.status ();
156
+ if (status == OpStatus::KEY_NOTFOUND)
157
+ continue ;
158
+
159
+ if (status == OpStatus::WRONG_TYPE) {
160
+ return status;
161
+ }
162
+
163
+ CHECK (fr);
164
+
165
+ const auto & it_pos = fr.value ();
166
+
167
+ size_t arg_indx = trans->ReverseArgIndex (sid, it_pos.second );
168
+ if (arg_indx < min_arg_indx) {
169
+ min_arg_indx = arg_indx;
170
+ shard_result.sid = sid;
171
+
172
+ // we do not dereference the key, do not extract the string value, so it it
173
+ // ok to just move it. We can not dereference it due to limitations of SmallString
174
+ // that rely on thread-local data-structure for pointer translation.
175
+ shard_result.key = it_pos.first .AsRef ();
176
+ }
177
+ }
178
+
179
+ if (shard_result.sid == kInvalidSid ) {
180
+ return OpStatus::KEY_NOTFOUND;
181
+ }
182
+
183
+ return OpResult<ShardFFResult>{move (shard_result)};
184
+ }
185
+
117
186
class BPopper {
118
187
public:
119
188
explicit BPopper (ListDir dir);
@@ -122,22 +191,18 @@ class BPopper {
122
191
// If OK is returned then use result() to fetch the value.
123
192
OpStatus Run (Transaction* t, unsigned msec);
124
193
194
+
195
+ // returns (key, value) pair.
125
196
auto result () const {
126
197
return make_pair<string_view, string_view>(key_, value_);
127
198
}
128
199
129
- bool found () const {
130
- return found_;
131
- }
132
-
133
200
private:
134
201
OpStatus Pop (Transaction* t, EngineShard* shard);
135
202
136
203
ListDir dir_;
137
204
138
- bool found_ = false ;
139
- PrimeIterator find_it_;
140
- ShardId find_sid_ = std::numeric_limits<ShardId>::max();
205
+ ShardFFResult ff_result_;
141
206
142
207
string key_;
143
208
string value_;
@@ -158,7 +223,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
158
223
159
224
auto * stats = ServerState::tl_connection_stats ();
160
225
161
- OpResult<Transaction::FindFirstResult > result = t-> FindFirst ();
226
+ OpResult<ShardFFResult > result = FindFirst (t );
162
227
163
228
if (result.status () == OpStatus::KEY_NOTFOUND) {
164
229
if (is_multi) {
@@ -169,14 +234,16 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
169
234
return OpStatus::TIMED_OUT;
170
235
}
171
236
237
+ // Block
172
238
++stats->num_blocked_clients ;
173
239
bool wait_succeeded = t->WaitOnWatch (tp);
174
240
--stats->num_blocked_clients ;
175
241
176
242
if (!wait_succeeded)
177
243
return OpStatus::TIMED_OUT;
178
244
179
- result = t->FindFirst (); // retry - must find something.
245
+ // Now we have something for sure.
246
+ result = FindFirst (t); // retry - must find something.
180
247
}
181
248
182
249
if (!result) {
@@ -185,9 +252,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
185
252
}
186
253
187
254
VLOG (1 ) << " Popping an element" ;
188
- find_sid_ = result->sid ;
189
- find_it_ = result->find_res ;
190
- found_ = true ;
255
+ ff_result_ = move (result.value ());
191
256
192
257
auto cb = [this ](Transaction* t, EngineShard* shard) { return Pop (t, shard); };
193
258
t->Execute (std::move (cb), true );
@@ -196,18 +261,20 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
196
261
}
197
262
198
263
OpStatus BPopper::Pop (Transaction* t, EngineShard* shard) {
199
- DCHECK (found ());
200
-
201
- if (shard->shard_id () == find_sid_) {
202
- find_it_->first .GetString (&key_);
264
+ if (shard->shard_id () == ff_result_.sid ) {
265
+ ff_result_.key .GetString (&key_);
203
266
204
- quicklist* ql = GetQL (find_it_->second );
267
+ auto it_res = shard->db_slice ().Find (t->db_index (), key_, OBJ_LIST);
268
+ CHECK (it_res); // must exist and must be ok.
269
+ PrimeIterator it = *it_res;
270
+ quicklist* ql = GetQL (it->second );
205
271
value_ = ListPop (dir_, ql);
206
272
207
273
if (quicklistCount (ql) == 0 ) {
208
- CHECK (shard->db_slice ().Del (t->db_index (), find_it_ ));
274
+ CHECK (shard->db_slice ().Del (t->db_index (), it ));
209
275
}
210
276
}
277
+
211
278
return OpStatus::OK;
212
279
}
213
280
@@ -242,9 +309,10 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
242
309
string_view dest = ArgS (args, 2 );
243
310
244
311
OpResult<string> result;
245
- if (dest == src) {
312
+
313
+ if (cntx->transaction ->unique_shard_cnt () == 1 ) {
246
314
auto cb = [&](Transaction* t, EngineShard* shard) {
247
- return OpRPopLPushSingleKey (OpArgs{shard, t->db_index ()}, src);
315
+ return OpRPopLPushSingleShard (OpArgs{shard, t->db_index ()}, src, dest );
248
316
};
249
317
250
318
result = cntx->transaction ->ScheduleSingleHopT (std::move (cb));
@@ -446,11 +514,12 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
446
514
OpStatus result = popper.Run (transaction, unsigned (timeout * 1000 ));
447
515
448
516
if (result == OpStatus::OK) {
449
- CHECK (popper.found ());
450
- VLOG (1 ) << " BLPop returned " ;
451
-
452
517
auto res = popper.result ();
518
+
519
+ VLOG (1 ) << " BLPop returned from " << res.first ; // key.
520
+
453
521
std::string_view str_arr[2 ] = {res.first , res.second };
522
+
454
523
return (*cntx)->SendStringArr (str_arr);
455
524
}
456
525
@@ -550,7 +619,7 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
550
619
} else {
551
620
tie (it, new_key) = es->db_slice ().AddOrFind (op_args.db_ind , key);
552
621
}
553
- quicklist* ql;
622
+ quicklist* ql = nullptr ;
554
623
555
624
if (new_key) {
556
625
robj* o = createQuicklistObject ();
@@ -572,10 +641,12 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
572
641
quicklistPush (ql, es->tmp_str1 , sdslen (es->tmp_str1 ), pos);
573
642
}
574
643
575
- if (new_key && es->blocking_controller ()) {
576
- string tmp;
577
- string_view key = it->first .GetSlice (&tmp);
578
- es->blocking_controller ()->AwakeWatched (op_args.db_ind , key);
644
+ if (new_key) {
645
+ if (es->blocking_controller ()) {
646
+ string tmp;
647
+ string_view key = it->first .GetSlice (&tmp);
648
+ es->blocking_controller ()->AwakeWatched (op_args.db_ind , key);
649
+ }
579
650
} else {
580
651
es->db_slice ().PostUpdate (op_args.db_ind , it);
581
652
}
@@ -811,17 +882,54 @@ OpResult<StringVec> ListFamily::OpRange(const OpArgs& op_args, std::string_view
811
882
return str_vec;
812
883
}
813
884
814
- OpResult<string> ListFamily::OpRPopLPushSingleKey (const OpArgs& op_args, std::string_view key) {
885
+ OpResult<string> ListFamily::OpRPopLPushSingleShard (const OpArgs& op_args, string_view src,
886
+ string_view dest) {
815
887
auto & db_slice = op_args.shard ->db_slice ();
816
- auto it_res = db_slice.Find (op_args.db_ind , key, OBJ_LIST);
817
- if (!it_res)
818
- return it_res.status ();
888
+ auto src_res = db_slice.Find (op_args.db_ind , src, OBJ_LIST);
889
+ if (!src_res)
890
+ return src_res.status ();
891
+
892
+ PrimeIterator src_it = *src_res;
893
+ quicklist* src_ql = GetQL (src_it->second );
894
+
895
+ if (src == dest) { // simple case.
896
+ db_slice.PreUpdate (op_args.db_ind , src_it);
897
+ string val = ListPop (ListDir::RIGHT, src_ql);
898
+
899
+ quicklistPushHead (src_ql, val.data (), val.size ());
900
+ db_slice.PostUpdate (op_args.db_ind , src_it);
901
+
902
+ return val;
903
+ }
904
+
905
+ quicklist* dest_ql = nullptr ;
906
+ auto [dest_it, created] = db_slice.AddOrFind (op_args.db_ind , dest);
907
+ if (created) {
908
+ robj* obj = createQuicklistObject ();
909
+ dest_ql = (quicklist*)obj->ptr ;
910
+ quicklistSetOptions (dest_ql, FLAGS_list_max_listpack_size, FLAGS_list_compress_depth);
911
+ dest_it->second .ImportRObj (obj);
912
+
913
+ // Insertion of dest could invalidate src_it. Find it again.
914
+ src_it = db_slice.GetTables (op_args.db_ind ).first ->Find (src);
915
+ } else {
916
+ if (dest_it->second .ObjType () != OBJ_LIST)
917
+ return OpStatus::WRONG_TYPE;
918
+
919
+ dest_ql = GetQL (dest_it->second );
920
+ db_slice.PreUpdate (op_args.db_ind , dest_it);
921
+ }
922
+
923
+ db_slice.PreUpdate (op_args.db_ind , src_it);
924
+ string val = ListPop (ListDir::RIGHT, src_ql);
925
+ quicklistPushHead (dest_ql, val.data (), val.size ());
926
+ db_slice.PostUpdate (op_args.db_ind , src_it);
927
+ db_slice.PostUpdate (op_args.db_ind , dest_it);
928
+
929
+ if (quicklistCount (src_ql) == 0 ) {
930
+ CHECK (db_slice.Del (op_args.db_ind , src_it));
931
+ }
819
932
820
- PrimeIterator it = *it_res;
821
- quicklist* ql = GetQL (it->second );
822
- db_slice.PreUpdate (op_args.db_ind , it);
823
- string val = ListPop (ListDir::RIGHT, ql);
824
- quicklistPushHead (ql, val.data (), val.size ());
825
933
return val;
826
934
}
827
935
0 commit comments