diff --git a/src/worker.js b/src/worker.js index d37d4b1..c314f2a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -37,6 +37,7 @@ export default class Worker extends events.EventEmitter { this._poller = { timer: false, enabled: true, + running: false, }; this.debug(`Created worker for job type ${this.jobtype}`); @@ -250,23 +251,33 @@ export default class Worker extends events.EventEmitter { } _startJobPolling() { - if (!this._poller.enabled) { + if (!this._poller.enabled || this._poller.running) { return; } - this._poller.timer = setInterval(() => { - this._getPendingJobs().then((jobs) => this._claimPendingJobs(jobs)); + this._poller.timer = setTimeout(() => { + this._getPendingJobs().then((jobs) => { + if (!this._poller.running) return; + + const foundJobs = (!jobs || jobs.length === 0); + const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs); + + task.then(() => { + this._poller.running = false; + this._startJobPolling(); + }); + }); } , this.checkInterval); + + this._poller.running = true; } _stopJobPolling() { - clearInterval(this._poller.timer); + this._poller.running = false; + clearTimeout(this._poller.timer); } - _claimPendingJobs(jobs) { - if (!jobs || jobs.length === 0) return; - - this._stopJobPolling(); + _claimPendingJobs(jobs = []) { let claimed = false; // claim a single job, stopping after first successful claim @@ -292,10 +303,8 @@ export default class Worker extends events.EventEmitter { this.debug(`Claimed job ${claimedJob._id}`); return this._performJob(claimedJob); }) - .then(() => this._startJobPolling()) .catch((err) => { this.debug('Error claiming jobs', err); - this._startJobPolling(); }); }