diff --git a/.node-version b/.node-version index aa3d773..8f0591a 100644 --- a/.node-version +++ b/.node-version @@ -1 +1 @@ -4.x +6.x diff --git a/.npmrc b/.npmrc deleted file mode 100644 index 3f7b4cf..0000000 --- a/.npmrc +++ /dev/null @@ -1 +0,0 @@ -save-prefix='~' diff --git a/CHANGELOG.md b/CHANGELOG.md index 66957bd..8da527d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions. +## v3.0.0 + +- support for node v4 or earlier is no longer tested +- update several dependencies +- fix issue where job poller would not wait for ES response +- when job polling search fails, wait for a 20x interval before searching again + ## v2.0.2 - Fix issue where creating a worker would not use the queue's doctype by default diff --git a/package.json b/package.json index a8148fe..952703e 100644 --- a/package.json +++ b/package.json @@ -29,20 +29,21 @@ "@elastic/eslint-config-kibana": "^0.3.0", "babel-cli": "^6.23.0", "babel-core": "^6.23.1", - "babel-eslint": "^7.1.1", + "babel-eslint": "6.1.2", "babel-plugin-add-module-exports": "^0.2.1", "babel-preset-es2015": "^6.22.0", - "elasticsearch": "^12.0.0", - "eslint": "^3.16.1", - "eslint-plugin-mocha": "^4.8.0", - "eslint-plugin-react": "^6.10.0", + "elasticsearch": "^13.0.1", + "eslint": "3.11.1", + "eslint-plugin-babel": "4.0.0", + "eslint-plugin-mocha": "4.7.0", + "eslint-plugin-react": "^7.0.1", "expect.js": "~0.3.1", "lodash": "^4.17.4", "mocha": "^3.2.0", "nyc": "^10.1.2", "proxyquire": "^1.7.4", "retire": "^1.2.12", - "sinon": "^1.17.3" + "sinon": "^2.3.1" }, "peerDependencies": { "elasticsearch": ">=11.0.1" diff --git a/src/constants/events.js b/src/constants/events.js index 7e0fc85..29a3e71 100644 --- a/src/constants/events.js +++ b/src/constants/events.js @@ -5,6 +5,8 @@ export default { EVENT_JOB_CREATE_ERROR: 'job:creation error', EVENT_WORKER_COMPLETE: 'worker:job complete', EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error', + EVENT_WORKER_JOB_POLLING_READY: 'worker:job poller ready', + EVENT_WORKER_JOB_SEARCH_COMPLETE: 'worker:pending jobs returned', EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error', EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error', EVENT_WORKER_JOB_FAIL: 'worker:job failed', diff --git a/src/worker.js b/src/worker.js index e4cb865..4cee588 100644 --- a/src/worker.js +++ b/src/worker.js @@ -34,14 +34,18 @@ export default class Worker extends events.EventEmitter { this.debug = (...msg) => debug(...msg, `id: ${this.id}`); - this._checker = false; - this._running = true; + this._poller = { + timer: false, + enabled: true, + running: false, + }; + this.debug(`Created worker for job type ${this.jobtype}`); this._startJobPolling(); } destroy() { - this._running = false; + this._poller.enabled = false; this._stopJobPolling(); } @@ -247,24 +251,45 @@ export default class Worker extends events.EventEmitter { } _startJobPolling() { - if (!this._running) { + if (!this._poller.enabled || this._poller.running) { return; } - this._checker = setInterval(() => { + this._poller.timer = setTimeout(() => { this._getPendingJobs() - .then((jobs) => this._claimPendingJobs(jobs)); + .then((jobs) => { + if (!this._poller.running) return; + + const foundJobs = (!jobs || jobs.length === 0); + const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs); + + task.then(() => { + this._poller.running = false; + this._startJobPolling(); + }); + }, () => { + // if the search failed for some reason, back off the polling + // we assume errors came from a busy cluster + // TODO: check what error actually happened + const multiplier = 20; + + setTimeout(() => { + this._poller.running = false; + this._startJobPolling(); + }, this.checkInterval * multiplier); + }); } , this.checkInterval); + + this._poller.running = true; + this.emit(constants.EVENT_WORKER_JOB_POLLING_READY); } _stopJobPolling() { - clearInterval(this._checker); + this._poller.running = false; + clearTimeout(this._poller.timer); } - _claimPendingJobs(jobs) { - if (!jobs || jobs.length === 0) return; - - this._stopJobPolling(); + _claimPendingJobs(jobs = []) { let claimed = false; // claim a single job, stopping after first successful claim @@ -290,10 +315,8 @@ export default class Worker extends events.EventEmitter { this.debug(`Claimed job ${claimedJob._id}`); return this._performJob(claimedJob); }) - .then(() => this._startJobPolling()) .catch((err) => { this.debug('Error claiming jobs', err); - this._startJobPolling(); }); } @@ -338,7 +361,10 @@ export default class Worker extends events.EventEmitter { }) .then((results) => { const jobs = results.hits.hits; + this.debug(`${jobs.length} outstanding jobs returned`); + this.emit(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, jobs); + return jobs; }) .catch((err) => { diff --git a/test/src/worker.js b/test/src/worker.js index 90a5db8..616766a 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -187,24 +187,63 @@ describe('Worker class', function () { }); it('should not poll once destroyed', function () { + // remove the search spy + mockQueue.client.search.restore(); + + // mock the search, return 0 new jobs + const zeroHits = { hits: { hits: [] } }; + const searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve(zeroHits)); + const worker = new Worker(mockQueue, 'test', noop); + function waitForSearch() { + return new Promise((resolve) => { + worker.once(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, () => { + resolve() + }); + }); + } + + function waitForPoller() { + return new Promise((resolve) => { + worker.once(constants.EVENT_WORKER_JOB_POLLING_READY, () => { + resolve() + }); + }); + } + // move the clock a couple times, test for searches each time - sinon.assert.notCalled(searchSpy); - clock.tick(defaults.interval); - sinon.assert.calledOnce(searchSpy); - clock.tick(defaults.interval); - sinon.assert.calledTwice(searchSpy); + sinon.assert.notCalled(searchStub); - // destroy the worker, move the clock, make sure another search doesn't happen - worker.destroy(); + const firstWait = waitForSearch(); clock.tick(defaults.interval); - sinon.assert.calledTwice(searchSpy); - // manually call job poller, move the clock, make sure another search doesn't happen - worker._startJobPolling(); - clock.tick(defaults.interval); - sinon.assert.calledTwice(searchSpy); + return firstWait + .then(() => { + sinon.assert.calledOnce(searchStub); + return waitForPoller(); + }) + .then(() => { + const secondWait = waitForSearch(); + clock.tick(defaults.interval); + return secondWait; + }) + .then(() => { + sinon.assert.calledTwice(searchStub); + return waitForPoller(); + }) + .then(() => { + // destroy the worker, move the clock, make sure another search doesn't happen + worker.destroy(); + + clock.tick(defaults.interval); + sinon.assert.calledTwice(searchStub); + + // manually call job poller, move the clock, make sure another search doesn't happen + worker._startJobPolling(); + clock.tick(defaults.interval); + sinon.assert.calledTwice(searchStub); + }); }); });