Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0bf6fb0023 | |||
| 300449bfb0 | |||
| 868c808db7 | |||
| ef61a33a38 | |||
| dae14e0edc | |||
| c51ea64bdd | |||
| 5d37399fbf | |||
| e4e8e9222c | |||
| 02a530c4c7 | |||
| 959f58ad8b | |||
| e9c3f5553d | |||
| 234b829adf | |||
| df9508808b | |||
| 609e81fdef | |||
| 3375335d24 | |||
| 0020050f3f | |||
| fa784393e5 | |||
| a4323433f2 | |||
| 7d08b98b15 | |||
| 4793027ff3 | |||
| 38532a6296 | |||
| aa5ea72e3b | |||
| d1e5d68f74 | |||
| e077442340 | |||
| 82506a74e8 | |||
| cae02cb0f8 | |||
| 11de18f4da | |||
| f5bf40cf71 | |||
| 1f053cbb6b |
14
.travis.yml
14
.travis.yml
@@ -1,5 +1,19 @@
|
||||
language: node_js
|
||||
|
||||
node_js:
|
||||
- "stable"
|
||||
- "5"
|
||||
- "4"
|
||||
- "4.3"
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
|
||||
before_script: npm install
|
||||
|
||||
after_success: npm run coverage
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
- /^v[0-9].*$/
|
||||
|
||||
10
package.json
10
package.json
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "esqueue",
|
||||
"version": "0.2.0",
|
||||
"description": "",
|
||||
"version": "0.3.2",
|
||||
"description": "Job queue, powered by Elasticsearch",
|
||||
"main": "lib/index.js",
|
||||
"scripts": {
|
||||
"build": "rm -rf lib && babel src --out-dir lib",
|
||||
@@ -11,6 +11,12 @@
|
||||
"unit": "nyc --require babel-core/register mocha test/src/**"
|
||||
},
|
||||
"author": "Joe Fleming (https://github.com/w33ble)",
|
||||
"keywords": [
|
||||
"job",
|
||||
"queue",
|
||||
"worker",
|
||||
"elasticsearch"
|
||||
],
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/w33ble/esqueue.git"
|
||||
|
||||
@@ -33,6 +33,7 @@ Option | Default | Description
|
||||
------ | ----------- | -------
|
||||
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
|
||||
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
|
||||
doctype | `esqueue` | The doctype to use in Elasticsearch
|
||||
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
|
||||
|
||||
|
||||
|
||||
@@ -6,4 +6,10 @@ export const jobStatuses = {
|
||||
JOB_STATUS_CANCELLED: 'cancelled',
|
||||
};
|
||||
|
||||
export default Object.assign({}, jobStatuses);
|
||||
export const defaultSettings = {
|
||||
DEFAULT_SETTING_TIMEOUT: 10000,
|
||||
DEFAULT_SETTING_INTERVAL: 'week',
|
||||
DEFAULT_SETTING_DOCTYPE: 'esqueue',
|
||||
};
|
||||
|
||||
export default Object.assign({}, jobStatuses, defaultSettings);
|
||||
@@ -1,4 +1,7 @@
|
||||
var schema = {
|
||||
import { defaultSettings } from './constants';
|
||||
|
||||
const schema = {
|
||||
jobtype: { type: 'string', index: 'not_analyzed' },
|
||||
payload: { type: 'object', enabled: false },
|
||||
priority: { type: 'short' },
|
||||
timeout: { type: 'long' },
|
||||
@@ -9,7 +12,7 @@ var schema = {
|
||||
completed_at: { type: 'date' },
|
||||
attempts: { type: 'short' },
|
||||
max_attempts: { type: 'short' },
|
||||
status: { type: 'keyword' },
|
||||
status: { type: 'string', index: 'not_analyzed' },
|
||||
output: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
@@ -19,14 +22,9 @@ var schema = {
|
||||
}
|
||||
};
|
||||
|
||||
export default function createIndex(client, indexName) {
|
||||
const indexBody = {
|
||||
mappings: {
|
||||
_default_: {
|
||||
properties: schema
|
||||
}
|
||||
}
|
||||
};
|
||||
export default function createIndex(client, indexName, doctype = defaultSettings.DEFAULT_SETTING_DOCTYPE) {
|
||||
const indexBody = { mappings : {} };
|
||||
indexBody.mappings[doctype] = { properties: schema };
|
||||
|
||||
return client.indices.exists({
|
||||
index: indexName,
|
||||
|
||||
19
src/index.js
19
src/index.js
@@ -2,11 +2,12 @@ import events from 'events';
|
||||
import createClient from './helpers/es_client';
|
||||
import indexTimestamp from './helpers/index_timestamp';
|
||||
import logger from './helpers/logger';
|
||||
import { defaultSettings } from './helpers/constants';
|
||||
import Job from './job.js';
|
||||
import Worker from './worker.js';
|
||||
import omit from 'lodash.omit';
|
||||
|
||||
const debug = logger('queue');
|
||||
const debug = logger('esqueue:queue');
|
||||
|
||||
export default class Esqueue extends events.EventEmitter {
|
||||
constructor(index, options = {}) {
|
||||
@@ -15,8 +16,9 @@ export default class Esqueue extends events.EventEmitter {
|
||||
super();
|
||||
this.index = index;
|
||||
this.settings = Object.assign({
|
||||
interval: 'week',
|
||||
timeout: 10000,
|
||||
interval: defaultSettings.DEFAULT_SETTING_INTERVAL,
|
||||
timeout: defaultSettings.DEFAULT_SETTING_TIMEOUT,
|
||||
doctype: defaultSettings.DEFAULT_SETTING_DOCTYPE,
|
||||
}, omit(options, [ 'client' ]));
|
||||
this.client = createClient(options.client || {});
|
||||
|
||||
@@ -25,7 +27,7 @@ export default class Esqueue extends events.EventEmitter {
|
||||
}
|
||||
|
||||
_initTasks() {
|
||||
var initTasks = [
|
||||
const initTasks = [
|
||||
this.client.ping({ timeout: 3000 }),
|
||||
];
|
||||
|
||||
@@ -38,10 +40,13 @@ export default class Esqueue extends events.EventEmitter {
|
||||
addJob(type, payload, opts = {}) {
|
||||
const timestamp = indexTimestamp(this.settings.interval);
|
||||
const index = `${this.index}-${timestamp}`;
|
||||
const defaults = {
|
||||
timeout: this.settings.timeout,
|
||||
};
|
||||
|
||||
const options = Object.assign({
|
||||
timeout: this.settings.timeout
|
||||
}, opts);
|
||||
const options = Object.assign(defaults, opts, {
|
||||
doctype: this.settings.doctype
|
||||
});
|
||||
|
||||
return new Job(this.client, index, type, payload, options);
|
||||
}
|
||||
|
||||
20
src/job.js
20
src/job.js
@@ -2,10 +2,10 @@ import events from 'events';
|
||||
import isPlainObject from 'lodash.isplainobject';
|
||||
import Puid from 'puid';
|
||||
import logger from './helpers/logger';
|
||||
import { jobStatuses } from './helpers/constants';
|
||||
import contstants from './helpers/constants';
|
||||
import createIndex from './helpers/create_index';
|
||||
|
||||
const debug = logger('job');
|
||||
const debug = logger('esqueue:job');
|
||||
const puid = new Puid();
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
@@ -18,28 +18,31 @@ export default class Job extends events.EventEmitter {
|
||||
this.client = client;
|
||||
this.id = puid.generate();
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.jobtype = type;
|
||||
this.payload = payload;
|
||||
this.timeout = options.timeout || 10000;
|
||||
this.maxAttempts = options.max_attempts || 3;
|
||||
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
||||
this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE;
|
||||
|
||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||
|
||||
this.ready = createIndex(client, index)
|
||||
this.ready = createIndex(client, index, this.doctype)
|
||||
.then(() => {
|
||||
return this.client.index({
|
||||
index: this.index,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
id: this.id,
|
||||
body: {
|
||||
jobtype: this.jobtype,
|
||||
payload: this.payload,
|
||||
priority: this.priority,
|
||||
timeout: this.timeout,
|
||||
process_expiration: new Date(0), // use epoch so the job query works
|
||||
created_at: new Date(),
|
||||
attempts: 0,
|
||||
max_attempts: this.maxAttempts,
|
||||
status: jobStatuses.JOB_STATUS_PENDING,
|
||||
status: contstants.JOB_STATUS_PENDING,
|
||||
}
|
||||
})
|
||||
.then((doc) => {
|
||||
@@ -64,7 +67,7 @@ export default class Job extends events.EventEmitter {
|
||||
.then(() => {
|
||||
return this.client.get({
|
||||
index: this.index,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
id: this.id
|
||||
});
|
||||
})
|
||||
@@ -82,7 +85,8 @@ export default class Job extends events.EventEmitter {
|
||||
return Object.assign({
|
||||
id: this.id,
|
||||
index: this.index,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
jobtype: this.jobtype,
|
||||
payload: this.payload,
|
||||
timeout: this.timeout,
|
||||
max_attempts: this.maxAttempts,
|
||||
|
||||
@@ -2,11 +2,11 @@ import events from 'events';
|
||||
import Puid from 'puid';
|
||||
import moment from 'moment';
|
||||
import logger from './helpers/logger';
|
||||
import { jobStatuses } from './helpers/constants';
|
||||
import constants from './helpers/constants';
|
||||
import { WorkerTimeoutError } from './helpers/errors';
|
||||
|
||||
const puid = new Puid();
|
||||
const debug = logger('worker');
|
||||
const debug = logger('esqueue:worker');
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
constructor(queue, type, workerFn, opts = {}) {
|
||||
@@ -18,14 +18,16 @@ export default class Job extends events.EventEmitter {
|
||||
this.id = puid.generate();
|
||||
this.queue = queue;
|
||||
this.client = this.queue.client;
|
||||
this.type = type;
|
||||
this.jobtype = type;
|
||||
this.workerFn = workerFn;
|
||||
this.checkInterval = opts.interval || 1500;
|
||||
this.checkSize = opts.size || 10;
|
||||
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
|
||||
|
||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||
|
||||
this._checker = false;
|
||||
this.debug(`Created worker for type ${this.type}`);
|
||||
this._startJobPolling();
|
||||
}
|
||||
|
||||
@@ -49,7 +51,7 @@ export default class Job extends events.EventEmitter {
|
||||
attempts: attempts,
|
||||
started_at: startTime,
|
||||
process_expiration: expirationTime,
|
||||
status: jobStatuses.JOB_STATUS_PROCESSING,
|
||||
status: constants.JOB_STATUS_PROCESSING,
|
||||
};
|
||||
|
||||
return this.client.update({
|
||||
@@ -75,7 +77,7 @@ export default class Job extends events.EventEmitter {
|
||||
|
||||
const completedTime = moment().toISOString();
|
||||
const doc = {
|
||||
status: jobStatuses.JOB_STATUS_FAILED,
|
||||
status: constants.JOB_STATUS_FAILED,
|
||||
completed_at: completedTime,
|
||||
};
|
||||
|
||||
@@ -118,7 +120,8 @@ export default class Job extends events.EventEmitter {
|
||||
const workerOutput = new Promise((resolve, reject) => {
|
||||
resolve(this.workerFn.call(null, job._source.payload));
|
||||
|
||||
setTimeout(function () {
|
||||
setTimeout(() => {
|
||||
this.debug(`Timeout processing job ${job._id}`);
|
||||
reject(new WorkerTimeoutError({
|
||||
timeout: job._source.timeout,
|
||||
jobId: job._id,
|
||||
@@ -134,7 +137,7 @@ export default class Job extends events.EventEmitter {
|
||||
const docOutput = this._formatOutput(output);
|
||||
|
||||
const doc = {
|
||||
status: jobStatuses.JOB_STATUS_COMPLETED,
|
||||
status: constants.JOB_STATUS_COMPLETED,
|
||||
completed_at: completedTime,
|
||||
output: docOutput
|
||||
};
|
||||
@@ -148,7 +151,8 @@ export default class Job extends events.EventEmitter {
|
||||
})
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return false;
|
||||
throw err;
|
||||
this.debug(`Failure saving job output ${job._id}`, err);
|
||||
this.emit('job_error', err);
|
||||
});
|
||||
}, (jobErr) => {
|
||||
// job execution failed
|
||||
@@ -158,7 +162,7 @@ export default class Job extends events.EventEmitter {
|
||||
return;
|
||||
}
|
||||
|
||||
this.debug(`Failure occurred on job ${job._id}`);
|
||||
this.debug(`Failure occurred on job ${job._id}`, jobErr);
|
||||
this.emit('job_error', jobErr);
|
||||
return this._failJob(job, jobErr.toString());
|
||||
});
|
||||
@@ -176,7 +180,7 @@ export default class Job extends events.EventEmitter {
|
||||
}
|
||||
|
||||
_claimPendingJobs(jobs) {
|
||||
if (jobs.length === 0) return;
|
||||
if (!jobs || jobs.length === 0) return;
|
||||
|
||||
this._stopJobPolling();
|
||||
let claimed = false;
|
||||
@@ -223,21 +227,24 @@ export default class Job extends events.EventEmitter {
|
||||
|
||||
_getPendingJobs() {
|
||||
const nowTime = moment().toISOString();
|
||||
const dateFilter = {
|
||||
range: {
|
||||
process_expiration: {
|
||||
lte: nowTime
|
||||
}
|
||||
}
|
||||
};
|
||||
const query = {
|
||||
query: {
|
||||
constant_score: {
|
||||
filter: {
|
||||
bool: {
|
||||
must: { term: { jobtype: this.jobtype } },
|
||||
should: [
|
||||
{ bool: { must: [{ term: { status: 'pending'} }] }},
|
||||
{ bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } }
|
||||
{ term: { status: 'pending'} },
|
||||
{ bool:
|
||||
{ must: [
|
||||
{ term: { status: 'processing' } },
|
||||
{ range: { process_expiration: { lte: nowTime } } }
|
||||
] }
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
sort: [
|
||||
{ priority: { order: 'asc' }},
|
||||
@@ -250,7 +257,7 @@ export default class Job extends events.EventEmitter {
|
||||
|
||||
return this.client.search({
|
||||
index: `${this.queue.index}-*`,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
version: true,
|
||||
body: query
|
||||
})
|
||||
@@ -260,6 +267,9 @@ export default class Job extends events.EventEmitter {
|
||||
return jobs;
|
||||
})
|
||||
.catch((err) => {
|
||||
// ignore missing indices errors
|
||||
if (err.status === 404) return [];
|
||||
|
||||
this.debug('job querying failed', err);
|
||||
this.emit('error', err);
|
||||
this.queue.emit('worker_error', {
|
||||
|
||||
14
test/fixtures/elasticsearch.js
vendored
14
test/fixtures/elasticsearch.js
vendored
@@ -1,5 +1,6 @@
|
||||
import { uniqueId, times, random } from 'lodash';
|
||||
import elasticsearch from 'elasticsearch';
|
||||
import { DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
|
||||
|
||||
function Client() {
|
||||
this.indices = {
|
||||
@@ -11,10 +12,10 @@ function Client() {
|
||||
}
|
||||
|
||||
Client.prototype.index = function (params = {}) {
|
||||
var shardCount = 2;
|
||||
const shardCount = 2;
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || uniqueId('testDoc'),
|
||||
_version: 1,
|
||||
_shards: { total: shardCount, successful: shardCount, failed: 0 },
|
||||
@@ -30,6 +31,7 @@ Client.prototype.get = function (params = {}, source = {}) {
|
||||
if (params === elasticsearch.errors.NotFound) return elasticsearch.errors.NotFound;
|
||||
|
||||
const _source = Object.assign({
|
||||
jobtype: 'jobtype',
|
||||
payload: {
|
||||
id: 'sample-job-1',
|
||||
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
||||
@@ -44,7 +46,7 @@ Client.prototype.get = function (params = {}, source = {}) {
|
||||
|
||||
return {
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
||||
_version: params.version || 1,
|
||||
found: true,
|
||||
@@ -56,7 +58,7 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||
const hits = times(count, () => {
|
||||
return {
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_id: uniqueId('documentId'),
|
||||
_version: random(1, 5),
|
||||
_score: null,
|
||||
@@ -83,10 +85,10 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||
};
|
||||
|
||||
Client.prototype.update = function (params = {}) {
|
||||
var shardCount = 2;
|
||||
const shardCount = 2;
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || uniqueId('testDoc'),
|
||||
_version: params.version + 1 || 2,
|
||||
_shards: { total: shardCount, successful: shardCount, failed: 0 },
|
||||
|
||||
@@ -2,6 +2,7 @@ import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import createIndex from '../../../lib/helpers/create_index';
|
||||
import elasticsearchMock from '../../fixtures/elasticsearch';
|
||||
import { defaultSettings } from '../../../lib/helpers/constants';
|
||||
|
||||
describe('Create Index', function () {
|
||||
let client;
|
||||
@@ -23,8 +24,9 @@ describe('Create Index', function () {
|
||||
});
|
||||
});
|
||||
|
||||
it('should create the default mappings', function () {
|
||||
it('should create the type mappings', function () {
|
||||
const indexName = 'test-index';
|
||||
const docType = defaultSettings.DEFAULT_SETTING_DOCTYPE;
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
@@ -33,8 +35,24 @@ describe('Create Index', function () {
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property('_default_');
|
||||
expect(payload.body.mappings._default_).to.have.property('properties');
|
||||
expect(payload.body.mappings).to.have.property(docType);
|
||||
expect(payload.body.mappings[docType]).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
|
||||
it('should accept a custom doctype', function () {
|
||||
const indexName = 'test-index';
|
||||
const docType = 'my_type';
|
||||
const result = createIndex(client, indexName, docType);
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
const payload = createSpy.getCall(0).args[0];
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property(docType);
|
||||
expect(payload.body.mappings[docType]).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -25,32 +25,32 @@ describe('Index interval', function () {
|
||||
});
|
||||
|
||||
it('should return the year', function () {
|
||||
var timestamp = indexTimestamp('year');
|
||||
const timestamp = indexTimestamp('year');
|
||||
expect(timestamp).to.equal('2016');
|
||||
});
|
||||
|
||||
it('should return the year and month', function () {
|
||||
var timestamp = indexTimestamp('month');
|
||||
const timestamp = indexTimestamp('month');
|
||||
expect(timestamp).to.equal('2016-04');
|
||||
});
|
||||
|
||||
it('should return the year, month, and first day of the week', function () {
|
||||
var timestamp = indexTimestamp('week');
|
||||
const timestamp = indexTimestamp('week');
|
||||
expect(timestamp).to.equal('2016-03-27');
|
||||
});
|
||||
|
||||
it('should return the year, month, and day of the week', function () {
|
||||
var timestamp = indexTimestamp('day');
|
||||
const timestamp = indexTimestamp('day');
|
||||
expect(timestamp).to.equal('2016-04-02');
|
||||
});
|
||||
|
||||
it('should return the year, month, day and hour', function () {
|
||||
var timestamp = indexTimestamp('hour');
|
||||
const timestamp = indexTimestamp('hour');
|
||||
expect(timestamp).to.equal('2016-04-02-01');
|
||||
});
|
||||
|
||||
it('should return the year, month, day, hour and minute', function () {
|
||||
var timestamp = indexTimestamp('minute');
|
||||
const timestamp = indexTimestamp('minute');
|
||||
expect(timestamp).to.equal('2016-04-02-01-02');
|
||||
});
|
||||
});
|
||||
|
||||
@@ -3,7 +3,7 @@ import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import proxyquire from 'proxyquire';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import { JOB_STATUS_PENDING } from '../../lib/helpers/constants';
|
||||
import { JOB_STATUS_PENDING, DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
|
||||
|
||||
const createIndexMock = sinon.stub().returns(Promise.resolve('mock'));
|
||||
const module = proxyquire.noPreserveCache()('../../lib/job', {
|
||||
@@ -73,12 +73,23 @@ describe('Job Class', function () {
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', type);
|
||||
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the job type', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('jobtype', type);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index timeout value from options', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
@@ -95,6 +106,16 @@ describe('Job Class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
it('should set an expired process_expiration time', function () {
|
||||
const now = new Date().getTime();
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('process_expiration');
|
||||
expect(newDoc.body.process_expiration.getTime()).to.be.lessThan(now);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set attempt count', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
@@ -152,14 +173,17 @@ describe('Job Class', function () {
|
||||
|
||||
it('should return the job document', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
|
||||
return job.get()
|
||||
.then((doc) => {
|
||||
const jobDoc = job.document; // document should be resolved
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('type', type);
|
||||
expect(doc).to.have.property('type', jobDoc.type);
|
||||
expect(doc).to.have.property('id', jobDoc.id);
|
||||
expect(doc).to.have.property('version', jobDoc.version);
|
||||
|
||||
expect(doc).to.have.property('payload');
|
||||
expect(doc).to.have.property('jobtype');
|
||||
expect(doc).to.have.property('priority');
|
||||
expect(doc).to.have.property('timeout');
|
||||
});
|
||||
@@ -185,7 +209,8 @@ describe('Job Class', function () {
|
||||
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('type', type);
|
||||
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||
expect(doc).to.have.property('jobtype', type);
|
||||
expect(doc).to.have.property('timeout', options.timeout);
|
||||
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
||||
expect(doc).to.have.property('priority', options.priority);
|
||||
|
||||
@@ -1,10 +1,10 @@
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import moment from 'moment';
|
||||
import { noop, random } from 'lodash';
|
||||
import { noop, random, get, find } from 'lodash';
|
||||
import Worker from '../../lib/worker';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import { JOB_STATUS_PROCESSING, JOB_STATUS_COMPLETED, JOB_STATUS_FAILED } from '../../lib/helpers/constants';
|
||||
import constants from '../../lib/helpers/constants';
|
||||
|
||||
const anchor = '2016-04-02T01:02:03.456'; // saturday
|
||||
const defaults = {
|
||||
@@ -105,10 +105,13 @@ describe('Worker class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
describe('searching for jobs', function () {
|
||||
describe('polling for jobs', function () {
|
||||
let searchSpy;
|
||||
|
||||
beforeEach(() => {
|
||||
anchorMoment = moment(anchor);
|
||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||
searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -116,7 +119,6 @@ describe('Worker class', function () {
|
||||
});
|
||||
|
||||
it('should start polling for jobs after interval', function () {
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop);
|
||||
sinon.assert.notCalled(searchSpy);
|
||||
clock.tick(defaults.interval);
|
||||
@@ -125,30 +127,140 @@ describe('Worker class', function () {
|
||||
|
||||
it('should use interval option to control polling', function () {
|
||||
const interval = 567;
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop, { interval });
|
||||
sinon.assert.notCalled(searchSpy);
|
||||
clock.tick(interval);
|
||||
sinon.assert.calledOnce(searchSpy);
|
||||
});
|
||||
});
|
||||
|
||||
describe('query for pending jobs', function () {
|
||||
let worker;
|
||||
let searchStub;
|
||||
|
||||
function getSearchParams(jobtype = 'test', params = {}) {
|
||||
worker = new Worker(mockQueue, jobtype, noop, params);
|
||||
worker._getPendingJobs();
|
||||
return searchStub.firstCall.args[0];
|
||||
}
|
||||
|
||||
describe('error handling', function () {
|
||||
beforeEach(() => {
|
||||
});
|
||||
|
||||
it('should pass search errors', function (done) {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
worker._getPendingJobs()
|
||||
.then(() => done(new Error('should not resolve')))
|
||||
.catch(() => { done(); });
|
||||
});
|
||||
|
||||
it('should swollow index missing errors', function (done) {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||
status: 404
|
||||
}));
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
worker._getPendingJobs()
|
||||
.then(() => { done(); })
|
||||
.catch(() => done(new Error('should not reject')));
|
||||
});
|
||||
|
||||
it('should return an empty array on missing index', function (done) {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||
status: 404
|
||||
}));
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
worker._getPendingJobs()
|
||||
.then((res) => {
|
||||
try {
|
||||
expect(res).to.be.an(Array);
|
||||
expect(res).to.have.length(0);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
})
|
||||
.catch(() => done(new Error('should not reject')));
|
||||
});
|
||||
});
|
||||
|
||||
describe('query parameters', function () {
|
||||
beforeEach(() => {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||
});
|
||||
|
||||
it('should query with version', function () {
|
||||
const params = getSearchParams();
|
||||
expect(params).to.have.property('version', true);
|
||||
});
|
||||
|
||||
it('should query by default doctype', function () {
|
||||
const params = getSearchParams();
|
||||
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
|
||||
it('should query by custom doctype', function () {
|
||||
const doctype = 'custom_test';
|
||||
const params = getSearchParams('type', { doctype });
|
||||
expect(params).to.have.property('type', doctype);
|
||||
});
|
||||
});
|
||||
|
||||
describe('query body', function () {
|
||||
const conditionPath = 'query.constant_score.filter.bool';
|
||||
const jobtype = 'test_jobtype';
|
||||
|
||||
beforeEach(() => {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||
anchorMoment = moment(anchor);
|
||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
it('should search by job type', function () {
|
||||
const { body } = getSearchParams(jobtype);
|
||||
const conditions = get(body, conditionPath);
|
||||
expect(conditions).to.have.property('must');
|
||||
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
|
||||
});
|
||||
|
||||
it('should search for pending or expired jobs', function () {
|
||||
const { body } = getSearchParams(jobtype);
|
||||
const conditions = get(body, conditionPath);
|
||||
expect(conditions).to.have.property('should');
|
||||
|
||||
// this works because we are stopping the clock, so all times match
|
||||
const nowTime = moment().toISOString();
|
||||
const pending = { term: { status: 'pending'} };
|
||||
const expired = { bool: { must: [
|
||||
{ term: { status: 'processing' } },
|
||||
{ range: { process_expiration: { lte: nowTime } } }
|
||||
] } };
|
||||
|
||||
const pendingMatch = find(conditions.should, pending);
|
||||
expect(pendingMatch).to.not.be(undefined);
|
||||
|
||||
const expiredMatch = find(conditions.should, expired);
|
||||
expect(expiredMatch).to.not.be(undefined);
|
||||
});
|
||||
|
||||
it('should use default size', function () {
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop);
|
||||
clock.tick(defaults.interval);
|
||||
const body = searchSpy.firstCall.args[0].body;
|
||||
const { body } = getSearchParams(jobtype);
|
||||
expect(body).to.have.property('size', defaults.size);
|
||||
});
|
||||
|
||||
it('should observe the size option', function () {
|
||||
const size = 25;
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop, { size });
|
||||
clock.tick(defaults.interval);
|
||||
const body = searchSpy.firstCall.args[0].body;
|
||||
const { body } = getSearchParams(jobtype, { size });
|
||||
expect(body).to.have.property('size', size);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
describe('claiming a job', function () {
|
||||
let params;
|
||||
@@ -193,7 +305,7 @@ describe('Worker class', function () {
|
||||
it('should update the job status', function () {
|
||||
worker._claimJob(job);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', JOB_STATUS_PROCESSING);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_PROCESSING);
|
||||
});
|
||||
|
||||
it('should set job expiration time', function () {
|
||||
@@ -267,7 +379,7 @@ describe('Worker class', function () {
|
||||
it('should set status to failed', function () {
|
||||
worker._failJob(job);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', JOB_STATUS_FAILED);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
|
||||
});
|
||||
|
||||
it('should append error message if supplied', function () {
|
||||
@@ -292,7 +404,7 @@ describe('Worker class', function () {
|
||||
worker._failJob(job, msg);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('output');
|
||||
expect(doc).to.have.property('status', JOB_STATUS_FAILED);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
|
||||
expect(doc).to.have.property('completed_at');
|
||||
const completedTimestamp = moment(doc.completed_at).valueOf();
|
||||
expect(completedTimestamp).to.be.greaterThan(startTime);
|
||||
@@ -357,7 +469,7 @@ describe('Worker class', function () {
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(updateSpy);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', JOB_STATUS_COMPLETED);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_COMPLETED);
|
||||
expect(doc).to.have.property('completed_at');
|
||||
const completedTimestamp = moment(doc.completed_at).valueOf();
|
||||
expect(completedTimestamp).to.be.greaterThan(startTime);
|
||||
|
||||
Reference in New Issue
Block a user