12 Commits

Author SHA1 Message Date
fb2f51b11d 0.4.0 2016-05-16 17:19:55 -07:00
8ac9a81fdb update changelog and readme 2016-05-16 15:06:47 -07:00
4472e725fa add tests for optional fields to get and toJSON methods 2016-05-16 15:04:35 -07:00
be1eb81059 make the mock client get method async
and update worker tests to use async value
2016-05-16 15:03:20 -07:00
8d21dc6967 add optional created_by record to the doc 2016-05-16 15:02:34 -07:00
31159baae9 add future version notes to changelog 2016-05-16 14:24:20 -07:00
49965bbaf1 add changelog file 2016-05-16 14:23:12 -07:00
49b982db99 change priority field to byte 2016-05-16 14:09:42 -07:00
9aa8eed297 filter output when searching for jobs 2016-05-16 14:09:24 -07:00
0bf6fb0023 0.3.2 2016-05-13 14:59:17 -07:00
300449bfb0 add test for missing index result 2016-05-13 14:59:05 -07:00
868c808db7 missing indices need to return an array 2016-05-13 14:56:50 -07:00
9 changed files with 148 additions and 15 deletions

41
CHANGELOG.md Normal file
View File

@@ -0,0 +1,41 @@
# Changelog
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading.
## v0.4.0
- [BREAKING] Change `priority` mapping to *byte*
- Exclude `output.content` from _source when query jobs
- Add optional `created_by` value to job documents
## v0.3.2
- Misisng indiced returns empty array (fixed errors in v0.3.1)
## v0.3.1
- Ignore missing indices when looking for jobs
## v0.3.0
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
## v0.2.2
- Swollow errors when saving job output
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
- Update npm package
## v0.2.1
- Use `esqueue` namespace for debugging
## v0.2.0
- [BREAKING] Async jobs should return promises, not use callbacks
- Remove bluebird dependency
- Only require specific lodash modules, instead of the whole library
## v0.1.0
- Initial release

View File

@@ -1,6 +1,6 @@
{ {
"name": "esqueue", "name": "esqueue",
"version": "0.3.1", "version": "0.4.0",
"description": "Job queue, powered by Elasticsearch", "description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js", "main": "lib/index.js",
"scripts": { "scripts": {

View File

@@ -60,6 +60,7 @@ Option | Default | Description
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue. timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing. max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`. priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
### Creating a worker ### Creating a worker

View File

@@ -3,7 +3,7 @@ import { defaultSettings } from './constants';
const schema = { const schema = {
jobtype: { type: 'string', index: 'not_analyzed' }, jobtype: { type: 'string', index: 'not_analyzed' },
payload: { type: 'object', enabled: false }, payload: { type: 'object', enabled: false },
priority: { type: 'short' }, priority: { type: 'byte' },
timeout: { type: 'long' }, timeout: { type: 'long' },
process_expiration: { type: 'date' }, process_expiration: { type: 'date' },
created_by: { type: 'string', index: 'not_analyzed' }, created_by: { type: 'string', index: 'not_analyzed' },

View File

@@ -20,6 +20,7 @@ export default class Job extends events.EventEmitter {
this.index = index; this.index = index;
this.jobtype = type; this.jobtype = type;
this.payload = payload; this.payload = payload;
this.created_by = options.created_by || null;
this.timeout = options.timeout || 10000; this.timeout = options.timeout || 10000;
this.maxAttempts = options.max_attempts || 3; this.maxAttempts = options.max_attempts || 3;
this.priority = Math.max(Math.min(options.priority || 10, 20), -20); this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
@@ -37,6 +38,7 @@ export default class Job extends events.EventEmitter {
jobtype: this.jobtype, jobtype: this.jobtype,
payload: this.payload, payload: this.payload,
priority: this.priority, priority: this.priority,
created_by: this.created_by,
timeout: this.timeout, timeout: this.timeout,
process_expiration: new Date(0), // use epoch so the job query works process_expiration: new Date(0), // use epoch so the job query works
created_at: new Date(), created_at: new Date(),
@@ -87,6 +89,7 @@ export default class Job extends events.EventEmitter {
index: this.index, index: this.index,
type: this.doctype, type: this.doctype,
jobtype: this.jobtype, jobtype: this.jobtype,
created_by: this.created_by,
payload: this.payload, payload: this.payload,
timeout: this.timeout, timeout: this.timeout,
max_attempts: this.maxAttempts, max_attempts: this.maxAttempts,

View File

@@ -180,7 +180,7 @@ export default class Job extends events.EventEmitter {
} }
_claimPendingJobs(jobs) { _claimPendingJobs(jobs) {
if (jobs.length === 0) return; if (!jobs || jobs.length === 0) return;
this._stopJobPolling(); this._stopJobPolling();
let claimed = false; let claimed = false;
@@ -228,6 +228,9 @@ export default class Job extends events.EventEmitter {
_getPendingJobs() { _getPendingJobs() {
const nowTime = moment().toISOString(); const nowTime = moment().toISOString();
const query = { const query = {
_source : {
exclude: [ 'output.content' ]
},
query: { query: {
constant_score: { constant_score: {
filter: { filter: {
@@ -268,7 +271,7 @@ export default class Job extends events.EventEmitter {
}) })
.catch((err) => { .catch((err) => {
// ignore missing indices errors // ignore missing indices errors
if (err.status === 404) return; if (err.status === 404) return [];
this.debug('job querying failed', err); this.debug('job querying failed', err);
this.emit('error', err); this.emit('error', err);

View File

@@ -32,6 +32,7 @@ Client.prototype.get = function (params = {}, source = {}) {
const _source = Object.assign({ const _source = Object.assign({
jobtype: 'jobtype', jobtype: 'jobtype',
created_by: null,
payload: { payload: {
id: 'sample-job-1', id: 'sample-job-1',
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)' now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
@@ -44,14 +45,14 @@ Client.prototype.get = function (params = {}, source = {}) {
status: 'pending' status: 'pending'
}, source); }, source);
return { return Promise.resolve({
_index: params.index || 'index', _index: params.index || 'index',
_type: params.type || DEFAULT_SETTING_DOCTYPE, _type: params.type || DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T', _id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_version: params.version || 1, _version: params.version || 1,
found: true, found: true,
_source: _source _source: _source
}; });
}; };
Client.prototype.search = function (params = {}, count = 5, source = {}) { Client.prototype.search = function (params = {}, count = 5, source = {}) {

View File

@@ -14,6 +14,7 @@ const Job = module;
const maxPriority = 20; const maxPriority = 20;
const minPriority = -20; const minPriority = -20;
const defaultPriority = 10; const defaultPriority = 10;
const defaultCreatedBy = null;
describe('Job Class', function () { describe('Job Class', function () {
let client; let client;
@@ -90,6 +91,24 @@ describe('Job Class', function () {
}); });
}); });
it('should index the created_by value', function () {
const createdBy = 'user_identifier';
const job = new Job(client, index, type, payload, Object.assign({ created_by: createdBy }, options));
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_by', createdBy);
});
});
it('should index default created_by value', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_by', defaultCreatedBy);
});
});
it('should index timeout value from options', function () { it('should index timeout value from options', function () {
const job = new Job(client, index, type, payload, options); const job = new Job(client, index, type, payload, options);
return job.ready.then(() => { return job.ready.then(() => {
@@ -181,6 +200,7 @@ describe('Job Class', function () {
expect(doc).to.have.property('type', jobDoc.type); expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id); expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('version', jobDoc.version); expect(doc).to.have.property('version', jobDoc.version);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('payload'); expect(doc).to.have.property('payload');
expect(doc).to.have.property('jobtype'); expect(doc).to.have.property('jobtype');
@@ -188,6 +208,25 @@ describe('Job Class', function () {
expect(doc).to.have.property('timeout'); expect(doc).to.have.property('timeout');
}); });
}); });
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(client, index, type, payload, optionals);
return Promise.resolve(client.get({}, optionals))
.then((doc) => {
console.log('mocked doc', doc);
sinon.stub(client, 'get').returns(Promise.resolve(doc));
})
.then(() => {
return job.get()
.then((doc) => {
expect(doc).to.have.property('created_by', optionals.created_by);
});
});
});
}); });
describe('toJSON method', function () { describe('toJSON method', function () {
@@ -211,12 +250,23 @@ describe('Job Class', function () {
expect(doc).to.have.property('index', index); expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE); expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type); expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('timeout', options.timeout); expect(doc).to.have.property('timeout', options.timeout);
expect(doc).to.have.property('max_attempts', options.max_attempts); expect(doc).to.have.property('max_attempts', options.max_attempts);
expect(doc).to.have.property('priority', options.priority); expect(doc).to.have.property('priority', options.priority);
expect(doc).to.have.property('id'); expect(doc).to.have.property('id');
expect(doc).to.not.have.property('version'); expect(doc).to.not.have.property('version');
}); });
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(client, index, type, payload, optionals);
const doc = job.toJSON();
expect(doc).to.have.property('created_by', optionals.created_by);
});
}); });
}); });

View File

@@ -165,6 +165,24 @@ describe('Worker class', function () {
.then(() => { done(); }) .then(() => { done(); })
.catch(() => done(new Error('should not reject'))); .catch(() => done(new Error('should not reject')));
}); });
it('should return an empty array on missing index', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then((res) => {
try {
expect(res).to.be.an(Array);
expect(res).to.have.length(0);
done();
} catch (e) {
done(e);
}
})
.catch(() => done(new Error('should not reject')));
});
}); });
describe('query parameters', function () { describe('query parameters', function () {
@@ -203,6 +221,13 @@ describe('Worker class', function () {
clock.restore(); clock.restore();
}); });
it('should filter unwanted source data', function () {
const excludedFields = [ 'output.content' ];
const { body } = getSearchParams(jobtype);
expect(body).to.have.property('_source');
expect(body._source).to.eql({ exclude: excludedFields });
});
it('should search by job type', function () { it('should search by job type', function () {
const { body } = getSearchParams(jobtype); const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath); const conditions = get(body, conditionPath);
@@ -260,10 +285,13 @@ describe('Worker class', function () {
id: 12345, id: 12345,
version: 3 version: 3
}; };
job = mockQueue.client.get(params); return mockQueue.client.get(params)
.then((jobDoc) => {
job = jobDoc;
worker = new Worker(mockQueue, 'test', noop); worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update'); updateSpy = sinon.spy(mockQueue.client, 'update');
}); });
});
afterEach(() => { afterEach(() => {
clock.restore(); clock.restore();
@@ -339,16 +367,18 @@ describe('Worker class', function () {
anchorMoment = moment(anchor); anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf()); clock = sinon.useFakeTimers(anchorMoment.valueOf());
job = mockQueue.client.get(); return mockQueue.client.get()
.then((jobDoc) => {
job = jobDoc;
worker = new Worker(mockQueue, 'test', noop); worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update'); updateSpy = sinon.spy(mockQueue.client, 'update');
}); });
});
afterEach(() => { afterEach(() => {
clock.restore(); clock.restore();
}); });
it('should use version on update', function () { it('should use version on update', function () {
worker._failJob(job); worker._failJob(job);
const query = updateSpy.firstCall.args[0]; const query = updateSpy.firstCall.args[0];
@@ -402,9 +432,13 @@ describe('Worker class', function () {
payload = { payload = {
value: random(0, 100, true) value: random(0, 100, true)
}; };
job = mockQueue.client.get({}, { payload });
return mockQueue.client.get({}, { payload })
.then((jobDoc) => {
job = jobDoc;
updateSpy = sinon.spy(mockQueue.client, 'update'); updateSpy = sinon.spy(mockQueue.client, 'update');
}); });
});
it('should call the workerFn with the payload', function (done) { it('should call the workerFn with the payload', function (done) {
const workerFn = function (jobPayload) { const workerFn = function (jobPayload) {