Compare commits
21 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f4dfddef8e | |||
| efbda1b1f5 | |||
| bef3104755 | |||
| be536277c3 | |||
| 71d8b63531 | |||
| 83a0352202 | |||
| 170ba3b8a0 | |||
| 26715f9ba2 | |||
| b980ad9e9e | |||
| 9d8ae30ec6 | |||
|
|
295db5e8fd | ||
| 6620a5de39 | |||
| bb898e26c2 | |||
| 4813c5bfa3 | |||
| 0ce26f6d9d | |||
| 6bdf7163b6 | |||
| 128905cdb4 | |||
| 01adab4174 | |||
| eb17575a96 | |||
| 73c7147c26 | |||
| 7787e35ea5 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
|
.DS_Store
|
||||||
lib
|
lib
|
||||||
node_modules
|
node_modules
|
||||||
npm-debug.log
|
npm-debug.log
|
||||||
|
|||||||
@@ -1 +1 @@
|
|||||||
4.x
|
6.x
|
||||||
|
|||||||
17
.travis.yml
17
.travis.yml
@@ -1,16 +1,23 @@
|
|||||||
language: node_js
|
language: node_js
|
||||||
|
|
||||||
node_js:
|
node_js:
|
||||||
- "stable"
|
- "stable"
|
||||||
- "6"
|
- "8"
|
||||||
|
- "6"
|
||||||
|
|
||||||
|
cache:
|
||||||
|
directories:
|
||||||
|
- node_modules
|
||||||
|
|
||||||
notifications:
|
notifications:
|
||||||
email: false
|
email:
|
||||||
|
on_success: never
|
||||||
before_script: npm install
|
on_failure: change
|
||||||
|
|
||||||
after_success: npm run coverage
|
after_success: npm run coverage
|
||||||
|
|
||||||
|
script: npm run test:ci
|
||||||
|
|
||||||
branches:
|
branches:
|
||||||
only:
|
only:
|
||||||
- master
|
- master
|
||||||
|
|||||||
11
CHANGELOG.md
11
CHANGELOG.md
@@ -2,6 +2,17 @@
|
|||||||
|
|
||||||
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
|
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
|
||||||
|
|
||||||
|
## v3.0.0
|
||||||
|
|
||||||
|
- support for node v4 or earlier is no longer tested
|
||||||
|
- update several dependencies
|
||||||
|
|
||||||
|
## v2.0.3
|
||||||
|
|
||||||
|
- rename to elastiq
|
||||||
|
- fix issue where job poller would not wait for ES response
|
||||||
|
- when job polling search fails, wait for a 20x interval before searching again
|
||||||
|
|
||||||
## v2.0.2
|
## v2.0.2
|
||||||
|
|
||||||
- Fix issue where creating a worker would not use the queue's doctype by default
|
- Fix issue where creating a worker would not use the queue's doctype by default
|
||||||
|
|||||||
23
package.json
23
package.json
@@ -1,6 +1,6 @@
|
|||||||
{
|
{
|
||||||
"name": "esqueue",
|
"name": "elastiq",
|
||||||
"version": "2.0.2",
|
"version": "3.0.0",
|
||||||
"description": "Job queue, powered by Elasticsearch",
|
"description": "Job queue, powered by Elasticsearch",
|
||||||
"main": "lib/index.js",
|
"main": "lib/index.js",
|
||||||
"scripts": {
|
"scripts": {
|
||||||
@@ -8,9 +8,11 @@
|
|||||||
"package": "git checkout -B package-lib && npm run build && git add -f lib && git commit -m 'package lib' && echo Use npm to tag and publish",
|
"package": "git checkout -B package-lib && npm run build && git add -f lib && git commit -m 'package lib' && echo Use npm to tag and publish",
|
||||||
"prepublish": "in-publish && npm run test || not-in-publish",
|
"prepublish": "in-publish && npm run test || not-in-publish",
|
||||||
"test": "retire -n -p package.json && npm run build && npm run unit",
|
"test": "retire -n -p package.json && npm run build && npm run unit",
|
||||||
|
"lint": "eslint \"*.js\" \"src/**/*.js\"",
|
||||||
|
"test:ci": "npm run lint && npm run test",
|
||||||
"unit": "nyc --require babel-core/register mocha test/src/**"
|
"unit": "nyc --require babel-core/register mocha test/src/**"
|
||||||
},
|
},
|
||||||
"author": "Elastic (https://github.com/elastic)",
|
"author": "Joe Fleming (https://github.com/w33ble)",
|
||||||
"keywords": [
|
"keywords": [
|
||||||
"job",
|
"job",
|
||||||
"queue",
|
"queue",
|
||||||
@@ -19,7 +21,7 @@
|
|||||||
],
|
],
|
||||||
"repository": {
|
"repository": {
|
||||||
"type": "git",
|
"type": "git",
|
||||||
"url": "https://github.com/elastic/esqueue.git"
|
"url": "https://github.com/w33ble/elastiq.git"
|
||||||
},
|
},
|
||||||
"license": "Apache-2.0",
|
"license": "Apache-2.0",
|
||||||
"engines": {
|
"engines": {
|
||||||
@@ -29,20 +31,21 @@
|
|||||||
"@elastic/eslint-config-kibana": "^0.3.0",
|
"@elastic/eslint-config-kibana": "^0.3.0",
|
||||||
"babel-cli": "^6.23.0",
|
"babel-cli": "^6.23.0",
|
||||||
"babel-core": "^6.23.1",
|
"babel-core": "^6.23.1",
|
||||||
"babel-eslint": "^7.1.1",
|
"babel-eslint": "6.1.2",
|
||||||
"babel-plugin-add-module-exports": "^0.2.1",
|
"babel-plugin-add-module-exports": "^0.2.1",
|
||||||
"babel-preset-es2015": "^6.22.0",
|
"babel-preset-es2015": "^6.22.0",
|
||||||
"elasticsearch": "^12.0.0",
|
"elasticsearch": "^13.0.1",
|
||||||
"eslint": "^3.16.1",
|
"eslint": "3.11.1",
|
||||||
"eslint-plugin-mocha": "^4.8.0",
|
"eslint-plugin-babel": "4.0.0",
|
||||||
"eslint-plugin-react": "^6.10.0",
|
"eslint-plugin-mocha": "4.7.0",
|
||||||
|
"eslint-plugin-react": "^7.0.1",
|
||||||
"expect.js": "~0.3.1",
|
"expect.js": "~0.3.1",
|
||||||
"lodash": "^4.17.4",
|
"lodash": "^4.17.4",
|
||||||
"mocha": "^3.2.0",
|
"mocha": "^3.2.0",
|
||||||
"nyc": "^10.1.2",
|
"nyc": "^10.1.2",
|
||||||
"proxyquire": "^1.7.4",
|
"proxyquire": "^1.7.4",
|
||||||
"retire": "^1.2.12",
|
"retire": "^1.2.12",
|
||||||
"sinon": "^1.17.3"
|
"sinon": "^2.3.1"
|
||||||
},
|
},
|
||||||
"peerDependencies": {
|
"peerDependencies": {
|
||||||
"elasticsearch": ">=11.0.1"
|
"elasticsearch": ">=11.0.1"
|
||||||
|
|||||||
34
readme.md
34
readme.md
@@ -1,22 +1,32 @@
|
|||||||
[](https://travis-ci.org/elastic/esqueue)
|
[](https://travis-ci.org/w33ble/elastiq)
|
||||||
|
[](https://raw.githubusercontent.com/w33ble/elastiq/master/LICENSE)
|
||||||
|
[](https://github.com/w33ble/elastiq#project-status)
|
||||||
|
|
||||||
# esqueue
|
# elastiq
|
||||||
|
|
||||||
`esqueue` is an Elasticsearch-powered job queue
|
`elastiq` is an Elasticsearch-powered job queue.
|
||||||
|
|
||||||
|
Pronounced Elasti-queue. This is not supported by Elastic.
|
||||||
|
|
||||||
|
## Project Status
|
||||||
|
|
||||||
|
While it's believed to be pretty stable, this library isn't really being used anywhere. Issues and PRs are welcome, but it isn't actively being developed. As such, it doesn't get a lot of stress testing, and I don't recommend you rely too heavily on it, hence its experimental status.
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
|
||||||
`npm install esqueue`
|
Version | Elasticsearch Version
|
||||||
|
------- | ---------------------
|
||||||
|
3.x + | 5.x +
|
||||||
|
|
||||||
Note that the more recent version of esqueue is meant to work with Elasticsearch 5.x or later. If you need it to work with an older version, use v1.0.0:
|
`npm install elastiq`
|
||||||
|
|
||||||
`npm install equeue@^1.0.0`
|
If you are working with an older version of Elasticsearch, consider using `esqueue`.
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
Simply include the module in your application.
|
Simply include the module in your application.
|
||||||
|
|
||||||
`var Esqueue = require('esqueue');`
|
`var elastiq = require('elastiq');`
|
||||||
|
|
||||||
### Creating a queue
|
### Creating a queue
|
||||||
|
|
||||||
@@ -26,7 +36,7 @@ The first step is to create a new Queue instance. This is your point of entry, i
|
|||||||
var index = 'my-index';
|
var index = 'my-index';
|
||||||
var options = {};
|
var options = {};
|
||||||
|
|
||||||
var queue = new Esqueue(index, options);
|
var queue = new Elastiq(index, options);
|
||||||
```
|
```
|
||||||
|
|
||||||
The queue instance is an event emitter, so you can listen for `error` events as you would any other event emitter.
|
The queue instance is an event emitter, so you can listen for `error` events as you would any other event emitter.
|
||||||
@@ -38,7 +48,7 @@ Option | Default | Description
|
|||||||
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
|
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
|
||||||
dateSeparator | `-` | Separator for the formatted date, *YYYY-MM-DD* for example, in the index pattern.
|
dateSeparator | `-` | Separator for the formatted date, *YYYY-MM-DD* for example, in the index pattern.
|
||||||
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
|
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
|
||||||
doctype | `esqueue` | The doctype to use in Elasticsearch
|
doctype | `elastiq` | The doctype to use in Elasticsearch
|
||||||
indexSettings | | Specify which `settings` to pass on index creation. See the [Elasticsearch index creation docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-create-index.html) for more info.
|
indexSettings | | Specify which `settings` to pass on index creation. See the [Elasticsearch index creation docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-create-index.html) for more info.
|
||||||
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
|
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
|
||||||
|
|
||||||
@@ -148,15 +158,15 @@ All of the above are valid. `workerFn2` and `asyncWorker` are likely to be more
|
|||||||
|
|
||||||
## Queue events
|
## Queue events
|
||||||
|
|
||||||
`esqueue` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
|
`elastiq` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
|
||||||
|
|
||||||
It's important to note that all events emitted from the Job and Worker instances are also emitted on the Queue instance. This means that your application should be able to react to changes by only keeping track of that instance.
|
It's important to note that all events emitted from the Job and Worker instances are also emitted on the Queue instance. This means that your application should be able to react to changes by only keeping track of that instance.
|
||||||
|
|
||||||
Available events can be found in `lib/constants/events.js`, and you're encouraged to import and use those constant values in your application. Here's an example:
|
Available events can be found in `lib/constants/events.js`, and you're encouraged to import and use those constant values in your application. Here's an example:
|
||||||
|
|
||||||
```js
|
```js
|
||||||
var Queue = require('esqueue');
|
var Queue = require('elastiq');
|
||||||
var queueEvents = require('esqueue/lib/constants/events');
|
var queueEvents = require('elastiq/lib/constants/events');
|
||||||
|
|
||||||
var jobQueue = new Queue('my-index');
|
var jobQueue = new Queue('my-index');
|
||||||
|
|
||||||
|
|||||||
@@ -2,5 +2,5 @@ export default {
|
|||||||
DEFAULT_SETTING_TIMEOUT: 10000,
|
DEFAULT_SETTING_TIMEOUT: 10000,
|
||||||
DEFAULT_SETTING_DATE_SEPARATOR: '-',
|
DEFAULT_SETTING_DATE_SEPARATOR: '-',
|
||||||
DEFAULT_SETTING_INTERVAL: 'week',
|
DEFAULT_SETTING_INTERVAL: 'week',
|
||||||
DEFAULT_SETTING_DOCTYPE: 'esqueue',
|
DEFAULT_SETTING_DOCTYPE: 'elastiq',
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ export default {
|
|||||||
EVENT_JOB_CREATE_ERROR: 'job:creation error',
|
EVENT_JOB_CREATE_ERROR: 'job:creation error',
|
||||||
EVENT_WORKER_COMPLETE: 'worker:job complete',
|
EVENT_WORKER_COMPLETE: 'worker:job complete',
|
||||||
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
|
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
|
||||||
|
EVENT_WORKER_JOB_POLLING_READY: 'worker:job poller ready',
|
||||||
|
EVENT_WORKER_JOB_SEARCH_COMPLETE: 'worker:pending jobs returned',
|
||||||
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
|
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
|
||||||
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
|
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
|
||||||
EVENT_WORKER_JOB_FAIL: 'worker:job failed',
|
EVENT_WORKER_JOB_FAIL: 'worker:job failed',
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ export default function createClient(options) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return client;
|
return client;
|
||||||
};
|
}
|
||||||
|
|
||||||
export function isClient(client) {
|
export function isClient(client) {
|
||||||
// if there's a transport property, assume it's a client instance
|
// if there's a transport property, assume it's a client instance
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ export function WorkerTimeoutError(message, props = {}) {
|
|||||||
this.timeout = props.timeout;
|
this.timeout = props.timeout;
|
||||||
this.jobId = props.jobId;
|
this.jobId = props.jobId;
|
||||||
|
|
||||||
if ("captureStackTrace" in Error) Error.captureStackTrace(this, WorkerTimeoutError);
|
if ('captureStackTrace' in Error) Error.captureStackTrace(this, WorkerTimeoutError);
|
||||||
else this.stack = (new Error()).stack;
|
else this.stack = (new Error()).stack;
|
||||||
}
|
}
|
||||||
WorkerTimeoutError.prototype = Object.create(Error.prototype);
|
WorkerTimeoutError.prototype = Object.create(Error.prototype);
|
||||||
@@ -14,7 +14,7 @@ export function UnspecifiedWorkerError(message, props = {}) {
|
|||||||
this.message = message;
|
this.message = message;
|
||||||
this.jobId = props.jobId;
|
this.jobId = props.jobId;
|
||||||
|
|
||||||
if ("captureStackTrace" in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
|
if ('captureStackTrace' in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
|
||||||
else this.stack = (new Error()).stack;
|
else this.stack = (new Error()).stack;
|
||||||
}
|
}
|
||||||
UnspecifiedWorkerError.prototype = Object.create(Error.prototype);
|
UnspecifiedWorkerError.prototype = Object.create(Error.prototype);
|
||||||
|
|||||||
@@ -7,9 +7,9 @@ import indexTimestamp from './helpers/index_timestamp';
|
|||||||
import objectOmit from './helpers/object_omit';
|
import objectOmit from './helpers/object_omit';
|
||||||
import logger from './helpers/logger';
|
import logger from './helpers/logger';
|
||||||
|
|
||||||
const debug = logger('esqueue:queue');
|
const debug = logger('elastiq:queue');
|
||||||
|
|
||||||
export default class Esqueue extends events.EventEmitter {
|
export default class Elastiq extends events.EventEmitter {
|
||||||
constructor(index, options = {}) {
|
constructor(index, options = {}) {
|
||||||
if (!index) throw new Error('Must specify an index to write to');
|
if (!index) throw new Error('Must specify an index to write to');
|
||||||
|
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import logger from './helpers/logger';
|
|||||||
import createIndex from './helpers/create_index';
|
import createIndex from './helpers/create_index';
|
||||||
import isPlainObject from './helpers/is_plain_object';
|
import isPlainObject from './helpers/is_plain_object';
|
||||||
|
|
||||||
const debug = logger('esqueue:job');
|
const debug = logger('elastiq:job');
|
||||||
const puid = new Puid();
|
const puid = new Puid();
|
||||||
|
|
||||||
export default class Job extends events.EventEmitter {
|
export default class Job extends events.EventEmitter {
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import logger from './helpers/logger';
|
|||||||
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
|
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
|
||||||
|
|
||||||
const puid = new Puid();
|
const puid = new Puid();
|
||||||
const debug = logger('esqueue:worker');
|
const debug = logger('elastiq:worker');
|
||||||
|
|
||||||
function formatJobObject(job) {
|
function formatJobObject(job) {
|
||||||
return {
|
return {
|
||||||
@@ -34,14 +34,18 @@ export default class Worker extends events.EventEmitter {
|
|||||||
|
|
||||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||||
|
|
||||||
this._checker = false;
|
this._poller = {
|
||||||
this._running = true;
|
timer: false,
|
||||||
|
enabled: true,
|
||||||
|
running: false,
|
||||||
|
};
|
||||||
|
|
||||||
this.debug(`Created worker for job type ${this.jobtype}`);
|
this.debug(`Created worker for job type ${this.jobtype}`);
|
||||||
this._startJobPolling();
|
this._startJobPolling();
|
||||||
}
|
}
|
||||||
|
|
||||||
destroy() {
|
destroy() {
|
||||||
this._running = false;
|
this._poller.enabled = false;
|
||||||
this._stopJobPolling();
|
this._stopJobPolling();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -247,24 +251,45 @@ export default class Worker extends events.EventEmitter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
_startJobPolling() {
|
_startJobPolling() {
|
||||||
if (!this._running) {
|
if (!this._poller.enabled || this._poller.running) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
this._checker = setInterval(() => {
|
this._poller.timer = setTimeout(() => {
|
||||||
this._getPendingJobs()
|
this._getPendingJobs()
|
||||||
.then((jobs) => this._claimPendingJobs(jobs));
|
.then((jobs) => {
|
||||||
|
if (!this._poller.running) return;
|
||||||
|
|
||||||
|
const foundJobs = (!jobs || jobs.length === 0);
|
||||||
|
const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs);
|
||||||
|
|
||||||
|
task.then(() => {
|
||||||
|
this._poller.running = false;
|
||||||
|
this._startJobPolling();
|
||||||
|
});
|
||||||
|
}, () => {
|
||||||
|
// if the search failed for some reason, back off the polling
|
||||||
|
// we assume errors came from a busy cluster
|
||||||
|
// TODO: check what error actually happened
|
||||||
|
const multiplier = 20;
|
||||||
|
|
||||||
|
setTimeout(() => {
|
||||||
|
this._poller.running = false;
|
||||||
|
this._startJobPolling();
|
||||||
|
}, this.checkInterval * multiplier);
|
||||||
|
});
|
||||||
} , this.checkInterval);
|
} , this.checkInterval);
|
||||||
|
|
||||||
|
this._poller.running = true;
|
||||||
|
this.emit(constants.EVENT_WORKER_JOB_POLLING_READY);
|
||||||
}
|
}
|
||||||
|
|
||||||
_stopJobPolling() {
|
_stopJobPolling() {
|
||||||
clearInterval(this._checker);
|
this._poller.running = false;
|
||||||
|
clearTimeout(this._poller.timer);
|
||||||
}
|
}
|
||||||
|
|
||||||
_claimPendingJobs(jobs) {
|
_claimPendingJobs(jobs = []) {
|
||||||
if (!jobs || jobs.length === 0) return;
|
|
||||||
|
|
||||||
this._stopJobPolling();
|
|
||||||
let claimed = false;
|
let claimed = false;
|
||||||
|
|
||||||
// claim a single job, stopping after first successful claim
|
// claim a single job, stopping after first successful claim
|
||||||
@@ -290,10 +315,8 @@ export default class Worker extends events.EventEmitter {
|
|||||||
this.debug(`Claimed job ${claimedJob._id}`);
|
this.debug(`Claimed job ${claimedJob._id}`);
|
||||||
return this._performJob(claimedJob);
|
return this._performJob(claimedJob);
|
||||||
})
|
})
|
||||||
.then(() => this._startJobPolling())
|
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
this.debug('Error claiming jobs', err);
|
this.debug('Error claiming jobs', err);
|
||||||
this._startJobPolling();
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -309,12 +332,12 @@ export default class Worker extends events.EventEmitter {
|
|||||||
bool: {
|
bool: {
|
||||||
filter: { term: { jobtype: this.jobtype } },
|
filter: { term: { jobtype: this.jobtype } },
|
||||||
should: [
|
should: [
|
||||||
{ term: { status: 'pending'} },
|
{ term: { status: 'pending' } },
|
||||||
{ bool:
|
{ bool:
|
||||||
{ filter: [
|
{ filter: [
|
||||||
{ term: { status: 'processing' } },
|
{ term: { status: 'processing' } },
|
||||||
{ range: { process_expiration: { lte: nowTime } } }
|
{ range: { process_expiration: { lte: nowTime } } }
|
||||||
] }
|
] }
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
@@ -322,8 +345,8 @@ export default class Worker extends events.EventEmitter {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
sort: [
|
sort: [
|
||||||
{ priority: { order: 'asc' }},
|
{ priority: { order: 'asc' } },
|
||||||
{ created_at: { order: 'asc' }}
|
{ created_at: { order: 'asc' } }
|
||||||
],
|
],
|
||||||
size: this.checkSize
|
size: this.checkSize
|
||||||
};
|
};
|
||||||
@@ -338,7 +361,10 @@ export default class Worker extends events.EventEmitter {
|
|||||||
})
|
})
|
||||||
.then((results) => {
|
.then((results) => {
|
||||||
const jobs = results.hits.hits;
|
const jobs = results.hits.hits;
|
||||||
|
|
||||||
this.debug(`${jobs.length} outstanding jobs returned`);
|
this.debug(`${jobs.length} outstanding jobs returned`);
|
||||||
|
this.emit(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, jobs);
|
||||||
|
|
||||||
return jobs;
|
return jobs;
|
||||||
})
|
})
|
||||||
.catch((err) => {
|
.catch((err) => {
|
||||||
|
|||||||
@@ -8,12 +8,12 @@ import elasticsearchMock from '../fixtures/elasticsearch';
|
|||||||
import jobMock from '../fixtures/job';
|
import jobMock from '../fixtures/job';
|
||||||
import workerMock from '../fixtures/worker';
|
import workerMock from '../fixtures/worker';
|
||||||
|
|
||||||
const Esqueue = proxyquire.noPreserveCache()('../../lib/index', {
|
const Queue = proxyquire.noPreserveCache()('../../lib/index', {
|
||||||
'./job.js': jobMock,
|
'./job.js': jobMock,
|
||||||
'./worker.js': workerMock,
|
'./worker.js': workerMock,
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('Esqueue class', function () {
|
describe('Elastiq class', function () {
|
||||||
let client;
|
let client;
|
||||||
|
|
||||||
beforeEach(function () {
|
beforeEach(function () {
|
||||||
@@ -21,18 +21,18 @@ describe('Esqueue class', function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should be an event emitter', function () {
|
it('should be an event emitter', function () {
|
||||||
const queue = new Esqueue('esqueue', { client });
|
const queue = new Queue('elastiq', { client });
|
||||||
expect(queue).to.be.an(events.EventEmitter);
|
expect(queue).to.be.an(events.EventEmitter);
|
||||||
});
|
});
|
||||||
|
|
||||||
describe('Option validation', function () {
|
describe('Option validation', function () {
|
||||||
it('should throw without an index', function () {
|
it('should throw without an index', function () {
|
||||||
const init = () => new Esqueue();
|
const init = () => new Queue();
|
||||||
expect(init).to.throwException(/must.+specify.+index/i);
|
expect(init).to.throwException(/must.+specify.+index/i);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw with an invalid host', function () {
|
it('should throw with an invalid host', function () {
|
||||||
const init = () => new Esqueue('esqueue', {
|
const init = () => new Queue('elastiq', {
|
||||||
client: { host: 'nope://nope' }
|
client: { host: 'nope://nope' }
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -40,7 +40,7 @@ describe('Esqueue class', function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should throw with invalid hosts', function () {
|
it('should throw with invalid hosts', function () {
|
||||||
const init = () => new Esqueue('esqueue', {
|
const init = () => new Queue('elastiq', {
|
||||||
client: { hosts: [{ host: 'localhost', protocol: 'nope' }] }
|
client: { hosts: [{ host: 'localhost', protocol: 'nope' }] }
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -51,7 +51,7 @@ describe('Esqueue class', function () {
|
|||||||
describe('Queue construction', function () {
|
describe('Queue construction', function () {
|
||||||
it('should ping the ES server', function () {
|
it('should ping the ES server', function () {
|
||||||
const pingSpy = sinon.spy(client, 'ping');
|
const pingSpy = sinon.spy(client, 'ping');
|
||||||
new Esqueue('esqueue', { client });
|
new Queue('elastiq', { client });
|
||||||
sinon.assert.calledOnce(pingSpy);
|
sinon.assert.calledOnce(pingSpy);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
@@ -63,14 +63,14 @@ describe('Esqueue class', function () {
|
|||||||
let queue;
|
let queue;
|
||||||
|
|
||||||
beforeEach(function () {
|
beforeEach(function () {
|
||||||
indexName = 'esqueue-index';
|
indexName = 'elastiq-index';
|
||||||
jobType = 'test-test';
|
jobType = 'test-test';
|
||||||
payload = { payload: true };
|
payload = { payload: true };
|
||||||
queue = new Esqueue(indexName, { client });
|
queue = new Queue(indexName, { client });
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should throw with invalid dateSeparator setting', function () {
|
it('should throw with invalid dateSeparator setting', function () {
|
||||||
queue = new Esqueue(indexName, { client, dateSeparator: 'a' });
|
queue = new Queue(indexName, { client, dateSeparator: 'a' });
|
||||||
const fn = () => queue.addJob(jobType, payload);
|
const fn = () => queue.addJob(jobType, payload);
|
||||||
expect(fn).to.throwException();
|
expect(fn).to.throwException();
|
||||||
});
|
});
|
||||||
@@ -97,7 +97,7 @@ describe('Esqueue class', function () {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
queue = new Esqueue(indexName, { client, indexSettings });
|
queue = new Queue(indexName, { client, indexSettings });
|
||||||
const job = queue.addJob(jobType, payload);
|
const job = queue.addJob(jobType, payload);
|
||||||
expect(job.getProp('options')).to.have.property('indexSettings', indexSettings);
|
expect(job.getProp('options')).to.have.property('indexSettings', indexSettings);
|
||||||
});
|
});
|
||||||
@@ -117,7 +117,7 @@ describe('Esqueue class', function () {
|
|||||||
let queue;
|
let queue;
|
||||||
|
|
||||||
beforeEach(function () {
|
beforeEach(function () {
|
||||||
queue = new Esqueue('esqueue', { client });
|
queue = new Queue('elastiq', { client });
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should keep track of workers', function () {
|
it('should keep track of workers', function () {
|
||||||
@@ -146,7 +146,7 @@ describe('Esqueue class', function () {
|
|||||||
doctype: 'tests'
|
doctype: 'tests'
|
||||||
};
|
};
|
||||||
|
|
||||||
queue = new Esqueue('esqueue', { client });
|
queue = new Queue('elastiq', { client });
|
||||||
const worker = queue.registerWorker('type', noop, workerOptions);
|
const worker = queue.registerWorker('type', noop, workerOptions);
|
||||||
expect(worker.getProp('options')).to.equal(workerOptions);
|
expect(worker.getProp('options')).to.equal(workerOptions);
|
||||||
});
|
});
|
||||||
@@ -154,7 +154,7 @@ describe('Esqueue class', function () {
|
|||||||
|
|
||||||
describe('Destroy', function () {
|
describe('Destroy', function () {
|
||||||
it('should destroy workers', function () {
|
it('should destroy workers', function () {
|
||||||
const queue = new Esqueue('esqueue', { client });
|
const queue = new Queue('elastiq', { client });
|
||||||
const stubs = times(3, () => { return { destroy: sinon.stub() }; });
|
const stubs = times(3, () => { return { destroy: sinon.stub() }; });
|
||||||
stubs.forEach((stub) => queue._workers.push(stub));
|
stubs.forEach((stub) => queue._workers.push(stub));
|
||||||
expect(queue.getWorkers()).to.have.length(3);
|
expect(queue.getWorkers()).to.have.length(3);
|
||||||
|
|||||||
@@ -187,24 +187,63 @@ describe('Worker class', function () {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should not poll once destroyed', function () {
|
it('should not poll once destroyed', function () {
|
||||||
|
// remove the search spy
|
||||||
|
mockQueue.client.search.restore();
|
||||||
|
|
||||||
|
// mock the search, return 0 new jobs
|
||||||
|
const zeroHits = { hits: { hits: [] } };
|
||||||
|
const searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve(zeroHits));
|
||||||
|
|
||||||
const worker = new Worker(mockQueue, 'test', noop);
|
const worker = new Worker(mockQueue, 'test', noop);
|
||||||
|
|
||||||
|
function waitForSearch() {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
worker.once(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, () => {
|
||||||
|
resolve()
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function waitForPoller() {
|
||||||
|
return new Promise((resolve) => {
|
||||||
|
worker.once(constants.EVENT_WORKER_JOB_POLLING_READY, () => {
|
||||||
|
resolve()
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
// move the clock a couple times, test for searches each time
|
// move the clock a couple times, test for searches each time
|
||||||
sinon.assert.notCalled(searchSpy);
|
sinon.assert.notCalled(searchStub);
|
||||||
clock.tick(defaults.interval);
|
|
||||||
sinon.assert.calledOnce(searchSpy);
|
|
||||||
clock.tick(defaults.interval);
|
|
||||||
sinon.assert.calledTwice(searchSpy);
|
|
||||||
|
|
||||||
// destroy the worker, move the clock, make sure another search doesn't happen
|
const firstWait = waitForSearch();
|
||||||
worker.destroy();
|
|
||||||
clock.tick(defaults.interval);
|
clock.tick(defaults.interval);
|
||||||
sinon.assert.calledTwice(searchSpy);
|
|
||||||
|
|
||||||
// manually call job poller, move the clock, make sure another search doesn't happen
|
return firstWait
|
||||||
worker._startJobPolling();
|
.then(() => {
|
||||||
clock.tick(defaults.interval);
|
sinon.assert.calledOnce(searchStub);
|
||||||
sinon.assert.calledTwice(searchSpy);
|
return waitForPoller();
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
|
const secondWait = waitForSearch();
|
||||||
|
clock.tick(defaults.interval);
|
||||||
|
return secondWait;
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
|
sinon.assert.calledTwice(searchStub);
|
||||||
|
return waitForPoller();
|
||||||
|
})
|
||||||
|
.then(() => {
|
||||||
|
// destroy the worker, move the clock, make sure another search doesn't happen
|
||||||
|
worker.destroy();
|
||||||
|
|
||||||
|
clock.tick(defaults.interval);
|
||||||
|
sinon.assert.calledTwice(searchStub);
|
||||||
|
|
||||||
|
// manually call job poller, move the clock, make sure another search doesn't happen
|
||||||
|
worker._startJobPolling();
|
||||||
|
clock.tick(defaults.interval);
|
||||||
|
sinon.assert.calledTwice(searchStub);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user