error emitters should return more than just the error
update tests, and properly catch error conditions in worker event tests
This commit is contained in:
@@ -8,6 +8,14 @@ import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
|
|||||||
const puid = new Puid();
|
const puid = new Puid();
|
||||||
const debug = logger('esqueue:worker');
|
const debug = logger('esqueue:worker');
|
||||||
|
|
||||||
|
function formatJobObject(job) {
|
||||||
|
return {
|
||||||
|
index: job._index,
|
||||||
|
type: job._type,
|
||||||
|
id: job._id,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
export default class Job extends events.EventEmitter {
|
export default class Job extends events.EventEmitter {
|
||||||
constructor(queue, type, workerFn, opts = {}) {
|
constructor(queue, type, workerFn, opts = {}) {
|
||||||
if (typeof type !== 'string') throw new Error('Type must be a string');
|
if (typeof type !== 'string') throw new Error('Type must be a string');
|
||||||
@@ -35,6 +43,25 @@ export default class Job extends events.EventEmitter {
|
|||||||
clearInterval(this._checker);
|
clearInterval(this._checker);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
toJSON() {
|
||||||
|
return {
|
||||||
|
id: this.id,
|
||||||
|
index: this.queue.index,
|
||||||
|
jobType: this.jobType,
|
||||||
|
doctype: this.doctype,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
_formatErrorParams(err, job) {
|
||||||
|
const response = {
|
||||||
|
error: err,
|
||||||
|
worker: this.toJSON(),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (job) response.job = formatJobObject(job);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
_claimJob(job) {
|
_claimJob(job) {
|
||||||
const m = moment();
|
const m = moment();
|
||||||
const startTime = m.toISOString();
|
const startTime = m.toISOString();
|
||||||
@@ -69,7 +96,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
if (err.statusCode === 409) return true;
|
if (err.statusCode === 409) return true;
|
||||||
this.debug(`_claimJob failed on job ${job._id}`, err);
|
this.debug(`_claimJob failed on job ${job._id}`, err);
|
||||||
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, err);
|
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
|
||||||
return false;
|
return false;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -98,7 +125,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
if (err.statusCode === 409) return true;
|
if (err.statusCode === 409) return true;
|
||||||
this.debug(`_failJob failed to update job ${job._id}`, err);
|
this.debug(`_failJob failed to update job ${job._id}`, err);
|
||||||
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, err);
|
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
|
||||||
return false;
|
return false;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -151,6 +178,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
// job execution was successful
|
// job execution was successful
|
||||||
this.debug(`Completed job ${job._id}`);
|
this.debug(`Completed job ${job._id}`);
|
||||||
|
|
||||||
|
const emitJob = formatJobObject(job);
|
||||||
const completedTime = moment().toISOString();
|
const completedTime = moment().toISOString();
|
||||||
const docOutput = this._formatOutput(output);
|
const docOutput = this._formatOutput(output);
|
||||||
|
|
||||||
@@ -170,7 +198,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
if (err.statusCode === 409) return false;
|
if (err.statusCode === 409) return false;
|
||||||
this.debug(`Failure saving job output ${job._id}`, err);
|
this.debug(`Failure saving job output ${job._id}`, err);
|
||||||
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, err);
|
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
|
||||||
});
|
});
|
||||||
}, (jobErr) => {
|
}, (jobErr) => {
|
||||||
if (!jobErr) {
|
if (!jobErr) {
|
||||||
@@ -182,12 +210,12 @@ export default class Job extends events.EventEmitter {
|
|||||||
// job execution failed
|
// job execution failed
|
||||||
if (jobErr.type === 'WorkerTimeoutError') {
|
if (jobErr.type === 'WorkerTimeoutError') {
|
||||||
this.debug(`Timeout on job ${job._id}`);
|
this.debug(`Timeout on job ${job._id}`);
|
||||||
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, jobErr);
|
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, this._formatErrorParams(jobErr, job));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this.debug(`Failure occurred on job ${job._id}`, jobErr);
|
this.debug(`Failure occurred on job ${job._id}`, jobErr);
|
||||||
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, jobErr);
|
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
|
||||||
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
|
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -288,7 +316,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
if (err.status === 404) return [];
|
if (err.status === 404) return [];
|
||||||
|
|
||||||
this.debug('job querying failed', err);
|
this.debug('job querying failed', err);
|
||||||
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, err);
|
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -289,7 +289,6 @@ describe('Worker class', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
describe('claiming a job', function () {
|
describe('claiming a job', function () {
|
||||||
let params;
|
let params;
|
||||||
let job;
|
let job;
|
||||||
@@ -391,12 +390,18 @@ describe('Worker class', function () {
|
|||||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||||
|
|
||||||
worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
|
worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
|
||||||
expect(err).to.have.property('statusCode', 401);
|
try {
|
||||||
|
expect(err).to.have.property('error');
|
||||||
|
expect(err).to.have.property('job');
|
||||||
|
expect(err).to.have.property('worker');
|
||||||
|
expect(err.error).to.have.property('statusCode', 401);
|
||||||
done();
|
done();
|
||||||
|
} catch (e) {
|
||||||
|
done(e);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
worker._claimJob(job);
|
worker._claimJob(job);
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('failing a job', function () {
|
describe('failing a job', function () {
|
||||||
@@ -462,8 +467,15 @@ describe('Worker class', function () {
|
|||||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||||
|
|
||||||
worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) {
|
worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) {
|
||||||
expect(err).to.have.property('statusCode', 401);
|
try {
|
||||||
|
expect(err).to.have.property('error');
|
||||||
|
expect(err).to.have.property('job');
|
||||||
|
expect(err).to.have.property('worker');
|
||||||
|
expect(err.error).to.have.property('statusCode', 401);
|
||||||
done();
|
done();
|
||||||
|
} catch (e) {
|
||||||
|
done(e);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
worker._failJob(job);
|
worker._failJob(job);
|
||||||
});
|
});
|
||||||
@@ -612,7 +624,10 @@ describe('Worker class', function () {
|
|||||||
|
|
||||||
worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||||
try {
|
try {
|
||||||
expect(err).to.have.property('type', 'UnspecifiedWorkerError');
|
expect(err).to.have.property('error');
|
||||||
|
expect(err).to.have.property('job');
|
||||||
|
expect(err).to.have.property('worker');
|
||||||
|
expect(err.error).to.have.property('type', 'UnspecifiedWorkerError');
|
||||||
done();
|
done();
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
done(e);
|
done(e);
|
||||||
@@ -654,7 +669,10 @@ describe('Worker class', function () {
|
|||||||
// check for timeout event
|
// check for timeout event
|
||||||
worker.once(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, (err) => {
|
worker.once(constants.EVENT_WORKER_JOB_TIMEOUT_ERROR, (err) => {
|
||||||
try {
|
try {
|
||||||
expect(err).to.have.property('type', 'WorkerTimeoutError');
|
expect(err).to.have.property('error');
|
||||||
|
expect(err).to.have.property('job');
|
||||||
|
expect(err).to.have.property('worker');
|
||||||
|
expect(err.error).to.have.property('type', 'WorkerTimeoutError');
|
||||||
performJobPromise.then(() => {
|
performJobPromise.then(() => {
|
||||||
sinon.assert.notCalled(failStub);
|
sinon.assert.notCalled(failStub);
|
||||||
done();
|
done();
|
||||||
|
|||||||
Reference in New Issue
Block a user