Simple task queue with Redis Streams
Christopher Schleiden (@cschleiden)
Lately I've done a bit more work with Redis. While I had used it via abstractions at previous jobs, I had never interacted with it directly, so there were a few things to catch up on.
One of the scenarios I needed it for was a simple task queue for go-workflows. The basic requirements were: the ability to push and pop tasks, no polling, so blocking (BL*) Redis commands, and resiliency to worker failures. If a worker pops a task off the queue and doesn't finish or heartbeat it within a certain timeframe, the task should become available again for another worker.
Using lists and BLMOVE
The first idea, and how many task queues were originally implemented in Redis, was to use two lists: one for pending tasks and one for tasks being processed (processing), and to move items between them atomically with BRPOPLPUSH or its more generic successor BLMOVE.
This allows us to push tasks, pop tasks, and keep track of the tasks currently being worked on. The problem is that when a worker crashes, we need to somehow move its tasks from the processing list back to the pending list. One option is to keep track of when a worker picked up a task, or last sent a heartbeat, in a separate sorted set (ZSET). The score is the time the worker acquired or renewed the lease last. We can then periodically check the processing list for tasks that have been in there for too long, move them back to the pending list, and delete them from the sorted set.
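To make this concrete, here is a rough sketch of that periodic check, assuming the go-redis client (github.com/redis/go-redis/v9) and the key names from the commands below:

```go
// Find tasks whose lease expired (scores hold the expiry time, matching
// the ZADD shown below) and make them available again.
now := strconv.FormatInt(time.Now().Unix(), 10)
expired, err := rdb.ZRangeByScore(ctx, "time", &redis.ZRangeBy{Min: "0", Max: now}).Result()
if err != nil {
	return err
}
for _, task := range expired {
	// Not atomic; a real check would wrap these calls in a Lua script.
	rdb.LRem(ctx, "processing", 1, task) // drop from processing
	rdb.LPush(ctx, "pending", task)      // make it available again
	rdb.ZRem(ctx, "time", task)          // forget the stale lease
}
```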
One problem with this approach is that to pick up a task, we now have to perform two operations:

BLMOVE pending processing RIGHT LEFT <timeout>

to wait for a task in the pending list and move it to processing, and then

ZADD time <now + lock_timeout> <task id>

to update the score.
We cannot execute the BLMOVE and ZADD in a single, atomic operation such as a Lua script, since we do want the blocking behavior of BLMOVE, which cannot be used in a script. This leaves the possibility of a race condition with the periodic heartbeat check. What could happen is that we execute the BLMOVE, and then, before we can do the ZADD, the heartbeat check starts running. It would encounter a task in the processing list without a score. To work around that, we could give up the blocking BLMOVE and use LMOVE with ZADD in a script and poll instead, but that increases load on the Redis server, especially with multiple workers.
We could also take this scenario into account in the heartbeat check and ignore any entry in processing that doesn't have a score yet, but that complicates the cleanup logic.
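For illustration, here is a minimal sketch of that non-blocking variant, assuming the go-redis client (the key names match the commands above):

```go
// Move a task and record its lease in one atomic step. LMOVE, unlike
// BLMOVE, is allowed inside a Lua script.
var claimScript = redis.NewScript(`
local task = redis.call('LMOVE', KEYS[1], KEYS[2], 'RIGHT', 'LEFT')
if task then
    redis.call('ZADD', KEYS[3], ARGV[1], task)
end
return task
`)

// Called from a polling loop instead of blocking:
task, err := claimScript.Run(ctx, rdb,
	[]string{"pending", "processing", "time"},
	time.Now().Add(lockTimeout).Unix()).Text()
if err == redis.Nil {
	// no task available right now; sleep and retry
}
```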
Using Redis Streams
What I ended up using was Redis Streams. Streams have a number of interesting properties that make them a good fit for the requirements mentioned above.
When starting up, we create a stream task-queue and a single consumer group for all workers: task-workers. Then we generate a unique name for each worker and treat each one as a single consumer in the consumer group.
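In Go this setup could look roughly like the following sketch, assuming the go-redis client and the google/uuid package for generating the worker name:

```go
import (
	"context"
	"strings"

	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

// setup creates the stream and consumer group (idempotent) and picks a
// unique consumer name for this worker.
func setup(ctx context.Context, rdb *redis.Client) (string, error) {
	// MKSTREAM creates the stream if it doesn't exist yet; "0" lets the
	// group consume every message ever added to the stream.
	err := rdb.XGroupCreateMkStream(ctx, "task-queue", "task-workers", "0").Err()
	if err != nil && !strings.Contains(err.Error(), "BUSYGROUP") {
		return "", err // any error other than "group already exists"
	}
	return uuid.NewString(), nil // unique consumer name
}
```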
Adding tasks
To add new tasks to the queue, we XADD a new message:

XADD task-queue * id <id of task>
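With go-redis this is a single call (a sketch; taskID is whatever id the application assigns):

```go
// Append a task message; "*" lets Redis generate the message id.
err := rdb.XAdd(ctx, &redis.XAddArgs{
	Stream: "task-queue",
	Values: map[string]interface{}{"id": taskID},
}).Err()
```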
Retrieving tasks
Then each worker executes XREADGROUP to read new messages as part of the consumer group. The consumer group behavior for Redis Streams is similar to that of Kafka, where only one consumer in the group receives a given message:

XREADGROUP GROUP <group> <consumer> COUNT 1 BLOCK <timeout in ms> STREAMS task-queue >
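In go-redis, this might look like the following sketch:

```go
// Block for up to `timeout`, waiting for one new message for this worker.
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
	Group:    "task-workers",
	Consumer: consumer,
	Streams:  []string{"task-queue", ">"}, // ">" means only new messages
	Count:    1,
	Block:    timeout,
}).Result()
// On success, streams[0].Messages contains at most one task to process.
```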
Marking tasks as finished
Whenever a worker has finished processing a task, we use XACK to acknowledge the message and then XDEL to delete it from the stream. We don't have to delete the message right away; we could also just trim the whole stream occasionally, but removing it immediately works well enough for this scenario.
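As a sketch with go-redis:

```go
// Acknowledge the message so it leaves the pending entries list, then
// delete it from the stream.
if err := rdb.XAck(ctx, "task-queue", "task-workers", msgID).Err(); err != nil {
	return err
}
if err := rdb.XDel(ctx, "task-queue", msgID).Err(); err != nil {
	return err
}
```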
Recovering abandoned tasks
Once a worker has read a message, it starts processing it. In order to support the recovery of abandoned tasks, we take advantage of the additional XACK/XPENDING features of Redis.
By not specifying the NOACK parameter for the XREADGROUP command, Redis requires the worker to explicitly acknowledge the message before it is considered done. Until it is acknowledged, it is not delivered to any other consumer in the group, and it is kept in a Pending Entries List (PEL). For each pending message, the PEL also tracks when it was last read or claimed. Redis calls that the idle time.
To recover abandoned tasks, we can use the XAUTOCLAIM command. With

XAUTOCLAIM task-queue task-workers <consumer> <min-idle-time> 0 COUNT 1

we can transfer at most one message from the PEL with an idle time higher than the specified threshold to the calling worker.
So checking for work is actually XAUTOCLAIM first and, if that doesn't return any message, then the XREADGROUP command.
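Putting the two together, checking for work might look roughly like this with go-redis:

```go
// First, try to claim an abandoned task from the PEL...
msgs, _, err := rdb.XAutoClaim(ctx, &redis.XAutoClaimArgs{
	Stream:   "task-queue",
	Group:    "task-workers",
	Consumer: consumer,
	MinIdle:  lockTimeout, // only messages idle longer than the lease
	Start:    "0",
	Count:    1,
}).Result()
if err != nil {
	return err
}
if len(msgs) == 0 {
	// ...and only block waiting for new messages if nothing was
	// recovered (the XREADGROUP call shown earlier).
}
```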
Heartbeats for long-running tasks
To prevent long-running tasks from being acquired by another worker, workers periodically execute XCLAIM on their already claimed message. This resets the idle time and serves as a heartbeat for the task:

XCLAIM task-queue task-workers <consumer> 0 <message id of task>
I'm not using it yet, but this also increases the retry counter every time, so that we know how often a task was picked up again. We also pass 0 as the min-idle-time to always claim the message, even though it's already claimed by this worker.
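As a go-redis sketch of the heartbeat:

```go
// Re-claim our own message with min-idle-time 0 to reset its idle time.
err := rdb.XClaim(ctx, &redis.XClaimArgs{
	Stream:   "task-queue",
	Group:    "task-workers",
	Consumer: consumer,
	MinIdle:  0,
	Messages: []string{msgID},
}).Err()
```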
Enforcing uniqueness
I had an additional requirement to support only unique task ids in a given task queue, but that was easy to add by combining the XADD and XACK/XDEL commands with an additional set and the SADD/SREM commands.
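For illustration, the enqueue side might look like this (the key name task-queue:ids and the duplicate handling are assumptions, not from the post):

```go
// Sketch: track queued ids in a companion set to enforce uniqueness.
added, err := rdb.SAdd(ctx, "task-queue:ids", taskID).Result()
if err != nil {
	return err
}
if added == 0 {
	return errors.New("task id already queued") // duplicate, reject
}
// XADD the task as shown above. On completion, after XACK/XDEL:
// rdb.SRem(ctx, "task-queue:ids", taskID)
```

A real implementation would combine the SADD and XADD (and the SREM with XACK/XDEL) in a transaction or Lua script to keep the set and the stream consistent.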
Implementation
An implementation for this in Go is available here.