Skip to content

Commit 03ff2d1

Browse files
committed
Start implementing AddClient (not tested)
1 parent 80d1887 commit 03ff2d1

File tree

3 files changed

+102
-6
lines changed

3 files changed

+102
-6
lines changed

apiserver/api/api.go

+28-6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,12 @@ type Peer struct {
1818
IP string
1919
}
2020

21+
type ClientRegistrationRequest struct {
22+
Username string
23+
PublicKey string
24+
Serial string
25+
}
26+
2127
// TODO(jhrv): do actual filtering of the clients.
2228
// TODO(jhrv): keep cache of gateway access group members to remove AAD runtime dependency
2329
// gatewayConfig returns the clients for the gateway that has the group membership required
@@ -62,25 +68,41 @@ func (a *api) updateHealth(w http.ResponseWriter, r *http.Request) {
6268
if err := json.NewDecoder(r.Body).Decode(&healthUpdates); err != nil {
6369
defer r.Body.Close()
6470

65-
w.WriteHeader(http.StatusBadRequest)
66-
w.Write([]byte(fmt.Sprintf("error during JSON unmarshal: %s\n", err)))
71+
respondf(w, http.StatusBadRequest, "error during JSON unmarshal: %s\n", err)
6772
return
6873
}
6974

7075
// Abort status update if it contains incomplete entries
7176
// is_healthy and serial is required
7277
for _, s := range healthUpdates {
7378
if s.Healthy == nil || len(s.Serial) == 0 {
74-
w.WriteHeader(http.StatusBadRequest)
75-
w.Write([]byte("missing required field\n"))
79+
respondf(w, http.StatusBadRequest, "missing required field\n")
7680
return
7781
}
7882
}
7983

8084
if err := a.db.UpdateClientStatus(healthUpdates); err != nil {
81-
w.WriteHeader(http.StatusInternalServerError)
8285
log.Error(err)
83-
w.Write([]byte("unable to persist client statuses\n"))
86+
respondf(w, http.StatusInternalServerError,"unable to persist client statuses\n")
8487
return
8588
}
8689
}
90+
91+
func (a *api) registerClient(w http.ResponseWriter, r *http.Request) {
92+
var reg ClientRegistrationRequest
93+
if err := json.NewDecoder(r.Body).Decode(&reg); err != nil {
94+
respondf(w, http.StatusBadRequest, "error during JSON unmarshal: %s\n", err)
95+
}
96+
97+
if err := a.db.AddClient(reg.Username, reg.PublicKey, reg.Serial); err != nil {
98+
respondf(w, http.StatusInternalServerError, "unable to add new peer: %s\n", err)
99+
}
100+
}
101+
102+
func respondf(w http.ResponseWriter, statusCode int, format string, args... interface{}) {
103+
w.WriteHeader(statusCode)
104+
105+
if _, wErr := w.Write([]byte(fmt.Sprintf(format, args...))); wErr != nil {
106+
log.Errorf("unable to write client response: %v", wErr)
107+
}
108+
}

apiserver/api/router.go

+1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ func New(cfg Config) chi.Router {
1616
r.Get("/gateways/{gateway}", api.gatewayConfig)
1717
r.Get("/clients", api.clients)
1818
r.Put("/clients/health", api.updateHealth)
19+
r.Put("/clients/register", api.registerClient)
1920

2021
return r
2122
}

apiserver/database/database.go

+73
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,22 @@ package database
33
import (
44
"context"
55
"fmt"
6+
"github.com/jackc/pgx/v4"
7+
"sync"
68
"time"
79

810
"github.com/jackc/pgx/v4/pgxpool"
911
)
1012

13+
const (
14+
ControlPlaneCidr = "10.255.240.0/21"
15+
DataPlaneCidr = "10.255.248.0/21"
16+
)
17+
1118
type APIServerDB interface {
1219
ReadClients() ([]Client, error)
1320
UpdateClientStatus([]Client) error
21+
AddClient(username, publicKey, serial string) error
1422
}
1523

1624
type Client struct {
@@ -101,3 +109,68 @@ func (d *database) UpdateClientStatus(clients []Client) error {
101109

102110
return tx.Commit(ctx)
103111
}
112+
113+
var mux sync.Mutex
114+
115+
func (d *database) AddClient(username, publicKey, serial string) error {
116+
mux.Lock()
117+
defer mux.Unlock()
118+
119+
ctx := context.Background()
120+
121+
tx, err := d.conn.Begin(ctx)
122+
if err != nil {
123+
return fmt.Errorf("start transaction: %w", err)
124+
}
125+
126+
defer tx.Rollback(ctx)
127+
128+
ips, err := ips(tx, ctx)
129+
if err != nil {
130+
return fmt.Errorf("fetch ips: %w", err)
131+
}
132+
133+
ip, err := FindAvailableIP(ControlPlaneCidr, ips)
134+
if err != nil {
135+
return fmt.Errorf("finding available ip: %w", err)
136+
}
137+
138+
statement := `
139+
WITH
140+
client_key AS
141+
(INSERT INTO client (serial, healthy) VALUES ($1, false) RETURNING id),
142+
peer_control_key AS
143+
(INSERT INTO peer (public_key, ip, type) VALUES ($2, $3, 'control') RETURNING id)
144+
INSERT
145+
INTO client_peer(client_id, peer_id)
146+
(
147+
SELECT client_key.id, peer_control_key.id
148+
FROM client_key, peer_control_key
149+
);
150+
`
151+
_, err = tx.Exec(ctx, statement, serial, publicKey, ip)
152+
153+
if err != nil {
154+
return fmt.Errorf("inserting new client: %w", err)
155+
}
156+
157+
return nil
158+
}
159+
160+
func ips(tx pgx.Tx, ctx context.Context) (ips []string, err error) {
161+
rows, err := tx.Query(ctx, "SELECT ip FROM peer;")
162+
if err != nil {
163+
return nil, fmt.Errorf("get peers: %w", err)
164+
}
165+
166+
if rows.Err() != nil {
167+
return nil, fmt.Errorf("get peers: %w", err)
168+
}
169+
170+
err = rows.Scan(ips)
171+
if err != nil {
172+
return nil, fmt.Errorf("scan peers: %w", err)
173+
}
174+
175+
return
176+
}

0 commit comments

Comments
 (0)