@@ -21,7 +21,7 @@ use crate::{
21
21
config:: ServiceConfig ,
22
22
error:: { DispatchError , ParseError , PayloadError } ,
23
23
service:: HttpFlow ,
24
- Error , Extensions , OnConnectData , Request , Response , StatusCode ,
24
+ ConnectionType , Error , Extensions , OnConnectData , Request , Response , StatusCode ,
25
25
} ;
26
26
27
27
use super :: {
@@ -151,7 +151,8 @@ pin_project! {
151
151
error: Option <DispatchError >,
152
152
153
153
#[ pin]
154
- state: State <S , B , X >,
154
+ pub ( super ) state: State <S , B , X >,
155
+ // when Some(_) dispatcher is in state of receiving request payload
155
156
payload: Option <PayloadSender >,
156
157
messages: VecDeque <DispatcherMessage >,
157
158
@@ -174,7 +175,7 @@ enum DispatcherMessage {
174
175
175
176
pin_project ! {
176
177
#[ project = StateProj ]
177
- enum State <S , B , X >
178
+ pub ( super ) enum State <S , B , X >
178
179
where
179
180
S : Service <Request >,
180
181
X : Service <Request , Response = Request >,
@@ -194,7 +195,7 @@ where
194
195
X : Service < Request , Response = Request > ,
195
196
B : MessageBody ,
196
197
{
197
- fn is_none ( & self ) -> bool {
198
+ pub ( super ) fn is_none ( & self ) -> bool {
198
199
matches ! ( self , State :: None )
199
200
}
200
201
}
@@ -686,12 +687,74 @@ where
686
687
let can_not_read = !self . can_read ( cx) ;
687
688
688
689
// limit amount of non-processed requests
689
- if pipeline_queue_full || can_not_read {
690
+ if pipeline_queue_full {
690
691
return Ok ( false ) ;
691
692
}
692
693
693
694
let mut this = self . as_mut ( ) . project ( ) ;
694
695
696
+ if can_not_read {
697
+ log:: debug!( "cannot read request payload" ) ;
698
+
699
+ if let Some ( sender) = & this. payload {
700
+ // ...maybe handler does not want to read any more payload...
701
+ if let PayloadStatus :: Dropped = sender. need_read ( cx) {
702
+ log:: debug!( "handler dropped payload early; attempt to clean connection" ) ;
703
+ // ...in which case poll request payload a few times
704
+ loop {
705
+ match this. codec . decode ( this. read_buf ) ? {
706
+ Some ( msg) => {
707
+ match msg {
708
+ // payload decoded did not yield EOF yet
709
+ Message :: Chunk ( Some ( _) ) => {
710
+ // if non-clean connection, next loop iter will detect empty
711
+ // read buffer and close connection
712
+ }
713
+
714
+ // connection is in clean state for next request
715
+ Message :: Chunk ( None ) => {
716
+ log:: debug!( "connection successfully cleaned" ) ;
717
+
718
+ // reset dispatcher state
719
+ let _ = this. payload . take ( ) ;
720
+ this. state . set ( State :: None ) ;
721
+
722
+ // break out of payload decode loop
723
+ break ;
724
+ }
725
+
726
+ // Either whole payload is read and loop is broken or more data
727
+ // was expected in which case connection is closed. In both
728
+ // situations dispatcher cannot get here.
729
+ Message :: Item ( _) => {
730
+ unreachable ! ( "dispatcher is in payload receive state" )
731
+ }
732
+ }
733
+ }
734
+
735
+ // not enough info to decide if connection is going to be clean or not
736
+ None => {
737
+ log:: error!(
738
+ "handler did not read whole payload and dispatcher could not \
739
+ drain read buf; return 500 and close connection"
740
+ ) ;
741
+
742
+ this. flags . insert ( Flags :: SHUTDOWN ) ;
743
+ let mut res = Response :: internal_server_error ( ) . drop_body ( ) ;
744
+ res. head_mut ( ) . set_connection_type ( ConnectionType :: Close ) ;
745
+ this. messages . push_back ( DispatcherMessage :: Error ( res) ) ;
746
+ * this. error = Some ( DispatchError :: HandlerDroppedPayload ) ;
747
+ return Ok ( true ) ;
748
+ }
749
+ }
750
+ }
751
+ }
752
+ } else {
753
+ // can_not_read and no request payload
754
+ return Ok ( false ) ;
755
+ }
756
+ }
757
+
695
758
let mut updated = false ;
696
759
697
760
loop {
0 commit comments