16 Commits

Author SHA1 Message Date
fb2f51b11d 0.4.0 2016-05-16 17:19:55 -07:00
8ac9a81fdb update changelog and readme 2016-05-16 15:06:47 -07:00
4472e725fa add tests for optional fields to get and toJSON methods 2016-05-16 15:04:35 -07:00
be1eb81059 make the mock client get method async
and update worker tests to use async value
2016-05-16 15:03:20 -07:00
8d21dc6967 add optional created_by record to the doc 2016-05-16 15:02:34 -07:00
31159baae9 add future version notes to changelog 2016-05-16 14:24:20 -07:00
49965bbaf1 add changelog file 2016-05-16 14:23:12 -07:00
49b982db99 change priority field to byte 2016-05-16 14:09:42 -07:00
9aa8eed297 filter output when searching for jobs 2016-05-16 14:09:24 -07:00
0bf6fb0023 0.3.2 2016-05-13 14:59:17 -07:00
300449bfb0 add test for missing index result 2016-05-13 14:59:05 -07:00
868c808db7 missing indices need to return an array 2016-05-13 14:56:50 -07:00
ef61a33a38 0.3.1 2016-05-13 14:40:46 -07:00
dae14e0edc add tests for job search failures 2016-05-13 14:36:19 -07:00
c51ea64bdd [worker] swollow missing index errors 2016-05-13 14:35:39 -07:00
5d37399fbf run travis on version tags too 2016-05-12 17:27:53 -07:00
10 changed files with 215 additions and 38 deletions

View File

@@ -16,3 +16,4 @@ after_success: npm run coverage
branches:
only:
- master
- /^v[0-9].*$/

41
CHANGELOG.md Normal file
View File

@@ -0,0 +1,41 @@
# Changelog
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading.
## v0.4.0
- [BREAKING] Change `priority` mapping to *byte*
- Exclude `output.content` from _source when query jobs
- Add optional `created_by` value to job documents
## v0.3.2
- Misisng indiced returns empty array (fixed errors in v0.3.1)
## v0.3.1
- Ignore missing indices when looking for jobs
## v0.3.0
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
## v0.2.2
- Swollow errors when saving job output
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
- Update npm package
## v0.2.1
- Use `esqueue` namespace for debugging
## v0.2.0
- [BREAKING] Async jobs should return promises, not use callbacks
- Remove bluebird dependency
- Only require specific lodash modules, instead of the whole library
## v0.1.0
- Initial release

View File

@@ -1,6 +1,6 @@
{
"name": "esqueue",
"version": "0.3.0",
"version": "0.4.0",
"description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js",
"scripts": {

View File

@@ -60,6 +60,7 @@ Option | Default | Description
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
### Creating a worker

View File

@@ -3,7 +3,7 @@ import { defaultSettings } from './constants';
const schema = {
jobtype: { type: 'string', index: 'not_analyzed' },
payload: { type: 'object', enabled: false },
priority: { type: 'short' },
priority: { type: 'byte' },
timeout: { type: 'long' },
process_expiration: { type: 'date' },
created_by: { type: 'string', index: 'not_analyzed' },

View File

@@ -20,6 +20,7 @@ export default class Job extends events.EventEmitter {
this.index = index;
this.jobtype = type;
this.payload = payload;
this.created_by = options.created_by || null;
this.timeout = options.timeout || 10000;
this.maxAttempts = options.max_attempts || 3;
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
@@ -37,6 +38,7 @@ export default class Job extends events.EventEmitter {
jobtype: this.jobtype,
payload: this.payload,
priority: this.priority,
created_by: this.created_by,
timeout: this.timeout,
process_expiration: new Date(0), // use epoch so the job query works
created_at: new Date(),
@@ -87,6 +89,7 @@ export default class Job extends events.EventEmitter {
index: this.index,
type: this.doctype,
jobtype: this.jobtype,
created_by: this.created_by,
payload: this.payload,
timeout: this.timeout,
max_attempts: this.maxAttempts,

View File

@@ -180,7 +180,7 @@ export default class Job extends events.EventEmitter {
}
_claimPendingJobs(jobs) {
if (jobs.length === 0) return;
if (!jobs || jobs.length === 0) return;
this._stopJobPolling();
let claimed = false;
@@ -228,6 +228,9 @@ export default class Job extends events.EventEmitter {
_getPendingJobs() {
const nowTime = moment().toISOString();
const query = {
_source : {
exclude: [ 'output.content' ]
},
query: {
constant_score: {
filter: {
@@ -267,6 +270,9 @@ export default class Job extends events.EventEmitter {
return jobs;
})
.catch((err) => {
// ignore missing indices errors
if (err.status === 404) return [];
this.debug('job querying failed', err);
this.emit('error', err);
this.queue.emit('worker_error', {

View File

@@ -32,6 +32,7 @@ Client.prototype.get = function (params = {}, source = {}) {
const _source = Object.assign({
jobtype: 'jobtype',
created_by: null,
payload: {
id: 'sample-job-1',
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
@@ -44,14 +45,14 @@ Client.prototype.get = function (params = {}, source = {}) {
status: 'pending'
}, source);
return {
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_version: params.version || 1,
found: true,
_source: _source
};
});
};
Client.prototype.search = function (params = {}, count = 5, source = {}) {

View File

@@ -14,6 +14,7 @@ const Job = module;
const maxPriority = 20;
const minPriority = -20;
const defaultPriority = 10;
const defaultCreatedBy = null;
describe('Job Class', function () {
let client;
@@ -90,6 +91,24 @@ describe('Job Class', function () {
});
});
it('should index the created_by value', function () {
const createdBy = 'user_identifier';
const job = new Job(client, index, type, payload, Object.assign({ created_by: createdBy }, options));
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_by', createdBy);
});
});
it('should index default created_by value', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_by', defaultCreatedBy);
});
});
it('should index timeout value from options', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
@@ -181,6 +200,7 @@ describe('Job Class', function () {
expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('version', jobDoc.version);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('payload');
expect(doc).to.have.property('jobtype');
@@ -188,6 +208,25 @@ describe('Job Class', function () {
expect(doc).to.have.property('timeout');
});
});
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(client, index, type, payload, optionals);
return Promise.resolve(client.get({}, optionals))
.then((doc) => {
console.log('mocked doc', doc);
sinon.stub(client, 'get').returns(Promise.resolve(doc));
})
.then(() => {
return job.get()
.then((doc) => {
expect(doc).to.have.property('created_by', optionals.created_by);
});
});
});
});
describe('toJSON method', function () {
@@ -211,12 +250,23 @@ describe('Job Class', function () {
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('timeout', options.timeout);
expect(doc).to.have.property('max_attempts', options.max_attempts);
expect(doc).to.have.property('priority', options.priority);
expect(doc).to.have.property('id');
expect(doc).to.not.have.property('version');
});
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(client, index, type, payload, optionals);
const doc = job.toJSON();
expect(doc).to.have.property('created_by', optionals.created_by);
});
});
});

View File

@@ -105,15 +105,9 @@ describe('Worker class', function () {
});
});
describe('searching for jobs', function () {
describe('polling for jobs', function () {
let searchSpy;
function getSearchParams(jobtype, params = {}) {
new Worker(mockQueue, jobtype, noop, params);
clock.tick(defaults.interval);
return searchSpy.firstCall.args[0];
}
beforeEach(() => {
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
@@ -124,7 +118,6 @@ describe('Worker class', function () {
clock.restore();
});
describe('polling interval', function () {
it('should start polling for jobs after interval', function () {
new Worker(mockQueue, 'test', noop);
sinon.assert.notCalled(searchSpy);
@@ -141,20 +134,75 @@ describe('Worker class', function () {
});
});
describe('query for pending jobs', function () {
let worker;
let searchStub;
function getSearchParams(jobtype = 'test', params = {}) {
worker = new Worker(mockQueue, jobtype, noop, params);
worker._getPendingJobs();
return searchStub.firstCall.args[0];
}
describe('error handling', function () {
beforeEach(() => {
});
it('should pass search errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => done(new Error('should not resolve')))
.catch(() => { done(); });
});
it('should swollow index missing errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => { done(); })
.catch(() => done(new Error('should not reject')));
});
it('should return an empty array on missing index', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then((res) => {
try {
expect(res).to.be.an(Array);
expect(res).to.have.length(0);
done();
} catch (e) {
done(e);
}
})
.catch(() => done(new Error('should not reject')));
});
});
describe('query parameters', function () {
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
});
it('should query with version', function () {
const params = getSearchParams('test');
const params = getSearchParams();
expect(params).to.have.property('version', true);
});
it('should query by default doctype', function () {
const params = getSearchParams('test');
const params = getSearchParams();
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should query by custom doctype', function () {
const doctype = 'custom_test';
const params = getSearchParams('test', { doctype });
const params = getSearchParams('type', { doctype });
expect(params).to.have.property('type', doctype);
});
});
@@ -163,6 +211,23 @@ describe('Worker class', function () {
const conditionPath = 'query.constant_score.filter.bool';
const jobtype = 'test_jobtype';
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
});
afterEach(() => {
clock.restore();
});
it('should filter unwanted source data', function () {
const excludedFields = [ 'output.content' ];
const { body } = getSearchParams(jobtype);
expect(body).to.have.property('_source');
expect(body._source).to.eql({ exclude: excludedFields });
});
it('should search by job type', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
@@ -201,9 +266,9 @@ describe('Worker class', function () {
expect(body).to.have.property('size', size);
});
});
});
describe('claiming a job', function () {
let params;
let job;
@@ -220,10 +285,13 @@ describe('Worker class', function () {
id: 12345,
version: 3
};
job = mockQueue.client.get(params);
return mockQueue.client.get(params)
.then((jobDoc) => {
job = jobDoc;
worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update');
});
});
afterEach(() => {
clock.restore();
@@ -299,16 +367,18 @@ describe('Worker class', function () {
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
job = mockQueue.client.get();
return mockQueue.client.get()
.then((jobDoc) => {
job = jobDoc;
worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update');
});
});
afterEach(() => {
clock.restore();
});
it('should use version on update', function () {
worker._failJob(job);
const query = updateSpy.firstCall.args[0];
@@ -362,9 +432,13 @@ describe('Worker class', function () {
payload = {
value: random(0, 100, true)
};
job = mockQueue.client.get({}, { payload });
return mockQueue.client.get({}, { payload })
.then((jobDoc) => {
job = jobDoc;
updateSpy = sinon.spy(mockQueue.client, 'update');
});
});
it('should call the workerFn with the payload', function (done) {
const workerFn = function (jobPayload) {