99 "io"
1010 "net"
1111 "net/http"
12+ "os"
1213 "sort"
13- "strconv"
1414 "sync"
1515 "sync/atomic"
1616 "time"
@@ -33,11 +33,10 @@ const lineFeed = '\n'
3333type Server struct {
3434 // --- Options section
3535 // Listen Address
36- ListenAddr string
37- WebListenAddr string
38- ConfigPath string
39- // name -> upstream
40- Upstreams map [string ]* Upstream
36+ ListenAddr string
37+ HTTPListenAddr string
38+ ConfigPath string
39+
4140 ReadTimeout time.Duration
4241 WriteTimeout time.Duration
4342 // motd
@@ -53,14 +52,14 @@ type Server struct {
5352 activeConnCount atomic.Int64
5453 connIndex atomic.Uint64
5554
56- TCPListener net.Listener
57- HTTPListener net.Listener
55+ TCPListener * net.TCPListener
56+ HTTPListener * net.TCPListener
5857}
5958
6059func New () * Server {
6160 return & Server {
6261 bufPool : sync.Pool {
63- New : func () interface {} {
62+ New : func () any {
6463 buf := make ([]byte , TCPBufferSize )
6564 return & buf
6665 },
@@ -69,33 +68,33 @@ func New() *Server {
6968 }
7069}
7170
72- func (s * Server ) complete ( ) error {
73- if len (s .Upstreams ) == 0 {
71+ func (s * Server ) loadConfig ( c * Config ) error {
72+ if len (c .Upstreams ) == 0 {
7473 return fmt .Errorf ("no upstream found" )
7574 }
7675
7776 modules := map [string ]string {}
78- for upstreamName , v := range s .Upstreams {
79- addr := net . JoinHostPort ( v . Host , strconv . Itoa ( v . Port ))
77+ for upstreamName , v := range c .Upstreams {
78+ addr := v . Address
8079 _ , err := net .ResolveTCPAddr ("tcp" , addr )
8180 if err != nil {
8281 return fmt .Errorf ("resolve address: %w, upstream=%s, address=%s" , err , upstreamName , addr )
8382 }
8483 for _ , moduleName := range v .Modules {
8584 if _ , ok := modules [moduleName ]; ok {
86- return fmt .Errorf ("duplicated module name: %s, upstream=%s" , moduleName , upstreamName )
85+ return fmt .Errorf ("duplicate module name: %s, upstream=%s" , moduleName , upstreamName )
8786 }
8887 modules [moduleName ] = addr
8988 }
9089 }
9190
9291 s .reloadLock .Lock ()
92+ // s.ListenAddr = c.Proxy.Listen
93+ // s.HTTPListenAddr = c.Proxy.ListenHTTP
94+ s .Motd = c .Proxy .Motd
9395 s .modules = modules
9496 s .reloadLock .Unlock ()
9597
96- // .Upstreams is no longer used, reclaims the memory
97- s .Upstreams = nil
98-
9998 return nil
10099}
101100
@@ -255,12 +254,12 @@ func (s *Server) relay(ctx context.Context, downConn *net.TCPConn) error {
255254 return nil
256255}
257256
258- func (s * Server ) handleConn (ctx context.Context , conn net.Conn ) {
257+ func (s * Server ) handleConn (ctx context.Context , conn * net.TCPConn ) {
259258 s .activeConnCount .Add (1 )
260259 defer s .activeConnCount .Add (- 1 )
261260 _ = s .connIndex .Add (1 )
262- downConn := conn .( * net. TCPConn )
263- err := s .relay (ctx , downConn )
261+
262+ err := s .relay (ctx , conn )
264263 if err != nil {
265264 log .V (2 ).Errorf ("[WARN] handleConn: %s" , err )
266265 }
@@ -278,7 +277,7 @@ func (s *Server) runHTTPServer() error {
278277 Message string `json:"message"`
279278 }
280279
281- err := s .LoadConfigFromFile ()
280+ err := s .ReadConfigFromFile ()
282281 if err != nil {
283282 log .Errorf ("[ERROR] Load config: %s" , err )
284283 w .WriteHeader (http .StatusInternalServerError )
@@ -290,17 +289,20 @@ func (s *Server) runHTTPServer() error {
290289 _ = json .NewEncoder (w ).Encode (& resp )
291290 })
292291
293- mux .HandleFunc ("/status " , func (w http.ResponseWriter , r * http.Request ) {
292+ mux .HandleFunc ("/telegraf " , func (w http.ResponseWriter , r * http.Request ) {
294293 if r .Method != http .MethodGet {
295294 w .WriteHeader (http .StatusMethodNotAllowed )
296295 return
297296 }
298297
299- var resp struct {
300- Count int64 `json:"count"`
298+ timestamp := time .Now ().Truncate (time .Second ).UnixNano ()
299+ count := s .GetActiveConnectionCount ()
300+ hostname , err := os .Hostname ()
301+ if err != nil {
302+ hostname = "(unknown)"
301303 }
302- resp . Count = s . GetActiveConnectionCount ()
303- _ = json . NewEncoder ( w ). Encode ( & resp )
304+ // https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
305+ _ , _ = fmt . Fprintf ( w , "rsync-proxy,host=%q count=%d %d \n " , hostname , count , timestamp )
304306 })
305307
306308 return http .Serve (s .HTTPListener , & mux )
@@ -314,15 +316,15 @@ func (s *Server) Listen() error {
314316 s .ListenAddr = l1 .Addr ().String ()
315317 log .V (3 ).Infof ("[INFO] Rsync proxy listening on %s" , s .ListenAddr )
316318
317- l2 , err := net .Listen ("tcp" , s .WebListenAddr )
319+ l2 , err := net .Listen ("tcp" , s .HTTPListenAddr )
318320 if err != nil {
319321 return fmt .Errorf ("create http listener: %w" , err )
320322 }
321- s .WebListenAddr = l2 .Addr ().String ()
322- log .V (3 ).Infof ("[INFO] HTTP server listening on %s" , s .WebListenAddr )
323+ s .HTTPListenAddr = l2 .Addr ().String ()
324+ log .V (3 ).Infof ("[INFO] HTTP server listening on %s" , s .HTTPListenAddr )
323325
324- s .TCPListener = l1
325- s .HTTPListener = l2
326+ s .TCPListener = l1 .( * net. TCPListener )
327+ s .HTTPListener = l2 .( * net. TCPListener )
326328 return nil
327329}
328330
@@ -336,14 +338,14 @@ func (s *Server) Close() {
336338}
337339
338340func (s * Server ) Run () error {
339- errCh := make (chan error )
341+ errC := make (chan error )
340342 go func () {
341343 err := s .runHTTPServer ()
342344 if err != nil {
343345 if errors .Is (err , net .ErrClosed ) {
344346 return
345347 }
346- errCh <- fmt .Errorf ("run http server: %w" , err )
348+ errC <- fmt .Errorf ("run http server: %w" , err )
347349 }
348350 }()
349351
@@ -352,12 +354,12 @@ func (s *Server) Run() error {
352354
353355 for {
354356 select {
355- case err := <- errCh :
357+ case err := <- errC :
356358 return err
357359 default :
358360 }
359361
360- conn , err := s .TCPListener .Accept ()
362+ conn , err := s .TCPListener .AcceptTCP ()
361363 if err != nil {
362364 if errors .Is (err , net .ErrClosed ) {
363365 return nil
0 commit comments