19 Commits

Author SHA1 Message Date
e4e8e9222c 0.3.0 2016-05-12 17:02:07 -07:00
02a530c4c7 Merge branch 'develop' 2016-05-12 16:59:48 -07:00
959f58ad8b travis should only build the master branch 2016-05-12 16:59:14 -07:00
e9c3f5553d try using node 5 and 6 again
with an additional npm install call, because npm3
2016-05-12 16:11:10 -07:00
234b829adf add more tests around job query 2016-05-12 15:09:53 -07:00
df9508808b use jobtype in worker, update query
also simplify the query a bit and use constant_score
2016-05-12 14:08:43 -07:00
609e81fdef distiguish the doctype from the jobtype
update tests and mock elasticsearch client
2016-05-12 12:20:49 -07:00
3375335d24 add type field, switch to string types
should allow elasticsearch 2.x use
2016-05-12 11:49:54 -07:00
0020050f3f use contants for defaults, use a common doctype
add tests, update readme
2016-05-12 11:48:54 -07:00
fa784393e5 add test for process_expiration value 2016-05-11 11:04:23 -07:00
a4323433f2 remove use of var 2016-05-11 10:58:21 -07:00
7d08b98b15 Merge branch 'develop' 2016-05-10 17:31:27 -07:00
4793027ff3 0.2.2 2016-05-10 17:31:09 -07:00
38532a6296 fix scoping issue, add debugging on worker register 2016-05-10 17:28:27 -07:00
aa5ea72e3b swollow errors saving job output, include error in debugging output 2016-05-10 17:24:05 -07:00
d1e5d68f74 swollow errors saving job output, include error in debugging output 2016-05-10 17:23:07 -07:00
e077442340 add debugging on job timeout 2016-05-10 16:57:40 -07:00
82506a74e8 set process_expiration by default
without this, the job query fails with field='process_expiration' is unrecognized
2016-05-10 16:01:24 -07:00
cae02cb0f8 add description and keywords 2016-05-10 14:14:19 -07:00
13 changed files with 236 additions and 99 deletions

View File

@@ -1,7 +1,18 @@
language: node_js
node_js:
- "stable"
- "5"
- "4"
- "4.3"
notifications:
email: false
before_script: npm install
after_success: npm run coverage
branches:
only:
- master

View File

@@ -1,7 +1,7 @@
{
"name": "esqueue",
"version": "0.2.1",
"description": "",
"version": "0.3.0",
"description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js",
"scripts": {
"build": "rm -rf lib && babel src --out-dir lib",
@@ -11,6 +11,12 @@
"unit": "nyc --require babel-core/register mocha test/src/**"
},
"author": "Joe Fleming (https://github.com/w33ble)",
"keywords": [
"job",
"queue",
"worker",
"elasticsearch"
],
"repository": {
"type": "git",
"url": "https://github.com/w33ble/esqueue.git"

View File

@@ -33,6 +33,7 @@ Option | Default | Description
------ | ----------- | -------
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
doctype | `esqueue` | The doctype to use in Elasticsearch
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.

View File

@@ -6,4 +6,10 @@ export const jobStatuses = {
JOB_STATUS_CANCELLED: 'cancelled',
};
export default Object.assign({}, jobStatuses);
export const defaultSettings = {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
};
export default Object.assign({}, jobStatuses, defaultSettings);

View File

@@ -1,4 +1,7 @@
var schema = {
import { defaultSettings } from './constants';
const schema = {
jobtype: { type: 'string', index: 'not_analyzed' },
payload: { type: 'object', enabled: false },
priority: { type: 'short' },
timeout: { type: 'long' },
@@ -9,7 +12,7 @@ var schema = {
completed_at: { type: 'date' },
attempts: { type: 'short' },
max_attempts: { type: 'short' },
status: { type: 'keyword' },
status: { type: 'string', index: 'not_analyzed' },
output: {
type: 'object',
properties: {
@@ -19,14 +22,9 @@ var schema = {
}
};
export default function createIndex(client, indexName) {
const indexBody = {
mappings: {
_default_: {
properties: schema
}
}
};
export default function createIndex(client, indexName, doctype = defaultSettings.DEFAULT_SETTING_DOCTYPE) {
const indexBody = { mappings : {} };
indexBody.mappings[doctype] = { properties: schema };
return client.indices.exists({
index: indexName,

View File

@@ -2,6 +2,7 @@ import events from 'events';
import createClient from './helpers/es_client';
import indexTimestamp from './helpers/index_timestamp';
import logger from './helpers/logger';
import { defaultSettings } from './helpers/constants';
import Job from './job.js';
import Worker from './worker.js';
import omit from 'lodash.omit';
@@ -15,8 +16,9 @@ export default class Esqueue extends events.EventEmitter {
super();
this.index = index;
this.settings = Object.assign({
interval: 'week',
timeout: 10000,
interval: defaultSettings.DEFAULT_SETTING_INTERVAL,
timeout: defaultSettings.DEFAULT_SETTING_TIMEOUT,
doctype: defaultSettings.DEFAULT_SETTING_DOCTYPE,
}, omit(options, [ 'client' ]));
this.client = createClient(options.client || {});
@@ -25,7 +27,7 @@ export default class Esqueue extends events.EventEmitter {
}
_initTasks() {
var initTasks = [
const initTasks = [
this.client.ping({ timeout: 3000 }),
];
@@ -38,10 +40,13 @@ export default class Esqueue extends events.EventEmitter {
addJob(type, payload, opts = {}) {
const timestamp = indexTimestamp(this.settings.interval);
const index = `${this.index}-${timestamp}`;
const defaults = {
timeout: this.settings.timeout,
};
const options = Object.assign({
timeout: this.settings.timeout
}, opts);
const options = Object.assign(defaults, opts, {
doctype: this.settings.doctype
});
return new Job(this.client, index, type, payload, options);
}

View File

@@ -2,7 +2,7 @@ import events from 'events';
import isPlainObject from 'lodash.isplainobject';
import Puid from 'puid';
import logger from './helpers/logger';
import { jobStatuses } from './helpers/constants';
import contstants from './helpers/constants';
import createIndex from './helpers/create_index';
const debug = logger('esqueue:job');
@@ -18,28 +18,31 @@ export default class Job extends events.EventEmitter {
this.client = client;
this.id = puid.generate();
this.index = index;
this.type = type;
this.jobtype = type;
this.payload = payload;
this.timeout = options.timeout || 10000;
this.maxAttempts = options.max_attempts || 3;
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE;
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this.ready = createIndex(client, index)
this.ready = createIndex(client, index, this.doctype)
.then(() => {
return this.client.index({
index: this.index,
type: this.type,
type: this.doctype,
id: this.id,
body: {
jobtype: this.jobtype,
payload: this.payload,
priority: this.priority,
timeout: this.timeout,
process_expiration: new Date(0), // use epoch so the job query works
created_at: new Date(),
attempts: 0,
max_attempts: this.maxAttempts,
status: jobStatuses.JOB_STATUS_PENDING,
status: contstants.JOB_STATUS_PENDING,
}
})
.then((doc) => {
@@ -64,7 +67,7 @@ export default class Job extends events.EventEmitter {
.then(() => {
return this.client.get({
index: this.index,
type: this.type,
type: this.doctype,
id: this.id
});
})
@@ -82,7 +85,8 @@ export default class Job extends events.EventEmitter {
return Object.assign({
id: this.id,
index: this.index,
type: this.type,
type: this.doctype,
jobtype: this.jobtype,
payload: this.payload,
timeout: this.timeout,
max_attempts: this.maxAttempts,

View File

@@ -2,7 +2,7 @@ import events from 'events';
import Puid from 'puid';
import moment from 'moment';
import logger from './helpers/logger';
import { jobStatuses } from './helpers/constants';
import constants from './helpers/constants';
import { WorkerTimeoutError } from './helpers/errors';
const puid = new Puid();
@@ -18,14 +18,16 @@ export default class Job extends events.EventEmitter {
this.id = puid.generate();
this.queue = queue;
this.client = this.queue.client;
this.type = type;
this.jobtype = type;
this.workerFn = workerFn;
this.checkInterval = opts.interval || 1500;
this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this._checker = false;
this.debug(`Created worker for type ${this.type}`);
this._startJobPolling();
}
@@ -49,7 +51,7 @@ export default class Job extends events.EventEmitter {
attempts: attempts,
started_at: startTime,
process_expiration: expirationTime,
status: jobStatuses.JOB_STATUS_PROCESSING,
status: constants.JOB_STATUS_PROCESSING,
};
return this.client.update({
@@ -75,7 +77,7 @@ export default class Job extends events.EventEmitter {
const completedTime = moment().toISOString();
const doc = {
status: jobStatuses.JOB_STATUS_FAILED,
status: constants.JOB_STATUS_FAILED,
completed_at: completedTime,
};
@@ -118,7 +120,8 @@ export default class Job extends events.EventEmitter {
const workerOutput = new Promise((resolve, reject) => {
resolve(this.workerFn.call(null, job._source.payload));
setTimeout(function () {
setTimeout(() => {
this.debug(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError({
timeout: job._source.timeout,
jobId: job._id,
@@ -134,7 +137,7 @@ export default class Job extends events.EventEmitter {
const docOutput = this._formatOutput(output);
const doc = {
status: jobStatuses.JOB_STATUS_COMPLETED,
status: constants.JOB_STATUS_COMPLETED,
completed_at: completedTime,
output: docOutput
};
@@ -148,7 +151,8 @@ export default class Job extends events.EventEmitter {
})
.catch((err) => {
if (err.statusCode === 409) return false;
throw err;
this.debug(`Failure saving job output ${job._id}`, err);
this.emit('job_error', err);
});
}, (jobErr) => {
// job execution failed
@@ -158,7 +162,7 @@ export default class Job extends events.EventEmitter {
return;
}
this.debug(`Failure occurred on job ${job._id}`);
this.debug(`Failure occurred on job ${job._id}`, jobErr);
this.emit('job_error', jobErr);
return this._failJob(job, jobErr.toString());
});
@@ -223,20 +227,23 @@ export default class Job extends events.EventEmitter {
_getPendingJobs() {
const nowTime = moment().toISOString();
const dateFilter = {
range: {
process_expiration: {
lte: nowTime
}
}
};
const query = {
query: {
bool: {
should: [
{ bool: { must: [{ term: { status: 'pending'} }] }},
{ bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } }
]
constant_score: {
filter: {
bool: {
must: { term: { jobtype: this.jobtype } },
should: [
{ term: { status: 'pending'} },
{ bool:
{ must: [
{ term: { status: 'processing' } },
{ range: { process_expiration: { lte: nowTime } } }
] }
}
]
}
}
}
},
sort: [
@@ -250,7 +257,7 @@ export default class Job extends events.EventEmitter {
return this.client.search({
index: `${this.queue.index}-*`,
type: this.type,
type: this.doctype,
version: true,
body: query
})

View File

@@ -1,5 +1,6 @@
import { uniqueId, times, random } from 'lodash';
import elasticsearch from 'elasticsearch';
import { DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
function Client() {
this.indices = {
@@ -11,10 +12,10 @@ function Client() {
}
Client.prototype.index = function (params = {}) {
var shardCount = 2;
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || 'type',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_version: 1,
_shards: { total: shardCount, successful: shardCount, failed: 0 },
@@ -30,6 +31,7 @@ Client.prototype.get = function (params = {}, source = {}) {
if (params === elasticsearch.errors.NotFound) return elasticsearch.errors.NotFound;
const _source = Object.assign({
jobtype: 'jobtype',
payload: {
id: 'sample-job-1',
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
@@ -44,7 +46,7 @@ Client.prototype.get = function (params = {}, source = {}) {
return {
_index: params.index || 'index',
_type: params.type || 'type',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_version: params.version || 1,
found: true,
@@ -56,7 +58,7 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
const hits = times(count, () => {
return {
_index: params.index || 'index',
_type: params.type || 'type',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: uniqueId('documentId'),
_version: random(1, 5),
_score: null,
@@ -83,10 +85,10 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
};
Client.prototype.update = function (params = {}) {
var shardCount = 2;
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || 'type',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_version: params.version + 1 || 2,
_shards: { total: shardCount, successful: shardCount, failed: 0 },

View File

@@ -2,6 +2,7 @@ import expect from 'expect.js';
import sinon from 'sinon';
import createIndex from '../../../lib/helpers/create_index';
import elasticsearchMock from '../../fixtures/elasticsearch';
import { defaultSettings } from '../../../lib/helpers/constants';
describe('Create Index', function () {
let client;
@@ -23,8 +24,9 @@ describe('Create Index', function () {
});
});
it('should create the default mappings', function () {
it('should create the type mappings', function () {
const indexName = 'test-index';
const docType = defaultSettings.DEFAULT_SETTING_DOCTYPE;
const result = createIndex(client, indexName);
return result
@@ -33,8 +35,24 @@ describe('Create Index', function () {
sinon.assert.callCount(createSpy, 1);
expect(payload).to.have.property('body');
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property('_default_');
expect(payload.body.mappings._default_).to.have.property('properties');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
});
});
it('should accept a custom doctype', function () {
const indexName = 'test-index';
const docType = 'my_type';
const result = createIndex(client, indexName, docType);
return result
.then(function () {
const payload = createSpy.getCall(0).args[0];
sinon.assert.callCount(createSpy, 1);
expect(payload).to.have.property('body');
expect(payload.body).to.have.property('mappings');
expect(payload.body.mappings).to.have.property(docType);
expect(payload.body.mappings[docType]).to.have.property('properties');
});
});
});

View File

@@ -25,32 +25,32 @@ describe('Index interval', function () {
});
it('should return the year', function () {
var timestamp = indexTimestamp('year');
const timestamp = indexTimestamp('year');
expect(timestamp).to.equal('2016');
});
it('should return the year and month', function () {
var timestamp = indexTimestamp('month');
const timestamp = indexTimestamp('month');
expect(timestamp).to.equal('2016-04');
});
it('should return the year, month, and first day of the week', function () {
var timestamp = indexTimestamp('week');
const timestamp = indexTimestamp('week');
expect(timestamp).to.equal('2016-03-27');
});
it('should return the year, month, and day of the week', function () {
var timestamp = indexTimestamp('day');
const timestamp = indexTimestamp('day');
expect(timestamp).to.equal('2016-04-02');
});
it('should return the year, month, day and hour', function () {
var timestamp = indexTimestamp('hour');
const timestamp = indexTimestamp('hour');
expect(timestamp).to.equal('2016-04-02-01');
});
it('should return the year, month, day, hour and minute', function () {
var timestamp = indexTimestamp('minute');
const timestamp = indexTimestamp('minute');
expect(timestamp).to.equal('2016-04-02-01-02');
});
});

View File

@@ -3,7 +3,7 @@ import expect from 'expect.js';
import sinon from 'sinon';
import proxyquire from 'proxyquire';
import elasticsearchMock from '../fixtures/elasticsearch';
import { JOB_STATUS_PENDING } from '../../lib/helpers/constants';
import { JOB_STATUS_PENDING, DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
const createIndexMock = sinon.stub().returns(Promise.resolve('mock'));
const module = proxyquire.noPreserveCache()('../../lib/job', {
@@ -73,12 +73,23 @@ describe('Job Class', function () {
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', type);
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('payload', payload);
});
});
it('should index the job type', function () {
const job = new Job(client, index, type, payload);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('jobtype', type);
});
});
it('should index timeout value from options', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
@@ -95,6 +106,16 @@ describe('Job Class', function () {
});
});
it('should set an expired process_expiration time', function () {
const now = new Date().getTime();
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('process_expiration');
expect(newDoc.body.process_expiration.getTime()).to.be.lessThan(now);
});
});
it('should set attempt count', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
@@ -152,14 +173,17 @@ describe('Job Class', function () {
it('should return the job document', function () {
const job = new Job(client, index, type, payload);
return job.get()
.then((doc) => {
const jobDoc = job.document; // document should be resolved
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', type);
expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('version', jobDoc.version);
expect(doc).to.have.property('payload');
expect(doc).to.have.property('jobtype');
expect(doc).to.have.property('priority');
expect(doc).to.have.property('timeout');
});
@@ -185,7 +209,8 @@ describe('Job Class', function () {
const doc = job.toJSON();
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', type);
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('timeout', options.timeout);
expect(doc).to.have.property('max_attempts', options.max_attempts);
expect(doc).to.have.property('priority', options.priority);

View File

@@ -1,10 +1,10 @@
import expect from 'expect.js';
import sinon from 'sinon';
import moment from 'moment';
import { noop, random } from 'lodash';
import { noop, random, get, find } from 'lodash';
import Worker from '../../lib/worker';
import elasticsearchMock from '../fixtures/elasticsearch';
import { JOB_STATUS_PROCESSING, JOB_STATUS_COMPLETED, JOB_STATUS_FAILED } from '../../lib/helpers/constants';
import constants from '../../lib/helpers/constants';
const anchor = '2016-04-02T01:02:03.456'; // saturday
const defaults = {
@@ -106,48 +106,102 @@ describe('Worker class', function () {
});
describe('searching for jobs', function () {
let searchSpy;
function getSearchParams(jobtype, params = {}) {
new Worker(mockQueue, jobtype, noop, params);
clock.tick(defaults.interval);
return searchSpy.firstCall.args[0];
}
beforeEach(() => {
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
searchSpy = sinon.spy(mockQueue.client, 'search');
});
afterEach(() => {
clock.restore();
});
it('should start polling for jobs after interval', function () {
const searchSpy = sinon.spy(mockQueue.client, 'search');
new Worker(mockQueue, 'test', noop);
sinon.assert.notCalled(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledOnce(searchSpy);
describe('polling interval', function () {
it('should start polling for jobs after interval', function () {
new Worker(mockQueue, 'test', noop);
sinon.assert.notCalled(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledOnce(searchSpy);
});
it('should use interval option to control polling', function () {
const interval = 567;
new Worker(mockQueue, 'test', noop, { interval });
sinon.assert.notCalled(searchSpy);
clock.tick(interval);
sinon.assert.calledOnce(searchSpy);
});
});
it('should use interval option to control polling', function () {
const interval = 567;
const searchSpy = sinon.spy(mockQueue.client, 'search');
new Worker(mockQueue, 'test', noop, { interval });
sinon.assert.notCalled(searchSpy);
clock.tick(interval);
sinon.assert.calledOnce(searchSpy);
describe('query parameters', function () {
it('should query with version', function () {
const params = getSearchParams('test');
expect(params).to.have.property('version', true);
});
it('should query by default doctype', function () {
const params = getSearchParams('test');
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should query by custom doctype', function () {
const doctype = 'custom_test';
const params = getSearchParams('test', { doctype });
expect(params).to.have.property('type', doctype);
});
});
it('should use default size', function () {
const searchSpy = sinon.spy(mockQueue.client, 'search');
new Worker(mockQueue, 'test', noop);
clock.tick(defaults.interval);
const body = searchSpy.firstCall.args[0].body;
expect(body).to.have.property('size', defaults.size);
describe('query body', function () {
const conditionPath = 'query.constant_score.filter.bool';
const jobtype = 'test_jobtype';
it('should search by job type', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions).to.have.property('must');
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
});
it('should search for pending or expired jobs', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions).to.have.property('should');
// this works because we are stopping the clock, so all times match
const nowTime = moment().toISOString();
const pending = { term: { status: 'pending'} };
const expired = { bool: { must: [
{ term: { status: 'processing' } },
{ range: { process_expiration: { lte: nowTime } } }
] } };
const pendingMatch = find(conditions.should, pending);
expect(pendingMatch).to.not.be(undefined);
const expiredMatch = find(conditions.should, expired);
expect(expiredMatch).to.not.be(undefined);
});
it('should use default size', function () {
const { body } = getSearchParams(jobtype);
expect(body).to.have.property('size', defaults.size);
});
it('should observe the size option', function () {
const size = 25;
const { body } = getSearchParams(jobtype, { size });
expect(body).to.have.property('size', size);
});
});
it('should observe the size option', function () {
const size = 25;
const searchSpy = sinon.spy(mockQueue.client, 'search');
new Worker(mockQueue, 'test', noop, { size });
clock.tick(defaults.interval);
const body = searchSpy.firstCall.args[0].body;
expect(body).to.have.property('size', size);
});
});
describe('claiming a job', function () {
@@ -193,7 +247,7 @@ describe('Worker class', function () {
it('should update the job status', function () {
worker._claimJob(job);
const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('status', JOB_STATUS_PROCESSING);
expect(doc).to.have.property('status', constants.JOB_STATUS_PROCESSING);
});
it('should set job expiration time', function () {
@@ -267,7 +321,7 @@ describe('Worker class', function () {
it('should set status to failed', function () {
worker._failJob(job);
const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('status', JOB_STATUS_FAILED);
expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
});
it('should append error message if supplied', function () {
@@ -292,7 +346,7 @@ describe('Worker class', function () {
worker._failJob(job, msg);
const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('output');
expect(doc).to.have.property('status', JOB_STATUS_FAILED);
expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
expect(doc).to.have.property('completed_at');
const completedTimestamp = moment(doc.completed_at).valueOf();
expect(completedTimestamp).to.be.greaterThan(startTime);
@@ -357,7 +411,7 @@ describe('Worker class', function () {
.then(() => {
sinon.assert.calledOnce(updateSpy);
const doc = updateSpy.firstCall.args[0].body.doc;
expect(doc).to.have.property('status', JOB_STATUS_COMPLETED);
expect(doc).to.have.property('status', constants.JOB_STATUS_COMPLETED);
expect(doc).to.have.property('completed_at');
const completedTimestamp = moment(doc.completed_at).valueOf();
expect(completedTimestamp).to.be.greaterThan(startTime);