use reduce to serialize claimJob attempts

This commit is contained in:
2016-05-04 12:43:10 -07:00
parent 1357810191
commit ecfd0e1d4a

View File

@@ -117,7 +117,7 @@ export default class Job extends events.EventEmitter {
this.debug(`Starting job ${job._id}`); this.debug(`Starting job ${job._id}`);
const workerOutput = new Promise((resolve, reject) => { const workerOutput = new Promise((resolve, reject) => {
resolve(this.workerFn(job._source.payload)); resolve(this.workerFn.call(null, job._source.payload));
setTimeout(function () { setTimeout(function () {
reject(new WorkerTimeoutError({ reject(new WorkerTimeoutError({
@@ -177,41 +177,49 @@ export default class Job extends events.EventEmitter {
} }
_claimPendingJobs(jobs) { _claimPendingJobs(jobs) {
if (jobs.length === 0) return;
this._stopJobPolling(); this._stopJobPolling();
let claimed = false; let claimed = false;
return Bluebird.mapSeries(jobs, (job) => { return jobs.reduce((chain, job) => {
if (claimed) return false; return chain.then((claimedJob) => {
// short-circuit the promise chain if a job has been claimed
if (claimed) return claimedJob;
return this._claimJob(job) return this._claimJob(job)
.then((claimedJob) => { .then((claimResult) => {
if (claimedJob !== false) { if (claimResult !== false) {
claimed = true; claimed = true;
return claimedJob; return claimResult;
} }
})
.catch((err) => {
this.debug('Failed to claim outstanding jobs', err);
this.emit('error', err);
this.queue.emit('worker_error', {
id: this.id,
type: this.type,
err
}); });
throw err;
}); });
}, Promise.resolve())
.catch((err) => {
this.debug('Failed to claim outstanding jobs', err);
this.emit('error', err);
this.queue.emit('worker_error', {
id: this.id,
type: this.type,
err
});
throw err;
}) })
.then((mappedJobs) => mappedJobs.filter(Boolean)) .then((claimedJob) => {
.then((claimedJobs) => { if (!claimedJob) {
if (claimedJobs.length !== 1) {
this.debug(`All ${jobs.length} jobs already claimed`); this.debug(`All ${jobs.length} jobs already claimed`);
return; return;
} }
const job = claimedJobs[0]; this.debug(`Claimed job ${claimedJob._id}`);
this.debug(`Claimed job ${job._id}`); return this._performJob(claimedJob);
return this._performJob(job);
}) })
.finally(() => this._startJobPolling()); .then(() => this._startJobPolling())
.catch((err) => {
this.debug('Error claiming jobs', err);
this.emit('error', err);
this._startJobPolling();
});
} }
_getPendingJobs() { _getPendingJobs() {