Skip to content

Commit 245d0de

Browse files
committed
Close listener when service is deleted or permission is lost to the service. Fixes #813
1 parent 97ec061 commit 245d0de

File tree

3 files changed

+91
-42
lines changed

3 files changed

+91
-42
lines changed

ziti/edge/network/hosting_conn.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,6 @@ func (conn *edgeHostConn) listen(session *rest_model.SessionDetail, service *res
367367
baseListener: baseListener{
368368
service: service,
369369
acceptC: make(chan edge.Conn, 10),
370-
errorC: make(chan error, 1),
371370
},
372371
token: *session.Token,
373372
edgeChan: conn,

ziti/edge/network/listener.go

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,26 @@ package network
1818

1919
import (
2020
"fmt"
21-
"github.com/michaelquigley/pfxlog"
22-
"github.com/openziti/edge-api/rest_model"
23-
"github.com/openziti/sdk-golang/xgress"
24-
"github.com/openziti/sdk-golang/ziti/edge"
25-
"github.com/pkg/errors"
2621
"math"
2722
"net"
2823
"reflect"
2924
"strings"
3025
"sync"
3126
"sync/atomic"
3227
"time"
28+
29+
"github.com/michaelquigley/pfxlog"
30+
"github.com/openziti/edge-api/rest_model"
31+
"github.com/openziti/foundation/v2/concurrenz"
32+
"github.com/openziti/sdk-golang/xgress"
33+
"github.com/openziti/sdk-golang/ziti/edge"
34+
"github.com/pkg/errors"
3335
)
3436

3537
type baseListener struct {
3638
service *rest_model.ServiceDetail
3739
acceptC chan edge.Conn
38-
errorC chan error
40+
err concurrenz.AtomicValue[error]
3941
closed atomic.Bool
4042
}
4143

@@ -79,10 +81,8 @@ func (listener *baseListener) AcceptEdge() (edge.Conn, error) {
7981
}
8082
}
8183

82-
select {
83-
case err := <-listener.errorC:
84+
if err := listener.err.Load(); err != nil {
8485
return nil, fmt.Errorf("listener is closed (%w)", err)
85-
default:
8686
}
8787

8888
return nil, errors.New("listener is closed")
@@ -188,7 +188,6 @@ func NewMultiListener(service *rest_model.ServiceDetail, getSessionF func() *res
188188
baseListener: baseListener{
189189
service: service,
190190
acceptC: make(chan edge.Conn),
191-
errorC: make(chan error),
192191
},
193192
listeners: map[*edgeListener]struct{}{},
194193
getSessionF: getSessionF,
@@ -405,37 +404,37 @@ func (self *multiListener) accept(conn edge.Conn, ticker *time.Ticker) {
405404
}
406405

407406
func (self *multiListener) Close() error {
408-
self.closed.Store(true)
409-
410-
self.listenerLock.Lock()
411-
defer self.listenerLock.Unlock()
407+
if self.closed.CompareAndSwap(false, true) {
408+
self.listenerLock.Lock()
409+
defer self.listenerLock.Unlock()
412410

413-
var resultErrors []error
414-
for child := range self.listeners {
415-
if err := child.Close(); err != nil {
416-
resultErrors = append(resultErrors, err)
411+
var resultErrors []error
412+
for child := range self.listeners {
413+
if err := child.Close(); err != nil {
414+
resultErrors = append(resultErrors, err)
415+
}
417416
}
418-
}
419417

420-
self.listeners = nil
418+
self.listeners = nil
421419

422-
select {
423-
case self.acceptC <- nil:
424-
default:
425-
// If the queue is full, bail out, we're just popping a nil on the
426-
// accept queue to let it return from accept more quickly
420+
select {
421+
case self.acceptC <- nil:
422+
default:
423+
// If the queue is full, bail out, we're just popping a nil on the
424+
// accept queue to let it return from accept more quickly
425+
}
426+
427+
return self.condenseErrors(resultErrors)
427428
}
428429

429-
return self.condenseErrors(resultErrors)
430+
return nil
430431
}
431432

432433
func (self *multiListener) CloseWithError(err error) {
433-
select {
434-
case self.errorC <- err:
435-
default:
434+
self.err.Store(err)
435+
if closeErr := self.Close(); closeErr != nil {
436+
pfxlog.Logger().WithError(err).Error("error closing edge listener")
436437
}
437-
438-
self.closed.Store(true)
439438
}
440439

441440
type MultipleErrors []error

ziti/ziti.go

Lines changed: 61 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"net"
2626
"net/url"
2727
"reflect"
28+
"slices"
2829
"strconv"
2930
"strings"
3031
"sync"
@@ -1757,7 +1758,12 @@ func (context *ContextImpl) createSessionWithBackoff(service *rest_model.Service
17571758
var session *rest_model.SessionDetail
17581759
operation := func() error {
17591760
latestSvc, _ := context.services.Get(*service.Name)
1760-
if latestSvc != nil && *latestSvc.ID != *service.ID {
1761+
1762+
if latestSvc == nil {
1763+
return backoff.Permanent(fmt.Errorf("service %v not found", *service.Name))
1764+
}
1765+
1766+
if *latestSvc.ID != *service.ID {
17611767
pfxlog.Logger().
17621768
WithField("serviceName", *service.Name).
17631769
WithField("oldServiceId", *service.ID).
@@ -1782,6 +1788,14 @@ func (context *ContextImpl) createSessionWithBackoff(service *rest_model.Service
17821788
return session, backoff.Retry(operation, expBackoff)
17831789
}
17841790

1791+
func (context *ContextImpl) isNotFoundApiError(err error) bool {
1792+
apiFormattedErr := &rest_util.APIFormattedError{}
1793+
if errors.As(err, &apiFormattedErr) {
1794+
return apiFormattedErr.APIError != nil && apiFormattedErr.Code == errorz.NotFoundCode
1795+
}
1796+
return false
1797+
}
1798+
17851799
func (context *ContextImpl) isUnauthorizedApiError(err error) bool {
17861800
apiFormattedErr := &rest_util.APIFormattedError{}
17871801
if errors.As(err, &apiFormattedErr) {
@@ -1815,7 +1829,7 @@ func (context *ContextImpl) createSession(service *rest_model.ServiceDetail, ses
18151829
}
18161830

18171831
var createSessionNotFound = &rest_session.CreateSessionNotFound{}
1818-
if errors.As(err, &createSessionNotFound) {
1832+
if context.isNotFoundApiError(err) || errors.As(err, &createSessionNotFound) {
18191833
if refreshErr := context.refreshServices(false, false); refreshErr != nil {
18201834
logger.WithError(refreshErr).Info("failed to refresh services after create session returned 404 (likely for service)")
18211835
}
@@ -2022,9 +2036,16 @@ func (mgr *listenerManager) notify(eventType ListenEventType) {
20222036

20232037
func (mgr *listenerManager) run() {
20242038
log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name))
2039+
2040+
start := time.Now()
20252041
// need to either establish a session, or fail if we can't create one
20262042
for mgr.session == nil {
2027-
mgr.createSessionWithBackoff()
2043+
if err := mgr.createSessionWithBackoff(); err != nil {
2044+
if time.Since(start) > mgr.options.ConnectTimeout {
2045+
log.WithError(err).Error("timed out trying to create session to bind service")
2046+
return
2047+
}
2048+
}
20282049
}
20292050

20302051
mgr.makeMoreListeners()
@@ -2234,6 +2255,17 @@ func (mgr *listenerManager) makeMoreListeners() {
22342255
}
22352256
}
22362257

2258+
func (mgr *listenerManager) createSessionWithPermissionCheck() {
2259+
log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name))
2260+
2261+
if err := mgr.createSessionWithBackoff(); err != nil {
2262+
if IsServiceAccessDeniedError(err) {
2263+
log.WithError(err).Error("service access lost, closing service listener")
2264+
mgr.listener.CloseWithError(err)
2265+
}
2266+
}
2267+
}
2268+
22372269
func (mgr *listenerManager) refreshSession() {
22382270
if time.Since(mgr.lastSessionRefresh) < 30*time.Second {
22392271
return
@@ -2242,7 +2274,7 @@ func (mgr *listenerManager) refreshSession() {
22422274
log := pfxlog.Logger().WithField("service", stringz.OrEmpty(mgr.service.Name))
22432275
if mgr.session == nil {
22442276
log.Debug("establishing initial session")
2245-
mgr.createSessionWithBackoff()
2277+
mgr.createSessionWithPermissionCheck()
22462278
return
22472279
}
22482280

@@ -2254,7 +2286,7 @@ func (mgr *listenerManager) refreshSession() {
22542286
var detailSessionNotFound = &rest_session.DetailSessionNotFound{}
22552287
if errors.As(err, &detailSessionNotFound) {
22562288
// try to create new session
2257-
mgr.createSessionWithBackoff()
2289+
mgr.createSessionWithPermissionCheck()
22582290
return
22592291
}
22602292

@@ -2286,7 +2318,7 @@ func (mgr *listenerManager) refreshSession() {
22862318
log.WithError(err).Errorf("failed to to refresh session %v", *mgr.session.ID)
22872319

22882320
// try to create new session
2289-
mgr.createSessionWithBackoff()
2321+
mgr.createSessionWithPermissionCheck()
22902322
}
22912323
}
22922324

@@ -2297,10 +2329,15 @@ func (mgr *listenerManager) refreshSession() {
22972329
}
22982330
}
22992331

2300-
func (mgr *listenerManager) createSessionWithBackoff() {
2332+
func (mgr *listenerManager) createSessionWithBackoff() error {
23012333
log := pfxlog.Logger().WithField("serviceName", *mgr.service.Name)
23022334
latestSvc, _ := mgr.context.services.Get(*mgr.service.Name)
2303-
if latestSvc != nil && *latestSvc.ID != *mgr.service.ID {
2335+
2336+
if latestSvc == nil || !slices.Contains(latestSvc.Permissions, rest_model.DialBindBind) {
2337+
return ServiceAccessDeniedError{serviceName: *mgr.service.Name}
2338+
}
2339+
2340+
if *latestSvc.ID != *mgr.service.ID {
23042341
log.WithField("oldServiceId", *mgr.service.ID).
23052342
WithField("newServiceId", *latestSvc.ID).
23062343
Info("service id changed, service was recreated")
@@ -2311,9 +2348,11 @@ func (mgr *listenerManager) createSessionWithBackoff() {
23112348
if session != nil {
23122349
mgr.sessionRefreshed(session)
23132350
log.Debug("new service session created")
2314-
} else {
2315-
log.WithError(err).Errorf("failed to create bind session for service %v", mgr.service.Name)
2351+
return nil
23162352
}
2353+
2354+
log.WithError(err).Errorf("failed to create bind session for service %v", mgr.service.Name)
2355+
return err
23172356
}
23182357

23192358
func (mgr *listenerManager) GetCurrentSession() *rest_model.SessionDetail {
@@ -2401,3 +2440,15 @@ const (
24012440
type ListenEventObserver interface {
24022441
Notify(eventType ListenEventType)
24032442
}
2443+
2444+
func IsServiceAccessDeniedError(err error) bool {
2445+
return errors.As(err, &ServiceAccessDeniedError{})
2446+
}
2447+
2448+
type ServiceAccessDeniedError struct {
2449+
serviceName string
2450+
}
2451+
2452+
func (s ServiceAccessDeniedError) Error() string {
2453+
return fmt.Sprintf("identity does not have permission to bind service '%v', or service does not exist", s.serviceName)
2454+
}

0 commit comments

Comments
 (0)