@@ -299,9 +299,8 @@ impl Network {
299299 GossipsubEvent :: Message ( peer_id, msg_id, msg) => {
300300 log:: trace!( "Received message {:?} from peer {:?}: {:?}" , msg_id, peer_id, msg) ;
301301 for topic in msg. topics . iter ( ) {
302- if let Some ( output) = state. gossip_topics . get ( & topic) {
303- // let peer = Self::get_peer(peer_id).unwrap();
304- output. send ( ( msg, peer) ) ;
302+ if let Some ( output) = state. gossip_topics . get_mut ( & topic) {
303+ output. send ( ( msg. clone ( ) , peer_id. clone ( ) ) ) . await . ok ( ) ;
305304 } else {
306305 log:: warn!( "Unknown topic hash: {:?}" , topic) ;
307306 }
@@ -401,7 +400,7 @@ impl NetworkInterface for Network {
401400 self . events_tx . subscribe ( )
402401 }
403402
404- async fn subscribe < T > ( & self , topic : & T ) -> Box < dyn Stream < Item = ( T :: Item , Arc < Self :: PeerType > ) > + Send >
403+ async fn subscribe < T > ( & self , topic : & T ) -> Box < dyn Stream < Item = ( T :: Item , PeerId ) > + Send >
405404 where
406405 T : Topic + Sync ,
407406 {
@@ -417,9 +416,9 @@ impl NetworkInterface for Network {
417416 . await
418417 . expect ( "Couldn't subscribe to pubsub topic" ) ;
419418
420- Box :: new ( rx. map ( |( msg, peer ) | {
421- let item: <T as Topic >:: Item = Deserialize :: deserialize_from_vec ( & msg. data ) ;
422- ( item, peer )
419+ Box :: new ( rx. map ( |( msg, peer_id ) | {
420+ let item: <T as Topic >:: Item = Deserialize :: deserialize_from_vec ( & msg. data ) . unwrap ( ) ;
421+ ( item, peer_id )
423422 } ) )
424423 }
425424
0 commit comments