asynq: [BUG] The task is always active

Describe the bug
When a task is active and the worker is shut down, the task stays in the active state forever.

To Reproduce
Steps to reproduce the behavior:

  1. Give the task handler a sleep so it stays active for a while:
// HandleWelcomeEmailTask sleeps for 10 seconds so the task stays active long
// enough to shut the worker down mid-execution (asynq v0.17 Payload API).
func HandleWelcomeEmailTask(ctx context.Context, t *asynq.Task) error {
	id, err := t.Payload.GetInt("user_id")
	if err != nil {
		return err
	}
	fmt.Printf("[%s] - [Start] Send Welcome Email to User %d\n", time.Now().String(), id)
	time.Sleep(10 * time.Second)
	fmt.Printf("[%s] - [ End ] Send Welcome Email to User %d\n", time.Now().String(), id)
	return nil
}
  2. Start the worker (a minimal setup sketch follows this list).
  3. Start the scheduler.
  4. While the task is active, shut down the worker.
  5. The task stays in the active state forever.
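
For reference, here is a minimal sketch of the worker and scheduler used in steps 2 and 3. The task type name, Redis address, cron spec, and function names are illustrative, imports are omitted as in the handler above, and the snippet assumes the asynq v0.17 API:

// runWorker starts an asynq server that handles the welcome-email task.
// Run blocks until the process receives SIGTERM/SIGINT (step 4: shut it down mid-task).
func runWorker() error {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: "127.0.0.1:6379"},
		asynq.Config{Concurrency: 1},
	)
	mux := asynq.NewServeMux()
	mux.HandleFunc("email:welcome", HandleWelcomeEmailTask)
	return srv.Run(mux)
}

// runScheduler enqueues the welcome-email task on a fixed interval.
func runScheduler() error {
	scheduler := asynq.NewScheduler(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"}, nil)
	task := asynq.NewTask("email:welcome", map[string]interface{}{"user_id": 42})
	if _, err := scheduler.Register("@every 30s", task); err != nil {
		return err
	}
	return scheduler.Run()
}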

Expected behavior
I think the task should change its state to failed based on Timeout, MaxRetry, etc.

Environment:

  • OS: Windows
  • Version of asynq package: v0.17.2

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 22 (12 by maintainers)

Most upvoted comments

@thanhps42 Even though we couldn’t get to the bottom of this, this was a good opportunity to take another look at task recovering logic, so thank you 🙏 And please let me know if you see the issue again.

@crossworth Thank you for spotting this! Yes, we should call Timer.Reset in the recoverer. My reasoning behind using a Timer instead of a Ticker is that I wanted to ensure we start counting the interval only after the current execution is done. See the example below.

		for {
			select {
			case <-r.done:
				r.logger.Debug("Recoverer done")
				timer.Stop()
				return
			case <-timer.C:
				Do() // Potentially time-consuming operation
				timer.Reset(r.interval)
			}
		}

But I’m open to suggestions. Let me know if you have thoughts on this!
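
For contrast, a Ticker-based variant would look roughly like the sketch below (hypothetical, not the actual recoverer code). Since a Ticker fires on a fixed cadence, a tick that arrives while Do() is still running sits in the channel buffer and triggers the next run immediately, whereas the Timer.Reset pattern above only starts counting the interval after Do() returns:

		ticker := time.NewTicker(r.interval)
		for {
			select {
			case <-r.done:
				r.logger.Debug("Recoverer done")
				ticker.Stop()
				return
			case <-ticker.C:
				Do() // if this runs longer than r.interval, the next tick is already waiting
			}
		}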

@thanhps42 Thank you for reporting this! @crossworth thank you for following up to this issue!

I can reproduce this bug, and it seems like RDB.ListDeadlineExceeded is not returning the deadline exceeded messages. I’ll look into this bug in the next few days, but feel free to open a PR for the bug fix if anyone else is interested in this.

Hello, are you sure that the task is not recovered after 1 minute?

Pull request https://github.com/hibiken/asynq/pull/181 added support for task recovery on worker crash.

The way the crash recovery works is quite simple: on server (worker) startup, after 1 minute (https://github.com/hibiken/asynq/blob/2516c4babac835a04b59822eebb9a0016351e177/server.go#L376), a routine is executed to list all the “DeadlineExceeded” tasks: https://github.com/hibiken/asynq/blob/2516c4babac835a04b59822eebb9a0016351e177/recoverer.go#L70

And the retry/archive decision is made based on the task’s Retry/Retried values: https://github.com/hibiken/asynq/blob/2516c4babac835a04b59822eebb9a0016351e177/recoverer.go#L77-L81

If the task should be retried, a delay function is used as well: https://github.com/hibiken/asynq/blob/2516c4babac835a04b59822eebb9a0016351e177/recoverer.go#L89-L95

The default delay function used is this one: https://github.com/hibiken/asynq/blob/2516c4babac835a04b59822eebb9a0016351e177/server.go#L261-L268
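
Put together, the logic in the linked lines amounts to roughly the following. This is a simplified paraphrase of the recoverer, not the actual asynq code; the variable and helper names (deadlineExceededMessages, archive, retry, retryDelayFunc) are stand-ins for the real internals:

// For each message whose deadline has passed, decide between archive and retry.
// msg.Retry is the configured max retry count, msg.Retried is how many retries already ran.
for _, msg := range deadlineExceededMessages {
	if msg.Retried >= msg.Retry {
		archive(msg) // retries exhausted: move the task to the archived set
	} else {
		delay := retryDelayFunc(msg.Retried, errors.New("deadline exceeded"), msg)
		retry(msg, time.Now().Add(delay)) // schedule another run after the delay
	}
}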

If you don’t provide a timeout or deadline, the default 30-minute timeout will be used: https://github.com/hibiken/asynq/blob/63ce9ed0f925912bd95c8464d1ee9dfae4ddf109/client.go#L238

NOTE: if you set asynq.MaxRetry(0) when calling Enqueue and the worker crashes, no retry will be executed.
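
If you want the recovery behavior to be explicit rather than relying on the defaults, you can set these options yourself at enqueue time. The values below are illustrative, imports are omitted, and the snippet assumes the v0.17 client API:

client := asynq.NewClient(asynq.RedisClientOpt{Addr: "127.0.0.1:6379"})
defer client.Close()

task := asynq.NewTask("email:welcome", map[string]interface{}{"user_id": 42})

// With an explicit timeout and retry count, a task whose worker crashed mid-run
// is picked up by the recoverer and retried up to 5 times. With asynq.MaxRetry(0)
// there would be no retry, as noted above.
if _, err := client.Enqueue(task,
	asynq.Timeout(30*time.Second),
	asynq.MaxRetry(5),
); err != nil {
	log.Fatal(err)
}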

Recovery only runs when you start the worker, so if you are inspecting Redis directly you will not see the task being recovered until you start the server (worker) again.