Skip to content

Commit 24d4f81

Browse files
authored
Merge pull request #3183 from akhilmhdh/feat/connector
feat: fixed cli issues in gateway
2 parents 3badcea + 08f23e2 commit 24d4f81

File tree

4 files changed

+158
-77
lines changed

4 files changed

+158
-77
lines changed

cli/packages/gateway/connection.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,14 @@ func handleStream(stream quic.Stream, quicConn quic.Connection) {
6161

6262
switch string(cmd) {
6363
case "FORWARD-TCP":
64-
log.Info().Msg("Starting secure connector proxy...")
6564
proxyAddress := string(bytes.Split(args, []byte(" "))[0])
6665
destTarget, err := net.Dial("tcp", proxyAddress)
6766
if err != nil {
6867
log.Error().Msgf("Failed to connect to target: %v", err)
6968
return
7069
}
7170
defer destTarget.Close()
71+
log.Info().Msgf("Starting secure transmission between %s->%s", quicConn.LocalAddr().String(), destTarget.LocalAddr().String())
7272

7373
// Handle buffered data
7474
buffered := reader.Buffered()
@@ -87,6 +87,7 @@ func handleStream(stream quic.Stream, quicConn quic.Connection) {
8787
}
8888

8989
CopyDataFromQuicToTcp(stream, destTarget)
90+
log.Info().Msgf("Ending secure transmission between %s->%s", quicConn.LocalAddr().String(), destTarget.LocalAddr().String())
9091
return
9192
case "PING":
9293
if _, err := stream.Write([]byte("PONG\n")); err != nil {

cli/packages/gateway/gateway.go

+68-76
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,13 @@ import (
66
"crypto/x509"
77
"fmt"
88
"net"
9+
"os"
910
"strings"
1011
"sync"
1112
"time"
1213

1314
"github.com/Infisical/infisical-merge/packages/api"
15+
"github.com/Infisical/infisical-merge/packages/systemd"
1416
"github.com/go-resty/resty/v2"
1517
"github.com/pion/logging"
1618
"github.com/pion/turn/v4"
@@ -75,14 +77,18 @@ func (g *Gateway) ConnectWithRelay() error {
7577

7678
// Start a new TURN Client and wrap our net.Conn in a STUNConn
7779
// This allows us to simulate datagram based communication over a net.Conn
80+
logger := logging.NewDefaultLoggerFactory()
81+
if os.Getenv("LOG_LEVEL") == "debug" {
82+
logger.DefaultLogLevel = logging.LogLevelDebug
83+
}
7884
cfg := &turn.ClientConfig{
7985
STUNServerAddr: relayDetails.TurnServerAddress,
8086
TURNServerAddr: relayDetails.TurnServerAddress,
8187
Conn: turn.NewSTUNConn(conn),
8288
Username: relayDetails.TurnServerUsername,
8389
Password: relayDetails.TurnServerPassword,
8490
Realm: relayDetails.TurnServerRealm,
85-
LoggerFactory: logging.NewDefaultLoggerFactory(),
91+
LoggerFactory: logger,
8692
}
8793

8894
client, err := turn.NewClient(cfg)
@@ -96,10 +102,6 @@ func (g *Gateway) ConnectWithRelay() error {
96102
TurnServerAddress: relayDetails.TurnServerAddress,
97103
InfisicalStaticIp: relayDetails.InfisicalStaticIp,
98104
}
99-
// if port not specific allow all port
100-
if relayDetails.InfisicalStaticIp != "" && !strings.Contains(relayDetails.InfisicalStaticIp, ":") {
101-
g.config.InfisicalStaticIp = g.config.InfisicalStaticIp + ":0"
102-
}
103105

104106
g.client = client
105107
return nil
@@ -144,7 +146,10 @@ func (g *Gateway) Listen(ctx context.Context) error {
144146
errCh := make(chan error, 1)
145147
shutdownCh := make(chan bool, 1)
146148

147-
g.registerPermissionRefresh(ctx, errCh)
149+
if err = g.createPermissionForStaticIps(g.config.InfisicalStaticIp); err != nil {
150+
return err
151+
}
152+
148153
g.registerHeartBeat(ctx, errCh)
149154

150155
cert, err := tls.X509KeyPair([]byte(gatewayCert.Certificate), []byte(gatewayCert.PrivateKey))
@@ -171,8 +176,7 @@ func (g *Gateway) Listen(ctx context.Context) error {
171176
KeepAlivePeriod: 2 * time.Second,
172177
}
173178

174-
g.registerRelayIsActive(ctx, relayUdpConnection.LocalAddr().String(), tlsConfig, quicConfig, errCh)
175-
179+
g.registerRelayIsActive(ctx, relayUdpConnection.LocalAddr().String(), errCh)
176180
quicListener, err := quic.Listen(relayUdpConnection, tlsConfig, quicConfig)
177181
if err != nil {
178182
return fmt.Errorf("Failed to listen for QUIC: %w", err)
@@ -234,6 +238,8 @@ func (g *Gateway) Listen(ctx context.Context) error {
234238
}
235239
}()
236240

241+
// make this compatiable with systemd notify mode
242+
systemd.SdNotify(false, systemd.SdNotifyReady)
237243
select {
238244
case <-ctx.Done():
239245
log.Info().Msg("Shutting down gateway...")
@@ -282,90 +288,76 @@ func (g *Gateway) registerHeartBeat(ctx context.Context, errCh chan error) {
282288
}()
283289
}
284290

285-
func (g *Gateway) registerRelayIsActive(ctx context.Context, serverAddr string, tlsConf *tls.Config, quicConf *quic.Config, errCh chan error) {
286-
ticker := time.NewTicker(5 * time.Second)
287-
maxFailures := 3
288-
failures := 0
291+
func (g *Gateway) createPermissionForStaticIps(staticIps string) error {
292+
if staticIps == "" {
293+
return fmt.Errorf("Missing Infisical static ips for permission")
294+
}
289295

290-
go func() {
291-
time.Sleep(2 * time.Second)
292-
for {
293-
select {
294-
case <-ctx.Done():
295-
return
296-
case <-ticker.C:
297-
conn, err := quic.DialAddr(ctx, serverAddr, tlsConf, quicConf)
298-
if conn != nil {
299-
failures = 0
300-
conn.CloseWithError(0, "connection closed")
301-
}
296+
splittedIps := strings.Split(staticIps, ",")
297+
resolvedIps := make([]net.Addr, 0)
298+
for _, ip := range splittedIps {
299+
ip = strings.TrimSpace(ip)
300+
if ip == "" {
301+
continue
302+
}
302303

303-
if err != nil && !strings.Contains(err.Error(), "tls: failed to verify certificate") {
304-
failures++
305-
log.Warn().Err(err).Int("failures", failures).Msg("Relay connection check failed")
304+
// if port not specific allow all port
305+
if !strings.Contains(ip, ":") {
306+
ip = ip + ":0"
307+
}
306308

307-
if failures >= maxFailures {
308-
errCh <- fmt.Errorf("relay connection check failed: %w", err)
309-
}
310-
}
311-
}
309+
peerAddr, err := net.ResolveUDPAddr("udp", ip)
310+
if err != nil {
311+
return fmt.Errorf("Failed to resolve static ip for permission: %w", err)
312312
}
313-
}()
314-
}
315313

316-
func (g *Gateway) registerPermissionRefresh(ctx context.Context, errCh chan error) {
317-
if g.config.InfisicalStaticIp == "" {
318-
return
314+
resolvedIps = append(resolvedIps, peerAddr)
319315
}
320316

321-
log.Info().Msg("Starting TURN permission refresh routine")
322-
323-
go func() {
324-
ticker := time.NewTicker(30 * time.Second)
325-
defer ticker.Stop()
317+
if err := g.client.CreatePermission(resolvedIps...); err != nil {
318+
return fmt.Errorf("Failed to set ip permission: %w", err)
319+
}
320+
return nil
321+
}
326322

327-
g.refreshPermission(errCh)
323+
func (g *Gateway) registerRelayIsActive(ctx context.Context, relayAddress string, errCh chan error) error {
324+
ticker := time.NewTicker(10 * time.Second)
325+
maxFailures := 3
326+
failures := 0
328327

328+
go func() {
329+
time.Sleep(2 * time.Second)
329330
for {
330331
select {
331332
case <-ctx.Done():
332-
log.Info().Msg("Context cancelled, stopping TURN permission refresh")
333333
return
334334
case <-ticker.C:
335-
g.refreshPermission(errCh)
335+
// Configure TLS to skip verification
336+
tlsConfig := &tls.Config{
337+
InsecureSkipVerify: true,
338+
NextProtos: []string{"infisical-gateway"},
339+
}
340+
quicConfig := &quic.Config{
341+
EnableDatagrams: true,
342+
}
343+
func() {
344+
checkCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
345+
defer cancel()
346+
conn, err := quic.DialAddr(checkCtx, relayAddress, tlsConfig, quicConfig)
347+
if err != nil {
348+
failures++
349+
log.Warn().Err(err).Int("failures", failures).Msg("Relay connection check failed")
350+
if failures >= maxFailures {
351+
errCh <- fmt.Errorf("relay connection check failed: %w", err)
352+
}
353+
}
354+
if conn != nil {
355+
conn.CloseWithError(0, "closed")
356+
}
357+
}()
336358
}
337359
}
338360
}()
339-
}
340361

341-
func (g *Gateway) refreshPermission(errCh chan error) {
342-
log.Info().Msg("Attempting to refresh TURN permission")
343-
maxRetries := 3
344-
retryDelay := 5 * time.Second
345-
346-
var lastErr error
347-
for i := 0; i < maxRetries; i++ {
348-
peerAddr, err := net.ResolveUDPAddr("udp", g.config.InfisicalStaticIp)
349-
if err != nil {
350-
log.Error().Err(err).Msg("Failed to resolve static IP for permission refresh")
351-
continue
352-
}
353-
354-
if err := g.client.CreatePermission(peerAddr); err != nil {
355-
lastErr = err
356-
log.Warn().Err(err).Int("attempt", i+1).Msg("Failed to refresh TURN permission, retrying...")
357-
time.Sleep(retryDelay)
358-
continue
359-
}
360-
361-
log.Info().Msg("Successfully refreshed TURN permission")
362-
return
363-
}
364-
365-
if lastErr != nil {
366-
log.Error().Err(lastErr).Msg("Failed to refresh TURN permission after retries")
367-
if reconnectErr := g.ConnectWithRelay(); reconnectErr != nil {
368-
errCh <- fmt.Errorf("failed to refresh permissions and reconnect: %w", reconnectErr)
369-
}
370-
}
362+
return nil
371363
}

cli/packages/gateway/relay.go

+4
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"syscall"
1414

1515
udplistener "github.com/Infisical/infisical-merge/packages/gateway/udp_listener"
16+
"github.com/Infisical/infisical-merge/packages/systemd"
1617
"github.com/pion/logging"
1718
"github.com/pion/turn/v4"
1819
"github.com/rs/zerolog/log"
@@ -164,6 +165,9 @@ func (g *GatewayRelay) Run() error {
164165
}
165166

166167
log.Info().Msgf("Relay listening on %s\n", connAddress)
168+
169+
// make this compatiable with systemd notify mode
170+
systemd.SdNotify(false, systemd.SdNotifyReady)
167171
// Block until user sends SIGINT or SIGTERM
168172
sigs := make(chan os.Signal, 1)
169173
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

cli/packages/systemd/daemon.go

+84
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
// Copyright 2014 Docker, Inc.
2+
// Copyright 2015-2018 CoreOS, Inc.
3+
//
4+
// Licensed under the Apache License, Version 2.0 (the "License");
5+
// you may not use this file except in compliance with the License.
6+
// You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
//
16+
17+
// Package daemon provides a Go implementation of the sd_notify protocol.
18+
// It can be used to inform systemd of service start-up completion, watchdog
19+
// events, and other status changes.
20+
//
21+
// https://www.freedesktop.org/software/systemd/man/sd_notify.html#Description
22+
package systemd
23+
24+
import (
25+
"net"
26+
"os"
27+
)
28+
29+
const (
30+
// SdNotifyReady tells the service manager that service startup is finished
31+
// or the service finished loading its configuration.
32+
SdNotifyReady = "READY=1"
33+
34+
// SdNotifyStopping tells the service manager that the service is beginning
35+
// its shutdown.
36+
SdNotifyStopping = "STOPPING=1"
37+
38+
// SdNotifyReloading tells the service manager that this service is
39+
// reloading its configuration. Note that you must call SdNotifyReady when
40+
// it completed reloading.
41+
SdNotifyReloading = "RELOADING=1"
42+
43+
// SdNotifyWatchdog tells the service manager to update the watchdog
44+
// timestamp for the service.
45+
SdNotifyWatchdog = "WATCHDOG=1"
46+
)
47+
48+
// SdNotify sends a message to the init daemon. It is common to ignore the error.
49+
// If `unsetEnvironment` is true, the environment variable `NOTIFY_SOCKET`
50+
// will be unconditionally unset.
51+
//
52+
// It returns one of the following:
53+
// (false, nil) - notification not supported (i.e. NOTIFY_SOCKET is unset)
54+
// (false, err) - notification supported, but failure happened (e.g. error connecting to NOTIFY_SOCKET or while sending data)
55+
// (true, nil) - notification supported, data has been sent
56+
func SdNotify(unsetEnvironment bool, state string) (bool, error) {
57+
socketAddr := &net.UnixAddr{
58+
Name: os.Getenv("NOTIFY_SOCKET"),
59+
Net: "unixgram",
60+
}
61+
62+
// NOTIFY_SOCKET not set
63+
if socketAddr.Name == "" {
64+
return false, nil
65+
}
66+
67+
if unsetEnvironment {
68+
if err := os.Unsetenv("NOTIFY_SOCKET"); err != nil {
69+
return false, err
70+
}
71+
}
72+
73+
conn, err := net.DialUnix(socketAddr.Net, nil, socketAddr)
74+
// Error connecting to NOTIFY_SOCKET
75+
if err != nil {
76+
return false, err
77+
}
78+
defer conn.Close()
79+
80+
if _, err = conn.Write([]byte(state)); err != nil {
81+
return false, err
82+
}
83+
return true, nil
84+
}

0 commit comments

Comments
 (0)