Skip to content

perf: batch insert attemptDetail #122

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
PoolConcurrency: int(cfg.Worker.Pool.Concurrency),
}
deliverer := deliverer.NewHTTPDeliverer(&cfg.Worker.Deliverer)
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer, app.bus)
app.worker = worker.NewWorker(opts, db, deliverer, queue, app.metrics, tracer, app.bus, app.dispatcher)

Check warning on line 144 in app/app.go

View check run for this annotation

Codecov / codecov/patch

app/app.go#L144

Added line #L144 was not covered by tests
}

// admin
Expand Down
27 changes: 15 additions & 12 deletions db/dao/attempt_detail_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,25 @@
}
}

func (dao *attemptDetailDao) Insert(ctx context.Context, attemptDetail *entities.AttemptDetail) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))
func (dao *attemptDetailDao) BatchInsert(ctx context.Context, entities []*entities.AttemptDetail) error {
ctx, span := tracing.Start(ctx, fmt.Sprintf("dao.%s.batch_insert", dao.opts.Table), trace.WithSpanKind(trace.SpanKindServer))

Check warning on line 34 in db/dao/attempt_detail_dao.go

View check run for this annotation

Codecov / codecov/patch

db/dao/attempt_detail_dao.go#L33-L34

Added lines #L33 - L34 were not covered by tests
defer span.End()

now := time.Now()
values := []interface{}{attemptDetail.ID, attemptDetail.RequestHeaders, attemptDetail.RequestBody, attemptDetail.ResponseHeaders, attemptDetail.ResponseBody, now, now, attemptDetail.WorkspaceId}

sql := `INSERT INTO attempt_details (id, request_headers, request_body, response_headers, response_body, created_at, updated_at, ws_id) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (id) DO UPDATE SET
request_headers = EXCLUDED.request_headers,
request_body = EXCLUDED.request_body,
response_headers = EXCLUDED.response_headers,
response_body = EXCLUDED.response_body,
updated_at = EXCLUDED.updated_at`

result, err := dao.DB(ctx).ExecContext(ctx, sql, values...)
builder := psql.Insert(dao.opts.Table).Columns("id", "request_headers", "request_body", "response_headers", "response_body", "created_at", "updated_at", "ws_id")
for _, entity := range entities {
builder = builder.Values(entity.ID, entity.RequestHeaders, entity.RequestBody, entity.ResponseHeaders, entity.ResponseBody, now, now, entity.WorkspaceId)
}
sql, args := builder.Suffix(`
ON CONFLICT (id) DO UPDATE SET
request_headers = EXCLUDED.request_headers,
request_body = EXCLUDED.request_body,
response_headers = EXCLUDED.response_headers,
response_body = EXCLUDED.response_body,
updated_at = EXCLUDED.updated_at`).MustSql()
dao.debugSQL(sql, args)
result, err := dao.DB(ctx).ExecContext(ctx, sql, args...)

Check warning on line 51 in db/dao/attempt_detail_dao.go

View check run for this annotation

Codecov / codecov/patch

db/dao/attempt_detail_dao.go#L39-L51

Added lines #L39 - L51 were not covered by tests
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion db/dao/daos.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type SourceDAO interface {

type AttemptDetailDAO interface {
BaseDAO[entities.AttemptDetail]
Insert(ctx context.Context, attemptDetail *entities.AttemptDetail) error
BatchInsert(ctx context.Context, entities []*entities.AttemptDetail) error
}

type PluginDAO interface {
Expand Down
10 changes: 9 additions & 1 deletion db/entities/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"database/sql/driver"
"encoding/json"
"errors"
"github.com/lib/pq"
"github.com/webhookx-io/webhookx/pkg/types"
)
Expand All @@ -26,7 +27,14 @@
type Headers map[string]string

func (m *Headers) Scan(src interface{}) error {
return json.Unmarshal(src.([]byte), m)
switch v := src.(type) {
case string:
return json.Unmarshal([]byte(v), m)
case []byte:
return json.Unmarshal(v, m)
default:
return errors.New("unknown type")

Check warning on line 36 in db/entities/types.go

View check run for this annotation

Codecov / codecov/patch

db/entities/types.go#L30-L36

Added lines #L30 - L36 were not covered by tests
}
}

func (m Headers) Value() (driver.Value, error) {
Expand Down
3 changes: 3 additions & 0 deletions db/migrations/7_update_attempt_details.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Roll back migration 7: convert the header columns back to JSONB and restore
-- the workspace index on attempt_details.
-- The USING clause is required because TEXT has no implicit cast to JSONB.
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN request_headers TYPE JSONB USING request_headers::JSONB;
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN response_headers TYPE JSONB USING response_headers::JSONB;
-- IF NOT EXISTS keeps the rollback re-runnable when the index is already present.
CREATE INDEX IF NOT EXISTS idx_attempt_details_ws_id ON attempt_details (ws_id);
3 changes: 3 additions & 0 deletions db/migrations/7_update_attempt_details.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- Migration 7: store attempt request/response headers as TEXT and drop the
-- workspace index on attempt_details (the down migration casts the columns
-- back to JSONB and recreates the index).
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN request_headers TYPE TEXT;
ALTER TABLE IF EXISTS ONLY "attempt_details" ALTER COLUMN response_headers TYPE TEXT;
-- IF EXISTS matches the tolerant ALTER TABLE statements above, so the migration
-- does not fail when the index has already been removed.
DROP INDEX IF EXISTS idx_attempt_details_ws_id;
9 changes: 4 additions & 5 deletions dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
"github.com/webhookx-io/webhookx/pkg/tracing"
"github.com/webhookx-io/webhookx/pkg/types"
"github.com/webhookx-io/webhookx/utils"
"github.com/webhookx-io/webhookx/worker"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"time"
Expand Down Expand Up @@ -96,7 +95,7 @@
return d.db.Attempts.BatchInsert(ctx, attempts)
})
if err == nil {
go d.sendToQueue(context.WithoutCancel(ctx), attempts)
go d.SendToQueue(context.WithoutCancel(ctx), attempts)

Check warning on line 98 in dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

dispatcher/dispatcher.go#L98

Added line #L98 was not covered by tests
}
return n, err
}
Expand Down Expand Up @@ -130,19 +129,19 @@
return err
}

d.sendToQueue(ctx, attempts)
d.SendToQueue(ctx, attempts)

Check warning on line 132 in dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

dispatcher/dispatcher.go#L132

Added line #L132 was not covered by tests

return nil
}

func (d *Dispatcher) sendToQueue(ctx context.Context, attempts []*entities.Attempt) {
func (d *Dispatcher) SendToQueue(ctx context.Context, attempts []*entities.Attempt) {

Check warning on line 137 in dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

dispatcher/dispatcher.go#L137

Added line #L137 was not covered by tests
tasks := make([]*taskqueue.TaskMessage, 0, len(attempts))
ids := make([]string, 0, len(attempts))
for _, attempt := range attempts {
tasks = append(tasks, &taskqueue.TaskMessage{
ID: attempt.ID,
ScheduledAt: attempt.ScheduledAt.Time,
Data: &worker.MessageData{
Data: &taskqueue.MessageData{

Check warning on line 144 in dispatcher/dispatcher.go

View check run for this annotation

Codecov / codecov/patch

dispatcher/dispatcher.go#L144

Added line #L144 was not covered by tests
EventID: attempt.EventId,
EndpointId: attempt.EndpointId,
Attempt: attempt.AttemptNumber,
Expand Down
7 changes: 7 additions & 0 deletions pkg/taskqueue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ type TaskMessage struct {
data []byte
}

// MessageData is the JSON-serializable payload placed in a TaskMessage's Data
// field to describe one delivery attempt.
type MessageData struct {
// EventID is the ID of the event the attempt belongs to.
EventID string `json:"event_id"`
// EndpointId is the ID of the endpoint the attempt targets.
EndpointId string `json:"endpoint_id"`
// Attempt is the attempt number.
Attempt int `json:"attempt"`
// Event is an event payload carried with the task.
// NOTE(review): the worker converts this to json.RawMessage, so it presumably
// holds raw JSON text — confirm against the producer.
Event string `json:"event"`
}

// String returns the task's ID and the contents of its data field, joined by a
// colon ("<ID>:<data>").
func (t *TaskMessage) String() string {
	payload := string(t.data)
	return t.ID + ":" + payload
}
Expand Down
13 changes: 6 additions & 7 deletions test/tracing/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,12 @@ var _ = Describe("tracing worker", Ordered, func() {
"github.com/webhookx-io/webhookx",
}
expectedScopeSpans := map[string]map[string]string{
"worker.submit": {},
"worker.handle_task": {},
"dao.endpoints.get": {},
"dao.plugins.list": {},
"worker.deliver": {},
"dao.attempt_details.insert": {},
"taskqueue.redis.delete": {},
"worker.submit": {},
"worker.handle_task": {},
"dao.endpoints.get": {},
"dao.plugins.list": {},
"worker.deliver": {},
"taskqueue.redis.delete": {},
}

n, err := helper.FileCountLine(helper.OtelCollectorTracesFile)
Expand Down
2 changes: 1 addition & 1 deletion test/worker/requeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var _ = Describe("processRequeue", Ordered, func() {
assert.NoError(GinkgoT(), err)
w = worker.NewWorker(worker.WorkerOptions{
RequeueJobInterval: time.Second,
}, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer, mocks.MockBus{})
}, db, deliverer.NewHTTPDeliverer(&config.WorkerDeliverer{}), queue, metrics, tracer, mocks.MockBus{}, nil)

// data
ws := utils.Must(db.Workspaces.GetDefault(context.TODO()))
Expand Down
8 changes: 0 additions & 8 deletions worker/types.go

This file was deleted.

132 changes: 85 additions & 47 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"github.com/webhookx-io/webhookx/db"
"github.com/webhookx-io/webhookx/db/dao"
"github.com/webhookx-io/webhookx/db/entities"
"github.com/webhookx-io/webhookx/dispatcher"
"github.com/webhookx-io/webhookx/eventbus"
"github.com/webhookx-io/webhookx/mcache"
"github.com/webhookx-io/webhookx/pkg/metrics"
Expand All @@ -35,12 +36,15 @@

log *zap.SugaredLogger

queue taskqueue.TaskQueue
deliverer deliverer.Deliverer
DB *db.DB
tracer *tracing.Tracer
pool *pool.Pool
metrics *metrics.Metrics
queue taskqueue.TaskQueue
deliverer deliverer.Deliverer
DB *db.DB
tracer *tracing.Tracer
pool *pool.Pool
metrics *metrics.Metrics
dispatcher *dispatcher.Dispatcher

tasks chan *entities.AttemptDetail
}

type WorkerOptions struct {
Expand All @@ -57,7 +61,8 @@
queue taskqueue.TaskQueue,
metrics *metrics.Metrics,
tracer *tracing.Tracer,
bus eventbus.Bus) *Worker {
bus eventbus.Bus,
dispatcher *dispatcher.Dispatcher) *Worker {

Check warning on line 65 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L65

Added line #L65 was not covered by tests

opts.RequeueJobBatch = utils.DefaultIfZero(opts.RequeueJobBatch, constants.RequeueBatch)
opts.RequeueJobInterval = utils.DefaultIfZero(opts.RequeueJobInterval, constants.RequeueInterval)
Expand All @@ -66,16 +71,18 @@

ctx, cancel := context.WithCancel(context.Background())
worker := &Worker{
ctx: ctx,
cancel: cancel,
opts: opts,
queue: queue,
log: zap.S().Named("worker"),
deliverer: deliverer,
DB: db,
pool: pool.NewPool(opts.PoolSize, opts.PoolConcurrency),
metrics: metrics,
tracer: tracer,
ctx: ctx,
cancel: cancel,
opts: opts,
queue: queue,
log: zap.S().Named("worker"),
deliverer: deliverer,
DB: db,
pool: pool.NewPool(opts.PoolSize, opts.PoolConcurrency),
metrics: metrics,
tracer: tracer,
tasks: make(chan *entities.AttemptDetail, 1000),
dispatcher: dispatcher,

Check warning on line 85 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L74-L85

Added lines #L74 - L85 were not covered by tests
}

bus.Subscribe("plugin.crud", func(data interface{}) {
Expand Down Expand Up @@ -130,7 +137,7 @@
defer span.End()
ctx = tracingCtx
}
task.Data = &MessageData{}
task.Data = &taskqueue.MessageData{}

Check warning on line 140 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L140

Added line #L140 was not covered by tests
err = task.UnmarshalData(task.Data)
if err != nil {
w.log.Errorf("failed to unmarshal task: %v", err)
Expand Down Expand Up @@ -170,6 +177,10 @@
"consumers": w.opts.PoolConcurrency,
}))

for range runtime.NumCPU() {
go w.consumeAttemptDetails()
}

Check warning on line 182 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L180-L182

Added lines #L180 - L182 were not covered by tests

go w.run()

schedule.Schedule(w.ctx, w.processRequeue, w.opts.RequeueJobInterval)
Expand Down Expand Up @@ -210,7 +221,7 @@
task := &taskqueue.TaskMessage{
ID: attempt.ID,
ScheduledAt: attempt.ScheduledAt.Time,
Data: &MessageData{
Data: &taskqueue.MessageData{

Check warning on line 224 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L224

Added line #L224 was not covered by tests
EventID: attempt.EventId,
EndpointId: attempt.EndpointId,
Attempt: attempt.AttemptNumber,
Expand Down Expand Up @@ -245,7 +256,7 @@
defer span.End()
ctx = tracingCtx
}
data := task.Data.(*MessageData)
data := task.Data.(*taskqueue.MessageData)

Check warning on line 259 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L259

Added line #L259 was not covered by tests

// verify endpoint
cacheKey := constants.EndpointCacheKey.Build(data.EndpointId)
Expand Down Expand Up @@ -345,24 +356,19 @@
return err
}

go func() {
attemptDetail := &entities.AttemptDetail{
ID: task.ID,
RequestHeaders: utils.HeaderMap(request.Request.Header),
RequestBody: utils.Pointer(string(request.Payload)),
}
if len(response.Header) > 0 {
attemptDetail.ResponseHeaders = utils.Pointer(entities.Headers(utils.HeaderMap(response.Header)))
}
if response.ResponseBody != nil {
attemptDetail.ResponseBody = utils.Pointer(string(response.ResponseBody))
}
attemptDetail.WorkspaceId = endpoint.WorkspaceId
err = w.DB.AttemptDetails.Insert(ctx, attemptDetail)
if err != nil {
w.log.Errorf("failed to insert attempt detail: %v", err)
}
}()
attemptDetail := &entities.AttemptDetail{
ID: task.ID,
RequestHeaders: utils.HeaderMap(request.Request.Header),
RequestBody: utils.Pointer(string(request.Payload)),
}
if len(response.Header) > 0 {
attemptDetail.ResponseHeaders = utils.Pointer(entities.Headers(utils.HeaderMap(response.Header)))
}
if response.ResponseBody != nil {
attemptDetail.ResponseBody = utils.Pointer(string(response.ResponseBody))
}
attemptDetail.WorkspaceId = endpoint.WorkspaceId
w.tasks <- attemptDetail

Check warning on line 371 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L359-L371

Added lines #L359 - L371 were not covered by tests

if result.Status == entities.AttemptStatusSuccess {
return nil
Expand All @@ -382,6 +388,9 @@
AttemptNumber: data.Attempt + 1,
ScheduledAt: types.NewTime(finishAt.Add(time.Second * time.Duration(delay))),
TriggerMode: entities.AttemptTriggerModeAutomatic,
Event: &entities.Event{
Data: json.RawMessage(data.Event),
},

Check warning on line 393 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L391-L393

Added lines #L391 - L393 were not covered by tests
}
nextAttempt.WorkspaceId = endpoint.WorkspaceId

Expand All @@ -390,15 +399,7 @@
return err
}

task = &taskqueue.TaskMessage{
ID: nextAttempt.ID,
ScheduledAt: nextAttempt.ScheduledAt.Time,
Data: &MessageData{
EventID: data.EventID,
EndpointId: data.EndpointId,
Attempt: nextAttempt.AttemptNumber,
},
}
go w.dispatcher.SendToQueue(context.WithoutCancel(ctx), []*entities.Attempt{nextAttempt})

Check warning on line 402 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L402

Added line #L402 was not covered by tests

err = w.queue.Add(ctx, []*taskqueue.TaskMessage{task})
if err != nil {
Expand Down Expand Up @@ -457,3 +458,40 @@
}
return *plugins, err
}

func (w *Worker) consumeAttemptDetails() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nitpick: consider shutting down gracefully when the context is done, e.g. by flushing the attempt details still queued in the channel before returning.

batch := 5

for {
select {
case <-w.ctx.Done():
return
default:

Check warning on line 469 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L462-L469

Added lines #L462 - L469 were not covered by tests
}

list := make([]*entities.AttemptDetail, 0, batch)

timeoutT := time.After(time.Millisecond * 100)
timeout := false
for i := 0; i < batch; i++ {
select {
case task := <-w.tasks:
list = append(list, task)
case <-timeoutT:
timeout = true

Check warning on line 481 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L472-L481

Added lines #L472 - L481 were not covered by tests
}
if timeout {
break

Check warning on line 484 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L483-L484

Added lines #L483 - L484 were not covered by tests
}
}

if len(list) == 0 {
continue

Check warning on line 489 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L488-L489

Added lines #L488 - L489 were not covered by tests
}

err := w.DB.AttemptDetails.BatchInsert(context.TODO(), list)
if err != nil {
w.log.Warnf("[worker] failed to batch insert attempt details: %v", err)
}

Check warning on line 495 in worker/worker.go

View check run for this annotation

Codecov / codecov/patch

worker/worker.go#L492-L495

Added lines #L492 - L495 were not covered by tests
}
}
Loading