164 Commits

Author SHA1 Message Date
91e5316877 v2.0.2 2017-05-23 18:18:20 -07:00
ee948c5900 stop testing in node 4
because i don't feel like dealing with the peer dependency issues....
2017-05-23 18:16:27 -07:00
ff944c523a add yarn.lock to gitignore 2017-05-23 18:14:13 -07:00
6401022021 update changelog 2017-05-23 18:11:16 -07:00
32fb6a92f5 better fix for the worker doctype
use the queue's doctype if one is not provided. also adds the option to the readme
2017-05-23 18:11:16 -07:00
d4f24182b4 use the queue doctype in the worker by default
allow the user to override the setting
2017-05-23 18:11:16 -07:00
283731d6e6 2.0.1 2017-03-01 16:01:40 -07:00
79affbce76 remove ignore 400 on index creation 2017-03-01 16:01:28 -07:00
9b02bdf56c update the readme about version compatability 2017-03-01 11:30:33 -07:00
bae58b0370 2.0.0 2017-03-01 11:25:55 -07:00
c1216d5a4b update changelog 2017-03-01 11:25:46 -07:00
0e0c3feab9 fix mapping types for 5.x+
elasticsearch has deprecated the string type in 5.x, and is removing it in 6.x
2017-03-01 11:23:23 -07:00
0fb14d46c9 1.0.0 2017-02-28 16:03:19 -07:00
7900d0c7e1 update the changelog 2017-02-28 16:03:13 -07:00
c84b62bd31 update library deps 2017-02-28 16:01:52 -07:00
d972c9a1cb update remaining test deps 2017-02-28 16:01:40 -07:00
e294af2688 update eslint packages
change elasticsearch to ^ instead of ~
2017-02-28 15:57:43 -07:00
73efd7167d update eslint config and babel packages 2017-02-28 15:55:13 -07:00
0ed702c665 make elasticsearch a peer dependency 2017-02-28 15:52:41 -07:00
2324256039 remove error dependency 2017-02-28 15:52:41 -07:00
c6c73f6dd2 fix code and tests, use custom errors 2017-02-28 15:52:41 -07:00
9cff4e4b04 replace typed errors with custom errors 2017-02-28 15:52:40 -07:00
1e8340f214 remove lodash dependencies 2017-02-28 14:48:43 -07:00
bbd82a1de1 replace lodash methods with local helpers 2017-02-28 14:48:23 -07:00
64c1c90337 add plain object check and omit helpers 2017-02-28 14:45:43 -07:00
5e0cc440ff 0.12.2 2016-12-16 17:02:04 -07:00
1e3168452f update the changelog 2016-12-16 17:00:49 -07:00
Joe Fleming
3cc8573ab0 Merge pull request #13 from w33ble/fix/worker-destroy
Worker destroy() will stops job polling
2016-12-16 16:57:52 -07:00
b2f4ae857f test that the running state prevents searches 2016-12-16 16:53:22 -07:00
c64c5c80d6 call _stopJobPolling on destroy 2016-12-16 16:50:48 -07:00
4befaee4cc minor syntax change
make running more clearly a private state variable
2016-12-16 16:45:23 -07:00
Joe Fleming
af47d230db Merge pull request #10 from jasonallen/patch-1
Fix typo in Worker Class name
2016-12-16 10:27:30 -07:00
jason allen
7c1189cf3c [FIX] worker destroy will really stop it. 2016-12-15 09:27:27 -08:00
bcca5667fb udpate travis badge, remove codecov 2016-12-06 14:59:40 -07:00
5886c1399d 0.12.1 2016-12-06 14:52:34 -07:00
40aa1d0a2c change repo location and author 2016-12-06 14:52:22 -07:00
Jason Allen
3d34642de4 Fix typo in Worker Class name
It was mistakenly called Job.
2016-12-01 07:58:35 -08:00
7836ae5b39 0.12.0 2016-10-19 13:42:38 -07:00
6913391526 update the changelog 2016-10-19 13:39:59 -07:00
2753986038 remove the timeout param from ping 2016-10-19 13:39:22 -07:00
64ff676fab change the queue error 2016-10-19 13:39:07 -07:00
160f087c9f add 0.10.5 notes 2016-09-19 09:41:29 -07:00
50998fa69a 0.11.1 2016-09-19 09:34:26 -07:00
d7e12206c5 add apache 2.0 license file 2016-09-19 09:32:54 -07:00
d71792f939 0.11.0 2016-09-14 11:40:05 -07:00
Joe Fleming
9c411b0b11 Merge pull request #8 from epixa/excludes
Use excludes instead of exclude in ES _source query
2016-09-14 11:31:50 -07:00
Court Ewing
7af0c82080 Use excludes instead of exclude in ES _source query
As of ES 5.0, you must use the plural name.
2016-09-14 12:24:11 -04:00
c0c535076e add dateSeparator to readme 2016-08-22 16:44:08 -07:00
f5a12db60c add vulnerable packages check to test script 2016-08-22 11:35:14 -07:00
3bf62525a9 0.10.4 2016-08-22 11:26:11 -07:00
c368fa91a1 update changelog 2016-08-22 11:25:54 -07:00
a5bfb96855 add dateSeparator validation test 2016-08-22 11:24:59 -07:00
83f267a028 add dateSeparator on index creation 2016-08-22 11:11:57 -07:00
faeb705dee add date separator constant, update tests 2016-08-22 11:11:38 -07:00
2693c40423 add tests for custom date separator 2016-08-22 11:03:06 -07:00
10003e147d allow valid datestamp separator to be specified 2016-08-22 10:49:01 -07:00
c976684a42 0.10.3 2016-08-04 16:04:26 -07:00
1a1f9358fd update changelog 2016-08-04 16:04:15 -07:00
7867f14476 bump moment version 2016-08-04 16:00:55 -07:00
8b652f11a9 0.10.2 2016-07-22 14:41:47 -07:00
f3ac7c1958 add not about header passing on job creation 2016-07-21 14:44:57 -07:00
597052dc4e add test for passing headers on job creation 2016-07-21 14:39:07 -07:00
2c5519c253 pass headers on job creation 2016-07-21 14:31:32 -07:00
b916d1352a rename client creation module 2016-07-21 14:31:15 -07:00
6ad438db96 0.10.1 2016-07-19 13:43:18 -07:00
38287f10f7 update the changelog 2016-07-19 12:59:50 -07:00
9d20095ffb add a test for index refreshing 2016-07-19 12:59:24 -07:00
67381108d7 add indices.refresh method to mock client 2016-07-19 12:51:34 -07:00
2c025e02c9 refresh the index after adding job
makes the job searchable after the creation event emits
2016-07-19 12:49:27 -07:00
041af798a8 0.10.0 2016-07-12 13:39:39 -07:00
fd118fa746 add test for passing params to workers 2016-07-12 13:31:40 -07:00
f8db6e1bd3 add queue tests for job creation 2016-07-12 13:31:25 -07:00
775442f284 update worker failure tests
check for timeouts, rejections and thrown errors explicitely, and make the tests a little more focused
2016-07-12 11:15:43 -07:00
8e8609eede always emit job failure event 2016-07-12 11:15:05 -07:00
25878c0b33 add notes about events in the readme 2016-07-12 10:55:41 -07:00
bfe8799b90 add job creation error
use it on creation error, update tests
2016-07-11 16:03:59 -07:00
c758fb55a6 rename the worker timeout error 2016-07-11 15:48:31 -07:00
8cd2fde3a8 add test for worker failure event 2016-07-11 15:48:08 -07:00
9a71fcc7b5 emit on any worker job failure 2016-07-11 15:48:08 -07:00
95569c9e82 update changelog 2016-07-11 15:48:08 -07:00
288daecb6b Job - add emit on Queue instance test 2016-07-11 15:48:08 -07:00
9cfc080b64 emit events on queue instance 2016-07-11 15:48:08 -07:00
bdd94096db update Job tests, use mockQueue 2016-07-11 15:48:08 -07:00
40d67829c8 pass the entire queue to the Job instance 2016-07-11 15:48:08 -07:00
a50dbf752e add success and failure event tests for jobs 2016-07-11 15:48:08 -07:00
630733b093 apped job index to event 2016-07-11 15:48:07 -07:00
c6986e3677 move constants to lib root 2016-07-11 15:48:04 -07:00
27390fef44 put job info under job property on success emitter 2016-07-11 10:27:08 -07:00
3b135bbd09 update the changelog 2016-07-08 18:36:14 -07:00
5ed3280a18 test events triggered on queue, use mock queue 2016-07-08 18:35:36 -07:00
79358a76dd create a mock queue 2016-07-08 18:34:58 -07:00
d16d3ea4dd cause worker emitter to emit on the queue instance 2016-07-08 18:34:45 -07:00
057bd26b74 emit on worker completion
with test
2016-07-08 18:00:04 -07:00
362469f541 error emitters should return more than just the error
update tests, and properly catch error conditions in worker event tests
2016-07-08 18:00:04 -07:00
a2391a30c5 update event handlers in tests
use constants values for listeners
2016-07-08 18:00:04 -07:00
3934f0cd1b update emit calls with constants 2016-07-08 18:00:04 -07:00
919fec4835 add event types to constants 2016-07-08 18:00:04 -07:00
b99f5ff1b9 don't pass headers through to jobs 2016-07-08 18:00:02 -07:00
c44d275395 update the changelog doc 2016-07-08 17:26:53 -07:00
da57fdeee7 0.9.0 2016-06-28 18:35:58 -07:00
aa2c0040d5 add indexSettings option to the docs 2016-06-28 18:35:44 -07:00
d2e843f05b collect indexSettings on queue creation, pass to job and use at index creation 2016-06-28 18:29:56 -07:00
13a78d12cc allow index settings to be passed on index creation 2016-06-28 18:28:53 -07:00
1d4c45c5d9 fix worker debug output 2016-06-28 18:28:12 -07:00
eba6748ba9 add tests for various failure types 2016-06-28 18:03:15 -07:00
257645f11c add tests for worker timeout handling 2016-06-28 18:03:06 -07:00
460d83411e properly handle job errors without a toString method 2016-06-28 17:49:45 -07:00
2b2db9c5f9 add UnspecifiedWorkerError error 2016-06-28 17:49:10 -07:00
df03738b9a rename the timeout error type 2016-06-28 17:46:06 -07:00
db1d282da2 handle resolve, reject and timeout correctly 2016-06-28 17:03:28 -07:00
a5f1d77f23 remove unused var in test 2016-06-27 14:45:31 -07:00
d73155d538 0.8.0 2016-06-27 14:41:45 -07:00
866f5948af 100 percent coverage on create_index helper 2016-06-27 14:23:49 -07:00
eae6942ec2 add tests for true/false and emits on errors 2016-06-27 14:14:23 -07:00
b0fec7d1ed don't emit on version errors 2016-06-27 14:14:06 -07:00
638c896e37 don't emit on the queue instance 2016-06-27 14:04:33 -07:00
5d5552c548 don't throw on worker failures 2016-06-27 14:04:25 -07:00
ea0ea7e6c2 0.7.0 2016-06-07 11:20:13 -07:00
85cc4bf7f8 don't throw on job creation failures
there's nothing to catch the error, it should be handled with the error event
2016-06-07 11:16:57 -07:00
34592740c7 0.6.1 2016-06-06 17:23:06 -07:00
eeafaf7d42 add test for assign headers 2016-06-06 17:22:36 -07:00
9e3515ebd5 restructure the jobs tests 2016-06-06 17:12:09 -07:00
78871f97d9 simplofy the promise chain 2016-06-06 16:59:13 -07:00
5954ee1d51 append option headers to the index request 2016-06-06 16:59:02 -07:00
a2d3fb7ffd simplify the node test matrix 2016-06-06 15:39:08 -07:00
04608b0ab2 update nyc in package.json 2016-06-06 15:26:36 -07:00
c541e07bb5 fix wording around worker functions 2016-06-06 15:15:12 -07:00
d70a8cc3ea 0.6.0 2016-06-06 15:06:53 -07:00
68ef5d2147 add client option to docs 2016-06-06 15:06:40 -07:00
1c24a4f7c1 fix worker example
add example of async worker
2016-06-06 15:06:30 -07:00
5fe4e644d6 add custom client test 2016-06-06 14:58:54 -07:00
39315dddd0 add internal properties test 2016-06-06 14:58:48 -07:00
57c1bd1819 allow client to be passed with registering worker 2016-06-06 14:50:24 -07:00
e0605796a1 fix inconcistent constants use 2016-06-06 14:50:06 -07:00
a1a7b9e213 add tests for passing client on job create
also fix issue where the wrong client was being used to create the index
2016-06-06 14:48:01 -07:00
dcecd4020b allow new client instant to be passed on job creation 2016-06-06 14:47:04 -07:00
40c8d15562 looser node-version spec 2016-06-06 14:13:16 -07:00
21d52b4b50 add package script
handy for packaging up the lib path on release tags
2016-05-19 15:06:51 -07:00
a0042be7c9 0.5.0 2016-05-19 14:15:53 -07:00
d325f108f7 Merge branch 'master' into develop 2016-05-19 14:14:35 -07:00
1fe139e1b0 0.4.1 2016-05-19 14:13:49 -07:00
4beb4a880b update changelog 2016-05-19 14:12:51 -07:00
9ab3bb048e update tests and fixture 2016-05-19 14:12:01 -07:00
ca0da61a69 default created_by to false 2016-05-19 14:11:52 -07:00
59d30bbb0f remove console debugging 2016-05-19 14:10:03 -07:00
6ce18a5477 update changelog 2016-05-17 13:38:08 -07:00
89c08068dd use filter instead of must in query
faster, and the score does not matter here
2016-05-17 13:37:22 -07:00
5f2e2b09cf bump dev node version 2016-05-17 10:19:27 -07:00
fb2f51b11d 0.4.0 2016-05-16 17:19:55 -07:00
8ac9a81fdb update changelog and readme 2016-05-16 15:06:47 -07:00
4472e725fa add tests for optional fields to get and toJSON methods 2016-05-16 15:04:35 -07:00
be1eb81059 make the mock client get method async
and update worker tests to use async value
2016-05-16 15:03:20 -07:00
8d21dc6967 add optional created_by record to the doc 2016-05-16 15:02:34 -07:00
31159baae9 add future version notes to changelog 2016-05-16 14:24:20 -07:00
49965bbaf1 add changelog file 2016-05-16 14:23:12 -07:00
49b982db99 change priority field to byte 2016-05-16 14:09:42 -07:00
9aa8eed297 filter output when searching for jobs 2016-05-16 14:09:24 -07:00
0bf6fb0023 0.3.2 2016-05-13 14:59:17 -07:00
300449bfb0 add test for missing index result 2016-05-13 14:59:05 -07:00
868c808db7 missing indices need to return an array 2016-05-13 14:56:50 -07:00
ef61a33a38 0.3.1 2016-05-13 14:40:46 -07:00
dae14e0edc add tests for job search failures 2016-05-13 14:36:19 -07:00
c51ea64bdd [worker] swollow missing index errors 2016-05-13 14:35:39 -07:00
5d37399fbf run travis on version tags too 2016-05-12 17:27:53 -07:00
34 changed files with 1749 additions and 451 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ node_modules
npm-debug.log
.nyc_output
coverage.lcov
yarn.lock

View File

@@ -1 +1 @@
4.3.2
4.x

View File

@@ -2,9 +2,7 @@ language: node_js
node_js:
- "stable"
- "5"
- "4"
- "4.3"
- "6"
notifications:
email: false
@@ -16,3 +14,4 @@ after_success: npm run coverage
branches:
only:
- master
- /^v[0-9].*$/

142
CHANGELOG.md Normal file
View File

@@ -0,0 +1,142 @@
# Changelog
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
## v2.0.2
- Fix issue where creating a worker would not use the queue's doctype by default
## v2.0.1
- Don't swallow 400 errors at index creation
## v2.0.0
- Change `sting` mappings to `keyword`, since [string is deprecated and is being removed from elasticsearch](https://www.elastic.co/guide/en/elasticsearch/reference/current/breaking_50_mapping_changes.html#_literal_string_literal_fields_replaced_by_literal_text_literal_literal_keyword_literal_fields)
## v1.0.0
- [BREAKING] elasticsearch package is a peerDependency now, since it's not required if you are using the `client` option when instantiating the queue
- Remove lodash.isPlainObject and lodash.omit dependencies, use customer helpers
- Remove errors dependency, use custom errors
## v0.12.2
- Fixed issue where destoyed workers could continue running
## v0.12.1
- Move repo to elastic org
## v0.12.0
- [BREAKING] Rename general queue error to `queue:error` instead of simply `error`
- Remove the `timeout` parameter from the ping operation on intiialization
## v0.11.1
- Apache 2.0 license file
## v0.11.0
- Contrary to the [source filtering docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.4/search-request-source-filtering.html), use plural form of include/exclude due to breaking change in Elasticsearch 5.0
## v0.10.5
- Apache 2.0 license file
## v0.10.4
- Allow index pattern date separator to be customized
## v0.10.3
- Bump moment.js version, fix [DoS issue](https://nodesecurity.io/advisories/55)
## v0.10.2
- Allow passing headers on job creation, useful for auth and proxy traversal
## v0.10.1
- Refresh Elasticsearch index when creating job, fire event after refresh
## v0.10.0
- [BREAKING] Remove header passing on job creation
- [BREAKING] Job instantiation requires full queue instance
- Expose event names in constants
- Emit on Worker success conditions as well as errors
- Worker and Job emits on the Queue instance
## v0.9.0
- [BREAKING] Rename timeout error event
- Fix worker timeout condition
- Fix issue where a worker error was not an instance of Error, or lacked a `toString()` method
- Allow specifying option to pass to elasticsearch client on index creation
## v0.8.0
- [BREAKING] Don't throw on worker failures
- [BREAKING] Don't emit errors on queue instance
## v0.7.0
- [BREAKING] Don't throw on job creation failures
## v0.6.1
- Allow headers option on job creation, passed to elasticsearch index request
## v0.6.0
- Allow client instance to be passed when creating a job
- Allow client instance to be passed when creating a worker
- Prefer any 4.x version of node for development
## v0.5.0
- [BREAKING] Change default `created_by` value to `false` (formerly `null`)
## v0.4.1
- Use `filter` instead of `must` to query for outstanding jobs
## v0.4.0
- [BREAKING] Change `priority` mapping to *byte*
- Exclude `output.content` from _source when query jobs
- Add optional `created_by` value to job documents
## v0.3.2
- Misisng indiced returns empty array (fixed errors in v0.3.1)
## v0.3.1
- Ignore missing indices when looking for jobs
## v0.3.0
- [BREAKING] Use `jobtype` field to control document indexing and lookup (instead of document `_type`)
## v0.2.2
- Swollow errors when saving job output
- Set `process_expiration` value (prevents upstream Elasticsearch error in alpha builds)
- Update npm package
## v0.2.1
- Use `esqueue` namespace for debugging
## v0.2.0
- [BREAKING] Async jobs should return promises, not use callbacks
- Remove bluebird dependency
- Only require specific lodash modules, instead of the whole library
## v0.1.0
- Initial release

13
license.txt Normal file
View File

@@ -0,0 +1,13 @@
Copyright 2016 Joe Fleming
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

View File

@@ -1,16 +1,16 @@
{
"name": "esqueue",
"version": "0.3.0",
"version": "2.0.2",
"description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js",
"scripts": {
"build": "rm -rf lib && babel src --out-dir lib",
"coverage": "nyc report --reporter=text-lcov > coverage.lcov && codecov",
"package": "git checkout -B package-lib && npm run build && git add -f lib && git commit -m 'package lib' && echo Use npm to tag and publish",
"prepublish": "in-publish && npm run test || not-in-publish",
"test": "npm run build && npm run unit",
"test": "retire -n -p package.json && npm run build && npm run unit",
"unit": "nyc --require babel-core/register mocha test/src/**"
},
"author": "Joe Fleming (https://github.com/w33ble)",
"author": "Elastic (https://github.com/elastic)",
"keywords": [
"job",
"queue",
@@ -19,38 +19,38 @@
],
"repository": {
"type": "git",
"url": "https://github.com/w33ble/esqueue.git"
"url": "https://github.com/elastic/esqueue.git"
},
"license": "Apache-2.0",
"engines": {
"node": ">=4.3.0"
},
"devDependencies": {
"@elastic/eslint-config-kibana": "0.0.3",
"babel-cli": "~6.7.5",
"babel-core": "~6.7.6",
"babel-eslint": "~4.1.8",
"babel-plugin-add-module-exports": "~0.1.2",
"babel-preset-es2015": "~6.6.0",
"codecov": "~1.0.1",
"eslint": "~1.10.3",
"eslint-plugin-mocha": "~1.1.0",
"eslint-plugin-react": "~4.2.3",
"@elastic/eslint-config-kibana": "^0.3.0",
"babel-cli": "^6.23.0",
"babel-core": "^6.23.1",
"babel-eslint": "^7.1.1",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-preset-es2015": "^6.22.0",
"elasticsearch": "^12.0.0",
"eslint": "^3.16.1",
"eslint-plugin-mocha": "^4.8.0",
"eslint-plugin-react": "^6.10.0",
"expect.js": "~0.3.1",
"lodash": "~4.11.1",
"mocha": "~2.4.5",
"nyc": "~6.4.2",
"proxyquire": "~1.7.4",
"sinon": "~1.17.3"
"lodash": "^4.17.4",
"mocha": "^3.2.0",
"nyc": "^10.1.2",
"proxyquire": "^1.7.4",
"retire": "^1.2.12",
"sinon": "^1.17.3"
},
"peerDependencies": {
"elasticsearch": ">=11.0.1"
},
"dependencies": {
"debug": "~2.2.0",
"elasticsearch": "~11.0.1",
"error": "~7.0.2",
"in-publish": "~2.0.0",
"lodash.isplainobject": "~4.0.4",
"lodash.omit": "~4.2.1",
"moment": "~2.10.6",
"puid": "~1.0.5"
"debug": "^2.6.1",
"in-publish": "^2.0.0",
"moment": "^2.17.1",
"puid": "^1.0.5"
}
}

View File

@@ -1,4 +1,4 @@
[![Build Status](https://travis-ci.org/w33ble/esqueue.svg?branch=master)](https://travis-ci.org/w33ble/esqueue) [![codecov](https://codecov.io/gh/w33ble/esqueue/branch/master/graph/badge.svg)](https://codecov.io/gh/w33ble/esqueue)
[![Build Status](https://travis-ci.org/elastic/esqueue.svg?branch=master)](https://travis-ci.org/elastic/esqueue)
# esqueue
@@ -8,6 +8,10 @@
`npm install esqueue`
Note that the more recent version of esqueue is meant to work with Elasticsearch 5.x or later. If you need it to work with an older version, use v1.0.0:
`npm install equeue@^1.0.0`
## Usage
Simply include the module in your application.
@@ -32,8 +36,10 @@ The queue instance is an event emitter, so you can listen for `error` events as
Option | Default | Description
------ | ----------- | -------
interval | `week` | Valid choices are `year`, `month`, `week`, `day`, `hour`, and even `minute`. | `week`
dateSeparator | `-` | Separator for the formatted date, *YYYY-MM-DD* for example, in the index pattern.
timeout | `10000` | The default job timeout, in `ms`. If workers take longer than this, the job is re-queued for another worker to complete it.
doctype | `esqueue` | The doctype to use in Elasticsearch
indexSettings | | Specify which `settings` to pass on index creation. See the [Elasticsearch index creation docs](https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-create-index.html) for more info.
client | | Options to use when creating a new client instance - see [the elasticsearch-js docs](https://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/configuration.html). If you rather use your own client instance, just pass it in here instead.
@@ -60,6 +66,9 @@ Option | Default | Description
timeout | `10000` | Timeout for the job, if different than the timeout configured on the queue.
max_attempts | `3` | Number of times to re-trying assigning the job to a worker before giving up and failing.
priority | `0` | Used to move jobs up the queue. Uses nice values from `-20` to `20`.
created_by | null | Used to filter job documents by a creator identifier; meant to be consumed by external applications.
headers | | Any headers to add to the index request. Handy for custom authentication or use with a proxy.
client | | Alternative elasticsearch client instance, if you need to use one other than what the queue was created with.
### Creating a worker
@@ -104,28 +113,62 @@ Option | Default | Description
------ | ----------- | -------
interval | `1500` | Time, in `ms` to poll for new jobs in the queue.
size | `10` | Number of records to return when polling for new jobs. Higher values may result in less Elasticsearch requests, but may also take longer to execute. A bit of tuning based on the number of workers you have my be required here.
client | | Alternative elasticsearch client instance, if you need to use one other than what the queue was created with.
doctype | `queue.doctype` | The doctype to use when polling for new jobs. You probably don't want to change this.
The worker's `output` can either be the raw output from the job, or on object that specifies the output's content type.
```js
var workerFn1 = function (payload, cb) {
var workerFn1 = function (payload) {
// Do some work, using the payload if required
var output = new Date().toString();
cb(null, output);
return output;
};
var workerFn2 = function (payload, cb) {
var workerFn2 = function (payload) {
// Do some work, using the payload if required
var output = {
content_type: 'text/plain',
content: new Date().toString();
};
cb(null, output);
return output;
};
var asyncWorker = function (payload) {
// Do some work, using the payload if required
return Promise.resolve({
content_type: 'text/plain',
content: new Date().toString();
})
};
```
Both are valid, but the `workerFn2` is likely to be more useful when retrieving the output, as the application doesn't need to know or make assumptions about the type of content the worker returned.
All of the above are valid. `workerFn2` and `asyncWorker` are likely to be more useful when retrieving the output, as the application doesn't need to know or make assumptions about the type of content the worker returned. Note that returning a Promise is all that's required for an async result in the worker functions.
## Queue events
`esqueue` components, namely the Queue, Job, and Worker instances, are also event emitters. Each instance will emit events to help your application know when certain things happen in the queue, like when a job is created, or a worker is done running, or when it times out.
It's important to note that all events emitted from the Job and Worker instances are also emitted on the Queue instance. This means that your application should be able to react to changes by only keeping track of that instance.
Available events can be found in `lib/constants/events.js`, and you're encouraged to import and use those constant values in your application. Here's an example:
```js
var Queue = require('esqueue');
var queueEvents = require('esqueue/lib/constants/events');
var jobQueue = new Queue('my-index');
jobQueue.on(queueEvents.EVENT_JOB_CREATE_ERROR, (err) => {
// handle error
console.log('ONOZ!!! Job creation failed :(', err.error.message);
});
```
The argument passed to listeners typically contains several pieces of information about what happened. For example, Worker events will contain information about the job, the worker, and if it's an error event, the error.
More than any other events, you'll probably want to know if a worker completed or failed. When a worker starts, it will always either emit `EVENT_WORKER_COMPLETE` or `EVENT_WORKER_JOB_FAIL`. Faliures may also emit other events, such as `EVENT_WORKER_JOB_TIMEOUT` or `EVENT_WORKER_JOB_EXECUTION_ERROR`, but you can rely on `EVENT_WORKER_JOB_FAIL` for all failure cases.
## Scaling the queue

View File

@@ -0,0 +1,6 @@
export default {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_DATE_SEPARATOR: '-',
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
};

14
src/constants/events.js Normal file
View File

@@ -0,0 +1,14 @@
export default {
EVENT_QUEUE_ERROR: 'queue:error',
EVENT_JOB_ERROR: 'job:error',
EVENT_JOB_CREATED: 'job:created',
EVENT_JOB_CREATE_ERROR: 'job:creation error',
EVENT_WORKER_COMPLETE: 'worker:job complete',
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
EVENT_WORKER_JOB_FAIL: 'worker:job failed',
EVENT_WORKER_JOB_FAIL_ERROR: 'worker:failed job update error',
EVENT_WORKER_JOB_EXECUTION_ERROR: 'worker:job execution error',
EVENT_WORKER_JOB_TIMEOUT: 'worker:job timeout',
};

5
src/constants/index.js Normal file
View File

@@ -0,0 +1,5 @@
import events from './events';
import statuses from './statuses';
import defaultSettings from './default_settings';
export default Object.assign({}, events, statuses, defaultSettings);

View File

@@ -0,0 +1,7 @@
export default {
JOB_STATUS_PENDING: 'pending',
JOB_STATUS_PROCESSING: 'processing',
JOB_STATUS_COMPLETED: 'completed',
JOB_STATUS_FAILED: 'failed',
JOB_STATUS_CANCELLED: 'cancelled',
};

View File

@@ -1,15 +0,0 @@
export const jobStatuses = {
JOB_STATUS_PENDING: 'pending',
JOB_STATUS_PROCESSING: 'processing',
JOB_STATUS_COMPLETED: 'completed',
JOB_STATUS_FAILED: 'failed',
JOB_STATUS_CANCELLED: 'cancelled',
};
export const defaultSettings = {
DEFAULT_SETTING_TIMEOUT: 10000,
DEFAULT_SETTING_INTERVAL: 'week',
DEFAULT_SETTING_DOCTYPE: 'esqueue',
};
export default Object.assign({}, jobStatuses, defaultSettings);

View File

@@ -3,7 +3,6 @@ import elasticsearch from 'elasticsearch';
export default function createClient(options) {
let client;
// if there's a transport property, assume it's a client instance
if (isClient(options)) {
client = options;
} else {
@@ -14,5 +13,6 @@ export default function createClient(options) {
};
export function isClient(client) {
// if there's a transport property, assume it's a client instance
return !!client.transport;
}

View File

@@ -1,40 +1,41 @@
import { defaultSettings } from './constants';
import { DEFAULT_SETTING_DOCTYPE } from '../constants';
const schema = {
jobtype: { type: 'string', index: 'not_analyzed' },
jobtype: { type: 'keyword' },
payload: { type: 'object', enabled: false },
priority: { type: 'short' },
priority: { type: 'byte' },
timeout: { type: 'long' },
process_expiration: { type: 'date' },
created_by: { type: 'string', index: 'not_analyzed' },
created_by: { type: 'keyword' },
created_at: { type: 'date' },
started_at: { type: 'date' },
completed_at: { type: 'date' },
attempts: { type: 'short' },
max_attempts: { type: 'short' },
status: { type: 'string', index: 'not_analyzed' },
status: { type: 'keyword' },
output: {
type: 'object',
properties: {
content_type: { type: 'string', index: 'not_analyzed' },
content_type: { type: 'keyword', index: false },
content: { type: 'object', enabled: false }
}
}
};
export default function createIndex(client, indexName, doctype = defaultSettings.DEFAULT_SETTING_DOCTYPE) {
export default function createIndex(client, indexName, doctype = DEFAULT_SETTING_DOCTYPE, settings = {}) {
const indexBody = { mappings : {} };
indexBody.mappings[doctype] = { properties: schema };
const body = Object.assign({}, { settings }, indexBody);
return client.indices.exists({
index: indexName,
})
.then((exists) => {
if (!exists) {
return client.indices.create({
ignore: 400,
index: indexName,
body: indexBody
body: body
})
.then(() => true);
}

View File

@@ -1,12 +1,20 @@
import typedError from 'error/typed';
export function WorkerTimeoutError(message, props = {}) {
this.name = 'WorkerTimeoutError';
this.message = message;
this.timeout = props.timeout;
this.jobId = props.jobId;
const errors = {};
if ("captureStackTrace" in Error) Error.captureStackTrace(this, WorkerTimeoutError);
else this.stack = (new Error()).stack;
}
WorkerTimeoutError.prototype = Object.create(Error.prototype);
errors.WorkerTimeoutError = typedError({
type: 'WorkerTimeout',
message: 'worker timed out, timeout={timeout}',
timeout: null,
jobId: null
});
export function UnspecifiedWorkerError(message, props = {}) {
this.name = 'UnspecifiedWorkerError';
this.message = message;
this.jobId = props.jobId;
export default errors;
if ("captureStackTrace" in Error) Error.captureStackTrace(this, UnspecifiedWorkerError);
else this.stack = (new Error()).stack;
}
UnspecifiedWorkerError.prototype = Object.create(Error.prototype);

View File

@@ -9,7 +9,9 @@ export const intervals = [
'minute'
];
export default function indexTimestamp(intervalStr) {
export default function indexTimestamp(intervalStr, separator = '-') {
if (separator.match(/[a-z]/i)) throw new Error('Interval separator can not be a letter');
const index = intervals.indexOf(intervalStr);
if (index === -1) throw new Error('Invalid index interval: ', intervalStr);
@@ -22,16 +24,16 @@ export default function indexTimestamp(intervalStr) {
dateString = 'YYYY';
break;
case 'month':
dateString = 'YYYY-MM';
dateString = `YYYY${separator}MM`;
break;
case 'hour':
dateString = 'YYYY-MM-DD-HH';
dateString = `YYYY${separator}MM${separator}DD${separator}HH`;
break;
case 'minute':
dateString = 'YYYY-MM-DD-HH-mm';
dateString = `YYYY${separator}MM${separator}DD${separator}HH${separator}mm`;
break;
default:
dateString = 'YYYY-MM-DD';
dateString = `YYYY${separator}MM${separator}DD`;
}
return m.format(dateString);

View File

@@ -0,0 +1,3 @@
export default function (obj) {
return (typeof obj === 'object' && !Array.isArray(obj) && obj !== null);
}

View File

@@ -0,0 +1,11 @@
import isPlainObject from './is_plain_object';
export default function (obj, props) {
if (!isPlainObject(obj)) return obj;
if (!Array.isArray(props)) props = [props];
const newObj = Object.assign({}, obj);
props.forEach(prop => delete newObj[prop]);
return newObj;
}

View File

@@ -1,11 +1,11 @@
import events from 'events';
import createClient from './helpers/es_client';
import indexTimestamp from './helpers/index_timestamp';
import logger from './helpers/logger';
import { defaultSettings } from './helpers/constants';
import Job from './job.js';
import Worker from './worker.js';
import omit from 'lodash.omit';
import constants from './constants';
import createClient from './helpers/create_client';
import indexTimestamp from './helpers/index_timestamp';
import objectOmit from './helpers/object_omit';
import logger from './helpers/logger';
const debug = logger('esqueue:queue');
@@ -16,19 +16,20 @@ export default class Esqueue extends events.EventEmitter {
super();
this.index = index;
this.settings = Object.assign({
interval: defaultSettings.DEFAULT_SETTING_INTERVAL,
timeout: defaultSettings.DEFAULT_SETTING_TIMEOUT,
doctype: defaultSettings.DEFAULT_SETTING_DOCTYPE,
}, omit(options, [ 'client' ]));
interval: constants.DEFAULT_SETTING_INTERVAL,
timeout: constants.DEFAULT_SETTING_TIMEOUT,
doctype: constants.DEFAULT_SETTING_DOCTYPE,
dateSeparator: constants.DEFAULT_SETTING_DATE_SEPARATOR,
}, objectOmit(options, [ 'client' ]));
this.client = createClient(options.client || {});
this._workers = [];
this._initTasks().catch((err) => this.emit('error', err));
this._initTasks().catch((err) => this.emit(constants.EVENT_QUEUE_ERROR, err));
}
_initTasks() {
const initTasks = [
this.client.ping({ timeout: 3000 }),
this.client.ping(),
];
return Promise.all(initTasks).catch((err) => {
@@ -38,17 +39,18 @@ export default class Esqueue extends events.EventEmitter {
}
addJob(type, payload, opts = {}) {
const timestamp = indexTimestamp(this.settings.interval);
const timestamp = indexTimestamp(this.settings.interval, this.settings.dateSeparator);
const index = `${this.index}-${timestamp}`;
const defaults = {
timeout: this.settings.timeout,
};
const options = Object.assign(defaults, opts, {
doctype: this.settings.doctype
doctype: this.settings.doctype,
indexSettings: this.settings.indexSettings,
});
return new Job(this.client, index, type, payload, options);
return new Job(this, index, type, payload, options);
}
registerWorker(type, workerFn, opts) {

View File

@@ -1,35 +1,36 @@
import events from 'events';
import isPlainObject from 'lodash.isplainobject';
import Puid from 'puid';
import contstants from './constants';
import logger from './helpers/logger';
import contstants from './helpers/constants';
import createIndex from './helpers/create_index';
import isPlainObject from './helpers/is_plain_object';
const debug = logger('esqueue:job');
const puid = new Puid();
export default class Job extends events.EventEmitter {
constructor(client, index, type, payload, options = {}) {
constructor(queue, index, type, payload, options = {}) {
if (typeof type !== 'string') throw new Error('Type must be a string');
if (!isPlainObject(payload)) throw new Error('Payload must be a plain object');
super();
this.client = client;
this.queue = queue;
this.client = options.client || this.queue.client;
this.id = puid.generate();
this.index = index;
this.jobtype = type;
this.payload = payload;
this.created_by = options.created_by || false;
this.timeout = options.timeout || 10000;
this.maxAttempts = options.max_attempts || 3;
this.priority = Math.max(Math.min(options.priority || 10, 20), -20);
this.doctype = options.doctype || contstants.DEFAULT_SETTING_DOCTYPE;
this.indexSettings = options.indexSettings || {};
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this.ready = createIndex(client, index, this.doctype)
.then(() => {
return this.client.index({
const indexParams = {
index: this.index,
type: this.doctype,
id: this.id,
@@ -37,6 +38,7 @@ export default class Job extends events.EventEmitter {
jobtype: this.jobtype,
payload: this.payload,
priority: this.priority,
created_by: this.created_by,
timeout: this.timeout,
process_expiration: new Date(0), // use epoch so the job query works
created_at: new Date(),
@@ -44,24 +46,41 @@ export default class Job extends events.EventEmitter {
max_attempts: this.maxAttempts,
status: contstants.JOB_STATUS_PENDING,
}
})
};
if (options.headers) {
indexParams.headers = options.headers;
}
this.ready = createIndex(this.client, this.index, this.doctype, this.indexSettings)
.then(() => this.client.index(indexParams))
.then((doc) => {
this.document = {
id: doc._id,
type: doc._type,
index: doc._index,
version: doc._version,
};
this.debug(`Job created in index ${this.index}`);
this.emit('created', this.document);
return this.client.indices.refresh({
index: this.index
}).then(() => {
this.debug(`Job index refreshed ${this.index}`);
this.emit(contstants.EVENT_JOB_CREATED, this.document);
});
})
.catch((err) => {
this.debug('Job creation failed', err);
this.emit('error', err);
throw err;
this.emit(contstants.EVENT_JOB_CREATE_ERROR, err);
});
}
emit(name, ...args) {
super.emit(name, ...args);
this.queue.emit(name, ...args);
}
get() {
return this.ready
.then(() => {
@@ -87,11 +106,11 @@ export default class Job extends events.EventEmitter {
index: this.index,
type: this.doctype,
jobtype: this.jobtype,
created_by: this.created_by,
payload: this.payload,
timeout: this.timeout,
max_attempts: this.maxAttempts,
priority: this.priority,
});
}
}

View File

@@ -1,14 +1,22 @@
import events from 'events';
import Puid from 'puid';
import moment from 'moment';
import constants from './constants';
import logger from './helpers/logger';
import constants from './helpers/constants';
import { WorkerTimeoutError } from './helpers/errors';
import { WorkerTimeoutError, UnspecifiedWorkerError } from './helpers/errors';
const puid = new Puid();
const debug = logger('esqueue:worker');
export default class Job extends events.EventEmitter {
function formatJobObject(job) {
return {
index: job._index,
type: job._type,
id: job._id,
};
}
export default class Worker extends events.EventEmitter {
constructor(queue, type, workerFn, opts = {}) {
if (typeof type !== 'string') throw new Error('Type must be a string');
if (typeof workerFn !== 'function') throw new Error('Worker must be a function');
@@ -17,22 +25,48 @@ export default class Job extends events.EventEmitter {
this.id = puid.generate();
this.queue = queue;
this.client = this.queue.client;
this.client = opts.client || this.queue.client;
this.jobtype = type;
this.workerFn = workerFn;
this.checkInterval = opts.interval || 1500;
this.checkSize = opts.size || 10;
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
this.doctype = opts.doctype || this.queue.doctype || constants.DEFAULT_SETTING_DOCTYPE;
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this._checker = false;
this.debug(`Created worker for type ${this.type}`);
this._running = true;
this.debug(`Created worker for job type ${this.jobtype}`);
this._startJobPolling();
}
destroy() {
clearInterval(this._checker);
this._running = false;
this._stopJobPolling();
}
toJSON() {
return {
id: this.id,
index: this.queue.index,
jobType: this.jobType,
doctype: this.doctype,
};
}
emit(name, ...args) {
super.emit(name, ...args);
this.queue.emit(name, ...args);
}
_formatErrorParams(err, job) {
const response = {
error: err,
worker: this.toJSON(),
};
if (job) response.job = formatJobObject(job);
return response;
}
_claimJob(job) {
@@ -67,8 +101,10 @@ export default class Job extends events.EventEmitter {
return updatedJob;
})
.catch((err) => {
if (err.statusCode === 409) return false;
throw err;
if (err.statusCode === 409) return true;
this.debug(`_claimJob failed on job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_JOB_CLAIM_ERROR, this._formatErrorParams(err, job));
return false;
});
}
@@ -76,14 +112,18 @@ export default class Job extends events.EventEmitter {
this.debug(`Failing job ${job._id}`);
const completedTime = moment().toISOString();
const docOutput = this._formatOutput(output);
const doc = {
status: constants.JOB_STATUS_FAILED,
completed_at: completedTime,
output: docOutput
};
if (output) {
doc.output = this._formatOutput(output);
}
this.emit(constants.EVENT_WORKER_JOB_FAIL, {
job: formatJobObject(job),
worker: this.toJSON(),
output: docOutput,
});
return this.client.update({
index: job._index,
@@ -92,9 +132,12 @@ export default class Job extends events.EventEmitter {
version: job._version,
body: { doc }
})
.then(() => true)
.catch((err) => {
if (err.statusCode === 409) return true;
throw err;
this.debug(`_failJob failed to update job ${job._id}`, err);
this.emit(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, this._formatErrorParams(err, job));
return false;
});
}
@@ -118,11 +161,24 @@ export default class Job extends events.EventEmitter {
this.debug(`Starting job ${job._id}`);
const workerOutput = new Promise((resolve, reject) => {
resolve(this.workerFn.call(null, job._source.payload));
// run the worker's workerFn
let isResolved = false;
Promise.resolve(this.workerFn.call(null, job._source.payload))
.then((res) => {
isResolved = true;
resolve(res);
})
.catch((err) => {
isResolved = true;
reject(err);
});
// fail if workerFn doesn't finish before timeout
setTimeout(() => {
if (isResolved) return;
this.debug(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError({
reject(new WorkerTimeoutError(`Worker timed out, timeout = ${job._source.timeout}`, {
timeout: job._source.timeout,
jobId: job._id,
}));
@@ -149,26 +205,52 @@ export default class Job extends events.EventEmitter {
version: job._version,
body: { doc }
})
.then(() => {
const eventOutput = {
job: formatJobObject(job),
output: docOutput,
};
this.emit(constants.EVENT_WORKER_COMPLETE, eventOutput);
})
.catch((err) => {
if (err.statusCode === 409) return false;
this.debug(`Failure saving job output ${job._id}`, err);
this.emit('job_error', err);
this.emit(constants.EVENT_WORKER_JOB_UPDATE_ERROR, this._formatErrorParams(err, job));
});
}, (jobErr) => {
if (!jobErr) {
jobErr = new UnspecifiedWorkerError('Unspecified worker error', {
jobId: job._id,
});
}
// job execution failed
if (jobErr.type === 'WorkerTimeout') {
if (jobErr.name === 'WorkerTimeoutError') {
this.debug(`Timeout on job ${job._id}`);
this.emit('job_timeout', jobErr);
this.emit(constants.EVENT_WORKER_JOB_TIMEOUT, this._formatErrorParams(jobErr, job));
return;
// append the jobId to the error
} else {
try {
Object.assign(jobErr, { jobId: job._id });
} catch (e) {
// do nothing if jobId can not be appended
}
}
this.debug(`Failure occurred on job ${job._id}`, jobErr);
this.emit('job_error', jobErr);
return this._failJob(job, jobErr.toString());
this.emit(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, this._formatErrorParams(jobErr, job));
return this._failJob(job, (jobErr.toString) ? jobErr.toString() : false);
});
}
_startJobPolling() {
if (!this._running) {
return;
}
this._checker = setInterval(() => {
this._getPendingJobs()
.then((jobs) => this._claimPendingJobs(jobs));
@@ -180,11 +262,12 @@ export default class Job extends events.EventEmitter {
}
_claimPendingJobs(jobs) {
if (jobs.length === 0) return;
if (!jobs || jobs.length === 0) return;
this._stopJobPolling();
let claimed = false;
// claim a single job, stopping after first successful claim
return jobs.reduce((chain, job) => {
return chain.then((claimedJob) => {
// short-circuit the promise chain if a job has been claimed
@@ -199,16 +282,6 @@ export default class Job extends events.EventEmitter {
});
});
}, Promise.resolve())
.catch((err) => {
this.debug('Failed to claim outstanding jobs', err);
this.emit('error', err);
this.queue.emit('worker_error', {
id: this.id,
type: this.type,
err
});
throw err;
})
.then((claimedJob) => {
if (!claimedJob) {
this.debug(`All ${jobs.length} jobs already claimed`);
@@ -220,7 +293,6 @@ export default class Job extends events.EventEmitter {
.then(() => this._startJobPolling())
.catch((err) => {
this.debug('Error claiming jobs', err);
this.emit('error', err);
this._startJobPolling();
});
}
@@ -228,15 +300,18 @@ export default class Job extends events.EventEmitter {
_getPendingJobs() {
const nowTime = moment().toISOString();
const query = {
_source : {
excludes: [ 'output.content' ]
},
query: {
constant_score: {
filter: {
bool: {
must: { term: { jobtype: this.jobtype } },
filter: { term: { jobtype: this.jobtype } },
should: [
{ term: { status: 'pending'} },
{ bool:
{ must: [
{ filter: [
{ term: { status: 'processing' } },
{ range: { process_expiration: { lte: nowTime } } }
] }
@@ -267,14 +342,11 @@ export default class Job extends events.EventEmitter {
return jobs;
})
.catch((err) => {
// ignore missing indices errors
if (err.status === 404) return [];
this.debug('job querying failed', err);
this.emit('error', err);
this.queue.emit('worker_error', {
id: this.id,
type: this.type,
err
});
throw err;
this.emit(constants.EVENT_WORKER_JOB_SEARCH_ERROR, this._formatErrorParams(err));
});
}
}

View File

@@ -1,11 +1,12 @@
import { uniqueId, times, random } from 'lodash';
import elasticsearch from 'elasticsearch';
import { DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
import constants from '../../lib/constants';
function Client() {
this.indices = {
create: () => Promise.resolve({ acknowledged: true }),
exists: () => Promise.resolve(false),
refresh: () => Promise.resolve(),
};
this.transport = {};
@@ -15,7 +16,7 @@ Client.prototype.index = function (params = {}) {
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_version: 1,
_shards: { total: shardCount, successful: shardCount, failed: 0 },
@@ -32,6 +33,7 @@ Client.prototype.get = function (params = {}, source = {}) {
const _source = Object.assign({
jobtype: 'jobtype',
created_by: false,
payload: {
id: 'sample-job-1',
now: 'Mon Apr 25 2016 14:13:04 GMT-0700 (MST)'
@@ -44,21 +46,21 @@ Client.prototype.get = function (params = {}, source = {}) {
status: 'pending'
}, source);
return {
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || 'AVRPRLnlp7Ur1SZXfT-T',
_version: params.version || 1,
found: true,
_source: _source
};
});
};
Client.prototype.search = function (params = {}, count = 5, source = {}) {
const hits = times(count, () => {
return {
_index: params.index || 'index',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: uniqueId('documentId'),
_version: random(1, 5),
_score: null,
@@ -88,7 +90,7 @@ Client.prototype.update = function (params = {}) {
const shardCount = 2;
return Promise.resolve({
_index: params.index || 'index',
_type: params.type || DEFAULT_SETTING_DOCTYPE,
_type: params.type || constants.DEFAULT_SETTING_DOCTYPE,
_id: params.id || uniqueId('testDoc'),
_version: params.version + 1 || 2,
_shards: { total: shardCount, successful: shardCount, failed: 0 },

17
test/fixtures/job.js vendored Normal file
View File

@@ -0,0 +1,17 @@
import events from 'events';
export default class Job extends events.EventEmitter {
constructor(queue, index, type, payload, options = {}) {
super();
this.queue = queue;
this.index = index;
this.jobType = type;
this.payload = payload;
this.options = options;
}
getProp(name) {
return this[name];
}
}

13
test/fixtures/queue.js vendored Normal file
View File

@@ -0,0 +1,13 @@
import events from 'events';
class MockQueue extends events.EventEmitter {
constructor() {
super();
}
setClient(client) {
this.client = client;
}
}
export default MockQueue;

16
test/fixtures/worker.js vendored Normal file
View File

@@ -0,0 +1,16 @@
import events from 'events';
export default class Worker extends events.EventEmitter {
constructor(queue, type, workerFn, opts = {}) {
super();
this.queue = queue;
this.type = type;
this.workerFn = workerFn;
this.options = opts;
}
getProp(name) {
return this[name];
}
}

View File

@@ -2,7 +2,7 @@ import expect from 'expect.js';
import proxyquire from 'proxyquire';
import elasticsearchMock from '../../fixtures/elasticsearch';
const module = proxyquire.noPreserveCache()('../../../lib/helpers/es_client', {
const module = proxyquire.noPreserveCache()('../../../lib/helpers/create_client', {
elasticsearch: elasticsearchMock
});

View File

@@ -2,9 +2,11 @@ import expect from 'expect.js';
import sinon from 'sinon';
import createIndex from '../../../lib/helpers/create_index';
import elasticsearchMock from '../../fixtures/elasticsearch';
import { defaultSettings } from '../../../lib/helpers/constants';
import { DEFAULT_SETTING_DOCTYPE } from '../../../lib/constants';
describe('Create Index', function () {
describe('Does not exist', function () {
let client;
let createSpy;
@@ -13,6 +15,14 @@ describe('Create Index', function () {
createSpy = sinon.spy(client.indices, 'create');
});
it('should return true', function () {
const indexName = 'test-index';
const result = createIndex(client, indexName);
return result
.then((exists) => expect(exists).to.be(true));
});
it('should create the index', function () {
const indexName = 'test-index';
const result = createIndex(client, indexName);
@@ -26,7 +36,7 @@ describe('Create Index', function () {
it('should create the type mappings', function () {
const indexName = 'test-index';
const docType = defaultSettings.DEFAULT_SETTING_DOCTYPE;
const docType = DEFAULT_SETTING_DOCTYPE;
const result = createIndex(client, indexName);
return result
@@ -55,4 +65,35 @@ describe('Create Index', function () {
expect(payload.body.mappings[docType]).to.have.property('properties');
});
});
});
describe('Does exist', function () {
let client;
let createSpy;
beforeEach(function () {
client = new elasticsearchMock.Client();
sinon.stub(client.indices, 'exists', () => Promise.resolve(true));
createSpy = sinon.spy(client.indices, 'create');
});
it('should return true', function () {
const indexName = 'test-index';
const result = createIndex(client, indexName);
return result
.then((exists) => expect(exists).to.be(true));
});
it('should not create the index', function () {
const indexName = 'test-index';
const result = createIndex(client, indexName);
return result
.then(function () {
sinon.assert.callCount(createSpy, 0);
});
});
});
});

View File

@@ -0,0 +1,51 @@
import expect from 'expect.js';
import { WorkerTimeoutError, UnspecifiedWorkerError } from '../../../lib/helpers/errors';
describe('custom errors', function () {
describe('WorkerTimeoutError', function () {
it('should be function', () => {
expect(WorkerTimeoutError).to.be.a('function');
});
it('should have a name', function () {
const err = new WorkerTimeoutError('timeout error');
expect(err).to.have.property('name', 'WorkerTimeoutError');
});
it('should take a jobId property', function () {
const err = new WorkerTimeoutError('timeout error', { jobId: 'il7hl34rqlo8ro' });
expect(err).to.have.property('jobId', 'il7hl34rqlo8ro');
});
it('should take a timeout property', function () {
const err = new WorkerTimeoutError('timeout error', { timeout: 15000 });
expect(err).to.have.property('timeout', 15000);
});
it('should be stringifyable', function () {
const err = new WorkerTimeoutError('timeout error');
expect(`${err}`).to.equal('WorkerTimeoutError: timeout error');
});
});
describe('UnspecifiedWorkerError', function () {
it('should be function', () => {
expect(UnspecifiedWorkerError).to.be.a('function');
});
it('should have a name', function () {
const err = new UnspecifiedWorkerError('unspecified error');
expect(err).to.have.property('name', 'UnspecifiedWorkerError');
});
it('should take a jobId property', function () {
const err = new UnspecifiedWorkerError('unspecified error', { jobId: 'il7hl34rqlo8ro' });
expect(err).to.have.property('jobId', 'il7hl34rqlo8ro');
});
it('should be stringifyable', function () {
const err = new UnspecifiedWorkerError('unspecified error');
expect(`${err}`).to.equal('UnspecifiedWorkerError: unspecified error');
});
});
});

View File

@@ -1,22 +1,25 @@
import expect from 'expect.js';
import sinon from 'sinon';
import moment from 'moment';
import constants from '../../../lib/constants';
import indexTimestamp from '../../../lib/helpers/index_timestamp';
const anchor = '2016-04-02T01:02:03.456'; // saturday
describe('Index interval', function () {
describe('indexTimestamp construction', function () {
describe('Index timestamp interval', function () {
describe('construction', function () {
it('should throw given an invalid interval', function () {
const init = () => indexTimestamp('bananas');
expect(init).to.throwException(/invalid.+interval/i);
});
});
describe('indexTimestamp timestamps', function () {
describe('timestamps', function () {
let clock;
let separator;
beforeEach(function () {
separator = constants.DEFAULT_SETTING_DATE_SEPARATOR;
clock = sinon.useFakeTimers(moment(anchor).valueOf());
});
@@ -24,34 +27,61 @@ describe('Index interval', function () {
clock.restore();
});
describe('formats', function () {
it('should return the year', function () {
const timestamp = indexTimestamp('year');
expect(timestamp).to.equal('2016');
const str = `2016`;
expect(timestamp).to.equal(str);
});
it('should return the year and month', function () {
const timestamp = indexTimestamp('month');
expect(timestamp).to.equal('2016-04');
const str = `2016${separator}04`;
expect(timestamp).to.equal(str);
});
it('should return the year, month, and first day of the week', function () {
const timestamp = indexTimestamp('week');
expect(timestamp).to.equal('2016-03-27');
const str = `2016${separator}03${separator}27`;
expect(timestamp).to.equal(str);
});
it('should return the year, month, and day of the week', function () {
const timestamp = indexTimestamp('day');
expect(timestamp).to.equal('2016-04-02');
const str = `2016${separator}04${separator}02`;
expect(timestamp).to.equal(str);
});
it('should return the year, month, day and hour', function () {
const timestamp = indexTimestamp('hour');
expect(timestamp).to.equal('2016-04-02-01');
const str = `2016${separator}04${separator}02${separator}01`;
expect(timestamp).to.equal(str);
});
it('should return the year, month, day, hour and minute', function () {
const timestamp = indexTimestamp('minute');
expect(timestamp).to.equal('2016-04-02-01-02');
const str = `2016${separator}04${separator}02${separator}01${separator}02`;
expect(timestamp).to.equal(str);
});
});
describe('date separator', function () {
it('should be customizable', function () {
const separators = ['-', '.', '_'];
separators.forEach(customSep => {
const str = `2016${customSep}04${customSep}02${customSep}01${customSep}02`;
const timestamp = indexTimestamp('minute', customSep);
expect(timestamp).to.equal(str);
});
});
it('should throw if a letter is used', function () {
const separators = ['a', 'B', 'YYYY'];
separators.forEach(customSep => {
const fn = () => indexTimestamp('minute', customSep);
expect(fn).to.throwException();
});
});
});
});
});

View File

@@ -0,0 +1,48 @@
import expect from 'expect.js';
import isPlainObject from '../../../lib/helpers/is_plain_object';
function validateItems(checks, pass) {
checks.forEach(check => {
expect(isPlainObject(check)).to.be(pass);
});
}
describe('isPlainObject', function () {
describe('non-object primitives', function () {
it('return false', function () {
const checks = [
100,
true,
'i am a string',
function noop() {},
null,
];
validateItems(checks, false);
});
});
describe('arrays', function () {
it('return false', function () {
const checks = [
[],
[1,2,3],
['just a string'],
];
validateItems(checks, false);
});
});
describe('', function () {
it('return true', function () {
const checks = [
{},
{one:1},
{object:{with:{array:[]}}},
];
validateItems(checks, true);
});
});
});

37
test/src/helpers/omit.js Normal file
View File

@@ -0,0 +1,37 @@
import expect from 'expect.js';
import objectOmit from '../../../lib/helpers/object_omit';
describe('object omit', function () {
let obj = {};
beforeEach(() => {
obj = {
one: 1,
two: 2,
three: 3,
arr: [1,2,3],
check: 'aw yeah',
};
});
it('omits a single property', function () {
const val = objectOmit(obj, 'one');
expect(val).to.eql({
two: 2,
three: 3,
arr: [1,2,3],
check: 'aw yeah',
});
});
it('omits multiple properties', function () {
const val = objectOmit(obj, ['three', 'check']);
expect(val).to.eql({
one: 1,
two: 2,
arr: [1,2,3],
});
});
});

View File

@@ -1,9 +1,17 @@
import events from 'events';
import expect from 'expect.js';
import sinon from 'sinon';
import proxyquire from 'proxyquire';
import { noop, times } from 'lodash';
import constants from '../../lib/constants';
import elasticsearchMock from '../fixtures/elasticsearch';
import Esqueue from '../../lib/index';
import jobMock from '../fixtures/job';
import workerMock from '../fixtures/worker';
const Esqueue = proxyquire.noPreserveCache()('../../lib/index', {
'./job.js': jobMock,
'./worker.js': workerMock,
});
describe('Esqueue class', function () {
let client;
@@ -48,9 +56,71 @@ describe('Esqueue class', function () {
});
});
describe('Adding jobs', function () {
let indexName;
let jobType;
let payload;
let queue;
beforeEach(function () {
indexName = 'esqueue-index';
jobType = 'test-test';
payload = { payload: true };
queue = new Esqueue(indexName, { client });
});
it('should throw with invalid dateSeparator setting', function () {
queue = new Esqueue(indexName, { client, dateSeparator: 'a' });
const fn = () => queue.addJob(jobType, payload);
expect(fn).to.throwException();
});
it('should pass queue instance, index name, type and payload', function () {
const job = queue.addJob(jobType, payload);
expect(job.getProp('queue')).to.equal(queue);
expect(job.getProp('index')).to.match(new RegExp(indexName));
expect(job.getProp('jobType')).to.equal(jobType);
expect(job.getProp('payload')).to.equal(payload);
});
it('should pass default settings', function () {
const job = queue.addJob(jobType, payload);
const options = job.getProp('options');
expect(options).to.have.property('timeout', constants.DEFAULT_SETTING_TIMEOUT);
expect(options).to.have.property('doctype', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should pass queue index settings', function () {
const indexSettings = {
index: {
number_of_shards: 1
}
};
queue = new Esqueue(indexName, { client, indexSettings });
const job = queue.addJob(jobType, payload);
expect(job.getProp('options')).to.have.property('indexSettings', indexSettings);
});
it('should pass headers from options', function () {
const options = {
headers: {
authorization: 'Basic cXdlcnR5'
}
};
const job = queue.addJob(jobType, payload, options);
expect(job.getProp('options')).to.have.property('headers', options.headers);
});
});
describe('Registering workers', function () {
let queue;
beforeEach(function () {
queue = new Esqueue('esqueue', { client });
});
it('should keep track of workers', function () {
const queue = new Esqueue('esqueue', { client });
expect(queue.getWorkers()).to.eql([]);
expect(queue.getWorkers()).to.have.length(0);
@@ -59,6 +129,27 @@ describe('Esqueue class', function () {
queue.registerWorker('test2', noop);
expect(queue.getWorkers()).to.have.length(3);
});
it('should pass instance of queue, type, and worker function', function () {
const workerType = 'test-worker';
const workerFn = () => true;
const worker = queue.registerWorker(workerType, workerFn);
expect(worker.getProp('queue')).to.equal(queue);
expect(worker.getProp('type')).to.equal(workerType);
expect(worker.getProp('workerFn')).to.equal(workerFn);
});
it('should pass worker options', function () {
const workerOptions = {
size: 12,
doctype: 'tests'
};
queue = new Esqueue('esqueue', { client });
const worker = queue.registerWorker('type', noop, workerOptions);
expect(worker.getProp('options')).to.equal(workerOptions);
});
});
describe('Destroy', function () {

View File

@@ -2,10 +2,11 @@ import events from 'events';
import expect from 'expect.js';
import sinon from 'sinon';
import proxyquire from 'proxyquire';
import QueueMock from '../fixtures/queue';
import elasticsearchMock from '../fixtures/elasticsearch';
import { JOB_STATUS_PENDING, DEFAULT_SETTING_DOCTYPE } from '../../lib/helpers/constants';
import contstants from '../../lib/constants';
const createIndexMock = sinon.stub().returns(Promise.resolve('mock'));
const createIndexMock = sinon.stub();
const module = proxyquire.noPreserveCache()('../../lib/job', {
'./helpers/create_index': createIndexMock
});
@@ -14,8 +15,16 @@ const Job = module;
const maxPriority = 20;
const minPriority = -20;
const defaultPriority = 10;
const defaultCreatedBy = false;
function validateDoc(spy) {
sinon.assert.callCount(spy, 1);
const spyCall = spy.getCall(0);
return spyCall.args[0];
}
describe('Job Class', function () {
let mockQueue;
let client;
let index;
@@ -25,142 +34,317 @@ describe('Job Class', function () {
beforeEach(function () {
createIndexMock.reset();
createIndexMock.returns(Promise.resolve('mock'));
index = 'test';
client = new elasticsearchMock.Client();
mockQueue = new QueueMock();
mockQueue.setClient(client);
});
it('should be an event emitter', function () {
const job = new Job(client, index, 'test', {});
const job = new Job(mockQueue, index, 'test', {});
expect(job).to.be.an(events.EventEmitter);
});
describe('invalid construction', function () {
it('should throw with a missing type', function () {
const init = () => new Job(client, index);
const init = () => new Job(mockQueue, index);
expect(init).to.throwException(/type.+string/i);
});
it('should throw with an invalid type', function () {
const init = () => new Job(client, index, { 'not a string': true });
const init = () => new Job(mockQueue, index, { 'not a string': true });
expect(init).to.throwException(/type.+string/i);
});
it('should throw with an invalid payload', function () {
const init = () => new Job(client, index, 'type1', [1, 2, 3]);
const init = () => new Job(mockQueue, index, 'type1', [1, 2, 3]);
expect(init).to.throwException(/plain.+object/i);
});
});
describe('construction', function () {
function validateDoc(spy) {
sinon.assert.callCount(spy, 1);
const spyCall = spy.getCall(0);
return spyCall.args[0];
}
beforeEach(function () {
type = 'type1';
payload = { id: '123' };
sinon.spy(client, 'index');
});
it('should create the target index', function () {
const job = new Job(mockQueue, index, type, payload, options);
return job.ready.then(() => {
sinon.assert.calledOnce(createIndexMock);
const args = createIndexMock.getCall(0).args;
expect(args[0]).to.equal(client);
expect(args[1]).to.equal(index);
expect(args[2]).to.equal(contstants.DEFAULT_SETTING_DOCTYPE);
});
});
it('should index the payload', function () {
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs).to.have.property('index', index);
expect(indexArgs).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
expect(indexArgs).to.have.property('body');
expect(indexArgs.body).to.have.property('payload', payload);
});
});
it('should index the job type', function () {
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs).to.have.property('index', index);
expect(indexArgs).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
expect(indexArgs).to.have.property('body');
expect(indexArgs.body).to.have.property('jobtype', type);
});
});
it('should set event creation time', function () {
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('created_at');
});
});
it('should refresh the index', function () {
const refreshSpy = sinon.spy(client.indices, 'refresh');
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
sinon.assert.calledOnce(refreshSpy);
const spyCall = refreshSpy.getCall(0);
expect(spyCall.args[0]).to.have.property('index', index);
});
});
it('should emit the job information on success', function (done) {
const job = new Job(mockQueue, index, type, payload);
job.once(contstants.EVENT_JOB_CREATED, (jobDoc) => {
try {
expect(jobDoc).to.have.property('id');
expect(jobDoc).to.have.property('index');
expect(jobDoc).to.have.property('type');
expect(jobDoc).to.have.property('version');
done();
} catch (e) {
done(e);
}
});
});
it('should emit error on index creation failure', function (done) {
const errMsg = 'test index creation failure';
createIndexMock.returns(Promise.reject(new Error(errMsg)));
const job = new Job(mockQueue, index, type, payload);
job.once(contstants.EVENT_JOB_CREATE_ERROR, (err) => {
try {
expect(err.message).to.equal(errMsg);
done();
} catch (e) {
done(e);
}
});
});
it('should emit error on client index failure', function (done) {
const errMsg = 'test document index failure';
client.index.restore();
sinon.stub(client, 'index', () => Promise.reject(new Error(errMsg)));
const job = new Job(mockQueue, index, type, payload);
job.once(contstants.EVENT_JOB_CREATE_ERROR, (err) => {
try {
expect(err.message).to.equal(errMsg);
done();
} catch (e) {
done(e);
}
});
});
});
describe('event emitting', function () {
it('should trigger events on the queue instance', function (done) {
const eventName = 'test event';
const payload1 = {
test: true,
deep: { object: 'ok' }
};
const payload2 = 'two';
const payload3 = new Error('test error');
const job = new Job(mockQueue, index, type, payload, options);
mockQueue.on(eventName, (...args) => {
try {
expect(args[0]).to.equal(payload1);
expect(args[1]).to.equal(payload2);
expect(args[2]).to.equal(payload3);
done();
} catch (e) {
done(e);
}
});
job.emit(eventName, payload1, payload2, payload3);
});
});
describe('default values', function () {
beforeEach(function () {
type = 'type1';
payload = { id: '123' };
sinon.spy(client, 'index');
});
it('should set attempt count to 0', function () {
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('attempts', 0);
});
});
it('should index default created_by value', function () {
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('created_by', defaultCreatedBy);
});
});
it('should set an expired process_expiration time', function () {
const now = new Date().getTime();
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('process_expiration');
expect(indexArgs.body.process_expiration.getTime()).to.be.lessThan(now);
});
});
it('should set status as pending', function () {
const job = new Job(mockQueue, index, type, payload);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('status', contstants.JOB_STATUS_PENDING);
});
});
it('should have a default priority of 10', function () {
const job = new Job(mockQueue, index, type, payload, options);
return job.ready.then(() => {
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('priority', defaultPriority);
});
});
});
describe('option passing', function () {
beforeEach(function () {
type = 'type1';
payload = { id: '123' };
options = {
timeout: 4567,
max_attempts: 9,
headers: {
authorization: 'Basic cXdlcnR5'
}
};
sinon.spy(client, 'index');
});
it('should index the payload', function () {
const job = new Job(client, index, type, payload);
it('should index the created_by value', function () {
const createdBy = 'user_identifier';
const job = new Job(mockQueue, index, type, payload, Object.assign({ created_by: createdBy }, options));
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('payload', payload);
});
});
it('should index the job type', function () {
const job = new Job(client, index, type, payload);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('jobtype', type);
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('created_by', createdBy);
});
});
it('should index timeout value from options', function () {
const job = new Job(client, index, type, payload, options);
const job = new Job(mockQueue, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('timeout', options.timeout);
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('timeout', options.timeout);
});
});
it('should set event times', function () {
const job = new Job(client, index, type, payload, options);
it('should set max attempt count', function () {
const job = new Job(mockQueue, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created_at');
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('max_attempts', options.max_attempts);
});
});
it('should set an expired process_expiration time', function () {
const now = new Date().getTime();
const job = new Job(client, index, type, payload, options);
it('should add headers to the request params', function () {
const job = new Job(mockQueue, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('process_expiration');
expect(newDoc.body.process_expiration.getTime()).to.be.lessThan(now);
});
});
it('should set attempt count', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('attempts', 0);
expect(newDoc.body).to.have.property('max_attempts', options.max_attempts);
});
});
it('should set status as pending', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('status', JOB_STATUS_PENDING);
});
});
it('should create the target index', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
sinon.assert.calledOnce(createIndexMock);
});
});
it('should have a default priority of 10', function () {
const job = new Job(client, index, type, payload, options);
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('priority', defaultPriority);
const indexArgs = validateDoc(client.index);
expect(indexArgs).to.have.property('headers', options.headers);
});
});
it(`should use upper priority of ${maxPriority}`, function () {
const job = new Job(client, index, type, payload, { priority: maxPriority * 2 });
const job = new Job(mockQueue, index, type, payload, { priority: maxPriority * 2 });
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('priority', maxPriority);
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('priority', maxPriority);
});
});
it(`should use lower priority of ${minPriority}`, function () {
const job = new Job(client, index, type, payload, { priority: minPriority * 2 });
const job = new Job(mockQueue, index, type, payload, { priority: minPriority * 2 });
return job.ready.then(() => {
const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('priority', minPriority);
const indexArgs = validateDoc(client.index);
expect(indexArgs.body).to.have.property('priority', minPriority);
});
});
});
describe('custom client', function () {
let newClient;
let job;
beforeEach(function () {
sinon.spy(client, 'index');
newClient = new elasticsearchMock.Client();
sinon.spy(newClient, 'index');
job = new Job(mockQueue, index, type, payload, Object.assign({ client: newClient }, options));
});
it('should create the target index', function () {
return job.ready.then(() => {
sinon.assert.calledOnce(createIndexMock);
const args = createIndexMock.getCall(0).args;
expect(args[0]).to.equal(newClient);
expect(args[1]).to.equal(index);
expect(args[2]).to.equal(contstants.DEFAULT_SETTING_DOCTYPE);
});
});
it('should index the payload', function () {
return job.ready.then(() => {
sinon.assert.callCount(client.index, 0);
sinon.assert.callCount(newClient.index, 1);
const newDoc = newClient.index.getCall(0).args[0];
expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('payload', payload);
});
});
});
@@ -172,7 +356,7 @@ describe('Job Class', function () {
});
it('should return the job document', function () {
const job = new Job(client, index, type, payload);
const job = new Job(mockQueue, index, type, payload);
return job.get()
.then((doc) => {
@@ -181,6 +365,7 @@ describe('Job Class', function () {
expect(doc).to.have.property('type', jobDoc.type);
expect(doc).to.have.property('id', jobDoc.id);
expect(doc).to.have.property('version', jobDoc.version);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('payload');
expect(doc).to.have.property('jobtype');
@@ -188,6 +373,24 @@ describe('Job Class', function () {
expect(doc).to.have.property('timeout');
});
});
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(mockQueue, index, type, payload, optionals);
return Promise.resolve(client.get({}, optionals))
.then((doc) => {
sinon.stub(client, 'get').returns(Promise.resolve(doc));
})
.then(() => {
return job.get()
.then((doc) => {
expect(doc).to.have.property('created_by', optionals.created_by);
});
});
});
});
describe('toJSON method', function () {
@@ -202,21 +405,32 @@ describe('Job Class', function () {
});
it('should return the static information about the job', function () {
const job = new Job(client, index, type, payload, options);
const job = new Job(mockQueue, index, type, payload, options);
// toJSON is sync, should work before doc is written to elasticsearch
expect(job.document).to.be(undefined);
const doc = job.toJSON();
expect(doc).to.have.property('index', index);
expect(doc).to.have.property('type', DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('type', contstants.DEFAULT_SETTING_DOCTYPE);
expect(doc).to.have.property('jobtype', type);
expect(doc).to.have.property('created_by', defaultCreatedBy);
expect(doc).to.have.property('timeout', options.timeout);
expect(doc).to.have.property('max_attempts', options.max_attempts);
expect(doc).to.have.property('priority', options.priority);
expect(doc).to.have.property('id');
expect(doc).to.not.have.property('version');
});
it('should contain optional data', function () {
const optionals = {
created_by: 'some_ident'
};
const job = new Job(mockQueue, index, type, payload, optionals);
const doc = job.toJSON();
expect(doc).to.have.property('created_by', optionals.created_by);
});
});
});

View File

@@ -2,9 +2,10 @@ import expect from 'expect.js';
import sinon from 'sinon';
import moment from 'moment';
import { noop, random, get, find } from 'lodash';
import Worker from '../../lib/worker';
import elasticsearchMock from '../fixtures/elasticsearch';
import constants from '../../lib/helpers/constants';
import QueueMock from '../fixtures/queue';
import Worker from '../../lib/worker';
import constants from '../../lib/constants';
const anchor = '2016-04-02T01:02:03.456'; // saturday
const defaults = {
@@ -23,9 +24,8 @@ describe('Worker class', function () {
beforeEach(function () {
client = new elasticsearchMock.Client();
mockQueue = {
client: client
};
mockQueue = new QueueMock();
mockQueue.setClient(client);
});
describe('invalid construction', function () {
@@ -51,6 +51,20 @@ describe('Worker class', function () {
});
describe('construction', function () {
it('should assign internal properties', function () {
const jobtype = 'testjob';
const workerFn = noop;
const worker = new Worker(mockQueue, jobtype, workerFn);
expect(worker).to.have.property('id');
expect(worker).to.have.property('queue', mockQueue);
expect(worker).to.have.property('client', client);
expect(worker).to.have.property('jobtype', jobtype);
expect(worker).to.have.property('workerFn', workerFn);
expect(worker).to.have.property('checkInterval');
expect(worker).to.have.property('checkSize');
expect(worker).to.have.property('doctype');
});
it('should have a unique ID', function () {
const worker = new Worker(mockQueue, 'test', noop);
expect(worker.id).to.be.a('string');
@@ -60,6 +74,45 @@ describe('Worker class', function () {
expect(worker.id).to.not.equal(worker2.id);
});
it('should use custom client', function () {
const newClient = new elasticsearchMock.Client();
const worker = new Worker(mockQueue, 'test', noop, { client: newClient });
expect(worker).to.have.property('queue', mockQueue);
expect(worker).to.have.property('client', newClient);
expect(worker.client).to.not.equal(client);
});
});
describe('event emitting', function () {
let worker;
beforeEach(function () {
worker = new Worker(mockQueue, 'test', noop);
});
it('should trigger events on the queue instance', function (done) {
const eventName = 'test event';
const payload1 = {
test: true,
deep: { object: 'ok' }
};
const payload2 = 'two';
const payload3 = new Error('test error');
mockQueue.on(eventName, (...args) => {
try {
expect(args[0]).to.equal(payload1);
expect(args[1]).to.equal(payload2);
expect(args[2]).to.equal(payload3);
done();
} catch (e) {
done(e);
}
});
worker.emit(eventName, payload1, payload2, payload3);
});
});
describe('output formatting', function () {
@@ -105,15 +158,9 @@ describe('Worker class', function () {
});
});
describe('searching for jobs', function () {
describe('polling for jobs', function () {
let searchSpy;
function getSearchParams(jobtype, params = {}) {
new Worker(mockQueue, jobtype, noop, params);
clock.tick(defaults.interval);
return searchSpy.firstCall.args[0];
}
beforeEach(() => {
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
@@ -124,7 +171,6 @@ describe('Worker class', function () {
clock.restore();
});
describe('polling interval', function () {
it('should start polling for jobs after interval', function () {
new Worker(mockQueue, 'test', noop);
sinon.assert.notCalled(searchSpy);
@@ -139,22 +185,98 @@ describe('Worker class', function () {
clock.tick(interval);
sinon.assert.calledOnce(searchSpy);
});
it('should not poll once destroyed', function () {
const worker = new Worker(mockQueue, 'test', noop);
// move the clock a couple times, test for searches each time
sinon.assert.notCalled(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledOnce(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
// destroy the worker, move the clock, make sure another search doesn't happen
worker.destroy();
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
// manually call job poller, move the clock, make sure another search doesn't happen
worker._startJobPolling();
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
});
});
describe('query for pending jobs', function () {
let worker;
let searchStub;
function getSearchParams(jobtype = 'test', params = {}) {
worker = new Worker(mockQueue, jobtype, noop, params);
worker._getPendingJobs();
return searchStub.firstCall.args[0];
}
describe('error handling', function () {
beforeEach(() => {
});
it('should pass search errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject());
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => done(new Error('should not resolve')))
.catch(() => { done(); });
});
it('should swollow index missing errors', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then(() => { done(); })
.catch(() => done(new Error('should not reject')));
});
it('should return an empty array on missing index', function (done) {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.reject({
status: 404
}));
worker = new Worker(mockQueue, 'test', noop);
worker._getPendingJobs()
.then((res) => {
try {
expect(res).to.be.an(Array);
expect(res).to.have.length(0);
done();
} catch (e) {
done(e);
}
})
.catch(() => done(new Error('should not reject')));
});
});
describe('query parameters', function () {
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
});
it('should query with version', function () {
const params = getSearchParams('test');
const params = getSearchParams();
expect(params).to.have.property('version', true);
});
it('should query by default doctype', function () {
const params = getSearchParams('test');
const params = getSearchParams();
expect(params).to.have.property('type', constants.DEFAULT_SETTING_DOCTYPE);
});
it('should query by custom doctype', function () {
const doctype = 'custom_test';
const params = getSearchParams('test', { doctype });
const params = getSearchParams('type', { doctype });
expect(params).to.have.property('type', doctype);
});
});
@@ -163,11 +285,27 @@ describe('Worker class', function () {
const conditionPath = 'query.constant_score.filter.bool';
const jobtype = 'test_jobtype';
beforeEach(() => {
searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve());
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
});
afterEach(() => {
clock.restore();
});
it('should filter unwanted source data', function () {
const excludedFields = [ 'output.content' ];
const { body } = getSearchParams(jobtype);
expect(body).to.have.property('_source');
expect(body._source).to.eql({ excludes: excludedFields });
});
it('should search by job type', function () {
const { body } = getSearchParams(jobtype);
const conditions = get(body, conditionPath);
expect(conditions).to.have.property('must');
expect(conditions.must).to.eql({ term: { jobtype: jobtype } });
expect(conditions.filter).to.eql({ term: { jobtype: jobtype } });
});
it('should search for pending or expired jobs', function () {
@@ -178,7 +316,7 @@ describe('Worker class', function () {
// this works because we are stopping the clock, so all times match
const nowTime = moment().toISOString();
const pending = { term: { status: 'pending'} };
const expired = { bool: { must: [
const expired = { bool: { filter: [
{ term: { status: 'processing' } },
{ range: { process_expiration: { lte: nowTime } } }
] } };
@@ -201,7 +339,6 @@ describe('Worker class', function () {
expect(body).to.have.property('size', size);
});
});
});
describe('claiming a job', function () {
@@ -220,10 +357,13 @@ describe('Worker class', function () {
id: 12345,
version: 3
};
job = mockQueue.client.get(params);
return mockQueue.client.get(params)
.then((jobDoc) => {
job = jobDoc;
worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update');
});
});
afterEach(() => {
clock.restore();
@@ -283,10 +423,36 @@ describe('Worker class', function () {
expect(msg).to.equal(false);
});
it('should swallow version mismatch errors', function () {
it('should return true on version errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
return worker._claimJob(job);
return worker._claimJob(job)
.then((res) => expect(res).to.equal(true));
});
it('should return false on other errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
return worker._claimJob(job)
.then((res) => expect(res).to.equal(false));
});
it('should emit on other errors', function (done) {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
worker.on(constants.EVENT_WORKER_JOB_CLAIM_ERROR, function (err) {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
expect(err.error).to.have.property('statusCode', 401);
done();
} catch (e) {
done(e);
}
});
worker._claimJob(job);
});
});
@@ -299,16 +465,18 @@ describe('Worker class', function () {
anchorMoment = moment(anchor);
clock = sinon.useFakeTimers(anchorMoment.valueOf());
job = mockQueue.client.get();
return mockQueue.client.get()
.then((jobDoc) => {
job = jobDoc;
worker = new Worker(mockQueue, 'test', noop);
updateSpy = sinon.spy(mockQueue.client, 'update');
});
});
afterEach(() => {
clock.restore();
});
it('should use version on update', function () {
worker._failJob(job);
const query = updateSpy.firstCall.args[0];
@@ -332,13 +500,21 @@ describe('Worker class', function () {
expect(doc.output).to.have.property('content', msg);
});
it('should swallow version mismatch errors', function () {
it('should return true on version mismatch errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 409 }));
return worker._failJob(job);
return worker._failJob(job)
.then((res) => expect(res).to.equal(true));
});
it('should set completed time and status to failed', function () {
it('should return false on other docuemnt update errors', function () {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
return worker._failJob(job)
.then((res) => expect(res).to.equal(false));
});
it('should set completed time and status to failure', function () {
const startTime = moment().valueOf();
const msg = 'test message';
clock.tick(100);
@@ -351,6 +527,39 @@ describe('Worker class', function () {
const completedTimestamp = moment(doc.completed_at).valueOf();
expect(completedTimestamp).to.be.greaterThan(startTime);
});
it('should emit worker failure event', function (done) {
worker.on(constants.EVENT_WORKER_JOB_FAIL, (err) => {
try {
expect(err).to.have.property('output');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
done();
} catch (e) {
done(e);
}
});
return worker._failJob(job);
});
it('should emit on other docuemnt update errors', function (done) {
mockQueue.client.update.restore();
sinon.stub(mockQueue.client, 'update').returns(Promise.reject({ statusCode: 401 }));
worker.on(constants.EVENT_WORKER_FAIL_UPDATE_ERROR, function (err) {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
expect(err.error).to.have.property('statusCode', 401);
done();
} catch (e) {
done(e);
}
});
worker._failJob(job);
});
});
describe('performing a job', function () {
@@ -362,10 +571,15 @@ describe('Worker class', function () {
payload = {
value: random(0, 100, true)
};
job = mockQueue.client.get({}, { payload });
return mockQueue.client.get({}, { payload })
.then((jobDoc) => {
job = jobDoc;
updateSpy = sinon.spy(mockQueue.client, 'update');
});
});
describe('worker success', function () {
it('should call the workerFn with the payload', function (done) {
const workerFn = function (jobPayload) {
expect(jobPayload).to.eql(payload);
@@ -418,6 +632,33 @@ describe('Worker class', function () {
});
});
it('should emit completion event', function (done) {
const worker = new Worker(mockQueue, 'test', noop);
worker.once(constants.EVENT_WORKER_COMPLETE, (workerJob) => {
try {
expect(workerJob).to.not.have.property('_source');
expect(workerJob).to.have.property('job');
expect(workerJob.job).to.have.property('id');
expect(workerJob.job).to.have.property('index');
expect(workerJob.job).to.have.property('type');
expect(workerJob).to.have.property('output');
expect(workerJob.output).to.have.property('content');
expect(workerJob.output).to.have.property('content_type');
done();
} catch (e) {
done(e);
}
});
worker._performJob(job);
});
});
describe('worker failure', function () {
it('should append error output to job', function () {
const workerFn = function () {
throw new Error('test error');
@@ -431,40 +672,204 @@ describe('Worker class', function () {
sinon.assert.calledWith(failStub, job, 'Error: test error');
});
});
it('should handle async errors', function () {
const workerFn = function () {
return new Promise((resolve, reject) => {
reject(new Error('test error'));
});
};
const worker = new Worker(mockQueue, 'test', workerFn);
const failStub = sinon.stub(worker, '_failJob');
return worker._performJob(job)
.then(() => {
sinon.assert.calledOnce(failStub);
sinon.assert.calledWith(failStub, job, 'Error: test error');
});
});
describe('job timeouts', function () {
let job;
let failStub;
it('should handle rejecting with strings', function () {
const errorMessage = 'this is a string error';
const workerFn = function () {
return new Promise((resolve, reject) => {
reject(errorMessage);
});
};
const worker = new Worker(mockQueue, 'test', workerFn);
const failStub = sinon.stub(worker, '_failJob');
return worker._performJob(job)
.then(() => {
sinon.assert.calledOnce(failStub);
sinon.assert.calledWith(failStub, job, errorMessage);
});
});
it('should handle empty rejection', function (done) {
const workerFn = function () {
return new Promise((resolve, reject) => {
reject();
});
};
const worker = new Worker(mockQueue, 'test', workerFn);
worker.once(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
expect(err.error).to.have.property('name', 'UnspecifiedWorkerError');
done();
} catch (e) {
done(e);
}
});
worker._performJob(job);
});
});
});
describe('job failures', function () {
function getFailStub(worker) {
return sinon.stub(worker, '_failJob').returns(Promise.resolve());
}
describe('timeout', function () {
let worker;
const timeout = 20;
const timeoutPadding = 10;
let failStub;
let job;
beforeEach(function () {
const timeout = 20;
const workerFn = function () {
return new Promise(function (resolve) {
setTimeout(() => {
resolve();
}, timeout + timeoutPadding);
}, timeout * 2);
});
};
worker = new Worker(mockQueue, 'test', workerFn);
failStub = getFailStub(worker);
job = {
_id: 'testJob1',
_id: 'testTimeoutJob',
_source: {
timeout: timeout,
payload: 'test'
}
};
failStub = sinon.stub(worker, '_failJob').returns(Promise.resolve());
});
it('should fail if not complete within allotted time', function () {
it('should not fail job', function () {
// fire of the job worker
return worker._performJob(job)
.then(() => {
sinon.assert.notCalled(failStub);
});
});
it('should emit timeout if not completed in time', function (done) {
worker.once(constants.EVENT_WORKER_JOB_TIMEOUT, (err) => {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
expect(err.error).to.have.property('name', 'WorkerTimeoutError');
done();
} catch (e) {
done(e);
}
});
// fire of the job worker
worker._performJob(job);
});
});
describe('worker failure', function () {
let worker;
let failStub;
const timeout = 20;
const job = {
_id: 'testTimeoutJob',
_source: {
timeout: timeout,
payload: 'test'
}
};
describe('reject', function () {
beforeEach(function () {
const workerFn = function () {
return new Promise(function (resolve, reject) {
setTimeout(() => {
reject();
}, timeout / 2);
});
};
worker = new Worker(mockQueue, 'test', workerFn);
failStub = getFailStub(worker);
});
it('should fail the job', function () {
return worker._performJob(job)
.then(() => {
sinon.assert.calledOnce(failStub);
});
});
it('should emit worker execution error', function (done) {
worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
done();
} catch (e) {
done(e);
}
});
// fire of the job worker
worker._performJob(job);
});
});
describe('throw', function () {
beforeEach(function () {
const workerFn = function () {
throw new Error('test throw');
};
worker = new Worker(mockQueue, 'test', workerFn);
failStub = getFailStub(worker);
});
it('should fail the job', function () {
return worker._performJob(job)
.then(() => {
sinon.assert.calledOnce(failStub);
});
});
it('should emit worker execution error', function (done) {
worker.on(constants.EVENT_WORKER_JOB_EXECUTION_ERROR, (err) => {
try {
expect(err).to.have.property('error');
expect(err).to.have.property('job');
expect(err).to.have.property('worker');
done();
} catch (e) {
done(e);
}
});
// fire of the job worker
worker._performJob(job);
});
});
});
});
});