pg-boss: Failing jobs on server shutdown

I call await boss.stop (), but the active jobs remain in the database and do not resume after the server restart. Should I call boss.fail() for each active tasks manually or does pg-boss have a built-in methods?

About this issue

  • Original URL
  • State: closed
  • Created 5 years ago
  • Comments: 16 (7 by maintainers)

Most upvoted comments

I have implemented wrapper for PgBoss which allows to fail running jobs on shutdown. So far, I’m not completely sure about its stability. I did some tests and they all passed without problems (that is, after crashing the server, all incomplete jobs will be resumed)

class Queue {
    constructor(id, dbConnection, debug = false) {
        this.id = id;
        this.boss = new PgBoss(dbConnection);
        this.handler = null;
        this.activeJobsId = [];

        this.boss.on('error', error => console.error(error));
        this._stop = false;
        this._debug = debug;
    }

    setOptions(subscribeOptions, publishOptions) {
        this.subscribeOptions = subscribeOptions;
        this.publishOptions = publishOptions;
    }

    async start() {
        await this.boss.start();
    }

    async stop() {
        this._stop = true;
        if(this.activeJobsId.length)
            await this.boss.fail(this.activeJobsId);
        await this.boss.stop();
    }

    async subscribe(callback) {
        await this.boss.subscribe(this.id, this.subscribeOptions, async job => {
            this._debug && console.log(`started ${job.id}`);

            if(this._stop) throw '_stop';
            this.activeJobsId.push(job.id);
            await callback(job);
            this.activeJobsId.splice(this.activeJobsId.indexOf(job.id), 1);

            this._debug && console.log(`completed ${job.id}`);
        });
    }

    async publish(data) {
       return await this.boss.publish(this.id, data, this.publishOptions)
    }
}

How to use:

const queue = new Queue('some-queue', 'postgres://...', true);
queue.setOptions({ teamSize: 8, teamConcurrency: 5 }, { retryLimit: 3, retryDelay: 0 });

(async function() {
    await queue.start();

    await queue.subscribe(async (job) => {
        await delay(job.data.time);
        // throw 'fail';
    });
    
    await queue.publish({ time: 5000 });
    await queue.publish({ time: 1000 });
    await queue.publish({ time: 2000 });
})();

async function shutdown() {
    console.log('<\nshutdown')
    await queue.stop();
    console.log('shutdown\n>')
    process.exit(1);
}

process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);