From 3326dfa869f562b9829700494c9a95b14e32c978 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Sun, 13 Sep 2020 21:53:06 +0200 Subject: [PATCH] Minor fixes for networking --- .../scorex/core/network/NetworkController.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/src/main/scala/scorex/core/network/NetworkController.scala b/src/main/scala/scorex/core/network/NetworkController.scala index f77fc32a3..59300edc9 100644 --- a/src/main/scala/scorex/core/network/NetworkController.scala +++ b/src/main/scala/scorex/core/network/NetworkController.scala @@ -67,10 +67,11 @@ class NetworkController(settings: NetworkSettings, log.info(s"Declared address: ${scorexContext.externalNodeAddress}") //bind to listen incoming connections - tcpManager ! Bind(self, bindAddress, options = Nil, pullMode = false) + tcpManager ! Bind(self, bindAddress, options = Nil, pullMode = true) - override def receive: Receive = - bindingLogic orElse + override def receive: Receive = bindingLogic + + def mainLogic(tcpListener: ActorRef): Receive = businessLogic orElse peerCommands orElse connectionEvents orElse @@ -81,6 +82,7 @@ class NetworkController(settings: NetworkSettings, case Bound(_) => log.info("Successfully bound to the port " + settings.bindAddress.getPort) scheduleConnectionToPeer() + context become mainLogic(sender()) case CommandFailed(_: Bind) => log.error("Network port " + settings.bindAddress.getPort + " already in use!") @@ -140,10 +142,12 @@ class NetworkController(settings: NetworkSettings, log.info(s"Unconfirmed connection: ($remoteAddress, $localAddress) => $connectionId") if (connectionDirection.isOutgoing) createPeerConnectionHandler(connectionId, sender()) else peerManagerRef ! ConfirmConnection(connectionId, sender()) + tcpManager ! Tcp.ResumeAccepting(1) case Connected(remoteAddress, _) => log.warn(s"Connection to peer $remoteAddress is already established") sender() ! Close + tcpManager ! Tcp.ResumeAccepting(1) case ConnectionConfirmed(connectionId, handlerRef) => log.info(s"Connection confirmed to $connectionId") @@ -204,7 +208,7 @@ class NetworkController(settings: NetworkSettings, */ private def scheduleConnectionToPeer(): Unit = { context.system.scheduler.schedule(5.seconds, 5.seconds) { - if (connections.size < settings.maxConnections) { + if (connections.size + unconfirmedConnections.size < settings.maxConnections) { val randomPeerF = peerManagerRef ? RandomPeerExcluding(connections.values.flatMap(_.peerInfo).toSeq) randomPeerF.mapTo[Option[PeerInfo]].foreach { peerInfoOpt => peerInfoOpt.foreach(peerInfo => self ! ConnectTo(peerInfo)) @@ -294,7 +298,9 @@ class NetworkController(settings: NetworkSettings, } else { peerManagerRef ! AddOrUpdatePeer(peerInfo) - val updatedConnectedPeer = connectedPeer.copy(peerInfo = Some(peerInfo)) + val updatedPeerSpec = peerInfo.peerSpec.copy(declaredAddress = Some(peerInfo.peerSpec.address.getOrElse(remoteAddress))) + val updatedPeerInfo = peerInfo.copy(peerSpec = updatedPeerSpec) + val updatedConnectedPeer = connectedPeer.copy(peerInfo = Some(updatedPeerInfo)) connections += remoteAddress -> updatedConnectedPeer context.system.eventStream.publish(HandshakedPeer(updatedConnectedPeer)) }