Skip to content

Commit c428392

Browse files
runner proxy (spacecloud-io#1531)
1 parent 7fed050 commit c428392

File tree

8 files changed

+211
-129
lines changed

8 files changed

+211
-129
lines changed

Diff for: runner/server/handle.go

-84
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"encoding/json"
77
"fmt"
88
"io"
9-
"io/ioutil"
109
"net/http"
1110
"time"
1211

@@ -447,86 +446,3 @@ func (s *Server) HandleGetServiceRoutingRequest() http.HandlerFunc {
447446
_ = json.NewEncoder(w).Encode(model.Response{Result: result})
448447
}
449448
}
450-
451-
func (s *Server) handleProxy() http.HandlerFunc {
452-
return func(w http.ResponseWriter, r *http.Request) {
453-
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Minute)
454-
defer cancel()
455-
456-
// Close the body of the request
457-
defer utils.CloseTheCloser(r.Body)
458-
459-
// http: Request.RequestURI can't be set in client requests.
460-
// http://golang.org/src/pkg/net/http/client.go
461-
r.RequestURI = ""
462-
463-
// Get the meta data from headers
464-
project := r.Header.Get("x-og-project")
465-
service := r.Header.Get("x-og-service")
466-
ogHost := r.Header.Get("x-og-host")
467-
ogPort := r.Header.Get("x-og-port")
468-
ogVersion := r.Header.Get("x-og-version")
469-
470-
// Delete the headers
471-
r.Header.Del("x-og-project")
472-
r.Header.Del("x-og-service")
473-
r.Header.Del("x-og-host")
474-
r.Header.Del("x-og-port")
475-
r.Header.Del("x-og-version")
476-
477-
// Change the destination with the original host and port
478-
r.Host = ogHost
479-
r.URL.Host = fmt.Sprintf("%s:%s", ogHost, ogPort)
480-
481-
// Set the url scheme to http
482-
r.URL.Scheme = "http"
483-
484-
helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Proxy is making request to host (%s) port (%s)", ogHost, ogPort), nil)
485-
486-
// Instruct driver to scale up
487-
if err := s.driver.ScaleUp(ctx, project, service, ogVersion); err != nil {
488-
_ = helpers.Response.SendErrorResponse(ctx, w, http.StatusServiceUnavailable, err)
489-
return
490-
}
491-
492-
// Wait for the service to scale up
493-
if err := s.debounce.Wait(fmt.Sprintf("proxy-%s-%s-%s", project, service, ogVersion), func() error {
494-
return s.driver.WaitForService(ctx, &model.Service{ProjectID: project, ID: service, Version: ogVersion})
495-
}); err != nil {
496-
_ = helpers.Response.SendErrorResponse(ctx, w, http.StatusServiceUnavailable, err)
497-
return
498-
}
499-
500-
var res *http.Response
501-
for i := 0; i < 5; i++ {
502-
// Fire the request
503-
var err error
504-
res, err = http.DefaultClient.Do(r)
505-
if err != nil {
506-
_ = helpers.Response.SendErrorResponse(ctx, w, http.StatusInternalServerError, err)
507-
return
508-
}
509-
510-
// TODO: Make this retry logic better
511-
if res.StatusCode != http.StatusServiceUnavailable {
512-
break
513-
}
514-
515-
time.Sleep(350 * time.Millisecond)
516-
517-
// Close the body
518-
_, _ = io.Copy(ioutil.Discard, res.Body)
519-
utils.CloseTheCloser(res.Body)
520-
}
521-
522-
defer utils.CloseTheCloser(res.Body)
523-
524-
// Copy headers and status code
525-
for k, v := range res.Header {
526-
w.Header()[k] = v
527-
}
528-
529-
w.WriteHeader(res.StatusCode)
530-
_, _ = io.Copy(w, res.Body)
531-
}
532-
}

Diff for: runner/server/handle_proxy.go

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"net/http"
6+
"time"
7+
8+
"github.com/gorilla/mux"
9+
"github.com/spaceuptech/helpers"
10+
11+
"github.com/spaceuptech/space-cloud/runner/model"
12+
"github.com/spaceuptech/space-cloud/runner/utils"
13+
)
14+
15+
func (s *Server) handleWaitServices() http.HandlerFunc {
16+
17+
return func(w http.ResponseWriter, r *http.Request) {
18+
19+
defer utils.CloseTheCloser(r.Body)
20+
21+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
22+
defer cancel()
23+
24+
vars := mux.Vars(r)
25+
project := vars["project"]
26+
serviceID := vars["serviceId"]
27+
version := vars["version"]
28+
// Wait for the service to scale up
29+
if err := s.driver.WaitForService(ctx, &model.Service{ProjectID: project, ID: serviceID, Version: version}); err != nil {
30+
_ = helpers.Response.SendErrorResponse(ctx, w, http.StatusServiceUnavailable, err)
31+
return
32+
}
33+
_ = helpers.Response.SendOkayResponse(ctx, http.StatusOK, w)
34+
}
35+
}
36+
37+
func (s *Server) handleScaleUpService() http.HandlerFunc {
38+
39+
return func(w http.ResponseWriter, r *http.Request) {
40+
41+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
42+
defer cancel()
43+
44+
// Close the body of the request
45+
defer utils.CloseTheCloser(r.Body)
46+
// Verify token
47+
_, err := s.auth.VerifyToken(utils.GetToken(r))
48+
if err != nil {
49+
_ = helpers.Logger.LogError(helpers.GetRequestID(ctx), "Failed to scaleUp service", err, nil)
50+
_ = helpers.Response.SendErrorResponse(ctx, w, http.StatusUnauthorized, err)
51+
return
52+
}
53+
54+
vars := mux.Vars(r)
55+
project := vars["project"]
56+
serviceID := vars["serviceId"]
57+
version := vars["version"]
58+
// Instruct driver to scale up
59+
if err := s.driver.ScaleUp(ctx, project, serviceID, version); err != nil {
60+
_ = helpers.Response.SendErrorResponse(ctx, w, http.StatusServiceUnavailable, err)
61+
return
62+
}
63+
64+
_ = helpers.Response.SendOkayResponse(ctx, http.StatusOK, w)
65+
}
66+
}

Diff for: runner/server/routes.go

+2
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ func (s *Server) routes() {
1414
s.router.Methods(http.MethodPost).Path("/v1/runner/{project}/services/{serviceId}/{version}").HandlerFunc(s.handleApplyService())
1515
s.router.Methods(http.MethodGet).Path("/v1/runner/{project}/services").HandlerFunc(s.HandleGetServices())
1616
s.router.Methods(http.MethodGet).Path("/v1/runner/{project}/services/status").HandlerFunc(s.HandleGetServicesStatus())
17+
s.router.Methods(http.MethodPost).Path("/v1/runner/{project}/scale-up/{serviceId}/{version}").HandlerFunc(s.handleScaleUpService())
18+
s.router.Methods(http.MethodGet).Path("/v1/runner/{project}/wait/{serviceId}/{version} ").HandlerFunc(s.handleWaitServices())
1719

1820
s.router.Methods(http.MethodDelete).Path("/v1/runner/{project}/services/{serviceId}/{version}").HandlerFunc(s.HandleDeleteService())
1921

Diff for: runner/server/server.go

-14
Original file line numberDiff line numberDiff line change
@@ -77,20 +77,6 @@ func (s *Server) Start() error {
7777
// Initialise the various routes of the s
7878
s.routes()
7979

80-
// Start proxy server
81-
go func() {
82-
// Create a new router
83-
router := mux.NewRouter()
84-
router.PathPrefix("/").HandlerFunc(s.handleProxy())
85-
86-
// Start http server
87-
corsObj := utils.CreateCorsObject()
88-
helpers.Logger.LogInfo(helpers.GetRequestID(context.TODO()), fmt.Sprintf("Starting server proxy on port %s", s.config.ProxyPort), nil)
89-
if err := http.ListenAndServe(":"+s.config.ProxyPort, corsObj.Handler(loggerMiddleWare(router))); err != nil {
90-
helpers.Logger.LogFatal(helpers.GetRequestID(context.TODO()), fmt.Sprintf("Proxy server failed: - %v", err), nil)
91-
}
92-
}()
93-
9480
// Start the http server
9581
corsObj := utils.CreateCorsObject()
9682
helpers.Logger.LogInfo(helpers.GetRequestID(context.TODO()), fmt.Sprintf("Starting server on port %s", s.config.Port), nil)

Diff for: runner/utils/driver/istio/helpers.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,7 @@ func prepareVirtualServiceHTTPRoutes(ctx context.Context, projectID, serviceID s
214214

215215
// Redirect traffic to runner when no of replicas is equal to zero. The runner proxy will scale up the service to service incoming requests.
216216
if versionScaleConfig.MinReplicas == 0 {
217-
destHost = "runner.space-cloud.svc.cluster.local"
217+
destHost = "runner-proxy.space-cloud.svc.cluster.local"
218218
destPort = proxyPort
219219
}
220220

@@ -379,7 +379,7 @@ func updateOrCreateVirtualServiceRoutes(service *model.Service, proxyPort uint32
379379
for _, dest := range httpRoute.Route {
380380

381381
// Check if the route was for a service with min scale 0. If the destination has the host of runner, it means it is communicating via the proxy.
382-
if dest.Destination.Host == "runner.space-cloud.svc.cluster.local" {
382+
if dest.Destination.Host == "runner-proxy.space-cloud.svc.cluster.local" {
383383
// We are only interested in this case if the new min replica for this version is more than 0. If the min replica was zero there would be no change
384384
if service.AutoScale.MinReplicas == 0 {
385385
continue
@@ -404,7 +404,7 @@ func updateOrCreateVirtualServiceRoutes(service *model.Service, proxyPort uint32
404404

405405
// Update the destination to communicate via the proxy if its for our version
406406
if dest.Destination.Host == getInternalServiceDomain(service.ProjectID, service.ID, service.Version) {
407-
dest.Destination.Host = "runner.space-cloud.svc.cluster.local"
407+
dest.Destination.Host = "runner-proxy.space-cloud.svc.cluster.local"
408408
dest.Destination.Port = &networkingv1alpha3.PortSelector{Number: proxyPort}
409409
}
410410
}

Diff for: runner/utils/driver/istio/istio.go

+64-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package istio
22

33
import (
4+
"sync"
5+
46
kedaVersionedClient "github.com/kedacore/keda/pkg/generated/clientset/versioned"
57
versionedclient "istio.io/client-go/pkg/clientset/versioned"
68
v1 "k8s.io/api/core/v1"
@@ -14,11 +16,22 @@ import (
1416
"github.com/spaceuptech/space-cloud/runner/utils/auth"
1517
)
1618

19+
// deployment stores the deploymentID
20+
type deployments map[string]status
21+
22+
// status stores the value of AvailableReplicas and ReadyReplicas
23+
type status struct {
24+
AvailableReplicas int32
25+
ReadyReplicas int32
26+
}
27+
1728
// Istio manages the istio on kubernetes deployment target
1829
type Istio struct {
1930
// For internal use
20-
auth *auth.Module
21-
config *Config
31+
auth *auth.Module
32+
config *Config
33+
seviceStatus map[string]deployments
34+
lock sync.RWMutex
2235

2336
// Drivers to talk to k8s and istio
2437
kube kubernetes.Interface
@@ -66,7 +79,43 @@ func NewIstioDriver(auth *auth.Module, c *Config) (*Istio, error) {
6679
// Start the keda external scaler
6780
go kedaScaler.Start()
6881

69-
return &Istio{auth: auth, config: c, kube: kube, istio: istio, keda: kedaClient, kedaScaler: kedaScaler}, nil
82+
i := &Istio{auth: auth, config: c, seviceStatus: make(map[string]deployments), kube: kube, istio: istio, keda: kedaClient, kedaScaler: kedaScaler}
83+
if err := WatchDeployments(func(eventType string, availableReplicas, readyReplicas int32, projectID, deploymentID string) {
84+
i.lock.Lock()
85+
defer i.lock.Unlock()
86+
87+
switch eventType {
88+
case resourceAddEvent, resourceUpdateEvent:
89+
if i.seviceStatus[projectID] == nil {
90+
i.seviceStatus[projectID] = deployments{
91+
deploymentID: {
92+
AvailableReplicas: availableReplicas,
93+
ReadyReplicas: readyReplicas,
94+
},
95+
}
96+
} else {
97+
i.seviceStatus[projectID][deploymentID] = status{
98+
AvailableReplicas: availableReplicas,
99+
ReadyReplicas: readyReplicas,
100+
}
101+
}
102+
case resourceDeleteEvent:
103+
deployments, ok := i.seviceStatus[projectID]
104+
if ok {
105+
_, found := deployments[deploymentID]
106+
if found {
107+
delete(deployments, deploymentID)
108+
}
109+
if len(deployments) == 0 {
110+
delete(i.seviceStatus, projectID)
111+
}
112+
}
113+
}
114+
}); err != nil {
115+
return nil, err
116+
}
117+
118+
return i, nil
70119
}
71120

72121
func checkIfVolumeIsSecret(name string, volumes []v1.Volume) bool {
@@ -82,3 +131,15 @@ func checkIfVolumeIsSecret(name string, volumes []v1.Volume) bool {
82131
func (i *Istio) Type() model.DriverType {
83132
return model.TypeIstio
84133
}
134+
135+
func (i *Istio) getStatusOfDeployement(projectID, deployementID string) bool {
136+
i.lock.RLock()
137+
if deployments, ok := i.seviceStatus[projectID]; ok {
138+
if status, ok := deployments[deployementID]; ok {
139+
i.lock.RUnlock()
140+
return status.AvailableReplicas >= 1 && status.ReadyReplicas >= 1
141+
}
142+
}
143+
i.lock.RUnlock()
144+
return false
145+
}

Diff for: runner/utils/driver/istio/scale.go

+15-25
Original file line numberDiff line numberDiff line change
@@ -3,42 +3,32 @@ package istio
33
import (
44
"context"
55
"fmt"
6+
"time"
67

78
"github.com/spaceuptech/helpers"
8-
appsv1 "k8s.io/api/apps/v1"
9-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10-
"k8s.io/apimachinery/pkg/watch"
11-
129
"github.com/spaceuptech/space-cloud/runner/model"
1310
)
1411

1512
// WaitForService adjusts scales, up the service to scale up the number of nodes from zero to one
1613
// TODO: Do one watch per service. Right now its possible to have multiple watches for the same service
1714
func (i *Istio) WaitForService(ctx context.Context, service *model.Service) error {
18-
ns := service.ProjectID
19-
helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Scaling up service (%s:%s:%s) from zero", ns, service.ID, service.Version), nil)
20-
21-
timeout := int64(5 * 60)
22-
labels := fmt.Sprintf("app=%s,version=%s", service.ID, service.Version)
23-
helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Watching for service (%s:%s:%s) to scale up and enter ready state", ns, service.ID, service.Version), nil)
24-
watcher, err := i.kube.AppsV1().Deployments(ns).Watch(ctx, metav1.ListOptions{Watch: true, LabelSelector: labels, TimeoutSeconds: &timeout})
25-
if err != nil {
26-
return err
27-
}
28-
defer watcher.Stop()
2915

30-
for ev := range watcher.ResultChan() {
31-
if ev.Type == watch.Error {
32-
return helpers.Logger.LogError(helpers.GetRequestID(ctx), fmt.Sprintf("service (%s:%s:%s) could not be scaled up", ns, service.ID, service.Version), nil, nil)
33-
}
34-
deployment := ev.Object.(*appsv1.Deployment)
35-
helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Received watch event for service (%s:%s): available replicas - %d; ready replicas - %d", ns, service.ID, deployment.Status.AvailableReplicas, deployment.Status.ReadyReplicas), nil)
36-
if deployment.Status.AvailableReplicas >= 1 && deployment.Status.ReadyReplicas >= 1 {
37-
return nil
16+
helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Scaling up service (%s:%s:%s) from zero", service.ProjectID, service.ID, service.Version), nil)
17+
18+
helpers.Logger.LogDebug(helpers.GetRequestID(ctx), fmt.Sprintf("Watching for service (%s:%s:%s) to scale up and enter ready state", service.ProjectID, service.ID, service.Version), nil)
19+
20+
ticker := time.NewTicker(200 * time.Millisecond)
21+
22+
for {
23+
select {
24+
case <-ctx.Done():
25+
return helpers.Logger.LogError(helpers.GetRequestID(ctx), fmt.Sprintf("Service (%s) could not be started", service.ID), nil, nil)
26+
case <-ticker.C:
27+
if i.getStatusOfDeployement(service.ProjectID, service.ID) {
28+
return nil
29+
}
3830
}
3931
}
40-
41-
return helpers.Logger.LogError(helpers.GetRequestID(ctx), fmt.Sprintf("service (%s:%s) could not be started", ns, service.ID), nil, nil)
4232
}
4333

4434
// ScaleUp is notifies keda to scale up a service

0 commit comments

Comments
 (0)