Skip to content

Commit 3f38c52

Browse files
committed
Ensure endpoints are healthy
fleet server endpoint is built based on the elasticsearch endpoint fix creation
1 parent 1dcbe0a commit 3f38c52

File tree

3 files changed

+206
-38
lines changed

3 files changed

+206
-38
lines changed

internal/stack/serverless.go

+28-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
15
package stack
26

37
import (
8+
"context"
49
"errors"
510
"fmt"
11+
"time"
612

713
"github.com/elastic/elastic-package/internal/logger"
814
"github.com/elastic/elastic-package/internal/profile"
@@ -43,6 +49,12 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
4349
return Config{}, fmt.Errorf("failed to create %s project %s in %s: %w", settings.Type, settings.Name, settings.Region, err)
4450
}
4551

52+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30)
53+
defer cancel()
54+
if err := sp.client.EnsureEndpoints(ctx, project); err != nil {
55+
return Config{}, fmt.Errorf("failed to ensure endpoints have been provisioned properly: %w", err)
56+
}
57+
4658
var config Config
4759
config.Provider = ProviderServerless
4860
config.Parameters = map[string]string{
@@ -53,13 +65,22 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
5365
config.ElasticsearchHost = project.Endpoints.Elasticsearch
5466
config.KibanaHost = project.Endpoints.Kibana
5567

68+
config.ElasticsearchUsername = project.Credentials.Username
69+
config.ElasticsearchPassword = project.Credentials.Password
70+
5671
printUserConfig(options.Printer, config)
5772

5873
err = storeConfig(sp.profile, config)
5974
if err != nil {
6075
return Config{}, fmt.Errorf("failed to store config: %w", err)
6176
}
6277

78+
logger.Debug("Waiting for creation plan to be completed")
79+
err = project.EnsureHealthy(ctx)
80+
if err != nil {
81+
return Config{}, fmt.Errorf("not all services are healthy: %w", err)
82+
}
83+
6384
return config, nil
6485
}
6586

@@ -150,6 +171,8 @@ func (sp *serverlessProvider) BootUp(options Options) error {
150171
// if err != nil {
151172
// return fmt.Errorf("failed to replace GeoIP databases: %w", err)
152173
// }
174+
logger.Debugf("Project created: %s", project.Name)
175+
printUserConfig(options.Printer, config)
153176
case nil:
154177
logger.Debugf("Project existed: %s", project.Name)
155178
printUserConfig(options.Printer, config)
@@ -209,7 +232,7 @@ func (sp *serverlessProvider) TearDown(options Options) error {
209232
return fmt.Errorf("failed to find current project: %w", err)
210233
}
211234

212-
logger.Debugf("Deleting project %q", project.ID)
235+
logger.Debugf("Deleting project %q (%s)", project.Name, project.ID)
213236

214237
err = sp.deleteProject(project, options)
215238
if err != nil {
@@ -222,10 +245,10 @@ func (sp *serverlessProvider) TearDown(options Options) error {
222245
// return fmt.Errorf("failed to delete GeoIP extension: %w", err)
223246
// }
224247

225-
err = storeConfig(sp.profile, Config{})
226-
if err != nil {
227-
return fmt.Errorf("failed to store config: %w", err)
228-
}
248+
// err = storeConfig(sp.profile, Config{})
249+
// if err != nil {
250+
// return fmt.Errorf("failed to store config: %w", err)
251+
// }
229252

230253
return nil
231254
}

internal/stack/serverless/client.go

+60-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
15
package serverless
26

37
import (
@@ -10,6 +14,8 @@ import (
1014
"net/http"
1115
"net/url"
1216
"os"
17+
"strings"
18+
"time"
1319

1420
"github.com/elastic/elastic-package/internal/environment"
1521
"github.com/elastic/elastic-package/internal/logger"
@@ -18,6 +24,9 @@ import (
1824
type Client struct {
1925
host string
2026
apiKey string
27+
28+
username string
29+
password string
2130
}
2231

2332
// ClientOption is functional option modifying Serverless API client.
@@ -52,20 +61,34 @@ func NewClient(opts ...ClientOption) (*Client, error) {
5261
return c, nil
5362
}
5463

55-
// Address option sets the host to use to connect to Kibana.
64+
// WithAddress option sets the host to use to connect to Kibana.
5665
func WithAddress(address string) ClientOption {
5766
return func(c *Client) {
5867
c.host = address
5968
}
6069
}
6170

62-
// Address option sets the host to use to connect to Kibana.
71+
// WithApiKey option sets the host to use to connect to Kibana.
6372
func WithApiKey(apiKey string) ClientOption {
6473
return func(c *Client) {
6574
c.apiKey = apiKey
6675
}
6776
}
6877

78+
// WithUsername option sets the username.
79+
func WithUsername(username string) ClientOption {
80+
return func(c *Client) {
81+
c.username = username
82+
}
83+
}
84+
85+
// WithPassword option sets the password.
86+
func WithPassword(password string) ClientOption {
87+
return func(c *Client) {
88+
c.password = password
89+
}
90+
}
91+
6992
func (c *Client) get(ctx context.Context, resourcePath string) (int, []byte, error) {
7093
return c.sendRequest(ctx, http.MethodGet, resourcePath, nil)
7194
}
@@ -109,8 +132,13 @@ func (c *Client) newRequest(ctx context.Context, method, resourcePath string, re
109132
}
110133

111134
req.Header.Add("content-type", "application/json")
112-
req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
113135

136+
if c.username != "" {
137+
req.SetBasicAuth(c.username, c.password)
138+
return req, nil
139+
}
140+
141+
req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
114142
return req, nil
115143
}
116144

@@ -199,3 +227,32 @@ func (c *Client) GetProject(projectType, projectID string) (*Project, error) {
199227
err = json.Unmarshal(respBody, &project)
200228
return project, err
201229
}
230+
231+
func (c *Client) EnsureEndpoints(ctx context.Context, project *Project) error {
232+
timer := time.NewTimer(time.Millisecond)
233+
for {
234+
select {
235+
case <-ctx.Done():
236+
return ctx.Err()
237+
case <-timer.C:
238+
}
239+
240+
if project.Endpoints.Elasticsearch != "" {
241+
if project.Endpoints.Fleet == "" {
242+
logger.Debugf("Fleet Endpoint empty, setting it based on ES")
243+
project.Endpoints.Fleet = strings.Replace(project.Endpoints.Elasticsearch, ".es.", ".fleet.", 1)
244+
}
245+
return nil
246+
}
247+
248+
newProject, err := c.GetProject(project.Type, project.ID)
249+
if err != nil {
250+
logger.Debugf("request error: %s", err.Error())
251+
timer.Reset(time.Second * 5)
252+
continue
253+
}
254+
255+
project.Endpoints = newProject.Endpoints
256+
timer.Reset(time.Second * 5)
257+
}
258+
}

internal/stack/serverless/project.go

+118-30
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
15
package serverless
26

37
import (
4-
"bytes"
58
"context"
69
"encoding/json"
710
"fmt"
8-
"io"
911
"net/http"
12+
"time"
13+
14+
"github.com/elastic/elastic-package/internal/logger"
1015
)
1116

1217
// Project represents a serverless project
@@ -16,6 +21,7 @@ type Project struct {
1621

1722
Name string `json:"name"`
1823
ID string `json:"id"`
24+
Alias string `json:"alias"`
1925
Type string `json:"type"`
2026
Region string `json:"region_id"`
2127

@@ -32,44 +38,126 @@ type Project struct {
3238
} `json:"endpoints"`
3339
}
3440

35-
// NewObservabilityProject creates a new observability type project
36-
func NewObservabilityProject(ctx context.Context, url, name, apiKey, region string) (*Project, error) {
37-
return newProject(ctx, url, name, apiKey, region, "observability")
41+
type serviceHealthy func(context.Context, *Project) error
42+
43+
func (p *Project) EnsureHealthy(ctx context.Context) error {
44+
if err := p.ensureServiceHealthy(ctx, getESHealthy); err != nil {
45+
return fmt.Errorf("elasticsearch not healthy: %w", err)
46+
}
47+
if err := p.ensureServiceHealthy(ctx, getKibanaHealthy); err != nil {
48+
return fmt.Errorf("kibana not healthy: %w", err)
49+
}
50+
if err := p.ensureServiceHealthy(ctx, getFleetHealthy); err != nil {
51+
return fmt.Errorf("fleet not healthy: %w", err)
52+
}
53+
return nil
3854
}
3955

40-
// newProject creates a new serverless project
41-
// Note that the Project.Endpoints may not be populated and another call may be required.
42-
func newProject(ctx context.Context, url, name, apiKey, region, projectType string) (*Project, error) {
43-
ReqBody := struct {
44-
Name string `json:"name"`
45-
RegionID string `json:"region_id"`
46-
}{
47-
Name: name,
48-
RegionID: region,
49-
}
50-
p, err := json.Marshal(ReqBody)
56+
func (p *Project) ensureServiceHealthy(ctx context.Context, serviceFunc serviceHealthy) error {
57+
timer := time.NewTimer(time.Millisecond)
58+
for {
59+
select {
60+
case <-ctx.Done():
61+
return ctx.Err()
62+
case <-timer.C:
63+
}
64+
65+
err := serviceFunc(ctx, p)
66+
if err != nil {
67+
logger.Debugf("service not ready: %s", err.Error())
68+
timer.Reset(time.Second * 5)
69+
continue
70+
}
71+
72+
return nil
73+
}
74+
return nil
75+
}
76+
77+
func getESHealthy(ctx context.Context, project *Project) error {
78+
client, err := NewClient(
79+
WithAddress(project.Endpoints.Elasticsearch),
80+
WithUsername(project.Credentials.Username),
81+
WithPassword(project.Credentials.Password),
82+
)
5183
if err != nil {
52-
return nil, err
84+
return err
5385
}
54-
req, err := http.NewRequestWithContext(ctx, "POST", url+"/api/v1/serverless/projects/"+projectType, bytes.NewReader(p))
86+
87+
statusCode, respBody, err := client.get(ctx, "/_cluster/health")
5588
if err != nil {
56-
return nil, err
89+
return fmt.Errorf("failed to query elasticsearch health: %w", err)
90+
}
91+
92+
if statusCode != http.StatusOK {
93+
return fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
5794
}
58-
req.Header.Set("Content-Type", "application/json")
59-
req.Header.Set("Authorization", "ApiKey "+apiKey)
6095

61-
resp, err := http.DefaultClient.Do(req)
96+
var health struct {
97+
Status string `json:"status"`
98+
}
99+
if err := json.Unmarshal(respBody, &health); err != nil {
100+
logger.Debugf("Unable to decode response: %v body: %s", err, string(respBody))
101+
return err
102+
}
103+
if health.Status == "green" {
104+
return nil
105+
}
106+
return fmt.Errorf("elasticsearch unhealthy: %s", health.Status)
107+
}
108+
109+
func getKibanaHealthy(ctx context.Context, project *Project) error {
110+
client, err := NewClient(
111+
WithAddress(project.Endpoints.Kibana),
112+
WithUsername(project.Credentials.Username),
113+
WithPassword(project.Credentials.Password),
114+
)
115+
if err != nil {
116+
return err
117+
}
118+
119+
statusCode, respBody, err := client.get(ctx, "/api/status")
120+
if err != nil {
121+
return fmt.Errorf("failed to query kibana status: %w", err)
122+
}
123+
if statusCode != http.StatusOK {
124+
return fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
125+
}
126+
127+
var status struct {
128+
Status struct {
129+
Overall struct {
130+
Level string `json:"level"`
131+
} `json:"overall"`
132+
} `json:"status"`
133+
}
134+
if err := json.Unmarshal(respBody, &status); err != nil {
135+
logger.Debugf("Unable to decode response: %v body: %s", err, string(respBody))
136+
return err
137+
}
138+
if status.Status.Overall.Level == "available" {
139+
return nil
140+
}
141+
return fmt.Errorf("kibana unhealthy: %s", status.Status.Overall.Level)
142+
}
143+
144+
func getFleetHealthy(ctx context.Context, project *Project) error {
145+
client, err := NewClient(
146+
WithAddress(project.Endpoints.Fleet),
147+
WithUsername(project.Credentials.Username),
148+
WithPassword(project.Credentials.Password),
149+
)
62150
if err != nil {
63-
return nil, err
151+
return err
64152
}
65-
defer resp.Body.Close()
66153

67-
if resp.StatusCode != http.StatusCreated {
68-
p, _ := io.ReadAll(resp.Body)
69-
return nil, fmt.Errorf("unexpected status code %d, body: %s", resp.StatusCode, string(p))
154+
statusCode, respBody, err := client.get(ctx, "/api/status")
155+
if err != nil {
156+
return fmt.Errorf("failed to query fleet status: %w", err)
157+
}
158+
if statusCode != http.StatusOK {
159+
return fmt.Errorf("fleet unhealthy: status code %d, body: %s", statusCode, string(respBody))
70160
}
71-
project := &Project{url: url, apiKey: apiKey}
72161

73-
err = json.NewDecoder(resp.Body).Decode(project)
74-
return project, err
162+
return nil
75163
}

0 commit comments

Comments
 (0)