Skip to content

Commit 03bea90

Browse files
committed
feat(queue): decorate handler context before process events
1 parent d9af68f commit 03bea90

5 files changed

Lines changed: 83 additions & 5 deletions

File tree

fake_queue.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ func (f *FakeQueue) WithContext(ctx context.Context) queueRuntime {
7575
return &clone
7676
}
7777

78+
func (f *FakeQueue) setHandlerContextDecorator(func(context.Context) context.Context) {}
79+
7880
// Dispatch records a typed job payload in-memory using the fake default queue.
7981
// @group Testing
8082
//

observability.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,15 +869,20 @@ func (q *observedQueue) Register(jobType string, handler Handler) {
869869
runtime.Register(jobType, handler)
870870
return
871871
}
872-
runtime.Register(jobType, wrapObservedHandler(q.observer, q.driver, "", jobType, handler))
872+
runtime.Register(jobType, wrapObservedHandler(q.observer, q.driver, "", jobType, nil, handler))
873873
}
874874

875875
func (q *observedQueue) Driver() Driver {
876876
return q.driver
877877
}
878878

879-
func wrapObservedHandler(observer Observer, driver Driver, queueName string, jobType string, handler Handler) Handler {
879+
func wrapObservedHandler(observer Observer, driver Driver, queueName string, jobType string, ctxDecorator func(context.Context) context.Context, handler Handler) Handler {
880880
return func(ctx context.Context, job Job) error {
881+
if ctxDecorator != nil {
882+
if decorated := ctxDecorator(ctx); decorated != nil {
883+
ctx = decorated
884+
}
885+
}
881886
opts := job.jobOptions()
882887
effectiveQueue := queueName
883888
if effectiveQueue == "" {

observability_branches_test.go

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,15 @@ func (r *observerRecorder) Observe(_ context.Context, event Event) {
6060
r.events = append(r.events, event)
6161
}
6262

63+
type observerContextRecorder struct {
64+
values []string
65+
}
66+
67+
func (r *observerContextRecorder) Observe(ctx context.Context, _ Event) {
68+
value, _ := ctx.Value("decorated").(string)
69+
r.values = append(r.values, value)
70+
}
71+
6372
func TestChannelObserver_DropIfFullAndBlockingSend(t *testing.T) {
6473
ch := make(chan Event, 1)
6574
ch <- Event{Kind: EventEnqueueAccepted}
@@ -116,7 +125,7 @@ func TestObservedQueue_DispatchClassifiesErrors(t *testing.T) {
116125
func TestWrapObservedHandler_EmitsRetriedAndArchived(t *testing.T) {
117126
t.Run("retry path", func(t *testing.T) {
118127
recorder := &observerRecorder{}
119-
h := wrapObservedHandler(recorder, DriverSync, "", "job:retry", func(context.Context, Job) error {
128+
h := wrapObservedHandler(recorder, DriverSync, "", "job:retry", nil, func(context.Context, Job) error {
120129
return errors.New("boom")
121130
})
122131

@@ -134,7 +143,7 @@ func TestWrapObservedHandler_EmitsRetriedAndArchived(t *testing.T) {
134143

135144
t.Run("archive path", func(t *testing.T) {
136145
recorder := &observerRecorder{}
137-
h := wrapObservedHandler(recorder, DriverSync, "", "job:archive", func(context.Context, Job) error {
146+
h := wrapObservedHandler(recorder, DriverSync, "", "job:archive", nil, func(context.Context, Job) error {
138147
return errors.New("boom")
139148
})
140149

@@ -151,6 +160,30 @@ func TestWrapObservedHandler_EmitsRetriedAndArchived(t *testing.T) {
151160
})
152161
}
153162

163+
func TestWrapObservedHandler_DecoratesObserverContext(t *testing.T) {
164+
recorder := &observerContextRecorder{}
165+
h := wrapObservedHandler(recorder, DriverSync, "", "job:decorated", func(ctx context.Context) context.Context {
166+
return context.WithValue(ctx, "decorated", "jobs")
167+
}, func(ctx context.Context, _ Job) error {
168+
if got, _ := ctx.Value("decorated").(string); got != "jobs" {
169+
t.Fatalf("handler ctx value = %q, want jobs", got)
170+
}
171+
return nil
172+
})
173+
174+
if err := h(context.Background(), NewJob("job:decorated").OnQueue("default")); err != nil {
175+
t.Fatalf("wrapped handler returned error: %v", err)
176+
}
177+
if len(recorder.values) != 2 {
178+
t.Fatalf("expected 2 observed events, got %d", len(recorder.values))
179+
}
180+
for i, got := range recorder.values {
181+
if got != "jobs" {
182+
t.Fatalf("observer ctx value at index %d = %q, want jobs", i, got)
183+
}
184+
}
185+
}
186+
154187
func TestObservedQueue_WrapperMethods(t *testing.T) {
155188
recorder := &observerRecorder{}
156189
inner := &queueBackendStub{

queue.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ type queueRuntime interface {
4343
// Ready checks backend readiness for dispatch/worker operation.
4444
// @group Driver Integration
4545
Ready(ctx context.Context) error
46+
47+
// setHandlerContextDecorator decorates handler execution context at registration time.
48+
setHandlerContextDecorator(func(context.Context) context.Context)
4649
}
4750

4851
// WorkerpoolConfig configures the in-memory workerpool q.
@@ -180,6 +183,7 @@ type queueCommon struct {
180183
cfg Config
181184
driver Driver
182185
ctx context.Context
186+
handlerContextDecorator func(context.Context) context.Context
183187
}
184188

185189
type nativeQueueRuntime struct {
@@ -229,6 +233,13 @@ func (q *queueCommon) WithContext(ctx context.Context) *queueCommon {
229233
return &clone
230234
}
231235

236+
func (q *queueCommon) setHandlerContextDecorator(fn func(context.Context) context.Context) {
237+
if q == nil {
238+
return
239+
}
240+
q.handlerContextDecorator = fn
241+
}
242+
232243
func (q *queueCommon) Dispatch(job any) error {
233244
dispatchJob, err := q.jobFromAny(job)
234245
if err != nil {
@@ -259,6 +270,20 @@ func (q *externalQueueRuntime) WithContext(ctx context.Context) queueRuntime {
259270
return &clone
260271
}
261272

273+
func (q *nativeQueueRuntime) setHandlerContextDecorator(fn func(context.Context) context.Context) {
274+
if q == nil {
275+
return
276+
}
277+
q.common.setHandlerContextDecorator(fn)
278+
}
279+
280+
func (q *externalQueueRuntime) setHandlerContextDecorator(fn func(context.Context) context.Context) {
281+
if q == nil {
282+
return
283+
}
284+
q.common.setHandlerContextDecorator(fn)
285+
}
286+
262287
func (q *nativeQueueRuntime) BusRegister(jobType string, handler busruntime.Handler) {
263288
if handler == nil {
264289
q.Register(jobType, nil)
@@ -510,7 +535,7 @@ func (q *queueCommon) wrapRegisteredHandler(jobType string, handler Handler) Han
510535
if q.cfg.Driver == DriverRedis {
511536
return handler
512537
}
513-
return wrapObservedHandler(q.cfg.Observer, q.cfg.Driver, "", jobType, handler)
538+
return wrapObservedHandler(q.cfg.Observer, q.cfg.Driver, "", jobType, q.handlerContextDecorator, handler)
514539
}
515540

516541
func (q *queueCommon) dispatchBusJob(ctx context.Context, jobType string, payload []byte, opts busruntime.JobOptions) error {

runtime.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ type Option func(*runtimeOptions)
101101
type runtimeOptions struct {
102102
busOpts []bus.Option
103103
workers int
104+
handlerContextDecorator func(context.Context) context.Context
104105
}
105106

106107
func (o *runtimeOptions) apply(opts []Option) {
@@ -207,6 +208,15 @@ func WithWorkers(count int) Option {
207208
}
208209
}
209210

211+
// WithHandlerContextDecorator decorates queue handler execution context before
212+
// process lifecycle events and handler execution run.
213+
// @group Queue
214+
func WithHandlerContextDecorator(fn func(context.Context) context.Context) Option {
215+
return func(o *runtimeOptions) {
216+
o.handlerContextDecorator = fn
217+
}
218+
}
219+
210220
// Queue is the high-level user-facing queue API.
211221
// It composes the queue runtime with the internal orchestration engine.
212222
// @group Queue
@@ -230,6 +240,9 @@ func newQueueFromRuntime(q queueRuntime, opts ...Option) (*Queue, error) {
230240
if ro.workers > 0 && q != nil {
231241
q = q.Workers(ro.workers)
232242
}
243+
if ro.handlerContextDecorator != nil && q != nil {
244+
q.setHandlerContextDecorator(ro.handlerContextDecorator)
245+
}
233246
b, err := bus.New(q, ro.busOpts...)
234247
if err != nil {
235248
return nil, err

0 commit comments

Comments
 (0)