
How to retry a timed out job? #401

Open
ross-nordstrom opened this issue May 26, 2023 · 2 comments

Comments

@ross-nordstrom
Contributor

ross-nordstrom commented May 26, 2023

Context

I'm trying to add some fail-safes around a resource-intensive job with a lot of external dependencies: it can sometimes hang or OOM, and it usually works on the next retry.

Issue

I'd like to set a job-specific timeout and have the job retry after a TimeoutError, but I can't figure out how to do that. The TimeoutError seems to be terminal and I can't get the job to retry. Any advice on how to make this work?

See related issue: #402

Reproduction

  1. Run the worker: python script.py worker
  2. Enqueue jobs by running python script.py client
  3. Watch the job run, time out, and NOT retry in the worker
import asyncio
import random
import sys

import arq.worker
from arq import create_pool
from arq.connections import RedisSettings
from arq.typing import WorkerSettingsBase

from settings import redis_settings


async def __flaky_job(_ctx):
    # Simulate a flaky job: the random sleep usually exceeds the 3s timeout configured below.
    latency = random.uniform(1.0, 60.0)
    print(f"Starting a job that will take {latency:.2f}s to run (and is allowed to run up to 3s)...")
    await asyncio.sleep(latency)
    print("Done!")


flaky_job_with_timeout = arq.worker.func(__flaky_job, name='flaky_job', timeout=3)


class WorkerSettings(WorkerSettingsBase):
    redis_settings = redis_settings
    functions = [flaky_job_with_timeout]


def worker():
    print('Starting worker')
    arq.run_worker(WorkerSettings)


async def client():
    print('Running client')
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('flaky_job')
    print('Enqueued job')


if __name__ == '__main__':
    match sys.argv[1]:
        case 'client':
            asyncio.run(client())
        case 'worker':
            worker()
@JonasKs
Sponsor Collaborator

JonasKs commented May 26, 2023

There are many settings on the worker class, and you can specify max retries for specific jobs. Please see the documentation; I also suggest reading this part: https://arq-docs.helpmanual.io/#retrying-jobs-and-cancellation
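
For illustration, here's a rough sketch of the knobs I mean (untested, names follow the script above, and the values are only placeholders): max_tries can be set per function via arq.worker.func or worker-wide on the settings class, alongside job_timeout and retry_jobs.

import arq.worker

from settings import redis_settings


async def flaky_job(ctx):
    ...  # the actual job body


# Per-function limits: `timeout` bounds a single attempt, `max_tries` caps
# how many attempts arq will make for this job.
flaky_job_with_limits = arq.worker.func(flaky_job, name='flaky_job', timeout=3, max_tries=3)


class WorkerSettings:
    redis_settings = redis_settings
    functions = [flaky_job_with_limits]
    # Worker-wide defaults, used when a function doesn't set its own:
    job_timeout = 3
    max_tries = 3
    retry_jobs = True  # default; re-enqueue jobs that raise Retry or are cancelled mid-run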

@ross-nordstrom
Contributor Author

Right, I've been using Retry and job aborting successfully, but I'm struggling with timeouts vs. retries.

Is it a correct expectation that a timed-out job (job runs longer than timeout) will be automatically retried?
If not, is there a way to do this?

I tried catching the error, but it doesn't propagate up to where I invoke run_worker:

try:
    arq.run_worker(WorkerSettings)
except Exception as e:
    # Never hit on TimeoutError
    logging.exception('worker error')

By the way, this is what the worker logs when you run my reproduction steps:

Starting worker
Starting a job that will take 30.57s to run (and is allowed to run up to 3s)...
  3.00s ! 0de71325829747abbbd608ec97fc1f4c:flaky_job failed, TimeoutError: 
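
For now, the workaround I'm leaning towards is enforcing the time limit inside the job and converting it into a Retry so arq re-enqueues the job itself. Untested sketch: do_the_real_work() is just a stand-in for the actual job body, and the arq-level timeout would need to be removed (or set higher than the inner one) so it doesn't fire first.

import asyncio

from arq.worker import Retry


async def flaky_job(ctx):
    try:
        # Enforce our own per-attempt limit instead of relying on arq's
        # `timeout`, so the timeout is catchable inside the job.
        await asyncio.wait_for(do_the_real_work(), timeout=3)  # do_the_real_work() is a placeholder
    except asyncio.TimeoutError:
        # Ask arq to re-enqueue the job after 10s; each attempt still
        # counts towards max_tries.
        raise Retry(defer=10)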
