Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1fe139e1b0 | |||
| 59d30bbb0f | |||
| 6ce18a5477 | |||
| 89c08068dd | |||
| 5f2e2b09cf | |||
| fb2f51b11d | |||
| 8ac9a81fdb | |||
| 4472e725fa | |||
| be1eb81059 | |||
| 8d21dc6967 | |||
| 31159baae9 | |||
| 49965bbaf1 | |||
| 49b982db99 | |||
| 9aa8eed297 | |||
| 0bf6fb0023 | |||
| 300449bfb0 | |||
| 868c808db7 | |||
| ef61a33a38 | |||
| dae14e0edc | |||
| c51ea64bdd | |||
| 5d37399fbf |
@@ -1 +1 @@
|
|||||||
4.3.2
|
4.4.4
|
||||||
|
|||||||
@@ -15,4 +15,5 @@ after_success: npm run coverage
|
|||||||
|
|
||||||
branches:
|
branches:
|
||||||
only:
|
only:
|
||||||
- master
|
- master
|
||||||
|
- /^v[0-9].*$/
|
||||||
|
|||||||
45
CHANGELOG.md
Normal file
45
CHANGELOG.md
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
# Changelog
|
||||||
|
|
||||||
|
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading.
|
||||||
|
|
||||||
|
## v0.4.1
|
||||||
|
|
||||||
|
- Use `filter` instead of `must` to query for outstanding jobs
|
||||||
|
|
||||||
|
## v0.4.0
|
||||||
|
|
||||||
|
- [BREAKING] Change `priority` mapping to *byte*
|
||||||
|
- Exclude `output.content` from _source when query jobs
|
||||||
|
- Add optional `created_by` value to job documents
|
||||||
|
|
||||||
|
## v0.3.2
|
||||||
|
|
||||||
|
- Misisng indiced returns empty array (fixed errors in v0.3.1)
|
||||||
|
|
||||||
|
## v0.3.1
|
||||||
|
|
||||||
|
- Ignore missing indices when looking for jobs
|
||||||
|
|
||||||
|
## v0.3.0
|
||||||
|
|
||||||
|
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
|
||||||
|
|
||||||
|
## v0.2.2
|
||||||
|
|
||||||
|
- Swollow errors when saving job output
|
||||||
|
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
|
||||||
|
- Update npm package
|
||||||
|
|
||||||
|
## v0.2.1
|
||||||
|
|
||||||
|
- Use `esqueue` namespace for debugging
|
||||||
|
|
||||||
|
## v0.2.0
|
||||||
|
|
||||||
|
- [BREAKING] Async jobs should return promises, not use callbacks
|
||||||
|
- Remove bluebird dependency
|
||||||
|
- Only require specific lodash modules, instead of the whole library
|
||||||
|
|
||||||
|
## v0.1.0
|
||||||
|
|
||||||
|
- Initial release
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "esqueue",
|
"name": "esqueue",
|
||||||
"version": "0.3.0",
|
"version": "0.4.1",
|
||||||
"description": "Job queue, powered by Elasticsearch",
|
"description": "Job queue, powered by Elasticsearch",
|
||||||
"main": "lib/index.js",
|
"main": "lib/index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ Option | Default | Description
|
|||||||
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
|
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
|
||||||
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
|
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
|
||||||
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
|
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
|
||||||
|
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
|
||||||
|
|
||||||
### Creating a worker
|
### Creating a worker
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import { defaultSettings } from './constants';
|
|||||||
const schema = {
|
const schema = {
|
||||||
jobtype: { type: 'string', index: 'not_analyzed' },
|
jobtype: { type: 'string', index: 'not_analyzed' },
|
||||||
payload: { type: 'object', enabled: false },
|
payload: { type: 'object', enabled: false },
|
||||||
priority: { type: 'short' },
|
priority: { type: 'byte' },
|
||||||
timeout: { type: 'long' },
|
timeout: { type: 'long' },
|
||||||
process_expiration: { type: 'date' },
|
process_expiration: { type: 'date' },
|
||||||
created_by: { type: 'string', index: 'not_analyzed' },
|
created_by: { type: 'string', index: 'not_analyzed' },
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
this.index = index;
|
this.index = index;
|
||||||
this.jobtype = type;
|
this.jobtype = type;
|
||||||
this.payload = payload;
|
this.payload = payload;
|
||||||
|
this.created_by = options.created_by || null;
|
||||||
this.timeout = options.timeout || 10000;
|
this.timeout = options.timeout || 10000;
|
||||||
this.maxAttempts = options.max_attempts || 3;
|
this.maxAttempts = options.max_attempts || 3;
|
||||||
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
||||||
@@ -37,6 +38,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
jobtype: this.jobtype,
|
jobtype: this.jobtype,
|
||||||
payload: this.payload,
|
payload: this.payload,
|
||||||
priority: this.priority,
|
priority: this.priority,
|
||||||
|
created_by: this.created_by,
|
||||||
timeout: this.timeout,
|
timeout: this.timeout,
|
||||||
process_expiration: new Date(0), // use epoch so the job query works
|
process_expiration: new Date(0), // use epoch so the job query works
|
||||||
created_at: new Date(),
|
created_at: new Date(),
|
||||||
@@ -87,6 +89,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
index: this.index,
|
index: this.index,
|
||||||
type: this.doctype,
|
type: this.doctype,
|
||||||
jobtype: this.jobtype,
|
jobtype: this.jobtype,
|
||||||
|
created_by: this.created_by,
|
||||||
payload: this.payload,
|
payload: this.payload,
|
||||||
timeout: this.timeout,
|
timeout: this.timeout,
|
||||||
max_attempts: this.maxAttempts,
|
max_attempts: this.maxAttempts,
|
||||||
|
|||||||
@@ -180,7 +180,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_claimPendingJobs(jobs) {
|
_claimPendingJobs(jobs) {
|
||||||
if (jobs.length === 0) return;
|
if (!jobs || jobs.length === 0) return;
|
||||||
|
|
||||||
this._stopJobPolling();
|
this._stopJobPolling();
|
||||||
let claimed = false;
|
let claimed = false;
|
||||||
@@ -228,15 +228,18 @@ export default class Job extends events.EventEmitter {
|
|||||||
_getPendingJobs() {
|
_getPendingJobs() {
|
||||||
const nowTime = moment().toISOString();
|
const nowTime = moment().toISOString();
|
||||||
const query = {
|
const query = {
|
||||||
|
_source : {
|
||||||
|
exclude: [ 'output.content' ]
|
||||||
|
},
|
||||||
query: {
|
query: {
|
||||||
constant_score: {
|
constant_score: {
|
||||||
filter: {
|
filter: {
|
||||||
bool: {
|
bool: {
|
||||||
must: { term: { jobtype: this.jobtype } },
|
filter: { term: { jobtype: this.jobtype } },
|
||||||
should: [
|
should: [
|
||||||
{ term: { status: 'pending'} },
|
{ term: { status: 'pending'} },
|
||||||
{ bool:
|
{ bool:
|
||||||
{ must: [
|
{ filter: [
|
||||||
{ term: { status: 'processing' } },
|
{ term: { status: 'processing' } },
|
||||||
{ range: { process_expiration: { lte: nowTime } } }
|
{ range: { process_expiration: { lte: nowTime } } }
|
||||||
] }
|
] }
|
||||||
@@ -267,6 +270,9 @@ export default class Job extends events.EventEmitter {
|
|||||||
return jobs;
|
return jobs;
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
|
// ignore missing indices errors
|
||||||
|
if (err.status === 404) return [];
|
||||||
|
|
||||||
this.debug('job querying failed', err);
|
this.debug('job querying failed', err);
|
||||||
this.emit('error', err);
|
this.emit('error', err);
|
||||||
this.queue.emit('worker_error', {
|
this.queue.emit('worker_error', {
|
||||||
|
|||||||
5
test/fixtures/elasticsearch.js
vendored
5
test/fixtures/elasticsearch.js
vendored
@@ -32,6 +32,7 @@ Client.prototype.get = function (params = {}, source = {}) {
|
|||||||
|
|
||||||
const _source = Object.assign({
|
const _source = Object.assign({
|
||||||
jobtype: 'jobtype',
|
jobtype: 'jobtype',
|
||||||
|
created_by: null,
|
||||||
payload: {
|
payload: {
|
||||||
id: 'sample-job-1',
|
id: 'sample-job-1',
|
||||||
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
||||||
@@ -44,14 +45,14 @@ Client.prototype.get = function (params = {}, source = {}) {
|
|||||||
status: 'pending'
|
status: 'pending'
|
||||||
}, source);
|
}, source);
|
||||||
|
|
||||||
return {
|
return Promise.resolve({
|
||||||
_index: params.index || 'index',
|
_index: params.index || 'index',
|
||||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||||
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
||||||
_version: params.version || 1,
|
_version: params.version || 1,
|
||||||
found: true,
|
found: true,
|
||||||
_source: _source
|
_source: _source
|
||||||
};
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ const Job = module;
|
|||||||
const maxPriority = 20;
|
const maxPriority = 20;
|
||||||
const minPriority = -20;
|
const minPriority = -20;
|
||||||
const defaultPriority = 10;
|
const defaultPriority = 10;
|
||||||
|
const defaultCreatedBy = null;
|
||||||
|
|
||||||
describe('Job Class', function () {
|
describe('Job Class', function () {
|
||||||
let client;
|
let client;
|
||||||
@@ -90,6 +91,24 @@ describe('Job Class', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
it('should index the created_by value', function () {
|
||||||
|
const createdBy = 'user_identifier';
|
||||||
|
const job = new Job(client, index, type, payload, Object.assign({ created_by: createdBy }, options));
|
||||||
|
return job.ready.then(() => {
|
||||||
|
const newDoc = validateDoc(client.index);
|
||||||
|
expect(newDoc.body).to.have.property('created_by', createdBy);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should index default created_by value', function () {
|
||||||
|
const job = new Job(client, index, type, payload, options);
|
||||||
|
return job.ready.then(() => {
|
||||||
|
const newDoc = validateDoc(client.index);
|
||||||
|
expect(newDoc.body).to.have.property('created_by', defaultCreatedBy);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should index timeout value from options', function () {
|
it('should index timeout value from options', function () {
|
||||||
const job = new Job(client, index, type, payload, options);
|
const job = new Job(client, index, type, payload, options);
|
||||||
return job.ready.then(() => {
|
return job.ready.then(() => {
|
||||||
@@ -181,6 +200,7 @@ describe('Job Class', function () {
|
|||||||
expect(doc).to.have.property('type', jobDoc.type);
|
expect(doc).to.have.property('type', jobDoc.type);
|
||||||
expect(doc).to.have.property('id', jobDoc.id);
|
expect(doc).to.have.property('id', jobDoc.id);
|
||||||
expect(doc).to.have.property('version', jobDoc.version);
|
expect(doc).to.have.property('version', jobDoc.version);
|
||||||
|
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||||
|
|
||||||
expect(doc).to.have.property('payload');
|
expect(doc).to.have.property('payload');
|
||||||
expect(doc).to.have.property('jobtype');
|
expect(doc).to.have.property('jobtype');
|
||||||
@@ -188,6 +208,24 @@ describe('Job Class', function () {
|
|||||||
expect(doc).to.have.property('timeout');
|
expect(doc).to.have.property('timeout');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should contain optional data', function () {
|
||||||
|
const optionals = {
|
||||||
|
created_by: 'some_ident'
|
||||||
|
};
|
||||||
|
|
||||||
|
const job = new Job(client, index, type, payload, optionals);
|
||||||
|
return Promise.resolve(client.get({}, optionals))
|
||||||
|
.then((doc) => {
|
||||||
|
sinon.stub(client, 'get').returns(Promise.resolve(doc));
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
|
return job.get()
|
||||||
|
.then((doc) => {
|
||||||
|
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('toJSON method', function () {
|
describe('toJSON method', function () {
|
||||||
@@ -211,12 +249,23 @@ describe('Job Class', function () {
|
|||||||
expect(doc).to.have.property('index', index);
|
expect(doc).to.have.property('index', index);
|
||||||
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||||
expect(doc).to.have.property('jobtype', type);
|
expect(doc).to.have.property('jobtype', type);
|
||||||
|
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||||
expect(doc).to.have.property('timeout', options.timeout);
|
expect(doc).to.have.property('timeout', options.timeout);
|
||||||
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
||||||
expect(doc).to.have.property('priority', options.priority);
|
expect(doc).to.have.property('priority', options.priority);
|
||||||
expect(doc).to.have.property('id');
|
expect(doc).to.have.property('id');
|
||||||
expect(doc).to.not.have.property('version');
|
expect(doc).to.not.have.property('version');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should contain optional data', function () {
|
||||||
|
const optionals = {
|
||||||
|
created_by: 'some_ident'
|
||||||
|
};
|
||||||
|
|
||||||
|
const job = new Job(client, index, type, payload, optionals);
|
||||||
|
const doc = job.toJSON();
|
||||||
|
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -105,15 +105,9 @@ describe('Worker class', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('searching for jobs', function () {
|
describe('polling for jobs', function () {
|
||||||
let searchSpy;
|
let searchSpy;
|
||||||
|
|
||||||
function getSearchParams(jobtype, params = {}) {
|
|
||||||
new Worker(mockQueue, jobtype, noop, params);
|
|
||||||
clock.tick(defaults.interval);
|
|
||||||
return searchSpy.firstCall.args[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
anchorMoment = moment(anchor);
|
anchorMoment = moment(anchor);
|
||||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||||
@@ -124,37 +118,91 @@ describe('Worker class', function () {
|
|||||||
clock.restore();
|
clock.restore();
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('polling interval', function () {
|
it('should start polling for jobs after interval', function () {
|
||||||
it('should start polling for jobs after interval', function () {
|
new Worker(mockQueue, 'test', noop);
|
||||||
new Worker(mockQueue, 'test', noop);
|
sinon.assert.notCalled(searchSpy);
|
||||||
sinon.assert.notCalled(searchSpy);
|
clock.tick(defaults.interval);
|
||||||
clock.tick(defaults.interval);
|
sinon.assert.calledOnce(searchSpy);
|
||||||
sinon.assert.calledOnce(searchSpy);
|
});
|
||||||
|
|
||||||
|
it('should use interval option to control polling', function () {
|
||||||
|
const interval = 567;
|
||||||
|
new Worker(mockQueue, 'test', noop, { interval });
|
||||||
|
sinon.assert.notCalled(searchSpy);
|
||||||
|
clock.tick(interval);
|
||||||
|
sinon.assert.calledOnce(searchSpy);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('query for pending jobs', function () {
|
||||||
|
let worker;
|
||||||
|
let searchStub;
|
||||||
|
|
||||||
|
function getSearchParams(jobtype = 'test', params = {}) {
|
||||||
|
worker = new Worker(mockQueue, jobtype, noop, params);
|
||||||
|
worker._getPendingJobs();
|
||||||
|
return searchStub.firstCall.args[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('error handling', function () {
|
||||||
|
beforeEach(() => {
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should use interval option to control polling', function () {
|
it('should pass search errors', function (done) {
|
||||||
const interval = 567;
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
|
||||||
new Worker(mockQueue, 'test', noop, { interval });
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
sinon.assert.notCalled(searchSpy);
|
worker._getPendingJobs()
|
||||||
clock.tick(interval);
|
.then(() => done(new Error('should not resolve')))
|
||||||
sinon.assert.calledOnce(searchSpy);
|
.catch(() => { done(); });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should swollow index missing errors', function (done) {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||||
|
status: 404
|
||||||
|
}));
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
worker._getPendingJobs()
|
||||||
|
.then(() => { done(); })
|
||||||
|
.catch(() => done(new Error('should not reject')));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return an empty array on missing index', function (done) {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||||
|
status: 404
|
||||||
|
}));
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
worker._getPendingJobs()
|
||||||
|
.then((res) => {
|
||||||
|
try {
|
||||||
|
expect(res).to.be.an(Array);
|
||||||
|
expect(res).to.have.length(0);
|
||||||
|
done();
|
||||||
|
} catch (e) {
|
||||||
|
done(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch(() => done(new Error('should not reject')));
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('query parameters', function () {
|
describe('query parameters', function () {
|
||||||
|
beforeEach(() => {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||||
|
});
|
||||||
|
|
||||||
it('should query with version', function () {
|
it('should query with version', function () {
|
||||||
const params = getSearchParams('test');
|
const params = getSearchParams();
|
||||||
expect(params).to.have.property('version', true);
|
expect(params).to.have.property('version', true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should query by default doctype', function () {
|
it('should query by default doctype', function () {
|
||||||
const params = getSearchParams('test');
|
const params = getSearchParams();
|
||||||
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
|
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should query by custom doctype', function () {
|
it('should query by custom doctype', function () {
|
||||||
const doctype = 'custom_test';
|
const doctype = 'custom_test';
|
||||||
const params = getSearchParams('test', { doctype });
|
const params = getSearchParams('type', { doctype });
|
||||||
expect(params).to.have.property('type', doctype);
|
expect(params).to.have.property('type', doctype);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -163,11 +211,27 @@ describe('Worker class', function () {
|
|||||||
const conditionPath = 'query.constant_score.filter.bool';
|
const conditionPath = 'query.constant_score.filter.bool';
|
||||||
const jobtype = 'test_jobtype';
|
const jobtype = 'test_jobtype';
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||||
|
anchorMoment = moment(anchor);
|
||||||
|
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
clock.restore();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should filter unwanted source data', function () {
|
||||||
|
const excludedFields = [ 'output.content' ];
|
||||||
|
const { body } = getSearchParams(jobtype);
|
||||||
|
expect(body).to.have.property('_source');
|
||||||
|
expect(body._source).to.eql({ exclude: excludedFields });
|
||||||
|
});
|
||||||
|
|
||||||
it('should search by job type', function () {
|
it('should search by job type', function () {
|
||||||
const { body } = getSearchParams(jobtype);
|
const { body } = getSearchParams(jobtype);
|
||||||
const conditions = get(body, conditionPath);
|
const conditions = get(body, conditionPath);
|
||||||
expect(conditions).to.have.property('must');
|
expect(conditions.filter).to.eql({ term: { jobtype: jobtype } });
|
||||||
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should search for pending or expired jobs', function () {
|
it('should search for pending or expired jobs', function () {
|
||||||
@@ -178,7 +242,7 @@ describe('Worker class', function () {
|
|||||||
// this works because we are stopping the clock, so all times match
|
// this works because we are stopping the clock, so all times match
|
||||||
const nowTime = moment().toISOString();
|
const nowTime = moment().toISOString();
|
||||||
const pending = { term: { status: 'pending'} };
|
const pending = { term: { status: 'pending'} };
|
||||||
const expired = { bool: { must: [
|
const expired = { bool: { filter: [
|
||||||
{ term: { status: 'processing' } },
|
{ term: { status: 'processing' } },
|
||||||
{ range: { process_expiration: { lte: nowTime } } }
|
{ range: { process_expiration: { lte: nowTime } } }
|
||||||
] } };
|
] } };
|
||||||
@@ -201,9 +265,9 @@ describe('Worker class', function () {
|
|||||||
expect(body).to.have.property('size', size);
|
expect(body).to.have.property('size', size);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
describe('claiming a job', function () {
|
describe('claiming a job', function () {
|
||||||
let params;
|
let params;
|
||||||
let job;
|
let job;
|
||||||
@@ -220,9 +284,12 @@ describe('Worker class', function () {
|
|||||||
id: 12345,
|
id: 12345,
|
||||||
version: 3
|
version: 3
|
||||||
};
|
};
|
||||||
job = mockQueue.client.get(params);
|
return mockQueue.client.get(params)
|
||||||
worker = new Worker(mockQueue, 'test', noop);
|
.then((jobDoc) => {
|
||||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
job = jobDoc;
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
@@ -299,16 +366,18 @@ describe('Worker class', function () {
|
|||||||
anchorMoment = moment(anchor);
|
anchorMoment = moment(anchor);
|
||||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||||
|
|
||||||
job = mockQueue.client.get();
|
return mockQueue.client.get()
|
||||||
worker = new Worker(mockQueue, 'test', noop);
|
.then((jobDoc) => {
|
||||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
job = jobDoc;
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
clock.restore();
|
clock.restore();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
it('should use version on update', function () {
|
it('should use version on update', function () {
|
||||||
worker._failJob(job);
|
worker._failJob(job);
|
||||||
const query = updateSpy.firstCall.args[0];
|
const query = updateSpy.firstCall.args[0];
|
||||||
@@ -362,8 +431,12 @@ describe('Worker class', function () {
|
|||||||
payload = {
|
payload = {
|
||||||
value: random(0, 100, true)
|
value: random(0, 100, true)
|
||||||
};
|
};
|
||||||
job = mockQueue.client.get({}, { payload });
|
|
||||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
return mockQueue.client.get({}, { payload })
|
||||||
|
.then((jobDoc) => {
|
||||||
|
job = jobDoc;
|
||||||
|
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should call the workerFn with the payload', function (done) {
|
it('should call the workerFn with the payload', function (done) {
|
||||||
|
|||||||
Reference in New Issue
Block a user