Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions .github/workflows/nodejs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,34 @@ jobs:
FRONTEND_PORT=$(kubectl get svc frontend -o=jsonpath='{.spec.ports[0].nodePort}')
FRONTEND_URL="http://127.0.0.1:$FRONTEND_PORT"
echo "Host: $FRONTEND_URL"
npx wait-on "$FRONTEND_URL/service/control/health"
kubectl wait --timeout 10m --for=condition=ready pod -l role=worker
npx wait-on --timeout 120000 "$FRONTEND_URL/service/control/health"
kubectl wait --timeout 3m --for=condition=ready pod -l role=worker
ROOT_TEST_URL=$FRONTEND_URL npm run test
env:
FLAKINESS_ACCESS_TOKEN: ${{ secrets.FLAKINESS_ACCESS_TOKEN }}
- name: Debug on failure
if: failure()
run: |
echo "=== Pod Status ==="
kubectl get pods -o wide
echo ""
echo "=== Control Service Logs ==="
kubectl logs deploy/control --tail=200 || true
echo ""
echo "=== Control Service Previous Logs (crashed) ==="
kubectl logs deploy/control --previous --tail=200 || true
echo ""
echo "=== Control Service Describe ==="
kubectl describe pod -l io.kompose.service=control || true
echo ""
echo "=== RabbitMQ Logs ==="
kubectl logs deploy/rabbitmq --tail=50 || true
echo ""
echo "=== etcd Logs ==="
kubectl logs deploy/etcd --tail=50 || true
echo ""
echo "=== File Service Logs ==="
kubectl logs deploy/file --tail=100 || true
- name: Upload playwright-report
if: ${{ !cancelled() }}
uses: actions/upload-artifact@v4
Expand Down
40 changes: 24 additions & 16 deletions control-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package main

import (
"context"
"crypto/rand"
"errors"
"fmt"
"io/ioutil"
"math/rand"
"io"
"math/big"
"net/http"
"os"
"os/signal"
Expand All @@ -21,7 +22,7 @@ import (
"github.com/getsentry/sentry-go"
sentryecho "github.com/getsentry/sentry-go/echo"

"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
Expand All @@ -35,7 +36,6 @@ const (
)

func init() {
rand.Seed(time.Now().UTC().UnixNano())
log.SetFormatter(&log.TextFormatter{
TimestampFormat: time.StampMilli,
})
Expand All @@ -46,7 +46,8 @@ type server struct {

etcdClient *clientv3.Client

amqpErrorChan chan *amqp.Error
amqpConnection *amqp.Connection
amqpErrorChan chan *amqp.Error

workers map[workertypes.WorkerLanguage]*Workers
}
Expand Down Expand Up @@ -105,9 +106,10 @@ func newServer() (*server, error) {
}

s := &server{
etcdClient: etcdClient,
amqpErrorChan: amqpErrorChan,
workers: workersMap,
etcdClient: etcdClient,
amqpConnection: amqpConnection,
amqpErrorChan: amqpErrorChan,
workers: workersMap,
}

s.initializeHttpServer()
Expand Down Expand Up @@ -216,8 +218,9 @@ func (s *server) handleRun(c echo.Context) error {
}

func (s *server) handleShareGet(c echo.Context) error {
ctx := c.Request().Context()
id := c.Param("id")
resp, err := s.etcdClient.Get(context.Background(), id)
resp, err := s.etcdClient.Get(ctx, id)
if err != nil {
return fmt.Errorf("could not fetch share: %w", err)
}
Expand All @@ -228,18 +231,19 @@ func (s *server) handleShareGet(c echo.Context) error {
}

func (s *server) handleShareCreate(c echo.Context) error {
code, err := ioutil.ReadAll(http.MaxBytesReader(c.Response().Writer, c.Request().Body, 1<<20))
ctx := c.Request().Context()
code, err := io.ReadAll(http.MaxBytesReader(c.Response().Writer, c.Request().Body, 1<<20))
if err != nil {
return fmt.Errorf("could not read request body: %w", err)
}
for retryCount := 0; retryCount <= 3; retryCount++ {
id := generateRandomString(SNIPPET_ID_LENGTH)
resp, err := s.etcdClient.Get(context.Background(), id)
resp, err := s.etcdClient.Get(ctx, id)
if err != nil {
return fmt.Errorf("could not fetch share: %w", err)
}
if resp.Count == 0 {
_, err = s.etcdClient.Put(context.Background(), id, string(code))
_, err = s.etcdClient.Put(ctx, id, string(code))
if err != nil {
return fmt.Errorf("could not save share: %w", err)
}
Expand All @@ -252,8 +256,9 @@ func (s *server) handleShareCreate(c echo.Context) error {
}

func (s *server) handleHealth(c echo.Context) error {
ctx := c.Request().Context()
for _, endpoint := range s.etcdClient.Endpoints() {
if _, err := s.etcdClient.Status(context.Background(), endpoint); err != nil {
if _, err := s.etcdClient.Status(ctx, endpoint); err != nil {
return fmt.Errorf("could not check etcd status: %w", err)
}
}
Expand All @@ -273,7 +278,9 @@ func (s *server) Stop() error {
return fmt.Errorf("could not cleanup workers: %w", err)
}
}

if err := s.amqpConnection.Close(); err != nil {
return fmt.Errorf("could not close amqp connection: %w", err)
}
return s.etcdClient.Close()
}

Expand Down Expand Up @@ -304,10 +311,11 @@ func main() {
}

func generateRandomString(n int) string {
var letterRunes = []rune("abcdefghijklmnopqrstuvpxyz1234567890")
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyz1234567890")
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
idx, _ := rand.Int(rand.Reader, big.NewInt(int64(len(letterRunes))))
b[i] = letterRunes[idx.Int64()]
}
return string(b)
}
42 changes: 19 additions & 23 deletions control-service/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import (
log "github.com/sirupsen/logrus"

"github.com/google/uuid"
"github.com/streadway/amqp"
amqp "github.com/rabbitmq/amqp091-go"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
"k8s.io/utils/ptr"
)

type Workers struct {
Expand All @@ -25,14 +25,12 @@ type Workers struct {
amqpReplyQueueName string
amqpChannel *amqp.Channel
k8ClientSet kubernetes.Interface
repliesMu sync.Mutex
replies map[string]chan *workertypes.WorkerResponsePayload
replies sync.Map // map[string]chan *workertypes.WorkerResponsePayload
}

func newWorkers(language workertypes.WorkerLanguage, workerCount int, k8ClientSet kubernetes.Interface, amqpChannel *amqp.Channel) (*Workers, error) {
w := &Workers{
language: language,
replies: make(map[string]chan *workertypes.WorkerResponsePayload),
k8ClientSet: k8ClientSet,
amqpChannel: amqpChannel,
workers: make(chan *Worker, workerCount),
Expand Down Expand Up @@ -75,13 +73,12 @@ func (w *Workers) consumeReplies() error {
go func() {
for msg := range msgs {
log.Printf("received rpc callback, corr id: %v", msg.CorrelationId)
w.repliesMu.Lock()
replyChan, ok := w.replies[msg.CorrelationId]
w.repliesMu.Unlock()
value, ok := w.replies.Load(msg.CorrelationId)
if !ok {
log.Printf("no reply channel exists for worker %s", msg.CorrelationId)
continue
}
replyChan := value.(chan *workertypes.WorkerResponsePayload)
var reply *workertypes.WorkerResponsePayload
if err := json.Unmarshal(msg.Body, &reply); err != nil {
log.Printf("could not unmarshal reply json: %v", err)
Expand Down Expand Up @@ -132,9 +129,7 @@ func newWorker(workers *Workers) (*Worker, error) {
language: workers.language,
}

w.workers.repliesMu.Lock()
w.workers.replies[w.id] = make(chan *workertypes.WorkerResponsePayload, 1)
w.workers.repliesMu.Unlock()
w.workers.replies.Store(w.id, make(chan *workertypes.WorkerResponsePayload, 1))

_, err := w.workers.amqpChannel.QueueDeclare(
fmt.Sprintf("rpc_queue_%s", w.id), // name
Expand Down Expand Up @@ -167,8 +162,8 @@ func (w *Worker) createPod() error {
},
Spec: v1.PodSpec{
RestartPolicy: v1.RestartPolicy(v1.RestartPolicyNever),
AutomountServiceAccountToken: pointer.BoolPtr(false),
EnableServiceLinks: pointer.BoolPtr(false),
AutomountServiceAccountToken: ptr.To(false),
EnableServiceLinks: ptr.To(false),
Containers: []v1.Container{
{
Name: "worker",
Expand All @@ -181,7 +176,7 @@ func (w *Worker) createPod() error {
},
{
Name: "AMQP_URL",
Value: "amqp://rabbitmq:5672?heartbeat=5s",
Value: "amqp://rabbitmq:5672?heartbeat=5",
},
{
Name: "WORKER_HTTP_PROXY",
Expand Down Expand Up @@ -245,20 +240,21 @@ func (w *Worker) Publish(code string) error {
func (w *Worker) Cleanup() error {
if err := w.workers.k8ClientSet.CoreV1().Pods(K8_NAMESPACE_NAME).
Delete(context.Background(), w.pod.Name, metav1.DeleteOptions{
GracePeriodSeconds: pointer.Int64Ptr(0),
GracePeriodSeconds: ptr.To(int64(0)),
}); err != nil {
return fmt.Errorf("could not delete pod: %w", err)
}
w.workers.repliesMu.Lock()
delete(w.workers.replies, w.id)
w.workers.repliesMu.Unlock()

w.workers.replies.Delete(w.id)
return nil
}

func (w *Worker) Subscribe() <-chan *workertypes.WorkerResponsePayload {
w.workers.repliesMu.Lock()
ch := w.workers.replies[w.id]
w.workers.repliesMu.Unlock()
return ch
value, ok := w.workers.replies.Load(w.id)
if !ok {
// This shouldn't happen, but return a closed channel to avoid panic
ch := make(chan *workertypes.WorkerResponsePayload)
close(ch)
return ch
}
return value.(chan *workertypes.WorkerResponsePayload)
}
82 changes: 52 additions & 30 deletions file-service/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"context"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"os"
"os/signal"
"path/filepath"
"slices"
"syscall"
"time"

Expand All @@ -34,6 +36,13 @@ type server struct {

const BUCKET_NAME = "file-uploads"

var allowedMimeTypes = []string{
"application/pdf",
"image/png",
"video/webm",
"application/zip",
}

func newServer() (*server, error) {
err := sentry.Init(sentry.ClientOptions{
Dsn: os.Getenv("FILE_SERVICE_SENTRY_DSN"),
Expand Down Expand Up @@ -100,43 +109,56 @@ func (s *server) handleUploadImage(c echo.Context) error {
outFiles := []publicFile{}
for _, files := range c.Request().MultipartForm.File {
for i := range files {
file, err := files[i].Open()
if err != nil {
return fmt.Errorf("could not open file: %w", err)
}
fileContent, err := io.ReadAll(file)
if err != nil {
return fmt.Errorf("could not read file: %w", err)
}
defer file.Close()
mimeType, err := filetype.Match(fileContent)
if err != nil {
return fmt.Errorf("could not detect mime-type: %w", err)
}
if mimeType.MIME.Value != "application/pdf" && mimeType.MIME.Value != "image/png" && mimeType.MIME.Value != "video/webm" && mimeType.MIME.Value != "application/zip" {
return fmt.Errorf("not allowed mime-type (%s): %s", mimeType.MIME.Value, files[i].Filename)
}
fileExtension := filepath.Ext(files[i].Filename)
objectName := uuid.New().String() + fileExtension
if _, err := s.minioClient.PutObject(context.Background(), BUCKET_NAME, objectName, bytes.NewBuffer(fileContent), files[i].Size, minio.PutObjectOptions{
ContentType: mimeType.MIME.Value,
}); err != nil {
return fmt.Errorf("could not put object: %w", err)
}
publicURL, err := s.minioClient.PresignedGetObject(context.Background(), BUCKET_NAME, objectName, time.Minute*10, url.Values{})
pf, err := s.processUploadedFile(c.Request().Context(), files[i])
if err != nil {
return fmt.Errorf("could not generate public URL: %w", err)
return err
}
outFiles = append(outFiles, publicFile{
Extension: fileExtension,
FileName: files[i].Filename,
PublicURL: publicURL.EscapedPath() + "?" + publicURL.RawQuery,
})
outFiles = append(outFiles, pf)
}
}
return c.JSON(http.StatusCreated, outFiles)
}

func (s *server) processUploadedFile(ctx context.Context, fh *multipart.FileHeader) (publicFile, error) {
file, err := fh.Open()
if err != nil {
return publicFile{}, fmt.Errorf("could not open file: %w", err)
}
defer file.Close()

fileContent, err := io.ReadAll(file)
if err != nil {
return publicFile{}, fmt.Errorf("could not read file: %w", err)
}

mimeType, err := filetype.Match(fileContent)
if err != nil {
return publicFile{}, fmt.Errorf("could not detect mime-type: %w", err)
}
if !slices.Contains(allowedMimeTypes, mimeType.MIME.Value) {
return publicFile{}, fmt.Errorf("not allowed mime-type (%s): %s", mimeType.MIME.Value, fh.Filename)
}

fileExtension := filepath.Ext(fh.Filename)
objectName := uuid.New().String() + fileExtension
if _, err := s.minioClient.PutObject(ctx, BUCKET_NAME, objectName, bytes.NewBuffer(fileContent), fh.Size, minio.PutObjectOptions{
ContentType: mimeType.MIME.Value,
}); err != nil {
return publicFile{}, fmt.Errorf("could not put object: %w", err)
}

publicURL, err := s.minioClient.PresignedGetObject(ctx, BUCKET_NAME, objectName, time.Minute*10, url.Values{})
if err != nil {
return publicFile{}, fmt.Errorf("could not generate public URL: %w", err)
}

return publicFile{
Extension: fileExtension,
FileName: fh.Filename,
PublicURL: publicURL.EscapedPath() + "?" + publicURL.RawQuery,
}, nil
}

func (s *server) handleHealth(c echo.Context) error {
return c.String(http.StatusOK, "OK")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ require (
github.com/h2non/filetype v1.1.3
github.com/labstack/echo/v4 v4.15.0
github.com/minio/minio-go/v7 v7.0.98
github.com/rabbitmq/amqp091-go v1.10.0
github.com/sirupsen/logrus v1.9.4
github.com/streadway/amqp v1.1.0
go.etcd.io/etcd/client/v3 v3.5.18
k8s.io/api v0.35.0
k8s.io/apimachinery v0.35.0
Expand Down
Loading
Loading