import events from 'events';
import Puid from 'puid';
import moment from 'moment';
import constants from './constants';
import logger from './helpers/logger';
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';

const puid = new Puid();
const debug = logger('esqueue:worker');

/**
 * Reduce a job document to the identifying fields that are attached to
 * emitted events.
 *
 * @param {Object} job - Job document carrying `_index`, `_type` and `_id`.
 * @returns {{index: *, type: *, id: *}} The identifying fields of the job.
 */
function formatJobObject(job) {
  return {
    index: job._index,
    type: job._type,
    id: job._id,
  };
}

/**
 * Worker that polls the queue's indices for jobs of a single job type,
 * claims one job at a time, runs the supplied worker function on its payload
 * and writes the outcome back to the job document.
 *
 * Every event emitted on the worker is also emitted on its queue.
 */
export default class Job extends events.EventEmitter {
  /**
   * @param {Object} queue - Owning queue; its `client`, `index` and `emit` are used.
   * @param {string} type - Job type this worker handles.
   * @param {Function} workerFn - Called with the job payload; may return a value or a promise.
   * @param {Object} [opts]
   * @param {Object} [opts.client] - Client used instead of `queue.client`.
   * @param {number} [opts.interval=1500] - Polling interval passed to `setInterval`.
   * @param {number} [opts.size=10] - Maximum number of jobs fetched per poll.
   * @param {string} [opts.doctype] - Document type; defaults to `constants.DEFAULT_SETTING_DOCTYPE`.
   * @throws {Error} When `type` is not a string or `workerFn` is not a function.
   */
  constructor(queue, type, workerFn, opts = {}) {
    if (typeof type !== 'string') throw new Error('Type must be a string');
    if (typeof workerFn !== 'function') throw new Error('Worker must be a function');

    super();

    this.id = puid.generate();
    this.queue = queue;
    this.client = opts.client || this.queue.client;
    this.jobtype = type;
    this.workerFn = workerFn;
    this.checkInterval = opts.interval || 1500;
    this.checkSize = opts.size || 10;
    this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;

    // every debug line is suffixed with this worker's id
    this.debug = (...msg) => debug(...msg, `id: ${this.id}`);

    this._checker = false;

    this.debug(`Created worker for job type ${this.jobtype}`);
    this._startJobPolling();
  }

  /**
   * Stop the polling interval.
   */
  destroy() {
    clearInterval(this._checker);
  }

  /**
   * @returns {{id: *, index: *, jobType: string, doctype: *}} Serializable
   *   description of this worker, used in emitted events.
   */
  toJSON() {
    return {
      id: this.id,
      index: this.queue.index,
      // the instance property is `jobtype`; reading `jobType` always gave undefined
      jobType: this.jobtype,
      doctype: this.doctype,
    };
  }

  /**
   * Emit an event on this worker and then on the owning queue.
   *
   * @param {string} name - Event name.
   * @param {...*} args - Event arguments.
   */
  emit(name, ...args) {
    super.emit(name, ...args);
    this.queue.emit(name, ...args);
  }

  /**
   * Build the payload attached to error events.
   *
   * @param {*} err - The error being reported.
   * @param {Object} [job] - Job the error relates to, when there is one.
   * @returns {{error: *, worker: Object, job: (Object|undefined)}}
   */
  _formatErrorParams(err, job) {
    const response = {
      error: err,
      worker: this.toJSON(),
    };

    if (job) response.job = formatJobObject(job);
    return response;
  }

  /**
   * Try to claim a job by marking it as processing, using the job's version
   * in the update request.
   *
   * @param {Object} job - Job document as returned by the search.
   * @returns {Promise<Object|boolean>} The updated job when the claim
   *   succeeded; `false` when the job exceeded its max attempts (it is failed
   *   first) or the update failed; `true` when the update was rejected with
   *   status code 409.
   */
  _claimJob(job) {
    const m = moment();
    const startTime = m.toISOString();
    // `add` mutates `m`, so the start time has to be captured before this line
    const expirationTime = m.add(job._source.timeout).toISOString();
    const attempts = job._source.attempts + 1;

    if (attempts > job._source.max_attempts) {
      // keep any output that is already stored on the job
      const msg = (!job._source.output)
        ? `Max attempts reached (${job._source.max_attempts})`
        : false;
      return this._failJob(job, msg)
        .then(() => false);
    }

    const doc = {
      attempts,
      started_at: startTime,
      process_expiration: expirationTime,
      status: constants.JOB_STATUS_PROCESSING,
    };

    return this.client.update({
      index: job._index,
      type: job._type,
      id: job._id,
      version: job._version,
      body: { doc },
    })
      .then((response) => {
        const updatedJob = Object.assign({}, job, response);
        updatedJob._source = Object.assign({}, job._source, doc);
        return updatedJob;
      })
      .catch((err) => {
        if (err.statusCode === 409) return true;
        this.debug(`_claimJob failed on job ${job._id}`, err);
        this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
        return false;
      });
  }

  /**
   * Mark a job as failed and store the given output on it.
   *
   * @param {Object} job - Job document to fail.
   * @param {*} [output=false] - Output recorded on the job.
   * @returns {Promise<boolean>} `true` when the update succeeded or was
   *   rejected with status code 409, `false` on any other update error.
   */
  _failJob(job, output = false) {
    this.debug(`Failing job ${job._id}`);

    const completedTime = moment().toISOString();
    const docOutput = this._formatOutput(output);
    const doc = {
      status: constants.JOB_STATUS_FAILED,
      completed_at: completedTime,
      output: docOutput,
    };

    return this.client.update({
      index: job._index,
      type: job._type,
      id: job._id,
      version: job._version,
      body: { doc },
    })
      .then(() => {
        const eventOutput = {
          job: formatJobObject(job),
          worker: this.toJSON(),
          output: docOutput,
        };
        this.emit(constants.EVENT_WORKER_JOB_FAIL, eventOutput);
        return true;
      })
      .catch((err) => {
        if (err.statusCode === 409) return true;
        this.debug(`_failJob failed to update job ${job._id}`, err);
        this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
        return false;
      });
  }

  /**
   * Normalize worker output into the `{ content, content_type }` shape that is
   * stored on the job document.
   *
   * @param {*} output - Raw worker output, or an object with a `content` property.
   * @returns {{content: *, content_type: *}} `content_type` is `false` unless
   *   the output object supplies one; `content` is `null` for falsy output.
   */
  _formatOutput(output) {
    const unknownMime = false;
    const defaultOutput = null;
    const docOutput = {};

    // `typeof null` is 'object', so null must be excluded before reading `.content`
    if (output !== null && typeof output === 'object' && output.content) {
      docOutput.content = output.content;
      docOutput.content_type = output.content_type || unknownMime;
    } else {
      docOutput.content = output || defaultOutput;
      docOutput.content_type = unknownMime;
    }

    return docOutput;
  }

  /**
   * Run the worker function on a claimed job and record the result.
   *
   * The worker is raced against the job's `timeout`. On success the job is
   * updated as completed; on timeout only an event is emitted; on any other
   * failure the job is failed via `_failJob`.
   *
   * @param {Object} job - Claimed job document.
   * @returns {Promise<*>} Settles once the outcome has been handled.
   */
  _performJob(job) {
    this.debug(`Starting job ${job._id}`);

    const workerOutput = new Promise((resolve, reject) => {
      let timeoutId = null;

      // run the worker's workerFn; the handlers below run asynchronously, so
      // `timeoutId` is always assigned before either of them can fire
      Promise.resolve(this.workerFn.call(null, job._source.payload))
        .then((res) => {
          // the timer is no longer needed once the worker has settled
          clearTimeout(timeoutId);
          resolve(res);
        })
        .catch((err) => {
          clearTimeout(timeoutId);
          reject(err);
        });

      // fail if workerFn doesn't finish before timeout
      timeoutId = setTimeout(() => {
        this.debug(`Timeout processing job ${job._id}`);
        reject(new WorkerTimeoutError({
          timeout: job._source.timeout,
          jobId: job._id,
        }));
      }, job._source.timeout);
    });

    return workerOutput.then((output) => {
      // job execution was successful
      this.debug(`Completed job ${job._id}`);

      const completedTime = moment().toISOString();
      const docOutput = this._formatOutput(output);
      const doc = {
        status: constants.JOB_STATUS_COMPLETED,
        completed_at: completedTime,
        output: docOutput,
      };

      return this.client.update({
        index: job._index,
        type: job._type,
        id: job._id,
        version: job._version,
        body: { doc },
      })
        .then(() => {
          const eventOutput = {
            job: formatJobObject(job),
            output: docOutput,
          };
          this.emit(constants.EVENT_WORKER_COMPLETE, eventOutput);
        })
        .catch((err) => {
          if (err.statusCode === 409) return false;
          this.debug(`Failure saving job output ${job._id}`, err);
          this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
        });
    }, (rejection) => {
      // a worker may reject without a reason; substitute a descriptive error
      const jobErr = rejection || new UnspecifiedWorkerError({
        jobId: job._id,
      });

      // job execution failed
      if (jobErr.type === 'WorkerTimeoutError') {
        this.debug(`Timeout on job ${job._id}`);
        this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
        return;
      }

      this.debug(`Failure occurred on job ${job._id}`, jobErr);
      this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
      return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
    });
  }

  /**
   * Start polling for pending jobs every `checkInterval`.
   */
  _startJobPolling() {
    // never stack intervals: a handle that gets overwritten can no longer be cleared
    clearInterval(this._checker);

    this._checker = setInterval(() => {
      this._getPendingJobs()
        .then((jobs) => this._claimPendingJobs(jobs));
    }, this.checkInterval);
  }

  /**
   * Stop polling for pending jobs.
   */
  _stopJobPolling() {
    clearInterval(this._checker);
  }

  /**
   * Try the given jobs in order until one is claimed, run it, and resume
   * polling afterwards. Polling is paused while this is in progress.
   *
   * @param {Array<Object>} [jobs] - Candidate job documents.
   * @returns {Promise<void>|undefined} `undefined` when there is nothing to claim.
   */
  _claimPendingJobs(jobs) {
    if (!jobs || jobs.length === 0) return;

    this._stopJobPolling();
    let claimed = false;

    // claim a single job, stopping after first successful claim
    return jobs.reduce((chain, job) => {
      return chain.then((claimedJob) => {
        // short-circuit the promise chain if a job has been claimed
        if (claimed) return claimedJob;

        return this._claimJob(job)
          .then((claimResult) => {
            // only a job document is a successful claim; _claimJob also
            // resolves `true` (409 conflict) and `false`, neither of which
            // can be performed
            if (claimResult !== null && typeof claimResult === 'object') {
              claimed = true;
              return claimResult;
            }
          });
      });
    }, Promise.resolve())
      .then((claimedJob) => {
        if (!claimedJob) {
          this.debug(`All ${jobs.length} jobs already claimed`);
          return;
        }
        this.debug(`Claimed job ${claimedJob._id}`);
        return this._performJob(claimedJob);
      })
      .then(() => this._startJobPolling())
      .catch((err) => {
        this.debug('Error claiming jobs', err);
        this._startJobPolling();
      });
  }

  /**
   * Search the queue's indices for jobs of this worker's type that are either
   * pending, or processing with a `process_expiration` at or before now.
   *
   * @returns {Promise<Array<Object>|undefined>} The matching hits; an empty
   *   array when the search fails with status 404; `undefined` after any other
   *   search error (an event is emitted instead).
   */
  _getPendingJobs() {
    const nowTime = moment().toISOString();
    const query = {
      _source: {
        exclude: ['output.content'],
      },
      query: {
        constant_score: {
          filter: {
            bool: {
              filter: { term: { jobtype: this.jobtype } },
              should: [
                { term: { status: 'pending' } },
                {
                  bool: {
                    filter: [
                      { term: { status: 'processing' } },
                      { range: { process_expiration: { lte: nowTime } } },
                    ],
                  },
                },
              ],
            },
          },
        },
      },
      sort: [
        { priority: { order: 'asc' } },
        { created_at: { order: 'asc' } },
      ],
      size: this.checkSize,
    };

    this.debug('querying for outstanding jobs');

    return this.client.search({
      index: `${this.queue.index}-*`,
      type: this.doctype,
      version: true,
      body: query,
    })
      .then((results) => {
        const jobs = results.hits.hits;
        this.debug(`${jobs.length} outstanding jobs returned`);
        return jobs;
      })
      .catch((err) => {
        // ignore missing indices errors
        if (err.status === 404) return [];

        this.debug('job querying failed', err);
        this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
      });
  }
}