Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion app.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"reflect"
"strings"
"syscall"

"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -128,6 +127,7 @@ type Pitaya interface {
GetModule(name string) (interfaces.Module, error)

GetNumberOfConnectedClients() int64
IsReady(ctx context.Context) bool
}

// App is the base app struct
Expand Down Expand Up @@ -567,3 +567,41 @@ func (app *App) RegisterRPCJob(rpcJob worker.RPCJob) error {
func (app *App) GetNumberOfConnectedClients() int64 {
return app.sessionPool.GetSessionCount()
}

// IsReady checks if pitaya is ready and able to serve requests
func (app *App) IsReady(ctx context.Context) bool {
if !app.IsRunning() {
return false
}

// Check NATS RPC Client connection
if app.rpcClient != nil {
if natsClient, ok := app.rpcClient.(*cluster.NatsRPCClient); ok {
if !natsClient.IsConnected() {
logger.Log.Info("pitaya is not ready")
return false
}
}
}

// Check NATS RPC Server connection
if app.rpcServer != nil {
if natsServer, ok := app.rpcServer.(*cluster.NatsRPCServer); ok {
if !natsServer.IsConnected() {
logger.Log.Info("pitaya is not ready")
return false
}
}
}

// Check ETCD connection
if app.serviceDiscovery != nil {
if !app.serviceDiscovery.IsConnected(ctx) {
logger.Log.Info("pitaya is not ready")
return false
}
}

logger.Log.Info("pitaya is ready")
return true
}
15 changes: 13 additions & 2 deletions cluster/etcd_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ func (sd *etcdServiceDiscovery) SyncServers(firstSync bool) error {
}

// delete invalid servers (local ones that are not in etcd)
var allIds = make([]string, 0)
allIds := make([]string, 0)

// Spawn worker goroutines that will work in parallel
parallelGetter := newParallelGetter(sd.cli, sd.syncServersParallelism)
Expand Down Expand Up @@ -716,7 +716,6 @@ func (sd *etcdServiceDiscovery) watchEtcdChanges() {
case <-sd.stopChan:
return
}

}
}(w)
}
Expand All @@ -729,3 +728,15 @@ func (sd *etcdServiceDiscovery) isServerTypeBlacklisted(svType string) bool {
}
return false
}

func (sd *etcdServiceDiscovery) IsConnected(ctx context.Context) bool {
if sd.cli == nil {
return false
}

checkCtx, cancel := context.WithTimeout(ctx, sd.revokeTimeout)
defer cancel()

_, err := sd.cli.Get(checkCtx, "health-check", clientv3.WithLimit(1))
return err == nil
}
15 changes: 15 additions & 0 deletions cluster/mocks/service_discovery.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion cluster/nats_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ func (ns *NatsRPCClient) Init() error {

// initConnection initializes or replaces the NATS connection
func (ns *NatsRPCClient) initConnection(isReplacement bool) error {

if !isReplacement {
ns.running = true
logger.Log.Debugf("connecting to nats (client) with timeout of %s", ns.connectionTimeout)
Expand Down Expand Up @@ -310,3 +309,7 @@ func (ns *NatsRPCClient) stop() {
func (ns *NatsRPCClient) getSubscribeChannel() string {
return fmt.Sprintf("pitaya/servers/%s/%s", ns.server.Type, ns.server.ID)
}

func (ns *NatsRPCClient) IsConnected() bool {
return ns.conn != nil && ns.conn.IsConnected()
}
5 changes: 4 additions & 1 deletion cluster/nats_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,6 @@ func (ns *NatsRPCServer) Init() error {

// initConnection initializes or replaces the NATS connection
func (ns *NatsRPCServer) initConnection(isReplacement bool) error {

if !isReplacement {
// TODO should we have concurrency here? it feels like we should
go ns.handleMessages()
Expand Down Expand Up @@ -482,3 +481,7 @@ func (ns *NatsRPCServer) reportMetrics() {
}
}
}

func (ns *NatsRPCServer) IsConnected() bool {
return ns.conn != nil && ns.conn.IsConnected()
}
7 changes: 6 additions & 1 deletion cluster/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

package cluster

import "github.com/topfreegames/pitaya/v2/interfaces"
import (
"context"

"github.com/topfreegames/pitaya/v2/interfaces"
)

// ServiceDiscovery is the interface for a service discovery client
type ServiceDiscovery interface {
Expand All @@ -29,5 +33,6 @@ type ServiceDiscovery interface {
GetServers() []*Server
SyncServers(firstSync bool) error
AddListener(listener SDListener)
IsConnected(ctx context.Context) bool
interfaces.Module
}
14 changes: 14 additions & 0 deletions mocks/app.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions static.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,7 @@ func GetModule(name string) (interfaces.Module, error) {
func GetNumberOfConnectedClients() int64 {
return DefaultApp.GetNumberOfConnectedClients()
}

func IsReady(ctx context.Context) bool {
return DefaultApp.IsReady(ctx)
}