diff --git a/app/app.go b/app/app.go index f76d6c3d..327363fa 100644 --- a/app/app.go +++ b/app/app.go @@ -141,7 +141,7 @@ func (app *Application) initialize() error { PoolConcurrency: int(cfg.Worker.Pool.Concurrency), } deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer) - app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer, app.bus) + app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer, app.bus, app.dispatcher) } // admin diff --git a/db/dao/attempt_detail_dao.go b/db/dao/attempt_detail_dao.go index bc76ef5c..8df37be1 100644 --- a/db/dao/attempt_detail_dao.go +++ b/db/dao/attempt_detail_dao.go @@ -30,22 +30,25 @@ func NewAttemptDetailDao(db *sqlx.DB, bus *eventbus.EventBus, workspace bool) At } } -func (dao *attemptDetailDao) Insert(ctx context.Context, attemptDetail *entities.AttemptDetail) error { - ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) +func (dao *attemptDetailDao) BatchInsert(ctx context.Context, entities []*entities.AttemptDetail) error { + ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer)) defer span.End() now := time.Now() - values := []interface{}{attemptDetail.ID, attemptDetail.RequestHeaders, attemptDetail.RequestBody, attemptDetail.ResponseHeaders, attemptDetail.ResponseBody, now, now, attemptDetail.WorkspaceId} - sql := `INSERT INTO attempt_details (id, request_headers, request_body, response_headers, response_body, created_at, updated_at, ws_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) - ON CONFLICT (id) DO UPDATE SET - request_headers = EXCLUDED.request_headers, - request_body = EXCLUDED.request_body, - response_headers = EXCLUDED.response_headers, - response_body = EXCLUDED.response_body, - updated_at = EXCLUDED.updated_at` - - result, err := dao.DB(ctx).ExecContext(ctx, sql, values...) 
+ builder := psql.Insert(dao.opts.Table).Columns("id", "request_headers", "request_body", "response_headers", "response_body", "created_at", "updated_at", "ws_id") + for _, entity := range entities { + builder = builder.Values(entity.ID, entity.RequestHeaders, entity.RequestBody, entity.ResponseHeaders, entity.ResponseBody, now, now, entity.WorkspaceId) + } + sql, args := builder.Suffix(` + ON CONFLICT (id) DO UPDATE SET + request_headers = EXCLUDED.request_headers, + request_body = EXCLUDED.request_body, + response_headers = EXCLUDED.response_headers, + response_body = EXCLUDED.response_body, + updated_at = EXCLUDED.updated_at`).MustSql() + dao.debugSQL(sql, args) + result, err := dao.DB(ctx).ExecContext(ctx, sql, args...) if err != nil { return err } diff --git a/db/dao/daos.go b/db/dao/daos.go index 4fc87197..9c0880ab 100644 --- a/db/dao/daos.go +++ b/db/dao/daos.go @@ -50,7 +50,7 @@ type SourceDAO interface { type AttemptDetailDAO interface { BaseDAO[entities.AttemptDetail] - Insert(ctx context.Context, attemptDetail *entities.AttemptDetail) error + BatchInsert(ctx context.Context, entities []*entities.AttemptDetail) error } type PluginDAO interface { diff --git a/db/entities/types.go b/db/entities/types.go index d970ff29..45b73123 100644 --- a/db/entities/types.go +++ b/db/entities/types.go @@ -3,6 +3,7 @@ package entities import ( "database/sql/driver" "encoding/json" + "errors" "github.com/lib/pq" "github.com/webhookx-io/webhookx/pkg/types" ) @@ -26,7 +27,14 @@ type BaseModel struct { type Headers map[string]string func (m *Headers) Scan(src interface{}) error { - return json.Unmarshal(src.([]byte), m) + switch v := src.(type) { + case string: + return json.Unmarshal([]byte(v), m) + case []byte: + return json.Unmarshal(v, m) + default: + return errors.New("unknown type") + } } func (m Headers) Value() (driver.Value, error) { diff --git a/db/migrations/7_update_attempt_details.down.sql b/db/migrations/7_update_attempt_details.down.sql new file mode 100644 
index 00000000..c63d7dfb --- /dev/null +++ b/db/migrations/7_update_attempt_details.down.sql @@ -0,0 +1,3 @@ +ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN request_headers TYPE JSONB USING request_headers::JSONB; +ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN response_headers TYPE JSONB USING response_headers::JSONB; +CREATE INDEX idx_attempt_details_ws_id ON attempt_details (ws_id); diff --git a/db/migrations/7_update_attempt_details.up.sql b/db/migrations/7_update_attempt_details.up.sql new file mode 100644 index 00000000..5da35bc8 --- /dev/null +++ b/db/migrations/7_update_attempt_details.up.sql @@ -0,0 +1,3 @@ +ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN request_headers TYPE TEXT; +ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN response_headers TYPE TEXT; +DROP INDEX idx_attempt_details_ws_id; diff --git a/dispatcher/dispatcher.go b/dispatcher/dispatcher.go index cbd3c655..fd25bd92 100644 --- a/dispatcher/dispatcher.go +++ b/dispatcher/dispatcher.go @@ -13,7 +13,6 @@ import ( "github.com/webhookx-io/webhookx/pkg/tracing" "github.com/webhookx-io/webhookx/pkg/types" "github.com/webhookx-io/webhookx/utils" - "github.com/webhookx-io/webhookx/worker" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" "time" @@ -96,7 +95,7 @@ func (d *Dispatcher) dispatchBatch(ctx context.Context, events []*entities.Event return d.db.Attempts.BatchInsert(ctx, attempts) }) if err == nil { - go d.sendToQueue(context.WithoutCancel(ctx), attempts) + go d.SendToQueue(context.WithoutCancel(ctx), attempts) } return n, err } @@ -130,19 +129,19 @@ func (d *Dispatcher) DispatchEndpoint(ctx context.Context, event *entities.Event return err } - d.sendToQueue(ctx, attempts) + d.SendToQueue(ctx, attempts) return nil } -func (d *Dispatcher) sendToQueue(ctx context.Context, attempts []*entities.Attempt) { +func (d *Dispatcher) SendToQueue(ctx context.Context, attempts []*entities.Attempt) { tasks := make([]*taskqueue.TaskMessage, 0, len(attempts)) ids 
:= make([]string, 0, len(attempts)) for _, attempt := range attempts { tasks = append(tasks, &taskqueue.TaskMessage{ ID: attempt.ID, ScheduledAt: attempt.ScheduledAt.Time, - Data: &worker.MessageData{ + Data: &taskqueue.MessageData{ EventID: attempt.EventId, EndpointId: attempt.EndpointId, Attempt: attempt.AttemptNumber, diff --git a/pkg/taskqueue/queue.go b/pkg/taskqueue/queue.go index 65a86c7c..85d579d8 100644 --- a/pkg/taskqueue/queue.go +++ b/pkg/taskqueue/queue.go @@ -13,6 +13,13 @@ type TaskMessage struct { data []byte } +type MessageData struct { + EventID string `json:"event_id"` + EndpointId string `json:"endpoint_id"` + Attempt int `json:"attempt"` + Event string `json:"event"` +} + func (t *TaskMessage) String() string { return t.ID + ":" + string(t.data) } diff --git a/test/tracing/worker_test.go b/test/tracing/worker_test.go index 0b39472e..e760a5ff 100644 --- a/test/tracing/worker_test.go +++ b/test/tracing/worker_test.go @@ -53,13 +53,12 @@ var _ = Describe("tracing worker", Ordered, func() { "github.com/webhookx-io/webhookx", } expectedScopeSpans := map[string]map[string]string{ - "worker.submit": {}, - "worker.handle_task": {}, - "dao.endpoints.get": {}, - "dao.plugins.list": {}, - "worker.deliver": {}, - "dao.attempt_details.insert": {}, - "taskqueue.redis.delete": {}, + "worker.submit": {}, + "worker.handle_task": {}, + "dao.endpoints.get": {}, + "dao.plugins.list": {}, + "worker.deliver": {}, + "taskqueue.redis.delete": {}, } n, err := helper.FileCountLine(helper.OtelCollectorTracesFile) diff --git a/test/worker/requeue_test.go b/test/worker/requeue_test.go index ddff10e7..58979f50 100644 --- a/test/worker/requeue_test.go +++ b/test/worker/requeue_test.go @@ -45,7 +45,7 @@ var _ = Describe("processRequeue", Ordered, func() { assert.NoError(GinkgoT(), err) w = worker.NewWorker(worker.WorkerOptions{ RequeueJobInterval: time.Second, - }, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer, mocks.MockBus{}) + }, db, 
deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer, mocks.MockBus{}, nil) // data ws := utils.Must(db.Workspaces.GetDefault(context.TODO())) diff --git a/worker/types.go b/worker/types.go deleted file mode 100644 index 3d77f304..00000000 --- a/worker/types.go +++ /dev/null @@ -1,8 +0,0 @@ -package worker - -type MessageData struct { - EventID string `json:"event_id"` - EndpointId string `json:"endpoint_id"` - Attempt int `json:"attempt"` - Event string `json:"event"` -} diff --git a/worker/worker.go b/worker/worker.go index 7f8a52a2..e892bd16 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -9,6 +9,7 @@ import ( "github.com/webhookx-io/webhookx/db" "github.com/webhookx-io/webhookx/db/dao" "github.com/webhookx-io/webhookx/db/entities" + "github.com/webhookx-io/webhookx/dispatcher" "github.com/webhookx-io/webhookx/eventbus" "github.com/webhookx-io/webhookx/mcache" "github.com/webhookx-io/webhookx/pkg/metrics" @@ -35,12 +36,15 @@ type Worker struct { log *zap.SugaredLogger - queue taskqueue.TaskQueue - deliverer deliverer.Deliverer - DB *db.DB - tracer *tracing.Tracer - pool *pool.Pool - metrics *metrics.Metrics + queue taskqueue.TaskQueue + deliverer deliverer.Deliverer + DB *db.DB + tracer *tracing.Tracer + pool *pool.Pool + metrics *metrics.Metrics + dispatcher *dispatcher.Dispatcher + + tasks chan *entities.AttemptDetail } type WorkerOptions struct { @@ -57,7 +61,8 @@ func NewWorker( queue taskqueue.TaskQueue, metrics *metrics.Metrics, tracer *tracing.Tracer, - bus eventbus.Bus) *Worker { + bus eventbus.Bus, + dispatcher *dispatcher.Dispatcher) *Worker { opts.RequeueJobBatch = utils.DefaultIfZero(opts.RequeueJobBatch, constants.RequeueBatch) opts.RequeueJobInterval = utils.DefaultIfZero(opts.RequeueJobInterval, constants.RequeueInterval) @@ -66,16 +71,18 @@ func NewWorker( ctx, cancel := context.WithCancel(context.Background()) worker := &Worker{ - ctx: ctx, - cancel: cancel, - opts: opts, - queue: queue, - log: 
zap.S().Named("worker"), - deliverer: deliverer, - DB: db, - pool: pool.NewPool(opts.PoolSize, opts.PoolConcurrency), - metrics: metrics, - tracer: tracer, + ctx: ctx, + cancel: cancel, + opts: opts, + queue: queue, + log: zap.S().Named("worker"), + deliverer: deliverer, + DB: db, + pool: pool.NewPool(opts.PoolSize, opts.PoolConcurrency), + metrics: metrics, + tracer: tracer, + tasks: make(chan *entities.AttemptDetail, 1000), + dispatcher: dispatcher, } bus.Subscribe("plugin.crud", func(data interface{}) { @@ -130,7 +137,7 @@ func (w *Worker) run() { defer span.End() ctx = tracingCtx } - task.Data = &MessageData{} + task.Data = &taskqueue.MessageData{} err = task.UnmarshalData(task.Data) if err != nil { w.log.Errorf("failed to unmarshal task: %v", err) @@ -170,6 +177,10 @@ func (w *Worker) Start() error { "consumers": w.opts.PoolConcurrency, })) + for range runtime.NumCPU() { + go w.consumeAttemptDetails() + } + go w.run() schedule.Schedule(w.ctx, w.processRequeue, w.opts.RequeueJobInterval) @@ -210,7 +221,7 @@ func (w *Worker) processRequeue() { task := &taskqueue.TaskMessage{ ID: attempt.ID, ScheduledAt: attempt.ScheduledAt.Time, - Data: &MessageData{ + Data: &taskqueue.MessageData{ EventID: attempt.EventId, EndpointId: attempt.EndpointId, Attempt: attempt.AttemptNumber, @@ -245,7 +256,7 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er defer span.End() ctx = tracingCtx } - data := task.Data.(*MessageData) + data := task.Data.(*taskqueue.MessageData) // verify endpoint cacheKey := constants.EndpointCacheKey.Build(data.EndpointId) @@ -345,24 +356,19 @@ func (w *Worker) handleTask(ctx context.Context, task *taskqueue.TaskMessage) er return err } - go func() { - attemptDetail := &entities.AttemptDetail{ - ID: task.ID, - RequestHeaders: utils.HeaderMap(request.Request.Header), - RequestBody: utils.Pointer(string(request.Payload)), - } - if len(response.Header) > 0 { - attemptDetail.ResponseHeaders = 
utils.Pointer(entities.Headers(utils.HeaderMap(response.Header))) - } - if response.ResponseBody != nil { - attemptDetail.ResponseBody = utils.Pointer(string(response.ResponseBody)) - } - attemptDetail.WorkspaceId = endpoint.WorkspaceId - err = w.DB.AttemptDetails.Insert(ctx, attemptDetail) - if err != nil { - w.log.Errorf("failed to insert attempt detail: %v", err) - } - }() + attemptDetail := &entities.AttemptDetail{ + ID: task.ID, + RequestHeaders: utils.HeaderMap(request.Request.Header), + RequestBody: utils.Pointer(string(request.Payload)), + } + if len(response.Header) > 0 { + attemptDetail.ResponseHeaders = utils.Pointer(entities.Headers(utils.HeaderMap(response.Header))) + } + if response.ResponseBody != nil { + attemptDetail.ResponseBody = utils.Pointer(string(response.ResponseBody)) + } + attemptDetail.WorkspaceId = endpoint.WorkspaceId + w.tasks <- attemptDetail if result.Status == entities.AttemptStatusSuccess { return nil @@ -382,6 +388,9 @@ AttemptNumber: data.Attempt + 1, ScheduledAt: types.NewTime(finishAt.Add(time.Second * time.Duration(delay))), TriggerMode: entities.AttemptTriggerModeAutomatic, + Event: &entities.Event{ + Data: json.RawMessage(data.Event), + }, } nextAttempt.WorkspaceId = endpoint.WorkspaceId @@ -390,15 +399,6 @@ return err } - task = &taskqueue.TaskMessage{ - ID: nextAttempt.ID, - ScheduledAt: nextAttempt.ScheduledAt.Time, - Data: &MessageData{ - EventID: data.EventID, - EndpointId: data.EndpointId, - Attempt: nextAttempt.AttemptNumber, - }, - } + go w.dispatcher.SendToQueue(context.WithoutCancel(ctx), []*entities.Attempt{nextAttempt}) - err = w.queue.Add(ctx, []*taskqueue.TaskMessage{task}) if err != nil { @@ -457,3 +457,40 @@ } return *plugins, err } + +func (w *Worker)
consumeAttemptDetails() { + batch := 5 + + for { + select { + case <-w.ctx.Done(): + return + default: + } + + list := make([]*entities.AttemptDetail, 0, batch) + + timeoutT := time.After(time.Millisecond * 100) + timeout := false + for i := 0; i < batch; i++ { + select { + case task := <-w.tasks: + list = append(list, task) + case <-timeoutT: + timeout = true + } + if timeout { + break + } + } + + if len(list) == 0 { + continue + } + + err := w.DB.AttemptDetails.BatchInsert(context.TODO(), list) + if err != nil { + w.log.Warnf("[worker] failed to batch insert attempt details: %v", err) + } + } +}