diff --git a/src/worker.js b/src/worker.js index 0484929..9f02ccc 100644 --- a/src/worker.js +++ b/src/worker.js @@ -1,8 +1,12 @@ import events from 'events'; import Puid from 'puid'; +import moment from 'moment'; +import Bluebird from 'bluebird'; +import logger from './helpers/logger'; import { jobStatuses } from './helpers/constants'; const puid = new Puid(); +const debug = logger('worker'); export default class Job extends events.EventEmitter { constructor(queue, type, workerFn, opts = {}) { @@ -18,48 +22,175 @@ export default class Job extends events.EventEmitter { this.workerFn = workerFn; this.checkInterval = opts.interval || 1500; - this._processJobs(); - this._checker = setInterval(this._processJobs, this.checkInterval); + this.debug = (...msg) => debug(...msg, `id: ${this.id}`); + + // this._pollForPendingJobs(); + this._checker = setInterval(() => this._pollForPendingJobs(), this.checkInterval); } destroy() { clearInterval(this._checker); } - _processJobs() { + _claimJob(job) { + this.debug(`Attempting to claim job ${job._id}`); + const m = moment(); + const startTime = m.toISOString(); + const expirationTime = m.add(job._source.timeout).toISOString(); + const attempts = job._source.attempts + 1; + + if (attempts > job._source.max_attempts) { + return this._failJob(job, `Max attempts reached (${job._source.max_attempts})`) + .then(() => false); + } + + const doc = { + attempts: attempts, + started_at: startTime, + process_expiration: expirationTime, + status: jobStatuses.JOB_STATUS_PROCESSING, + }; + + return this.client.update({ + index: job._index, + type: job._type, + id: job._id, + version: job._version, + body: { doc } + }) + .then((response) => { + const updatedJob = Object.assign({}, job, response); + updatedJob._source = Object.assign({}, job._source, doc); + return updatedJob; + }) + .catch((err) => { + if (err.statusCode === 409) return false; + throw err; + }); + } + + _failJob(job, msg) { + this.debug(`Failing job ${job._id}`); + const doc = { + status: jobStatuses.JOB_STATUS_FAILED, + output: { + content_type: 'text/plain', + content: msg + } + }; + + return this.client.update({ + index: job._index, + type: job._type, + id: job._id, + version: job._version, + body: { doc } + }) + .catch((err) => { + if (err.statusCode === 409) { + return this.client.get({ + index: job._index, + type: job._type, + id: job._id + }) + .then((jobDoc) => { + if (jobDoc._source.status === jobStatuses.JOB_STATUS_FAILED) return false; + throw err; + }); + } + + throw err; + }); + } + + _performJob(job) { + this.debug(`Performing job ${job._id}`); + return Promise.reject('mock es failure'); + } + + _pollForPendingJobs() { + this._getPendingJobs().then((jobs) => this._claimPendingJobs(jobs)); + } + + _claimPendingJobs(jobs) { + let claimed = false; + Bluebird.mapSeries(jobs, (job) => { + if (claimed) return false; + + return this._claimJob(job) + .then((claimedJob) => { + if (claimedJob !== false) { + claimed = true; + return claimedJob; + } + }) + .catch((err) => { + this.debug('Failed to claim outstanding jobs', err); + this.emit('error', err); + this.queue.emit('worker_error', { + id: this.id, + type: this.type, + err + }); + throw err; + }); + }) + .then((mappedJobs) => mappedJobs.filter(Boolean)) + .then((claimedJobs) => { + if (claimedJobs.length !== 1) return; + const job = claimedJobs[0]; + this.debug(`Beginning work on ${job._id}`); + return this._performJob(job); + }); + } + + _getPendingJobs() { + const nowTime = moment().toISOString(); + const dateFilter = { + range: { + process_expiration: { + lte: nowTime + } + } + }; const query = { query: { bool: { - must: [{ term: { _type: 'example' } }], should: [ - { bool: { - must: [{ term: { status: 'pending'} }] - }}, - { bool: { - must: [ - { term: { status: 'processing'}} - ], - filter: { - range: { - process_expiration: { - lte: '2016-04-26T23:40:47.820Z' - } - } - } - }} + { bool: { must: [{ term: { status: 'pending'} }] }}, + { bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } } ] } }, sort: [ { priority: { order: 'asc' }}, { created_at: { order: 'asc' }} - ] + ], + size: 10 }; + this.debug('querying for outstanding jobs'); + return this.client.search({ - body: { - query: query - } + index: `${this.queue.index}-*`, + type: this.type, + version: true, + body: query + }) + .then((results) => { + const jobs = results.hits.hits; + this.debug(`${jobs.length} outstanding jobs returned`); + return jobs; + }) + .catch((err) => { + this.debug('job querying failed', err); + this.emit('error', err); + this.queue.emit('worker_error', { + id: this.id, + type: this.type, + err + }); + throw err; }); } } \ No newline at end of file