diff --git a/src/worker.js b/src/worker.js index 1377d28..1a3f38f 100644 --- a/src/worker.js +++ b/src/worker.js @@ -71,17 +71,17 @@ export default class Job extends events.EventEmitter { }); } - _failJob(job, msg = false) { + _failJob(job, output = false) { this.debug(`Failing job ${job._id}`); + + const completedTime = moment().toISOString(); const doc = { - status: jobStatuses.JOB_STATUS_FAILED + status: jobStatuses.JOB_STATUS_FAILED, + completed_at: completedTime, }; - if (msg) { - doc.output = { - content_type: 'text/plain', - content: msg - }; + if (output) { + doc.output = this._formatOutput(output); } return this.client.update({ @@ -97,22 +97,30 @@ export default class Job extends events.EventEmitter { }); } + _formatOutput(output) { + const unknownMime = false; + const docOutput = {}; + + if (typeof output === 'object' && output.content) { + docOutput.content = output.content; + docOutput.content_type = output.content_type || unknownMime; + } else { + docOutput.content = output || ''; + docOutput.content_type = unknownMime; + } + + return docOutput; + } + _performJob(job) { this.debug(`Starting job ${job._id}`); return Bluebird.fromCallback((cb) => this.workerFn(job._source.payload, cb)) .then((output) => { - const completedTime = moment().toISOString(); - const unknownMime = false; - const docOutput = {}; + this.debug(`Completed job ${job._id}`); - if (typeof output === 'object' && output.content) { - docOutput.content = output.content; - docOutput.content_type = output.content_type || unknownMime; - } else { - docOutput.content = output || ''; - docOutput.content_type = unknownMime; - } + const completedTime = moment().toISOString(); + const docOutput = this._formatOutput(output); const doc = { status: jobStatuses.JOB_STATUS_COMPLETED, @@ -132,25 +140,14 @@ export default class Job extends events.EventEmitter { throw err; }); }, (jobErr) => { - const completedTime = moment().toISOString(); - const doc = { - status: jobStatuses.JOB_STATUS_FAILED, - completed_at: completedTime, - output: { - content_type: false, - content: jobErr.toString() - } - }; + this.debug(`Failure occurred during job ${job._id}`); - return this.client.update({ - index: job._index, - type: job._type, - id: job._id, - version: job._version, - body: { doc } - }) + return this._failJob(job, jobErr.toString()) .catch(() => false) - .then(() => { throw jobErr; }); + .then(() => { + this.emit('job_error', jobErr); + throw jobErr; + }); }); }