Skip to content

Commit a9d7849

Browse files
authored
[Feature] Unify agency access (#1024)
1 parent 8891432 commit a9d7849

31 files changed

+7499
-457
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
- (Feature) Add `ArangoLocalStorage` CRD auto-installer
2121
- (Feature) Add `ArangoDeploymentReplication` CRD auto-installer
2222
- (Bugfix) Allow missing `token` key in License secret
23+
- (Feature) Unify agency access
2324

2425
## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
2526
- (Bugfix) Fix arangosync members state inspection

pkg/deployment/agency/cache.go

+89-89
Original file line numberDiff line numberDiff line change
@@ -22,57 +22,85 @@ package agency
2222

2323
import (
2424
"context"
25-
"fmt"
2625
"sync"
27-
"time"
2826

2927
"github.com/arangodb/go-driver/agency"
3028
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
3129
"github.com/arangodb/kube-arangodb/pkg/util/errors"
30+
"github.com/arangodb/kube-arangodb/pkg/util/globals"
3231
)
3332

3433
type health struct {
3534
leaderID string
3635

36+
agencySize int
37+
38+
names []string
3739
commitIndexes map[string]uint64
40+
leaders map[string]string
41+
election map[string]int
3842
}
3943

4044
func (h health) LeaderID() string {
4145
return h.leaderID
4246
}
4347

44-
// IsHealthy returns true if all agencies have the same commit index.
45-
// Returns false when:
46-
// - agencies' list is empty.
47-
// - agencies have different commit indices.
48-
// - agencies have commit indices == 0.
49-
func (h health) IsHealthy() bool {
50-
var globalCommitIndex uint64
51-
first := true
52-
53-
for _, commitIndex := range h.commitIndexes {
54-
if first {
55-
globalCommitIndex = commitIndex
56-
first = false
57-
} else if commitIndex != globalCommitIndex {
58-
return false
48+
// Healthy returns nil if all agencies have the same commit index.
49+
func (h health) Healthy() error {
50+
if err := h.Serving(); err != nil {
51+
return err
52+
}
53+
54+
if h.election[h.leaderID] != h.agencySize {
55+
return errors.Newf("Not all agents are in quorum")
56+
}
57+
58+
index := h.commitIndexes[h.leaderID]
59+
if index == 0 {
60+
return errors.Newf("Agency CommitIndex is zero")
61+
}
62+
63+
for k, v := range h.commitIndexes {
64+
if v != index {
65+
return errors.Newf("Agent %s is behind in CommitIndex", k)
5966
}
6067
}
6168

62-
return globalCommitIndex != 0
69+
return nil
70+
}
71+
72+
func (h health) Serving() error {
73+
if h.agencySize == 0 {
74+
return errors.Newf("Empty agents list")
75+
}
76+
77+
if len(h.election) == 0 {
78+
return errors.Newf("No Leader")
79+
} else if len(h.election) > 1 {
80+
return errors.Newf("Multiple leaders")
81+
}
82+
83+
if len(h.leaders) <= h.agencySize/2 {
84+
return errors.Newf("Quorum is not present")
85+
}
86+
87+
return nil
6388
}
6489

6590
// Health describes interface to check healthy of the environment.
6691
type Health interface {
67-
// IsHealthy return true when environment is considered as healthy.
68-
IsHealthy() bool
92+
// Healthy return nil when environment is considered as healthy.
93+
Healthy() error
94+
95+
// Serving return nil when environment is considered as responsive, but not fully healthy.
96+
Serving() error
6997

7098
// LeaderID returns a leader ID or empty string if a leader is not known.
7199
LeaderID() string
72100
}
73101

74102
type Cache interface {
75-
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
103+
Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error)
76104
Data() (State, bool)
77105
CommitIndex() uint64
78106
// Health returns true when healthy object is available.
@@ -107,7 +135,7 @@ func (c cacheSingle) Health() (Health, bool) {
107135
return nil, false
108136
}
109137

110-
func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) {
138+
func (c cacheSingle) Reload(_ context.Context, _ int, _ []agency.Agency) (uint64, error) {
111139
return 0, nil
112140
}
113141

@@ -153,15 +181,16 @@ func (c *cache) Health() (Health, bool) {
153181
return nil, false
154182
}
155183

156-
func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) {
184+
func (c *cache) Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error) {
157185
c.lock.Lock()
158186
defer c.lock.Unlock()
159187

160-
leaderCli, leaderConfig, health, err := getLeader(ctx, clients)
188+
leaderCli, leaderConfig, health, err := getLeader(ctx, size, clients)
161189
if err != nil {
162190
// Invalidate a leader ID and agency state.
163191
// In the next iteration leaderID will be sat because `valid` will be false.
164192
c.valid = false
193+
c.health = nil
165194

166195
return 0, err
167196
}
@@ -186,91 +215,62 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
186215

187216
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
188217
// If there is no quorum for the leader then error is returned.
189-
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
190-
var mutex sync.Mutex
191-
var anyError error
192-
var wg sync.WaitGroup
218+
func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
219+
configs := make([]*Config, len(clients))
220+
errs := make([]error, len(clients))
193221

194-
cliLen := len(clients)
195-
if cliLen == 0 {
196-
return nil, nil, nil, errors.New("empty list of agencies' clients")
197-
}
198-
configs := make([]*Config, cliLen)
199-
leaders := make(map[string]int, cliLen)
200-
201-
var h health
222+
var wg sync.WaitGroup
202223

203-
h.commitIndexes = make(map[string]uint64, cliLen)
204-
// Fetch all configs from agencies.
205-
wg.Add(cliLen)
206-
for i, cli := range clients {
207-
go func(iLocal int, cliLocal agency.Agency) {
224+
// Fetch Agency config
225+
for i := range clients {
226+
wg.Add(1)
227+
go func(id int) {
208228
defer wg.Done()
209229

210-
ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
230+
ctxLocal, cancel := globals.GetGlobals().Timeouts().Agency().WithTimeout(ctx)
211231
defer cancel()
212-
config, err := GetAgencyConfig(ctxLocal, cliLocal)
213-
214-
mutex.Lock()
215-
defer mutex.Unlock()
232+
config, err := GetAgencyConfig(ctxLocal, clients[id])
216233

217234
if err != nil {
218-
anyError = err
219-
return
220-
} else if config == nil || config.LeaderId == "" {
221-
anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints())
235+
errs[id] = err
222236
return
223237
}
224238

225239
// Write config on the same index where client is (It will be helpful later).
226-
configs[iLocal] = config
227-
// Count leaders.
228-
leaders[config.LeaderId]++
229-
h.commitIndexes[config.Configuration.ID] = config.CommitIndex
230-
}(i, cli)
240+
configs[id] = config
241+
}(i)
231242
}
232243
wg.Wait()
233244

234-
if anyError != nil {
235-
return nil, nil, nil, wrapError(anyError, "not all agencies are responsive")
236-
}
237-
238-
if len(leaders) == 0 {
239-
return nil, nil, nil, wrapError(anyError, "failed to get config from agencies")
240-
}
241-
242-
// Find the leader ID which has the most votes from all agencies.
243-
maxVotes := 0
244-
var leaderID string
245-
for id, votes := range leaders {
246-
if votes > maxVotes {
247-
maxVotes = votes
248-
leaderID = id
245+
var h health
246+
h.agencySize = size
247+
h.names = make([]string, len(clients))
248+
h.commitIndexes = make(map[string]uint64, len(clients))
249+
h.leaders = make(map[string]string, len(clients))
250+
h.election = make(map[string]int, len(clients))
251+
252+
for id := range configs {
253+
if config := configs[id]; config != nil {
254+
name := config.Configuration.ID
255+
h.names[id] = name
256+
h.commitIndexes[name] = config.CommitIndex
257+
if config.LeaderId != "" {
258+
h.leaders[name] = config.LeaderId
259+
h.election[config.LeaderId]++
260+
h.leaderID = config.LeaderId
261+
}
249262
}
250263
}
251264

252-
h.leaderID = leaderID
253-
254-
// Check if a leader has quorum from all possible agencies.
255-
if maxVotes <= cliLen/2 {
256-
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)
257-
return nil, nil, nil, wrapError(anyError, message)
265+
if err := h.Serving(); err != nil {
266+
return nil, nil, nil, err
258267
}
259268

260-
// From here on, a leader with quorum is known.
261-
for i, config := range configs {
262-
if config != nil && config.Configuration.ID == leaderID {
263-
return clients[i], config, h, nil
269+
for id := range clients {
270+
if h.leaderID == h.names[id] {
271+
return clients[id], configs[id], h, nil
264272
}
265273
}
266274

267-
return nil, nil, nil, wrapError(anyError, "the leader is not responsive")
268-
}
269-
270-
func wrapError(err error, message string) error {
271-
if err != nil {
272-
return errors.WithMessage(err, message)
273-
}
274-
275-
return errors.New(message)
275+
return nil, nil, nil, errors.Newf("Unable to find agent")
276276
}

pkg/deployment/agency/current_collections.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -27,5 +27,5 @@ type StateCurrentDBCollections map[string]StateCurrentDBCollection
2727
type StateCurrentDBCollection map[string]StateCurrentDBShard
2828

2929
type StateCurrentDBShard struct {
30-
Servers ShardServers `json:"servers,omitempty"`
30+
Servers Servers `json:"servers,omitempty"`
3131
}

pkg/deployment/agency/definitions.go

+7
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ const (
3838

3939
SupervisionKey = "Supervision"
4040
SupervisionMaintenanceKey = "Maintenance"
41+
42+
TargetJobToDoKey = "ToDo"
43+
TargetJobPendingKey = "Pending"
44+
TargetJobFailedKey = "Failed"
45+
TargetJobFinishedKey = "Finished"
46+
47+
TargetCleanedServersKey = "CleanedServers"
4148
)
4249

4350
func GetAgencyKey(parts ...string) string {

0 commit comments

Comments
 (0)