From 63f3f349592a13403976b993fe44eea75504b839 Mon Sep 17 00:00:00 2001 From: Luiz Felipe Limao Date: Thu, 18 Dec 2025 16:24:57 -0300 Subject: [PATCH] Add IsReady method to App interface and implement connection checks --- pkg/app.go | 40 +++++++++++++++++++++++++- pkg/cluster/etcd_service_discovery.go | 12 ++++++++ pkg/cluster/mocks/service_discovery.go | 15 ++++++++++ pkg/cluster/nats_rpc_client.go | 5 +++- pkg/cluster/nats_rpc_server.go | 6 +++- pkg/cluster/service_discovery.go | 7 ++++- pkg/mocks/app.go | 14 +++++++++ pkg/static.go | 4 +++ 8 files changed, 99 insertions(+), 4 deletions(-) diff --git a/pkg/app.go b/pkg/app.go index 640570bc..6ab23f84 100644 --- a/pkg/app.go +++ b/pkg/app.go @@ -27,7 +27,6 @@ import ( "reflect" "strings" "syscall" - "time" "github.com/golang/protobuf/proto" @@ -127,6 +126,7 @@ type Pitaya interface { GetModule(name string) (interfaces.Module, error) GetNumberOfConnectedClients() int64 + IsReady(ctx context.Context) bool } // App is the base app struct @@ -560,3 +560,41 @@ func (app *App) RegisterRPCJob(rpcJob worker.RPCJob) error { func (app *App) GetNumberOfConnectedClients() int64 { return app.sessionPool.GetSessionCount() } + +// IsReady checks if pitaya is ready and able to serve requests +func (app *App) IsReady(ctx context.Context) bool { + if !app.IsRunning() { + return false + } + + // Check NATS RPC Client connection + if app.rpcClient != nil { + if natsClient, ok := app.rpcClient.(*cluster.NatsRPCClient); ok { + if !natsClient.IsConnected() { + logger.Log.Info("pitaya is not ready") + return false + } + } + } + + // Check NATS RPC Server connection + if app.rpcServer != nil { + if natsServer, ok := app.rpcServer.(*cluster.NatsRPCServer); ok { + if !natsServer.IsConnected() { + logger.Log.Info("pitaya is not ready") + return false + } + } + } + + // Check ETCD connection + if app.serviceDiscovery != nil { + if !app.serviceDiscovery.IsConnected(ctx) { + logger.Log.Info("pitaya is not ready") + return false + } + } + + logger.Log.Info("pitaya is ready") + return true +} diff --git a/pkg/cluster/etcd_service_discovery.go b/pkg/cluster/etcd_service_discovery.go index 9185689c..ed7c4191 100644 --- a/pkg/cluster/etcd_service_discovery.go +++ b/pkg/cluster/etcd_service_discovery.go @@ -722,3 +722,15 @@ func (sd *etcdServiceDiscovery) isServerTypeBlacklisted(svType string) bool { } return false } + +func (sd *etcdServiceDiscovery) IsConnected(ctx context.Context) bool { + if sd.cli == nil { + return false + } + + checkCtx, cancel := context.WithTimeout(ctx, sd.revokeTimeout) + defer cancel() + + _, err := sd.cli.Get(checkCtx, "health-check", clientv3.WithLimit(1)) + return err == nil +} diff --git a/pkg/cluster/mocks/service_discovery.go b/pkg/cluster/mocks/service_discovery.go index 022c4792..9c89915f 100644 --- a/pkg/cluster/mocks/service_discovery.go +++ b/pkg/cluster/mocks/service_discovery.go @@ -5,6 +5,7 @@ package mocks import ( + context "context" reflect "reflect" gomock "github.com/golang/mock/gomock" @@ -128,6 +129,20 @@ func (mr *MockServiceDiscoveryMockRecorder) Init() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Init", reflect.TypeOf((*MockServiceDiscovery)(nil).Init)) } +// IsConnected mocks base method. +func (m *MockServiceDiscovery) IsConnected(arg0 context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsConnected", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsConnected indicates an expected call of IsConnected. +func (mr *MockServiceDiscoveryMockRecorder) IsConnected(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsConnected", reflect.TypeOf((*MockServiceDiscovery)(nil).IsConnected), arg0) +} + // Shutdown mocks base method. func (m *MockServiceDiscovery) Shutdown() error { m.ctrl.T.Helper() diff --git a/pkg/cluster/nats_rpc_client.go b/pkg/cluster/nats_rpc_client.go index 30e1f6d0..00231e7b 100644 --- a/pkg/cluster/nats_rpc_client.go +++ b/pkg/cluster/nats_rpc_client.go @@ -262,7 +262,6 @@ func (ns *NatsRPCClient) Init() error { // initConnection initializes or replaces the NATS connection func (ns *NatsRPCClient) initConnection(isReplacement bool) error { - if !isReplacement { ns.running = true logger.Log.Debugf("connecting to nats (client) with timeout of %s", ns.connectionTimeout) @@ -312,3 +311,7 @@ func (ns *NatsRPCClient) stop() { func (ns *NatsRPCClient) getSubscribeChannel() string { return fmt.Sprintf("pitaya/servers/%s/%s", ns.server.Type, ns.server.ID) } + +func (ns *NatsRPCClient) IsConnected() bool { + return ns.conn != nil && ns.conn.IsConnected() +} diff --git a/pkg/cluster/nats_rpc_server.go b/pkg/cluster/nats_rpc_server.go index b2e30123..06e526a3 100644 --- a/pkg/cluster/nats_rpc_server.go +++ b/pkg/cluster/nats_rpc_server.go @@ -356,7 +356,6 @@ func (ns *NatsRPCServer) Init() error { // initConnection initializes or replaces the NATS connection func (ns *NatsRPCServer) initConnection(isReplacement bool) error { - if !isReplacement { // TODO should we have concurrency here? it feels like we should go ns.handleMessages() @@ -479,3 +478,8 @@ func (ns *NatsRPCServer) reportMetrics() { } } } + +// IsConnected returns true if NATS connection is established +func (ns *NatsRPCServer) IsConnected() bool { + return ns.conn != nil && ns.conn.IsConnected() +} diff --git a/pkg/cluster/service_discovery.go b/pkg/cluster/service_discovery.go index f2dbe142..605cecfe 100644 --- a/pkg/cluster/service_discovery.go +++ b/pkg/cluster/service_discovery.go @@ -20,7 +20,11 @@ package cluster -import "github.com/topfreegames/pitaya/v3/pkg/interfaces" +import ( + "context" + + "github.com/topfreegames/pitaya/v3/pkg/interfaces" +) // ServiceDiscovery is the interface for a service discovery client type ServiceDiscovery interface { @@ -29,5 +33,6 @@ type ServiceDiscovery interface { GetServers() []*Server SyncServers(firstSync bool) error AddListener(listener SDListener) + IsConnected(ctx context.Context) bool interfaces.Module } diff --git a/pkg/mocks/app.go b/pkg/mocks/app.go index 2b54c2a0..f0117d7d 100644 --- a/pkg/mocks/app.go +++ b/pkg/mocks/app.go @@ -373,6 +373,20 @@ func (mr *MockPitayaMockRecorder) GroupRenewTTL(arg0, arg1 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GroupRenewTTL", reflect.TypeOf((*MockPitaya)(nil).GroupRenewTTL), arg0, arg1) } +// IsReady mocks base method. +func (m *MockPitaya) IsReady(arg0 context.Context) bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsReady", arg0) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsReady indicates an expected call of IsReady. +func (mr *MockPitayaMockRecorder) IsReady(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsReady", reflect.TypeOf((*MockPitaya)(nil).IsReady), arg0) +} + // IsRunning mocks base method. func (m *MockPitaya) IsRunning() bool { m.ctrl.T.Helper() diff --git a/pkg/static.go b/pkg/static.go index a4c667ea..18ad189b 100644 --- a/pkg/static.go +++ b/pkg/static.go @@ -229,3 +229,7 @@ func GetModule(name string) (interfaces.Module, error) { func GetNumberOfConnectedClients() int64 { return DefaultApp.GetNumberOfConnectedClients() } + +func IsReady(ctx context.Context) bool { + return DefaultApp.IsReady(ctx) +}