7 Commits

Author SHA1 Message Date
0bf6fb0023 0.3.2 2016-05-13 14:59:17 -07:00
300449bfb0 add test for missing index result 2016-05-13 14:59:05 -07:00
868c808db7 missing indices need to return an array 2016-05-13 14:56:50 -07:00
ef61a33a38 0.3.1 2016-05-13 14:40:46 -07:00
dae14e0edc add tests for job search failures 2016-05-13 14:36:19 -07:00
c51ea64bdd [worker] swollow missing index errors 2016-05-13 14:35:39 -07:00
5d37399fbf run travis on version tags too 2016-05-12 17:27:53 -07:00
4 changed files with 88 additions and 26 deletions

View File

@@ -15,4 +15,5 @@ after_success: npm run coverage
branches: branches:
only: only:
- master - master
- /^v[0-9].*$/

View File

@@ -1,6 +1,6 @@
{ {
"name": "esqueue", "name": "esqueue",
"version": "0.3.0", "version": "0.3.2",
"description": "Job queue, powered by Elasticsearch", "description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js", "main": "lib/index.js",
"scripts": { "scripts": {

View File

@@ -180,7 +180,7 @@ export default class Job extends events.EventEmitter {
} }
_claimPendingJobs(jobs) { _claimPendingJobs(jobs) {
if (jobs.length === 0) return; if (!jobs || jobs.length === 0) return;
this._stopJobPolling(); this._stopJobPolling();
let claimed = false; let claimed = false;
@@ -267,6 +267,9 @@ export default class Job extends events.EventEmitter {
return jobs; return jobs;
}) })
.catch((err) => { .catch((err) => {
// ignore missing indices errors
if (err.status === 404) return [];
this.debug('job querying failed', err); this.debug('job querying failed', err);
this.emit('error', err); this.emit('error', err);
this.queue.emit('worker_error', { this.queue.emit('worker_error', {

View File

@@ -105,15 +105,9 @@ describe('Worker class', function () {
}); });
}); });
describe('searching for jobs', function () { describe('polling for jobs', function () {
let searchSpy; let searchSpy;
function getSearchParams(jobtype, params = {}) {
new Worker(mockQueue, jobtype, noop, params);
clock.tick(defaults.interval);
return searchSpy.firstCall.args[0];
}
beforeEach(() => { beforeEach(() => {
anchorMoment = moment(anchor); anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf()); clock = sinon.useFakeTimers(anchorMoment.valueOf());
@@ -124,37 +118,91 @@ describe('Worker class', function () {
clock.restore(); clock.restore();
}); });
describe('polling interval', function () { it('should start polling for jobs after interval', function () {
it('should start polling for jobs after interval', function () { new Worker(mockQueue, 'test', noop);
new Worker(mockQueue, 'test', noop); sinon.assert.notCalled(searchSpy);
sinon.assert.notCalled(searchSpy); clock.tick(defaults.interval);
clock.tick(defaults.interval); sinon.assert.calledOnce(searchSpy);
sinon.assert.calledOnce(searchSpy); });
it('should use interval option to control polling', function () {
const interval = 567;
new Worker(mockQueue, 'test', noop, { interval });
sinon.assert.notCalled(searchSpy);
clock.tick(interval);
sinon.assert.calledOnce(searchSpy);
});
});
describe('query for pending jobs', function () {
let worker;
let searchStub;
function getSearchParams(jobtype = 'test', params = {}) {
worker = new Worker(mockQueue, jobtype, noop, params);
worker._getPendingJobs();
return searchStub.firstCall.args[0];
}
describe('error handling', function () {
beforeEach(() => {
}); });
it('should use interval option to control polling', function () { it('should pass search errors', function (done) {
const interval = 567; searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
new Worker(mockQueue, 'test', noop, { interval }); worker = new Worker(mockQueue, 'test', noop);
sinon.assert.notCalled(searchSpy); worker._getPendingJobs()
clock.tick(interval); .then(() => done(new Error('should not resolve')))
sinon.assert.calledOnce(searchSpy); .catch(() => { done(); });
});
it('should swollow index missing errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => { done(); })
.catch(() => done(new Error('should not reject')));
});
it('should return an empty array on missing index', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then((res) => {
try {
expect(res).to.be.an(Array);
expect(res).to.have.length(0);
done();
} catch (e) {
done(e);
}
})
.catch(() => done(new Error('should not reject')));
}); });
}); });
describe('query parameters', function () { describe('query parameters', function () {
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
});
it('should query with version', function () { it('should query with version', function () {
const params = getSearchParams('test'); const params = getSearchParams();
expect(params).to.have.property('version', true); expect(params).to.have.property('version', true);
}); });
it('should query by default doctype', function () { it('should query by default doctype', function () {
const params = getSearchParams('test'); const params = getSearchParams();
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE); expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
}); });
it('should query by custom doctype', function () { it('should query by custom doctype', function () {
const doctype = 'custom_test'; const doctype = 'custom_test';
const params = getSearchParams('test', { doctype }); const params = getSearchParams('type', { doctype });
expect(params).to.have.property('type', doctype); expect(params).to.have.property('type', doctype);
}); });
}); });
@@ -163,6 +211,16 @@ describe('Worker class', function () {
const conditionPath = 'query.constant_score.filter.bool'; const conditionPath = 'query.constant_score.filter.bool';
const jobtype = 'test_jobtype'; const jobtype = 'test_jobtype';
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
});
afterEach(() => {
clock.restore();
});
it('should search by job type', function () { it('should search by job type', function () {
const { body } = getSearchParams(jobtype); const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath); const conditions = get(body, conditionPath);
@@ -201,9 +259,9 @@ describe('Worker class', function () {
expect(body).to.have.property('size', size); expect(body).to.have.property('size', size);
}); });
}); });
}); });
describe('claiming a job', function () { describe('claiming a job', function () {
let params; let params;
let job; let job;