From 54d3be69297656ff9f6c5bbf023cd286298c528a Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Thu, 28 Apr 2016 17:21:03 -0700 Subject: [PATCH] don't clobber error output, append worker output --- src/worker.js | 65 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 58 insertions(+), 7 deletions(-) diff --git a/src/worker.js b/src/worker.js index d4b9c74..3d93fb6 100644 --- a/src/worker.js +++ b/src/worker.js @@ -73,12 +73,15 @@ export default class Job extends events.EventEmitter { _failJob(job, msg) { this.debug(`Failing job ${job._id}`); const doc = { - status: jobStatuses.JOB_STATUS_FAILED, - output: { + status: jobStatuses.JOB_STATUS_FAILED + }; + + if (!job._source.output) { + doc.output = { content_type: 'text/plain', content: msg - } - }; + }; + } return this.client.update({ index: job._index, @@ -94,8 +97,57 @@ export default class Job extends events.EventEmitter { } _performJob(job) { - this.debug(`Performing job ${job._id}`); - return Promise.reject('mock es failure'); + this.debug(`Starting job ${job._id}`); + + return Bluebird.try(() => { + return this.workerFn(job._source.payload); + }) + .then((output) => { + const unknownMime = false; + const docOutput = {}; + if (typeof output === 'object' && output.content) { + docOutput.content = output.content; + docOutput.content_type = output.content_type || unknownMime; + } else { + docOutput.content = output || ''; + docOutput.content_type = unknownMime; + } + + const doc = { + status: jobStatuses.JOB_STATUS_COMPLETED, + output: docOutput + }; + + return this.client.update({ + index: job._index, + type: job._type, + id: job._id, + version: job._version, + body: { doc } + }) + .catch((err) => { + if (err.statusCode === 409) return false; + throw err; + }); + }, (err) => { + console.error('err', err); + const doc = { + output: { + content_type: false, + content: err.toString() + } + }; + + return this.client.update({ + index: job._index, + type: job._type, + id: job._id, + version: job._version, + body: { doc } + }) + .catch(() => false) + .then(() => { throw err; }); + }); } _startJobPolling() { @@ -138,7 +190,6 @@ export default class Job extends events.EventEmitter { .then((claimedJobs) => { if (claimedJobs.length !== 1) return; const job = claimedJobs[0]; - this.debug(`Beginning work on ${job._id}`); return this._performJob(job); }) .finally(() => this._startJobPolling());