from decimal import ROUND_DOWN, Decimal
from functools import lru_cache
from subprocess import CalledProcessError
-from typing import Callable, Dict, Optional
+from typing import Callable, Dict, Optional, Union

from . import coverage
from .authproxy import AuthServiceProxy, JSONRPCException
@@ -317,8 +317,7 @@ class PortName(enum.Enum):
PORT_MIN = int(os.getenv("TEST_RUNNER_PORT_MIN", default=20000))
# The number of ports to "reserve" for p2p and rpc, each
PORT_RANGE = 3000
-# The number of times we increment the port counters and test it again before
-# giving up.
+# The number of times we skip ports and test it again before giving up.
MAX_PORT_RETRY = 5
PORT_START_MAP: Dict[PortName, int] = {
    PortName.P2P: 0,
@@ -334,7 +333,7 @@ class PortName(enum.Enum):


class PortSeed:
    # Must be initialized with a unique integer for each process
-    n = None
+    n: Union[int, None] = None


def get_rpc_proxy(url, node_number, *, timeout=None, coveragedir=None):
@@ -365,19 +364,6 @@ def get_rpc_proxy(url, node_number, *, timeout=None, coveragedir=None):
    return coverage.AuthServiceProxyWrapper(proxy, coverage_logfile)


-# We initialize the port counters at runtime, because at import time PortSeed.n
-# will not yet be defined. It is defined based on a command line option
-# in the BitcoinTestFramework class __init__
-def initialize_port(port_name: PortName):
-    global LAST_USED_PORT_MAP
-    assert PortSeed.n is not None
-    LAST_USED_PORT_MAP[port_name] = (
-        PORT_MIN
-        + PORT_START_MAP[port_name]
-        + (MAX_NODES * PortSeed.n) % (PORT_RANGE - 1 - MAX_NODES)
-    )
-
-
def is_port_available(port: int) -> bool:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
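The base-port formula removed above reappears unchanged inside UniquePort.get in the next hunk. To make the arithmetic concrete, here is a minimal standalone sketch; MAX_NODES and the per-process seed are assumed values for illustration only and are not shown in this diff.

# Sketch of the base-port computation, with assumed values for illustration.
PORT_MIN = 20000    # default of TEST_RUNNER_PORT_MIN above
PORT_RANGE = 3000   # ports reserved per port type
MAX_NODES = 12      # assumed value, not shown in this diff
port_start = 0      # PORT_START_MAP offset for PortName.P2P
seed_n = 7          # assumed per-process PortSeed.n

# Each process lands on its own base inside the type's reserved range;
# node index i then maps to base + i in the new code.
base = PORT_MIN + port_start + (MAX_NODES * seed_n) % (PORT_RANGE - 1 - MAX_NODES)
print(base)  # 20084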
@@ -388,43 +374,59 @@ def is_port_available(port: int) -> bool:
            return False


-# The LRU cache ensures that for a given type and peer / node index, the
-# functions always return the same port, and that it is tested only the
-# first time. The parameter `n` is not unused, it is the key in the cache
-# dictionary.
-@lru_cache(maxsize=None)
-def unique_port(port_name: PortName, n: int) -> int:
-    global LAST_USED_PORT_MAP
-    if port_name not in LAST_USED_PORT_MAP:
-        initialize_port(port_name)
-
-    tried_ports = []
-    for _ in range(MAX_PORT_RETRY):
-        LAST_USED_PORT_MAP[port_name] += 1
-        port = LAST_USED_PORT_MAP[port_name]
-        tried_ports.append(port)
-        if is_port_available(port):
-            return port
-
-    raise RuntimeError(
-        f"Could not find available {port_name} port after {MAX_PORT_RETRY} attempts (tried ports {tried_ports})."
-    )
+class UniquePort:
+    port_base: dict[PortName, int] = {}
+
+    # The LRU cache ensures that for a given type and peer / node index, the
+    # functions always return the same port, and that it is tested only the
+    # first time. The parameter `n` is not unused, it is the key in the cache
+    # dictionary.
+    @staticmethod
+    @lru_cache(maxsize=None)
+    def get(port_name: PortName, n: int) -> int:
+        def initialize_port(port_name: PortName):
+            assert PortSeed.n is not None
+            UniquePort.port_base[port_name] = (
+                PORT_MIN
+                + PORT_START_MAP[port_name]
+                + (MAX_NODES * PortSeed.n) % (PORT_RANGE - 1 - MAX_NODES)
+            )
+
+        if port_name not in UniquePort.port_base:
+            initialize_port(port_name)
+
+        tried_ports = []
+        for _ in range(MAX_PORT_RETRY):
+            port = UniquePort.port_base[port_name] + n
+            tried_ports.append(port)
+            if is_port_available(port):
+                return port
+            # If we are running a lot of tests in parallel it's possible we get a
+            # collision. In this case the next ports are likely to collide as well,
+            # so we "jump" ports by the CPU count (aka the max number of concurrent
+            # test we are likely to run).
+            PortSeed.n = (PortSeed.n or 0) + (os.cpu_count() or 1)
+            initialize_port(port_name)
+
+        raise RuntimeError(
+            f"Could not find available {port_name} port after {MAX_PORT_RETRY} attempts (tried ports {tried_ports})."
+        )


def p2p_port(n: int) -> int:
-    return unique_port(PortName.P2P, n)
+    return UniquePort.get(PortName.P2P, n)


def rpc_port(n: int) -> int:
-    return unique_port(PortName.RPC, n)
+    return UniquePort.get(PortName.RPC, n)


def chronik_port(n: int) -> int:
-    return unique_port(PortName.CHRONIK, n)
+    return UniquePort.get(PortName.CHRONIK, n)


def chronikelectrum_port(n: int) -> int:
-    return unique_port(PortName.CHRONIKELECTRUM, n)
+    return UniquePort.get(PortName.CHRONIKELECTRUM, n)


def rpc_url(datadir, chain, host, port):
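In the new code, the lru_cache on the static get method is what keeps each (port_name, n) pair stable for the lifetime of the process: only the first call probes port availability, and every later call returns the cached result, so the collision handling runs at most once per pair. A minimal self-contained sketch of this memoize-and-jump pattern, using an in-memory set as a stand-in for the real socket probe and hypothetical names throughout:

import os
from functools import lru_cache


class _PortAllocator:
    """Illustrative stand-in for the UniquePort pattern above (hypothetical names)."""

    base = 20000
    _taken = {20001}  # pretend another concurrent process already holds this port

    @staticmethod
    @lru_cache(maxsize=None)
    def get(n: int) -> int:
        for _ in range(5):
            port = _PortAllocator.base + n
            if port not in _PortAllocator._taken:
                return port
            # On a collision, jump the base by the CPU count so the next probe
            # lands outside the block a concurrent process is likely using.
            _PortAllocator.base += os.cpu_count() or 1
        raise RuntimeError(f"no free port found for index {n}")


print(_PortAllocator.get(1))  # probes 20001, collides, jumps, returns a free port
print(_PortAllocator.get(1))  # same number again, served from the lru_cache

Jumping the base by the CPU count mirrors the reasoning in the added comment: when tests run in parallel, a colliding port's neighbours are probably owned by the same other process, so moving a whole block away is more likely to succeed than stepping one port at a time.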