kombu: Redis transport causes multiple Celery job runs if eta exceeds visibility timeout

Caveat - I initially tracked down and patched this problem over a year ago, but never opened an Issue on it. Looking at the Kombu 3 code it appears to still be present, and I’d like to figure out if it can be fixed or if I’ve misunderstood the code.

Problem: When moving a job from the queue into the unacked sorted set, the Redis transport sets the “score” to the current time (time.time()).

In restore_visible(), the transport uses this score to determine when to move a job from the unacked queue back into the regular queue, effectively with the heuristic if enqueue_time < time.time() - visibility_timeout.

This makes sense for jobs which don’t have an ETA (aren’t scheduled for the future, that is). For jobs that do have an ETA, if that ETA is further in the future from the time of enqueue than the visibility timeout, the job will be copied back into the regular queue (after visibility_timeout seconds), despite still being resident in some worker’s memory, from whence it can be pulled by another worker and eventually executed twice.

Consider the following timeline, with visibility_timeout=2:

  • T0: job1 is enqueued with ETA = T5
  • T1: job1 is pulled from the queue by worker1; it goes into the unacked queue with a score of T1 (the current time), and then sits around in worker1’s memory.
  • T3: worker2 (or is it the master process? never been clear on this) calls restore_visible, looking for jobs with a score of <= T1 (the current time minus the visibility timeout). It finds job1 and moves it back to the regular queue.
  • T4: worker2 pulls job1 from the regular queue (also sending it to the unacked queue)
  • T5: worker1 and worker2, both with job1 in their internal queue, execute the job simultaneously.
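
The timeline above can be replayed with a small in-memory model. This is only a sketch: the `unacked` dict stands in for the Redis sorted set, and `pull` and `restore_visible` here are simplified stand-ins written for illustration, not Kombu's actual code.

```python
VISIBILITY_TIMEOUT = 2

queue = ["job1"]   # the regular queue (job1 enqueued at T0 with ETA = T5)
unacked = {}       # stand-in for the unacked sorted set: job -> score
held = {"worker1": [], "worker2": []}  # jobs waiting in each worker's memory


def pull(worker, now):
    """Move a job from the queue into the unacked set, scored with `now`."""
    job = queue.pop(0)
    unacked[job] = now
    held[worker].append(job)


def restore_visible(now):
    """Re-queue every unacked job whose score is older than the timeout."""
    for job, score in list(unacked.items()):
        if score <= now - VISIBILITY_TIMEOUT:
            del unacked[job]
            queue.append(job)


pull("worker1", now=1)    # T1: worker1 takes job1 and waits for the ETA
restore_visible(now=3)    # T3: score 1 <= 3 - 2, so job1 is re-queued
pull("worker2", now=4)    # T4: worker2 takes the restored copy
executions = [w for w, jobs in held.items() if "job1" in jobs]
print(executions)         # T5: ['worker1', 'worker2'], both will run job1
```

Nothing in the model tells worker1 that its copy was handed out again, which is the core of the problem.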

If you have a reasonably short visibility timeout and a reasonably long ETA, this can cause the same job to be executed hundreds of times. This violates the guarantee one expects when not using acks_late: the job should be executed zero times or once, but here it can be executed zero times, once, or many times.

The fix I came up with was patching the Redis transport to set the score in the unacked sorted set relative to the ETA rather than to the current time. This required calling message.decode(), introspecting on the ETA and the timezone thereof - which I’m sure violates all sorts of barriers between Celery and Kombu. But it was all I could think of at the time: 9df3dcaf2461a8266a6691ca2c089518649cd9d5.
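
The idea behind that patch can be sketched roughly as follows. This is not the code from the commit: `unacked_score` is a hypothetical helper, and it assumes the ETA is available as an ISO 8601 string and that a naive ETA means UTC.

```python
from datetime import datetime, timezone


def unacked_score(now, eta_iso=None):
    """Return the score to store in the unacked sorted set.

    Hypothetical helper: `now` is a Unix timestamp, `eta_iso` is the
    task's ETA as an ISO 8601 string, or None for tasks without an ETA.
    """
    if eta_iso is None:
        return now
    eta = datetime.fromisoformat(eta_iso)
    if eta.tzinfo is None:
        # Assumption: a naive ETA is interpreted as UTC.
        eta = eta.replace(tzinfo=timezone.utc)
    # Never score earlier than the time of delivery.
    return max(now, eta.timestamp())
```

With a score like this, the visibility timeout would start counting from the ETA rather than from the moment a worker prefetched the job.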

It did just occur to me that another possible solution was to have the workers lock around the unacked queue itself - if a worker is able to successfully remove the job from the unacked queue (sorted set, technically) then it has permission to execute the job, otherwise it does not. Not sure if that even makes sense, just spitballing.
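
A rough sketch of that claim-by-removal idea, using an in-memory stand-in instead of Redis. `UnackedSet` and `may_execute` are hypothetical names, and the sketch assumes both copies of the job carry the same identifier. In Redis the removal could map onto ZREM, which reports how many members it actually removed.

```python
class UnackedSet:
    """In-memory stand-in for the unacked sorted set (hypothetical)."""

    def __init__(self):
        self._scores = {}

    def add(self, delivery_tag, score):
        self._scores[delivery_tag] = score

    def remove(self, delivery_tag):
        """Return 1 if the entry existed and was removed, else 0."""
        if delivery_tag in self._scores:
            del self._scores[delivery_tag]
            return 1
        return 0


def may_execute(unacked, delivery_tag):
    """A worker may run the job only if it wins the removal."""
    return unacked.remove(delivery_tag) == 1


unacked = UnackedSet()
unacked.add("job1", score=4)
first = may_execute(unacked, "job1")   # worker1 wins the removal
second = may_execute(unacked, "job1")  # worker2 finds nothing to remove
```

Whichever worker reaches the ETA first removes the entry and runs the job; the other sees that nothing was removed and skips it.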

About this issue

  • State: open
  • Created 10 years ago
  • Reactions: 5
  • Comments: 29 (9 by maintainers)

Most upvoted comments

@DylanYoung so, here’s how I understand it after reading through code and debugging, keep in mind reality might be different.

RabbitMQ transport

You could compare rabbitmq to TCP: until it receives a confirmation of delivery back from the consumer, it retries the delivery. It uses heartbeats and other means to detect network failure or consumer crashes, and guarantees “at least once” delivery. There’s more to it but I never actually used it 😅

Redis transport

Kombu uses Redis pubsub as a transport, and it is different. If one of the subscribers for whatever reason fails to process the message, redis doesn’t care. Then, the tradeoff is to have a visibility_timeout (defaults to 1 hour), and a sorted set of unacked tasks, to be able to redistribute it to a different worker if timeout’s reached.

This is why it is such a bulky implementation we have here. I think it’s possible to implement a lighter redis transport, using BRPOP for example, but at a price of dropping support for many celery features.

This is also something @bigo was proposing above.

ETA

Now, neither RabbitMQ nor Redis has native support for ETA tasks anyway. So, what celery does – it uses QoS, so workers can prefetch tasks and run them when the time has come. For the transport it means that a worker can grab a handful of tasks but not acknowledge them for visibility_timeout (i.e. 1 hour).

If ETA is larger than that, that means worker does not ack the task for more than a visibility_timeout, and in case of redis, it just reschedules the task as if it was a lost message.
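
One mitigation that follows from this is to make the timeout longer than the longest ETA in use. A minimal sketch of a Celery configuration module, assuming Redis is the broker and that no task is scheduled more than 12 hours ahead (the URL and the 12-hour figure are illustrative only):

```python
# celeryconfig.py (illustrative values)
broker_url = "redis://localhost:6379/0"

# Seconds a delivered-but-unacked message may wait before the Redis
# transport restores it to the queue. Should exceed the longest ETA used.
broker_transport_options = {"visibility_timeout": 12 * 60 * 60}
```

The trade-off is that a longer timeout also delays redelivery of messages that really were lost, for example when a worker crashes.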

It is not a problem for rabbit, since it doesn’t need the visibility_timeout workaround and has a native QoS support. Another conversation around it you can read here.

Why is there not a fix for this yet? I just sent about 1500 texts to a bunch of end users -.-

did anyone ever make any progress on this issue? We’ve been forward-porting this hack I made to new versions of Kombu for like… 3 years.

Sounds good. I have a bit on my plate now, but if the idea is to go with the polling task option, that doesn’t sound terribly difficult to implement.

@ahopkins someone has to take over this, as no active development is happening at the moment.

It has a long history of discussion if you’d like to dive deeper, worth noting https://github.com/celery/celery/issues/3274 and https://github.com/celery/celery/issues/2541.

@ask Hello. I see “Not funded” label on this issue.

How much for a fix? 😃

Is there a working demo for the bug? ~It seems I’ve just had the issue in production, but I had no luck reproducing it locally.~

~It’s fired a single ETA task about 40 times, and that might be related with how many times I restarted flower within the same ETA window, so I tried emulating that with visibility_timeout = 1.~

The live config is extremely simple: redis + all-default celery config (i.e. I believe acks_late is False) with a single worker + flower.

Edit: I was looking for a magic number that could explain the number of accumulated tasks, and it turns out that one copy is created every 5 minutes from the time the task was first enqueued, which is exactly equal to unacked_mutex_expire.

yes!! this is all inclusive!!

we have to figure out the design decision first. btw the more you pay as donation the more time core team member like me could allocate dedicated time for celery 😃

@bigo Just got burned by this myself, though fortunately not affecting any end users. Looks like the suggestion elsewhere is to use RabbitMQ with celery to avoid this ETA/visibility timeout issue. For my part, I am just having the problematic task grab a 5-minute Lock first (so other copies of the task fail; although that also means the task should take far less than 5 minutes to execute!). Another devops-ish option could be to ensure a (rolling?) graceful restart of all workers at an interval less than the visibility_timeout so that they give up their tasks naturally.
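
The lock workaround could look roughly like this. It is a sketch, not the commenter's code: `run_once` and the key naming are hypothetical, and `client` is assumed to be any object with a redis-py style `set(name, value, nx=True, ex=seconds)` method.

```python
def run_once(client, task_id, func, ttl=300):
    """Run `func` only if this caller acquires the lock for `task_id`.

    `client` is assumed to expose a redis-py style
    `set(name, value, nx=True, ex=seconds)` that returns a truthy value
    only when the key did not exist yet. Returns True if `func` ran.
    """
    key = f"task-lock:{task_id}"  # hypothetical key naming
    if not client.set(key, "1", nx=True, ex=ttl):
        return False  # another copy of the task already holds the lock
    func()
    return True
```

The lock is deliberately left to expire on its own, so any duplicate that fires within `ttl` seconds is skipped. Copies created after the lock expires would still run.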

For the time being, kombu could issue a logger.warn if ETA was bigger than visibility_timeout.
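
A sketch of what such a check might look like. `warn_if_eta_exceeds_timeout` is a hypothetical helper, not an existing kombu function, and it assumes the ETA is already available as a Unix timestamp.

```python
import logging

logger = logging.getLogger(__name__)


def warn_if_eta_exceeds_timeout(eta_timestamp, now, visibility_timeout):
    """Log a warning and return True if the ETA lies beyond the timeout.

    Hypothetical helper; `eta_timestamp` and `now` are Unix timestamps
    and `visibility_timeout` is a duration in seconds.
    """
    delay = eta_timestamp - now
    if delay > visibility_timeout:
        logger.warning(
            "ETA is %.0fs away but visibility_timeout is %.0fs; "
            "the message may be redelivered and run more than once",
            delay,
            visibility_timeout,
        )
        return True
    return False
```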