diff --git a/.travis.yml b/.travis.yml index 0456315..e71fa01 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,18 @@ language: node_js + node_js: + - "stable" + - "5" - "4" - "4.3" + notifications: email: false + +before_script: npm install + after_success: npm run coverage + +branches: + only: + - master \ No newline at end of file diff --git a/readme.md b/readme.md index 66ad14e..31ff097 100644 --- a/readme.md +++ b/readme.md @@ -33,6 +33,7 @@ Option | Default | Description ------ | ----------- | ------- interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week` timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it. +doctype | `esqueue` | The doctype to use in Elasticsearch client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead. diff --git a/src/helpers/constants.js b/src/helpers/constants.js index b81294f..f764f8c 100644 --- a/src/helpers/constants.js +++ b/src/helpers/constants.js @@ -6,4 +6,10 @@ export const jobStatuses = { JOB_STATUS_CANCELLED: 'cancelled', }; -export default Object.assign({}, jobStatuses); \ No newline at end of file +export const defaultSettings = { + DEFAULT_SETTING_TIMEOUT: 10000, + DEFAULT_SETTING_INTERVAL: 'week', + DEFAULT_SETTING_DOCTYPE: 'esqueue', +}; + +export default Object.assign({}, jobStatuses, defaultSettings); \ No newline at end of file diff --git a/src/helpers/create_index.js b/src/helpers/create_index.js index acf1b0f..9975103 100644 --- a/src/helpers/create_index.js +++ b/src/helpers/create_index.js @@ -1,4 +1,7 @@ -var schema = { +import { defaultSettings } from './constants'; + +const schema = { + jobtype: { type: 'string', index: 'not_analyzed' }, payload: { type: 'object', enabled: false }, priority: { type: 'short' }, timeout: { type: 'long' }, @@ -9,7 +12,7 @@ var schema = { completed_at: { type: 'date' }, attempts: { type: 'short' }, max_attempts: { type: 'short' }, - status: { type: 'keyword' }, + status: { type: 'string', index: 'not_analyzed' }, output: { type: 'object', properties: { @@ -19,14 +22,9 @@ var schema = { } }; -export default function createIndex(client, indexName) { - const indexBody = { - mappings: { - _default_: { - properties: schema - } - } - }; +export default function createIndex(client, indexName, doctype = defaultSettings.DEFAULT_SETTING_DOCTYPE) { + const indexBody = { mappings : {} }; + indexBody.mappings[doctype] = { properties: schema }; return client.indices.exists({ index: indexName, diff --git a/src/index.js b/src/index.js index 18bf795..e1e676a 100644 --- a/src/index.js +++ b/src/index.js @@ -2,6 +2,7 @@ import events from 'events'; import createClient from './helpers/es_client'; import indexTimestamp from './helpers/index_timestamp'; import logger from './helpers/logger'; +import { defaultSettings } from './helpers/constants'; import Job from './job.js'; import Worker from './worker.js'; import omit from 'lodash.omit'; @@ -15,8 +16,9 @@ export default class Esqueue extends events.EventEmitter { super(); this.index = index; this.settings = Object.assign({ - interval: 'week', - timeout: 10000, + interval: defaultSettings.DEFAULT_SETTING_INTERVAL, + timeout: defaultSettings.DEFAULT_SETTING_TIMEOUT, + doctype: defaultSettings.DEFAULT_SETTING_DOCTYPE, }, omit(options, [ 'client' ])); this.client = createClient(options.client || {}); @@ -25,7 +27,7 @@ export default class Esqueue extends events.EventEmitter { } _initTasks() { - var initTasks = [ + const initTasks = [ this.client.ping({ timeout: 3000 }), ]; @@ -38,10 +40,13 @@ export default class Esqueue extends events.EventEmitter { addJob(type, payload, opts = {}) { const timestamp = indexTimestamp(this.settings.interval); const index = `${this.index}-${timestamp}`; + const defaults = { + timeout: this.settings.timeout, + }; - const options = Object.assign({ - timeout: this.settings.timeout - }, opts); + const options = Object.assign(defaults, opts, { + doctype: this.settings.doctype + }); return new Job(this.client, index, type, payload, options); } diff --git a/src/job.js b/src/job.js index 1a53e1e..49aeac0 100644 --- a/src/job.js +++ b/src/job.js @@ -2,7 +2,7 @@ import events from 'events'; import isPlainObject from 'lodash.isplainobject'; import Puid from 'puid'; import logger from './helpers/logger'; -import { jobStatuses } from './helpers/constants'; +import contstants from './helpers/constants'; import createIndex from './helpers/create_index'; const debug = logger('esqueue:job'); @@ -18,21 +18,23 @@ export default class Job extends events.EventEmitter { this.client = client; this.id = puid.generate(); this.index = index; - this.type = type; + this.jobtype = type; this.payload = payload; this.timeout = options.timeout || 10000; this.maxAttempts = options.max_attempts || 3; this.priority = Math.max(Math.min(options.priority || 10, 20), -20); + this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE; this.debug = (...msg) => debug(...msg, `id: ${this.id}`); - this.ready = createIndex(client, index) + this.ready = createIndex(client, index, this.doctype) .then(() => { return this.client.index({ index: this.index, - type: this.type, + type: this.doctype, id: this.id, body: { + jobtype: this.jobtype, payload: this.payload, priority: this.priority, timeout: this.timeout, @@ -40,7 +42,7 @@ export default class Job extends events.EventEmitter { created_at: new Date(), attempts: 0, max_attempts: this.maxAttempts, - status: jobStatuses.JOB_STATUS_PENDING, + status: contstants.JOB_STATUS_PENDING, } }) .then((doc) => { @@ -65,7 +67,7 @@ export default class Job extends events.EventEmitter { .then(() => { return this.client.get({ index: this.index, - type: this.type, + type: this.doctype, id: this.id }); }) @@ -83,7 +85,8 @@ export default class Job extends events.EventEmitter { return Object.assign({ id: this.id, index: this.index, - type: this.type, + type: this.doctype, + jobtype: this.jobtype, payload: this.payload, timeout: this.timeout, max_attempts: this.maxAttempts, diff --git a/src/worker.js b/src/worker.js index d90587a..b0628b3 100644 --- a/src/worker.js +++ b/src/worker.js @@ -2,7 +2,7 @@ import events from 'events'; import Puid from 'puid'; import moment from 'moment'; import logger from './helpers/logger'; -import { jobStatuses } from './helpers/constants'; +import constants from './helpers/constants'; import { WorkerTimeoutError } from './helpers/errors'; const puid = new Puid(); @@ -18,10 +18,11 @@ export default class Job extends events.EventEmitter { this.id = puid.generate(); this.queue = queue; this.client = this.queue.client; - this.type = type; + this.jobtype = type; this.workerFn = workerFn; this.checkInterval = opts.interval || 1500; this.checkSize = opts.size || 10; + this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE; this.debug = (...msg) => debug(...msg, `id: ${this.id}`); @@ -50,7 +51,7 @@ export default class Job extends events.EventEmitter { attempts: attempts, started_at: startTime, process_expiration: expirationTime, - status: jobStatuses.JOB_STATUS_PROCESSING, + status: constants.JOB_STATUS_PROCESSING, }; return this.client.update({ @@ -76,7 +77,7 @@ export default class Job extends events.EventEmitter { const completedTime = moment().toISOString(); const doc = { - status: jobStatuses.JOB_STATUS_FAILED, + status: constants.JOB_STATUS_FAILED, completed_at: completedTime, }; @@ -136,7 +137,7 @@ export default class Job extends events.EventEmitter { const docOutput = this._formatOutput(output); const doc = { - status: jobStatuses.JOB_STATUS_COMPLETED, + status: constants.JOB_STATUS_COMPLETED, completed_at: completedTime, output: docOutput }; @@ -226,20 +227,23 @@ export default class Job extends events.EventEmitter { _getPendingJobs() { const nowTime = moment().toISOString(); - const dateFilter = { - range: { - process_expiration: { - lte: nowTime - } - } - }; const query = { query: { - bool: { - should: [ - { bool: { must: [{ term: { status: 'pending'} }] }}, - { bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } } - ] + constant_score: { + filter: { + bool: { + must: { term: { jobtype: this.jobtype } }, + should: [ + { term: { status: 'pending'} }, + { bool: + { must: [ + { term: { status: 'processing' } }, + { range: { process_expiration: { lte: nowTime } } } + ] } + } + ] + } + } } }, sort: [ @@ -253,7 +257,7 @@ export default class Job extends events.EventEmitter { return this.client.search({ index: `${this.queue.index}-*`, - type: this.type, + type: this.doctype, version: true, body: query }) diff --git a/test/fixtures/elasticsearch.js b/test/fixtures/elasticsearch.js index 620bb00..8112c20 100644 --- a/test/fixtures/elasticsearch.js +++ b/test/fixtures/elasticsearch.js @@ -1,5 +1,6 @@ import { uniqueId, times, random } from 'lodash'; import elasticsearch from 'elasticsearch'; +import { DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants'; function Client() { this.indices = { @@ -11,10 +12,10 @@ function Client() { } Client.prototype.index = function (params = {}) { - var shardCount = 2; + const shardCount = 2; return Promise.resolve({ _index: params.index || 'index', - _type: params.type || 'type', + _type: params.type || DEFAULT_SETTING_DOCTYPE, _id: params.id || uniqueId('testDoc'), _version: 1, _shards: { total: shardCount, successful: shardCount, failed: 0 }, @@ -30,6 +31,7 @@ Client.prototype.get = function (params = {}, source = {}) { if (params === elasticsearch.errors.NotFound) return elasticsearch.errors.NotFound; const _source = Object.assign({ + jobtype: 'jobtype', payload: { id: 'sample-job-1', now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' @@ -44,7 +46,7 @@ Client.prototype.get = function (params = {}, source = {}) { return { _index: params.index || 'index', - _type: params.type || 'type', + _type: params.type || DEFAULT_SETTING_DOCTYPE, _id: params.id || 'AVRPRLnlp7Ur1SZXfT-T', _version: params.version || 1, found: true, @@ -56,7 +58,7 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) { const hits = times(count, () => { return { _index: params.index || 'index', - _type: params.type || 'type', + _type: params.type || DEFAULT_SETTING_DOCTYPE, _id: uniqueId('documentId'), _version: random(1, 5), _score: null, @@ -83,10 +85,10 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) { }; Client.prototype.update = function (params = {}) { - var shardCount = 2; + const shardCount = 2; return Promise.resolve({ _index: params.index || 'index', - _type: params.type || 'type', + _type: params.type || DEFAULT_SETTING_DOCTYPE, _id: params.id || uniqueId('testDoc'), _version: params.version + 1 || 2, _shards: { total: shardCount, successful: shardCount, failed: 0 }, diff --git a/test/src/helpers/create_index.js b/test/src/helpers/create_index.js index 09e26e1..a0dfc12 100644 --- a/test/src/helpers/create_index.js +++ b/test/src/helpers/create_index.js @@ -2,6 +2,7 @@ import expect from 'expect.js'; import sinon from 'sinon'; import createIndex from '../../../lib/helpers/create_index'; import elasticsearchMock from '../../fixtures/elasticsearch'; +import { defaultSettings } from '../../../lib/helpers/constants'; describe('Create Index', function () { let client; @@ -23,8 +24,9 @@ describe('Create Index', function () { }); }); - it('should create the default mappings', function () { + it('should create the type mappings', function () { const indexName = 'test-index'; + const docType = defaultSettings.DEFAULT_SETTING_DOCTYPE; const result = createIndex(client, indexName); return result @@ -33,8 +35,24 @@ describe('Create Index', function () { sinon.assert.callCount(createSpy, 1); expect(payload).to.have.property('body'); expect(payload.body).to.have.property('mappings'); - expect(payload.body.mappings).to.have.property('_default_'); - expect(payload.body.mappings._default_).to.have.property('properties'); + expect(payload.body.mappings).to.have.property(docType); + expect(payload.body.mappings[docType]).to.have.property('properties'); + }); + }); + + it('should accept a custom doctype', function () { + const indexName = 'test-index'; + const docType = 'my_type'; + const result = createIndex(client, indexName, docType); + + return result + .then(function () { + const payload = createSpy.getCall(0).args[0]; + sinon.assert.callCount(createSpy, 1); + expect(payload).to.have.property('body'); + expect(payload.body).to.have.property('mappings'); + expect(payload.body.mappings).to.have.property(docType); + expect(payload.body.mappings[docType]).to.have.property('properties'); }); }); }); diff --git a/test/src/helpers/index_timestamp.js b/test/src/helpers/index_timestamp.js index 7f1a24b..01c92c4 100644 --- a/test/src/helpers/index_timestamp.js +++ b/test/src/helpers/index_timestamp.js @@ -25,32 +25,32 @@ describe('Index interval', function () { }); it('should return the year', function () { - var timestamp = indexTimestamp('year'); + const timestamp = indexTimestamp('year'); expect(timestamp).to.equal('2016'); }); it('should return the year and month', function () { - var timestamp = indexTimestamp('month'); + const timestamp = indexTimestamp('month'); expect(timestamp).to.equal('2016-04'); }); it('should return the year, month, and first day of the week', function () { - var timestamp = indexTimestamp('week'); + const timestamp = indexTimestamp('week'); expect(timestamp).to.equal('2016-03-27'); }); it('should return the year, month, and day of the week', function () { - var timestamp = indexTimestamp('day'); + const timestamp = indexTimestamp('day'); expect(timestamp).to.equal('2016-04-02'); }); it('should return the year, month, day and hour', function () { - var timestamp = indexTimestamp('hour'); + const timestamp = indexTimestamp('hour'); expect(timestamp).to.equal('2016-04-02-01'); }); it('should return the year, month, day, hour and minute', function () { - var timestamp = indexTimestamp('minute'); + const timestamp = indexTimestamp('minute'); expect(timestamp).to.equal('2016-04-02-01-02'); }); }); diff --git a/test/src/job.js b/test/src/job.js index 6bd5e0d..c0d6dea 100644 --- a/test/src/job.js +++ b/test/src/job.js @@ -3,7 +3,7 @@ import expect from 'expect.js'; import sinon from 'sinon'; import proxyquire from 'proxyquire'; import elasticsearchMock from '../fixtures/elasticsearch'; -import { JOB_STATUS_PENDING } from '../../lib/helpers/constants'; +import { JOB_STATUS_PENDING, DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants'; const createIndexMock = sinon.stub().returns(Promise.resolve('mock')); const module = proxyquire.noPreserveCache()('../../lib/job', { @@ -73,12 +73,23 @@ describe('Job Class', function () { return job.ready.then(() => { const newDoc = validateDoc(client.index); expect(newDoc).to.have.property('index', index); - expect(newDoc).to.have.property('type', type); + expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE); expect(newDoc).to.have.property('body'); expect(newDoc.body).to.have.property('payload', payload); }); }); + it('should index the job type', function () { + const job = new Job(client, index, type, payload); + return job.ready.then(() => { + const newDoc = validateDoc(client.index); + expect(newDoc).to.have.property('index', index); + expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE); + expect(newDoc).to.have.property('body'); + expect(newDoc.body).to.have.property('jobtype', type); + }); + }); + it('should index timeout value from options', function () { const job = new Job(client, index, type, payload, options); return job.ready.then(() => { @@ -95,6 +106,16 @@ describe('Job Class', function () { }); }); + it('should set an expired process_expiration time', function () { + const now = new Date().getTime(); + const job = new Job(client, index, type, payload, options); + return job.ready.then(() => { + const newDoc = validateDoc(client.index); + expect(newDoc.body).to.have.property('process_expiration'); + expect(newDoc.body.process_expiration.getTime()).to.be.lessThan(now); + }); + }); + it('should set attempt count', function () { const job = new Job(client, index, type, payload, options); return job.ready.then(() => { @@ -152,14 +173,17 @@ describe('Job Class', function () { it('should return the job document', function () { const job = new Job(client, index, type, payload); + return job.get() .then((doc) => { const jobDoc = job.document; // document should be resolved expect(doc).to.have.property('index', index); - expect(doc).to.have.property('type', type); + expect(doc).to.have.property('type', jobDoc.type); expect(doc).to.have.property('id', jobDoc.id); expect(doc).to.have.property('version', jobDoc.version); + expect(doc).to.have.property('payload'); + expect(doc).to.have.property('jobtype'); expect(doc).to.have.property('priority'); expect(doc).to.have.property('timeout'); }); @@ -185,7 +209,8 @@ describe('Job Class', function () { const doc = job.toJSON(); expect(doc).to.have.property('index', index); - expect(doc).to.have.property('type', type); + expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE); + expect(doc).to.have.property('jobtype', type); expect(doc).to.have.property('timeout', options.timeout); expect(doc).to.have.property('max_attempts', options.max_attempts); expect(doc).to.have.property('priority', options.priority); diff --git a/test/src/worker.js b/test/src/worker.js index b80ce1e..43c7f90 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -1,10 +1,10 @@ import expect from 'expect.js'; import sinon from 'sinon'; import moment from 'moment'; -import { noop, random } from 'lodash'; +import { noop, random, get, find } from 'lodash'; import Worker from '../../lib/worker'; import elasticsearchMock from '../fixtures/elasticsearch'; -import { JOB_STATUS_PROCESSING, JOB_STATUS_COMPLETED, JOB_STATUS_FAILED } from '../../lib/helpers/constants'; +import constants from '../../lib/helpers/constants'; const anchor = '2016-04-02T01:02:03.456'; // saturday const defaults = { @@ -106,48 +106,102 @@ describe('Worker class', function () { }); describe('searching for jobs', function () { + let searchSpy; + + function getSearchParams(jobtype, params = {}) { + new Worker(mockQueue, jobtype, noop, params); + clock.tick(defaults.interval); + return searchSpy.firstCall.args[0]; + } + beforeEach(() => { anchorMoment = moment(anchor); clock = sinon.useFakeTimers(anchorMoment.valueOf()); + searchSpy = sinon.spy(mockQueue.client, 'search'); }); afterEach(() => { clock.restore(); }); - it('should start polling for jobs after interval', function () { - const searchSpy = sinon.spy(mockQueue.client, 'search'); - new Worker(mockQueue, 'test', noop); - sinon.assert.notCalled(searchSpy); - clock.tick(defaults.interval); - sinon.assert.calledOnce(searchSpy); + describe('polling interval', function () { + it('should start polling for jobs after interval', function () { + new Worker(mockQueue, 'test', noop); + sinon.assert.notCalled(searchSpy); + clock.tick(defaults.interval); + sinon.assert.calledOnce(searchSpy); + }); + + it('should use interval option to control polling', function () { + const interval = 567; + new Worker(mockQueue, 'test', noop, { interval }); + sinon.assert.notCalled(searchSpy); + clock.tick(interval); + sinon.assert.calledOnce(searchSpy); + }); }); - it('should use interval option to control polling', function () { - const interval = 567; - const searchSpy = sinon.spy(mockQueue.client, 'search'); - new Worker(mockQueue, 'test', noop, { interval }); - sinon.assert.notCalled(searchSpy); - clock.tick(interval); - sinon.assert.calledOnce(searchSpy); + describe('query parameters', function () { + it('should query with version', function () { + const params = getSearchParams('test'); + expect(params).to.have.property('version', true); + }); + + it('should query by default doctype', function () { + const params = getSearchParams('test'); + expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE); + }); + + it('should query by custom doctype', function () { + const doctype = 'custom_test'; + const params = getSearchParams('test', { doctype }); + expect(params).to.have.property('type', doctype); + }); }); - it('should use default size', function () { - const searchSpy = sinon.spy(mockQueue.client, 'search'); - new Worker(mockQueue, 'test', noop); - clock.tick(defaults.interval); - const body = searchSpy.firstCall.args[0].body; - expect(body).to.have.property('size', defaults.size); + describe('query body', function () { + const conditionPath = 'query.constant_score.filter.bool'; + const jobtype = 'test_jobtype'; + + it('should search by job type', function () { + const { body } = getSearchParams(jobtype); + const conditions = get(body, conditionPath); + expect(conditions).to.have.property('must'); + expect(conditions.must).to.eql({ term: { jobtype: jobtype } }); + }); + + it('should search for pending or expired jobs', function () { + const { body } = getSearchParams(jobtype); + const conditions = get(body, conditionPath); + expect(conditions).to.have.property('should'); + + // this works because we are stopping the clock, so all times match + const nowTime = moment().toISOString(); + const pending = { term: { status: 'pending'} }; + const expired = { bool: { must: [ + { term: { status: 'processing' } }, + { range: { process_expiration: { lte: nowTime } } } + ] } }; + + const pendingMatch = find(conditions.should, pending); + expect(pendingMatch).to.not.be(undefined); + + const expiredMatch = find(conditions.should, expired); + expect(expiredMatch).to.not.be(undefined); + }); + + it('should use default size', function () { + const { body } = getSearchParams(jobtype); + expect(body).to.have.property('size', defaults.size); + }); + + it('should observe the size option', function () { + const size = 25; + const { body } = getSearchParams(jobtype, { size }); + expect(body).to.have.property('size', size); + }); }); - it('should observe the size option', function () { - const size = 25; - const searchSpy = sinon.spy(mockQueue.client, 'search'); - new Worker(mockQueue, 'test', noop, { size }); - clock.tick(defaults.interval); - const body = searchSpy.firstCall.args[0].body; - expect(body).to.have.property('size', size); - }); }); describe('claiming a job', function () { @@ -193,7 +247,7 @@ describe('Worker class', function () { it('should update the job status', function () { worker._claimJob(job); const doc = updateSpy.firstCall.args[0].body.doc; - expect(doc).to.have.property('status', JOB_STATUS_PROCESSING); + expect(doc).to.have.property('status', constants.JOB_STATUS_PROCESSING); }); it('should set job expiration time', function () { @@ -267,7 +321,7 @@ describe('Worker class', function () { it('should set status to failed', function () { worker._failJob(job); const doc = updateSpy.firstCall.args[0].body.doc; - expect(doc).to.have.property('status', JOB_STATUS_FAILED); + expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED); }); it('should append error message if supplied', function () { @@ -292,7 +346,7 @@ describe('Worker class', function () { worker._failJob(job, msg); const doc = updateSpy.firstCall.args[0].body.doc; expect(doc).to.have.property('output'); - expect(doc).to.have.property('status', JOB_STATUS_FAILED); + expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED); expect(doc).to.have.property('completed_at'); const completedTimestamp = moment(doc.completed_at).valueOf(); expect(completedTimestamp).to.be.greaterThan(startTime); @@ -357,7 +411,7 @@ describe('Worker class', function () { .then(() => { sinon.assert.calledOnce(updateSpy); const doc = updateSpy.firstCall.args[0].body.doc; - expect(doc).to.have.property('status', JOB_STATUS_COMPLETED); + expect(doc).to.have.property('status', constants.JOB_STATUS_COMPLETED); expect(doc).to.have.property('completed_at'); const completedTimestamp = moment(doc.completed_at).valueOf(); expect(completedTimestamp).to.be.greaterThan(startTime);