Description
I have a couple of questions about my taskiq setup.
I use my own custom SentinelStreamBroker that supports XAUTOCLAIM, which the regular Redis stream broker already supports. (I hope to open a PR soon to add this for others.)
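For context, XAUTOCLAIM lets a consumer take over stream entries whose pending idle time exceeds a threshold, which is how tasks abandoned by a dead worker get picked up again. A rough sketch of the idea using redis-py's asyncio client (the stream/group/consumer names and the reclaim loop are illustrative, not the actual broker code; assumes decode_responses=True):

```python
from redis.asyncio import Redis

async def reclaim_stale_messages(redis: Redis) -> None:
    # Illustrative names; the real broker tracks its own stream/group/consumer.
    stream, group, consumer = "taskiq", "taskiq_group", "worker-1"
    cursor = "0-0"
    while True:
        # XAUTOCLAIM hands ownership of entries idle longer than
        # min_idle_time (milliseconds) to this consumer.
        reply = await redis.xautoclaim(
            stream,
            group,
            consumer,
            min_idle_time=15 * 60 * 1000,  # match the 15 min idle_timeout
            start_id=cursor,
            count=6,
        )
        cursor, messages = reply[0], reply[1]  # Redis 7+ adds a third element of deleted IDs
        for message_id, fields in messages:
            ...  # feed the reclaimed message into the normal processing path
        if cursor == "0-0":  # full pass over the pending entry list is done
            break
```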
My configuration is as follows:
terminationGracePeriodSeconds: 600
containers:
  - name: lev-cortex-worker
    args:
      - taskiq
      - worker
      - 'app.worker:broker'
      - 'app.tasks'
      - '--max-async-tasks=6'
      - '--workers=6'
      - '--max-tasks-per-child=30'
      - '--wait-tasks-timeout=600'
And here is my init logic:
return CustomRedisStreamSentinelBroker(
    sentinels=[(settings.redis_or_sentinel.sentinel.host, settings.redis_or_sentinel.sentinel.port)],
    master_name=settings.redis_or_sentinel.sentinel.master_name,
    sentinel_kwargs={
        "password": settings.redis_or_sentinel.sentinel.password,
    },
    socket_timeout=settings.redis_or_sentinel.sentinel.socket_timeout,
    idle_timeout=15 * 60,  # 15 minutes (assuming the broker expects seconds)
    password=settings.redis_or_sentinel.sentinel.redis_password,
    unacknowledged_batch_size=6,  # Match max-async-tasks to allow full worker utilization
    xread_count=6,  # Increase batch size for more efficient fetching
    socket_connect_timeout=settings.redis_or_sentinel.sentinel.socket_connect_timeout,
    maxlen=1_500,
)
My issue is twofold:
- I am seeing ghost consumers left behind; we should clean these up on shutdown, right?
- I am seeing workers shut down with pending tasks still present for the consumer.
For number 1, I think we can fix this pretty nicely (assuming this is something we want fixed); a sketch of what I have in mind is below.
For number 2, maybe I need to adjust my configuration? If so, maybe I lack an understanding of how things should work.
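For reference, what I was picturing for the consumer cleanup is roughly this (redis-py asyncio, hypothetical helper; only safe once the consumer's pending entries have been acked or reclaimed, since XGROUP DELCONSUMER discards its pending entry list):

```python
from redis.asyncio import Redis

async def remove_consumer_on_shutdown(redis: Redis, stream: str, group: str, consumer: str) -> None:
    # XGROUP DELCONSUMER drops the consumer's pending entry list, so only
    # delete once nothing is pending for it (acked, or reclaimed elsewhere).
    still_pending = await redis.xpending_range(
        stream, group, min="-", max="+", count=1, consumername=consumer
    )
    if not still_pending:
        await redis.xgroup_delconsumer(stream, group, consumer)
```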
Some notes: I use a long idle timeout because I sometimes have long-running agents; an agent can take anywhere from 1 minute to 15 minutes so far. So we want to support that without accidentally running a task twice!
Some more things that might help: here are 7 of the 18 consumers (though I am at 23 now, with 5 zombies):
1) 1) "name"
2) "03b5a6d4-cc21-4d68-86d4-3e47f9130e2e"
3) "pending"
4) (integer) 6
5) "idle"
6) (integer) 3186
7) "inactive"
8) (integer) 37588
2) 1) "name"
2) "05b088e1-f965-4bb6-b2ba-d8cf3de7a1b0"
3) "pending"
4) (integer) 12
5) "idle"
6) (integer) 15172
7) "inactive"
8) (integer) 15172
3) 1) "name"
2) "0d1e9f4a-cbd5-4656-8d16-8b3ad5098856"
3) "pending"
4) (integer) 6
5) "idle"
6) (integer) 3790
7) "inactive"
8) (integer) 38281
4) 1) "name"
2) "27997b0f-0c67-4096-81ef-8bc9050e96ed"
3) "pending"
4) (integer) 5
5) "idle"
6) (integer) 1791
7) "inactive"
8) (integer) 38783
5) 1) "name"
2) "3982f537-568a-4c5f-a84d-97cdb033a3fb"
3) "pending"
4) (integer) 7
5) "idle"
6) (integer) 33316
7) "inactive"
8) (integer) 33316
6) 1) "name"
2) "570146d2-9229-42d1-bed8-6b1f0073392d"
3) "pending"
4) (integer) 7
5) "idle"
6) (integer) 39538
7) "inactive"
8) (integer) 39538
7) 1) "name"
2) "57a62bd1-d1b6-4fcc-a947-50e5cd79c4a1"
3) "pending"
4) (integer) 9
5) "idle"
6) (integer) 43979
7) "inactive"
8) (integer) 43979
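That output comes from XINFO CONSUMERS; the same check from Python, assuming redis-py with decode_responses=True and an arbitrary idle threshold, would look roughly like:

```python
from redis.asyncio import Redis

async def find_zombie_consumers(redis: Redis, stream: str, group: str, max_idle_ms: int = 15 * 60 * 1000):
    # XINFO CONSUMERS reports name/pending/idle (and inactive on newer Redis)
    # per consumer, i.e. the same fields as the redis-cli output above.
    consumers = await redis.xinfo_consumers(stream, group)
    return [c for c in consumers if c["pending"] > 0 and c["idle"] > max_idle_ms]
```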
So something I don't understand is how I am getting more than 6 pending if my settings above say to only fetch 6 at a time (xread_count).
My first guess: I assume we don't wait for all 6 to finish, so every time 1 or 2 tasks finish, we XREAD more tasks, and any that can't be processed yet (because they would bring me over 6 in-flight tasks) end up sitting in pending for a bit, right?
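To make that guess concrete, here is a toy model of what I think is happening. This is not taskiq's actual receiver code, just an illustration with hypothetical read_batch/run_task callables: messages count as pending in Redis from the moment XREADGROUP returns them until they are acked, so the PEL can hold running tasks plus a freshly fetched batch.

```python
import asyncio

async def toy_worker(read_batch, run_task, max_async_tasks=6, xread_count=6):
    # Toy model only: "pending" in Redis = tasks currently running plus
    # messages already read but not yet started or acked.
    slots = asyncio.Semaphore(max_async_tasks)
    in_flight: set[asyncio.Task] = set()

    async def run_and_release(msg):
        try:
            await run_task(msg)  # ...the XACK would happen after this
        finally:
            slots.release()

    while True:
        # The next batch of up to xread_count messages is fetched once the
        # previous batch has *started*, not finished, so up to
        # max_async_tasks + xread_count entries can sit in the PEL at once.
        messages = await read_batch(xread_count)
        if not messages:
            break
        for msg in messages:
            await slots.acquire()
            task = asyncio.create_task(run_and_release(msg))
            in_flight.add(task)
            task.add_done_callback(in_flight.discard)
    await asyncio.gather(*in_flight)
```

If that model is close, the pending counts of 7-12 in the XINFO output above would just be 6 running tasks plus whatever the latest XREADGROUP pulled in, which matches what I am seeing.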