diff --git a/src/worker.js b/src/worker.js index 92e9b76..d546275 100644 --- a/src/worker.js +++ b/src/worker.js @@ -8,6 +8,14 @@ import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors'; const puid = new Puid(); const debug = logger('esqueue:worker'); +function formatJobObject(job) { + return { + index: job._index, + type: job._type, + id: job._id, + }; +} + export default class Job extends events.EventEmitter { constructor(queue, type, workerFn, opts = {}) { if (typeof type !== 'string') throw new Error('Type must be a string'); @@ -35,6 +43,25 @@ export default class Job extends events.EventEmitter { clearInterval(this._checker); } + toJSON() { + return { + id: this.id, + index: this.queue.index, + jobType: this.jobType, + doctype: this.doctype, + }; + } + + _formatErrorParams(err, job) { + const response = { + error: err, + worker: this.toJSON(), + }; + + if (job) response.job = formatJobObject(job); + return response; + } + _claimJob(job) { const m = moment(); const startTime = m.toISOString(); @@ -69,7 +96,7 @@ export default class Job extends events.EventEmitter { .catch((err) => { if (err.statusCode === 409) return true; this.debug(`_claimJob failed on job ${job._id}`, err); - this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, err); + this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job)); return false; }); } @@ -98,7 +125,7 @@ export default class Job extends events.EventEmitter { .catch((err) => { if (err.statusCode === 409) return true; this.debug(`_failJob failed to update job ${job._id}`, err); - this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, err); + this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job)); return false; }); } @@ -151,6 +178,7 @@ export default class Job extends events.EventEmitter { // job execution was successful this.debug(`Completed job ${job._id}`); + const emitJob = formatJobObject(job); const completedTime = moment().toISOString(); const docOutput = this._formatOutput(output); @@ -170,7 +198,7 @@ export default class Job extends events.EventEmitter { .catch((err) => { if (err.statusCode === 409) return false; this.debug(`Failure saving job output ${job._id}`, err); - this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, err); + this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job)); }); }, (jobErr) => { if (!jobErr) { @@ -182,12 +210,12 @@ export default class Job extends events.EventEmitter { // job execution failed if (jobErr.type === 'WorkerTimeoutError') { this.debug(`Timeout on job ${job._id}`); - this.emit(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, jobErr); + this.emit(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, this._formatErrorParams(jobErr, job)); return; } this.debug(`Failure occurred on job ${job._id}`, jobErr); - this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, jobErr); + this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job)); return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false); }); } @@ -288,7 +316,7 @@ export default class Job extends events.EventEmitter { if (err.status === 404) return []; this.debug('job querying failed', err); - this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, err); + this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err)); }); } } \ No newline at end of file diff --git a/test/src/worker.js b/test/src/worker.js index ea5f069..d3e3596 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -289,7 +289,6 @@ describe('Worker class', function () { }); }); - describe('claiming a job', function () { let params; let job; @@ -391,12 +390,18 @@ describe('Worker class', function () { sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) { - expect(err).to.have.property('statusCode', 401); - done(); + try { + expect(err).to.have.property('error'); + expect(err).to.have.property('job'); + expect(err).to.have.property('worker'); + expect(err.error).to.have.property('statusCode', 401); + done(); + } catch (e) { + done(e); + } }); worker._claimJob(job); }); - }); describe('failing a job', function () { @@ -462,8 +467,15 @@ describe('Worker class', function () { sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 })); worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) { - expect(err).to.have.property('statusCode', 401); - done(); + try { + expect(err).to.have.property('error'); + expect(err).to.have.property('job'); + expect(err).to.have.property('worker'); + expect(err.error).to.have.property('statusCode', 401); + done(); + } catch (e) { + done(e); + } }); worker._failJob(job); }); @@ -612,7 +624,10 @@ describe('Worker class', function () { worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => { try { - expect(err).to.have.property('type', 'UnspecifiedWorkerError'); + expect(err).to.have.property('error'); + expect(err).to.have.property('job'); + expect(err).to.have.property('worker'); + expect(err.error).to.have.property('type', 'UnspecifiedWorkerError'); done(); } catch (e) { done(e); @@ -654,7 +669,10 @@ describe('Worker class', function () { // check for timeout event worker.once(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, (err) => { try { - expect(err).to.have.property('type', 'WorkerTimeoutError'); + expect(err).to.have.property('error'); + expect(err).to.have.property('job'); + expect(err).to.have.property('worker'); + expect(err.error).to.have.property('type', 'WorkerTimeoutError'); performJobPromise.then(() => { sinon.assert.notCalled(failStub); done();