Compare commits
7 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0bf6fb0023 | |||
| 300449bfb0 | |||
| 868c808db7 | |||
| ef61a33a38 | |||
| dae14e0edc | |||
| c51ea64bdd | |||
| 5d37399fbf |
@@ -16,3 +16,4 @@ after_success: npm run coverage
|
|||||||
branches:
|
branches:
|
||||||
only:
|
only:
|
||||||
- master
|
- master
|
||||||
|
- /^v[0-9].*$/
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "esqueue",
|
"name": "esqueue",
|
||||||
"version": "0.3.0",
|
"version": "0.3.2",
|
||||||
"description": "Job queue, powered by Elasticsearch",
|
"description": "Job queue, powered by Elasticsearch",
|
||||||
"main": "lib/index.js",
|
"main": "lib/index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
@@ -180,7 +180,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_claimPendingJobs(jobs) {
|
_claimPendingJobs(jobs) {
|
||||||
if (jobs.length === 0) return;
|
if (!jobs || jobs.length === 0) return;
|
||||||
|
|
||||||
this._stopJobPolling();
|
this._stopJobPolling();
|
||||||
let claimed = false;
|
let claimed = false;
|
||||||
@@ -267,6 +267,9 @@ export default class Job extends events.EventEmitter {
|
|||||||
return jobs;
|
return jobs;
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
|
// ignore missing indices errors
|
||||||
|
if (err.status === 404) return [];
|
||||||
|
|
||||||
this.debug('job querying failed', err);
|
this.debug('job querying failed', err);
|
||||||
this.emit('error', err);
|
this.emit('error', err);
|
||||||
this.queue.emit('worker_error', {
|
this.queue.emit('worker_error', {
|
||||||
|
|||||||
@@ -105,15 +105,9 @@ describe('Worker class', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('searching for jobs', function () {
|
describe('polling for jobs', function () {
|
||||||
let searchSpy;
|
let searchSpy;
|
||||||
|
|
||||||
function getSearchParams(jobtype, params = {}) {
|
|
||||||
new Worker(mockQueue, jobtype, noop, params);
|
|
||||||
clock.tick(defaults.interval);
|
|
||||||
return searchSpy.firstCall.args[0];
|
|
||||||
}
|
|
||||||
|
|
||||||
beforeEach(() => {
|
beforeEach(() => {
|
||||||
anchorMoment = moment(anchor);
|
anchorMoment = moment(anchor);
|
||||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||||
@@ -124,7 +118,6 @@ describe('Worker class', function () {
|
|||||||
clock.restore();
|
clock.restore();
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('polling interval', function () {
|
|
||||||
it('should start polling for jobs after interval', function () {
|
it('should start polling for jobs after interval', function () {
|
||||||
new Worker(mockQueue, 'test', noop);
|
new Worker(mockQueue, 'test', noop);
|
||||||
sinon.assert.notCalled(searchSpy);
|
sinon.assert.notCalled(searchSpy);
|
||||||
@@ -141,20 +134,75 @@ describe('Worker class', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('query for pending jobs', function () {
|
||||||
|
let worker;
|
||||||
|
let searchStub;
|
||||||
|
|
||||||
|
function getSearchParams(jobtype = 'test', params = {}) {
|
||||||
|
worker = new Worker(mockQueue, jobtype, noop, params);
|
||||||
|
worker._getPendingJobs();
|
||||||
|
return searchStub.firstCall.args[0];
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('error handling', function () {
|
||||||
|
beforeEach(() => {
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should pass search errors', function (done) {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
worker._getPendingJobs()
|
||||||
|
.then(() => done(new Error('should not resolve')))
|
||||||
|
.catch(() => { done(); });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should swollow index missing errors', function (done) {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||||
|
status: 404
|
||||||
|
}));
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
worker._getPendingJobs()
|
||||||
|
.then(() => { done(); })
|
||||||
|
.catch(() => done(new Error('should not reject')));
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return an empty array on missing index', function (done) {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||||
|
status: 404
|
||||||
|
}));
|
||||||
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
worker._getPendingJobs()
|
||||||
|
.then((res) => {
|
||||||
|
try {
|
||||||
|
expect(res).to.be.an(Array);
|
||||||
|
expect(res).to.have.length(0);
|
||||||
|
done();
|
||||||
|
} catch (e) {
|
||||||
|
done(e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.catch(() => done(new Error('should not reject')));
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('query parameters', function () {
|
describe('query parameters', function () {
|
||||||
|
beforeEach(() => {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||||
|
});
|
||||||
|
|
||||||
it('should query with version', function () {
|
it('should query with version', function () {
|
||||||
const params = getSearchParams('test');
|
const params = getSearchParams();
|
||||||
expect(params).to.have.property('version', true);
|
expect(params).to.have.property('version', true);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should query by default doctype', function () {
|
it('should query by default doctype', function () {
|
||||||
const params = getSearchParams('test');
|
const params = getSearchParams();
|
||||||
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
|
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should query by custom doctype', function () {
|
it('should query by custom doctype', function () {
|
||||||
const doctype = 'custom_test';
|
const doctype = 'custom_test';
|
||||||
const params = getSearchParams('test', { doctype });
|
const params = getSearchParams('type', { doctype });
|
||||||
expect(params).to.have.property('type', doctype);
|
expect(params).to.have.property('type', doctype);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -163,6 +211,16 @@ describe('Worker class', function () {
|
|||||||
const conditionPath = 'query.constant_score.filter.bool';
|
const conditionPath = 'query.constant_score.filter.bool';
|
||||||
const jobtype = 'test_jobtype';
|
const jobtype = 'test_jobtype';
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||||
|
anchorMoment = moment(anchor);
|
||||||
|
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
clock.restore();
|
||||||
|
});
|
||||||
|
|
||||||
it('should search by job type', function () {
|
it('should search by job type', function () {
|
||||||
const { body } = getSearchParams(jobtype);
|
const { body } = getSearchParams(jobtype);
|
||||||
const conditions = get(body, conditionPath);
|
const conditions = get(body, conditionPath);
|
||||||
@@ -201,9 +259,9 @@ describe('Worker class', function () {
|
|||||||
expect(body).to.have.property('size', size);
|
expect(body).to.have.property('size', size);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
describe('claiming a job', function () {
|
describe('claiming a job', function () {
|
||||||
let params;
|
let params;
|
||||||
let job;
|
let job;
|
||||||
|
|||||||
Reference in New Issue
Block a user