1515import logging
1616import socket
1717import threading
18+ import errno
1819from urllib .parse import ParseResult
1920
2021log = logging .getLogger (__name__ )
2526class TcpClient (SocketClient ):
2627 def __init__ (self , endpoint : ParseResult ):
2728 self ._endpoint = endpoint
28- self ._sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
29- self ._write_lock = threading .Lock ()
29+ # using reentrant lock so that we can retry through recursion
30+ self ._write_lock = threading .RLock ()
31+ self ._connect_lock = threading .RLock ()
3032 self ._should_connect = True
3133
3234 def connect (self ) -> "TcpClient" :
33- try :
34- self ._sock .connect ((self ._endpoint .hostname , self ._endpoint .port ))
35- self ._should_connect = False
36- except socket .timeout as e :
37- log .error ("Socket timeout durring connect %s" % (e ,))
38- self ._should_connect = True
39- except Exception as e :
40- log .error ("Failed to connect to the socket. %s" % (e ,))
41- self ._should_connect = True
42- return self
43-
44- def send_message (self , message : bytes ) -> None :
45- if self ._sock ._closed or self ._should_connect : # type: ignore
35+ with self ._connect_lock :
36+ try :
37+ self ._sock = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
38+ self ._sock .connect ((self ._endpoint .hostname , self ._endpoint .port ))
39+ self ._should_connect = False
40+ except socket .timeout as e :
41+ log .error ("Socket timeout durring connect %s" % (e ,))
42+ except OSError as e :
43+ if e .errno == errno .EISCONN :
44+ log .debug ("Socket is already connected." )
45+ self ._should_connect = False
46+ else :
47+ log .error ("Failed to connect to the socket. %s" % (e ,))
48+ self ._should_connect = True
49+ except Exception as e :
50+ log .error ("Failed to connect to the socket. %s" % (e ,))
51+ self ._should_connect = True
52+ return self
53+
54+ # TODO: once #21 lands, we should increase the max retries
55+ # the reason this is only 1 is to allow for a single
56+ # reconnect attempt in case the agent disconnects
57+ # additional retries and backoff would impose back
58+ # pressure on the caller that may not be accounted
59+ # for. Before we do that, we need to run the I/O
60+ # operations on a background thread.s
61+ def send_message (self , message : bytes , retry : int = 1 ) -> None :
62+ if retry < 0 :
63+ log .error ("Max retries exhausted, dropping message" )
64+ return
65+
66+ if self ._sock is None or self ._sock ._closed or self ._should_connect : # type: ignore
4667 self .connect ()
4768
4869 with self ._write_lock :
@@ -52,9 +73,12 @@ def send_message(self, message: bytes) -> None:
5273 except socket .timeout as e :
5374 log .error ("Socket timeout durring send %s" % (e ,))
5475 self .connect ()
76+ self .send_message (message , retry - 1 )
5577 except socket .error as e :
5678 log .error ("Failed to write metrics to the socket due to socket.error. %s" % (e ,))
5779 self .connect ()
80+ self .send_message (message , retry - 1 )
5881 except Exception as e :
5982 log .error ("Failed to write metrics to the socket due to exception. %s" % (e ,))
6083 self .connect ()
84+ self .send_message (message , retry - 1 )
0 commit comments