From 696cf78464ba600a9c85871cac94f8103208f0c5 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Mon, 2 May 2016 17:51:06 -0700 Subject: [PATCH] handle worker timeouts --- package.json | 1 + src/helpers/errors.js | 12 ++++++++++++ src/worker.js | 26 +++++++++++++++++++++++--- 3 files changed, 36 insertions(+), 3 deletions(-) create mode 100644 src/helpers/errors.js diff --git a/package.json b/package.json index 6dacbf7..0ceb5c4 100644 --- a/package.json +++ b/package.json @@ -33,6 +33,7 @@ "bluebird": "~3.3.5", "debug": "~2.2.0", "elasticsearch": "~11.0.1", + "error": "~7.0.2", "in-publish": "~2.0.0", "lodash": "~4.11.1", "moment": "~2.10.6", diff --git a/src/helpers/errors.js b/src/helpers/errors.js new file mode 100644 index 0000000..00e3e7a --- /dev/null +++ b/src/helpers/errors.js @@ -0,0 +1,12 @@ +import typedError from 'error/typed'; + +const errors = {}; + +errors.WorkerTimeoutError = typedError({ + type: 'WorkerTimeout', + message: 'worker timed out, timeout={timeout}', + timeout: null, + jobId: null +}); + +export default errors; diff --git a/src/worker.js b/src/worker.js index 1a3f38f..826b695 100644 --- a/src/worker.js +++ b/src/worker.js @@ -4,6 +4,7 @@ import moment from 'moment'; import Bluebird from 'bluebird'; import logger from './helpers/logger'; import { jobStatuses } from './helpers/constants'; +import { WorkerTimeoutError } from './helpers/errors'; const puid = new Puid(); const debug = logger('worker'); @@ -115,8 +116,22 @@ export default class Job extends events.EventEmitter { _performJob(job) { this.debug(`Starting job ${job._id}`); - return Bluebird.fromCallback((cb) => this.workerFn(job._source.payload, cb)) - .then((output) => { + const workerOutput = new Promise((resolve, reject) => { + this.workerFn.call(null, job._source.payload, function (err, cbOutput) { + if (err) return reject(err); + resolve(cbOutput); + }); + + setTimeout(function () { + reject(new WorkerTimeoutError({ + timeout: job._source.timeout, + jobId: job._id, + })); + }, job._source.timeout); + }); + + return workerOutput.then((output) => { + // job execution was successful this.debug(`Completed job ${job._id}`); const completedTime = moment().toISOString(); @@ -140,8 +155,13 @@ export default class Job extends events.EventEmitter { throw err; }); }, (jobErr) => { - this.debug(`Failure occurred during job ${job._id}`); + // job execution failed + if (jobErr.type === 'WorkerTimeout') { + this.debug(`Timeout on job ${job._id}`); + return; + } + this.debug(`Failure occurred on job ${job._id}`); return this._failJob(job, jobErr.toString()) .catch(() => false) .then(() => {