29 Commits

Author SHA1 Message Date
fb2f51b11d 0.4.0 2016-05-16 17:19:55 -07:00
8ac9a81fdb update changelog and readme 2016-05-16 15:06:47 -07:00
4472e725fa add tests for optional fields to get and toJSON methods 2016-05-16 15:04:35 -07:00
be1eb81059 make the mock client get method async
and update worker tests to use async value
2016-05-16 15:03:20 -07:00
8d21dc6967 add optional created_by record to the doc 2016-05-16 15:02:34 -07:00
31159baae9 add future version notes to changelog 2016-05-16 14:24:20 -07:00
49965bbaf1 add changelog file 2016-05-16 14:23:12 -07:00
49b982db99 change priority field to byte 2016-05-16 14:09:42 -07:00
9aa8eed297 filter output when searching for jobs 2016-05-16 14:09:24 -07:00
0bf6fb0023 0.3.2 2016-05-13 14:59:17 -07:00
300449bfb0 add test for missing index result 2016-05-13 14:59:05 -07:00
868c808db7 missing indices need to return an array 2016-05-13 14:56:50 -07:00
ef61a33a38 0.3.1 2016-05-13 14:40:46 -07:00
dae14e0edc add tests for job search failures 2016-05-13 14:36:19 -07:00
c51ea64bdd [worker] swollow missing index errors 2016-05-13 14:35:39 -07:00
5d37399fbf run travis on version tags too 2016-05-12 17:27:53 -07:00
e4e8e9222c 0.3.0 2016-05-12 17:02:07 -07:00
02a530c4c7 Merge branch 'develop' 2016-05-12 16:59:48 -07:00
959f58ad8b travis should only build the master branch 2016-05-12 16:59:14 -07:00
e9c3f5553d try using node 5 and 6 again
with an additional npm install call, because npm3
2016-05-12 16:11:10 -07:00
234b829adf add more tests around job query 2016-05-12 15:09:53 -07:00
df9508808b use jobtype in worker, update query
also simplify the query a bit and use constant_score
2016-05-12 14:08:43 -07:00
609e81fdef distiguish the doctype from the jobtype
update tests and mock elasticsearch client
2016-05-12 12:20:49 -07:00
3375335d24 add type field, switch to string types
should allow elasticsearch 2.x use
2016-05-12 11:49:54 -07:00
0020050f3f use contants for defaults, use a common doctype
add tests, update readme
2016-05-12 11:48:54 -07:00
fa784393e5 add test for process_expiration value 2016-05-11 11:04:23 -07:00
a4323433f2 remove use of var 2016-05-11 10:58:21 -07:00
7d08b98b15 Merge branch 'develop' 2016-05-10 17:31:27 -07:00
d1e5d68f74 swollow errors saving job output, include error in debugging output 2016-05-10 17:23:07 -07:00
14 changed files with 401 additions and 97 deletions

View File

@@ -1,7 +1,19 @@
language: node_js language: node_js
node_js: node_js:
- "stable"
- "5"
- "4" - "4"
- "4.3" - "4.3"
notifications: notifications:
email: false email: false
before_script: npm install
after_success: npm run coverage after_success: npm run coverage
branches:
only:
- master
- /^v[0-9].*$/

41
CHANGELOG.md Normal file
View File

@@ -0,0 +1,41 @@
# Changelog
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading.
## v0.4.0
- [BREAKING] Change `priority` mapping to *byte*
- Exclude `output.content` from _source when query jobs
- Add optional `created_by` value to job documents
## v0.3.2
- Misisng indiced returns empty array (fixed errors in v0.3.1)
## v0.3.1
- Ignore missing indices when looking for jobs
## v0.3.0
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
## v0.2.2
- Swollow errors when saving job output
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
- Update npm package
## v0.2.1
- Use `esqueue` namespace for debugging
## v0.2.0
- [BREAKING] Async jobs should return promises, not use callbacks
- Remove bluebird dependency
- Only require specific lodash modules, instead of the whole library
## v0.1.0
- Initial release

View File

@@ -1,6 +1,6 @@
{ {
"name": "esqueue", "name": "esqueue",
"version": "0.2.2", "version": "0.4.0",
"description": "Job queue, powered by Elasticsearch", "description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js", "main": "lib/index.js",
"scripts": { "scripts": {

View File

@@ -33,6 +33,7 @@ Option | Default | Description
------ | ----------- | ------- ------ | ----------- | -------
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week` interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it. timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
doctype | `esqueue` | The doctype to use in Elasticsearch
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead. client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
@@ -59,6 +60,7 @@ Option | Default | Description
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue. timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing. max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`. priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
### Creating a worker ### Creating a worker

View File

@@ -6,4 +6,10 @@ export const jobStatuses = {
JOB_STATUS_CANCELLED: 'cancelled', JOB_STATUS_CANCELLED: 'cancelled',
}; };
export default Object.assign({}, jobStatuses); export const defaultSettings = {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
};
export default Object.assign({}, jobStatuses, defaultSettings);

View File

@@ -1,6 +1,9 @@
var schema = { import { defaultSettings } from './constants';
const schema = {
jobtype: { type: 'string', index: 'not_analyzed' },
payload: { type: 'object', enabled: false }, payload: { type: 'object', enabled: false },
priority: { type: 'short' }, priority: { type: 'byte' },
timeout: { type: 'long' }, timeout: { type: 'long' },
process_expiration: { type: 'date' }, process_expiration: { type: 'date' },
created_by: { type: 'string', index: 'not_analyzed' }, created_by: { type: 'string', index: 'not_analyzed' },
@@ -9,7 +12,7 @@ var schema = {
completed_at: { type: 'date' }, completed_at: { type: 'date' },
attempts: { type: 'short' }, attempts: { type: 'short' },
max_attempts: { type: 'short' }, max_attempts: { type: 'short' },
status: { type: 'keyword' }, status: { type: 'string', index: 'not_analyzed' },
output: { output: {
type: 'object', type: 'object',
properties: { properties: {
@@ -19,14 +22,9 @@ var schema = {
} }
}; };
export default function createIndex(client, indexName) { export default function createIndex(client, indexName, doctype = defaultSettings.DEFAULT_SETTING_DOCTYPE) {
const indexBody = { const indexBody = { mappings : {} };
mappings: { indexBody.mappings[doctype] = { properties: schema };
_default_: {
properties: schema
}
}
};
return client.indices.exists({ return client.indices.exists({
index: indexName, index: indexName,

View File

@@ -2,6 +2,7 @@ import events from 'events';
import createClient from './helpers/es_client'; import createClient from './helpers/es_client';
import indexTimestamp from './helpers/index_timestamp'; import indexTimestamp from './helpers/index_timestamp';
import logger from './helpers/logger'; import logger from './helpers/logger';
import { defaultSettings } from './helpers/constants';
import Job from './job.js'; import Job from './job.js';
import Worker from './worker.js'; import Worker from './worker.js';
import omit from 'lodash.omit'; import omit from 'lodash.omit';
@@ -15,8 +16,9 @@ export default class Esqueue extends events.EventEmitter {
super(); super();
this.index = index; this.index = index;
this.settings = Object.assign({ this.settings = Object.assign({
interval: 'week', interval: defaultSettings.DEFAULT_SETTING_INTERVAL,
timeout: 10000, timeout: defaultSettings.DEFAULT_SETTING_TIMEOUT,
doctype: defaultSettings.DEFAULT_SETTING_DOCTYPE,
}, omit(options, [ 'client' ])); }, omit(options, [ 'client' ]));
this.client = createClient(options.client || {}); this.client = createClient(options.client || {});
@@ -25,7 +27,7 @@ export default class Esqueue extends events.EventEmitter {
} }
_initTasks() { _initTasks() {
var initTasks = [ const initTasks = [
this.client.ping({ timeout: 3000 }), this.client.ping({ timeout: 3000 }),
]; ];
@@ -38,10 +40,13 @@ export default class Esqueue extends events.EventEmitter {
addJob(type, payload, opts = {}) { addJob(type, payload, opts = {}) {
const timestamp = indexTimestamp(this.settings.interval); const timestamp = indexTimestamp(this.settings.interval);
const index = `${this.index}-${timestamp}`; const index = `${this.index}-${timestamp}`;
const defaults = {
timeout: this.settings.timeout,
};
const options = Object.assign({ const options = Object.assign(defaults, opts, {
timeout: this.settings.timeout doctype: this.settings.doctype
}, opts); });
return new Job(this.client, index, type, payload, options); return new Job(this.client, index, type, payload, options);
} }

View File

@@ -2,7 +2,7 @@ import events from 'events';
import isPlainObject from 'lodash.isplainobject'; import isPlainObject from 'lodash.isplainobject';
import Puid from 'puid'; import Puid from 'puid';
import logger from './helpers/logger'; import logger from './helpers/logger';
import { jobStatuses } from './helpers/constants'; import contstants from './helpers/constants';
import createIndex from './helpers/create_index'; import createIndex from './helpers/create_index';
const debug = logger('esqueue:job'); const debug = logger('esqueue:job');
@@ -18,29 +18,33 @@ export default class Job extends events.EventEmitter {
this.client = client; this.client = client;
this.id = puid.generate(); this.id = puid.generate();
this.index = index; this.index = index;
this.type = type; this.jobtype = type;
this.payload = payload; this.payload = payload;
this.created_by = options.created_by || null;
this.timeout = options.timeout || 10000; this.timeout = options.timeout || 10000;
this.maxAttempts = options.max_attempts || 3; this.maxAttempts = options.max_attempts || 3;
this.priority = Math.max(Math.min(options.priority || 10, 20), -20); this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE;
this.debug = (...msg) => debug(...msg, `id: ${this.id}`); this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this.ready = createIndex(client, index) this.ready = createIndex(client, index, this.doctype)
.then(() => { .then(() => {
return this.client.index({ return this.client.index({
index: this.index, index: this.index,
type: this.type, type: this.doctype,
id: this.id, id: this.id,
body: { body: {
jobtype: this.jobtype,
payload: this.payload, payload: this.payload,
priority: this.priority, priority: this.priority,
created_by: this.created_by,
timeout: this.timeout, timeout: this.timeout,
process_expiration: new Date(0), // use epoch so the job query works process_expiration: new Date(0), // use epoch so the job query works
created_at: new Date(), created_at: new Date(),
attempts: 0, attempts: 0,
max_attempts: this.maxAttempts, max_attempts: this.maxAttempts,
status: jobStatuses.JOB_STATUS_PENDING, status: contstants.JOB_STATUS_PENDING,
} }
}) })
.then((doc) => { .then((doc) => {
@@ -65,7 +69,7 @@ export default class Job extends events.EventEmitter {
.then(() => { .then(() => {
return this.client.get({ return this.client.get({
index: this.index, index: this.index,
type: this.type, type: this.doctype,
id: this.id id: this.id
}); });
}) })
@@ -83,7 +87,9 @@ export default class Job extends events.EventEmitter {
return Object.assign({ return Object.assign({
id: this.id, id: this.id,
index: this.index, index: this.index,
type: this.type, type: this.doctype,
jobtype: this.jobtype,
created_by: this.created_by,
payload: this.payload, payload: this.payload,
timeout: this.timeout, timeout: this.timeout,
max_attempts: this.maxAttempts, max_attempts: this.maxAttempts,

View File

@@ -2,7 +2,7 @@ import events from 'events';
import Puid from 'puid'; import Puid from 'puid';
import moment from 'moment'; import moment from 'moment';
import logger from './helpers/logger'; import logger from './helpers/logger';
import { jobStatuses } from './helpers/constants'; import constants from './helpers/constants';
import { WorkerTimeoutError } from './helpers/errors'; import { WorkerTimeoutError } from './helpers/errors';
const puid = new Puid(); const puid = new Puid();
@@ -18,10 +18,11 @@ export default class Job extends events.EventEmitter {
this.id = puid.generate(); this.id = puid.generate();
this.queue = queue; this.queue = queue;
this.client = this.queue.client; this.client = this.queue.client;
this.type = type; this.jobtype = type;
this.workerFn = workerFn; this.workerFn = workerFn;
this.checkInterval = opts.interval || 1500; this.checkInterval = opts.interval || 1500;
this.checkSize = opts.size || 10; this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
this.debug = (...msg) => debug(...msg, `id: ${this.id}`); this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
@@ -50,7 +51,7 @@ export default class Job extends events.EventEmitter {
attempts: attempts, attempts: attempts,
started_at: startTime, started_at: startTime,
process_expiration: expirationTime, process_expiration: expirationTime,
status: jobStatuses.JOB_STATUS_PROCESSING, status: constants.JOB_STATUS_PROCESSING,
}; };
return this.client.update({ return this.client.update({
@@ -76,7 +77,7 @@ export default class Job extends events.EventEmitter {
const completedTime = moment().toISOString(); const completedTime = moment().toISOString();
const doc = { const doc = {
status: jobStatuses.JOB_STATUS_FAILED, status: constants.JOB_STATUS_FAILED,
completed_at: completedTime, completed_at: completedTime,
}; };
@@ -136,7 +137,7 @@ export default class Job extends events.EventEmitter {
const docOutput = this._formatOutput(output); const docOutput = this._formatOutput(output);
const doc = { const doc = {
status: jobStatuses.JOB_STATUS_COMPLETED, status: constants.JOB_STATUS_COMPLETED,
completed_at: completedTime, completed_at: completedTime,
output: docOutput output: docOutput
}; };
@@ -179,7 +180,7 @@ export default class Job extends events.EventEmitter {
} }
_claimPendingJobs(jobs) { _claimPendingJobs(jobs) {
if (jobs.length === 0) return; if (!jobs || jobs.length === 0) return;
this._stopJobPolling(); this._stopJobPolling();
let claimed = false; let claimed = false;
@@ -226,20 +227,26 @@ export default class Job extends events.EventEmitter {
_getPendingJobs() { _getPendingJobs() {
const nowTime = moment().toISOString(); const nowTime = moment().toISOString();
const dateFilter = {
range: {
process_expiration: {
lte: nowTime
}
}
};
const query = { const query = {
_source : {
exclude: [ 'output.content' ]
},
query: { query: {
bool: { constant_score: {
should: [ filter: {
{ bool: { must: [{ term: { status: 'pending'} }] }}, bool: {
{ bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } } must: { term: { jobtype: this.jobtype } },
] should: [
{ term: { status: 'pending'} },
{ bool:
{ must: [
{ term: { status: 'processing' } },
{ range: { process_expiration: { lte: nowTime } } }
] }
}
]
}
}
} }
}, },
sort: [ sort: [
@@ -253,7 +260,7 @@ export default class Job extends events.EventEmitter {
return this.client.search({ return this.client.search({
index: `${this.queue.index}-*`, index: `${this.queue.index}-*`,
type: this.type, type: this.doctype,
version: true, version: true,
body: query body: query
}) })
@@ -263,6 +270,9 @@ export default class Job extends events.EventEmitter {
return jobs; return jobs;
}) })
.catch((err) => { .catch((err) => {
// ignore missing indices errors
if (err.status === 404) return [];
this.debug('job querying failed', err); this.debug('job querying failed', err);
this.emit('error', err); this.emit('error', err);
this.queue.emit('worker_error', { this.queue.emit('worker_error', {

View File

@@ -1,5 +1,6 @@
import { uniqueId, times, random } from 'lodash'; import { uniqueId, times, random } from 'lodash';
import elasticsearch from 'elasticsearch'; import elasticsearch from 'elasticsearch';
import { DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
function Client() { function Client() {
this.indices = { this.indices = {
@@ -11,10 +12,10 @@ function Client() {
} }
Client.prototype.index = function (params = {}) { Client.prototype.index = function (params = {}) {
var shardCount = 2; const shardCount = 2;
return Promise.resolve({ return Promise.resolve({
_index: params.index || 'index', _index: params.index || 'index',
_type: params.type || 'type', _type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'), _id: params.id || uniqueId('testDoc'),
_version: 1, _version: 1,
_shards: { total: shardCount, successful: shardCount, failed: 0 }, _shards: { total: shardCount, successful: shardCount, failed: 0 },
@@ -30,6 +31,8 @@ Client.prototype.get = function (params = {}, source = {}) {
if (params === elasticsearch.errors.NotFound) return elasticsearch.errors.NotFound; if (params === elasticsearch.errors.NotFound) return elasticsearch.errors.NotFound;
const _source = Object.assign({ const _source = Object.assign({
jobtype: 'jobtype',
created_by: null,
payload: { payload: {
id: 'sample-job-1', id: 'sample-job-1',
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
@@ -42,21 +45,21 @@ Client.prototype.get = function (params = {}, source = {}) {
status: 'pending' status: 'pending'
}, source); }, source);
return { return Promise.resolve({
_index: params.index || 'index', _index: params.index || 'index',
_type: params.type || 'type', _type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T', _id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_version: params.version || 1, _version: params.version || 1,
found: true, found: true,
_source: _source _source: _source
}; });
}; };
Client.prototype.search = function (params = {}, count = 5, source = {}) { Client.prototype.search = function (params = {}, count = 5, source = {}) {
const hits = times(count, () => { const hits = times(count, () => {
return { return {
_index: params.index || 'index', _index: params.index || 'index',
_type: params.type || 'type', _type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: uniqueId('documentId'), _id: uniqueId('documentId'),
_version: random(1, 5), _version: random(1, 5),
_score: null, _score: null,
@@ -83,10 +86,10 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
}; };
Client.prototype.update = function (params = {}) { Client.prototype.update = function (params = {}) {
var shardCount = 2; const shardCount = 2;
return Promise.resolve({ return Promise.resolve({
_index: params.index || 'index', _index: params.index || 'index',
_type: params.type || 'type', _type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'), _id: params.id || uniqueId('testDoc'),
_version: params.version + 1 || 2, _version: params.version + 1 || 2,
_shards: { total: shardCount, successful: shardCount, failed: 0 }, _shards: { total: shardCount, successful: shardCount, failed: 0 },

View File

@@ -2,6 +2,7 @@ import expect from 'expect.js';
import sinon from 'sinon'; import sinon from 'sinon';
import createIndex from '../../../lib/helpers/create_index'; import createIndex from '../../../lib/helpers/create_index';
import elasticsearchMock from '../../fixtures/elasticsearch'; import elasticsearchMock from '../../fixtures/elasticsearch';
import { defaultSettings } from '../../../lib/helpers/constants';
describe('Create Index', function () { describe('Create Index', function () {
let client; let client;
@@ -23,8 +24,9 @@ describe('Create Index', function () {
}); });
}); });
it('should create the default mappings', function () { it('should create the type mappings', function () {
const indexName = 'test-index'; const indexName = 'test-index';
const docType = defaultSettings.DEFAULT_SETTING_DOCTYPE;
const result = createIndex(client, indexName); const result = createIndex(client, indexName);
return result return result
@@ -33,8 +35,24 @@ describe('Create Index', function () {
sinon.assert.callCount(createSpy, 1); sinon.assert.callCount(createSpy, 1);
expect(payload).to.have.property('body'); expect(payload).to.have.property('body');
expect(payload.body).to.have.property('mappings'); expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property('_default_'); expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings._default_).to.have.property('properties'); expect(payload.body.mappings[docType]).to.have.property('properties');
});
});
it('should accept a custom doctype', function () {
const indexName = 'test-index';
const docType = 'my_type';
const result = createIndex(client, indexName, docType);
return result
.then(function () {
const payload = createSpy.getCall(0).args[0];
sinon.assert.callCount(createSpy, 1);
expect(payload).to.have.property('body');
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
}); });
}); });
}); });

View File

@@ -25,32 +25,32 @@ describe('Index interval', function () {
}); });
it('should return the year', function () { it('should return the year', function () {
var timestamp = indexTimestamp('year'); const timestamp = indexTimestamp('year');
expect(timestamp).to.equal('2016'); expect(timestamp).to.equal('2016');
}); });
it('should return the year and month', function () { it('should return the year and month', function () {
var timestamp = indexTimestamp('month'); const timestamp = indexTimestamp('month');
expect(timestamp).to.equal('2016-04'); expect(timestamp).to.equal('2016-04');
}); });
it('should return the year, month, and first day of the week', function () { it('should return the year, month, and first day of the week', function () {
var timestamp = indexTimestamp('week'); const timestamp = indexTimestamp('week');
expect(timestamp).to.equal('2016-03-27'); expect(timestamp).to.equal('2016-03-27');
}); });
it('should return the year, month, and day of the week', function () { it('should return the year, month, and day of the week', function () {
var timestamp = indexTimestamp('day'); const timestamp = indexTimestamp('day');
expect(timestamp).to.equal('2016-04-02'); expect(timestamp).to.equal('2016-04-02');
}); });
it('should return the year, month, day and hour', function () { it('should return the year, month, day and hour', function () {
var timestamp = indexTimestamp('hour'); const timestamp = indexTimestamp('hour');
expect(timestamp).to.equal('2016-04-02-01'); expect(timestamp).to.equal('2016-04-02-01');
}); });
it('should return the year, month, day, hour and minute', function () { it('should return the year, month, day, hour and minute', function () {
var timestamp = indexTimestamp('minute'); const timestamp = indexTimestamp('minute');
expect(timestamp).to.equal('2016-04-02-01-02'); expect(timestamp).to.equal('2016-04-02-01-02');
}); });
}); });

View File

@@ -3,7 +3,7 @@ import expect from 'expect.js';
import sinon from 'sinon'; import sinon from 'sinon';
import proxyquire from 'proxyquire'; import proxyquire from 'proxyquire';
import elasticsearchMock from '../fixtures/elasticsearch'; import elasticsearchMock from '../fixtures/elasticsearch';
import { JOB_STATUS_PENDING } from '../../lib/helpers/constants'; import { JOB_STATUS_PENDING, DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
const createIndexMock = sinon.stub().returns(Promise.resolve('mock')); const createIndexMock = sinon.stub().returns(Promise.resolve('mock'));
const module = proxyquire.noPreserveCache()('../../lib/job', { const module = proxyquire.noPreserveCache()('../../lib/job', {
@@ -14,6 +14,7 @@ const Job = module;
const maxPriority = 20; const maxPriority = 20;
const minPriority = -20; const minPriority = -20;
const defaultPriority = 10; const defaultPriority = 10;
const defaultCreatedBy = null;
describe('Job Class', function () { describe('Job Class', function () {
let client; let client;
@@ -73,12 +74,41 @@ describe('Job Class', function () {
return job.ready.then(() => { return job.ready.then(() => {
const newDoc = validateDoc(client.index); const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', index); expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', type); expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body'); expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('payload', payload); expect(newDoc.body).to.have.property('payload', payload);
}); });
}); });
it('should index the job type', function () {
const job = new Job(client, index, type, payload);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('jobtype', type);
});
});
it('should index the created_by value', function () {
const createdBy = 'user_identifier';
const job = new Job(client, index, type, payload, Object.assign({ created_by: createdBy }, options));
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_by', createdBy);
});
});
it('should index default created_by value', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_by', defaultCreatedBy);
});
});
it('should index timeout value from options', function () { it('should index timeout value from options', function () {
const job = new Job(client, index, type, payload, options); const job = new Job(client, index, type, payload, options);
return job.ready.then(() => { return job.ready.then(() => {
@@ -95,6 +125,16 @@ describe('Job Class', function () {
}); });
}); });
it('should set an expired process_expiration time', function () {
const now = new Date().getTime();
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('process_expiration');
expect(newDoc.body.process_expiration.getTime()).to.be.lessThan(now);
});
});
it('should set attempt count', function () { it('should set attempt count', function () {
const job = new Job(client, index, type, payload, options); const job = new Job(client, index, type, payload, options);
return job.ready.then(() => { return job.ready.then(() => {
@@ -152,18 +192,41 @@ describe('Job Class', function () {
it('should return the job document', function () { it('should return the job document', function () {
const job = new Job(client, index, type, payload); const job = new Job(client, index, type, payload);
return job.get() return job.get()
.then((doc) => { .then((doc) => {
const jobDoc = job.document; // document should be resolved const jobDoc = job.document; // document should be resolved
expect(doc).to.have.property('index', index); expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', type); expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id); expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('version', jobDoc.version); expect(doc).to.have.property('version', jobDoc.version);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('payload'); expect(doc).to.have.property('payload');
expect(doc).to.have.property('jobtype');
expect(doc).to.have.property('priority'); expect(doc).to.have.property('priority');
expect(doc).to.have.property('timeout'); expect(doc).to.have.property('timeout');
}); });
}); });
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(client, index, type, payload, optionals);
return Promise.resolve(client.get({}, optionals))
.then((doc) => {
console.log('mocked doc', doc);
sinon.stub(client, 'get').returns(Promise.resolve(doc));
})
.then(() => {
return job.get()
.then((doc) => {
expect(doc).to.have.property('created_by', optionals.created_by);
});
});
});
}); });
describe('toJSON method', function () { describe('toJSON method', function () {
@@ -185,13 +248,25 @@ describe('Job Class', function () {
const doc = job.toJSON(); const doc = job.toJSON();
expect(doc).to.have.property('index', index); expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', type); expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('timeout', options.timeout); expect(doc).to.have.property('timeout', options.timeout);
expect(doc).to.have.property('max_attempts', options.max_attempts); expect(doc).to.have.property('max_attempts', options.max_attempts);
expect(doc).to.have.property('priority', options.priority); expect(doc).to.have.property('priority', options.priority);
expect(doc).to.have.property('id'); expect(doc).to.have.property('id');
expect(doc).to.not.have.property('version'); expect(doc).to.not.have.property('version');
}); });
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(client, index, type, payload, optionals);
const doc = job.toJSON();
expect(doc).to.have.property('created_by', optionals.created_by);
});
}); });
}); });

View File

@@ -1,10 +1,10 @@
import expect from 'expect.js'; import expect from 'expect.js';
import sinon from 'sinon'; import sinon from 'sinon';
import moment from 'moment'; import moment from 'moment';
import { noop, random } from 'lodash'; import { noop, random, get, find } from 'lodash';
import Worker from '../../lib/worker'; import Worker from '../../lib/worker';
import elasticsearchMock from '../fixtures/elasticsearch'; import elasticsearchMock from '../fixtures/elasticsearch';
import { JOB_STATUS_PROCESSING, JOB_STATUS_COMPLETED, JOB_STATUS_FAILED } from '../../lib/helpers/constants'; import constants from '../../lib/helpers/constants';
const anchor = '2016-04-02T01:02:03.456'; // saturday const anchor = '2016-04-02T01:02:03.456'; // saturday
const defaults = { const defaults = {
@@ -105,10 +105,13 @@ describe('Worker class', function () {
}); });
}); });
describe('searching for jobs', function () { describe('polling for jobs', function () {
let searchSpy;
beforeEach(() => { beforeEach(() => {
anchorMoment = moment(anchor); anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf()); clock = sinon.useFakeTimers(anchorMoment.valueOf());
searchSpy = sinon.spy(mockQueue.client, 'search');
}); });
afterEach(() => { afterEach(() => {
@@ -116,7 +119,6 @@ describe('Worker class', function () {
}); });
it('should start polling for jobs after interval', function () { it('should start polling for jobs after interval', function () {
const searchSpy = sinon.spy(mockQueue.client, 'search');
new Worker(mockQueue, 'test', noop); new Worker(mockQueue, 'test', noop);
sinon.assert.notCalled(searchSpy); sinon.assert.notCalled(searchSpy);
clock.tick(defaults.interval); clock.tick(defaults.interval);
@@ -125,31 +127,148 @@ describe('Worker class', function () {
it('should use interval option to control polling', function () { it('should use interval option to control polling', function () {
const interval = 567; const interval = 567;
const searchSpy = sinon.spy(mockQueue.client, 'search');
new Worker(mockQueue, 'test', noop, { interval }); new Worker(mockQueue, 'test', noop, { interval });
sinon.assert.notCalled(searchSpy); sinon.assert.notCalled(searchSpy);
clock.tick(interval); clock.tick(interval);
sinon.assert.calledOnce(searchSpy); sinon.assert.calledOnce(searchSpy);
}); });
});
it('should use default size', function () { describe('query for pending jobs', function () {
const searchSpy = sinon.spy(mockQueue.client, 'search'); let worker;
new Worker(mockQueue, 'test', noop); let searchStub;
clock.tick(defaults.interval);
const body = searchSpy.firstCall.args[0].body; function getSearchParams(jobtype = 'test', params = {}) {
expect(body).to.have.property('size', defaults.size); worker = new Worker(mockQueue, jobtype, noop, params);
worker._getPendingJobs();
return searchStub.firstCall.args[0];
}
describe('error handling', function () {
beforeEach(() => {
});
it('should pass search errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => done(new Error('should not resolve')))
.catch(() => { done(); });
});
it('should swollow index missing errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => { done(); })
.catch(() => done(new Error('should not reject')));
});
it('should return an empty array on missing index', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then((res) => {
try {
expect(res).to.be.an(Array);
expect(res).to.have.length(0);
done();
} catch (e) {
done(e);
}
})
.catch(() => done(new Error('should not reject')));
});
}); });
it('should observe the size option', function () { describe('query parameters', function () {
const size = 25; beforeEach(() => {
const searchSpy = sinon.spy(mockQueue.client, 'search'); searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
new Worker(mockQueue, 'test', noop, { size }); });
clock.tick(defaults.interval);
const body = searchSpy.firstCall.args[0].body; it('should query with version', function () {
expect(body).to.have.property('size', size); const params = getSearchParams();
expect(params).to.have.property('version', true);
});
it('should query by default doctype', function () {
const params = getSearchParams();
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should query by custom doctype', function () {
const doctype = 'custom_test';
const params = getSearchParams('type', { doctype });
expect(params).to.have.property('type', doctype);
});
});
describe('query body', function () {
const conditionPath = 'query.constant_score.filter.bool';
const jobtype = 'test_jobtype';
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
});
afterEach(() => {
clock.restore();
});
it('should filter unwanted source data', function () {
const excludedFields = [ 'output.content' ];
const { body } = getSearchParams(jobtype);
expect(body).to.have.property('_source');
expect(body._source).to.eql({ exclude: excludedFields });
});
it('should search by job type', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions).to.have.property('must');
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
});
it('should search for pending or expired jobs', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions).to.have.property('should');
// this works because we are stopping the clock, so all times match
const nowTime = moment().toISOString();
const pending = { term: { status: 'pending'} };
const expired = { bool: { must: [
{ term: { status: 'processing' } },
{ range: { process_expiration: { lte: nowTime } } }
] } };
const pendingMatch = find(conditions.should, pending);
expect(pendingMatch).to.not.be(undefined);
const expiredMatch = find(conditions.should, expired);
expect(expiredMatch).to.not.be(undefined);
});
it('should use default size', function () {
const { body } = getSearchParams(jobtype);
expect(body).to.have.property('size', defaults.size);
});
it('should observe the size option', function () {
const size = 25;
const { body } = getSearchParams(jobtype, { size });
expect(body).to.have.property('size', size);
});
}); });
}); });
describe('claiming a job', function () { describe('claiming a job', function () {
let params; let params;
let job; let job;
@@ -166,9 +285,12 @@ describe('Worker class', function () {
id: 12345, id: 12345,
version: 3 version: 3
}; };
job = mockQueue.client.get(params); return mockQueue.client.get(params)
worker = new Worker(mockQueue, 'test', noop); .then((jobDoc) => {
updateSpy = sinon.spy(mockQueue.client, 'update'); job = jobDoc;
worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update');
});
}); });
afterEach(() => { afterEach(() => {
@@ -193,7 +315,7 @@ describe('Worker class', function () {
it('should update the job status', function () { it('should update the job status', function () {
worker._claimJob(job); worker._claimJob(job);
const doc = updateSpy.firstCall.args[0].body.doc; const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('status', JOB_STATUS_PROCESSING); expect(doc).to.have.property('status', constants.JOB_STATUS_PROCESSING);
}); });
it('should set job expiration time', function () { it('should set job expiration time', function () {
@@ -245,16 +367,18 @@ describe('Worker class', function () {
anchorMoment = moment(anchor); anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf()); clock = sinon.useFakeTimers(anchorMoment.valueOf());
job = mockQueue.client.get(); return mockQueue.client.get()
worker = new Worker(mockQueue, 'test', noop); .then((jobDoc) => {
updateSpy = sinon.spy(mockQueue.client, 'update'); job = jobDoc;
worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update');
});
}); });
afterEach(() => { afterEach(() => {
clock.restore(); clock.restore();
}); });
it('should use version on update', function () { it('should use version on update', function () {
worker._failJob(job); worker._failJob(job);
const query = updateSpy.firstCall.args[0]; const query = updateSpy.firstCall.args[0];
@@ -267,7 +391,7 @@ describe('Worker class', function () {
it('should set status to failed', function () { it('should set status to failed', function () {
worker._failJob(job); worker._failJob(job);
const doc = updateSpy.firstCall.args[0].body.doc; const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('status', JOB_STATUS_FAILED); expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
}); });
it('should append error message if supplied', function () { it('should append error message if supplied', function () {
@@ -292,7 +416,7 @@ describe('Worker class', function () {
worker._failJob(job, msg); worker._failJob(job, msg);
const doc = updateSpy.firstCall.args[0].body.doc; const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('output'); expect(doc).to.have.property('output');
expect(doc).to.have.property('status', JOB_STATUS_FAILED); expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
expect(doc).to.have.property('completed_at'); expect(doc).to.have.property('completed_at');
const completedTimestamp = moment(doc.completed_at).valueOf(); const completedTimestamp = moment(doc.completed_at).valueOf();
expect(completedTimestamp).to.be.greaterThan(startTime); expect(completedTimestamp).to.be.greaterThan(startTime);
@@ -308,8 +432,12 @@ describe('Worker class', function () {
payload = { payload = {
value: random(0, 100, true) value: random(0, 100, true)
}; };
job = mockQueue.client.get({}, { payload });
updateSpy = sinon.spy(mockQueue.client, 'update'); return mockQueue.client.get({}, { payload })
.then((jobDoc) => {
job = jobDoc;
updateSpy = sinon.spy(mockQueue.client, 'update');
});
}); });
it('should call the workerFn with the payload', function (done) { it('should call the workerFn with the payload', function (done) {
@@ -357,7 +485,7 @@ describe('Worker class', function () {
.then(() => { .then(() => {
sinon.assert.calledOnce(updateSpy); sinon.assert.calledOnce(updateSpy);
const doc = updateSpy.firstCall.args[0].body.doc; const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('status', JOB_STATUS_COMPLETED); expect(doc).to.have.property('status', constants.JOB_STATUS_COMPLETED);
expect(doc).to.have.property('completed_at'); expect(doc).to.have.property('completed_at');
const completedTimestamp = moment(doc.completed_at).valueOf(); const completedTimestamp = moment(doc.completed_at).valueOf();
expect(completedTimestamp).to.be.greaterThan(startTime); expect(completedTimestamp).to.be.greaterThan(startTime);