import events from 'events';
import Puid from 'puid';
import moment from 'moment';
import Bluebird from 'bluebird';
import logger from './helpers/logger';
import { jobStatuses } from './helpers/constants';
import { WorkerTimeoutError } from './helpers/errors';

const puid = new Puid();
const debug = logger('worker');

/**
 * Worker that periodically polls the queue's index for outstanding jobs of a
 * given type, claims at most one job per polling cycle, runs the supplied
 * worker function against it and records the outcome on the job document.
 *
 * Events emitted on the instance:
 *  - `error`      when querying for or claiming jobs fails
 *  - `job_error`  when the worker function reports a failure
 * The owning queue additionally receives `worker_error` in the `error` cases.
 *
 * NOTE(review): `this.client` is used with `update`/`search` calls and
 * `statusCode === 409` checks, so it looks like an Elasticsearch-style client
 * using versioned updates — confirm against the queue implementation.
 */
export default class Job extends events.EventEmitter {
  /**
   * @param {Object} queue - owning queue; must expose `client` and `index`
   * @param {string} type - job type this worker handles
   * @param {Function} workerFn - called as `workerFn(payload, callback)`, where
   *   `callback(err, output)` reports the result
   * @param {Object} [opts]
   * @param {number} [opts.interval=1500] - polling interval passed to setInterval
   * @param {number} [opts.size=10] - maximum number of jobs fetched per poll
   * @throws {Error} when `type` is not a string or `workerFn` is not a function
   */
  constructor(queue, type, workerFn, opts = {}) {
    if (typeof type !== 'string') throw new Error('Type must be a string');
    if (typeof workerFn !== 'function') throw new Error('Worker must be a function');

    super();

    this.id = puid.generate();
    this.queue = queue;
    this.client = this.queue.client;
    this.type = type;
    this.workerFn = workerFn;
    this.checkInterval = opts.interval || 1500;
    this.checkSize = opts.size || 10;

    // Every debug line is suffixed with this worker's id.
    this.debug = (...msg) => debug(...msg, `id: ${this.id}`);

    this._checker = false;
    // Set by destroy(); prevents an in-flight cycle from restarting polling.
    this._destroyed = false;
    this._startJobPolling();
  }

  /**
   * Permanently stops polling. A polling cycle that is already in flight is
   * allowed to finish, but it will not schedule further polling.
   */
  destroy() {
    this._destroyed = true;
    this._stopJobPolling();
  }

  /**
   * Tries to mark a job as being processed by this worker.
   *
   * @param {Object} job - search hit (`_index`, `_type`, `_id`, `_version`, `_source`)
   * @returns {Promise<Object|boolean>} the updated job, or `false` when the job
   *   could not be claimed (max attempts exceeded, or the update returned 409)
   */
  _claimJob(job) {
    this.debug(`Attempting to claim job ${job._id}`);

    const m = moment();
    const startTime = m.toISOString();
    // moment#add mutates `m`, so startTime must be captured before this line.
    const expirationTime = m.add(job._source.timeout).toISOString();
    const attempts = job._source.attempts + 1;

    if (attempts > job._source.max_attempts) {
      // Only supply a message when the job has no output recorded already.
      const msg = (!job._source.output)
        ? `Max attempts reached (${job._source.max_attempts})`
        : false;
      return this._failJob(job, msg)
        .then(() => false);
    }

    const doc = {
      attempts: attempts,
      started_at: startTime,
      process_expiration: expirationTime,
      status: jobStatuses.JOB_STATUS_PROCESSING,
    };

    return this.client.update({
      index: job._index,
      type: job._type,
      id: job._id,
      version: job._version,
      body: { doc }
    })
      .then((response) => {
        const updatedJob = Object.assign({}, job, response);
        updatedJob._source = Object.assign({}, job._source, doc);
        return updatedJob;
      })
      .catch((err) => {
        // 409: presumably a version conflict, i.e. someone else updated the job first.
        if (err.statusCode === 409) return false;
        throw err;
      });
  }

  /**
   * Marks a job as failed, optionally recording output describing the failure.
   *
   * @param {Object} job - search hit for the job
   * @param {*} [output=false] - failure output; stored only when truthy
   * @returns {Promise<*>} the update response, or `true` when the update returned 409
   */
  _failJob(job, output = false) {
    this.debug(`Failing job ${job._id}`);

    const completedTime = moment().toISOString();
    const doc = {
      status: jobStatuses.JOB_STATUS_FAILED,
      completed_at: completedTime,
    };

    if (output) {
      doc.output = this._formatOutput(output);
    }

    return this.client.update({
      index: job._index,
      type: job._type,
      id: job._id,
      version: job._version,
      body: { doc }
    })
      .catch((err) => {
        if (err.statusCode === 409) return true;
        throw err;
      });
  }

  /**
   * Normalizes worker output into `{ content, content_type }`.
   *
   * @param {*} output - either an object carrying a truthy `content` property
   *   (and optionally `content_type`), or any other value used as the content
   * @returns {{content: *, content_type: *}} `content_type` is `false` when unknown
   */
  _formatOutput(output) {
    const unknownMime = false;
    const docOutput = {};

    // `typeof null` is 'object'; guard explicitly so a null output does not
    // throw on property access and instead falls through to the default branch.
    const hasContent = output !== null && typeof output === 'object' && output.content;

    if (hasContent) {
      docOutput.content = output.content;
      docOutput.content_type = output.content_type || unknownMime;
    } else {
      docOutput.content = output || '';
      docOutput.content_type = unknownMime;
    }

    return docOutput;
  }

  /**
   * Runs the worker function for a claimed job and records the result.
   *
   * @param {Object} job - claimed job as returned by `_claimJob`
   * @returns {Promise<*>} resolves with the update response (or `false` on 409)
   *   on success, resolves with `undefined` on worker timeout, and rejects with
   *   the worker's error (after emitting `job_error`) on failure
   */
  _performJob(job) {
    this.debug(`Starting job ${job._id}`);

    let timeoutId = null;

    const workerOutput = new Promise((resolve, reject) => {
      this.workerFn.call(null, job._source.payload, (err, cbOutput) => {
        if (err) return reject(err);
        resolve(cbOutput);
      });

      timeoutId = setTimeout(() => {
        reject(new WorkerTimeoutError({
          timeout: job._source.timeout,
          jobId: job._id,
        }));
      }, job._source.timeout);
    });

    // Once the worker has settled the timer is pointless; clearing it avoids
    // keeping the event loop alive for the remainder of the timeout.
    const clearWorkerTimeout = () => clearTimeout(timeoutId);

    return workerOutput.then((output) => {
      // job execution was successful
      clearWorkerTimeout();
      this.debug(`Completed job ${job._id}`);

      const completedTime = moment().toISOString();
      const docOutput = this._formatOutput(output);

      const doc = {
        status: jobStatuses.JOB_STATUS_COMPLETED,
        completed_at: completedTime,
        output: docOutput
      };

      return this.client.update({
        index: job._index,
        type: job._type,
        id: job._id,
        version: job._version,
        body: { doc }
      })
        .catch((err) => {
          if (err.statusCode === 409) return false;
          throw err;
        });
    }, (jobErr) => {
      // job execution failed
      clearWorkerTimeout();

      if (jobErr.type === 'WorkerTimeout') {
        // The job document is intentionally left untouched on timeout.
        this.debug(`Timeout on job ${job._id}`);
        return;
      }

      this.debug(`Failure occurred on job ${job._id}`);

      return this._failJob(job, jobErr.toString())
        .catch(() => false) // failing to record the failure must not mask jobErr
        .then(() => {
          this.emit('job_error', jobErr);
          throw jobErr;
        });
    });
  }

  /**
   * Starts (or restarts) the polling interval. Does nothing once the worker
   * has been destroyed. Any previously scheduled interval is cleared first so
   * that overlapping polling cycles cannot leak timers.
   */
  _startJobPolling() {
    if (this._destroyed) return;

    clearInterval(this._checker);

    this._checker = setInterval(() => {
      this._getPendingJobs()
        .then((jobs) => {
          // A search may resolve after destroy(); do not claim work in that case.
          if (this._destroyed) return;
          return this._claimPendingJobs(jobs);
        })
        .catch((err) => {
          // Failures have already been emitted as events by the methods above;
          // handle the rejection here so it does not surface as unhandled.
          this.debug('Job polling cycle failed', err);
        });
    }, this.checkInterval);
  }

  /** Stops the polling interval (polling can be resumed with `_startJobPolling`). */
  _stopJobPolling() {
    clearInterval(this._checker);
  }

  /**
   * Walks the given jobs in order, claims the first one that can be claimed
   * and performs it. Polling is paused for the duration and resumed afterwards.
   *
   * @param {Object[]} jobs - search hits returned by `_getPendingJobs`
   * @returns {Promise<*>} result of `_performJob`, or `undefined` when no job was claimed
   */
  _claimPendingJobs(jobs) {
    this._stopJobPolling();

    let claimed = false;

    return Bluebird.mapSeries(jobs, (job) => {
      // Once a job has been claimed the remaining candidates are skipped.
      if (claimed) return false;

      return this._claimJob(job)
        .then((claimedJob) => {
          if (claimedJob !== false) {
            claimed = true;
            return claimedJob;
          }
        })
        .catch((err) => {
          this.debug('Failed to claim outstanding jobs', err);
          this.emit('error', err);
          this.queue.emit('worker_error', {
            id: this.id,
            type: this.type,
            err
          });
          throw err;
        });
    })
      .then((mappedJobs) => mappedJobs.filter(Boolean))
      .then((claimedJobs) => {
        if (claimedJobs.length !== 1) return;
        const job = claimedJobs[0];
        return this._performJob(job);
      })
      .finally(() => this._startJobPolling());
  }

  /**
   * Searches for jobs of this worker's type that are either pending, or
   * processing with a `process_expiration` at or before the current time.
   * Results are sorted by ascending priority, then ascending creation time.
   *
   * @returns {Promise<Object[]>} the raw search hits; rejects (after emitting
   *   `error` and the queue's `worker_error`) when the search fails
   */
  _getPendingJobs() {
    const nowTime = moment().toISOString();
    const dateFilter = {
      range: {
        process_expiration: {
          lte: nowTime
        }
      }
    };

    const query = {
      query: {
        bool: {
          should: [
            { bool: { must: [{ term: { status: 'pending'} }] }},
            { bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } }
          ]
        }
      },
      sort: [
        { priority: { order: 'asc' }},
        { created_at: { order: 'asc' }}
      ],
      size: this.checkSize
    };

    this.debug('querying for outstanding jobs');

    return this.client.search({
      index: `${this.queue.index}-*`,
      type: this.type,
      version: true,
      body: query
    })
      .then((results) => {
        const jobs = results.hits.hits;
        this.debug(`${jobs.length} outstanding jobs returned`);
        return jobs;
      })
      .catch((err) => {
        this.debug('job querying failed', err);
        this.emit('error', err);
        this.queue.emit('worker_error', {
          id: this.id,
          type: this.type,
          err
        });
        throw err;
      });
  }
}