21 Commits

Author SHA1 Message Date
f4dfddef8e chore: change travis script
run lint and test
2018-03-28 14:44:38 -07:00
efbda1b1f5 chore: tweak travis config 2018-03-28 14:44:22 -07:00
bef3104755 chore: add linting, fix lint issues 2018-03-28 14:44:04 -07:00
be536277c3 chore: align master changelog with 2.0.3 2017-08-14 15:18:27 -07:00
71d8b63531 chore: add DS_Store to gitignore 2017-08-14 15:17:40 -07:00
83a0352202 chore: rename module to elastiq 2017-08-14 15:17:40 -07:00
170ba3b8a0 chore: update the travis rules 2017-08-14 15:17:39 -07:00
26715f9ba2 chore: add version compat table 2017-08-14 15:17:39 -07:00
b980ad9e9e chore: add license badge
and rename the license file
2017-08-14 15:17:39 -07:00
9d8ae30ec6 chore: update links in readme
this is a hard fork, after all
2017-08-14 15:17:36 -07:00
Joe Fleming
295db5e8fd Merge pull request #1 from w33ble/develop
Fix issue with slow responses from ES
2017-08-08 11:02:43 -07:00
6620a5de39 update changelog 2017-08-08 10:55:33 -07:00
bb898e26c2 add 20x delay when search results in an error 2017-08-08 10:55:33 -07:00
4813c5bfa3 update the changelog 2017-08-08 10:55:33 -07:00
0ce26f6d9d fix destroy test 2017-08-08 10:55:33 -07:00
6bdf7163b6 add new events
one for when the search is completed, and one for when the job poller is ready
2017-08-08 10:55:33 -07:00
128905cdb4 track poller running state
use setTimeout, and wait for the search and job processing to complete before polling for jobs again
2017-08-08 10:55:33 -07:00
01adab4174 move poller state into this._poller 2017-08-08 10:55:33 -07:00
eb17575a96 update dependencies, remove npmrc 2017-08-08 10:55:33 -07:00
73c7147c26 bump node version requirement 2017-08-08 10:55:33 -07:00
7787e35ea5 chore: add project status section to the readme
also add not that Elastic does not support this library, just in case
2017-08-08 10:54:52 -07:00
17 changed files with 179 additions and 81 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.DS_Store
lib
node_modules
npm-debug.log

View File

@@ -1 +1 @@
4.x
6.x

1
.npmrc
View File

@@ -1 +0,0 @@
save-prefix='~'

View File

@@ -2,15 +2,22 @@ language: node_js
node_js:
- "stable"
- "8"
- "6"
notifications:
email: false
cache:
directories:
- node_modules
before_script: npm install
notifications:
email:
on_success: never
on_failure: change
after_success: npm run coverage
script: npm run test:ci
branches:
only:
- master

View File

@@ -2,6 +2,17 @@
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
## v3.0.0
- support for node v4 or earlier is no longer tested
- update several dependencies
## v2.0.3
- rename to elastiq
- fix issue where job poller would not wait for ES response
- when job polling search fails, wait for a 20x interval before searching again
## v2.0.2
- Fix issue where creating a worker would not use the queue's doctype by default

View File

@@ -1,6 +1,6 @@
{
"name": "esqueue",
"version": "2.0.2",
"name": "elastiq",
"version": "3.0.0",
"description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js",
"scripts": {
@@ -8,9 +8,11 @@
"package": "git checkout -B package-lib && npm run build && git add -f lib && git commit -m 'package lib' && echo Use npm to tag and publish",
"prepublish": "in-publish && npm run test || not-in-publish",
"test": "retire -n -p package.json && npm run build && npm run unit",
"lint": "eslint \"*.js\" \"src/**/*.js\"",
"test:ci": "npm run lint && npm run test",
"unit": "nyc --require babel-core/register mocha test/src/**"
},
"author": "Elastic (https://github.com/elastic)",
"author": "Joe Fleming (https://github.com/w33ble)",
"keywords": [
"job",
"queue",
@@ -19,7 +21,7 @@
],
"repository": {
"type": "git",
"url": "https://github.com/elastic/esqueue.git"
"url": "https://github.com/w33ble/elastiq.git"
},
"license": "Apache-2.0",
"engines": {
@@ -29,20 +31,21 @@
"@elastic/eslint-config-kibana": "^0.3.0",
"babel-cli": "^6.23.0",
"babel-core": "^6.23.1",
"babel-eslint": "^7.1.1",
"babel-eslint": "6.1.2",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-preset-es2015": "^6.22.0",
"elasticsearch": "^12.0.0",
"eslint": "^3.16.1",
"eslint-plugin-mocha": "^4.8.0",
"eslint-plugin-react": "^6.10.0",
"elasticsearch": "^13.0.1",
"eslint": "3.11.1",
"eslint-plugin-babel": "4.0.0",
"eslint-plugin-mocha": "4.7.0",
"eslint-plugin-react": "^7.0.1",
"expect.js": "~0.3.1",
"lodash": "^4.17.4",
"mocha": "^3.2.0",
"nyc": "^10.1.2",
"proxyquire": "^1.7.4",
"retire": "^1.2.12",
"sinon": "^1.17.3"
"sinon": "^2.3.1"
},
"peerDependencies": {
"elasticsearch": ">=11.0.1"

View File

@@ -1,22 +1,32 @@
[![Build Status](https://travis-ci.org/elastic/esqueue.svg?branch=master)](https://travis-ci.org/elastic/esqueue)
[![Build Status](https://travis-ci.org/w33ble/elastiq.svg?branch=master)](https://travis-ci.org/w33ble/elastiq)
[![Apache License](https://img.shields.io/badge/license-apache_2.0-a9215a.svg)](https://raw.githubusercontent.com/w33ble/elastiq/master/LICENSE)
[![Project Status](https://img.shields.io/badge/status-experimental-orange.svg)](https://github.com/w33ble/elastiq#project-status)
# esqueue
# elastiq
`esqueue` is an Elasticsearch-powered job queue
`elastiq` is an Elasticsearch-powered job queue.
Pronounced Elasti-queue. This is not supported by Elastic.
## Project Status
While it's believed to be pretty stable, this library isn't really being used anywhere. Issues and PRs are welcome, but it isn't actively being developed. As such, it doesn't get a lot of stress testing, and I don't recommend you rely too heavily on it, hence its experimental status.
## Installation
`npm install esqueue`
Version | Elasticsearch Version
------- | ---------------------
3.x + | 5.x +
Note that the more recent version of esqueue is meant to work with Elasticsearch 5.x or later. If you need it to work with an older version, use v1.0.0:
`npm install elastiq`
`npm install equeue@^1.0.0`
If you are working with an older version of Elasticsearch, consider using `esqueue`.
## Usage
Simply include the module in your application.
`var Esqueue = require('esqueue');`
`var elastiq = require('elastiq');`
### Creating a queue
@@ -26,7 +36,7 @@ The first step is to create a new Queue instance. This is your point of entry, i
var index = 'my-index';
var options = {};
var queue = new Esqueue(index, options);
var queue = new Elastiq(index, options);
```
The queue instance is an event emitter, so you can listen for `error` events as you would any other event emitter.
@@ -38,7 +48,7 @@ Option | Default | Description
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
dateSeparator | `-` | Separator for the formatted date, *YYYY-MM-DD* for example, in the index pattern.
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
doctype | `esqueue` | The doctype to use in Elasticsearch
doctype | `elastiq` | The doctype to use in Elasticsearch
indexSettings | | Specify which `settings` to pass on index creation. See the [Elasticsearch index creation docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-create-index.html) for more info.
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
@@ -148,15 +158,15 @@ All of the above are valid. `workerFn2` and `asyncWorker` are likely to be more
## Queue events
`esqueue` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
`elastiq` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
It's important to note that all events emitted from the Job and Worker instances are also emitted on the Queue instance. This means that your application should be able to react to changes by only keeping track of that instance.
Available events can be found in `lib/constants/events.js`, and you're encouraged to import and use those constant values in your application. Here's an example:
```js
var Queue = require('esqueue');
var queueEvents = require('esqueue/lib/constants/events');
var Queue = require('elastiq');
var queueEvents = require('elastiq/lib/constants/events');
var jobQueue = new Queue('my-index');

View File

@@ -2,5 +2,5 @@ export default {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_DATE_SEPARATOR: '-',
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
DEFAULT_SETTING_DOCTYPE: 'elastiq',
};

View File

@@ -5,6 +5,8 @@ export default {
EVENT_JOB_CREATE_ERROR: 'job:creation error',
EVENT_WORKER_COMPLETE: 'worker:job complete',
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
EVENT_WORKER_JOB_POLLING_READY: 'worker:job poller ready',
EVENT_WORKER_JOB_SEARCH_COMPLETE: 'worker:pending jobs returned',
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
EVENT_WORKER_JOB_FAIL: 'worker:job failed',

View File

@@ -10,7 +10,7 @@ export default function createClient(options) {
}
return client;
};
}
export function isClient(client) {
// if there's a transport property, assume it's a client instance

View File

@@ -4,7 +4,7 @@ export function WorkerTimeoutError(message, props = {}) {
this.timeout = props.timeout;
this.jobId = props.jobId;
if ("captureStackTrace" in Error) Error.captureStackTrace(this, WorkerTimeoutError);
if ('captureStackTrace' in Error) Error.captureStackTrace(this, WorkerTimeoutError);
else this.stack = (new Error()).stack;
}
WorkerTimeoutError.prototype = Object.create(Error.prototype);
@@ -14,7 +14,7 @@ export function UnspecifiedWorkerError(message, props = {}) {
this.message = message;
this.jobId = props.jobId;
if ("captureStackTrace" in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
if ('captureStackTrace' in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
else this.stack = (new Error()).stack;
}
UnspecifiedWorkerError.prototype = Object.create(Error.prototype);

View File

@@ -7,9 +7,9 @@ import indexTimestamp from './helpers/index_timestamp';
import objectOmit from './helpers/object_omit';
import logger from './helpers/logger';
const debug = logger('esqueue:queue');
const debug = logger('elastiq:queue');
export default class Esqueue extends events.EventEmitter {
export default class Elastiq extends events.EventEmitter {
constructor(index, options = {}) {
if (!index) throw new Error('Must specify an index to write to');

View File

@@ -5,7 +5,7 @@ import logger from './helpers/logger';
import createIndex from './helpers/create_index';
import isPlainObject from './helpers/is_plain_object';
const debug = logger('esqueue:job');
const debug = logger('elastiq:job');
const puid = new Puid();
export default class Job extends events.EventEmitter {

View File

@@ -6,7 +6,7 @@ import logger from './helpers/logger';
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
const puid = new Puid();
const debug = logger('esqueue:worker');
const debug = logger('elastiq:worker');
function formatJobObject(job) {
return {
@@ -34,14 +34,18 @@ export default class Worker extends events.EventEmitter {
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this._checker = false;
this._running = true;
this._poller = {
timer: false,
enabled: true,
running: false,
};
this.debug(`Created worker for job type ${this.jobtype}`);
this._startJobPolling();
}
destroy() {
this._running = false;
this._poller.enabled = false;
this._stopJobPolling();
}
@@ -247,24 +251,45 @@ export default class Worker extends events.EventEmitter {
}
_startJobPolling() {
if (!this._running) {
if (!this._poller.enabled || this._poller.running) {
return;
}
this._checker = setInterval(() => {
this._poller.timer = setTimeout(() => {
this._getPendingJobs()
.then((jobs) => this._claimPendingJobs(jobs));
.then((jobs) => {
if (!this._poller.running) return;
const foundJobs = (!jobs || jobs.length === 0);
const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs);
task.then(() => {
this._poller.running = false;
this._startJobPolling();
});
}, () => {
// if the search failed for some reason, back off the polling
// we assume errors came from a busy cluster
// TODO: check what error actually happened
const multiplier = 20;
setTimeout(() => {
this._poller.running = false;
this._startJobPolling();
}, this.checkInterval * multiplier);
});
} , this.checkInterval);
this._poller.running = true;
this.emit(constants.EVENT_WORKER_JOB_POLLING_READY);
}
_stopJobPolling() {
clearInterval(this._checker);
this._poller.running = false;
clearTimeout(this._poller.timer);
}
_claimPendingJobs(jobs) {
if (!jobs || jobs.length === 0) return;
this._stopJobPolling();
_claimPendingJobs(jobs = []) {
let claimed = false;
// claim a single job, stopping after first successful claim
@@ -290,10 +315,8 @@ export default class Worker extends events.EventEmitter {
this.debug(`Claimed job ${claimedJob._id}`);
return this._performJob(claimedJob);
})
.then(() => this._startJobPolling())
.catch((err) => {
this.debug('Error claiming jobs', err);
this._startJobPolling();
});
}
@@ -309,7 +332,7 @@ export default class Worker extends events.EventEmitter {
bool: {
filter: { term: { jobtype: this.jobtype } },
should: [
{ term: { status: 'pending'} },
{ term: { status: 'pending' } },
{ bool:
{ filter: [
{ term: { status: 'processing' } },
@@ -322,8 +345,8 @@ export default class Worker extends events.EventEmitter {
}
},
sort: [
{ priority: { order: 'asc' }},
{ created_at: { order: 'asc' }}
{ priority: { order: 'asc' } },
{ created_at: { order: 'asc' } }
],
size: this.checkSize
};
@@ -338,7 +361,10 @@ export default class Worker extends events.EventEmitter {
})
.then((results) => {
const jobs = results.hits.hits;
this.debug(`${jobs.length} outstanding jobs returned`);
this.emit(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, jobs);
return jobs;
})
.catch((err) => {

View File

@@ -8,12 +8,12 @@ import elasticsearchMock from '../fixtures/elasticsearch';
import jobMock from '../fixtures/job';
import workerMock from '../fixtures/worker';
const Esqueue = proxyquire.noPreserveCache()('../../lib/index', {
const Queue = proxyquire.noPreserveCache()('../../lib/index', {
'./job.js': jobMock,
'./worker.js': workerMock,
});
describe('Esqueue class', function () {
describe('Elastiq class', function () {
let client;
beforeEach(function () {
@@ -21,18 +21,18 @@ describe('Esqueue class', function () {
});
it('should be an event emitter', function () {
const queue = new Esqueue('esqueue', { client });
const queue = new Queue('elastiq', { client });
expect(queue).to.be.an(events.EventEmitter);
});
describe('Option validation', function () {
it('should throw without an index', function () {
const init = () => new Esqueue();
const init = () => new Queue();
expect(init).to.throwException(/must.+specify.+index/i);
});
it('should throw with an invalid host', function () {
const init = () => new Esqueue('esqueue', {
const init = () => new Queue('elastiq', {
client: { host: 'nope://nope' }
});
@@ -40,7 +40,7 @@ describe('Esqueue class', function () {
});
it('should throw with invalid hosts', function () {
const init = () => new Esqueue('esqueue', {
const init = () => new Queue('elastiq', {
client: { hosts: [{ host: 'localhost', protocol: 'nope' }] }
});
@@ -51,7 +51,7 @@ describe('Esqueue class', function () {
describe('Queue construction', function () {
it('should ping the ES server', function () {
const pingSpy = sinon.spy(client, 'ping');
new Esqueue('esqueue', { client });
new Queue('elastiq', { client });
sinon.assert.calledOnce(pingSpy);
});
});
@@ -63,14 +63,14 @@ describe('Esqueue class', function () {
let queue;
beforeEach(function () {
indexName = 'esqueue-index';
indexName = 'elastiq-index';
jobType = 'test-test';
payload = { payload: true };
queue = new Esqueue(indexName, { client });
queue = new Queue(indexName, { client });
});
it('should throw with invalid dateSeparator setting', function () {
queue = new Esqueue(indexName, { client, dateSeparator: 'a' });
queue = new Queue(indexName, { client, dateSeparator: 'a' });
const fn = () => queue.addJob(jobType, payload);
expect(fn).to.throwException();
});
@@ -97,7 +97,7 @@ describe('Esqueue class', function () {
}
};
queue = new Esqueue(indexName, { client, indexSettings });
queue = new Queue(indexName, { client, indexSettings });
const job = queue.addJob(jobType, payload);
expect(job.getProp('options')).to.have.property('indexSettings', indexSettings);
});
@@ -117,7 +117,7 @@ describe('Esqueue class', function () {
let queue;
beforeEach(function () {
queue = new Esqueue('esqueue', { client });
queue = new Queue('elastiq', { client });
});
it('should keep track of workers', function () {
@@ -146,7 +146,7 @@ describe('Esqueue class', function () {
doctype: 'tests'
};
queue = new Esqueue('esqueue', { client });
queue = new Queue('elastiq', { client });
const worker = queue.registerWorker('type', noop, workerOptions);
expect(worker.getProp('options')).to.equal(workerOptions);
});
@@ -154,7 +154,7 @@ describe('Esqueue class', function () {
describe('Destroy', function () {
it('should destroy workers', function () {
const queue = new Esqueue('esqueue', { client });
const queue = new Queue('elastiq', { client });
const stubs = times(3, () => { return { destroy: sinon.stub() }; });
stubs.forEach((stub) => queue._workers.push(stub));
expect(queue.getWorkers()).to.have.length(3);

View File

@@ -187,24 +187,63 @@ describe('Worker class', function () {
});
it('should not poll once destroyed', function () {
// remove the search spy
mockQueue.client.search.restore();
// mock the search, return 0 new jobs
const zeroHits = { hits: { hits: [] } };
const searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve(zeroHits));
const worker = new Worker(mockQueue, 'test', noop);
// move the clock a couple times, test for searches each time
sinon.assert.notCalled(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledOnce(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
function waitForSearch() {
return new Promise((resolve) => {
worker.once(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, () => {
resolve()
});
});
}
function waitForPoller() {
return new Promise((resolve) => {
worker.once(constants.EVENT_WORKER_JOB_POLLING_READY, () => {
resolve()
});
});
}
// move the clock a couple times, test for searches each time
sinon.assert.notCalled(searchStub);
const firstWait = waitForSearch();
clock.tick(defaults.interval);
return firstWait
.then(() => {
sinon.assert.calledOnce(searchStub);
return waitForPoller();
})
.then(() => {
const secondWait = waitForSearch();
clock.tick(defaults.interval);
return secondWait;
})
.then(() => {
sinon.assert.calledTwice(searchStub);
return waitForPoller();
})
.then(() => {
// destroy the worker, move the clock, make sure another search doesn't happen
worker.destroy();
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
sinon.assert.calledTwice(searchStub);
// manually call job poller, move the clock, make sure another search doesn't happen
worker._startJobPolling();
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
sinon.assert.calledTwice(searchStub);
});
});
});