Compare commits
14 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 1fe139e1b0 | |||
| 59d30bbb0f | |||
| 6ce18a5477 | |||
| 89c08068dd | |||
| 5f2e2b09cf | |||
| fb2f51b11d | |||
| 8ac9a81fdb | |||
| 4472e725fa | |||
| be1eb81059 | |||
| 8d21dc6967 | |||
| 31159baae9 | |||
| 49965bbaf1 | |||
| 49b982db99 | |||
| 9aa8eed297 |
@@ -1 +1 @@
|
|||||||
4.3.2
|
4.4.4
|
||||||
|
|||||||
45
CHANGELOG.md
Normal file
45
CHANGELOG.md
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
# Changelog
|
||||||
|
|
||||||
|
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading.
|
||||||
|
|
||||||
|
## v0.4.1
|
||||||
|
|
||||||
|
- Use `filter` instead of `must` to query for outstanding jobs
|
||||||
|
|
||||||
|
## v0.4.0
|
||||||
|
|
||||||
|
- [BREAKING] Change `priority` mapping to *byte*
|
||||||
|
- Exclude `output.content` from _source when query jobs
|
||||||
|
- Add optional `created_by` value to job documents
|
||||||
|
|
||||||
|
## v0.3.2
|
||||||
|
|
||||||
|
- Misisng indiced returns empty array (fixed errors in v0.3.1)
|
||||||
|
|
||||||
|
## v0.3.1
|
||||||
|
|
||||||
|
- Ignore missing indices when looking for jobs
|
||||||
|
|
||||||
|
## v0.3.0
|
||||||
|
|
||||||
|
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
|
||||||
|
|
||||||
|
## v0.2.2
|
||||||
|
|
||||||
|
- Swollow errors when saving job output
|
||||||
|
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
|
||||||
|
- Update npm package
|
||||||
|
|
||||||
|
## v0.2.1
|
||||||
|
|
||||||
|
- Use `esqueue` namespace for debugging
|
||||||
|
|
||||||
|
## v0.2.0
|
||||||
|
|
||||||
|
- [BREAKING] Async jobs should return promises, not use callbacks
|
||||||
|
- Remove bluebird dependency
|
||||||
|
- Only require specific lodash modules, instead of the whole library
|
||||||
|
|
||||||
|
## v0.1.0
|
||||||
|
|
||||||
|
- Initial release
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "esqueue",
|
"name": "esqueue",
|
||||||
"version": "0.3.2",
|
"version": "0.4.1",
|
||||||
"description": "Job queue, powered by Elasticsearch",
|
"description": "Job queue, powered by Elasticsearch",
|
||||||
"main": "lib/index.js",
|
"main": "lib/index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
|
|||||||
@@ -60,6 +60,7 @@ Option | Default | Description
|
|||||||
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
|
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
|
||||||
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
|
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
|
||||||
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
|
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
|
||||||
|
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
|
||||||
|
|
||||||
### Creating a worker
|
### Creating a worker
|
||||||
|
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import { defaultSettings } from './constants';
|
|||||||
const schema = {
|
const schema = {
|
||||||
jobtype: { type: 'string', index: 'not_analyzed' },
|
jobtype: { type: 'string', index: 'not_analyzed' },
|
||||||
payload: { type: 'object', enabled: false },
|
payload: { type: 'object', enabled: false },
|
||||||
priority: { type: 'short' },
|
priority: { type: 'byte' },
|
||||||
timeout: { type: 'long' },
|
timeout: { type: 'long' },
|
||||||
process_expiration: { type: 'date' },
|
process_expiration: { type: 'date' },
|
||||||
created_by: { type: 'string', index: 'not_analyzed' },
|
created_by: { type: 'string', index: 'not_analyzed' },
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
this.index = index;
|
this.index = index;
|
||||||
this.jobtype = type;
|
this.jobtype = type;
|
||||||
this.payload = payload;
|
this.payload = payload;
|
||||||
|
this.created_by = options.created_by || null;
|
||||||
this.timeout = options.timeout || 10000;
|
this.timeout = options.timeout || 10000;
|
||||||
this.maxAttempts = options.max_attempts || 3;
|
this.maxAttempts = options.max_attempts || 3;
|
||||||
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
||||||
@@ -37,6 +38,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
jobtype: this.jobtype,
|
jobtype: this.jobtype,
|
||||||
payload: this.payload,
|
payload: this.payload,
|
||||||
priority: this.priority,
|
priority: this.priority,
|
||||||
|
created_by: this.created_by,
|
||||||
timeout: this.timeout,
|
timeout: this.timeout,
|
||||||
process_expiration: new Date(0), // use epoch so the job query works
|
process_expiration: new Date(0), // use epoch so the job query works
|
||||||
created_at: new Date(),
|
created_at: new Date(),
|
||||||
@@ -87,6 +89,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
index: this.index,
|
index: this.index,
|
||||||
type: this.doctype,
|
type: this.doctype,
|
||||||
jobtype: this.jobtype,
|
jobtype: this.jobtype,
|
||||||
|
created_by: this.created_by,
|
||||||
payload: this.payload,
|
payload: this.payload,
|
||||||
timeout: this.timeout,
|
timeout: this.timeout,
|
||||||
max_attempts: this.maxAttempts,
|
max_attempts: this.maxAttempts,
|
||||||
|
|||||||
@@ -228,15 +228,18 @@ export default class Job extends events.EventEmitter {
|
|||||||
_getPendingJobs() {
|
_getPendingJobs() {
|
||||||
const nowTime = moment().toISOString();
|
const nowTime = moment().toISOString();
|
||||||
const query = {
|
const query = {
|
||||||
|
_source : {
|
||||||
|
exclude: [ 'output.content' ]
|
||||||
|
},
|
||||||
query: {
|
query: {
|
||||||
constant_score: {
|
constant_score: {
|
||||||
filter: {
|
filter: {
|
||||||
bool: {
|
bool: {
|
||||||
must: { term: { jobtype: this.jobtype } },
|
filter: { term: { jobtype: this.jobtype } },
|
||||||
should: [
|
should: [
|
||||||
{ term: { status: 'pending'} },
|
{ term: { status: 'pending'} },
|
||||||
{ bool:
|
{ bool:
|
||||||
{ must: [
|
{ filter: [
|
||||||
{ term: { status: 'processing' } },
|
{ term: { status: 'processing' } },
|
||||||
{ range: { process_expiration: { lte: nowTime } } }
|
{ range: { process_expiration: { lte: nowTime } } }
|
||||||
] }
|
] }
|
||||||
|
|||||||
5
test/fixtures/elasticsearch.js
vendored
5
test/fixtures/elasticsearch.js
vendored
@@ -32,6 +32,7 @@ Client.prototype.get = function (params = {}, source = {}) {
|
|||||||
|
|
||||||
const _source = Object.assign({
|
const _source = Object.assign({
|
||||||
jobtype: 'jobtype',
|
jobtype: 'jobtype',
|
||||||
|
created_by: null,
|
||||||
payload: {
|
payload: {
|
||||||
id: 'sample-job-1',
|
id: 'sample-job-1',
|
||||||
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
||||||
@@ -44,14 +45,14 @@ Client.prototype.get = function (params = {}, source = {}) {
|
|||||||
status: 'pending'
|
status: 'pending'
|
||||||
}, source);
|
}, source);
|
||||||
|
|
||||||
return {
|
return Promise.resolve({
|
||||||
_index: params.index || 'index',
|
_index: params.index || 'index',
|
||||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||||
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
||||||
_version: params.version || 1,
|
_version: params.version || 1,
|
||||||
found: true,
|
found: true,
|
||||||
_source: _source
|
_source: _source
|
||||||
};
|
});
|
||||||
};
|
};
|
||||||
|
|
||||||
Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||||
|
|||||||
@@ -14,6 +14,7 @@ const Job = module;
|
|||||||
const maxPriority = 20;
|
const maxPriority = 20;
|
||||||
const minPriority = -20;
|
const minPriority = -20;
|
||||||
const defaultPriority = 10;
|
const defaultPriority = 10;
|
||||||
|
const defaultCreatedBy = null;
|
||||||
|
|
||||||
describe('Job Class', function () {
|
describe('Job Class', function () {
|
||||||
let client;
|
let client;
|
||||||
@@ -90,6 +91,24 @@ describe('Job Class', function () {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
|
it('should index the created_by value', function () {
|
||||||
|
const createdBy = 'user_identifier';
|
||||||
|
const job = new Job(client, index, type, payload, Object.assign({ created_by: createdBy }, options));
|
||||||
|
return job.ready.then(() => {
|
||||||
|
const newDoc = validateDoc(client.index);
|
||||||
|
expect(newDoc.body).to.have.property('created_by', createdBy);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should index default created_by value', function () {
|
||||||
|
const job = new Job(client, index, type, payload, options);
|
||||||
|
return job.ready.then(() => {
|
||||||
|
const newDoc = validateDoc(client.index);
|
||||||
|
expect(newDoc.body).to.have.property('created_by', defaultCreatedBy);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should index timeout value from options', function () {
|
it('should index timeout value from options', function () {
|
||||||
const job = new Job(client, index, type, payload, options);
|
const job = new Job(client, index, type, payload, options);
|
||||||
return job.ready.then(() => {
|
return job.ready.then(() => {
|
||||||
@@ -181,6 +200,7 @@ describe('Job Class', function () {
|
|||||||
expect(doc).to.have.property('type', jobDoc.type);
|
expect(doc).to.have.property('type', jobDoc.type);
|
||||||
expect(doc).to.have.property('id', jobDoc.id);
|
expect(doc).to.have.property('id', jobDoc.id);
|
||||||
expect(doc).to.have.property('version', jobDoc.version);
|
expect(doc).to.have.property('version', jobDoc.version);
|
||||||
|
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||||
|
|
||||||
expect(doc).to.have.property('payload');
|
expect(doc).to.have.property('payload');
|
||||||
expect(doc).to.have.property('jobtype');
|
expect(doc).to.have.property('jobtype');
|
||||||
@@ -188,6 +208,24 @@ describe('Job Class', function () {
|
|||||||
expect(doc).to.have.property('timeout');
|
expect(doc).to.have.property('timeout');
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should contain optional data', function () {
|
||||||
|
const optionals = {
|
||||||
|
created_by: 'some_ident'
|
||||||
|
};
|
||||||
|
|
||||||
|
const job = new Job(client, index, type, payload, optionals);
|
||||||
|
return Promise.resolve(client.get({}, optionals))
|
||||||
|
.then((doc) => {
|
||||||
|
sinon.stub(client, 'get').returns(Promise.resolve(doc));
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
|
return job.get()
|
||||||
|
.then((doc) => {
|
||||||
|
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('toJSON method', function () {
|
describe('toJSON method', function () {
|
||||||
@@ -211,12 +249,23 @@ describe('Job Class', function () {
|
|||||||
expect(doc).to.have.property('index', index);
|
expect(doc).to.have.property('index', index);
|
||||||
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||||
expect(doc).to.have.property('jobtype', type);
|
expect(doc).to.have.property('jobtype', type);
|
||||||
|
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||||
expect(doc).to.have.property('timeout', options.timeout);
|
expect(doc).to.have.property('timeout', options.timeout);
|
||||||
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
||||||
expect(doc).to.have.property('priority', options.priority);
|
expect(doc).to.have.property('priority', options.priority);
|
||||||
expect(doc).to.have.property('id');
|
expect(doc).to.have.property('id');
|
||||||
expect(doc).to.not.have.property('version');
|
expect(doc).to.not.have.property('version');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should contain optional data', function () {
|
||||||
|
const optionals = {
|
||||||
|
created_by: 'some_ident'
|
||||||
|
};
|
||||||
|
|
||||||
|
const job = new Job(client, index, type, payload, optionals);
|
||||||
|
const doc = job.toJSON();
|
||||||
|
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -221,11 +221,17 @@ describe('Worker class', function () {
|
|||||||
clock.restore();
|
clock.restore();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('should filter unwanted source data', function () {
|
||||||
|
const excludedFields = [ 'output.content' ];
|
||||||
|
const { body } = getSearchParams(jobtype);
|
||||||
|
expect(body).to.have.property('_source');
|
||||||
|
expect(body._source).to.eql({ exclude: excludedFields });
|
||||||
|
});
|
||||||
|
|
||||||
it('should search by job type', function () {
|
it('should search by job type', function () {
|
||||||
const { body } = getSearchParams(jobtype);
|
const { body } = getSearchParams(jobtype);
|
||||||
const conditions = get(body, conditionPath);
|
const conditions = get(body, conditionPath);
|
||||||
expect(conditions).to.have.property('must');
|
expect(conditions.filter).to.eql({ term: { jobtype: jobtype } });
|
||||||
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should search for pending or expired jobs', function () {
|
it('should search for pending or expired jobs', function () {
|
||||||
@@ -236,7 +242,7 @@ describe('Worker class', function () {
|
|||||||
// this works because we are stopping the clock, so all times match
|
// this works because we are stopping the clock, so all times match
|
||||||
const nowTime = moment().toISOString();
|
const nowTime = moment().toISOString();
|
||||||
const pending = { term: { status: 'pending'} };
|
const pending = { term: { status: 'pending'} };
|
||||||
const expired = { bool: { must: [
|
const expired = { bool: { filter: [
|
||||||
{ term: { status: 'processing' } },
|
{ term: { status: 'processing' } },
|
||||||
{ range: { process_expiration: { lte: nowTime } } }
|
{ range: { process_expiration: { lte: nowTime } } }
|
||||||
] } };
|
] } };
|
||||||
@@ -278,10 +284,13 @@ describe('Worker class', function () {
|
|||||||
id: 12345,
|
id: 12345,
|
||||||
version: 3
|
version: 3
|
||||||
};
|
};
|
||||||
job = mockQueue.client.get(params);
|
return mockQueue.client.get(params)
|
||||||
|
.then((jobDoc) => {
|
||||||
|
job = jobDoc;
|
||||||
worker = new Worker(mockQueue, 'test', noop);
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
clock.restore();
|
clock.restore();
|
||||||
@@ -357,16 +366,18 @@ describe('Worker class', function () {
|
|||||||
anchorMoment = moment(anchor);
|
anchorMoment = moment(anchor);
|
||||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||||
|
|
||||||
job = mockQueue.client.get();
|
return mockQueue.client.get()
|
||||||
|
.then((jobDoc) => {
|
||||||
|
job = jobDoc;
|
||||||
worker = new Worker(mockQueue, 'test', noop);
|
worker = new Worker(mockQueue, 'test', noop);
|
||||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
clock.restore();
|
clock.restore();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
||||||
it('should use version on update', function () {
|
it('should use version on update', function () {
|
||||||
worker._failJob(job);
|
worker._failJob(job);
|
||||||
const query = updateSpy.firstCall.args[0];
|
const query = updateSpy.firstCall.args[0];
|
||||||
@@ -420,9 +431,13 @@ describe('Worker class', function () {
|
|||||||
payload = {
|
payload = {
|
||||||
value: random(0, 100, true)
|
value: random(0, 100, true)
|
||||||
};
|
};
|
||||||
job = mockQueue.client.get({}, { payload });
|
|
||||||
|
return mockQueue.client.get({}, { payload })
|
||||||
|
.then((jobDoc) => {
|
||||||
|
job = jobDoc;
|
||||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||||
});
|
});
|
||||||
|
});
|
||||||
|
|
||||||
it('should call the workerFn with the payload', function (done) {
|
it('should call the workerFn with the payload', function (done) {
|
||||||
const workerFn = function (jobPayload) {
|
const workerFn = function (jobPayload) {
|
||||||
|
|||||||
Reference in New Issue
Block a user