Skip to content

Commit d4152bb

Browse files
committed
test: align integration queues with physical names
1 parent 29db7e6 commit d4152bb

6 files changed

Lines changed: 62 additions & 48 deletions

integration/all/handler_context_integration_test.go

Lines changed: 25 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ import (
2323
)
2424

2525
type handlerContextRecorder struct {
26-
mu sync.Mutex
27-
queueName string
28-
key any
29-
want string
30-
total map[EventKind]int
31-
decorated map[EventKind]int
26+
mu sync.Mutex
27+
queueName string
28+
key any
29+
want string
30+
total map[EventKind]int
31+
decorated map[EventKind]int
3232
}
3333

3434
func (r *handlerContextRecorder) setQueueName(queueName string) {
@@ -39,11 +39,11 @@ func (r *handlerContextRecorder) setQueueName(queueName string) {
3939

4040
func newHandlerContextRecorder(queueName string, key any, want string) *handlerContextRecorder {
4141
return &handlerContextRecorder{
42-
queueName: queueName,
43-
key: key,
44-
want: want,
45-
total: make(map[EventKind]int),
46-
decorated: make(map[EventKind]int),
42+
queueName: queueName,
43+
key: key,
44+
want: want,
45+
total: make(map[EventKind]int),
46+
decorated: make(map[EventKind]int),
4747
}
4848
}
4949

@@ -124,8 +124,8 @@ func TestIntegrationQueue_HandlerContextDecorator_AllBackends(t *testing.T) {
124124
const want = "jobs"
125125

126126
fixtures := []struct {
127-
name string
128-
newQ func(t *testing.T, observer Observer) (*Queue, string)
127+
name string
128+
newQ func(t *testing.T, observer Observer) (*Queue, string)
129129
}{
130130
{
131131
name: testenv.BackendSync,
@@ -170,61 +170,65 @@ func TestIntegrationQueue_HandlerContextDecorator_AllBackends(t *testing.T) {
170170
{
171171
name: testenv.BackendMySQL,
172172
newQ: func(t *testing.T, observer Observer) (*Queue, string) {
173+
physical := uniqueQueueName("handler-context-mysql")
173174
q, err := newQueue(
174-
withObserverAll(mysqlCfg(mysqlDSN(integrationMySQL.addr)), observer),
175+
withObserverAll(withDefaultQueueAll(mysqlCfg(mysqlDSN(integrationMySQL.addr)), physical), observer),
175176
WithHandlerContextDecorator(func(ctx context.Context) context.Context {
176177
return context.WithValue(ctx, key, want)
177178
}),
178179
)
179180
if err != nil {
180181
t.Fatalf("new mysql queue failed: %v", err)
181182
}
182-
return q, uniqueQueueName("handler-context-mysql")
183+
return q, physical
183184
},
184185
},
185186
{
186187
name: testenv.BackendPostgres,
187188
newQ: func(t *testing.T, observer Observer) (*Queue, string) {
189+
physical := uniqueQueueName("handler-context-postgres")
188190
q, err := newQueue(
189-
withObserverAll(postgresCfg(postgresDSN(integrationPostgres.addr)), observer),
191+
withObserverAll(withDefaultQueueAll(postgresCfg(postgresDSN(integrationPostgres.addr)), physical), observer),
190192
WithHandlerContextDecorator(func(ctx context.Context) context.Context {
191193
return context.WithValue(ctx, key, want)
192194
}),
193195
)
194196
if err != nil {
195197
t.Fatalf("new postgres queue failed: %v", err)
196198
}
197-
return q, uniqueQueueName("handler-context-postgres")
199+
return q, physical
198200
},
199201
},
200202
{
201203
name: testenv.BackendSQLite,
202204
newQ: func(t *testing.T, observer Observer) (*Queue, string) {
205+
physical := uniqueQueueName("handler-context-sqlite")
203206
q, err := newQueue(
204-
withObserverAll(sqliteCfg(fmt.Sprintf("%s/handler-context-%d.db", t.TempDir(), time.Now().UnixNano())), observer),
207+
withObserverAll(withDefaultQueueAll(sqliteCfg(fmt.Sprintf("%s/handler-context-%d.db", t.TempDir(), time.Now().UnixNano())), physical), observer),
205208
WithHandlerContextDecorator(func(ctx context.Context) context.Context {
206209
return context.WithValue(ctx, key, want)
207210
}),
208211
)
209212
if err != nil {
210213
t.Fatalf("new sqlite queue failed: %v", err)
211214
}
212-
return q, uniqueQueueName("handler-context-sqlite")
215+
return q, physical
213216
},
214217
},
215218
{
216219
name: testenv.BackendNATS,
217220
newQ: func(t *testing.T, observer Observer) (*Queue, string) {
221+
physical := uniqueQueueName("handler-context-nats")
218222
q, err := newQueue(
219-
withObserverAll(natsCfg(integrationNATS.url), observer),
223+
withObserverAll(withDefaultQueueAll(natsCfg(integrationNATS.url), physical), observer),
220224
WithHandlerContextDecorator(func(ctx context.Context) context.Context {
221225
return context.WithValue(ctx, key, want)
222226
}),
223227
)
224228
if err != nil {
225229
t.Fatalf("new nats queue failed: %v", err)
226230
}
227-
return q, uniqueQueueName("handler-context-nats")
231+
return q, physical
228232
},
229233
},
230234
{

integration/all/integration_scenarios_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -773,14 +773,14 @@ func TestIntegrationScenarios_AllBackends(t *testing.T) {
773773
name: testenv.BackendMySQL,
774774
queueName: "scenario_mysql",
775775
newQueue: func(t *testing.T) QueueRuntime {
776-
q, err := newQueueRuntime(mysqlCfg(mysqlDSN(integrationMySQL.addr)))
776+
q, err := newQueueRuntime(withDefaultQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)), "scenario_mysql"))
777777
if err != nil {
778778
t.Fatalf("new mysql queue failed: %v", err)
779779
}
780780
return q
781781
},
782782
newWorker: func(t *testing.T) runtimeWorkerBackend {
783-
return newQueueBackedWorker(t, mysqlCfg(mysqlDSN(integrationMySQL.addr)), 4)
783+
return newQueueBackedWorker(t, withDefaultQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)), "scenario_mysql"), 4)
784784
},
785785
supportsBackoff: true,
786786
supportsRestart: true,
@@ -796,14 +796,14 @@ func TestIntegrationScenarios_AllBackends(t *testing.T) {
796796
name: testenv.BackendPostgres,
797797
queueName: "scenario_postgres",
798798
newQueue: func(t *testing.T) QueueRuntime {
799-
q, err := newQueueRuntime(postgresCfg(postgresDSN(integrationPostgres.addr)))
799+
q, err := newQueueRuntime(withDefaultQueue(postgresCfg(postgresDSN(integrationPostgres.addr)), "scenario_postgres"))
800800
if err != nil {
801801
t.Fatalf("new postgres queue failed: %v", err)
802802
}
803803
return q
804804
},
805805
newWorker: func(t *testing.T) runtimeWorkerBackend {
806-
return newQueueBackedWorker(t, postgresCfg(postgresDSN(integrationPostgres.addr)), 4)
806+
return newQueueBackedWorker(t, withDefaultQueue(postgresCfg(postgresDSN(integrationPostgres.addr)), "scenario_postgres"), 4)
807807
},
808808
supportsBackoff: true,
809809
supportsRestart: true,
@@ -844,14 +844,14 @@ func TestIntegrationScenarios_AllBackends(t *testing.T) {
844844
name: testenv.BackendNATS,
845845
queueName: "scenario_nats",
846846
newQueue: func(t *testing.T) QueueRuntime {
847-
q, err := newQueueRuntime(natsCfg(integrationNATS.url))
847+
q, err := newQueueRuntime(withDefaultQueue(natsCfg(integrationNATS.url), "scenario_nats"))
848848
if err != nil {
849849
t.Fatalf("new nats queue failed: %v", err)
850850
}
851851
return q
852852
},
853853
newWorker: func(t *testing.T) runtimeWorkerBackend {
854-
return newQueueBackedWorker(t, natsCfg(integrationNATS.url), 4)
854+
return newQueueBackedWorker(t, withDefaultQueue(natsCfg(integrationNATS.url), "scenario_nats"), 4)
855855
},
856856
supportsBackoff: true,
857857
supportsRestart: false,
@@ -929,14 +929,14 @@ func TestIntegrationScenarios_AllBackends(t *testing.T) {
929929
if fx.name == testenv.BackendSQLite {
930930
dsn := fmt.Sprintf("%s/scenario-%d.db", t.TempDir(), time.Now().UnixNano())
931931
fx.newQueue = func(t *testing.T) QueueRuntime {
932-
q, err := newQueueRuntime(sqliteCfg(dsn))
932+
q, err := newQueueRuntime(withDefaultQueue(sqliteCfg(dsn), "scenario_sqlite"))
933933
if err != nil {
934934
t.Fatalf("new sqlite queue failed: %v", err)
935935
}
936936
return q
937937
}
938938
fx.newWorker = func(t *testing.T) runtimeWorkerBackend {
939-
return newQueueBackedWorker(t, sqliteCfg(dsn), 4)
939+
return newQueueBackedWorker(t, withDefaultQueue(sqliteCfg(dsn), "scenario_sqlite"), 4)
940940
}
941941
fx.supportsBackoff = true
942942
fx.supportsRestart = true
@@ -2467,9 +2467,9 @@ func newOrderingWorker(t *testing.T, fx scenarioFixture) runtimeWorkerBackend {
24672467
case testenv.BackendRedis:
24682468
return newQueueBackedWorker(t, redisCfg(integrationRedis.addr), 1)
24692469
case testenv.BackendMySQL:
2470-
return newQueueBackedWorker(t, mysqlCfg(mysqlDSN(integrationMySQL.addr)), 1)
2470+
return newQueueBackedWorker(t, withDefaultQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)), fx.queueName), 1)
24712471
case testenv.BackendPostgres:
2472-
return newQueueBackedWorker(t, postgresCfg(postgresDSN(integrationPostgres.addr)), 1)
2472+
return newQueueBackedWorker(t, withDefaultQueue(postgresCfg(postgresDSN(integrationPostgres.addr)), fx.queueName), 1)
24732473
default:
24742474
return fx.newWorker(t)
24752475
}

integration/all/runtime_integration_test.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,44 +69,48 @@ func TestIntegrationQueue_AllBackends(t *testing.T) {
6969
name: testenv.BackendMySQL,
7070
executes: true,
7171
newQ: func(t *testing.T) (*Queue, string) {
72-
q, err := newQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)))
72+
queueName := uniqueQueueName("queue-mysql")
73+
q, err := newQueue(withDefaultQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)), queueName))
7374
if err != nil {
7475
t.Fatalf("new mysql queue failed: %v", err)
7576
}
76-
return q, uniqueQueueName("queue-mysql")
77+
return q, queueName
7778
},
7879
},
7980
{
8081
name: testenv.BackendPostgres,
8182
executes: true,
8283
newQ: func(t *testing.T) (*Queue, string) {
83-
q, err := newQueue(postgresCfg(postgresDSN(integrationPostgres.addr)))
84+
queueName := uniqueQueueName("queue-postgres")
85+
q, err := newQueue(withDefaultQueue(postgresCfg(postgresDSN(integrationPostgres.addr)), queueName))
8486
if err != nil {
8587
t.Fatalf("new postgres queue failed: %v", err)
8688
}
87-
return q, uniqueQueueName("queue-postgres")
89+
return q, queueName
8890
},
8991
},
9092
{
9193
name: testenv.BackendSQLite,
9294
executes: true,
9395
newQ: func(t *testing.T) (*Queue, string) {
94-
q, err := newQueue(sqliteCfg(fmt.Sprintf("%s/queue-integration-%d.db", t.TempDir(), time.Now().UnixNano())))
96+
queueName := uniqueQueueName("queue-sqlite")
97+
q, err := newQueue(withDefaultQueue(sqliteCfg(fmt.Sprintf("%s/queue-integration-%d.db", t.TempDir(), time.Now().UnixNano())), queueName))
9598
if err != nil {
9699
t.Fatalf("new sqlite queue failed: %v", err)
97100
}
98-
return q, uniqueQueueName("queue-sqlite")
101+
return q, queueName
99102
},
100103
},
101104
{
102105
name: testenv.BackendNATS,
103106
executes: true,
104107
newQ: func(t *testing.T) (*Queue, string) {
105-
q, err := newQueue(natsCfg(integrationNATS.url))
108+
queueName := uniqueQueueName("queue-nats")
109+
q, err := newQueue(withDefaultQueue(natsCfg(integrationNATS.url), queueName))
106110
if err != nil {
107111
t.Fatalf("new nats queue failed: %v", err)
108112
}
109-
return q, uniqueQueueName("queue-nats")
113+
return q, queueName
110114
},
111115
},
112116
{

integration/bus/integration_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,13 @@ func TestIntegrationBus_AllBackends(t *testing.T) {
299299
}()
300300

301301
queueName := uniqueQueueName("bus-integration")
302-
if backend.name == testenv.BackendRedis || backend.name == testenv.BackendRabbitMQ {
302+
if backend.name == testenv.BackendRedis ||
303+
backend.name == testenv.BackendMySQL ||
304+
backend.name == testenv.BackendPostgres ||
305+
backend.name == testenv.BackendSQLite ||
306+
backend.name == testenv.BackendNATS ||
307+
backend.name == testenv.BackendSQS ||
308+
backend.name == testenv.BackendRabbitMQ {
303309
queueName = "default"
304310
}
305311
if !backend.executes {

integration/root/observability_integration_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ func TestObservabilityIntegration_AllBackends(t *testing.T) {
4343
workers: 2,
4444
newQueue: func(t *testing.T, collector *queue.StatsCollector) QueueRuntime {
4545
ensureMySQLDB(t)
46-
q, err := newQueueRuntime(withObserver(mysqlCfg(mysqlDSN(integrationMySQL.addr)), collector))
46+
q, err := newQueueRuntime(withObserver(withDefaultQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)), "obs_mysql"), collector))
4747
if err != nil {
4848
t.Fatalf("new mysql queue failed: %v", err)
4949
}
@@ -57,7 +57,7 @@ func TestObservabilityIntegration_AllBackends(t *testing.T) {
5757
workers: 2,
5858
newQueue: func(t *testing.T, collector *queue.StatsCollector) QueueRuntime {
5959
ensurePostgresDB(t)
60-
q, err := newQueueRuntime(withObserver(postgresCfg(postgresDSN(integrationPostgres.addr)), collector))
60+
q, err := newQueueRuntime(withObserver(withDefaultQueue(postgresCfg(postgresDSN(integrationPostgres.addr)), "obs_postgres"), collector))
6161
if err != nil {
6262
t.Fatalf("new postgres queue failed: %v", err)
6363
}
@@ -71,7 +71,7 @@ func TestObservabilityIntegration_AllBackends(t *testing.T) {
7171
workers: 2,
7272
newQueue: func(t *testing.T, collector *queue.StatsCollector) QueueRuntime {
7373
dsn := fmt.Sprintf("%s/obs-%d.db", t.TempDir(), time.Now().UnixNano())
74-
q, err := newQueueRuntime(withObserver(sqliteCfg(dsn), collector))
74+
q, err := newQueueRuntime(withObserver(withDefaultQueue(sqliteCfg(dsn), "obs_sqlite"), collector))
7575
if err != nil {
7676
t.Fatalf("new sqlite queue failed: %v", err)
7777
}
@@ -84,7 +84,7 @@ func TestObservabilityIntegration_AllBackends(t *testing.T) {
8484
workers: 2,
8585
newQueue: func(t *testing.T, collector *queue.StatsCollector) QueueRuntime {
8686
ensureNATS(t)
87-
q, err := newQueueRuntime(withObserver(natsCfg(integrationNATS.url), collector))
87+
q, err := newQueueRuntime(withObserver(withDefaultQueue(natsCfg(integrationNATS.url), "obs_nats"), collector))
8888
if err != nil {
8989
t.Fatalf("new nats queue failed: %v", err)
9090
}

integration/root/process_events_integration_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestObservabilityIntegration_ProcessEvents_AllBackends(t *testing.T) {
7878
workers: 2,
7979
newQueue: func(t *testing.T, observer queue.Observer) QueueRuntime {
8080
ensureMySQLDB(t)
81-
q, err := newQueueRuntime(withObserver(mysqlCfg(mysqlDSN(integrationMySQL.addr)), observer))
81+
q, err := newQueueRuntime(withObserver(withDefaultQueue(mysqlCfg(mysqlDSN(integrationMySQL.addr)), "obs_events_mysql"), observer))
8282
if err != nil {
8383
t.Fatalf("new mysql queue failed: %v", err)
8484
}
@@ -91,7 +91,7 @@ func TestObservabilityIntegration_ProcessEvents_AllBackends(t *testing.T) {
9191
workers: 2,
9292
newQueue: func(t *testing.T, observer queue.Observer) QueueRuntime {
9393
ensurePostgresDB(t)
94-
q, err := newQueueRuntime(withObserver(postgresCfg(postgresDSN(integrationPostgres.addr)), observer))
94+
q, err := newQueueRuntime(withObserver(withDefaultQueue(postgresCfg(postgresDSN(integrationPostgres.addr)), "obs_events_postgres"), observer))
9595
if err != nil {
9696
t.Fatalf("new postgres queue failed: %v", err)
9797
}
@@ -104,7 +104,7 @@ func TestObservabilityIntegration_ProcessEvents_AllBackends(t *testing.T) {
104104
workers: 2,
105105
newQueue: func(t *testing.T, observer queue.Observer) QueueRuntime {
106106
dsn := fmt.Sprintf("%s/obs-events-%d.db", t.TempDir(), time.Now().UnixNano())
107-
q, err := newQueueRuntime(withObserver(sqliteCfg(dsn), observer))
107+
q, err := newQueueRuntime(withObserver(withDefaultQueue(sqliteCfg(dsn), "obs_events_sqlite"), observer))
108108
if err != nil {
109109
t.Fatalf("new sqlite queue failed: %v", err)
110110
}
@@ -117,7 +117,7 @@ func TestObservabilityIntegration_ProcessEvents_AllBackends(t *testing.T) {
117117
workers: 2,
118118
newQueue: func(t *testing.T, observer queue.Observer) QueueRuntime {
119119
ensureNATS(t)
120-
q, err := newQueueRuntime(withObserver(natsCfg(integrationNATS.url), observer))
120+
q, err := newQueueRuntime(withObserver(withDefaultQueue(natsCfg(integrationNATS.url), "obs_events_nats"), observer))
121121
if err != nil {
122122
t.Fatalf("new nats queue failed: %v", err)
123123
}

0 commit comments

Comments
 (0)