9 Commits

Author SHA1 Message Date
4793027ff3 0.2.2 2016-05-10 17:31:09 -07:00
38532a6296 fix scoping issue, add debugging on worker register 2016-05-10 17:28:27 -07:00
aa5ea72e3b swollow errors saving job output, include error in debugging output 2016-05-10 17:24:05 -07:00
e077442340 add debugging on job timeout 2016-05-10 16:57:40 -07:00
82506a74e8 set process_expiration by default
without this, the job query fails with field='process_expiration' is unrecognized
2016-05-10 16:01:24 -07:00
cae02cb0f8 add description and keywords 2016-05-10 14:14:19 -07:00
11de18f4da 0.2.1 2016-05-10 14:06:37 -07:00
f5bf40cf71 prefix all the debugger namespaces 2016-05-05 11:41:57 -07:00
1f053cbb6b stop bothering me travis 2016-05-05 09:46:59 -07:00
5 changed files with 21 additions and 9 deletions

View File

@@ -2,4 +2,6 @@ language: node_js
node_js:
- "4"
- "4.3"
after_success: npm run coverage
notifications:
email: false
after_success: npm run coverage

View File

@@ -1,7 +1,7 @@
{
"name": "esqueue",
"version": "0.2.0",
"description": "",
"version": "0.2.2",
"description": "Job queue, powered by Elasticsearch",
"main": "lib/index.js",
"scripts": {
"build": "rm -rf lib && babel src --out-dir lib",
@@ -11,6 +11,12 @@
"unit": "nyc --require babel-core/register mocha test/src/**"
},
"author": "Joe Fleming (https://github.com/w33ble)",
"keywords": [
"job",
"queue",
"worker",
"elasticsearch"
],
"repository": {
"type": "git",
"url": "https://github.com/w33ble/esqueue.git"

View File

@@ -6,7 +6,7 @@ import Job from './job.js';
import Worker from './worker.js';
import omit from 'lodash.omit';
const debug = logger('queue');
const debug = logger('esqueue:queue');
export default class Esqueue extends events.EventEmitter {
constructor(index, options = {}) {

View File

@@ -5,7 +5,7 @@ import logger from './helpers/logger';
import { jobStatuses } from './helpers/constants';
import createIndex from './helpers/create_index';
const debug = logger('job');
const debug = logger('esqueue:job');
const puid = new Puid();
export default class Job extends events.EventEmitter {
@@ -36,6 +36,7 @@ export default class Job extends events.EventEmitter {
payload: this.payload,
priority: this.priority,
timeout: this.timeout,
process_expiration: new Date(0), // use epoch so the job query works
created_at: new Date(),
attempts: 0,
max_attempts: this.maxAttempts,

View File

@@ -6,7 +6,7 @@ import { jobStatuses } from './helpers/constants';
import { WorkerTimeoutError } from './helpers/errors';
const puid = new Puid();
const debug = logger('worker');
const debug = logger('esqueue:worker');
export default class Job extends events.EventEmitter {
constructor(queue, type, workerFn, opts = {}) {
@@ -26,6 +26,7 @@ export default class Job extends events.EventEmitter {
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this._checker = false;
this.debug(`Created worker for type ${this.type}`);
this._startJobPolling();
}
@@ -118,7 +119,8 @@ export default class Job extends events.EventEmitter {
const workerOutput = new Promise((resolve, reject) => {
resolve(this.workerFn.call(null, job._source.payload));
setTimeout(function () {
setTimeout(() => {
this.debug(`Timeout processing job ${job._id}`);
reject(new WorkerTimeoutError({
timeout: job._source.timeout,
jobId: job._id,
@@ -148,7 +150,8 @@ export default class Job extends events.EventEmitter {
})
.catch((err) => {
if (err.statusCode === 409) return false;
throw err;
this.debug(`Failure saving job output ${job._id}`, err);
this.emit('job_error', err);
});
}, (jobErr) => {
// job execution failed
@@ -158,7 +161,7 @@ export default class Job extends events.EventEmitter {
return;
}
this.debug(`Failure occurred on job ${job._id}`);
this.debug(`Failure occurred on job ${job._id}`, jobErr);
this.emit('job_error', jobErr);
return this._failJob(job, jobErr.toString());
});