Compare commits
204 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| f4dfddef8e | |||
| efbda1b1f5 | |||
| bef3104755 | |||
| be536277c3 | |||
| 71d8b63531 | |||
| 83a0352202 | |||
| 170ba3b8a0 | |||
| 26715f9ba2 | |||
| b980ad9e9e | |||
| 9d8ae30ec6 | |||
|
|
295db5e8fd | ||
| 6620a5de39 | |||
| bb898e26c2 | |||
| 4813c5bfa3 | |||
| 0ce26f6d9d | |||
| 6bdf7163b6 | |||
| 128905cdb4 | |||
| 01adab4174 | |||
| eb17575a96 | |||
| 73c7147c26 | |||
| 7787e35ea5 | |||
| 91e5316877 | |||
| ee948c5900 | |||
| ff944c523a | |||
| 6401022021 | |||
| 32fb6a92f5 | |||
| d4f24182b4 | |||
| 283731d6e6 | |||
| 79affbce76 | |||
| 9b02bdf56c | |||
| bae58b0370 | |||
| c1216d5a4b | |||
| 0e0c3feab9 | |||
| 0fb14d46c9 | |||
| 7900d0c7e1 | |||
| c84b62bd31 | |||
| d972c9a1cb | |||
| e294af2688 | |||
| 73efd7167d | |||
| 0ed702c665 | |||
| 2324256039 | |||
| c6c73f6dd2 | |||
| 9cff4e4b04 | |||
| 1e8340f214 | |||
| bbd82a1de1 | |||
| 64c1c90337 | |||
| 5e0cc440ff | |||
| 1e3168452f | |||
|
|
3cc8573ab0 | ||
| b2f4ae857f | |||
| c64c5c80d6 | |||
| 4befaee4cc | |||
|
|
af47d230db | ||
|
|
7c1189cf3c | ||
| bcca5667fb | |||
| 5886c1399d | |||
| 40aa1d0a2c | |||
|
|
3d34642de4 | ||
| 7836ae5b39 | |||
| 6913391526 | |||
| 2753986038 | |||
| 64ff676fab | |||
| 160f087c9f | |||
| 50998fa69a | |||
| d7e12206c5 | |||
| d71792f939 | |||
|
|
9c411b0b11 | ||
|
|
7af0c82080 | ||
| c0c535076e | |||
| f5a12db60c | |||
| 3bf62525a9 | |||
| c368fa91a1 | |||
| a5bfb96855 | |||
| 83f267a028 | |||
| faeb705dee | |||
| 2693c40423 | |||
| 10003e147d | |||
| c976684a42 | |||
| 1a1f9358fd | |||
| 7867f14476 | |||
| 8b652f11a9 | |||
| f3ac7c1958 | |||
| 597052dc4e | |||
| 2c5519c253 | |||
| b916d1352a | |||
| 6ad438db96 | |||
| 38287f10f7 | |||
| 9d20095ffb | |||
| 67381108d7 | |||
| 2c025e02c9 | |||
| 041af798a8 | |||
| fd118fa746 | |||
| f8db6e1bd3 | |||
| 775442f284 | |||
| 8e8609eede | |||
| 25878c0b33 | |||
| bfe8799b90 | |||
| c758fb55a6 | |||
| 8cd2fde3a8 | |||
| 9a71fcc7b5 | |||
| 95569c9e82 | |||
| 288daecb6b | |||
| 9cfc080b64 | |||
| bdd94096db | |||
| 40d67829c8 | |||
| a50dbf752e | |||
| 630733b093 | |||
| c6986e3677 | |||
| 27390fef44 | |||
| 3b135bbd09 | |||
| 5ed3280a18 | |||
| 79358a76dd | |||
| d16d3ea4dd | |||
| 057bd26b74 | |||
| 362469f541 | |||
| a2391a30c5 | |||
| 3934f0cd1b | |||
| 919fec4835 | |||
| b99f5ff1b9 | |||
| c44d275395 | |||
| da57fdeee7 | |||
| aa2c0040d5 | |||
| d2e843f05b | |||
| 13a78d12cc | |||
| 1d4c45c5d9 | |||
| eba6748ba9 | |||
| 257645f11c | |||
| 460d83411e | |||
| 2b2db9c5f9 | |||
| df03738b9a | |||
| db1d282da2 | |||
| a5f1d77f23 | |||
| d73155d538 | |||
| 866f5948af | |||
| eae6942ec2 | |||
| b0fec7d1ed | |||
| 638c896e37 | |||
| 5d5552c548 | |||
| ea0ea7e6c2 | |||
| 85cc4bf7f8 | |||
| 34592740c7 | |||
| eeafaf7d42 | |||
| 9e3515ebd5 | |||
| 78871f97d9 | |||
| 5954ee1d51 | |||
| a2d3fb7ffd | |||
| 04608b0ab2 | |||
| c541e07bb5 | |||
| d70a8cc3ea | |||
| 68ef5d2147 | |||
| 1c24a4f7c1 | |||
| 5fe4e644d6 | |||
| 39315dddd0 | |||
| 57c1bd1819 | |||
| e0605796a1 | |||
| a1a7b9e213 | |||
| dcecd4020b | |||
| 40c8d15562 | |||
| 21d52b4b50 | |||
| a0042be7c9 | |||
| d325f108f7 | |||
| 1fe139e1b0 | |||
| 4beb4a880b | |||
| 9ab3bb048e | |||
| ca0da61a69 | |||
| 59d30bbb0f | |||
| 6ce18a5477 | |||
| 89c08068dd | |||
| 5f2e2b09cf | |||
| fb2f51b11d | |||
| 8ac9a81fdb | |||
| 4472e725fa | |||
| be1eb81059 | |||
| 8d21dc6967 | |||
| 31159baae9 | |||
| 49965bbaf1 | |||
| 49b982db99 | |||
| 9aa8eed297 | |||
| 0bf6fb0023 | |||
| 300449bfb0 | |||
| 868c808db7 | |||
| ef61a33a38 | |||
| dae14e0edc | |||
| c51ea64bdd | |||
| 5d37399fbf | |||
| e4e8e9222c | |||
| 02a530c4c7 | |||
| 959f58ad8b | |||
| e9c3f5553d | |||
| 234b829adf | |||
| df9508808b | |||
| 609e81fdef | |||
| 3375335d24 | |||
| 0020050f3f | |||
| fa784393e5 | |||
| a4323433f2 | |||
| 7d08b98b15 | |||
| 4793027ff3 | |||
| 38532a6296 | |||
| aa5ea72e3b | |||
| d1e5d68f74 | |||
| e077442340 | |||
| 82506a74e8 | |||
| cae02cb0f8 |
4
.gitignore
vendored
4
.gitignore
vendored
@@ -1,5 +1,7 @@
|
||||
.DS_Store
|
||||
lib
|
||||
node_modules
|
||||
npm-debug.log
|
||||
.nyc_output
|
||||
coverage.lcov
|
||||
coverage.lcov
|
||||
yarn.lock
|
||||
|
||||
@@ -1 +1 @@
|
||||
4.3.2
|
||||
6.x
|
||||
|
||||
23
.travis.yml
23
.travis.yml
@@ -1,7 +1,24 @@
|
||||
language: node_js
|
||||
|
||||
node_js:
|
||||
- "4"
|
||||
- "4.3"
|
||||
- "stable"
|
||||
- "8"
|
||||
- "6"
|
||||
|
||||
cache:
|
||||
directories:
|
||||
- node_modules
|
||||
|
||||
notifications:
|
||||
email: false
|
||||
email:
|
||||
on_success: never
|
||||
on_failure: change
|
||||
|
||||
after_success: npm run coverage
|
||||
|
||||
script: npm run test:ci
|
||||
|
||||
branches:
|
||||
only:
|
||||
- master
|
||||
- /^v[0-9].*$/
|
||||
|
||||
153
CHANGELOG.md
Normal file
153
CHANGELOG.md
Normal file
@@ -0,0 +1,153 @@
|
||||
# Changelog
|
||||
|
||||
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
|
||||
|
||||
## v3.0.0
|
||||
|
||||
- support for node v4 or earlier is no longer tested
|
||||
- update several dependencies
|
||||
|
||||
## v2.0.3
|
||||
|
||||
- rename to elastiq
|
||||
- fix issue where job poller would not wait for ES response
|
||||
- when job polling search fails, wait for a 20x interval before searching again
|
||||
|
||||
## v2.0.2
|
||||
|
||||
- Fix issue where creating a worker would not use the queue's doctype by default
|
||||
|
||||
## v2.0.1
|
||||
|
||||
- Don't swallow 400 errors at index creation
|
||||
|
||||
## v2.0.0
|
||||
|
||||
- Change `sting` mappings to `keyword`, since [string is deprecated and is being removed from elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_50_mapping_changes.html#_literal_string_literal_fields_replaced_by_literal_text_literal_literal_keyword_literal_fields)
|
||||
|
||||
## v1.0.0
|
||||
|
||||
- [BREAKING] elasticsearch package is a peerDependency now, since it's not required if you are using the `client` option when instantiating the queue
|
||||
- Remove lodash.isPlainObject and lodash.omit dependencies, use customer helpers
|
||||
- Remove errors dependency, use custom errors
|
||||
|
||||
## v0.12.2
|
||||
|
||||
- Fixed issue where destoyed workers could continue running
|
||||
|
||||
## v0.12.1
|
||||
|
||||
- Move repo to elastic org
|
||||
|
||||
## v0.12.0
|
||||
|
||||
- [BREAKING] Rename general queue error to `queue:error` instead of simply `error`
|
||||
- Remove the `timeout` parameter from the ping operation on intiialization
|
||||
|
||||
## v0.11.1
|
||||
|
||||
- Apache 2.0 license file
|
||||
|
||||
## v0.11.0
|
||||
|
||||
- Contrary to the [source filtering docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-source-filtering.html), use plural form of include/exclude due to breaking change in Elasticsearch 5.0
|
||||
|
||||
## v0.10.5
|
||||
|
||||
- Apache 2.0 license file
|
||||
|
||||
## v0.10.4
|
||||
|
||||
- Allow index pattern date separator to be customized
|
||||
|
||||
## v0.10.3
|
||||
|
||||
- Bump moment.js version, fix [DoS issue](https://nodesecurity.io/advisories/55)
|
||||
|
||||
## v0.10.2
|
||||
|
||||
- Allow passing headers on job creation, useful for auth and proxy traversal
|
||||
|
||||
## v0.10.1
|
||||
|
||||
- Refresh Elasticsearch index when creating job, fire event after refresh
|
||||
|
||||
## v0.10.0
|
||||
|
||||
- [BREAKING] Remove header passing on job creation
|
||||
- [BREAKING] Job instantiation requires full queue instance
|
||||
- Expose event names in constants
|
||||
- Emit on Worker success conditions as well as errors
|
||||
- Worker and Job emits on the Queue instance
|
||||
|
||||
## v0.9.0
|
||||
|
||||
- [BREAKING] Rename timeout error event
|
||||
- Fix worker timeout condition
|
||||
- Fix issue where a worker error was not an instance of Error, or lacked a `toString()` method
|
||||
- Allow specifying option to pass to elasticsearch client on index creation
|
||||
|
||||
## v0.8.0
|
||||
|
||||
- [BREAKING] Don't throw on worker failures
|
||||
- [BREAKING] Don't emit errors on queue instance
|
||||
|
||||
## v0.7.0
|
||||
|
||||
- [BREAKING] Don't throw on job creation failures
|
||||
|
||||
## v0.6.1
|
||||
|
||||
- Allow headers option on job creation, passed to elasticsearch index request
|
||||
|
||||
## v0.6.0
|
||||
|
||||
- Allow client instance to be passed when creating a job
|
||||
- Allow client instance to be passed when creating a worker
|
||||
- Prefer any 4.x version of node for development
|
||||
|
||||
## v0.5.0
|
||||
|
||||
- [BREAKING] Change default `created_by` value to `false` (formerly `null`)
|
||||
|
||||
## v0.4.1
|
||||
|
||||
- Use `filter` instead of `must` to query for outstanding jobs
|
||||
|
||||
## v0.4.0
|
||||
|
||||
- [BREAKING] Change `priority` mapping to *byte*
|
||||
- Exclude `output.content` from _source when query jobs
|
||||
- Add optional `created_by` value to job documents
|
||||
|
||||
## v0.3.2
|
||||
|
||||
- Misisng indiced returns empty array (fixed errors in v0.3.1)
|
||||
|
||||
## v0.3.1
|
||||
|
||||
- Ignore missing indices when looking for jobs
|
||||
|
||||
## v0.3.0
|
||||
|
||||
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
|
||||
|
||||
## v0.2.2
|
||||
|
||||
- Swollow errors when saving job output
|
||||
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
|
||||
- Update npm package
|
||||
|
||||
## v0.2.1
|
||||
|
||||
- Use `esqueue` namespace for debugging
|
||||
|
||||
## v0.2.0
|
||||
|
||||
- [BREAKING] Async jobs should return promises, not use callbacks
|
||||
- Remove bluebird dependency
|
||||
- Only require specific lodash modules, instead of the whole library
|
||||
|
||||
## v0.1.0
|
||||
|
||||
- Initial release
|
||||
13
LICENSE
Normal file
13
LICENSE
Normal file
@@ -0,0 +1,13 @@
|
||||
Copyright 2016 Joe Fleming
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
67
package.json
67
package.json
@@ -1,50 +1,59 @@
|
||||
{
|
||||
"name": "esqueue",
|
||||
"version": "0.2.1",
|
||||
"description": "",
|
||||
"name": "elastiq",
|
||||
"version": "3.0.0",
|
||||
"description": "Job queue, powered by Elasticsearch",
|
||||
"main": "lib/index.js",
|
||||
"scripts": {
|
||||
"build": "rm -rf lib && babel src --out-dir lib",
|
||||
"coverage": "nyc report --reporter=text-lcov > coverage.lcov && codecov",
|
||||
"package": "git checkout -B package-lib && npm run build && git add -f lib && git commit -m 'package lib' && echo Use npm to tag and publish",
|
||||
"prepublish": "in-publish && npm run test || not-in-publish",
|
||||
"test": "npm run build && npm run unit",
|
||||
"test": "retire -n -p package.json && npm run build && npm run unit",
|
||||
"lint": "eslint \"*.js\" \"src/**/*.js\"",
|
||||
"test:ci": "npm run lint && npm run test",
|
||||
"unit": "nyc --require babel-core/register mocha test/src/**"
|
||||
},
|
||||
"author": "Joe Fleming (https://github.com/w33ble)",
|
||||
"keywords": [
|
||||
"job",
|
||||
"queue",
|
||||
"worker",
|
||||
"elasticsearch"
|
||||
],
|
||||
"repository": {
|
||||
"type": "git",
|
||||
"url": "https://github.com/w33ble/esqueue.git"
|
||||
"url": "https://github.com/w33ble/elastiq.git"
|
||||
},
|
||||
"license": "Apache-2.0",
|
||||
"engines": {
|
||||
"node": ">=4.3.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@elastic/eslint-config-kibana": "0.0.3",
|
||||
"babel-cli": "~6.7.5",
|
||||
"babel-core": "~6.7.6",
|
||||
"babel-eslint": "~4.1.8",
|
||||
"babel-plugin-add-module-exports": "~0.1.2",
|
||||
"babel-preset-es2015": "~6.6.0",
|
||||
"codecov": "~1.0.1",
|
||||
"eslint": "~1.10.3",
|
||||
"eslint-plugin-mocha": "~1.1.0",
|
||||
"eslint-plugin-react": "~4.2.3",
|
||||
"@elastic/eslint-config-kibana": "^0.3.0",
|
||||
"babel-cli": "^6.23.0",
|
||||
"babel-core": "^6.23.1",
|
||||
"babel-eslint": "6.1.2",
|
||||
"babel-plugin-add-module-exports": "^0.2.1",
|
||||
"babel-preset-es2015": "^6.22.0",
|
||||
"elasticsearch": "^13.0.1",
|
||||
"eslint": "3.11.1",
|
||||
"eslint-plugin-babel": "4.0.0",
|
||||
"eslint-plugin-mocha": "4.7.0",
|
||||
"eslint-plugin-react": "^7.0.1",
|
||||
"expect.js": "~0.3.1",
|
||||
"lodash": "~4.11.1",
|
||||
"mocha": "~2.4.5",
|
||||
"nyc": "~6.4.2",
|
||||
"proxyquire": "~1.7.4",
|
||||
"sinon": "~1.17.3"
|
||||
"lodash": "^4.17.4",
|
||||
"mocha": "^3.2.0",
|
||||
"nyc": "^10.1.2",
|
||||
"proxyquire": "^1.7.4",
|
||||
"retire": "^1.2.12",
|
||||
"sinon": "^2.3.1"
|
||||
},
|
||||
"peerDependencies": {
|
||||
"elasticsearch": ">=11.0.1"
|
||||
},
|
||||
"dependencies": {
|
||||
"debug": "~2.2.0",
|
||||
"elasticsearch": "~11.0.1",
|
||||
"error": "~7.0.2",
|
||||
"in-publish": "~2.0.0",
|
||||
"lodash.isplainobject": "~4.0.4",
|
||||
"lodash.omit": "~4.2.1",
|
||||
"moment": "~2.10.6",
|
||||
"puid": "~1.0.5"
|
||||
"debug": "^2.6.1",
|
||||
"in-publish": "^2.0.0",
|
||||
"moment": "^2.17.1",
|
||||
"puid": "^1.0.5"
|
||||
}
|
||||
}
|
||||
|
||||
76
readme.md
76
readme.md
@@ -1,18 +1,32 @@
|
||||
[](https://travis-ci.org/w33ble/esqueue) [](https://codecov.io/gh/w33ble/esqueue)
|
||||
[](https://travis-ci.org/w33ble/elastiq)
|
||||
[](https://raw.githubusercontent.com/w33ble/elastiq/master/LICENSE)
|
||||
[](https://github.com/w33ble/elastiq#project-status)
|
||||
|
||||
# esqueue
|
||||
# elastiq
|
||||
|
||||
`esqueue` is an Elasticsearch-powered job queue
|
||||
`elastiq` is an Elasticsearch-powered job queue.
|
||||
|
||||
Pronounced Elasti-queue. This is not supported by Elastic.
|
||||
|
||||
## Project Status
|
||||
|
||||
While it's believed to be pretty stable, this library isn't really being used anywhere. Issues and PRs are welcome, but it isn't actively being developed. As such, it doesn't get a lot of stress testing, and I don't recommend you rely too heavily on it, hence its experimental status.
|
||||
|
||||
## Installation
|
||||
|
||||
`npm install esqueue`
|
||||
Version | Elasticsearch Version
|
||||
------- | ---------------------
|
||||
3.x + | 5.x +
|
||||
|
||||
`npm install elastiq`
|
||||
|
||||
If you are working with an older version of Elasticsearch, consider using `esqueue`.
|
||||
|
||||
## Usage
|
||||
|
||||
Simply include the module in your application.
|
||||
|
||||
`var Esqueue = require('esqueue');`
|
||||
`var elastiq = require('elastiq');`
|
||||
|
||||
### Creating a queue
|
||||
|
||||
@@ -22,7 +36,7 @@ The first step is to create a new Queue instance. This is your point of entry, i
|
||||
var index = 'my-index';
|
||||
var options = {};
|
||||
|
||||
var queue = new Esqueue(index, options);
|
||||
var queue = new Elastiq(index, options);
|
||||
```
|
||||
|
||||
The queue instance is an event emitter, so you can listen for `error` events as you would any other event emitter.
|
||||
@@ -32,7 +46,10 @@ The queue instance is an event emitter, so you can listen for `error` events as
|
||||
Option | Default | Description
|
||||
------ | ----------- | -------
|
||||
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
|
||||
dateSeparator | `-` | Separator for the formatted date, *YYYY-MM-DD* for example, in the index pattern.
|
||||
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
|
||||
doctype | `elastiq` | The doctype to use in Elasticsearch
|
||||
indexSettings | | Specify which `settings` to pass on index creation. See the [Elasticsearch index creation docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-create-index.html) for more info.
|
||||
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
|
||||
|
||||
|
||||
@@ -59,6 +76,9 @@ Option | Default | Description
|
||||
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
|
||||
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
|
||||
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
|
||||
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
|
||||
headers | | Any headers to add to the index request. Handy for custom authentication or use with a proxy.
|
||||
client | | Alternative elasticsearch client instance, if you need to use one other than what the queue was created with.
|
||||
|
||||
### Creating a worker
|
||||
|
||||
@@ -103,28 +123,62 @@ Option | Default | Description
|
||||
------ | ----------- | -------
|
||||
interval | `1500` | Time, in `ms` to poll for new jobs in the queue.
|
||||
size | `10` | Number of records to return when polling for new jobs. Higher values may result in less Elasticsearch requests, but may also take longer to execute. A bit of tuning based on the number of workers you have my be required here.
|
||||
client | | Alternative elasticsearch client instance, if you need to use one other than what the queue was created with.
|
||||
doctype | `queue.doctype` | The doctype to use when polling for new jobs. You probably don't want to change this.
|
||||
|
||||
The worker's `output` can either be the raw output from the job, or on object that specifies the output's content type.
|
||||
|
||||
```js
|
||||
var workerFn1 = function (payload, cb) {
|
||||
var workerFn1 = function (payload) {
|
||||
// Do some work, using the payload if required
|
||||
var output = new Date().toString();
|
||||
cb(null, output);
|
||||
return output;
|
||||
};
|
||||
|
||||
var workerFn2 = function (payload, cb) {
|
||||
var workerFn2 = function (payload) {
|
||||
// Do some work, using the payload if required
|
||||
var output = {
|
||||
content_type: 'text/plain',
|
||||
content: new Date().toString();
|
||||
};
|
||||
cb(null, output);
|
||||
return output;
|
||||
};
|
||||
|
||||
var asyncWorker = function (payload) {
|
||||
// Do some work, using the payload if required
|
||||
return Promise.resolve({
|
||||
content_type: 'text/plain',
|
||||
content: new Date().toString();
|
||||
})
|
||||
};
|
||||
|
||||
```
|
||||
|
||||
Both are valid, but the `workerFn2` is likely to be more useful when retrieving the output, as the application doesn't need to know or make assumptions about the type of content the worker returned.
|
||||
All of the above are valid. `workerFn2` and `asyncWorker` are likely to be more useful when retrieving the output, as the application doesn't need to know or make assumptions about the type of content the worker returned. Note that returning a Promise is all that's required for an async result in the worker functions.
|
||||
|
||||
## Queue events
|
||||
|
||||
`elastiq` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
|
||||
|
||||
It's important to note that all events emitted from the Job and Worker instances are also emitted on the Queue instance. This means that your application should be able to react to changes by only keeping track of that instance.
|
||||
|
||||
Available events can be found in `lib/constants/events.js`, and you're encouraged to import and use those constant values in your application. Here's an example:
|
||||
|
||||
```js
|
||||
var Queue = require('elastiq');
|
||||
var queueEvents = require('elastiq/lib/constants/events');
|
||||
|
||||
var jobQueue = new Queue('my-index');
|
||||
|
||||
jobQueue.on(queueEvents.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
// handle error
|
||||
console.log('ONOZ!!! Job creation failed :(', err.error.message);
|
||||
});
|
||||
```
|
||||
|
||||
The argument passed to listeners typically contains several pieces of information about what happened. For example, Worker events will contain information about the job, the worker, and if it's an error event, the error.
|
||||
|
||||
More than any other events, you'll probably want to know if a worker completed or failed. When a worker starts, it will always either emit `EVENT_WORKER_COMPLETE` or `EVENT_WORKER_JOB_FAIL`. Faliures may also emit other events, such as `EVENT_WORKER_JOB_TIMEOUT` or `EVENT_WORKER_JOB_EXECUTION_ERROR`, but you can rely on `EVENT_WORKER_JOB_FAIL` for all failure cases.
|
||||
|
||||
## Scaling the queue
|
||||
|
||||
|
||||
6
src/constants/default_settings.js
Normal file
6
src/constants/default_settings.js
Normal file
@@ -0,0 +1,6 @@
|
||||
export default {
|
||||
DEFAULT_SETTING_TIMEOUT: 10000,
|
||||
DEFAULT_SETTING_DATE_SEPARATOR: '-',
|
||||
DEFAULT_SETTING_INTERVAL: 'week',
|
||||
DEFAULT_SETTING_DOCTYPE: 'elastiq',
|
||||
};
|
||||
16
src/constants/events.js
Normal file
16
src/constants/events.js
Normal file
@@ -0,0 +1,16 @@
|
||||
export default {
|
||||
EVENT_QUEUE_ERROR: 'queue:error',
|
||||
EVENT_JOB_ERROR: 'job:error',
|
||||
EVENT_JOB_CREATED: 'job:created',
|
||||
EVENT_JOB_CREATE_ERROR: 'job:creation error',
|
||||
EVENT_WORKER_COMPLETE: 'worker:job complete',
|
||||
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
|
||||
EVENT_WORKER_JOB_POLLING_READY: 'worker:job poller ready',
|
||||
EVENT_WORKER_JOB_SEARCH_COMPLETE: 'worker:pending jobs returned',
|
||||
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
|
||||
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
|
||||
EVENT_WORKER_JOB_FAIL: 'worker:job failed',
|
||||
EVENT_WORKER_JOB_FAIL_ERROR: 'worker:failed job update error',
|
||||
EVENT_WORKER_JOB_EXECUTION_ERROR: 'worker:job execution error',
|
||||
EVENT_WORKER_JOB_TIMEOUT: 'worker:job timeout',
|
||||
};
|
||||
5
src/constants/index.js
Normal file
5
src/constants/index.js
Normal file
@@ -0,0 +1,5 @@
|
||||
import events from './events';
|
||||
import statuses from './statuses';
|
||||
import defaultSettings from './default_settings';
|
||||
|
||||
export default Object.assign({}, events, statuses, defaultSettings);
|
||||
@@ -1,9 +1,7 @@
|
||||
export const jobStatuses = {
|
||||
export default {
|
||||
JOB_STATUS_PENDING: 'pending',
|
||||
JOB_STATUS_PROCESSING: 'processing',
|
||||
JOB_STATUS_COMPLETED: 'completed',
|
||||
JOB_STATUS_FAILED: 'failed',
|
||||
JOB_STATUS_CANCELLED: 'cancelled',
|
||||
};
|
||||
|
||||
export default Object.assign({}, jobStatuses);
|
||||
@@ -3,7 +3,6 @@ import elasticsearch from 'elasticsearch';
|
||||
export default function createClient(options) {
|
||||
let client;
|
||||
|
||||
// if there's a transport property, assume it's a client instance
|
||||
if (isClient(options)) {
|
||||
client = options;
|
||||
} else {
|
||||
@@ -11,8 +10,9 @@ export default function createClient(options) {
|
||||
}
|
||||
|
||||
return client;
|
||||
};
|
||||
}
|
||||
|
||||
export function isClient(client) {
|
||||
// if there's a transport property, assume it's a client instance
|
||||
return !!client.transport;
|
||||
}
|
||||
@@ -1,9 +1,12 @@
|
||||
var schema = {
|
||||
import { DEFAULT_SETTING_DOCTYPE } from '../constants';
|
||||
|
||||
const schema = {
|
||||
jobtype: { type: 'keyword' },
|
||||
payload: { type: 'object', enabled: false },
|
||||
priority: { type: 'short' },
|
||||
priority: { type: 'byte' },
|
||||
timeout: { type: 'long' },
|
||||
process_expiration: { type: 'date' },
|
||||
created_by: { type: 'string', index: 'not_analyzed' },
|
||||
created_by: { type: 'keyword' },
|
||||
created_at: { type: 'date' },
|
||||
started_at: { type: 'date' },
|
||||
completed_at: { type: 'date' },
|
||||
@@ -13,20 +16,17 @@ var schema = {
|
||||
output: {
|
||||
type: 'object',
|
||||
properties: {
|
||||
content_type: { type: 'string', index: 'not_analyzed' },
|
||||
content_type: { type: 'keyword', index: false },
|
||||
content: { type: 'object', enabled: false }
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export default function createIndex(client, indexName) {
|
||||
const indexBody = {
|
||||
mappings: {
|
||||
_default_: {
|
||||
properties: schema
|
||||
}
|
||||
}
|
||||
};
|
||||
export default function createIndex(client, indexName, doctype = DEFAULT_SETTING_DOCTYPE, settings = {}) {
|
||||
const indexBody = { mappings : {} };
|
||||
indexBody.mappings[doctype] = { properties: schema };
|
||||
|
||||
const body = Object.assign({}, { settings }, indexBody);
|
||||
|
||||
return client.indices.exists({
|
||||
index: indexName,
|
||||
@@ -34,9 +34,8 @@ export default function createIndex(client, indexName) {
|
||||
.then((exists) => {
|
||||
if (!exists) {
|
||||
return client.indices.create({
|
||||
ignore: 400,
|
||||
index: indexName,
|
||||
body: indexBody
|
||||
body: body
|
||||
})
|
||||
.then(() => true);
|
||||
}
|
||||
|
||||
@@ -1,12 +1,20 @@
|
||||
import typedError from 'error/typed';
|
||||
export function WorkerTimeoutError(message, props = {}) {
|
||||
this.name = 'WorkerTimeoutError';
|
||||
this.message = message;
|
||||
this.timeout = props.timeout;
|
||||
this.jobId = props.jobId;
|
||||
|
||||
const errors = {};
|
||||
if ('captureStackTrace' in Error) Error.captureStackTrace(this, WorkerTimeoutError);
|
||||
else this.stack = (new Error()).stack;
|
||||
}
|
||||
WorkerTimeoutError.prototype = Object.create(Error.prototype);
|
||||
|
||||
errors.WorkerTimeoutError = typedError({
|
||||
type: 'WorkerTimeout',
|
||||
message: 'worker timed out, timeout={timeout}',
|
||||
timeout: null,
|
||||
jobId: null
|
||||
});
|
||||
export function UnspecifiedWorkerError(message, props = {}) {
|
||||
this.name = 'UnspecifiedWorkerError';
|
||||
this.message = message;
|
||||
this.jobId = props.jobId;
|
||||
|
||||
export default errors;
|
||||
if ('captureStackTrace' in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
|
||||
else this.stack = (new Error()).stack;
|
||||
}
|
||||
UnspecifiedWorkerError.prototype = Object.create(Error.prototype);
|
||||
|
||||
@@ -9,7 +9,9 @@ export const intervals = [
|
||||
'minute'
|
||||
];
|
||||
|
||||
export default function indexTimestamp(intervalStr) {
|
||||
export default function indexTimestamp(intervalStr, separator = '-') {
|
||||
if (separator.match(/[a-z]/i)) throw new Error('Interval separator can not be a letter');
|
||||
|
||||
const index = intervals.indexOf(intervalStr);
|
||||
if (index === -1) throw new Error('Invalid index interval: ', intervalStr);
|
||||
|
||||
@@ -22,16 +24,16 @@ export default function indexTimestamp(intervalStr) {
|
||||
dateString = 'YYYY';
|
||||
break;
|
||||
case 'month':
|
||||
dateString = 'YYYY-MM';
|
||||
dateString = `YYYY${separator}MM`;
|
||||
break;
|
||||
case 'hour':
|
||||
dateString = 'YYYY-MM-DD-HH';
|
||||
dateString = `YYYY${separator}MM${separator}DD${separator}HH`;
|
||||
break;
|
||||
case 'minute':
|
||||
dateString = 'YYYY-MM-DD-HH-mm';
|
||||
dateString = `YYYY${separator}MM${separator}DD${separator}HH${separator}mm`;
|
||||
break;
|
||||
default:
|
||||
dateString = 'YYYY-MM-DD';
|
||||
dateString = `YYYY${separator}MM${separator}DD`;
|
||||
}
|
||||
|
||||
return m.format(dateString);
|
||||
|
||||
3
src/helpers/is_plain_object.js
Normal file
3
src/helpers/is_plain_object.js
Normal file
@@ -0,0 +1,3 @@
|
||||
export default function (obj) {
|
||||
return (typeof obj === 'object' && !Array.isArray(obj) && obj !== null);
|
||||
}
|
||||
11
src/helpers/object_omit.js
Normal file
11
src/helpers/object_omit.js
Normal file
@@ -0,0 +1,11 @@
|
||||
import isPlainObject from './is_plain_object';
|
||||
|
||||
export default function (obj, props) {
|
||||
if (!isPlainObject(obj)) return obj;
|
||||
if (!Array.isArray(props)) props = [props];
|
||||
|
||||
const newObj = Object.assign({}, obj);
|
||||
|
||||
props.forEach(prop => delete newObj[prop]);
|
||||
return newObj;
|
||||
}
|
||||
41
src/index.js
41
src/index.js
@@ -1,32 +1,35 @@
|
||||
import events from 'events';
|
||||
import createClient from './helpers/es_client';
|
||||
import indexTimestamp from './helpers/index_timestamp';
|
||||
import logger from './helpers/logger';
|
||||
import Job from './job.js';
|
||||
import Worker from './worker.js';
|
||||
import omit from 'lodash.omit';
|
||||
import constants from './constants';
|
||||
import createClient from './helpers/create_client';
|
||||
import indexTimestamp from './helpers/index_timestamp';
|
||||
import objectOmit from './helpers/object_omit';
|
||||
import logger from './helpers/logger';
|
||||
|
||||
const debug = logger('esqueue:queue');
|
||||
const debug = logger('elastiq:queue');
|
||||
|
||||
export default class Esqueue extends events.EventEmitter {
|
||||
export default class Elastiq extends events.EventEmitter {
|
||||
constructor(index, options = {}) {
|
||||
if (!index) throw new Error('Must specify an index to write to');
|
||||
|
||||
super();
|
||||
this.index = index;
|
||||
this.settings = Object.assign({
|
||||
interval: 'week',
|
||||
timeout: 10000,
|
||||
}, omit(options, [ 'client' ]));
|
||||
interval: constants.DEFAULT_SETTING_INTERVAL,
|
||||
timeout: constants.DEFAULT_SETTING_TIMEOUT,
|
||||
doctype: constants.DEFAULT_SETTING_DOCTYPE,
|
||||
dateSeparator: constants.DEFAULT_SETTING_DATE_SEPARATOR,
|
||||
}, objectOmit(options, [ 'client' ]));
|
||||
this.client = createClient(options.client || {});
|
||||
|
||||
this._workers = [];
|
||||
this._initTasks().catch((err) => this.emit('error', err));
|
||||
this._initTasks().catch((err) => this.emit(constants.EVENT_QUEUE_ERROR, err));
|
||||
}
|
||||
|
||||
_initTasks() {
|
||||
var initTasks = [
|
||||
this.client.ping({ timeout: 3000 }),
|
||||
const initTasks = [
|
||||
this.client.ping(),
|
||||
];
|
||||
|
||||
return Promise.all(initTasks).catch((err) => {
|
||||
@@ -36,14 +39,18 @@ export default class Esqueue extends events.EventEmitter {
|
||||
}
|
||||
|
||||
addJob(type, payload, opts = {}) {
|
||||
const timestamp = indexTimestamp(this.settings.interval);
|
||||
const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator);
|
||||
const index = `${this.index}-${timestamp}`;
|
||||
const defaults = {
|
||||
timeout: this.settings.timeout,
|
||||
};
|
||||
|
||||
const options = Object.assign({
|
||||
timeout: this.settings.timeout
|
||||
}, opts);
|
||||
const options = Object.assign(defaults, opts, {
|
||||
doctype: this.settings.doctype,
|
||||
indexSettings: this.settings.indexSettings,
|
||||
});
|
||||
|
||||
return new Job(this.client, index, type, payload, options);
|
||||
return new Job(this, index, type, payload, options);
|
||||
}
|
||||
|
||||
registerWorker(type, workerFn, opts) {
|
||||
|
||||
93
src/job.js
93
src/job.js
@@ -1,70 +1,92 @@
|
||||
import events from 'events';
|
||||
import isPlainObject from 'lodash.isplainobject';
|
||||
import Puid from 'puid';
|
||||
import contstants from './constants';
|
||||
import logger from './helpers/logger';
|
||||
import { jobStatuses } from './helpers/constants';
|
||||
import createIndex from './helpers/create_index';
|
||||
import isPlainObject from './helpers/is_plain_object';
|
||||
|
||||
const debug = logger('esqueue:job');
|
||||
const debug = logger('elastiq:job');
|
||||
const puid = new Puid();
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
constructor(client, index, type, payload, options = {}) {
|
||||
constructor(queue, index, type, payload, options = {}) {
|
||||
if (typeof type !== 'string') throw new Error('Type must be a string');
|
||||
if (!isPlainObject(payload)) throw new Error('Payload must be a plain object');
|
||||
|
||||
super();
|
||||
|
||||
this.client = client;
|
||||
this.queue = queue;
|
||||
this.client = options.client || this.queue.client;
|
||||
this.id = puid.generate();
|
||||
this.index = index;
|
||||
this.type = type;
|
||||
this.jobtype = type;
|
||||
this.payload = payload;
|
||||
this.created_by = options.created_by || false;
|
||||
this.timeout = options.timeout || 10000;
|
||||
this.maxAttempts = options.max_attempts || 3;
|
||||
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
|
||||
this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE;
|
||||
this.indexSettings = options.indexSettings || {};
|
||||
|
||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||
|
||||
this.ready = createIndex(client, index)
|
||||
.then(() => {
|
||||
return this.client.index({
|
||||
index: this.index,
|
||||
type: this.type,
|
||||
id: this.id,
|
||||
body: {
|
||||
payload: this.payload,
|
||||
priority: this.priority,
|
||||
timeout: this.timeout,
|
||||
created_at: new Date(),
|
||||
attempts: 0,
|
||||
max_attempts: this.maxAttempts,
|
||||
status: jobStatuses.JOB_STATUS_PENDING,
|
||||
}
|
||||
})
|
||||
.then((doc) => {
|
||||
this.document = {
|
||||
id: doc._id,
|
||||
type: doc._type,
|
||||
version: doc._version,
|
||||
};
|
||||
this.debug(`Job created in index ${this.index}`);
|
||||
this.emit('created', this.document);
|
||||
const indexParams = {
|
||||
index: this.index,
|
||||
type: this.doctype,
|
||||
id: this.id,
|
||||
body: {
|
||||
jobtype: this.jobtype,
|
||||
payload: this.payload,
|
||||
priority: this.priority,
|
||||
created_by: this.created_by,
|
||||
timeout: this.timeout,
|
||||
process_expiration: new Date(0), // use epoch so the job query works
|
||||
created_at: new Date(),
|
||||
attempts: 0,
|
||||
max_attempts: this.maxAttempts,
|
||||
status: contstants.JOB_STATUS_PENDING,
|
||||
}
|
||||
};
|
||||
|
||||
if (options.headers) {
|
||||
indexParams.headers = options.headers;
|
||||
}
|
||||
|
||||
this.ready = createIndex(this.client, this.index, this.doctype, this.indexSettings)
|
||||
.then(() => this.client.index(indexParams))
|
||||
.then((doc) => {
|
||||
this.document = {
|
||||
id: doc._id,
|
||||
type: doc._type,
|
||||
index: doc._index,
|
||||
version: doc._version,
|
||||
};
|
||||
this.debug(`Job created in index ${this.index}`);
|
||||
|
||||
return this.client.indices.refresh({
|
||||
index: this.index
|
||||
}).then(() => {
|
||||
this.debug(`Job index refreshed ${this.index}`);
|
||||
this.emit(contstants.EVENT_JOB_CREATED, this.document);
|
||||
});
|
||||
})
|
||||
.catch((err) => {
|
||||
this.debug('Job creation failed', err);
|
||||
this.emit('error', err);
|
||||
throw err;
|
||||
this.emit(contstants.EVENT_JOB_CREATE_ERROR, err);
|
||||
});
|
||||
}
|
||||
|
||||
emit(name, ...args) {
|
||||
super.emit(name, ...args);
|
||||
this.queue.emit(name, ...args);
|
||||
}
|
||||
|
||||
get() {
|
||||
return this.ready
|
||||
.then(() => {
|
||||
return this.client.get({
|
||||
index: this.index,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
id: this.id
|
||||
});
|
||||
})
|
||||
@@ -82,12 +104,13 @@ export default class Job extends events.EventEmitter {
|
||||
return Object.assign({
|
||||
id: this.id,
|
||||
index: this.index,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
jobtype: this.jobtype,
|
||||
created_by: this.created_by,
|
||||
payload: this.payload,
|
||||
timeout: this.timeout,
|
||||
max_attempts: this.maxAttempts,
|
||||
priority: this.priority,
|
||||
});
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
249
src/worker.js
249
src/worker.js
@@ -1,14 +1,22 @@
|
||||
import events from 'events';
|
||||
import Puid from 'puid';
|
||||
import moment from 'moment';
|
||||
import constants from './constants';
|
||||
import logger from './helpers/logger';
|
||||
import { jobStatuses } from './helpers/constants';
|
||||
import { WorkerTimeoutError } from './helpers/errors';
|
||||
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
|
||||
|
||||
const puid = new Puid();
|
||||
const debug = logger('esqueue:worker');
|
||||
const debug = logger('elastiq:worker');
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
function formatJobObject(job) {
|
||||
return {
|
||||
index: job._index,
|
||||
type: job._type,
|
||||
id: job._id,
|
||||
};
|
||||
}
|
||||
|
||||
export default class Worker extends events.EventEmitter {
|
||||
constructor(queue, type, workerFn, opts = {}) {
|
||||
if (typeof type !== 'string') throw new Error('Type must be a string');
|
||||
if (typeof workerFn !== 'function') throw new Error('Worker must be a function');
|
||||
@@ -17,20 +25,52 @@ export default class Job extends events.EventEmitter {
|
||||
|
||||
this.id = puid.generate();
|
||||
this.queue = queue;
|
||||
this.client = this.queue.client;
|
||||
this.type = type;
|
||||
this.client = opts.client || this.queue.client;
|
||||
this.jobtype = type;
|
||||
this.workerFn = workerFn;
|
||||
this.checkInterval = opts.interval || 1500;
|
||||
this.checkSize = opts.size || 10;
|
||||
this.doctype = opts.doctype || this.queue.doctype || constants.DEFAULT_SETTING_DOCTYPE;
|
||||
|
||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||
|
||||
this._checker = false;
|
||||
this._poller = {
|
||||
timer: false,
|
||||
enabled: true,
|
||||
running: false,
|
||||
};
|
||||
|
||||
this.debug(`Created worker for job type ${this.jobtype}`);
|
||||
this._startJobPolling();
|
||||
}
|
||||
|
||||
destroy() {
|
||||
clearInterval(this._checker);
|
||||
this._poller.enabled = false;
|
||||
this._stopJobPolling();
|
||||
}
|
||||
|
||||
toJSON() {
|
||||
return {
|
||||
id: this.id,
|
||||
index: this.queue.index,
|
||||
jobType: this.jobType,
|
||||
doctype: this.doctype,
|
||||
};
|
||||
}
|
||||
|
||||
emit(name, ...args) {
|
||||
super.emit(name, ...args);
|
||||
this.queue.emit(name, ...args);
|
||||
}
|
||||
|
||||
_formatErrorParams(err, job) {
|
||||
const response = {
|
||||
error: err,
|
||||
worker: this.toJSON(),
|
||||
};
|
||||
|
||||
if (job) response.job = formatJobObject(job);
|
||||
return response;
|
||||
}
|
||||
|
||||
_claimJob(job) {
|
||||
@@ -49,7 +89,7 @@ export default class Job extends events.EventEmitter {
|
||||
attempts: attempts,
|
||||
started_at: startTime,
|
||||
process_expiration: expirationTime,
|
||||
status: jobStatuses.JOB_STATUS_PROCESSING,
|
||||
status: constants.JOB_STATUS_PROCESSING,
|
||||
};
|
||||
|
||||
return this.client.update({
|
||||
@@ -65,8 +105,10 @@ export default class Job extends events.EventEmitter {
|
||||
return updatedJob;
|
||||
})
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return false;
|
||||
throw err;
|
||||
if (err.statusCode === 409) return true;
|
||||
this.debug(`_claimJob failed on job ${job._id}`, err);
|
||||
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -74,14 +116,18 @@ export default class Job extends events.EventEmitter {
|
||||
this.debug(`Failing job ${job._id}`);
|
||||
|
||||
const completedTime = moment().toISOString();
|
||||
const docOutput = this._formatOutput(output);
|
||||
const doc = {
|
||||
status: jobStatuses.JOB_STATUS_FAILED,
|
||||
status: constants.JOB_STATUS_FAILED,
|
||||
completed_at: completedTime,
|
||||
output: docOutput
|
||||
};
|
||||
|
||||
if (output) {
|
||||
doc.output = this._formatOutput(output);
|
||||
}
|
||||
this.emit(constants.EVENT_WORKER_JOB_FAIL, {
|
||||
job: formatJobObject(job),
|
||||
worker: this.toJSON(),
|
||||
output: docOutput,
|
||||
});
|
||||
|
||||
return this.client.update({
|
||||
index: job._index,
|
||||
@@ -90,9 +136,12 @@ export default class Job extends events.EventEmitter {
|
||||
version: job._version,
|
||||
body: { doc }
|
||||
})
|
||||
.then(() => true)
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return true;
|
||||
throw err;
|
||||
this.debug(`_failJob failed to update job ${job._id}`, err);
|
||||
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -116,10 +165,24 @@ export default class Job extends events.EventEmitter {
|
||||
this.debug(`Starting job ${job._id}`);
|
||||
|
||||
const workerOutput = new Promise((resolve, reject) => {
|
||||
resolve(this.workerFn.call(null, job._source.payload));
|
||||
// run the worker's workerFn
|
||||
let isResolved = false;
|
||||
Promise.resolve(this.workerFn.call(null, job._source.payload))
|
||||
.then((res) => {
|
||||
isResolved = true;
|
||||
resolve(res);
|
||||
})
|
||||
.catch((err) => {
|
||||
isResolved = true;
|
||||
reject(err);
|
||||
});
|
||||
|
||||
setTimeout(function () {
|
||||
reject(new WorkerTimeoutError({
|
||||
// fail if workerFn doesn't finish before timeout
|
||||
setTimeout(() => {
|
||||
if (isResolved) return;
|
||||
|
||||
this.debug(`Timeout processing job ${job._id}`);
|
||||
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
|
||||
timeout: job._source.timeout,
|
||||
jobId: job._id,
|
||||
}));
|
||||
@@ -134,7 +197,7 @@ export default class Job extends events.EventEmitter {
|
||||
const docOutput = this._formatOutput(output);
|
||||
|
||||
const doc = {
|
||||
status: jobStatuses.JOB_STATUS_COMPLETED,
|
||||
status: constants.JOB_STATUS_COMPLETED,
|
||||
completed_at: completedTime,
|
||||
output: docOutput
|
||||
};
|
||||
@@ -146,41 +209,90 @@ export default class Job extends events.EventEmitter {
|
||||
version: job._version,
|
||||
body: { doc }
|
||||
})
|
||||
.then(() => {
|
||||
const eventOutput = {
|
||||
job: formatJobObject(job),
|
||||
output: docOutput,
|
||||
};
|
||||
|
||||
this.emit(constants.EVENT_WORKER_COMPLETE, eventOutput);
|
||||
})
|
||||
.catch((err) => {
|
||||
if (err.statusCode === 409) return false;
|
||||
throw err;
|
||||
this.debug(`Failure saving job output ${job._id}`, err);
|
||||
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
|
||||
});
|
||||
}, (jobErr) => {
|
||||
// job execution failed
|
||||
if (jobErr.type === 'WorkerTimeout') {
|
||||
this.debug(`Timeout on job ${job._id}`);
|
||||
this.emit('job_timeout', jobErr);
|
||||
return;
|
||||
if (!jobErr) {
|
||||
jobErr = new UnspecifiedWorkerError('Unspecified worker error', {
|
||||
jobId: job._id,
|
||||
});
|
||||
}
|
||||
|
||||
this.debug(`Failure occurred on job ${job._id}`);
|
||||
this.emit('job_error', jobErr);
|
||||
return this._failJob(job, jobErr.toString());
|
||||
// job execution failed
|
||||
if (jobErr.name === 'WorkerTimeoutError') {
|
||||
this.debug(`Timeout on job ${job._id}`);
|
||||
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
|
||||
return;
|
||||
|
||||
// append the jobId to the error
|
||||
} else {
|
||||
try {
|
||||
Object.assign(jobErr, { jobId: job._id });
|
||||
} catch (e) {
|
||||
// do nothing if jobId can not be appended
|
||||
}
|
||||
}
|
||||
|
||||
this.debug(`Failure occurred on job ${job._id}`, jobErr);
|
||||
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
|
||||
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
|
||||
});
|
||||
}
|
||||
|
||||
_startJobPolling() {
|
||||
this._checker = setInterval(() => {
|
||||
if (!this._poller.enabled || this._poller.running) {
|
||||
return;
|
||||
}
|
||||
|
||||
this._poller.timer = setTimeout(() => {
|
||||
this._getPendingJobs()
|
||||
.then((jobs) => this._claimPendingJobs(jobs));
|
||||
.then((jobs) => {
|
||||
if (!this._poller.running) return;
|
||||
|
||||
const foundJobs = (!jobs || jobs.length === 0);
|
||||
const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs);
|
||||
|
||||
task.then(() => {
|
||||
this._poller.running = false;
|
||||
this._startJobPolling();
|
||||
});
|
||||
}, () => {
|
||||
// if the search failed for some reason, back off the polling
|
||||
// we assume errors came from a busy cluster
|
||||
// TODO: check what error actually happened
|
||||
const multiplier = 20;
|
||||
|
||||
setTimeout(() => {
|
||||
this._poller.running = false;
|
||||
this._startJobPolling();
|
||||
}, this.checkInterval * multiplier);
|
||||
});
|
||||
} , this.checkInterval);
|
||||
|
||||
this._poller.running = true;
|
||||
this.emit(constants.EVENT_WORKER_JOB_POLLING_READY);
|
||||
}
|
||||
|
||||
_stopJobPolling() {
|
||||
clearInterval(this._checker);
|
||||
this._poller.running = false;
|
||||
clearTimeout(this._poller.timer);
|
||||
}
|
||||
|
||||
_claimPendingJobs(jobs) {
|
||||
if (jobs.length === 0) return;
|
||||
|
||||
this._stopJobPolling();
|
||||
_claimPendingJobs(jobs = []) {
|
||||
let claimed = false;
|
||||
|
||||
// claim a single job, stopping after first successful claim
|
||||
return jobs.reduce((chain, job) => {
|
||||
return chain.then((claimedJob) => {
|
||||
// short-circuit the promise chain if a job has been claimed
|
||||
@@ -195,16 +307,6 @@ export default class Job extends events.EventEmitter {
|
||||
});
|
||||
});
|
||||
}, Promise.resolve())
|
||||
.catch((err) => {
|
||||
this.debug('Failed to claim outstanding jobs', err);
|
||||
this.emit('error', err);
|
||||
this.queue.emit('worker_error', {
|
||||
id: this.id,
|
||||
type: this.type,
|
||||
err
|
||||
});
|
||||
throw err;
|
||||
})
|
||||
.then((claimedJob) => {
|
||||
if (!claimedJob) {
|
||||
this.debug(`All ${jobs.length} jobs already claimed`);
|
||||
@@ -213,35 +315,38 @@ export default class Job extends events.EventEmitter {
|
||||
this.debug(`Claimed job ${claimedJob._id}`);
|
||||
return this._performJob(claimedJob);
|
||||
})
|
||||
.then(() => this._startJobPolling())
|
||||
.catch((err) => {
|
||||
this.debug('Error claiming jobs', err);
|
||||
this.emit('error', err);
|
||||
this._startJobPolling();
|
||||
});
|
||||
}
|
||||
|
||||
_getPendingJobs() {
|
||||
const nowTime = moment().toISOString();
|
||||
const dateFilter = {
|
||||
range: {
|
||||
process_expiration: {
|
||||
lte: nowTime
|
||||
}
|
||||
}
|
||||
};
|
||||
const query = {
|
||||
_source : {
|
||||
excludes: [ 'output.content' ]
|
||||
},
|
||||
query: {
|
||||
bool: {
|
||||
should: [
|
||||
{ bool: { must: [{ term: { status: 'pending'} }] }},
|
||||
{ bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } }
|
||||
]
|
||||
constant_score: {
|
||||
filter: {
|
||||
bool: {
|
||||
filter: { term: { jobtype: this.jobtype } },
|
||||
should: [
|
||||
{ term: { status: 'pending' } },
|
||||
{ bool:
|
||||
{ filter: [
|
||||
{ term: { status: 'processing' } },
|
||||
{ range: { process_expiration: { lte: nowTime } } }
|
||||
] }
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
sort: [
|
||||
{ priority: { order: 'asc' }},
|
||||
{ created_at: { order: 'asc' }}
|
||||
{ priority: { order: 'asc' } },
|
||||
{ created_at: { order: 'asc' } }
|
||||
],
|
||||
size: this.checkSize
|
||||
};
|
||||
@@ -250,24 +355,24 @@ export default class Job extends events.EventEmitter {
|
||||
|
||||
return this.client.search({
|
||||
index: `${this.queue.index}-*`,
|
||||
type: this.type,
|
||||
type: this.doctype,
|
||||
version: true,
|
||||
body: query
|
||||
})
|
||||
.then((results) => {
|
||||
const jobs = results.hits.hits;
|
||||
|
||||
this.debug(`${jobs.length} outstanding jobs returned`);
|
||||
this.emit(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, jobs);
|
||||
|
||||
return jobs;
|
||||
})
|
||||
.catch((err) => {
|
||||
// ignore missing indices errors
|
||||
if (err.status === 404) return [];
|
||||
|
||||
this.debug('job querying failed', err);
|
||||
this.emit('error', err);
|
||||
this.queue.emit('worker_error', {
|
||||
id: this.id,
|
||||
type: this.type,
|
||||
err
|
||||
});
|
||||
throw err;
|
||||
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
20
test/fixtures/elasticsearch.js
vendored
20
test/fixtures/elasticsearch.js
vendored
@@ -1,20 +1,22 @@
|
||||
import { uniqueId, times, random } from 'lodash';
|
||||
import elasticsearch from 'elasticsearch';
|
||||
import constants from '../../lib/constants';
|
||||
|
||||
function Client() {
|
||||
this.indices = {
|
||||
create: () => Promise.resolve({ acknowledged: true }),
|
||||
exists: () => Promise.resolve(false),
|
||||
refresh: () => Promise.resolve(),
|
||||
};
|
||||
|
||||
this.transport = {};
|
||||
}
|
||||
|
||||
Client.prototype.index = function (params = {}) {
|
||||
var shardCount = 2;
|
||||
const shardCount = 2;
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || uniqueId('testDoc'),
|
||||
_version: 1,
|
||||
_shards: { total: shardCount, successful: shardCount, failed: 0 },
|
||||
@@ -30,6 +32,8 @@ Client.prototype.get = function (params = {}, source = {}) {
|
||||
if (params === elasticsearch.errors.NotFound) return elasticsearch.errors.NotFound;
|
||||
|
||||
const _source = Object.assign({
|
||||
jobtype: 'jobtype',
|
||||
created_by: false,
|
||||
payload: {
|
||||
id: 'sample-job-1',
|
||||
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
|
||||
@@ -42,21 +46,21 @@ Client.prototype.get = function (params = {}, source = {}) {
|
||||
status: 'pending'
|
||||
}, source);
|
||||
|
||||
return {
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
|
||||
_version: params.version || 1,
|
||||
found: true,
|
||||
_source: _source
|
||||
};
|
||||
});
|
||||
};
|
||||
|
||||
Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||
const hits = times(count, () => {
|
||||
return {
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: uniqueId('documentId'),
|
||||
_version: random(1, 5),
|
||||
_score: null,
|
||||
@@ -83,10 +87,10 @@ Client.prototype.search = function (params = {}, count = 5, source = {}) {
|
||||
};
|
||||
|
||||
Client.prototype.update = function (params = {}) {
|
||||
var shardCount = 2;
|
||||
const shardCount = 2;
|
||||
return Promise.resolve({
|
||||
_index: params.index || 'index',
|
||||
_type: params.type || 'type',
|
||||
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
|
||||
_id: params.id || uniqueId('testDoc'),
|
||||
_version: params.version + 1 || 2,
|
||||
_shards: { total: shardCount, successful: shardCount, failed: 0 },
|
||||
|
||||
17
test/fixtures/job.js
vendored
Normal file
17
test/fixtures/job.js
vendored
Normal file
@@ -0,0 +1,17 @@
|
||||
import events from 'events';
|
||||
|
||||
export default class Job extends events.EventEmitter {
|
||||
constructor(queue, index, type, payload, options = {}) {
|
||||
super();
|
||||
|
||||
this.queue = queue;
|
||||
this.index = index;
|
||||
this.jobType = type;
|
||||
this.payload = payload;
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
getProp(name) {
|
||||
return this[name];
|
||||
}
|
||||
}
|
||||
13
test/fixtures/queue.js
vendored
Normal file
13
test/fixtures/queue.js
vendored
Normal file
@@ -0,0 +1,13 @@
|
||||
import events from 'events';
|
||||
|
||||
class MockQueue extends events.EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
}
|
||||
|
||||
setClient(client) {
|
||||
this.client = client;
|
||||
}
|
||||
}
|
||||
|
||||
export default MockQueue;
|
||||
16
test/fixtures/worker.js
vendored
Normal file
16
test/fixtures/worker.js
vendored
Normal file
@@ -0,0 +1,16 @@
|
||||
import events from 'events';
|
||||
|
||||
export default class Worker extends events.EventEmitter {
|
||||
constructor(queue, type, workerFn, opts = {}) {
|
||||
super();
|
||||
|
||||
this.queue = queue;
|
||||
this.type = type;
|
||||
this.workerFn = workerFn;
|
||||
this.options = opts;
|
||||
}
|
||||
|
||||
getProp(name) {
|
||||
return this[name];
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,7 @@ import expect from 'expect.js';
|
||||
import proxyquire from 'proxyquire';
|
||||
import elasticsearchMock from '../../fixtures/elasticsearch';
|
||||
|
||||
const module = proxyquire.noPreserveCache()('../../../lib/helpers/es_client', {
|
||||
const module = proxyquire.noPreserveCache()('../../../lib/helpers/create_client', {
|
||||
elasticsearch: elasticsearchMock
|
||||
});
|
||||
|
||||
@@ -2,39 +2,98 @@ import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import createIndex from '../../../lib/helpers/create_index';
|
||||
import elasticsearchMock from '../../fixtures/elasticsearch';
|
||||
import { DEFAULT_SETTING_DOCTYPE } from '../../../lib/constants';
|
||||
|
||||
describe('Create Index', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
beforeEach(function () {
|
||||
client = new elasticsearchMock.Client();
|
||||
createSpy = sinon.spy(client.indices, 'create');
|
||||
});
|
||||
describe('Does not exist', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
it('should create the index', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
beforeEach(function () {
|
||||
client = new elasticsearchMock.Client();
|
||||
createSpy = sinon.spy(client.indices, 'create');
|
||||
});
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(createSpy.getCall(0).args[0]).to.have.property('index', indexName);
|
||||
it('should return true', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then((exists) => expect(exists).to.be(true));
|
||||
});
|
||||
|
||||
it('should create the index', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(createSpy.getCall(0).args[0]).to.have.property('index', indexName);
|
||||
});
|
||||
});
|
||||
|
||||
it('should create the type mappings', function () {
|
||||
const indexName = 'test-index';
|
||||
const docType = DEFAULT_SETTING_DOCTYPE;
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
const payload = createSpy.getCall(0).args[0];
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property(docType);
|
||||
expect(payload.body.mappings[docType]).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
|
||||
it('should accept a custom doctype', function () {
|
||||
const indexName = 'test-index';
|
||||
const docType = 'my_type';
|
||||
const result = createIndex(client, indexName, docType);
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
const payload = createSpy.getCall(0).args[0];
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property(docType);
|
||||
expect(payload.body.mappings[docType]).to.have.property('properties');
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
it('should create the default mappings', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
describe('Does exist', function () {
|
||||
let client;
|
||||
let createSpy;
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
const payload = createSpy.getCall(0).args[0];
|
||||
sinon.assert.callCount(createSpy, 1);
|
||||
expect(payload).to.have.property('body');
|
||||
expect(payload.body).to.have.property('mappings');
|
||||
expect(payload.body.mappings).to.have.property('_default_');
|
||||
expect(payload.body.mappings._default_).to.have.property('properties');
|
||||
beforeEach(function () {
|
||||
client = new elasticsearchMock.Client();
|
||||
sinon.stub(client.indices, 'exists', () => Promise.resolve(true));
|
||||
createSpy = sinon.spy(client.indices, 'create');
|
||||
});
|
||||
|
||||
it('should return true', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then((exists) => expect(exists).to.be(true));
|
||||
});
|
||||
|
||||
it('should not create the index', function () {
|
||||
const indexName = 'test-index';
|
||||
const result = createIndex(client, indexName);
|
||||
|
||||
return result
|
||||
.then(function () {
|
||||
sinon.assert.callCount(createSpy, 0);
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
});
|
||||
|
||||
51
test/src/helpers/errors.js
Normal file
51
test/src/helpers/errors.js
Normal file
@@ -0,0 +1,51 @@
|
||||
import expect from 'expect.js';
|
||||
import { WorkerTimeoutError, UnspecifiedWorkerError } from '../../../lib/helpers/errors';
|
||||
|
||||
describe('custom errors', function () {
|
||||
describe('WorkerTimeoutError', function () {
|
||||
it('should be function', () => {
|
||||
expect(WorkerTimeoutError).to.be.a('function');
|
||||
});
|
||||
|
||||
it('should have a name', function () {
|
||||
const err = new WorkerTimeoutError('timeout error');
|
||||
expect(err).to.have.property('name', 'WorkerTimeoutError');
|
||||
});
|
||||
|
||||
it('should take a jobId property', function () {
|
||||
const err = new WorkerTimeoutError('timeout error', { jobId: 'il7hl34rqlo8ro' });
|
||||
expect(err).to.have.property('jobId', 'il7hl34rqlo8ro');
|
||||
});
|
||||
|
||||
it('should take a timeout property', function () {
|
||||
const err = new WorkerTimeoutError('timeout error', { timeout: 15000 });
|
||||
expect(err).to.have.property('timeout', 15000);
|
||||
});
|
||||
|
||||
it('should be stringifyable', function () {
|
||||
const err = new WorkerTimeoutError('timeout error');
|
||||
expect(`${err}`).to.equal('WorkerTimeoutError: timeout error');
|
||||
});
|
||||
});
|
||||
|
||||
describe('UnspecifiedWorkerError', function () {
|
||||
it('should be function', () => {
|
||||
expect(UnspecifiedWorkerError).to.be.a('function');
|
||||
});
|
||||
|
||||
it('should have a name', function () {
|
||||
const err = new UnspecifiedWorkerError('unspecified error');
|
||||
expect(err).to.have.property('name', 'UnspecifiedWorkerError');
|
||||
});
|
||||
|
||||
it('should take a jobId property', function () {
|
||||
const err = new UnspecifiedWorkerError('unspecified error', { jobId: 'il7hl34rqlo8ro' });
|
||||
expect(err).to.have.property('jobId', 'il7hl34rqlo8ro');
|
||||
});
|
||||
|
||||
it('should be stringifyable', function () {
|
||||
const err = new UnspecifiedWorkerError('unspecified error');
|
||||
expect(`${err}`).to.equal('UnspecifiedWorkerError: unspecified error');
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,22 +1,25 @@
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import moment from 'moment';
|
||||
import constants from '../../../lib/constants';
|
||||
import indexTimestamp from '../../../lib/helpers/index_timestamp';
|
||||
|
||||
const anchor = '2016-04-02T01:02:03.456'; // saturday
|
||||
|
||||
describe('Index interval', function () {
|
||||
describe('indexTimestamp construction', function () {
|
||||
describe('Index timestamp interval', function () {
|
||||
describe('construction', function () {
|
||||
it('should throw given an invalid interval', function () {
|
||||
const init = () => indexTimestamp('bananas');
|
||||
expect(init).to.throwException(/invalid.+interval/i);
|
||||
});
|
||||
});
|
||||
|
||||
describe('indexTimestamp timestamps', function () {
|
||||
describe('timestamps', function () {
|
||||
let clock;
|
||||
let separator;
|
||||
|
||||
beforeEach(function () {
|
||||
separator = constants.DEFAULT_SETTING_DATE_SEPARATOR;
|
||||
clock = sinon.useFakeTimers(moment(anchor).valueOf());
|
||||
});
|
||||
|
||||
@@ -24,34 +27,61 @@ describe('Index interval', function () {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
it('should return the year', function () {
|
||||
var timestamp = indexTimestamp('year');
|
||||
expect(timestamp).to.equal('2016');
|
||||
describe('formats', function () {
|
||||
it('should return the year', function () {
|
||||
const timestamp = indexTimestamp('year');
|
||||
const str = `2016`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year and month', function () {
|
||||
const timestamp = indexTimestamp('month');
|
||||
const str = `2016${separator}04`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, and first day of the week', function () {
|
||||
const timestamp = indexTimestamp('week');
|
||||
const str = `2016${separator}03${separator}27`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, and day of the week', function () {
|
||||
const timestamp = indexTimestamp('day');
|
||||
const str = `2016${separator}04${separator}02`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, day and hour', function () {
|
||||
const timestamp = indexTimestamp('hour');
|
||||
const str = `2016${separator}04${separator}02${separator}01`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
|
||||
it('should return the year, month, day, hour and minute', function () {
|
||||
const timestamp = indexTimestamp('minute');
|
||||
const str = `2016${separator}04${separator}02${separator}01${separator}02`;
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
});
|
||||
|
||||
it('should return the year and month', function () {
|
||||
var timestamp = indexTimestamp('month');
|
||||
expect(timestamp).to.equal('2016-04');
|
||||
});
|
||||
describe('date separator', function () {
|
||||
it('should be customizable', function () {
|
||||
const separators = ['-', '.', '_'];
|
||||
separators.forEach(customSep => {
|
||||
const str = `2016${customSep}04${customSep}02${customSep}01${customSep}02`;
|
||||
const timestamp = indexTimestamp('minute', customSep);
|
||||
expect(timestamp).to.equal(str);
|
||||
});
|
||||
});
|
||||
|
||||
it('should return the year, month, and first day of the week', function () {
|
||||
var timestamp = indexTimestamp('week');
|
||||
expect(timestamp).to.equal('2016-03-27');
|
||||
});
|
||||
|
||||
it('should return the year, month, and day of the week', function () {
|
||||
var timestamp = indexTimestamp('day');
|
||||
expect(timestamp).to.equal('2016-04-02');
|
||||
});
|
||||
|
||||
it('should return the year, month, day and hour', function () {
|
||||
var timestamp = indexTimestamp('hour');
|
||||
expect(timestamp).to.equal('2016-04-02-01');
|
||||
});
|
||||
|
||||
it('should return the year, month, day, hour and minute', function () {
|
||||
var timestamp = indexTimestamp('minute');
|
||||
expect(timestamp).to.equal('2016-04-02-01-02');
|
||||
it('should throw if a letter is used', function () {
|
||||
const separators = ['a', 'B', 'YYYY'];
|
||||
separators.forEach(customSep => {
|
||||
const fn = () => indexTimestamp('minute', customSep);
|
||||
expect(fn).to.throwException();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
48
test/src/helpers/is_plain_object.js
Normal file
48
test/src/helpers/is_plain_object.js
Normal file
@@ -0,0 +1,48 @@
|
||||
import expect from 'expect.js';
|
||||
import isPlainObject from '../../../lib/helpers/is_plain_object';
|
||||
|
||||
function validateItems(checks, pass) {
|
||||
checks.forEach(check => {
|
||||
expect(isPlainObject(check)).to.be(pass);
|
||||
});
|
||||
}
|
||||
|
||||
describe('isPlainObject', function () {
|
||||
describe('non-object primitives', function () {
|
||||
it('return false', function () {
|
||||
const checks = [
|
||||
100,
|
||||
true,
|
||||
'i am a string',
|
||||
function noop() {},
|
||||
null,
|
||||
];
|
||||
|
||||
validateItems(checks, false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('arrays', function () {
|
||||
it('return false', function () {
|
||||
const checks = [
|
||||
[],
|
||||
[1,2,3],
|
||||
['just a string'],
|
||||
];
|
||||
|
||||
validateItems(checks, false);
|
||||
});
|
||||
});
|
||||
|
||||
describe('', function () {
|
||||
it('return true', function () {
|
||||
const checks = [
|
||||
{},
|
||||
{one:1},
|
||||
{object:{with:{array:[]}}},
|
||||
];
|
||||
|
||||
validateItems(checks, true);
|
||||
});
|
||||
});
|
||||
});
|
||||
37
test/src/helpers/omit.js
Normal file
37
test/src/helpers/omit.js
Normal file
@@ -0,0 +1,37 @@
|
||||
import expect from 'expect.js';
|
||||
import objectOmit from '../../../lib/helpers/object_omit';
|
||||
|
||||
describe('object omit', function () {
|
||||
let obj = {};
|
||||
|
||||
beforeEach(() => {
|
||||
obj = {
|
||||
one: 1,
|
||||
two: 2,
|
||||
three: 3,
|
||||
arr: [1,2,3],
|
||||
check: 'aw yeah',
|
||||
};
|
||||
});
|
||||
|
||||
it('omits a single property', function () {
|
||||
const val = objectOmit(obj, 'one');
|
||||
|
||||
expect(val).to.eql({
|
||||
two: 2,
|
||||
three: 3,
|
||||
arr: [1,2,3],
|
||||
check: 'aw yeah',
|
||||
});
|
||||
});
|
||||
|
||||
it('omits multiple properties', function () {
|
||||
const val = objectOmit(obj, ['three', 'check']);
|
||||
|
||||
expect(val).to.eql({
|
||||
one: 1,
|
||||
two: 2,
|
||||
arr: [1,2,3],
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -1,11 +1,19 @@
|
||||
import events from 'events';
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import proxyquire from 'proxyquire';
|
||||
import { noop, times } from 'lodash';
|
||||
import constants from '../../lib/constants';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import Esqueue from '../../lib/index';
|
||||
import jobMock from '../fixtures/job';
|
||||
import workerMock from '../fixtures/worker';
|
||||
|
||||
describe('Esqueue class', function () {
|
||||
const Queue = proxyquire.noPreserveCache()('../../lib/index', {
|
||||
'./job.js': jobMock,
|
||||
'./worker.js': workerMock,
|
||||
});
|
||||
|
||||
describe('Elastiq class', function () {
|
||||
let client;
|
||||
|
||||
beforeEach(function () {
|
||||
@@ -13,18 +21,18 @@ describe('Esqueue class', function () {
|
||||
});
|
||||
|
||||
it('should be an event emitter', function () {
|
||||
const queue = new Esqueue('esqueue', { client });
|
||||
const queue = new Queue('elastiq', { client });
|
||||
expect(queue).to.be.an(events.EventEmitter);
|
||||
});
|
||||
|
||||
describe('Option validation', function () {
|
||||
it('should throw without an index', function () {
|
||||
const init = () => new Esqueue();
|
||||
const init = () => new Queue();
|
||||
expect(init).to.throwException(/must.+specify.+index/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid host', function () {
|
||||
const init = () => new Esqueue('esqueue', {
|
||||
const init = () => new Queue('elastiq', {
|
||||
client: { host: 'nope://nope' }
|
||||
});
|
||||
|
||||
@@ -32,7 +40,7 @@ describe('Esqueue class', function () {
|
||||
});
|
||||
|
||||
it('should throw with invalid hosts', function () {
|
||||
const init = () => new Esqueue('esqueue', {
|
||||
const init = () => new Queue('elastiq', {
|
||||
client: { hosts: [{ host: 'localhost', protocol: 'nope' }] }
|
||||
});
|
||||
|
||||
@@ -43,14 +51,76 @@ describe('Esqueue class', function () {
|
||||
describe('Queue construction', function () {
|
||||
it('should ping the ES server', function () {
|
||||
const pingSpy = sinon.spy(client, 'ping');
|
||||
new Esqueue('esqueue', { client });
|
||||
new Queue('elastiq', { client });
|
||||
sinon.assert.calledOnce(pingSpy);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Adding jobs', function () {
|
||||
let indexName;
|
||||
let jobType;
|
||||
let payload;
|
||||
let queue;
|
||||
|
||||
beforeEach(function () {
|
||||
indexName = 'elastiq-index';
|
||||
jobType = 'test-test';
|
||||
payload = { payload: true };
|
||||
queue = new Queue(indexName, { client });
|
||||
});
|
||||
|
||||
it('should throw with invalid dateSeparator setting', function () {
|
||||
queue = new Queue(indexName, { client, dateSeparator: 'a' });
|
||||
const fn = () => queue.addJob(jobType, payload);
|
||||
expect(fn).to.throwException();
|
||||
});
|
||||
|
||||
it('should pass queue instance, index name, type and payload', function () {
|
||||
const job = queue.addJob(jobType, payload);
|
||||
expect(job.getProp('queue')).to.equal(queue);
|
||||
expect(job.getProp('index')).to.match(new RegExp(indexName));
|
||||
expect(job.getProp('jobType')).to.equal(jobType);
|
||||
expect(job.getProp('payload')).to.equal(payload);
|
||||
});
|
||||
|
||||
it('should pass default settings', function () {
|
||||
const job = queue.addJob(jobType, payload);
|
||||
const options = job.getProp('options');
|
||||
expect(options).to.have.property('timeout', constants.DEFAULT_SETTING_TIMEOUT);
|
||||
expect(options).to.have.property('doctype', constants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
|
||||
it('should pass queue index settings', function () {
|
||||
const indexSettings = {
|
||||
index: {
|
||||
number_of_shards: 1
|
||||
}
|
||||
};
|
||||
|
||||
queue = new Queue(indexName, { client, indexSettings });
|
||||
const job = queue.addJob(jobType, payload);
|
||||
expect(job.getProp('options')).to.have.property('indexSettings', indexSettings);
|
||||
});
|
||||
|
||||
it('should pass headers from options', function () {
|
||||
const options = {
|
||||
headers: {
|
||||
authorization: 'Basic cXdlcnR5'
|
||||
}
|
||||
};
|
||||
const job = queue.addJob(jobType, payload, options);
|
||||
expect(job.getProp('options')).to.have.property('headers', options.headers);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Registering workers', function () {
|
||||
let queue;
|
||||
|
||||
beforeEach(function () {
|
||||
queue = new Queue('elastiq', { client });
|
||||
});
|
||||
|
||||
it('should keep track of workers', function () {
|
||||
const queue = new Esqueue('esqueue', { client });
|
||||
expect(queue.getWorkers()).to.eql([]);
|
||||
expect(queue.getWorkers()).to.have.length(0);
|
||||
|
||||
@@ -59,11 +129,32 @@ describe('Esqueue class', function () {
|
||||
queue.registerWorker('test2', noop);
|
||||
expect(queue.getWorkers()).to.have.length(3);
|
||||
});
|
||||
|
||||
it('should pass instance of queue, type, and worker function', function () {
|
||||
const workerType = 'test-worker';
|
||||
const workerFn = () => true;
|
||||
|
||||
const worker = queue.registerWorker(workerType, workerFn);
|
||||
expect(worker.getProp('queue')).to.equal(queue);
|
||||
expect(worker.getProp('type')).to.equal(workerType);
|
||||
expect(worker.getProp('workerFn')).to.equal(workerFn);
|
||||
});
|
||||
|
||||
it('should pass worker options', function () {
|
||||
const workerOptions = {
|
||||
size: 12,
|
||||
doctype: 'tests'
|
||||
};
|
||||
|
||||
queue = new Queue('elastiq', { client });
|
||||
const worker = queue.registerWorker('type', noop, workerOptions);
|
||||
expect(worker.getProp('options')).to.equal(workerOptions);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Destroy', function () {
|
||||
it('should destroy workers', function () {
|
||||
const queue = new Esqueue('esqueue', { client });
|
||||
const queue = new Queue('elastiq', { client });
|
||||
const stubs = times(3, () => { return { destroy: sinon.stub() }; });
|
||||
stubs.forEach((stub) => queue._workers.push(stub));
|
||||
expect(queue.getWorkers()).to.have.length(3);
|
||||
|
||||
365
test/src/job.js
365
test/src/job.js
@@ -2,10 +2,11 @@ import events from 'events';
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import proxyquire from 'proxyquire';
|
||||
import QueueMock from '../fixtures/queue';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import { JOB_STATUS_PENDING } from '../../lib/helpers/constants';
|
||||
import contstants from '../../lib/constants';
|
||||
|
||||
const createIndexMock = sinon.stub().returns(Promise.resolve('mock'));
|
||||
const createIndexMock = sinon.stub();
|
||||
const module = proxyquire.noPreserveCache()('../../lib/job', {
|
||||
'./helpers/create_index': createIndexMock
|
||||
});
|
||||
@@ -14,8 +15,16 @@ const Job = module;
|
||||
const maxPriority = 20;
|
||||
const minPriority = -20;
|
||||
const defaultPriority = 10;
|
||||
const defaultCreatedBy = false;
|
||||
|
||||
function validateDoc(spy) {
|
||||
sinon.assert.callCount(spy, 1);
|
||||
const spyCall = spy.getCall(0);
|
||||
return spyCall.args[0];
|
||||
}
|
||||
|
||||
describe('Job Class', function () {
|
||||
let mockQueue;
|
||||
let client;
|
||||
let index;
|
||||
|
||||
@@ -25,121 +34,317 @@ describe('Job Class', function () {
|
||||
|
||||
beforeEach(function () {
|
||||
createIndexMock.reset();
|
||||
createIndexMock.returns(Promise.resolve('mock'));
|
||||
index = 'test';
|
||||
|
||||
client = new elasticsearchMock.Client();
|
||||
mockQueue = new QueueMock();
|
||||
mockQueue.setClient(client);
|
||||
});
|
||||
|
||||
it('should be an event emitter', function () {
|
||||
const job = new Job(client, index, 'test', {});
|
||||
const job = new Job(mockQueue, index, 'test', {});
|
||||
expect(job).to.be.an(events.EventEmitter);
|
||||
});
|
||||
|
||||
describe('invalid construction', function () {
|
||||
it('should throw with a missing type', function () {
|
||||
const init = () => new Job(client, index);
|
||||
const init = () => new Job(mockQueue, index);
|
||||
expect(init).to.throwException(/type.+string/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid type', function () {
|
||||
const init = () => new Job(client, index, { 'not a string': true });
|
||||
const init = () => new Job(mockQueue, index, { 'not a string': true });
|
||||
expect(init).to.throwException(/type.+string/i);
|
||||
});
|
||||
|
||||
it('should throw with an invalid payload', function () {
|
||||
const init = () => new Job(client, index, 'type1', [1, 2, 3]);
|
||||
const init = () => new Job(mockQueue, index, 'type1', [1, 2, 3]);
|
||||
expect(init).to.throwException(/plain.+object/i);
|
||||
});
|
||||
});
|
||||
|
||||
describe('construction', function () {
|
||||
function validateDoc(spy) {
|
||||
sinon.assert.callCount(spy, 1);
|
||||
const spyCall = spy.getCall(0);
|
||||
return spyCall.args[0];
|
||||
}
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
sinon.spy(client, 'index');
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
const args = createIndexMock.getCall(0).args;
|
||||
expect(args[0]).to.equal(client);
|
||||
expect(args[1]).to.equal(index);
|
||||
expect(args[2]).to.equal(contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs).to.have.property('index', index);
|
||||
expect(indexArgs).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(indexArgs).to.have.property('body');
|
||||
expect(indexArgs.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the job type', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs).to.have.property('index', index);
|
||||
expect(indexArgs).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(indexArgs).to.have.property('body');
|
||||
expect(indexArgs.body).to.have.property('jobtype', type);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set event creation time', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('created_at');
|
||||
});
|
||||
});
|
||||
|
||||
it('should refresh the index', function () {
|
||||
const refreshSpy = sinon.spy(client.indices, 'refresh');
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(refreshSpy);
|
||||
const spyCall = refreshSpy.getCall(0);
|
||||
expect(spyCall.args[0]).to.have.property('index', index);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit the job information on success', function (done) {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
job.once(contstants.EVENT_JOB_CREATED, (jobDoc) => {
|
||||
try {
|
||||
expect(jobDoc).to.have.property('id');
|
||||
expect(jobDoc).to.have.property('index');
|
||||
expect(jobDoc).to.have.property('type');
|
||||
expect(jobDoc).to.have.property('version');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit error on index creation failure', function (done) {
|
||||
const errMsg = 'test index creation failure';
|
||||
|
||||
createIndexMock.returns(Promise.reject(new Error(errMsg)));
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
job.once(contstants.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
try {
|
||||
expect(err.message).to.equal(errMsg);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit error on client index failure', function (done) {
|
||||
const errMsg = 'test document index failure';
|
||||
|
||||
client.index.restore();
|
||||
sinon.stub(client, 'index', () => Promise.reject(new Error(errMsg)));
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
job.once(contstants.EVENT_JOB_CREATE_ERROR, (err) => {
|
||||
try {
|
||||
expect(err.message).to.equal(errMsg);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('event emitting', function () {
|
||||
it('should trigger events on the queue instance', function (done) {
|
||||
const eventName = 'test event';
|
||||
const payload1 = {
|
||||
test: true,
|
||||
deep: { object: 'ok' }
|
||||
};
|
||||
const payload2 = 'two';
|
||||
const payload3 = new Error('test error');
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
|
||||
mockQueue.on(eventName, (...args) => {
|
||||
try {
|
||||
expect(args[0]).to.equal(payload1);
|
||||
expect(args[1]).to.equal(payload2);
|
||||
expect(args[2]).to.equal(payload3);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
job.emit(eventName, payload1, payload2, payload3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('default values', function () {
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
sinon.spy(client, 'index');
|
||||
});
|
||||
|
||||
it('should set attempt count to 0', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('attempts', 0);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index default created_by value', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('created_by', defaultCreatedBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set an expired process_expiration time', function () {
|
||||
const now = new Date().getTime();
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('process_expiration');
|
||||
expect(indexArgs.body.process_expiration.getTime()).to.be.lessThan(now);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set status as pending', function () {
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('status', contstants.JOB_STATUS_PENDING);
|
||||
});
|
||||
});
|
||||
|
||||
it('should have a default priority of 10', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('priority', defaultPriority);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('option passing', function () {
|
||||
beforeEach(function () {
|
||||
type = 'type1';
|
||||
payload = { id: '123' };
|
||||
options = {
|
||||
timeout: 4567,
|
||||
max_attempts: 9,
|
||||
headers: {
|
||||
authorization: 'Basic cXdlcnR5'
|
||||
}
|
||||
};
|
||||
sinon.spy(client, 'index');
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
it('should index the created_by value', function () {
|
||||
const createdBy = 'user_identifier';
|
||||
const job = new Job(mockQueue, index, type, payload, Object.assign({ created_by: createdBy }, options));
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', type);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('payload', payload);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('created_by', createdBy);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index timeout value from options', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('timeout', options.timeout);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('timeout', options.timeout);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set event times', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
it('should set max attempt count', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('created_at');
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('max_attempts', options.max_attempts);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set attempt count', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
it('should add headers to the request params', function () {
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('attempts', 0);
|
||||
expect(newDoc.body).to.have.property('max_attempts', options.max_attempts);
|
||||
});
|
||||
});
|
||||
|
||||
it('should set status as pending', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('status', JOB_STATUS_PENDING);
|
||||
});
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
});
|
||||
});
|
||||
|
||||
it('should have a default priority of 10', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('priority', defaultPriority);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs).to.have.property('headers', options.headers);
|
||||
});
|
||||
});
|
||||
|
||||
it(`should use upper priority of ${maxPriority}`, function () {
|
||||
const job = new Job(client, index, type, payload, { priority: maxPriority * 2 });
|
||||
const job = new Job(mockQueue, index, type, payload, { priority: maxPriority * 2 });
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('priority', maxPriority);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('priority', maxPriority);
|
||||
});
|
||||
});
|
||||
|
||||
it(`should use lower priority of ${minPriority}`, function () {
|
||||
const job = new Job(client, index, type, payload, { priority: minPriority * 2 });
|
||||
const job = new Job(mockQueue, index, type, payload, { priority: minPriority * 2 });
|
||||
return job.ready.then(() => {
|
||||
const newDoc = validateDoc(client.index);
|
||||
expect(newDoc.body).to.have.property('priority', minPriority);
|
||||
const indexArgs = validateDoc(client.index);
|
||||
expect(indexArgs.body).to.have.property('priority', minPriority);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('custom client', function () {
|
||||
let newClient;
|
||||
let job;
|
||||
|
||||
beforeEach(function () {
|
||||
sinon.spy(client, 'index');
|
||||
|
||||
newClient = new elasticsearchMock.Client();
|
||||
sinon.spy(newClient, 'index');
|
||||
job = new Job(mockQueue, index, type, payload, Object.assign({ client: newClient }, options));
|
||||
});
|
||||
|
||||
it('should create the target index', function () {
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.calledOnce(createIndexMock);
|
||||
const args = createIndexMock.getCall(0).args;
|
||||
expect(args[0]).to.equal(newClient);
|
||||
expect(args[1]).to.equal(index);
|
||||
expect(args[2]).to.equal(contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
});
|
||||
|
||||
it('should index the payload', function () {
|
||||
return job.ready.then(() => {
|
||||
sinon.assert.callCount(client.index, 0);
|
||||
sinon.assert.callCount(newClient.index, 1);
|
||||
|
||||
const newDoc = newClient.index.getCall(0).args[0];
|
||||
expect(newDoc).to.have.property('index', index);
|
||||
expect(newDoc).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(newDoc).to.have.property('body');
|
||||
expect(newDoc.body).to.have.property('payload', payload);
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -151,19 +356,41 @@ describe('Job Class', function () {
|
||||
});
|
||||
|
||||
it('should return the job document', function () {
|
||||
const job = new Job(client, index, type, payload);
|
||||
const job = new Job(mockQueue, index, type, payload);
|
||||
|
||||
return job.get()
|
||||
.then((doc) => {
|
||||
const jobDoc = job.document; // document should be resolved
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('type', type);
|
||||
expect(doc).to.have.property('type', jobDoc.type);
|
||||
expect(doc).to.have.property('id', jobDoc.id);
|
||||
expect(doc).to.have.property('version', jobDoc.version);
|
||||
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||
|
||||
expect(doc).to.have.property('payload');
|
||||
expect(doc).to.have.property('jobtype');
|
||||
expect(doc).to.have.property('priority');
|
||||
expect(doc).to.have.property('timeout');
|
||||
});
|
||||
});
|
||||
|
||||
it('should contain optional data', function () {
|
||||
const optionals = {
|
||||
created_by: 'some_ident'
|
||||
};
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, optionals);
|
||||
return Promise.resolve(client.get({}, optionals))
|
||||
.then((doc) => {
|
||||
sinon.stub(client, 'get').returns(Promise.resolve(doc));
|
||||
})
|
||||
.then(() => {
|
||||
return job.get()
|
||||
.then((doc) => {
|
||||
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('toJSON method', function () {
|
||||
@@ -178,20 +405,32 @@ describe('Job Class', function () {
|
||||
});
|
||||
|
||||
it('should return the static information about the job', function () {
|
||||
const job = new Job(client, index, type, payload, options);
|
||||
const job = new Job(mockQueue, index, type, payload, options);
|
||||
|
||||
// toJSON is sync, should work before doc is written to elasticsearch
|
||||
expect(job.document).to.be(undefined);
|
||||
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('index', index);
|
||||
expect(doc).to.have.property('type', type);
|
||||
expect(doc).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
|
||||
expect(doc).to.have.property('jobtype', type);
|
||||
expect(doc).to.have.property('created_by', defaultCreatedBy);
|
||||
expect(doc).to.have.property('timeout', options.timeout);
|
||||
expect(doc).to.have.property('max_attempts', options.max_attempts);
|
||||
expect(doc).to.have.property('priority', options.priority);
|
||||
expect(doc).to.have.property('id');
|
||||
expect(doc).to.not.have.property('version');
|
||||
});
|
||||
|
||||
it('should contain optional data', function () {
|
||||
const optionals = {
|
||||
created_by: 'some_ident'
|
||||
};
|
||||
|
||||
const job = new Job(mockQueue, index, type, payload, optionals);
|
||||
const doc = job.toJSON();
|
||||
expect(doc).to.have.property('created_by', optionals.created_by);
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import expect from 'expect.js';
|
||||
import sinon from 'sinon';
|
||||
import moment from 'moment';
|
||||
import { noop, random } from 'lodash';
|
||||
import Worker from '../../lib/worker';
|
||||
import { noop, random, get, find } from 'lodash';
|
||||
import elasticsearchMock from '../fixtures/elasticsearch';
|
||||
import { JOB_STATUS_PROCESSING, JOB_STATUS_COMPLETED, JOB_STATUS_FAILED } from '../../lib/helpers/constants';
|
||||
import QueueMock from '../fixtures/queue';
|
||||
import Worker from '../../lib/worker';
|
||||
import constants from '../../lib/constants';
|
||||
|
||||
const anchor = '2016-04-02T01:02:03.456'; // saturday
|
||||
const defaults = {
|
||||
@@ -23,9 +24,8 @@ describe('Worker class', function () {
|
||||
|
||||
beforeEach(function () {
|
||||
client = new elasticsearchMock.Client();
|
||||
mockQueue = {
|
||||
client: client
|
||||
};
|
||||
mockQueue = new QueueMock();
|
||||
mockQueue.setClient(client);
|
||||
});
|
||||
|
||||
describe('invalid construction', function () {
|
||||
@@ -51,6 +51,20 @@ describe('Worker class', function () {
|
||||
});
|
||||
|
||||
describe('construction', function () {
|
||||
it('should assign internal properties', function () {
|
||||
const jobtype = 'testjob';
|
||||
const workerFn = noop;
|
||||
const worker = new Worker(mockQueue, jobtype, workerFn);
|
||||
expect(worker).to.have.property('id');
|
||||
expect(worker).to.have.property('queue', mockQueue);
|
||||
expect(worker).to.have.property('client', client);
|
||||
expect(worker).to.have.property('jobtype', jobtype);
|
||||
expect(worker).to.have.property('workerFn', workerFn);
|
||||
expect(worker).to.have.property('checkInterval');
|
||||
expect(worker).to.have.property('checkSize');
|
||||
expect(worker).to.have.property('doctype');
|
||||
});
|
||||
|
||||
it('should have a unique ID', function () {
|
||||
const worker = new Worker(mockQueue, 'test', noop);
|
||||
expect(worker.id).to.be.a('string');
|
||||
@@ -60,6 +74,45 @@ describe('Worker class', function () {
|
||||
|
||||
expect(worker.id).to.not.equal(worker2.id);
|
||||
});
|
||||
|
||||
it('should use custom client', function () {
|
||||
const newClient = new elasticsearchMock.Client();
|
||||
const worker = new Worker(mockQueue, 'test', noop, { client: newClient });
|
||||
expect(worker).to.have.property('queue', mockQueue);
|
||||
expect(worker).to.have.property('client', newClient);
|
||||
expect(worker.client).to.not.equal(client);
|
||||
});
|
||||
});
|
||||
|
||||
describe('event emitting', function () {
|
||||
let worker;
|
||||
|
||||
beforeEach(function () {
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
});
|
||||
|
||||
it('should trigger events on the queue instance', function (done) {
|
||||
const eventName = 'test event';
|
||||
const payload1 = {
|
||||
test: true,
|
||||
deep: { object: 'ok' }
|
||||
};
|
||||
const payload2 = 'two';
|
||||
const payload3 = new Error('test error');
|
||||
|
||||
mockQueue.on(eventName, (...args) => {
|
||||
try {
|
||||
expect(args[0]).to.equal(payload1);
|
||||
expect(args[1]).to.equal(payload2);
|
||||
expect(args[2]).to.equal(payload3);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
worker.emit(eventName, payload1, payload2, payload3);
|
||||
});
|
||||
});
|
||||
|
||||
describe('output formatting', function () {
|
||||
@@ -105,10 +158,13 @@ describe('Worker class', function () {
|
||||
});
|
||||
});
|
||||
|
||||
describe('searching for jobs', function () {
|
||||
describe('polling for jobs', function () {
|
||||
let searchSpy;
|
||||
|
||||
beforeEach(() => {
|
||||
anchorMoment = moment(anchor);
|
||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||
searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -116,7 +172,6 @@ describe('Worker class', function () {
|
||||
});
|
||||
|
||||
it('should start polling for jobs after interval', function () {
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop);
|
||||
sinon.assert.notCalled(searchSpy);
|
||||
clock.tick(defaults.interval);
|
||||
@@ -125,28 +180,203 @@ describe('Worker class', function () {
|
||||
|
||||
it('should use interval option to control polling', function () {
|
||||
const interval = 567;
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop, { interval });
|
||||
sinon.assert.notCalled(searchSpy);
|
||||
clock.tick(interval);
|
||||
sinon.assert.calledOnce(searchSpy);
|
||||
});
|
||||
|
||||
it('should use default size', function () {
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop);
|
||||
it('should not poll once destroyed', function () {
|
||||
// remove the search spy
|
||||
mockQueue.client.search.restore();
|
||||
|
||||
// mock the search, return 0 new jobs
|
||||
const zeroHits = { hits: { hits: [] } };
|
||||
const searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve(zeroHits));
|
||||
|
||||
const worker = new Worker(mockQueue, 'test', noop);
|
||||
|
||||
function waitForSearch() {
|
||||
return new Promise((resolve) => {
|
||||
worker.once(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, () => {
|
||||
resolve()
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
function waitForPoller() {
|
||||
return new Promise((resolve) => {
|
||||
worker.once(constants.EVENT_WORKER_JOB_POLLING_READY, () => {
|
||||
resolve()
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// move the clock a couple times, test for searches each time
|
||||
sinon.assert.notCalled(searchStub);
|
||||
|
||||
const firstWait = waitForSearch();
|
||||
clock.tick(defaults.interval);
|
||||
const body = searchSpy.firstCall.args[0].body;
|
||||
expect(body).to.have.property('size', defaults.size);
|
||||
|
||||
return firstWait
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(searchStub);
|
||||
return waitForPoller();
|
||||
})
|
||||
.then(() => {
|
||||
const secondWait = waitForSearch();
|
||||
clock.tick(defaults.interval);
|
||||
return secondWait;
|
||||
})
|
||||
.then(() => {
|
||||
sinon.assert.calledTwice(searchStub);
|
||||
return waitForPoller();
|
||||
})
|
||||
.then(() => {
|
||||
// destroy the worker, move the clock, make sure another search doesn't happen
|
||||
worker.destroy();
|
||||
|
||||
clock.tick(defaults.interval);
|
||||
sinon.assert.calledTwice(searchStub);
|
||||
|
||||
// manually call job poller, move the clock, make sure another search doesn't happen
|
||||
worker._startJobPolling();
|
||||
clock.tick(defaults.interval);
|
||||
sinon.assert.calledTwice(searchStub);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('query for pending jobs', function () {
|
||||
let worker;
|
||||
let searchStub;
|
||||
|
||||
function getSearchParams(jobtype = 'test', params = {}) {
|
||||
worker = new Worker(mockQueue, jobtype, noop, params);
|
||||
worker._getPendingJobs();
|
||||
return searchStub.firstCall.args[0];
|
||||
}
|
||||
|
||||
describe('error handling', function () {
|
||||
beforeEach(() => {
|
||||
});
|
||||
|
||||
it('should pass search errors', function (done) {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
worker._getPendingJobs()
|
||||
.then(() => done(new Error('should not resolve')))
|
||||
.catch(() => { done(); });
|
||||
});
|
||||
|
||||
it('should swollow index missing errors', function (done) {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||
status: 404
|
||||
}));
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
worker._getPendingJobs()
|
||||
.then(() => { done(); })
|
||||
.catch(() => done(new Error('should not reject')));
|
||||
});
|
||||
|
||||
it('should return an empty array on missing index', function (done) {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
|
||||
status: 404
|
||||
}));
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
worker._getPendingJobs()
|
||||
.then((res) => {
|
||||
try {
|
||||
expect(res).to.be.an(Array);
|
||||
expect(res).to.have.length(0);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
})
|
||||
.catch(() => done(new Error('should not reject')));
|
||||
});
|
||||
});
|
||||
|
||||
it('should observe the size option', function () {
|
||||
const size = 25;
|
||||
const searchSpy = sinon.spy(mockQueue.client, 'search');
|
||||
new Worker(mockQueue, 'test', noop, { size });
|
||||
clock.tick(defaults.interval);
|
||||
const body = searchSpy.firstCall.args[0].body;
|
||||
expect(body).to.have.property('size', size);
|
||||
describe('query parameters', function () {
|
||||
beforeEach(() => {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||
});
|
||||
|
||||
it('should query with version', function () {
|
||||
const params = getSearchParams();
|
||||
expect(params).to.have.property('version', true);
|
||||
});
|
||||
|
||||
it('should query by default doctype', function () {
|
||||
const params = getSearchParams();
|
||||
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
|
||||
});
|
||||
|
||||
it('should query by custom doctype', function () {
|
||||
const doctype = 'custom_test';
|
||||
const params = getSearchParams('type', { doctype });
|
||||
expect(params).to.have.property('type', doctype);
|
||||
});
|
||||
});
|
||||
|
||||
describe('query body', function () {
|
||||
const conditionPath = 'query.constant_score.filter.bool';
|
||||
const jobtype = 'test_jobtype';
|
||||
|
||||
beforeEach(() => {
|
||||
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
|
||||
anchorMoment = moment(anchor);
|
||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
it('should filter unwanted source data', function () {
|
||||
const excludedFields = [ 'output.content' ];
|
||||
const { body } = getSearchParams(jobtype);
|
||||
expect(body).to.have.property('_source');
|
||||
expect(body._source).to.eql({ excludes: excludedFields });
|
||||
});
|
||||
|
||||
it('should search by job type', function () {
|
||||
const { body } = getSearchParams(jobtype);
|
||||
const conditions = get(body, conditionPath);
|
||||
expect(conditions.filter).to.eql({ term: { jobtype: jobtype } });
|
||||
});
|
||||
|
||||
it('should search for pending or expired jobs', function () {
|
||||
const { body } = getSearchParams(jobtype);
|
||||
const conditions = get(body, conditionPath);
|
||||
expect(conditions).to.have.property('should');
|
||||
|
||||
// this works because we are stopping the clock, so all times match
|
||||
const nowTime = moment().toISOString();
|
||||
const pending = { term: { status: 'pending'} };
|
||||
const expired = { bool: { filter: [
|
||||
{ term: { status: 'processing' } },
|
||||
{ range: { process_expiration: { lte: nowTime } } }
|
||||
] } };
|
||||
|
||||
const pendingMatch = find(conditions.should, pending);
|
||||
expect(pendingMatch).to.not.be(undefined);
|
||||
|
||||
const expiredMatch = find(conditions.should, expired);
|
||||
expect(expiredMatch).to.not.be(undefined);
|
||||
});
|
||||
|
||||
it('should use default size', function () {
|
||||
const { body } = getSearchParams(jobtype);
|
||||
expect(body).to.have.property('size', defaults.size);
|
||||
});
|
||||
|
||||
it('should observe the size option', function () {
|
||||
const size = 25;
|
||||
const { body } = getSearchParams(jobtype, { size });
|
||||
expect(body).to.have.property('size', size);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -166,9 +396,12 @@ describe('Worker class', function () {
|
||||
id: 12345,
|
||||
version: 3
|
||||
};
|
||||
job = mockQueue.client.get(params);
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||
return mockQueue.client.get(params)
|
||||
.then((jobDoc) => {
|
||||
job = jobDoc;
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
@@ -193,7 +426,7 @@ describe('Worker class', function () {
|
||||
it('should update the job status', function () {
|
||||
worker._claimJob(job);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', JOB_STATUS_PROCESSING);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_PROCESSING);
|
||||
});
|
||||
|
||||
it('should set job expiration time', function () {
|
||||
@@ -229,10 +462,36 @@ describe('Worker class', function () {
|
||||
expect(msg).to.equal(false);
|
||||
});
|
||||
|
||||
it('should swallow version mismatch errors', function () {
|
||||
it('should return true on version errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
|
||||
return worker._claimJob(job);
|
||||
return worker._claimJob(job)
|
||||
.then((res) => expect(res).to.equal(true));
|
||||
});
|
||||
|
||||
it('should return false on other errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
return worker._claimJob(job)
|
||||
.then((res) => expect(res).to.equal(false));
|
||||
});
|
||||
|
||||
it('should emit on other errors', function (done) {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
|
||||
worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('statusCode', 401);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
worker._claimJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -245,16 +504,18 @@ describe('Worker class', function () {
|
||||
anchorMoment = moment(anchor);
|
||||
clock = sinon.useFakeTimers(anchorMoment.valueOf());
|
||||
|
||||
job = mockQueue.client.get();
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||
return mockQueue.client.get()
|
||||
.then((jobDoc) => {
|
||||
job = jobDoc;
|
||||
worker = new Worker(mockQueue, 'test', noop);
|
||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
clock.restore();
|
||||
});
|
||||
|
||||
|
||||
it('should use version on update', function () {
|
||||
worker._failJob(job);
|
||||
const query = updateSpy.firstCall.args[0];
|
||||
@@ -267,7 +528,7 @@ describe('Worker class', function () {
|
||||
it('should set status to failed', function () {
|
||||
worker._failJob(job);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', JOB_STATUS_FAILED);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
|
||||
});
|
||||
|
||||
it('should append error message if supplied', function () {
|
||||
@@ -278,13 +539,21 @@ describe('Worker class', function () {
|
||||
expect(doc.output).to.have.property('content', msg);
|
||||
});
|
||||
|
||||
it('should swallow version mismatch errors', function () {
|
||||
it('should return true on version mismatch errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
|
||||
return worker._failJob(job);
|
||||
return worker._failJob(job)
|
||||
.then((res) => expect(res).to.equal(true));
|
||||
});
|
||||
|
||||
it('should set completed time and status to failed', function () {
|
||||
it('should return false on other docuemnt update errors', function () {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
return worker._failJob(job)
|
||||
.then((res) => expect(res).to.equal(false));
|
||||
});
|
||||
|
||||
it('should set completed time and status to failure', function () {
|
||||
const startTime = moment().valueOf();
|
||||
const msg = 'test message';
|
||||
clock.tick(100);
|
||||
@@ -292,11 +561,44 @@ describe('Worker class', function () {
|
||||
worker._failJob(job, msg);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('output');
|
||||
expect(doc).to.have.property('status', JOB_STATUS_FAILED);
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_FAILED);
|
||||
expect(doc).to.have.property('completed_at');
|
||||
const completedTimestamp = moment(doc.completed_at).valueOf();
|
||||
expect(completedTimestamp).to.be.greaterThan(startTime);
|
||||
});
|
||||
|
||||
it('should emit worker failure event', function (done) {
|
||||
worker.on(constants.EVENT_WORKER_JOB_FAIL, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('output');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
return worker._failJob(job);
|
||||
});
|
||||
|
||||
it('should emit on other docuemnt update errors', function (done) {
|
||||
mockQueue.client.update.restore();
|
||||
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
|
||||
|
||||
worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('statusCode', 401);
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
worker._failJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('performing a job', function () {
|
||||
@@ -308,109 +610,305 @@ describe('Worker class', function () {
|
||||
payload = {
|
||||
value: random(0, 100, true)
|
||||
};
|
||||
job = mockQueue.client.get({}, { payload });
|
||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||
});
|
||||
|
||||
it('should call the workerFn with the payload', function (done) {
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
worker._performJob(job)
|
||||
.then(() => done());
|
||||
});
|
||||
|
||||
it('should update the job with the workerFn output', function () {
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
return payload;
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(updateSpy);
|
||||
const query = updateSpy.firstCall.args[0];
|
||||
expect(query).to.have.property('index', job._index);
|
||||
expect(query).to.have.property('type', job._type);
|
||||
expect(query).to.have.property('id', job._id);
|
||||
expect(query).to.have.property('version', job._version);
|
||||
expect(query.body.doc).to.have.property('output');
|
||||
expect(query.body.doc.output).to.have.property('content_type', false);
|
||||
expect(query.body.doc.output).to.have.property('content', payload);
|
||||
return mockQueue.client.get({}, { payload })
|
||||
.then((jobDoc) => {
|
||||
job = jobDoc;
|
||||
updateSpy = sinon.spy(mockQueue.client, 'update');
|
||||
});
|
||||
});
|
||||
|
||||
it('should update the job status and completed time', function () {
|
||||
const startTime = moment().valueOf();
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(() => resolve(payload), 10);
|
||||
describe('worker success', function () {
|
||||
it('should call the workerFn with the payload', function (done) {
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
worker._performJob(job)
|
||||
.then(() => done());
|
||||
});
|
||||
|
||||
it('should update the job with the workerFn output', function () {
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
return payload;
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(updateSpy);
|
||||
const query = updateSpy.firstCall.args[0];
|
||||
expect(query).to.have.property('index', job._index);
|
||||
expect(query).to.have.property('type', job._type);
|
||||
expect(query).to.have.property('id', job._id);
|
||||
expect(query).to.have.property('version', job._version);
|
||||
expect(query.body.doc).to.have.property('output');
|
||||
expect(query.body.doc.output).to.have.property('content_type', false);
|
||||
expect(query.body.doc.output).to.have.property('content', payload);
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
});
|
||||
|
||||
worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(updateSpy);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', JOB_STATUS_COMPLETED);
|
||||
expect(doc).to.have.property('completed_at');
|
||||
const completedTimestamp = moment(doc.completed_at).valueOf();
|
||||
expect(completedTimestamp).to.be.greaterThan(startTime);
|
||||
it('should update the job status and completed time', function () {
|
||||
const startTime = moment().valueOf();
|
||||
const workerFn = function (jobPayload) {
|
||||
expect(jobPayload).to.eql(payload);
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(() => resolve(payload), 10);
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(updateSpy);
|
||||
const doc = updateSpy.firstCall.args[0].body.doc;
|
||||
expect(doc).to.have.property('status', constants.JOB_STATUS_COMPLETED);
|
||||
expect(doc).to.have.property('completed_at');
|
||||
const completedTimestamp = moment(doc.completed_at).valueOf();
|
||||
expect(completedTimestamp).to.be.greaterThan(startTime);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit completion event', function (done) {
|
||||
const worker = new Worker(mockQueue, 'test', noop);
|
||||
|
||||
worker.once(constants.EVENT_WORKER_COMPLETE, (workerJob) => {
|
||||
try {
|
||||
expect(workerJob).to.not.have.property('_source');
|
||||
|
||||
expect(workerJob).to.have.property('job');
|
||||
expect(workerJob.job).to.have.property('id');
|
||||
expect(workerJob.job).to.have.property('index');
|
||||
expect(workerJob.job).to.have.property('type');
|
||||
|
||||
expect(workerJob).to.have.property('output');
|
||||
expect(workerJob.output).to.have.property('content');
|
||||
expect(workerJob.output).to.have.property('content_type');
|
||||
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
it('should append error output to job', function () {
|
||||
const workerFn = function () {
|
||||
throw new Error('test error');
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
const failStub = sinon.stub(worker, '_failJob');
|
||||
describe('worker failure', function () {
|
||||
it('should append error output to job', function () {
|
||||
const workerFn = function () {
|
||||
throw new Error('test error');
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
const failStub = sinon.stub(worker, '_failJob');
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
sinon.assert.calledWith(failStub, job, 'Error: test error');
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
sinon.assert.calledWith(failStub, job, 'Error: test error');
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle async errors', function () {
|
||||
const workerFn = function () {
|
||||
return new Promise((resolve, reject) => {
|
||||
reject(new Error('test error'));
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
const failStub = sinon.stub(worker, '_failJob');
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
sinon.assert.calledWith(failStub, job, 'Error: test error');
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle rejecting with strings', function () {
|
||||
const errorMessage = 'this is a string error';
|
||||
const workerFn = function () {
|
||||
return new Promise((resolve, reject) => {
|
||||
reject(errorMessage);
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
const failStub = sinon.stub(worker, '_failJob');
|
||||
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
sinon.assert.calledWith(failStub, job, errorMessage);
|
||||
});
|
||||
});
|
||||
|
||||
it('should handle empty rejection', function (done) {
|
||||
const workerFn = function () {
|
||||
return new Promise((resolve, reject) => {
|
||||
reject();
|
||||
});
|
||||
};
|
||||
const worker = new Worker(mockQueue, 'test', workerFn);
|
||||
|
||||
worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('name', 'UnspecifiedWorkerError');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('job timeouts', function () {
|
||||
let job;
|
||||
let failStub;
|
||||
let worker;
|
||||
const timeout = 20;
|
||||
const timeoutPadding = 10;
|
||||
describe('job failures', function () {
|
||||
function getFailStub(worker) {
|
||||
return sinon.stub(worker, '_failJob').returns(Promise.resolve());
|
||||
}
|
||||
|
||||
beforeEach(function () {
|
||||
const workerFn = function () {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, timeout + timeoutPadding);
|
||||
describe('timeout', function () {
|
||||
let worker;
|
||||
let failStub;
|
||||
let job;
|
||||
|
||||
beforeEach(function () {
|
||||
const timeout = 20;
|
||||
const workerFn = function () {
|
||||
return new Promise(function (resolve) {
|
||||
setTimeout(() => {
|
||||
resolve();
|
||||
}, timeout * 2);
|
||||
});
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
failStub = getFailStub(worker);
|
||||
|
||||
job = {
|
||||
_id: 'testTimeoutJob',
|
||||
_source: {
|
||||
timeout: timeout,
|
||||
payload: 'test'
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
it('should not fail job', function () {
|
||||
// fire of the job worker
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.notCalled(failStub);
|
||||
});
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
job = {
|
||||
_id: 'testJob1',
|
||||
});
|
||||
|
||||
it('should emit timeout if not completed in time', function (done) {
|
||||
worker.once(constants.EVENT_WORKER_JOB_TIMEOUT, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
expect(err.error).to.have.property('name', 'WorkerTimeoutError');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
// fire of the job worker
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('worker failure', function () {
|
||||
let worker;
|
||||
let failStub;
|
||||
|
||||
const timeout = 20;
|
||||
const job = {
|
||||
_id: 'testTimeoutJob',
|
||||
_source: {
|
||||
timeout: timeout,
|
||||
payload: 'test'
|
||||
}
|
||||
};
|
||||
failStub = sinon.stub(worker, '_failJob').returns(Promise.resolve());
|
||||
});
|
||||
|
||||
it('should fail if not complete within allotted time', function () {
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.notCalled(failStub);
|
||||
describe('reject', function () {
|
||||
beforeEach(function () {
|
||||
const workerFn = function () {
|
||||
return new Promise(function (resolve, reject) {
|
||||
setTimeout(() => {
|
||||
reject();
|
||||
}, timeout / 2);
|
||||
});
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
failStub = getFailStub(worker);
|
||||
});
|
||||
|
||||
it('should fail the job', function () {
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit worker execution error', function (done) {
|
||||
worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
// fire of the job worker
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
|
||||
describe('throw', function () {
|
||||
beforeEach(function () {
|
||||
const workerFn = function () {
|
||||
throw new Error('test throw');
|
||||
};
|
||||
worker = new Worker(mockQueue, 'test', workerFn);
|
||||
failStub = getFailStub(worker);
|
||||
});
|
||||
|
||||
it('should fail the job', function () {
|
||||
return worker._performJob(job)
|
||||
.then(() => {
|
||||
sinon.assert.calledOnce(failStub);
|
||||
});
|
||||
});
|
||||
|
||||
it('should emit worker execution error', function (done) {
|
||||
worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
|
||||
try {
|
||||
expect(err).to.have.property('error');
|
||||
expect(err).to.have.property('job');
|
||||
expect(err).to.have.property('worker');
|
||||
done();
|
||||
} catch (e) {
|
||||
done(e);
|
||||
}
|
||||
});
|
||||
|
||||
// fire of the job worker
|
||||
worker._performJob(job);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user