Skip to content

Commit 601eec1

Browse files
committed
Ensure endpoints are healthy
fleet server endpoint is built based on the elasticsearch endpoint fix creation
1 parent 1dcbe0a commit 601eec1

File tree

3 files changed

+206
-38
lines changed

3 files changed

+206
-38
lines changed

internal/stack/serverless.go

+26-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,14 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
15
package stack
26

37
import (
8+
"context"
49
"errors"
510
"fmt"
11+
"time"
612

713
"github.com/elastic/elastic-package/internal/logger"
814
"github.com/elastic/elastic-package/internal/profile"
@@ -43,6 +49,12 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
4349
return Config{}, fmt.Errorf("failed to create %s project %s in %s: %w", settings.Type, settings.Name, settings.Region, err)
4450
}
4551

52+
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*30)
53+
defer cancel()
54+
if err := sp.client.EnsureEndpoints(ctx, project); err != nil {
55+
return Config{}, fmt.Errorf("failed to ensure endpoints have been provisioned properly: %w", err)
56+
}
57+
4658
var config Config
4759
config.Provider = ProviderServerless
4860
config.Parameters = map[string]string{
@@ -53,13 +65,22 @@ func (sp *serverlessProvider) createProject(settings projectSettings, options Op
5365
config.ElasticsearchHost = project.Endpoints.Elasticsearch
5466
config.KibanaHost = project.Endpoints.Kibana
5567

68+
config.ElasticsearchUsername = project.Credentials.Username
69+
config.ElasticsearchPassword = project.Credentials.Password
70+
5671
printUserConfig(options.Printer, config)
5772

5873
err = storeConfig(sp.profile, config)
5974
if err != nil {
6075
return Config{}, fmt.Errorf("failed to store config: %w", err)
6176
}
6277

78+
logger.Debug("Waiting for creation plan to be completed")
79+
err = project.EnsureHealthy(ctx)
80+
if err != nil {
81+
return Config{}, fmt.Errorf("not all services are healthy: %w", err)
82+
}
83+
6384
return config, nil
6485
}
6586

@@ -209,7 +230,7 @@ func (sp *serverlessProvider) TearDown(options Options) error {
209230
return fmt.Errorf("failed to find current project: %w", err)
210231
}
211232

212-
logger.Debugf("Deleting project %q", project.ID)
233+
logger.Debugf("Deleting project %q (%s)", project.Name, project.ID)
213234

214235
err = sp.deleteProject(project, options)
215236
if err != nil {
@@ -222,10 +243,10 @@ func (sp *serverlessProvider) TearDown(options Options) error {
222243
// return fmt.Errorf("failed to delete GeoIP extension: %w", err)
223244
// }
224245

225-
err = storeConfig(sp.profile, Config{})
226-
if err != nil {
227-
return fmt.Errorf("failed to store config: %w", err)
228-
}
246+
// err = storeConfig(sp.profile, Config{})
247+
// if err != nil {
248+
// return fmt.Errorf("failed to store config: %w", err)
249+
// }
229250

230251
return nil
231252
}

internal/stack/serverless/client.go

+61-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
15
package serverless
26

37
import (
@@ -7,9 +11,12 @@ import (
711
"errors"
812
"fmt"
913
"io"
14+
"log"
1015
"net/http"
1116
"net/url"
1217
"os"
18+
"strings"
19+
"time"
1320

1421
"github.com/elastic/elastic-package/internal/environment"
1522
"github.com/elastic/elastic-package/internal/logger"
@@ -18,6 +25,9 @@ import (
1825
type Client struct {
1926
host string
2027
apiKey string
28+
29+
username string
30+
password string
2131
}
2232

2333
// ClientOption is functional option modifying Serverless API client.
@@ -52,20 +62,34 @@ func NewClient(opts ...ClientOption) (*Client, error) {
5262
return c, nil
5363
}
5464

55-
// Address option sets the host to use to connect to Kibana.
65+
// WithAddress option sets the host to use to connect to Kibana.
5666
func WithAddress(address string) ClientOption {
5767
return func(c *Client) {
5868
c.host = address
5969
}
6070
}
6171

62-
// Address option sets the host to use to connect to Kibana.
72+
// WithApiKey option sets the host to use to connect to Kibana.
6373
func WithApiKey(apiKey string) ClientOption {
6474
return func(c *Client) {
6575
c.apiKey = apiKey
6676
}
6777
}
6878

79+
// WithUsername option sets the username.
80+
func WithUsername(username string) ClientOption {
81+
return func(c *Client) {
82+
c.username = username
83+
}
84+
}
85+
86+
// WithPassword option sets the password.
87+
func WithPassword(password string) ClientOption {
88+
return func(c *Client) {
89+
c.password = password
90+
}
91+
}
92+
6993
func (c *Client) get(ctx context.Context, resourcePath string) (int, []byte, error) {
7094
return c.sendRequest(ctx, http.MethodGet, resourcePath, nil)
7195
}
@@ -109,8 +133,13 @@ func (c *Client) newRequest(ctx context.Context, method, resourcePath string, re
109133
}
110134

111135
req.Header.Add("content-type", "application/json")
112-
req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
113136

137+
if c.username != "" {
138+
req.SetBasicAuth(c.username, c.password)
139+
return req, nil
140+
}
141+
142+
req.Header.Add("Authorization", fmt.Sprintf("ApiKey %s", c.apiKey))
114143
return req, nil
115144
}
116145

@@ -199,3 +228,32 @@ func (c *Client) GetProject(projectType, projectID string) (*Project, error) {
199228
err = json.Unmarshal(respBody, &project)
200229
return project, err
201230
}
231+
232+
func (c *Client) EnsureEndpoints(ctx context.Context, project *Project) error {
233+
timer := time.NewTimer(time.Millisecond)
234+
for {
235+
select {
236+
case <-ctx.Done():
237+
return ctx.Err()
238+
case <-timer.C:
239+
}
240+
241+
if project.Endpoints.Elasticsearch != "" {
242+
if project.Endpoints.Fleet == "" {
243+
logger.Debugf("Fleet Endpoint empty, setting it based on ES")
244+
project.Endpoints.Fleet = strings.Replace(project.Endpoints.Elasticsearch, ".es.", ".fleet.", 1)
245+
}
246+
return nil
247+
}
248+
249+
newProject, err := c.GetProject(project.Type, project.ID)
250+
if err != nil {
251+
log.Printf("request error: %w", err.Error())
252+
timer.Reset(time.Second * 5)
253+
continue
254+
}
255+
256+
project.Endpoints = newProject.Endpoints
257+
timer.Reset(time.Second * 5)
258+
}
259+
}

internal/stack/serverless/project.go

+119-30
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
15
package serverless
26

37
import (
4-
"bytes"
58
"context"
69
"encoding/json"
710
"fmt"
8-
"io"
11+
"log"
912
"net/http"
13+
"time"
14+
15+
"github.com/elastic/elastic-package/internal/logger"
1016
)
1117

1218
// Project represents a serverless project
@@ -16,6 +22,7 @@ type Project struct {
1622

1723
Name string `json:"name"`
1824
ID string `json:"id"`
25+
Alias string `json:"alias"`
1926
Type string `json:"type"`
2027
Region string `json:"region_id"`
2128

@@ -32,44 +39,126 @@ type Project struct {
3239
} `json:"endpoints"`
3340
}
3441

35-
// NewObservabilityProject creates a new observability type project
36-
func NewObservabilityProject(ctx context.Context, url, name, apiKey, region string) (*Project, error) {
37-
return newProject(ctx, url, name, apiKey, region, "observability")
42+
type serviceHealthy func(context.Context, *Project) error
43+
44+
func (p *Project) EnsureHealthy(ctx context.Context) error {
45+
if err := p.ensureServiceHealthy(ctx, getESHealthy); err != nil {
46+
return fmt.Errorf("elasticsearch not healthy: %w", err)
47+
}
48+
if err := p.ensureServiceHealthy(ctx, getKibanaHealthy); err != nil {
49+
return fmt.Errorf("kibana not healthy: %w", err)
50+
}
51+
if err := p.ensureServiceHealthy(ctx, getFleetHealthy); err != nil {
52+
return fmt.Errorf("fleet not healthy: %w", err)
53+
}
54+
return nil
3855
}
3956

40-
// newProject creates a new serverless project
41-
// Note that the Project.Endpoints may not be populated and another call may be required.
42-
func newProject(ctx context.Context, url, name, apiKey, region, projectType string) (*Project, error) {
43-
ReqBody := struct {
44-
Name string `json:"name"`
45-
RegionID string `json:"region_id"`
46-
}{
47-
Name: name,
48-
RegionID: region,
49-
}
50-
p, err := json.Marshal(ReqBody)
57+
func (p *Project) ensureServiceHealthy(ctx context.Context, serviceFunc serviceHealthy) error {
58+
timer := time.NewTimer(time.Millisecond)
59+
for {
60+
select {
61+
case <-ctx.Done():
62+
return ctx.Err()
63+
case <-timer.C:
64+
}
65+
66+
err := serviceFunc(ctx, p)
67+
if err != nil {
68+
logger.Debugf("service not ready: %s", err.Error())
69+
timer.Reset(time.Second * 5)
70+
continue
71+
}
72+
73+
return nil
74+
}
75+
return nil
76+
}
77+
78+
func getESHealthy(ctx context.Context, project *Project) error {
79+
client, err := NewClient(
80+
WithAddress(project.Endpoints.Elasticsearch),
81+
WithUsername(project.Credentials.Username),
82+
WithPassword(project.Credentials.Password),
83+
)
5184
if err != nil {
52-
return nil, err
85+
return err
5386
}
54-
req, err := http.NewRequestWithContext(ctx, "POST", url+"/api/v1/serverless/projects/"+projectType, bytes.NewReader(p))
87+
88+
statusCode, respBody, err := client.get(ctx, "/_cluster/health")
5589
if err != nil {
56-
return nil, err
90+
return fmt.Errorf("failed to query elasticsearch health: %w", err)
91+
}
92+
93+
if statusCode != http.StatusOK {
94+
return fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
5795
}
58-
req.Header.Set("Content-Type", "application/json")
59-
req.Header.Set("Authorization", "ApiKey "+apiKey)
6096

61-
resp, err := http.DefaultClient.Do(req)
97+
var health struct {
98+
Status string `json:"status"`
99+
}
100+
if err := json.Unmarshal(respBody, &health); err != nil {
101+
log.Printf("Unable to decode response: %v body: %s", err, string(respBody))
102+
return err
103+
}
104+
if health.Status == "green" {
105+
return nil
106+
}
107+
return fmt.Errorf("elasticsearch unhealthy: %s", health.Status)
108+
}
109+
110+
func getKibanaHealthy(ctx context.Context, project *Project) error {
111+
client, err := NewClient(
112+
WithAddress(project.Endpoints.Kibana),
113+
WithUsername(project.Credentials.Username),
114+
WithPassword(project.Credentials.Password),
115+
)
116+
if err != nil {
117+
return err
118+
}
119+
120+
statusCode, respBody, err := client.get(ctx, "/api/status")
121+
if err != nil {
122+
return fmt.Errorf("failed to query kibana status: %w", err)
123+
}
124+
if statusCode != http.StatusOK {
125+
return fmt.Errorf("unexpected status code %d, body: %s", statusCode, string(respBody))
126+
}
127+
128+
var status struct {
129+
Status struct {
130+
Overall struct {
131+
Level string `json:"level"`
132+
} `json:"overall"`
133+
} `json:"status"`
134+
}
135+
if err := json.Unmarshal(respBody, &status); err != nil {
136+
log.Printf("Unable to decode response: %v body: %s", err, string(respBody))
137+
return err
138+
}
139+
if status.Status.Overall.Level == "available" {
140+
return nil
141+
}
142+
return fmt.Errorf("kibana unhealthy: %s", status.Status.Overall.Level)
143+
}
144+
145+
func getFleetHealthy(ctx context.Context, project *Project) error {
146+
client, err := NewClient(
147+
WithAddress(project.Endpoints.Fleet),
148+
WithUsername(project.Credentials.Username),
149+
WithPassword(project.Credentials.Password),
150+
)
62151
if err != nil {
63-
return nil, err
152+
return err
64153
}
65-
defer resp.Body.Close()
66154

67-
if resp.StatusCode != http.StatusCreated {
68-
p, _ := io.ReadAll(resp.Body)
69-
return nil, fmt.Errorf("unexpected status code %d, body: %s", resp.StatusCode, string(p))
155+
statusCode, respBody, err := client.get(ctx, "/api/status")
156+
if err != nil {
157+
return fmt.Errorf("failed to query fleet status: %w", err)
158+
}
159+
if statusCode != http.StatusOK {
160+
return fmt.Errorf("fleet unhealthy: status code %d, body: %s", statusCode, string(respBody))
70161
}
71-
project := &Project{url: url, apiKey: apiKey}
72162

73-
err = json.NewDecoder(resp.Body).Decode(project)
74-
return project, err
163+
return nil
75164
}

0 commit comments

Comments
 (0)