From ecfd0e1d4a227ed5d4919e55053401d79c656c2a Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Wed, 4 May 2016 12:43:10 -0700 Subject: [PATCH] use reduce to serialize claimJob attempts --- src/worker.js | 58 +++++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 25 deletions(-) diff --git a/src/worker.js b/src/worker.js index cbc9570..91d817f 100644 --- a/src/worker.js +++ b/src/worker.js @@ -117,7 +117,7 @@ export default class Job extends events.EventEmitter { this.debug(`Starting job ${job._id}`); const workerOutput = new Promise((resolve, reject) => { - resolve(this.workerFn(job._source.payload)); + resolve(this.workerFn.call(null, job._source.payload)); setTimeout(function () { reject(new WorkerTimeoutError({ @@ -177,41 +177,49 @@ export default class Job extends events.EventEmitter { } _claimPendingJobs(jobs) { + if (jobs.length === 0) return; + this._stopJobPolling(); let claimed = false; - return Bluebird.mapSeries(jobs, (job) => { - if (claimed) return false; + return jobs.reduce((chain, job) => { + return chain.then((claimedJob) => { + // short-circuit the promise chain if a job has been claimed + if (claimed) return claimedJob; - return this._claimJob(job) - .then((claimedJob) => { - if (claimedJob !== false) { - claimed = true; - return claimedJob; - } - }) - .catch((err) => { - this.debug('Failed to claim outstanding jobs', err); - this.emit('error', err); - this.queue.emit('worker_error', { - id: this.id, - type: this.type, - err + return this._claimJob(job) + .then((claimResult) => { + if (claimResult !== false) { + claimed = true; + return claimResult; + } }); - throw err; }); + }, Promise.resolve()) + .catch((err) => { + this.debug('Failed to claim outstanding jobs', err); + this.emit('error', err); + this.queue.emit('worker_error', { + id: this.id, + type: this.type, + err + }); + throw err; }) - .then((mappedJobs) => mappedJobs.filter(Boolean)) - .then((claimedJobs) => { - if (claimedJobs.length !== 1) { + .then((claimedJob) => { + if (!claimedJob) { this.debug(`All ${jobs.length} jobs already claimed`); return; } - const job = claimedJobs[0]; - this.debug(`Claimed job ${job._id}`); - return this._performJob(job); + this.debug(`Claimed job ${claimedJob._id}`); + return this._performJob(claimedJob); }) - .finally(() => this._startJobPolling()); + .then(() => this._startJobPolling()) + .catch((err) => { + this.debug('Error claiming jobs', err); + this.emit('error', err); + this._startJobPolling(); + }); } _getPendingJobs() {