Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Elastic Mode #359 #401

Merged
merged 1 commit into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,6 @@ func TestGetProxies(t *testing.T) {
context.TODO(),
newPool,
nil,
false,
false,
config.DefaultHealthCheckPeriod,
&config.Client{
Network: config.DefaultNetwork,
Expand Down Expand Up @@ -233,8 +231,6 @@ func TestGetServers(t *testing.T) {
context.TODO(),
newPool,
nil,
true,
false,
config.DefaultHealthCheckPeriod,
&config.Client{
Network: config.DefaultNetwork,
Expand Down
471 changes: 234 additions & 237 deletions api/v1/api.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/v1/api.proto

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 2 additions & 4 deletions api/v1/api.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,6 @@ var runCmd = &cobra.Command{
runCtx,
pools[name],
pluginRegistry,
cfg.Elastic,
cfg.ReuseElasticClients,
cfg.HealthCheckPeriod,
clientConfig,
logger,
Expand All @@ -742,8 +740,6 @@ var runCmd = &cobra.Command{

span.AddEvent("Create proxy", trace.WithAttributes(
attribute.String("name", name),
attribute.Bool("elastic", cfg.Elastic),
attribute.Bool("reuseElasticClients", cfg.ReuseElasticClients),
attribute.String("healthCheckPeriod", cfg.HealthCheckPeriod.String()),
))

Expand Down
4 changes: 2 additions & 2 deletions cmd/testdata/gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ pools:

proxies:
default:
elastic: False
healthCheckPeriod: 60s # duration
test:
elastic: False
healthCheckPeriod: 60s # duration

servers:
default:
Expand Down
2 changes: 1 addition & 1 deletion cmd/testdata/gatewayd_tls.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pools:

proxies:
default:
elastic: False
healthCheckPeriod: 60s # duration

servers:
default:
Expand Down
4 changes: 1 addition & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ func (c *Config) LoadDefaults(ctx context.Context) {
}

defaultProxy := Proxy{
Elastic: false,
ReuseElasticClients: false,
HealthCheckPeriod: DefaultHealthCheckPeriod,
HealthCheckPeriod: DefaultHealthCheckPeriod,
}

defaultServer := Server{
Expand Down
4 changes: 1 addition & 3 deletions config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,7 @@ type Pool struct {
}

type Proxy struct {
Elastic bool `json:"elastic"`
ReuseElasticClients bool `json:"reuseElasticClients"`
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
HealthCheckPeriod time.Duration `json:"healthCheckPeriod" jsonschema:"oneof_type=string;integer"`
}

type Server struct {
Expand Down
2 changes: 0 additions & 2 deletions gatewayd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ pools:

proxies:
default:
elastic: False
reuseElasticClients: False
healthCheckPeriod: 60s # duration

servers:
Expand Down
77 changes: 19 additions & 58 deletions network/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,9 @@ type Proxy struct {
scheduler *gocron.Scheduler
ctx context.Context //nolint:containedctx
pluginTimeout time.Duration
HealthCheckPeriod time.Duration

Elastic bool
ReuseElasticClients bool
HealthCheckPeriod time.Duration

// ClientConfig is used for elastic proxy and reconnection
// ClientConfig is used for reconnection
ClientConfig *config.Client
}

Expand All @@ -55,7 +52,6 @@ var _ IProxy = (*Proxy)(nil)
func NewProxy(
ctx context.Context,
connPool pool.IPool, pluginRegistry *plugin.Registry,
elastic, reuseElasticClients bool,
healthCheckPeriod time.Duration,
clientConfig *config.Client, logger zerolog.Logger,
pluginTimeout time.Duration,
Expand All @@ -71,8 +67,6 @@ func NewProxy(
scheduler: gocron.NewScheduler(time.UTC),
ctx: proxyCtx,
pluginTimeout: pluginTimeout,
Elastic: elastic,
ReuseElasticClients: reuseElasticClients,
ClientConfig: clientConfig,
HealthCheckPeriod: healthCheckPeriod,
}
Expand Down Expand Up @@ -138,8 +132,7 @@ func NewProxy(
}

// Connect maps a server connection from the available connection pool to a incoming connection.
// It returns an error if the pool is exhausted. If the pool is elastic, it creates a new client
// and maps it to the incoming connection.
// It returns an error if the pool is exhausted.
func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError {
_, span := otel.Tracer(config.TracerName).Start(pr.ctx, "Connect")
defer span.End()
Expand All @@ -156,34 +149,13 @@ func (pr *Proxy) Connect(conn *ConnWrapper) *gerr.GatewayDError {

var client *Client
if pr.IsExhausted() {
// Pool is exhausted or is elastic.
if pr.Elastic {
// Create a new client.
client = NewClient(
pr.ctx, pr.ClientConfig, pr.logger,
NewRetry(
pr.ClientConfig.Retries,
config.If[time.Duration](
pr.ClientConfig.Backoff > 0,
pr.ClientConfig.Backoff,
config.DefaultBackoff,
),
pr.ClientConfig.BackoffMultiplier,
pr.ClientConfig.DisableBackoffCaps,
pr.logger,
),
)
span.AddEvent("Created a new client connection")
pr.logger.Debug().Str("id", client.ID[:7]).Msg("Reused the client connection")
} else {
span.AddEvent(gerr.ErrPoolExhausted.Error())
return gerr.ErrPoolExhausted
}
} else {
// Get the client from the pool with the given clientID.
if cl, ok := pr.availableConnections.Pop(clientID).(*Client); ok {
client = cl
}
// Pool is exhausted
span.AddEvent(gerr.ErrPoolExhausted.Error())
return gerr.ErrPoolExhausted
}
// Get the client from the pool with the given clientID.
if cl, ok := pr.availableConnections.Pop(clientID).(*Client); ok {
client = cl
}

client, err := pr.IsHealthy(client)
Expand Down Expand Up @@ -241,23 +213,17 @@ func (pr *Proxy) Disconnect(conn *ConnWrapper) *gerr.GatewayDError {
return gerr.ErrClientNotFound
}

//nolint:nestif
if client, ok := client.(*Client); ok {
if (pr.Elastic && pr.ReuseElasticClients) || !pr.Elastic {
// Recycle the server connection by reconnecting.
if err := client.Reconnect(); err != nil {
pr.logger.Error().Err(err).Msg("Failed to reconnect to the client")
span.RecordError(err)
}
// Recycle the server connection by reconnecting.
if err := client.Reconnect(); err != nil {
pr.logger.Error().Err(err).Msg("Failed to reconnect to the client")
span.RecordError(err)
}

// If the client is not in the pool, put it back.
if err := pr.availableConnections.Put(client.ID, client); err != nil {
pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool")
span.RecordError(err)
}
} else {
span.RecordError(gerr.ErrClientNotConnected)
return gerr.ErrClientNotConnected
// If the client is not in the pool, put it back.
if err := pr.availableConnections.Put(client.ID, client); err != nil {
pr.logger.Error().Err(err).Msg("Failed to put the client back in the pool")
span.RecordError(err)
}
} else {
// This should never happen, but if it does,
Expand Down Expand Up @@ -627,11 +593,6 @@ func (pr *Proxy) IsHealthy(client *Client) (*Client, *gerr.GatewayDError) {
func (pr *Proxy) IsExhausted() bool {
_, span := otel.Tracer(config.TracerName).Start(pr.ctx, "IsExhausted")
defer span.End()

if pr.Elastic {
return false
}

return pr.availableConnections.Size() == 0 && pr.availableConnections.Cap() > 0
}

Expand Down
Loading