@@ -667,8 +667,13 @@ cdef class ReadBuffer:
667
667
self ._finish_message()
668
668
return mem
669
669
670
- cdef redirect_messages(self , WriteBuffer buf, char mtype,
670
+ cdef int32_t redirect_messages(self , WriteBuffer buf, char mtype,
671
671
int stop_at = 0 ):
672
+ # Redirects messages from self into buf until either
673
+ # a message with a type different than mtype is encountered, or
674
+ # buf contains stop_at bytes.
675
+ # Returns the number of messages redirected.
676
+
672
677
if not self ._current_message_ready:
673
678
raise BufferError(
674
679
' consume_full_messages called on a buffer without a '
@@ -687,8 +692,11 @@ cdef class ReadBuffer:
687
692
ssize_t new_pos0
688
693
ssize_t pos_delta
689
694
int32_t done
695
+ int32_t count
690
696
697
+ count = 0
691
698
while True :
699
+ count += 1
692
700
buf.write_byte(mtype)
693
701
buf.write_int32(self ._current_message_len)
694
702
@@ -701,10 +709,10 @@ cdef class ReadBuffer:
701
709
if self ._length > 0 :
702
710
self ._ensure_first_buf()
703
711
else :
704
- return
712
+ return count
705
713
706
714
if stop_at and buf._length >= stop_at:
707
- return
715
+ return count
708
716
709
717
# Fast path: exhaust buf0 as efficiently as possible.
710
718
if self ._pos0 + 5 <= self ._len0:
@@ -727,14 +735,16 @@ cdef class ReadBuffer:
727
735
if new_pos0 + msg_len > cbuf_len:
728
736
break
729
737
new_pos0 += msg_len
738
+ count += 1
730
739
731
740
if new_pos0 != self ._pos0:
732
741
assert self ._pos0 < new_pos0 <= self ._len0
733
742
734
743
pos_delta = new_pos0 - self ._pos0
735
744
buf.write_cstr(
736
745
cbuf + self ._pos0,
737
- pos_delta)
746
+ pos_delta
747
+ )
738
748
739
749
self ._pos0 = new_pos0
740
750
self ._length -= pos_delta
@@ -743,11 +753,11 @@ cdef class ReadBuffer:
743
753
744
754
if done:
745
755
# The next message is of a different type.
746
- return
756
+ return count
747
757
748
758
# Back to slow path.
749
759
if not self .take_message_type(mtype):
750
- return
760
+ return count
751
761
752
762
cdef bytearray consume_messages(self , char mtype):
753
763
""" Consume consecutive messages of the same type."""
0 commit comments