Compare commits
143 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 91e5316877 | |||
| ee948c5900 | |||
| ff944c523a | |||
| 6401022021 | |||
| 32fb6a92f5 | |||
| d4f24182b4 | |||
| 283731d6e6 | |||
| 79affbce76 | |||
| 9b02bdf56c | |||
| bae58b0370 | |||
| c1216d5a4b | |||
| 0e0c3feab9 | |||
| 0fb14d46c9 | |||
| 7900d0c7e1 | |||
| c84b62bd31 | |||
| d972c9a1cb | |||
| e294af2688 | |||
| 73efd7167d | |||
| 0ed702c665 | |||
| 2324256039 | |||
| c6c73f6dd2 | |||
| 9cff4e4b04 | |||
| 1e8340f214 | |||
| bbd82a1de1 | |||
| 64c1c90337 | |||
| 5e0cc440ff | |||
| 1e3168452f | |||
|
|
3cc8573ab0 | ||
| b2f4ae857f | |||
| c64c5c80d6 | |||
| 4befaee4cc | |||
|
|
af47d230db | ||
|
|
7c1189cf3c | ||
| bcca5667fb | |||
| 5886c1399d | |||
| 40aa1d0a2c | |||
|
|
3d34642de4 | ||
| 7836ae5b39 | |||
| 6913391526 | |||
| 2753986038 | |||
| 64ff676fab | |||
| 160f087c9f | |||
| 50998fa69a | |||
| d7e12206c5 | |||
| d71792f939 | |||
|
|
9c411b0b11 | ||
|
|
7af0c82080 | ||
| c0c535076e | |||
| f5a12db60c | |||
| 3bf62525a9 | |||
| c368fa91a1 | |||
| a5bfb96855 | |||
| 83f267a028 | |||
| faeb705dee | |||
| 2693c40423 | |||
| 10003e147d | |||
| c976684a42 | |||
| 1a1f9358fd | |||
| 7867f14476 | |||
| 8b652f11a9 | |||
| f3ac7c1958 | |||
| 597052dc4e | |||
| 2c5519c253 | |||
| b916d1352a | |||
| 6ad438db96 | |||
| 38287f10f7 | |||
| 9d20095ffb | |||
| 67381108d7 | |||
| 2c025e02c9 | |||
| 041af798a8 | |||
| fd118fa746 | |||
| f8db6e1bd3 | |||
| 775442f284 | |||
| 8e8609eede | |||
| 25878c0b33 | |||
| bfe8799b90 | |||
| c758fb55a6 | |||
| 8cd2fde3a8 | |||
| 9a71fcc7b5 | |||
| 95569c9e82 | |||
| 288daecb6b | |||
| 9cfc080b64 | |||
| bdd94096db | |||
| 40d67829c8 | |||
| a50dbf752e | |||
| 630733b093 | |||
| c6986e3677 | |||
| 27390fef44 | |||
| 3b135bbd09 | |||
| 5ed3280a18 | |||
| 79358a76dd | |||
| d16d3ea4dd | |||
| 057bd26b74 | |||
| 362469f541 | |||
| a2391a30c5 | |||
| 3934f0cd1b | |||
| 919fec4835 | |||
| b99f5ff1b9 | |||
| c44d275395 | |||
| da57fdeee7 | |||
| aa2c0040d5 | |||
| d2e843f05b | |||
| 13a78d12cc | |||
| 1d4c45c5d9 | |||
| eba6748ba9 | |||
| 257645f11c | |||
| 460d83411e | |||
| 2b2db9c5f9 | |||
| df03738b9a | |||
| db1d282da2 | |||
| a5f1d77f23 | |||
| d73155d538 | |||
| 866f5948af | |||
| eae6942ec2 | |||
| b0fec7d1ed | |||
| 638c896e37 | |||
| 5d5552c548 | |||
| ea0ea7e6c2 | |||
| 85cc4bf7f8 | |||
| 34592740c7 | |||
| eeafaf7d42 | |||
| 9e3515ebd5 | |||
| 78871f97d9 | |||
| 5954ee1d51 | |||
| a2d3fb7ffd | |||
| 04608b0ab2 | |||
| c541e07bb5 | |||
| d70a8cc3ea | |||
| 68ef5d2147 | |||
| 1c24a4f7c1 | |||
| 5fe4e644d6 | |||
| 39315dddd0 | |||
| 57c1bd1819 | |||
| e0605796a1 | |||
| a1a7b9e213 | |||
| dcecd4020b | |||
| 40c8d15562 | |||
| 21d52b4b50 | |||
| a0042be7c9 | |||
| d325f108f7 | |||
| 4beb4a880b | |||
| 9ab3bb048e | |||
| ca0da61a69 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -3,3 +3,4 @@ node_modules
|
||||
npm-debug.log
|
||||
.nyc_output
|
||||
coverage.lcov
|
||||
yarn.lock
|
||||
|
||||
@@ -1 +1 @@
|
||||
4.4.4
|
||||
4.x
|
||||
|
||||
@@ -2,9 +2,7 @@ language: node_js
|
||||
|
||||
node_js:
|
||||
- "stable"
|
||||
- "5"
|
||||
- "4"
|
||||
- "4.3"
|
||||
- "6"
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
|
||||
99
CHANGELOG.md
99
CHANGELOG.md
@@ -1,6 +1,103 @@
|
||||
# Changelog
|
||||
|
||||
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading.
|
||||
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
|
||||
|
||||
## v2.0.2
|
||||
|
||||
- Fix issue where creating a worker would not use the queue's doctype by default
|
||||
|
||||
## v2.0.1
|
||||
|
||||
- Don't swallow 400 errors at index creation
|
||||
|
||||
## v2.0.0
|
||||
|
||||
- Change `sting` mappings to `keyword`, since [string is deprecated and is being removed from elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_50_mapping_changes.html#_literal_string_literal_fields_replaced_by_literal_text_literal_literal_keyword_literal_fields)
|
||||
|
||||
## v1.0.0
|
||||
|
||||
- [BREAKING] elasticsearch package is a peerDependency now, since it's not required if you are using the `client` option when instantiating the queue
|
||||
- Remove lodash.isPlainObject and lodash.omit dependencies, use customer helpers
|
||||
- Remove errors dependency, use custom errors
|
||||
|
||||
## v0.12.2
|
||||
|
||||
- Fixed issue where destoyed workers could continue running
|
||||
|
||||
## v0.12.1
|
||||
|
||||
- Move repo to elastic org
|
||||
|
||||
## v0.12.0
|
||||
|
||||
- [BREAKING] Rename general queue error to `queue:error` instead of simply `error`
|
||||
- Remove the `timeout` parameter from the ping operation on intiialization
|
||||
|
||||
## v0.11.1
|
||||
|
||||
- Apache 2.0 license file
|
||||
|
||||
## v0.11.0
|
||||
|
||||
- Contrary to the [source filtering docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-source-filtering.html), use plural form of include/exclude due to breaking change in Elasticsearch 5.0
|
||||
|
||||
## v0.10.5
|
||||
|
||||
- Apache 2.0 license file
|
||||
|
||||
## v0.10.4
|
||||
|
||||
- Allow index pattern date separator to be customized
|
||||
|
||||
## v0.10.3
|
||||
|
||||
- Bump moment.js version, fix [DoS issue](https://nodesecurity.io/advisories/55)
|
||||
|
||||
## v0.10.2
|
||||
|
||||
- Allow passing headers on job creation, useful for auth and proxy traversal
|
||||
|
||||
## v0.10.1
|
||||
|
||||
- Refresh Elasticsearch index when creating job, fire event after refresh
|
||||
|
||||
## v0.10.0
|
||||
|
||||
- [BREAKING] Remove header passing on job creation
|
||||
- [BREAKING] Job instantiation requires full queue instance
|
||||
- Expose event names in constants
|
||||
- Emit on Worker success conditions as well as errors
|
||||
- Worker and Job emits on the Queue instance
|
||||
|
||||
## v0.9.0
|
||||
|
||||
- [BREAKING] Rename timeout error event
|
||||
- Fix worker timeout condition
|
||||
- Fix issue where a worker error was not an instance of Error, or lacked a `toString()` method
|
||||
- Allow specifying option to pass to elasticsearch client on index creation
|
||||
|
||||
## v0.8.0
|
||||
|
||||
- [BREAKING] Don't throw on worker failures
|
||||
- [BREAKING] Don't emit errors on queue instance
|
||||
|
||||
## v0.7.0
|
||||
|
||||
- [BREAKING] Don't throw on job creation failures
|
||||
|
||||
## v0.6.1
|
||||
|
||||
- Allow headers option on job creation, passed to elasticsearch index request
|
||||
|
||||
## v0.6.0
|
||||
|
||||
- Allow client instance to be passed when creating a job
|
||||
- Allow client instance to be passed when creating a worker
|
||||
- Prefer any 4.x version of node for development
|
||||
|
||||
## v0.5.0
|
||||
|
||||
- [BREAKING] Change default `created_by` value to `false` (formerly `null`)
|
||||
|
||||
## v0.4.1
|
||||
|
||||
|
||||
13
license.txt
Normal file
13
license.txt
Normal file
@@ -0,0 +1,13 @@
|
||||
Copyright 2016 Joe Fleming
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
56
package.json
56
package.json
@@ -1,16 +1,16 @@
|
||||
{
|
||||
"name": "esqueue",
|
||||
"version": "0.4.1",
|
||||
"version": "2.0.2",
|
||||
"description": "Job queue, powered by Elasticsearch",
|
||||
"main": "lib/index.js",
|
||||
"scripts": {
|
||||
"build": "rm -rf lib && babel src --out-dir lib",
|
||||
"coverage": "nyc report --reporter=text-lcov > coverage.lcov && codecov",
|
||||
"package": "git checkout -B package-lib && npm run build && git add -f lib && git commit -m 'package lib' && echo Use npm to tag and publish",
|
||||
"prepublish": "in-publish && npm run test || not-in-publish",
|
||||
"test": "npm run build && npm run unit",
|
||||
"test": "retire -n -p package.json && npm run build && npm run unit",
|
||||
"unit": "nyc --require babel-core/register mocha test/src/**"
|
||||
},
|
||||
"author": "Joe Fleming (https://github.com/w33ble)",
|
||||
"author": "Elastic (https://github.com/elastic)",
|
||||
"keywords": [
|
||||
"job",
|
||||
"queue",
|
||||
@@ -19,38 +19,38 @@
|
||||
],
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/w33ble/esqueue.git"
|
||||
"url": "https://github.com/elastic/esqueue.git"
|
||||
},
|
||||
"license": "Apache-2.0",
|
||||
"engines": {
|
||||
"node": ">=4.3.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@elastic/eslint-config-kibana": "0.0.3",
|
||||
"babel-cli": "~6.7.5",
|
||||
"babel-core": "~6.7.6",
|
||||
"babel-eslint": "~4.1.8",
|
||||
"babel-plugin-add-module-exports": "~0.1.2",
|
||||
"babel-preset-es2015": "~6.6.0",
|
||||
"codecov": "~1.0.1",
|
||||
"eslint": "~1.10.3",
|
||||
"eslint-plugin-mocha": "~1.1.0",
|
||||
"eslint-plugin-react": "~4.2.3",
|
||||
"@elastic/eslint-config-kibana": "^0.3.0",
|
||||
"babel-cli": "^6.23.0",
|
||||
"babel-core": "^6.23.1",
|
||||
"babel-eslint": "^7.1.1",
|
||||
"babel-plugin-add-module-exports": "^0.2.1",
|
||||
"babel-preset-es2015": "^6.22.0",
|
||||
"elasticsearch": "^12.0.0",
|
||||
"eslint": "^3.16.1",
|
||||
"eslint-plugin-mocha": "^4.8.0",
|
||||
"eslint-plugin-react": "^6.10.0",
|
||||
"expect.js": "~0.3.1",
|
||||
"lodash": "~4.11.1",
|
||||
"mocha": "~2.4.5",
|
||||
"nyc": "~6.4.2",
|
||||
"proxyquire": "~1.7.4",
|
||||
"sinon": "~1.17.3"
|
||||
"lodash": "^4.17.4",
|
||||
"mocha": "^3.2.0",
|
||||
"nyc": "^10.1.2",
|
||||
"proxyquire": "^1.7.4",
|
||||
"retire": "^1.2.12",
|
||||
"sinon": "^1.17.3"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"elasticsearch": ">=11.0.1"
|
||||
},
|
||||
"dependencies": {
|
||||
"debug": "~2.2.0",
|
||||
"elasticsearch": "~11.0.1",
|
||||
"error": "~7.0.2",
|
||||
"in-publish": "~2.0.0",
|
||||
"lodash.isplainobject": "~4.0.4",
|
||||
"lodash.omit": "~4.2.1",
|
||||
"moment": "~2.10.6",
|
||||
"puid": "~1.0.5"
|
||||
"debug": "^2.6.1",
|
||||
"in-publish": "^2.0.0",
|
||||
"moment": "^2.17.1",
|
||||
"puid": "^1.0.5"
|
||||
}
|
||||
}
|
||||
|
||||
54
readme.md
54
readme.md
@@ -1,4 +1,4 @@
|
||||
[](https://travis-ci.org/w33ble/esqueue) [](https://codecov.io/gh/w33ble/esqueue)
|
||||
[](https://travis-ci.org/elastic/esqueue)
|
||||
|
||||
# esqueue
|
||||
|
||||
@@ -8,6 +8,10 @@
|
||||
|
||||
`npm install esqueue`
|
||||
|
||||
Note that the more recent version of esqueue is meant to work with Elasticsearch 5.x or later. If you need it to work with an older version, use v1.0.0:
|
||||
|
||||
`npm install equeue@^1.0.0`
|
||||
|
||||
## Usage
|
||||
|
||||
Simply include the module in your application.
|
||||
@@ -32,8 +36,10 @@ The queue instance is an event emitter, so you can listen for `error` events as
|
||||
Option | Default | Description
|
||||
------ | ----------- | -------
|
||||
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
|
||||
dateSeparator | `-` | Separator for the formatted date, *YYYY-MM-DD* for example, in the index pattern.
|
||||
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
|
||||
doctype | `esqueue` | The doctype to use in Elasticsearch
|
||||
indexSettings | | Specify which `settings` to pass on index creation. See the [Elasticsearch index creation docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-create-index.html) for more info.
|
||||
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
|
||||
|
||||
|
||||
@@ -61,6 +67,8 @@ timeout | `10000` | Timeout for the job, if different than the timeout configure
|
||||
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
|
||||
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
|
||||
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
|
||||
headers | | Any headers to add to the index request. Handy for custom authentication or use with a proxy.
|
||||
client | | Alternative elasticsearch client instance, if you need to use one other than what the queue was created with.
|
||||
|
||||
### Creating a worker
|
||||
|
||||
@@ -105,28 +113,62 @@ Option | Default | Description
|
||||
------ | ----------- | -------
|
||||
interval | `1500` | Time, in `ms` to poll for new jobs in the queue.
|
||||
size | `10` | Number of records to return when polling for new jobs. Higher values may result in less Elasticsearch requests, but may also take longer to execute. A bit of tuning based on the number of workers you have my be required here.
|
||||
client | | Alternative elasticsearch client instance, if you need to use one other than what the queue was created with.
|
||||
doctype | `queue.doctype` | The doctype to use when polling for new jobs. You probably don't want to change this.
|
||||
|
||||
The worker's `output` can either be the raw output from the job, or on object that specifies the output's content type.
|
||||
|
||||
```js
|
||||
var workerFn1 = function (payload, cb) {
|
||||
var workerFn1 = function (payload) {
|
||||
// Do some work, using the payload if required
|
||||
var output = new Date().toString();
|
||||
cb(null, output);
|
||||
return output;
|
||||
};
|
||||
|
||||
var workerFn2 = function (payload, cb) {
|
||||
var workerFn2 = function (payload) {
|
||||
// Do some work, using the payload if required
|
||||
var output = {
|
||||
content_type: 'text/plain',
|
||||
content: new Date().toString();
|
||||
};
|
||||
cb(null, output);
|
||||
return output;
|
||||
};
|
||||
|
||||
var asyncWorker = function (payload) {
|
||||
// Do some work, using the payload if required
|
||||
return Promise.resolve({
|
||||
content_type: 'text/plain',
|
||||
content: new Date().toString();
|
||||
})
|
||||
};
|
||||
|
||||
```
|
||||
|
||||
Both are valid, but the `workerFn2` is likely to be more useful when retrieving the output, as the application doesn't need to know or make assumptions about the type of content the worker returned.
|
||||
All of the above are valid. `workerFn2` and `asyncWorker` are likely to be more useful when retrieving the output, as the application doesn't need to know or make assumptions about the type of content the worker returned. Note that returning a Promise is all that's required for an async result in the worker functions.
|
||||
|
||||
## Queue events
|
||||
|
||||
`esqueue` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
|
||||
|
||||
It's important to note that all events emitted from the Job and Worker instances are also emitted on the Queue instance. This means that your application should be able to react to changes by only keeping track of that instance.
|
||||
|
||||
Available events can be found in `lib/constants/events.js`, and you're encouraged to import and use those constant values in your application. Here's an example:
|
||||
|
||||
```js
|
||||
var Queue = require('esqueue');
|
||||
var queueEvents = require('esqueue/lib/constants/events');
|
||||
|
||||
var jobQueue = new Queue('my-index');
|
||||
|
||||
jobQueue.on(queueEvents.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
// handle error
|
||||
console.log('ONOZ!!! Job creation failed :(', err.error.message);
|
||||
});
|
||||
```
|
||||
|
||||
The argument passed to listeners typically contains several pieces of information about what happened. For example, Worker events will contain information about the job, the worker, and if it's an error event, the error.
|
||||
|
||||
More than any other events, you'll probably want to know if a worker completed or failed. When a worker starts, it will always either emit `EVENT_WORKER_COMPLETE` or `EVENT_WORKER_JOB_FAIL`. Faliures may also emit other events, such as `EVENT_WORKER_JOB_TIMEOUT` or `EVENT_WORKER_JOB_EXECUTION_ERROR`, but you can rely on `EVENT_WORKER_JOB_FAIL` for all failure cases.
|
||||
|
||||
## Scaling the queue
|
||||
|
||||
|
||||
6
src/constants/default_settings.js
Normal file
6
src/constants/default_settings.js
Normal file
@@ -0,0 +1,6 @@
|
||||
export default {
|
||||
DEFAULT_SETTING_TIMEOUT: 10000,
|
||||
DEFAULT_SETTING_DATE_SEPARATOR: '-',
|
||||
DEFAULT_SETTING_INTERVAL: 'week',
|
||||
DEFAULT_SETTING_DOCTYPE: 'esqueue',
|
||||
};
|
||||
14
src/constants/events.js
Normal file
14
src/constants/events.js
Normal file
@@ -0,0 +1,14 @@
|
||||
export default {
|
||||
EVENT_QUEUE_ERROR: 'queue:error',
|
||||
EVENT_JOB_ERROR: 'job:error',
|
||||
EVENT_JOB_CREATED: 'job:created',
|
||||
EVENT_JOB_CREATE_ERROR: 'job:creation error',
|
||||
EVENT_WORKER_COMPLETE: 'worker:job complete',
|
||||
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
|
||||
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
|
||||
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
|
||||
EVENT_WORKER_JOB_FAIL: 'worker:job failed',
|
||||
EVENT_WORKER_JOB_FAIL_ERROR: 'worker:failed job update error',
|
||||
EVENT_WORKER_JOB_EXECUTION_ERROR: 'worker:job execution error',
|
||||
EVENT_WORKER_JOB_TIMEOUT: 'worker:job timeout',
|
||||
};
|
||||
5
src/constants/index.js
Normal file
5
src/constants/index.js
Normal file
@@ -0,0 +1,5 @@
|
||||
import events from './events';
|
||||
import statuses from './statuses';
|
||||
import defaultSettings from './default_settings';
|
||||
|
||||
export default Object.assign({}, events, statuses, defaultSettings);
|
||||
7
src/constants/statuses.js
Normal file
7
src/constants/statuses.js
Normal file
@@ -0,0 +1,7 @@
|
||||
export default {
|
||||
JOB_STATUS_PENDING: 'pending',
|
||||
JOB_STATUS_PROCESSING: 'processing',
|
||||
JOB_STATUS_COMPLETED: 'completed',
|
||||
JOB_STATUS_FAILED: 'failed',
|
||||
JOB_STATUS_CANCELLED: 'cancelled',
|
||||
};
|
||||
@@ -1,15 +0,0 @@
|
||||
export const jobStatuses = {
|
||||
JOB_STATUS_PENDING: 'pending',
|
||||
JOB_STATUS_PROCESSING: 'processing',
|
||||
JOB_STATUS_COMPLETED: 'completed',
|
||||
JOB_STATUS_FAILED: 'failed',
|
||||
JOB_STATUS_CANCELLED: 'cancelled',
|
||||
};
|
||||
|
||||
export const defaultSettings = {
|
||||
DEFAULT_SETTING_TIMEOUT: 10000,
|
||||
DEFAULT_SETTING_INTERVAL: 'week',
|
||||
DEFAULT_SETTING_DOCTYPE: 'esqueue',
|
||||
};
|
||||
|
||||
export default Object.assign({}, jobStatuses, defaultSettings);
|
||||
@@ -3,7 +3,6 @@ import elasticsearch from 'elasticsearch';
|
||||
export default function createClient(options) {
|
||||
let client;
|
||||
|
||||
// if there's a transport property, assume it's a client instance
|
||||
if (isClient(options)) {
|
||||
client = options;
|
||||
} else {
|
||||
@@ -14,5 +13,6 @@ export default function createClient(options) {
|
||||
};
|
||||
|
||||
export function isClient(client) {
|
||||
// if there's a transport property, assume it's a client instance
|
||||
return !!client.transport;
|
||||
}
|
||||
@@ -1,40 +1,41 @@
|
||||
import { defaultSettings } from './constants';
|
||||
import { DEFAULT_SETTING_DOCTYPE } from '../constants';
|
||||
|
||||
const schema = {
|
||||
jobtype: { type: 'string', index: 'not_analyzed' },
|
||||
jobtype: { type: 'keyword' },
|
||||
payload: { type: 'object', enabled: false },
|
||||
priority: { type: 'byte' },
|
||||
timeout: { type: 'long' },
|
||||
process_expiration: { type: 'date' },
|
||||
created_by: { type: 'string', index: 'not_analyzed' },
|
||||
created_by: { type: 'keyword' },
|
||||
created_at: { type: 'date' },
|
||||
started_at: { type: 'date' },
|
||||
completed_at: { type: 'date' },
|
||||
attempts: { type: 'short' },
|
||||
max_attempts: { type: 'short' },
|
||||
status: { type: 'string', index: 'not_analyzed' },
|
||||
status: { type: 'keyword' },
|
||||
output: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
content_type: { type: 'string', index: 'not_analyzed' },
|
||||
content_type: { type: 'keyword', index: false },
|
||||
content: { type: 'object', enabled: false }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export default function createIndex(client, indexName, doctype = defaultSettings.DEFAULT_SETTING_DOCTYPE) {
|
||||
export default function createIndex(client, indexName, doctype = DEFAULT_SETTING_DOCTYPE, settings = {}) {
|
||||
const indexBody = { mappings : {} };
|
||||
indexBody.mappings[doctype] = { properties: schema };
|
||||
|
||||
const body = Object.assign({}, { settings }, indexBody);
|
||||
|
||||
return client.indices.exists({
|
||||
index: indexName,
|
||||
})
|
||||
.then((exists) => {
|
||||
if (!exists) {
|
||||
return client.indices.create({
|
||||
ignore: 400,
|
||||
index: indexName,
|
||||
body: indexBody
|
||||
body: body
|
||||
})
|
||||
.then(() => true);
|
||||
}
|
||||
|
||||
@@ -1,12 +1,20 @@
|
||||
import typedError from 'error/typed';
|
||||
export function WorkerTimeoutError(message, props = {}) {
|
||||
this.name = 'WorkerTimeoutError';
|
||||
this.message = message;
|
||||
this.timeout = props.timeout;
|
||||
this.jobId = props.jobId;
|
||||
|
||||
const errors = {};
|
||||
if ("captureStackTrace" in Error) Error.captureStackTrace(this, WorkerTimeoutError);
|
||||
else this.stack = (new Error()).stack;
|
||||
}
|
||||
WorkerTimeoutError.prototype = Object.create(Error.prototype);
|
||||
|
||||
errors.WorkerTimeoutError = typedError({
|
||||
type: 'WorkerTimeout',
|
||||
message: 'worker timed out, timeout={timeout}',
|
||||
timeout: null,
|
||||
jobId: null
|
||||
});
|
||||
export function UnspecifiedWorkerError(message, props = {}) {
|
||||
this.name = 'UnspecifiedWorkerError';
|
||||
this.message = message;
|
||||
this.jobId = props.jobId;
|
||||
|
||||
export default errors;
|
||||
if ("captureStackTrace" in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
|
||||
else this.stack = (new Error()).stack;
|
||||
}
|
||||
UnspecifiedWorkerError.prototype = Object.create(Error.prototype);
|
||||
|
||||
@@ -9,7 +9,9 @@ export const intervals = [
|
||||
'minute'
|
||||
];
|
||||
|
||||
export default function indexTimestamp(intervalStr) {
|
||||
export default function indexTimestamp(intervalStr, separator = '-') {
|
||||
if (separator.match(/[a-z]/i)) throw new Error('Interval separator can not be a letter');
|
||||
|
||||
const index = intervals.indexOf(intervalStr);
|
||||
if (index === -1) throw new Error('Invalid index interval: ', intervalStr);
|
||||
|
||||
@@ -22,16 +24,16 @@ export default function indexTimestamp(intervalStr) {
|
||||
dateString = 'YYYY';
|
||||
break;
|
||||
case 'month':
|
||||
dateString = 'YYYY-MM';
|
||||
dateString = `YYYY${separator}MM`;
|
||||
break;
|
||||
case 'hour':
|
||||
dateString = 'YYYY-MM-DD-HH';
|
||||
dateString = `YYYY${separator}MM${separator}DD${separator}HH`;
|
||||
break;
|
||||
case 'minute':
|
||||
dateString = 'YYYY-MM-DD-HH-mm';
|
||||
dateString = `YYYY${separator}MM${separator}DD${separator}HH${separator}mm`;
|
||||
break;
|
||||
default:
|
||||
dateString = 'YYYY-MM-DD';
|
||||
dateString = `YYYY${separator}MM${separator}DD`;
|
||||
}
|
||||
|
||||
return m.format(dateString);
|
||||
|
||||
3
src/helpers/is_plain_object.js
Normal file
3
src/helpers/is_plain_object.js
Normal file
@@ -0,0 +1,3 @@
|
||||
export default function (obj) {
|
||||
return (typeof obj === 'object' && !Array.isArray(obj) && obj !== null);
|
||||
}
|
||||
11
src/helpers/object_omit.js
Normal file
11
src/helpers/object_omit.js
Normal file
@@ -0,0 +1,11 @@
|
||||
import isPlainObject from './is_plain_object';
|
||||
|
||||
export default function (obj, props) {
|
||||
if (!isPlainObject(obj)) return obj;
|
||||
if (!Array.isArray(props)) props = [props];
|
||||
|
||||
const newObj = Object.assign({}, obj);
|
||||
|
||||
props.forEach(prop => delete newObj[prop]);
|
||||
return newObj;
|
||||
}
|
||||
30
src/index.js
30
src/index.js
@@ -1,11 +1,11 @@
|
||||
import events from 'events';
|
||||
import createClient from './helpers/es_client';
|
||||
import indexTimestamp from './helpers/index_timestamp';
|
||||
import logger from './helpers/logger';
|
||||
import { defaultSettings } from './helpers/constants';
|
||||
import Job from './job.js';
|
||||
import Worker from './worker.js';
|
||||
import omit from 'lodash.omit';
|
||||
import constants from './constants';
|
||||
import createClient from './helpers/create_client';
|
||||
import indexTimestamp from './helpers/index_timestamp';
|
||||
import objectOmit from './helpers/object_omit';
|
||||
import logger from './helpers/logger';
|
||||
|
||||
const debug = logger('esqueue:queue');
|
||||
|
||||
@@ -16,19 +16,20 @@ export default class Esqueue extends events.EventEmitter {
|
||||
super();
|
||||
this.index = index;
|
||||
this.settings = Object.assign({
|
||||
interval: defaultSettings.DEFAULT_SETTING_INTERVAL,
|
||||
timeout: defaultSettings.DEFAULT_SETTING_TIMEOUT,
|
||||
doctype: defaultSettings.DEFAULT_SETTING_DOCTYPE,
|
||||
}, omit(options, [ 'client' ]));
|
||||
interval: constants.DEFAULT_SETTING_INTERVAL,
|
||||
timeout: constants.DEFAULT_SETTING_TIMEOUT,
|
||||
doctype: constants.DEFAULT_SETTING_DOCTYPE,
|
||||
dateSeparator: constants.DEFAULT_SETTING_DATE_SEPARATOR,
|
||||
}, objectOmit(options, [ 'client' ]));
|
||||
this.client = createClient(options.client || {});
|
||||
|
||||
this._workers = [];
|
||||
this._initTasks().catch((err) => this.emit('error', err));
|
||||
this._initTasks().catch((err) => this.emit(constants.EVENT_QUEUE_ERROR, err));
|
||||
}
|
||||
|
||||
_initTasks() {
|
||||
const initTasks = [
|
||||
this.client.ping({ timeout: 3000 }),
|
||||
this.client.ping(),
|
||||
];
|
||||
|
||||
return Promise.all(initTasks).catch((err) => {
|
||||
@@ -38,17 +39,18 @@ export default class Esqueue extends events.EventEmitter {
|
||||
}
|
||||
|
||||
addJob(type, payload, opts = {}) {
|
||||
const timestamp = indexTimestamp(this.settings.interval);
|
||||
const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator);
|
||||
const index = `${this.index}-${timestamp}`;
|
||||
const defaults = {
|
||||
timeout: this.settings.timeout,
|
||||
};
|
||||
|
||||
const options = Object.assign(defaults, opts, {
|
||||
doctype: this.settings.doctype
|
||||
doctype: this.settings.doctype,
|
||||
indexSettings: this.settings.indexSettings,
|
||||
});
|
||||
|
||||
return new Job(this.client, index, type, payload, options);
|
||||
return new Job(this, index, type, payload, options);
|
||||
}
|
||||
|
||||
registerWorker(type, workerFn, opts) {
|
||||
|
||||
42
src/job.js
42
src/job.js
@@ -1,36 +1,36 @@
|
||||
import events from 'events';
|
||||
import isPlainObject from 'lodash.isplainobject';
|
||||
import Puid from 'puid';
|
||||
import contstants from './constants';
|
||||
import logger from './helpers/logger';
|
||||
import contstants from './helpers/constants';
|
||||
import createIndex from './helpers/create_index';
|
||||
import isPlainObject from './helpers/is_plain_object';
|
||||
|
||||
const debug = logger('esqueue:job');
|
||||
const puid = new Puid();
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
constructor(client, index, type, payload, options = {}) {
|
||||
constructor(queue, index, type, payload, options = {}) {
|
||||
if (typeof type !== 'string') throw new Error('Type must be a string');
|
||||
if (!isPlainObject(payload)) throw new Error('Payload must be a plain object');
|
||||
|
||||
super();
|
||||
|
||||
this.client = client;
|
||||
this.queue = queue;
|
||||
this.client = options.client || this.queue.client;
|
||||
this.id = puid.generate();
|
||||
this.index = index;
|
||||
this.jobtype = type;
|
||||
this.payload = payload;
|
||||
this.created_by = options.created_by || null;
|
||||
this.created_by = options.created_by || false;
|
||||
this.timeout = options.timeout || 10000;
|
||||
this.maxAttempts = options.max_attempts || 3;
|
||||
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
||||
this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE;
|
||||
this.indexSettings = options.indexSettings || {};
|
||||
|
||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||
|
||||
this.ready = createIndex(client, index, this.doctype)
|
||||
.then(() => {
|
||||
return this.client.index({
|
||||
const indexParams = {
|
||||
index: this.index,
|
||||
type: this.doctype,
|
||||
id: this.id,
|
||||
@@ -46,24 +46,41 @@ export default class Job extends events.EventEmitter {
|
||||
max_attempts: this.maxAttempts,
|
||||
status: contstants.JOB_STATUS_PENDING,
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
if (options.headers) {
|
||||
indexParams.headers = options.headers;
|
||||
}
|
||||
|
||||
this.ready = createIndex(this.client, this.index, this.doctype, this.indexSettings)
|
||||
.then(() => this.client.index(indexParams))
|
||||
.then((doc) => {
|
||||
this.document = {
|
||||
id: doc._id,
|
||||
type: doc._type,
|
||||
index: doc._index,
|
||||
version: doc._version,
|
||||
};
|
||||
this.debug(`Job created in index ${this.index}`);
|
||||
this.emit('created', this.document);
|
||||
|
||||
return this.client.indices.refresh({
|
||||
index: this.index
|
||||
}).then(() => {
|
||||
this.debug(`Job index refreshed ${this.index}`);
|
||||
this.emit(contstants.EVENT_JOB_CREATED, this.document);
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
this.debug('Job creation failed', err);
|
||||
this.emit('error', err);
|
||||
throw err;
|
||||
this.emit(contstants.EVENT_JOB_CREATE_ERROR, err);
|
||||
});
|
||||
}
|
||||
|
||||
emit(name, ...args) {
|
||||
super.emit(name, ...args);
|
||||
this.queue.emit(name, ...args);
|
||||
}
|
||||
|
||||
get() {
|
||||
return this.ready
|
||||
.then(() => {
|
||||
@@ -96,5 +113,4 @@ export default class Job extends events.EventEmitter {
|
||||
priority: this.priority,
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
144
src/worker.js
144
src/worker.js
@@ -1,14 +1,22 @@
|
||||
import events from 'events';
|
||||
import Puid from 'puid';
|
||||
import moment from 'moment';
|
||||
import constants from './constants';
|
||||
import logger from './helpers/logger';
|
||||
import constants from './helpers/constants';
|
||||
import { WorkerTimeoutError } from './helpers/errors';
|
||||
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
|
||||
|
||||
const puid = new Puid();
|
||||
const debug = logger('esqueue:worker');
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
function formatJobObject(job) {
|
||||
return {
|
||||
index: job._index,
|
||||
type: job._type,
|
||||
id: job._id,
|
||||
};
|
||||
}
|
||||
|
||||
export default class Worker extends events.EventEmitter {
|
||||
constructor(queue, type, workerFn, opts = {}) {
|
||||
if (typeof type !== 'string') throw new Error('Type must be a string');
|
||||
if (typeof workerFn !== 'function') throw new Error('Worker must be a function');
|
||||
@@ -17,22 +25,48 @@ export default class Job extends events.EventEmitter {
|
||||
|
||||
this.id = puid.generate();
|
||||
this.queue = queue;
|
||||
this.client = this.queue.client;
|
||||
this.client = opts.client || this.queue.client;
|
||||
this.jobtype = type;
|
||||
this.workerFn = workerFn;
|
||||
this.checkInterval = opts.interval || 1500;
|
||||
this.checkSize = opts.size || 10;
|
||||
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
|
||||
this.doctype = opts.doctype || this.queue.doctype || constants.DEFAULT_SETTING_DOCTYPE;
|
||||
|
||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||
|
||||
this._checker = false;
|
||||
this.debug(`Created worker for type ${this.type}`);
|
||||
this._running = true;
|
||||
this.debug(`Created worker for job type ${this.jobtype}`);
|
||||
this._startJobPolling();
|
||||
}
|
||||
|
||||
destroy() {
|
||||
clearInterval(this._checker);
|
||||
this._running = false;
|
||||
this._stopJobPolling();
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return {
|
||||
id: this.id,
|
||||
index: this.queue.index,
|
||||
jobType: this.jobType,
|
||||
doctype: this.doctype,
|
||||
};
|
||||
}
|
||||
|
||||
emit(name, ...args) {
|
||||
super.emit(name, ...args);
|
||||
this.queue.emit(name, ...args);
|
||||
}
|
||||
|
||||
_formatErrorParams(err, job) {
|
||||
const response = {
|
||||
error: err,
|
||||
worker: this.toJSON(),
|
||||
};
|
||||
|
||||
if (job) response.job = formatJobObject(job);
|
||||
return response;
|
||||
}
|
||||
|
||||
_claimJob(job) {
|
||||
@@ -67,8 +101,10 @@ export default class Job extends events.EventEmitter {
|
||||
return updatedJob;
|
||||
})
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return false;
|
||||
throw err;
|
||||
if (err.statusCode === 409) return true;
|
||||
this.debug(`_claimJob failed on job ${job._id}`, err);
|
||||
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -76,14 +112,18 @@ export default class Job extends events.EventEmitter {
|
||||
this.debug(`Failing job ${job._id}`);
|
||||
|
||||
const completedTime = moment().toISOString();
|
||||
const docOutput = this._formatOutput(output);
|
||||
const doc = {
|
||||
status: constants.JOB_STATUS_FAILED,
|
||||
completed_at: completedTime,
|
||||
output: docOutput
|
||||
};
|
||||
|
||||
if (output) {
|
||||
doc.output = this._formatOutput(output);
|
||||
}
|
||||
this.emit(constants.EVENT_WORKER_JOB_FAIL, {
|
||||
job: formatJobObject(job),
|
||||
worker: this.toJSON(),
|
||||
output: docOutput,
|
||||
});
|
||||
|
||||
return this.client.update({
|
||||
index: job._index,
|
||||
@@ -92,9 +132,12 @@ export default class Job extends events.EventEmitter {
|
||||
version: job._version,
|
||||
body: { doc }
|
||||
})
|
||||
.then(() => true)
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return true;
|
||||
throw err;
|
||||
this.debug(`_failJob failed to update job ${job._id}`, err);
|
||||
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -118,11 +161,24 @@ export default class Job extends events.EventEmitter {
|
||||
this.debug(`Starting job ${job._id}`);
|
||||
|
||||
const workerOutput = new Promise((resolve, reject) => {
|
||||
resolve(this.workerFn.call(null, job._source.payload));
|
||||
// run the worker's workerFn
|
||||
let isResolved = false;
|
||||
Promise.resolve(this.workerFn.call(null, job._source.payload))
|
||||
.then((res) => {
|
||||
isResolved = true;
|
||||
resolve(res);
|
||||
})
|
||||
.catch((err) => {
|
||||
isResolved = true;
|
||||
reject(err);
|
||||
});
|
||||
|
||||
// fail if workerFn doesn't finish before timeout
|
||||
setTimeout(() => {
|
||||
if (isResolved) return;
|
||||
|
||||
this.debug(`Timeout processing job ${job._id}`);
|
||||
reject(new WorkerTimeoutError({
|
||||
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
|
||||
timeout: job._source.timeout,
|
||||
jobId: job._id,
|
||||
}));
|
||||
@@ -149,26 +205,52 @@ export default class Job extends events.EventEmitter {
|
||||
version: job._version,
|
||||
body: { doc }
|
||||
})
|
||||
.then(() => {
|
||||
const eventOutput = {
|
||||
job: formatJobObject(job),
|
||||
output: docOutput,
|
||||
};
|
||||
|
||||
this.emit(constants.EVENT_WORKER_COMPLETE, eventOutput);
|
||||
})
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return false;
|
||||
this.debug(`Failure saving job output ${job._id}`, err);
|
||||
this.emit('job_error', err);
|
||||
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
|
||||
});
|
||||
}, (jobErr) => {
|
||||
if (!jobErr) {
|
||||
jobErr = new UnspecifiedWorkerError('Unspecified worker error', {
|
||||
jobId: job._id,
|
||||
});
|
||||
}
|
||||
|
||||
// job execution failed
|
||||
if (jobErr.type === 'WorkerTimeout') {
|
||||
if (jobErr.name === 'WorkerTimeoutError') {
|
||||
this.debug(`Timeout on job ${job._id}`);
|
||||
this.emit('job_timeout', jobErr);
|
||||
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
|
||||
return;
|
||||
|
||||
// append the jobId to the error
|
||||
} else {
|
||||
try {
|
||||
Object.assign(jobErr, { jobId: job._id });
|
||||
} catch (e) {
|
||||
// do nothing if jobId can not be appended
|
||||
}
|
||||
}
|
||||
|
||||
this.debug(`Failure occurred on job ${job._id}`, jobErr);
|
||||
this.emit('job_error', jobErr);
|
||||
return this._failJob(job, jobErr.toString());
|
||||
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
|
||||
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
|
||||
});
|
||||
}
|
||||
|
||||
_startJobPolling() {
|
||||
if (!this._running) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._checker = setInterval(() => {
|
||||
this._getPendingJobs()
|
||||
.then((jobs) => this._claimPendingJobs(jobs));
|
||||
@@ -185,6 +267,7 @@ export default class Job extends events.EventEmitter {
|
||||
this._stopJobPolling();
|
||||
let claimed = false;
|
||||
|
||||
// claim a single job, stopping after first successful claim
|
||||
return jobs.reduce((chain, job) => {
|
||||
return chain.then((claimedJob) => {
|
||||
// short-circuit the promise chain if a job has been claimed
|
||||
@@ -199,16 +282,6 @@ export default class Job extends events.EventEmitter {
|
||||
});
|
||||
});
|
||||
}, Promise.resolve())
|
||||
.catch((err) => {
|
||||
this.debug('Failed to claim outstanding jobs', err);
|
||||
this.emit('error', err);
|
||||
this.queue.emit('worker_error', {
|
||||
id: this.id,
|
||||
type: this.type,
|
||||
err
|
||||
});
|
||||
throw err;
|
||||
})
|
||||
.then((claimedJob) => {
|
||||
if (!claimedJob) {
|
||||
this.debug(`All ${jobs.length} jobs already claimed`);
|
||||
@@ -220,7 +293,6 @@ export default class Job extends events.EventEmitter {
|
||||
.then(() => this._startJobPolling())
|
||||
.catch((err) => {
|
||||
this.debug('Error claiming jobs', err);
|
||||
this.emit('error', err);
|
||||
this._startJobPolling();
|
||||
});
|
||||
}
|
||||
@@ -229,7 +301,7 @@ export default class Job extends events.EventEmitter {
|
||||
const nowTime = moment().toISOString();
|
||||
const query = {
|
||||
_source : {
|
||||
exclude: [ 'output.content' ]
|
||||
excludes: [ 'output.content' ]
|
||||
},
|
||||
query: {
|
||||
constant_score: {
|
||||
@@ -274,13 +346,7 @@ export default class Job extends events.EventEmitter {
|
||||
if (err.status === 404) return [];
|
||||
|
||||
this.debug('job querying failed', err);
|
||||
this.emit('error', err);
|
||||
this.queue.emit('worker_error', {
|
||||
id: this.id,
|
||||
type: this.type,
|
||||
err
|
||||
});
|
||||
throw err;
|
||||
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
|
||||
});
|
||||
}
|
||||
}
|
||||
13
test/fixtures/elasticsearch.js
vendored
13
test/fixtures/elasticsearch.js
vendored
@@ -1,11 +1,12 @@
|
||||
import { uniqueId, times, random } from 'lodash';
|
||||
import elasticsearch from 'elasticsearch';
|
||||
import { DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
|
||||
import constants from '../../lib/constants';
|
||||
|
||||
function Client() {
|
||||
this.indices = {
|
||||
create: () => Promise.resolve({ acknowledged: true }),
|
||||
exists: () => Promise.resolve(false),
|
||||
refresh: () => Promise.resolve(),
|
||||
};
|
||||
|
||||
this.transport = {};
|
||||
@@ -15,7 +16,7 @@ Client.prototype.index = function (params = {}) {
|
||||
const shardCount = 2;
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || uniqueId('testDoc'),
|
||||
_version: 1,
|
||||
_shards: { total: shardCount, successful: shardCount, failed: 0 },
|
||||
@@ -32,7 +33,7 @@ Client.prototype.get = function (params = {}, source = {}) {
|
||||
|
||||
const _source = Object.assign({
|
||||
jobtype: 'jobtype',
|
||||
created_by: null,
|
||||
created_by: false,
|
||||
payload: {
|
||||
id: 'sample-job-1',
|
||||
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
||||
@@ -47,7 +48,7 @@ Client.prototype.get = function (params = {}, source = {}) {
|
||||
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
||||
_version: params.version || 1,
|
||||
found: true,
|
||||
@@ -59,7 +60,7 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||
const hits = times(count, () => {
|
||||
return {
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: uniqueId('documentId'),
|
||||
_version: random(1, 5),
|
||||
_score: null,
|
||||
@@ -89,7 +90,7 @@ Client.prototype.update = function (params = {}) {
|
||||
const shardCount = 2;
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || DEFAULT_SETTING_DOCTYPE,
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || uniqueId('testDoc'),
|
||||
_version: params.version + 1 || 2,
|
||||
_shards: { total: shardCount, successful: shardCount, failed: 0 },
|
||||
|
||||
17
test/fixtures/job.js
vendored
Normal file
17
test/fixtures/job.js
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
import events from 'events';
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
constructor(queue, index, type, payload, options = {}) {
|
||||
super();
|
||||
|
||||
this.queue = queue;
|
||||
this.index = index;
|
||||
this.jobType = type;
|
||||
this.payload = payload;
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
getProp(name) {
|
||||
return this[name];
|
||||
}
|
||||
}
|
||||
13
test/fixtures/queue.js
vendored
Normal file
13
test/fixtures/queue.js
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
import events from 'events';
|
||||
|
||||
class MockQueue extends events.EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
}
|
||||
|
||||
setClient(client) {
|
||||
this.client = client;
|
||||
}
|
||||
}
|
||||
|
||||
export default MockQueue;
|
||||
16
test/fixtures/worker.js
vendored
Normal file
16
test/fixtures/worker.js
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
import events from 'events';
|
||||
|
||||
export default class Worker extends events.EventEmitter {
|
||||
constructor(queue, type, workerFn, opts = {}) {
|
||||
super();
|
||||
|
||||
this.queue = queue;
|
||||
this.type = type;
|
||||
this.workerFn = workerFn;
|
||||
this.options = opts;
|
||||
}
|
||||
|
||||
getProp(name) {
|
||||
return this[name];
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,7 @@ import expect from 'expect.js';
|
||||
import proxyquire from 'proxyquire';
|
||||
import elasticsearchMock from '../../fixtures/elasticsearch';
|
||||
|
||||
const module = proxyquire.noPreserveCache()('../../../lib/helpers/es_client', {
|
||||
const module = proxyquire.noPreserveCache()('../../../lib/helpers/create_client', {
|
||||
elasticsearch: elasticsearchMock
|
||||
});
|
||||
|
||||
@@ -2,9 +2,11 @@ import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import createIndex from '../../../lib/helpers/create_index';
|
||||
import elasticsearchMock from '../../fixtures/elasticsearch';
|
||||
import { defaultSettings } from '../../../lib/helpers/constants';
|
||||
import { DEFAULT_SETTING_DOCTYPE } from '../../../lib/constants';
|
||||
|
||||
describe('Create Index', function () {
|
||||
|
||||
describe('Does not exist', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
@@ -13,6 +15,14 @@ describe('Create Index', function () {
|
||||
createSpy = sinon.spy(client.indices, 'create');
|
||||
});
|
||||
|
||||
it('should return true', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then((exists) => expect(exists).to.be(true));
|
||||
});
|
||||
|
||||
it('should create the index', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
@@ -26,7 +36,7 @@ describe('Create Index', function () {
|
||||
|
||||
it('should create the type mappings', function () {
|
||||
const indexName = 'test-index';
|
||||
const docType = defaultSettings.DEFAULT_SETTING_DOCTYPE;
|
||||
const docType = DEFAULT_SETTING_DOCTYPE;
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
@@ -55,4 +65,35 @@ describe('Create Index', function () {
|
||||
expect(payload.body.mappings[docType]).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('Does exist', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
beforeEach(function () {
|
||||
client = new elasticsearchMock.Client();
|
||||
sinon.stub(client.indices, 'exists', () => Promise.resolve(true));
|
||||
createSpy = sinon.spy(client.indices, 'create');
|
||||
});
|
||||
|
||||
it('should return true', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then((exists) => expect(exists).to.be(true));
|
||||
});
|
||||
|
||||
it('should not create the index', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
sinon.assert.callCount(createSpy, 0);
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
51
test/src/helpers/errors.js
Normal file
51
test/src/helpers/errors.js
Normal file
@@ -0,0 +1,51 @@
|
||||
import expect from 'expect.js';
|
||||
import { WorkerTimeoutError, UnspecifiedWorkerError } from '../../../lib/helpers/errors';
|
||||
|
||||
describe('custom errors', function () {
|
||||
describe('WorkerTimeoutError', function () {
|
||||
it('should be function', () => {
|
||||
expect(WorkerTimeoutError).to.be.a('function');
|
||||
});
|
||||
|
||||
it('should have a name', function () {
|
||||
const err = new WorkerTimeoutError('timeout error');
|
||||
expect(err).to.have.property('name', 'WorkerTimeoutError');
|
||||
});
|
||||
|
||||
it('should take a jobId property', function () {
|
||||
const err = new WorkerTimeoutError('timeout error', { jobId: 'il7hl34rqlo8ro' });
|
||||
expect(err).to.have.property('jobId', 'il7hl34rqlo8ro');
|
||||
});
|
||||
|
||||
it('should take a timeout property', function () {
|
||||
const err = new WorkerTimeoutError('timeout error', { timeout: 15000 });
|
||||
expect(err).to.have.property('timeout', 15000);
|
||||
});
|
||||
|
||||
it('should be stringifyable', function () {
|
||||
const err = new WorkerTimeoutError('timeout error');
|
||||
expect(`${err}`).to.equal('WorkerTimeoutError: timeout error');
|
||||
});
|
||||
});
|
||||
|
||||
describe('UnspecifiedWorkerError', function () {
|
||||
it('should be function', () => {
|
||||
expect(UnspecifiedWorkerError).to.be.a('function');
|
||||
});
|
||||
|
||||
it('should have a name', function () {
|
||||
const err = new UnspecifiedWorkerError('unspecified error');
|
||||
expect(err).to.have.property('name', 'UnspecifiedWorkerError');
|
||||
});
|
||||
|
||||
it('should take a jobId property', function () {
|
||||
const err = new UnspecifiedWorkerError('unspecified error', { jobId: 'il7hl34rqlo8ro' });
|
||||
expect(err).to.have.property('jobId', 'il7hl34rqlo8ro');
|
||||
});
|
||||
|
||||
it('should be stringifyable', function () {
|
||||
const err = new UnspecifiedWorkerError('unspecified error');
|
||||
expect(`${err}`).to.equal('UnspecifiedWorkerError: unspecified error');
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,22 +1,25 @@
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import moment from 'moment';
|
||||
import constants from '../../../lib/constants';
|
||||
import indexTimestamp from '../../../lib/helpers/index_timestamp';
|
||||
|
||||
const anchor = '2016-04-02T01:02:03.456'; // saturday
|
||||
|
||||
describe('Index interval', function () {
|
||||
describe('indexTimestamp construction', function () {
|
||||
describe('Index timestamp interval', function () {
|
||||
describe('construction', function () {
|
||||
it('should throw given an invalid interval', function () {
|
||||
const init = () => indexTimestamp('bananas');
|
||||
expect(init).to.throwException(/invalid.+interval/i);
|
||||
});
|
||||
});
|
||||
|
||||
describe('indexTimestamp timestamps', function () {
|
||||
describe('timestamps', function () {
|
||||
let clock;
|
||||
let separator;
|
||||
|
||||
beforeEach(function () {
|
||||
separator = constants.DEFAULT_SETTING_DATE_SEPARATOR;
|
||||
clock = sinon.useFakeTimers(moment(anchor).valueOf());
|
||||
});
|
||||
|
||||
@@ -24,34 +27,61 @@ describe('Index interval', function () {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
describe('formats', function () {
|
||||
it('should return the year', function () {
|
||||
const timestamp = indexTimestamp('year');
|
||||
expect(timestamp).to.equal('2016');
|
||||
const str = `2016`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year and month', function () {
|
||||
const timestamp = indexTimestamp('month');
|
||||
expect(timestamp).to.equal('2016-04');
|
||||
const str = `2016${separator}04`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, and first day of the week', function () {
|
||||
const timestamp = indexTimestamp('week');
|
||||
expect(timestamp).to.equal('2016-03-27');
|
||||
const str = `2016${separator}03${separator}27`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, and day of the week', function () {
|
||||
const timestamp = indexTimestamp('day');
|
||||
expect(timestamp).to.equal('2016-04-02');
|
||||
const str = `2016${separator}04${separator}02`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, day and hour', function () {
|
||||
const timestamp = indexTimestamp('hour');
|
||||
expect(timestamp).to.equal('2016-04-02-01');
|
||||
const str = `2016${separator}04${separator}02${separator}01`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, day, hour and minute', function () {
|
||||
const timestamp = indexTimestamp('minute');
|
||||
expect(timestamp).to.equal('2016-04-02-01-02');
|
||||
const str = `2016${separator}04${separator}02${separator}01${separator}02`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
});
|
||||
|
||||
describe('date separator', function () {
|
||||
it('should be customizable', function () {
|
||||
const separators = ['-', '.', '_'];
|
||||
separators.forEach(customSep => {
|
||||
const str = `2016${customSep}04${customSep}02${customSep}01${customSep}02`;
|
||||
const timestamp = indexTimestamp('minute', customSep);
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
});
|
||||
|
||||
it('should throw if a letter is used', function () {
|
||||
const separators = ['a', 'B', 'YYYY'];
|
||||
separators.forEach(customSep => {
|
||||
const fn = () => indexTimestamp('minute', customSep);
|
||||
expect(fn).to.throwException();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
48
test/src/helpers/is_plain_object.js
Normal file
48
test/src/helpers/is_plain_object.js
Normal file
@@ -0,0 +1,48 @@
|
||||
import expect from 'expect.js';
|
||||
import isPlainObject from '../../../lib/helpers/is_plain_object';
|
||||
|
||||
function validateItems(checks, pass) {
|
||||
checks.forEach(check => {
|
||||
expect(isPlainObject(check)).to.be(pass);
|
||||
});
|
||||
}
|
||||
|
||||
describe('isPlainObject', function () {
|
||||
describe('non-object primitives', function () {
|
||||
it('return false', function () {
|
||||
const checks = [
|
||||
100,
|
||||
true,
|
||||
'i am a string',
|
||||
function noop() {},
|
||||
null,
|
||||
];
|
||||
|
||||
validateItems(checks, false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('arrays', function () {
|
||||
it('return false', function () {
|
||||
const checks = [
|
||||
[],
|
||||
[1,2,3],
|
||||
['just a string'],
|
||||
];
|
||||
|
||||
validateItems(checks, false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('', function () {
|
||||
it('return true', function () {
|
||||
const checks = [
|
||||
{},
|
||||
{one:1},
|
||||
{object:{with:{array:[]}}},
|
||||
];
|
||||
|
||||
validateItems(checks, true);
|
||||
});
|
||||
});
|
||||
});
|
||||
37
test/src/helpers/omit.js
Normal file
37
test/src/helpers/omit.js
Normal file
@@ -0,0 +1,37 @@
|
||||
import expect from 'expect.js';
|
||||
import objectOmit from '../../../lib/helpers/object_omit';
|
||||
|
||||
describe('object omit', function () {
|
||||
let obj = {};
|
||||
|
||||
beforeEach(() => {
|
||||
obj = {
|
||||
one: 1,
|
||||
two: 2,
|
||||
three: 3,
|
||||
arr: [1,2,3],
|
||||
check: 'aw yeah',
|
||||
};
|
||||
});
|
||||
|
||||
it('omits a single property', function () {
|
||||
const val = objectOmit(obj, 'one');
|
||||
|
||||
expect(val).to.eql({
|
||||
two: 2,
|
||||
three: 3,
|
||||
arr: [1,2,3],
|
||||
check: 'aw yeah',
|
||||
});
|
||||
});
|
||||
|
||||
it('omits multiple properties', function () {
|
||||
const val = objectOmit(obj, ['three', 'check']);
|
||||
|
||||
expect(val).to.eql({
|
||||
one: 1,
|
||||
two: 2,
|
||||
arr: [1,2,3],
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,9 +1,17 @@
|
||||
import events from 'events';
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import proxyquire from 'proxyquire';
|
||||
import { noop, times } from 'lodash';
|
||||
import constants from '../../lib/constants';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import Esqueue from '../../lib/index';
|
||||
import jobMock from '../fixtures/job';
|
||||
import workerMock from '../fixtures/worker';
|
||||
|
||||
const Esqueue = proxyquire.noPreserveCache()('../../lib/index', {
|
||||
'./job.js': jobMock,
|
||||
'./worker.js': workerMock,
|
||||
});
|
||||
|
||||
describe('Esqueue class', function () {
|
||||
let client;
|
||||
@@ -48,9 +56,71 @@ describe('Esqueue class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
describe('Adding jobs', function () {
|
||||
let indexName;
|
||||
let jobType;
|
||||
let payload;
|
||||
let queue;
|
||||
|
||||
beforeEach(function () {
|
||||
indexName = 'esqueue-index';
|
||||
jobType = 'test-test';
|
||||
payload = { payload: true };
|
||||
queue = new Esqueue(indexName, { client });
|
||||
});
|
||||
|
||||
it('should throw with invalid dateSeparator setting', function () {
|
||||
queue = new Esqueue(indexName, { client, dateSeparator: 'a' });
|
||||
const fn = () => queue.addJob(jobType, payload);
|
||||
expect(fn).to.throwException();
|
||||
});
|
||||
|
||||
it('should pass queue instance, index name, type and payload', function () {
|
||||
const job = queue.addJob(jobType, payload);
|
||||
expect(job.getProp('queue')).to.equal(queue);
|
||||
expect(job.getProp('index')).to.match(new RegExp(indexName));
|
||||
expect(job.getProp('jobType')).to.equal(jobType);
|
||||
expect(job.getProp('payload')).to.equal(payload);
|
||||
});
|
||||
|
||||
it('should pass default settings', function () {
|
||||
const job = queue.addJob(jobType, payload);
|
||||
const options = job.getProp('options');
|
||||
expect(options).to.have.property('timeout', constants.DEFAULT_SETTING_TIMEOUT);
|
||||
expect(options).to.have.property('doctype', constants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
|
||||
it('should pass queue index settings', function () {
|
||||
const indexSettings = {
|
||||
index: {
|
||||
number_of_shards: 1
|
||||
}
|
||||
};
|
||||
|
||||
queue = new Esqueue(indexName, { client, indexSettings });
|
||||
const job = queue.addJob(jobType, payload);
|
||||
expect(job.getProp('options')).to.have.property('indexSettings', indexSettings);
|
||||
});
|
||||
|
||||
it('should pass headers from options', function () {
|
||||
const options = {
|
||||
headers: {
|
||||
authorization: 'Basic cXdlcnR5'
|
||||
}
|
||||
};
|
||||
const job = queue.addJob(jobType, payload, options);
|
||||
expect(job.getProp('options')).to.have.property('headers', options.headers);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Registering workers', function () {
|
||||
let queue;
|
||||
|
||||
beforeEach(function () {
|
||||
queue = new Esqueue('esqueue', { client });
|
||||
});
|
||||
|
||||
it('should keep track of workers', function () {
|
||||
const queue = new Esqueue('esqueue', { client });
|
||||
expect(queue.getWorkers()).to.eql([]);
|
||||
expect(queue.getWorkers()).to.have.length(0);
|
||||
|
||||
@@ -59,6 +129,27 @@ describe('Esqueue class', function () {
|
||||
queue.registerWorker('test2', noop);
|
||||
expect(queue.getWorkers()).to.have.length(3);
|
||||
});
|
||||
|
||||
it('should pass instance of queue, type, and worker function', function () {
|
||||
const workerType = 'test-worker';
|
||||
const workerFn = () => true;
|
||||
|
||||
const worker = queue.registerWorker(workerType, workerFn);
|
||||
expect(worker.getProp('queue')).to.equal(queue);
|
||||
expect(worker.getProp('type')).to.equal(workerType);
|
||||
expect(worker.getProp('workerFn')).to.equal(workerFn);
|
||||
});
|
||||
|
||||
it('should pass worker options', function () {
|
||||
const workerOptions = {
|
||||
size: 12,
|
||||
doctype: 'tests'
|
||||
};
|
||||
|
||||
queue = new Esqueue('esqueue', { client });
|
||||
const worker = queue.registerWorker('type', noop, workerOptions);
|
||||
expect(worker.getProp('options')).to.equal(workerOptions);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Destroy', function () {
|
||||
|
||||
369
test/src/job.js
369
test/src/job.js
@@ -2,10 +2,11 @@ import events from 'events';
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import proxyquire from 'proxyquire';
|
||||
import QueueMock from '../fixtures/queue';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import { JOB_STATUS_PENDING, DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
|
||||
import contstants from '../../lib/constants';
|
||||
|
||||
const createIndexMock = sinon.stub().returns(Promise.resolve('mock'));
|
||||
const createIndexMock = sinon.stub();
|
||||
const module = proxyquire.noPreserveCache()('../../lib/job', {
|
||||
'./helpers/create_index': createIndexMock
|
||||
});
|
||||
@@ -14,9 +15,16 @@ const Job = module;
|
||||
const maxPriority = 20;
|
||||
const minPriority = -20;
|
||||
const defaultPriority = 10;
|
||||
const defaultCreatedBy = null;
|
||||
const defaultCreatedBy = false;
|
||||
|
||||
function validateDoc(spy) {
|
||||
sinon.assert.callCount(spy, 1);
|
||||
const spyCall = spy.getCall(0);
|
||||
return spyCall.args[0];
|
||||
}
|
||||
|
||||
describe('Job Class', function () {
|
||||
let mockQueue;
|
||||
let client;
|
||||
let index;
|
||||
|
||||
@@ -26,160 +34,317 @@ describe('Job Class', function () {
|
||||
|
||||
beforeEach(function () {
|
||||
createIndexMock.reset();
|
||||
createIndexMock.returns(Promise.resolve('mock'));
|
||||
index = 'test';
|
||||
|
||||
client = new elasticsearchMock.Client();
|
||||
mockQueue = new QueueMock();
|
||||
mockQueue.setClient(client);
|
||||
});
|
||||
|
||||
it('should be an event emitter', function () {
|
||||
const job = new Job(client, index, 'test', {});
|
||||
const job = new Job(mockQueue, index, 'test', {});
|
||||
expect(job).to.be.an(events.EventEmitter);
|
||||
});
|
||||
|
||||
describe('invalid construction', function () {
|
||||
it('should throw with a missing type', function () {
|
||||
const init = () => new Job(client, index);
|
||||
const init = () => new Job(mockQueue, index);
|
||||
expect(init).to.throwException(/type.+string/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid type', function () {
|
||||
const init = () => new Job(client, index, { 'not a string': true });
|
||||
const init = () => new Job(mockQueue, index, { 'not a string': true });
|
||||
expect(init).to.throwException(/type.+string/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid payload', function () {
|
||||
const init = () => new Job(client, index, 'type1', [1, 2, 3]);
|
||||
const init = () => new Job(mockQueue, index, 'type1', [1, 2, 3]);
|
||||
expect(init).to.throwException(/plain.+object/i);
|
||||
});
|
||||
});
|
||||
|
||||
describe('construction', function () {
|
||||
function validateDoc(spy) {
|
||||
sinon.assert.callCount(spy, 1);
|
||||
const spyCall = spy.getCall(0);
|
||||
return spyCall.args[0];
|
||||
}
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
sinon.spy(client, 'index');
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
const args = createIndexMock.getCall(0).args;
|
||||
expect(args[0]).to.equal(client);
|
||||
expect(args[1]).to.equal(index);
|
||||
expect(args[2]).to.equal(contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs).to.have.property('index', index);
|
||||
expect(indexArgs).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(indexArgs).to.have.property('body');
|
||||
expect(indexArgs.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the job type', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs).to.have.property('index', index);
|
||||
expect(indexArgs).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(indexArgs).to.have.property('body');
|
||||
expect(indexArgs.body).to.have.property('jobtype', type);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set event creation time', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('created_at');
|
||||
});
|
||||
});
|
||||
|
||||
it('should refresh the index', function () {
|
||||
const refreshSpy = sinon.spy(client.indices, 'refresh');
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(refreshSpy);
|
||||
const spyCall = refreshSpy.getCall(0);
|
||||
expect(spyCall.args[0]).to.have.property('index', index);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit the job information on success', function (done) {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
job.once(contstants.EVENT_JOB_CREATED, (jobDoc) => {
|
||||
try {
|
||||
expect(jobDoc).to.have.property('id');
|
||||
expect(jobDoc).to.have.property('index');
|
||||
expect(jobDoc).to.have.property('type');
|
||||
expect(jobDoc).to.have.property('version');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit error on index creation failure', function (done) {
|
||||
const errMsg = 'test index creation failure';
|
||||
|
||||
createIndexMock.returns(Promise.reject(new Error(errMsg)));
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
job.once(contstants.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
try {
|
||||
expect(err.message).to.equal(errMsg);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit error on client index failure', function (done) {
|
||||
const errMsg = 'test document index failure';
|
||||
|
||||
client.index.restore();
|
||||
sinon.stub(client, 'index', () => Promise.reject(new Error(errMsg)));
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
job.once(contstants.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
try {
|
||||
expect(err.message).to.equal(errMsg);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('event emitting', function () {
|
||||
it('should trigger events on the queue instance', function (done) {
|
||||
const eventName = 'test event';
|
||||
const payload1 = {
|
||||
test: true,
|
||||
deep: { object: 'ok' }
|
||||
};
|
||||
const payload2 = 'two';
|
||||
const payload3 = new Error('test error');
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
|
||||
mockQueue.on(eventName, (...args) => {
|
||||
try {
|
||||
expect(args[0]).to.equal(payload1);
|
||||
expect(args[1]).to.equal(payload2);
|
||||
expect(args[2]).to.equal(payload3);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
job.emit(eventName, payload1, payload2, payload3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('default values', function () {
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
sinon.spy(client, 'index');
|
||||
});
|
||||
|
||||
it('should set attempt count to 0', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('attempts', 0);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index default created_by value', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('created_by', defaultCreatedBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set an expired process_expiration time', function () {
|
||||
const now = new Date().getTime();
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('process_expiration');
|
||||
expect(indexArgs.body.process_expiration.getTime()).to.be.lessThan(now);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set status as pending', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('status', contstants.JOB_STATUS_PENDING);
|
||||
});
|
||||
});
|
||||
|
||||
it('should have a default priority of 10', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('priority', defaultPriority);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('option passing', function () {
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
options = {
|
||||
timeout: 4567,
|
||||
max_attempts: 9,
|
||||
headers: {
|
||||
authorization: 'Basic cXdlcnR5'
|
||||
}
|
||||
};
|
||||
sinon.spy(client, 'index');
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the job type', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('jobtype', type);
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
it('should index the created_by value', function () {
|
||||
const createdBy = 'user_identifier';
|
||||
const job = new Job(client, index, type, payload, Object.assign({ created_by: createdBy }, options));
|
||||
const job = new Job(mockQueue, index, type, payload, Object.assign({ created_by: createdBy }, options));
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('created_by', createdBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index default created_by value', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('created_by', defaultCreatedBy);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('created_by', createdBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index timeout value from options', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('timeout', options.timeout);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('timeout', options.timeout);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set event times', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
it('should set max attempt count', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('created_at');
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('max_attempts', options.max_attempts);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set an expired process_expiration time', function () {
|
||||
const now = new Date().getTime();
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
it('should add headers to the request params', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('process_expiration');
|
||||
expect(newDoc.body.process_expiration.getTime()).to.be.lessThan(now);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set attempt count', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('attempts', 0);
|
||||
expect(newDoc.body).to.have.property('max_attempts', options.max_attempts);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set status as pending', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('status', JOB_STATUS_PENDING);
|
||||
});
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
});
|
||||
});
|
||||
|
||||
it('should have a default priority of 10', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('priority', defaultPriority);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs).to.have.property('headers', options.headers);
|
||||
});
|
||||
});
|
||||
|
||||
it(`should use upper priority of ${maxPriority}`, function () {
|
||||
const job = new Job(client, index, type, payload, { priority: maxPriority * 2 });
|
||||
const job = new Job(mockQueue, index, type, payload, { priority: maxPriority * 2 });
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('priority', maxPriority);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('priority', maxPriority);
|
||||
});
|
||||
});
|
||||
|
||||
it(`should use lower priority of ${minPriority}`, function () {
|
||||
const job = new Job(client, index, type, payload, { priority: minPriority * 2 });
|
||||
const job = new Job(mockQueue, index, type, payload, { priority: minPriority * 2 });
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('priority', minPriority);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('priority', minPriority);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('custom client', function () {
|
||||
let newClient;
|
||||
let job;
|
||||
|
||||
beforeEach(function () {
|
||||
sinon.spy(client, 'index');
|
||||
|
||||
newClient = new elasticsearchMock.Client();
|
||||
sinon.spy(newClient, 'index');
|
||||
job = new Job(mockQueue, index, type, payload, Object.assign({ client: newClient }, options));
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
const args = createIndexMock.getCall(0).args;
|
||||
expect(args[0]).to.equal(newClient);
|
||||
expect(args[1]).to.equal(index);
|
||||
expect(args[2]).to.equal(contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.callCount(client.index, 0);
|
||||
sinon.assert.callCount(newClient.index, 1);
|
||||
|
||||
const newDoc = newClient.index.getCall(0).args[0];
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -191,7 +356,7 @@ describe('Job Class', function () {
|
||||
});
|
||||
|
||||
it('should return the job document', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
return job.get()
|
||||
.then((doc) => {
|
||||
@@ -214,7 +379,7 @@ describe('Job Class', function () {
|
||||
created_by: 'some_ident'
|
||||
};
|
||||
|
||||
const job = new Job(client, index, type, payload, optionals);
|
||||
const job = new Job(mockQueue, index, type, payload, optionals);
|
||||
return Promise.resolve(client.get({}, optionals))
|
||||
.then((doc) => {
|
||||
sinon.stub(client, 'get').returns(Promise.resolve(doc));
|
||||
@@ -240,14 +405,14 @@ describe('Job Class', function () {
|
||||
});
|
||||
|
||||
it('should return the static information about the job', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
|
||||
// toJSON is sync, should work before doc is written to elasticsearch
|
||||
expect(job.document).to.be(undefined);
|
||||
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
|
||||
expect(doc).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(doc).to.have.property('jobtype', type);
|
||||
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||
expect(doc).to.have.property('timeout', options.timeout);
|
||||
@@ -262,7 +427,7 @@ describe('Job Class', function () {
|
||||
created_by: 'some_ident'
|
||||
};
|
||||
|
||||
const job = new Job(client, index, type, payload, optionals);
|
||||
const job = new Job(mockQueue, index, type, payload, optionals);
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||
});
|
||||
|
||||
@@ -2,9 +2,10 @@ import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import moment from 'moment';
|
||||
import { noop, random, get, find } from 'lodash';
|
||||
import Worker from '../../lib/worker';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import constants from '../../lib/helpers/constants';
|
||||
import QueueMock from '../fixtures/queue';
|
||||
import Worker from '../../lib/worker';
|
||||
import constants from '../../lib/constants';
|
||||
|
||||
const anchor = '2016-04-02T01:02:03.456'; // saturday
|
||||
const defaults = {
|
||||
@@ -23,9 +24,8 @@ describe('Worker class', function () {
|
||||
|
||||
beforeEach(function () {
|
||||
client = new elasticsearchMock.Client();
|
||||
mockQueue = {
|
||||
client: client
|
||||
};
|
||||
mockQueue = new QueueMock();
|
||||
mockQueue.setClient(client);
|
||||
});
|
||||
|
||||
describe('invalid construction', function () {
|
||||
@@ -51,6 +51,20 @@ describe('Worker class', function () {
|
||||
});
|
||||
|
||||
describe('construction', function () {
|
||||
it('should assign internal properties', function () {
|
||||
const jobtype = 'testjob';
|
||||
const workerFn = noop;
|
||||
const worker = new Worker(mockQueue, jobtype, workerFn);
|
||||
expect(worker).to.have.property('id');
|
||||
expect(worker).to.have.property('queue', mockQueue);
|
||||
expect(worker).to.have.property('client', client);
|
||||
expect(worker).to.have.property('jobtype', jobtype);
|
||||
expect(worker).to.have.property('workerFn', workerFn);
|
||||
expect(worker).to.have.property('checkInterval');
|
||||
expect(worker).to.have.property('checkSize');
|
||||
expect(worker).to.have.property('doctype');
|
||||
});
|
||||
|
||||
it('should have a unique ID', function () {
|
||||
const worker = new Worker(mockQueue, 'test', noop);
|
||||
expect(worker.id).to.be.a('string');
|
||||
@@ -60,6 +74,45 @@ describe('Worker class', function () {
|
||||
|
||||
expect(worker.id).to.not.equal(worker2.id);
|
||||
});
|
||||
|
||||
it('should use custom client', function () {
|
||||
const newClient = new elasticsearchMock.Client();
|
||||
const worker = new Worker(mockQueue, 'test', noop, { client: newClient });
|
||||
expect(worker).to.have.property('queue', mockQueue);
|
||||
expect(worker).to.have.property('client', newClient);
|
||||
expect(worker.client).to.not.equal(client);
|
||||
});
|
||||
});
|
||||
|
||||
describe('event emitting', function () {
|
||||
let worker;
|
||||
|
||||
beforeEach(function () {
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
});
|
||||
|
||||
it('should trigger events on the queue instance', function (done) {
|
||||
const eventName = 'test event';
|
||||
const payload1 = {
|
||||
test: true,
|
||||
deep: { object: 'ok' }
|
||||
};
|
||||
const payload2 = 'two';
|
||||
const payload3 = new Error('test error');
|
||||
|
||||
mockQueue.on(eventName, (...args) => {
|
||||
try {
|
||||
expect(args[0]).to.equal(payload1);
|
||||
expect(args[1]).to.equal(payload2);
|
||||
expect(args[2]).to.equal(payload3);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
worker.emit(eventName, payload1, payload2, payload3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('output formatting', function () {
|
||||
@@ -132,6 +185,27 @@ describe('Worker class', function () {
|
||||
clock.tick(interval);
|
||||
sinon.assert.calledOnce(searchSpy);
|
||||
});
|
||||
|
||||
it('should not poll once destroyed', function () {
|
||||
const worker = new Worker(mockQueue, 'test', noop);
|
||||
|
||||
// move the clock a couple times, test for searches each time
|
||||
sinon.assert.notCalled(searchSpy);
|
||||
clock.tick(defaults.interval);
|
||||
sinon.assert.calledOnce(searchSpy);
|
||||
clock.tick(defaults.interval);
|
||||
sinon.assert.calledTwice(searchSpy);
|
||||
|
||||
// destroy the worker, move the clock, make sure another search doesn't happen
|
||||
worker.destroy();
|
||||
clock.tick(defaults.interval);
|
||||
sinon.assert.calledTwice(searchSpy);
|
||||
|
||||
// manually call job poller, move the clock, make sure another search doesn't happen
|
||||
worker._startJobPolling();
|
||||
clock.tick(defaults.interval);
|
||||
sinon.assert.calledTwice(searchSpy);
|
||||
});
|
||||
});
|
||||
|
||||
describe('query for pending jobs', function () {
|
||||
@@ -225,7 +299,7 @@ describe('Worker class', function () {
|
||||
const excludedFields = [ 'output.content' ];
|
||||
const { body } = getSearchParams(jobtype);
|
||||
expect(body).to.have.property('_source');
|
||||
expect(body._source).to.eql({ exclude: excludedFields });
|
||||
expect(body._source).to.eql({ excludes: excludedFields });
|
||||
});
|
||||
|
||||
it('should search by job type', function () {
|
||||
@@ -267,7 +341,6 @@ describe('Worker class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
describe('claiming a job', function () {
|
||||
let params;
|
||||
let job;
|
||||
@@ -350,10 +423,36 @@ describe('Worker class', function () {
|
||||
expect(msg).to.equal(false);
|
||||
});
|
||||
|
||||
it('should swallow version mismatch errors', function () {
|
||||
it('should return true on version errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
|
||||
return worker._claimJob(job);
|
||||
return worker._claimJob(job)
|
||||
.then((res) => expect(res).to.equal(true));
|
||||
});
|
||||
|
||||
it('should return false on other errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
return worker._claimJob(job)
|
||||
.then((res) => expect(res).to.equal(false));
|
||||
});
|
||||
|
||||
it('should emit on other errors', function (done) {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
|
||||
worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('statusCode', 401);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
worker._claimJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -401,13 +500,21 @@ describe('Worker class', function () {
|
||||
expect(doc.output).to.have.property('content', msg);
|
||||
});
|
||||
|
||||
it('should swallow version mismatch errors', function () {
|
||||
it('should return true on version mismatch errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
|
||||
return worker._failJob(job);
|
||||
return worker._failJob(job)
|
||||
.then((res) => expect(res).to.equal(true));
|
||||
});
|
||||
|
||||
it('should set completed time and status to failed', function () {
|
||||
it('should return false on other docuemnt update errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
return worker._failJob(job)
|
||||
.then((res) => expect(res).to.equal(false));
|
||||
});
|
||||
|
||||
it('should set completed time and status to failure', function () {
|
||||
const startTime = moment().valueOf();
|
||||
const msg = 'test message';
|
||||
clock.tick(100);
|
||||
@@ -420,6 +527,39 @@ describe('Worker class', function () {
|
||||
const completedTimestamp = moment(doc.completed_at).valueOf();
|
||||
expect(completedTimestamp).to.be.greaterThan(startTime);
|
||||
});
|
||||
|
||||
it('should emit worker failure event', function (done) {
|
||||
worker.on(constants.EVENT_WORKER_JOB_FAIL, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('output');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
return worker._failJob(job);
|
||||
});
|
||||
|
||||
it('should emit on other docuemnt update errors', function (done) {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
|
||||
worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('statusCode', 401);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
worker._failJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('performing a job', function () {
|
||||
@@ -439,6 +579,7 @@ describe('Worker class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
describe('worker success', function () {
|
||||
it('should call the workerFn with the payload', function (done) {
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
@@ -491,6 +632,33 @@ describe('Worker class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit completion event', function (done) {
|
||||
const worker = new Worker(mockQueue, 'test', noop);
|
||||
|
||||
worker.once(constants.EVENT_WORKER_COMPLETE, (workerJob) => {
|
||||
try {
|
||||
expect(workerJob).to.not.have.property('_source');
|
||||
|
||||
expect(workerJob).to.have.property('job');
|
||||
expect(workerJob.job).to.have.property('id');
|
||||
expect(workerJob.job).to.have.property('index');
|
||||
expect(workerJob.job).to.have.property('type');
|
||||
|
||||
expect(workerJob).to.have.property('output');
|
||||
expect(workerJob.output).to.have.property('content');
|
||||
expect(workerJob.output).to.have.property('content_type');
|
||||
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('worker failure', function () {
|
||||
it('should append error output to job', function () {
|
||||
const workerFn = function () {
|
||||
throw new Error('test error');
|
||||
@@ -504,40 +672,204 @@ describe('Worker class', function () {
|
||||
sinon.assert.calledWith(failStub, job, 'Error: test error');
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle async errors', function () {
|
||||
const workerFn = function () {
|
||||
return new Promise((resolve, reject) => {
|
||||
reject(new Error('test error'));
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
const failStub = sinon.stub(worker, '_failJob');
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
sinon.assert.calledWith(failStub, job, 'Error: test error');
|
||||
});
|
||||
});
|
||||
|
||||
describe('job timeouts', function () {
|
||||
let job;
|
||||
let failStub;
|
||||
it('should handle rejecting with strings', function () {
|
||||
const errorMessage = 'this is a string error';
|
||||
const workerFn = function () {
|
||||
return new Promise((resolve, reject) => {
|
||||
reject(errorMessage);
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
const failStub = sinon.stub(worker, '_failJob');
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
sinon.assert.calledWith(failStub, job, errorMessage);
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle empty rejection', function (done) {
|
||||
const workerFn = function () {
|
||||
return new Promise((resolve, reject) => {
|
||||
reject();
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('name', 'UnspecifiedWorkerError');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('job failures', function () {
|
||||
function getFailStub(worker) {
|
||||
return sinon.stub(worker, '_failJob').returns(Promise.resolve());
|
||||
}
|
||||
|
||||
describe('timeout', function () {
|
||||
let worker;
|
||||
const timeout = 20;
|
||||
const timeoutPadding = 10;
|
||||
let failStub;
|
||||
let job;
|
||||
|
||||
beforeEach(function () {
|
||||
const timeout = 20;
|
||||
const workerFn = function () {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, timeout + timeoutPadding);
|
||||
}, timeout * 2);
|
||||
});
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
failStub = getFailStub(worker);
|
||||
|
||||
job = {
|
||||
_id: 'testJob1',
|
||||
_id: 'testTimeoutJob',
|
||||
_source: {
|
||||
timeout: timeout,
|
||||
payload: 'test'
|
||||
}
|
||||
};
|
||||
failStub = sinon.stub(worker, '_failJob').returns(Promise.resolve());
|
||||
});
|
||||
|
||||
it('should fail if not complete within allotted time', function () {
|
||||
it('should not fail job', function () {
|
||||
// fire of the job worker
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.notCalled(failStub);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit timeout if not completed in time', function (done) {
|
||||
worker.once(constants.EVENT_WORKER_JOB_TIMEOUT, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('name', 'WorkerTimeoutError');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
// fire of the job worker
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('worker failure', function () {
|
||||
let worker;
|
||||
let failStub;
|
||||
|
||||
const timeout = 20;
|
||||
const job = {
|
||||
_id: 'testTimeoutJob',
|
||||
_source: {
|
||||
timeout: timeout,
|
||||
payload: 'test'
|
||||
}
|
||||
};
|
||||
|
||||
describe('reject', function () {
|
||||
beforeEach(function () {
|
||||
const workerFn = function () {
|
||||
return new Promise(function (resolve, reject) {
|
||||
setTimeout(() => {
|
||||
reject();
|
||||
}, timeout / 2);
|
||||
});
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
failStub = getFailStub(worker);
|
||||
});
|
||||
|
||||
it('should fail the job', function () {
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit worker execution error', function (done) {
|
||||
worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
// fire of the job worker
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('throw', function () {
|
||||
beforeEach(function () {
|
||||
const workerFn = function () {
|
||||
throw new Error('test throw');
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
failStub = getFailStub(worker);
|
||||
});
|
||||
|
||||
it('should fail the job', function () {
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit worker execution error', function (done) {
|
||||
worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
// fire of the job worker
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user