From 73c7147c26454c8c1b4585e3137fe0c1847f751d Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 13:47:30 -0700 Subject: [PATCH 1/9] bump node version requirement --- .node-version | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.node-version b/.node-version index aa3d773..8f0591a 100644 --- a/.node-version +++ b/.node-version @@ -1 +1 @@ -4.x +6.x From eb17575a9667ef2917e98aa166b09a1252f6b716 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 14:04:05 -0700 Subject: [PATCH 2/9] update dependencies, remove npmrc --- .npmrc | 1 - package.json | 13 +++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) delete mode 100644 .npmrc diff --git a/.npmrc b/.npmrc deleted file mode 100644 index 3f7b4cf..0000000 --- a/.npmrc +++ /dev/null @@ -1 +0,0 @@ -save-prefix='~' diff --git a/package.json b/package.json index a8148fe..952703e 100644 --- a/package.json +++ b/package.json @@ -29,20 +29,21 @@ "@elastic/eslint-config-kibana": "^0.3.0", "babel-cli": "^6.23.0", "babel-core": "^6.23.1", - "babel-eslint": "^7.1.1", + "babel-eslint": "6.1.2", "babel-plugin-add-module-exports": "^0.2.1", "babel-preset-es2015": "^6.22.0", - "elasticsearch": "^12.0.0", - "eslint": "^3.16.1", - "eslint-plugin-mocha": "^4.8.0", - "eslint-plugin-react": "^6.10.0", + "elasticsearch": "^13.0.1", + "eslint": "3.11.1", + "eslint-plugin-babel": "4.0.0", + "eslint-plugin-mocha": "4.7.0", + "eslint-plugin-react": "^7.0.1", "expect.js": "~0.3.1", "lodash": "^4.17.4", "mocha": "^3.2.0", "nyc": "^10.1.2", "proxyquire": "^1.7.4", "retire": "^1.2.12", - "sinon": "^1.17.3" + "sinon": "^2.3.1" }, "peerDependencies": { "elasticsearch": ">=11.0.1" From 01adab41745a06198b5b82c74d18907800acf3c6 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 18:26:24 -0700 Subject: [PATCH 3/9] move poller state into this._poller --- src/worker.js | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/worker.js b/src/worker.js index e4cb865..d37d4b1 100644 --- a/src/worker.js +++ b/src/worker.js @@ -34,14 +34,17 @@ export default class Worker extends events.EventEmitter { this.debug = (...msg) => debug(...msg, `id: ${this.id}`); - this._checker = false; - this._running = true; + this._poller = { + timer: false, + enabled: true, + }; + this.debug(`Created worker for job type ${this.jobtype}`); this._startJobPolling(); } destroy() { - this._running = false; + this._poller.enabled = false; this._stopJobPolling(); } @@ -247,18 +250,17 @@ export default class Worker extends events.EventEmitter { } _startJobPolling() { - if (!this._running) { + if (!this._poller.enabled) { return; } - this._checker = setInterval(() => { - this._getPendingJobs() - .then((jobs) => this._claimPendingJobs(jobs)); + this._poller.timer = setInterval(() => { + this._getPendingJobs().then((jobs) => this._claimPendingJobs(jobs)); } , this.checkInterval); } _stopJobPolling() { - clearInterval(this._checker); + clearInterval(this._poller.timer); } _claimPendingJobs(jobs) { From 128905cdb4f57cb8947983e789d955d9bba4cd30 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 20:10:38 -0700 Subject: [PATCH 4/9] track poller running state use setTimeout, and wait for the search and job processing to complete before polling for jobs again --- src/worker.js | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/src/worker.js b/src/worker.js index d37d4b1..c314f2a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -37,6 +37,7 @@ export default class Worker extends events.EventEmitter { this._poller = { timer: false, enabled: true, + running: false, }; this.debug(`Created worker for job type ${this.jobtype}`); @@ -250,23 +251,33 @@ export default class Worker extends events.EventEmitter { } _startJobPolling() { - if (!this._poller.enabled) { + if (!this._poller.enabled || this._poller.running) { return; } - this._poller.timer = setInterval(() => { - this._getPendingJobs().then((jobs) => this._claimPendingJobs(jobs)); + this._poller.timer = setTimeout(() => { + this._getPendingJobs().then((jobs) => { + if (!this._poller.running) return; + + const foundJobs = (!jobs || jobs.length === 0); + const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs); + + task.then(() => { + this._poller.running = false; + this._startJobPolling(); + }); + }); } , this.checkInterval); + + this._poller.running = true; } _stopJobPolling() { - clearInterval(this._poller.timer); + this._poller.running = false; + clearTimeout(this._poller.timer); } - _claimPendingJobs(jobs) { - if (!jobs || jobs.length === 0) return; - - this._stopJobPolling(); + _claimPendingJobs(jobs = []) { let claimed = false; // claim a single job, stopping after first successful claim @@ -292,10 +303,8 @@ export default class Worker extends events.EventEmitter { this.debug(`Claimed job ${claimedJob._id}`); return this._performJob(claimedJob); }) - .then(() => this._startJobPolling()) .catch((err) => { this.debug('Error claiming jobs', err); - this._startJobPolling(); }); } From 6bdf7163b697621bf0f49848334568fc8a6027c4 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 20:11:07 -0700 Subject: [PATCH 5/9] add new events one for when the search is completed, and one for when the job poller is ready --- src/constants/events.js | 2 ++ src/worker.js | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/src/constants/events.js b/src/constants/events.js index 7e0fc85..29a3e71 100644 --- a/src/constants/events.js +++ b/src/constants/events.js @@ -5,6 +5,8 @@ export default { EVENT_JOB_CREATE_ERROR: 'job:creation error', EVENT_WORKER_COMPLETE: 'worker:job complete', EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error', + EVENT_WORKER_JOB_POLLING_READY: 'worker:job poller ready', + EVENT_WORKER_JOB_SEARCH_COMPLETE: 'worker:pending jobs returned', EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error', EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error', EVENT_WORKER_JOB_FAIL: 'worker:job failed', diff --git a/src/worker.js b/src/worker.js index c314f2a..7be854a 100644 --- a/src/worker.js +++ b/src/worker.js @@ -270,6 +270,7 @@ export default class Worker extends events.EventEmitter { } , this.checkInterval); this._poller.running = true; + this.emit(constants.EVENT_WORKER_JOB_POLLING_READY); } _stopJobPolling() { @@ -349,7 +350,10 @@ export default class Worker extends events.EventEmitter { }) .then((results) => { const jobs = results.hits.hits; + this.debug(`${jobs.length} outstanding jobs returned`); + this.emit(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, jobs); + return jobs; }) .catch((err) => { From 0ce26f6d9d3adb7be2cd84e75fa5d26288c23661 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 20:11:21 -0700 Subject: [PATCH 6/9] fix destroy test --- test/src/worker.js | 63 +++++++++++++++++++++++++++++++++++++--------- 1 file changed, 51 insertions(+), 12 deletions(-) diff --git a/test/src/worker.js b/test/src/worker.js index 90a5db8..616766a 100644 --- a/test/src/worker.js +++ b/test/src/worker.js @@ -187,24 +187,63 @@ describe('Worker class', function () { }); it('should not poll once destroyed', function () { + // remove the search spy + mockQueue.client.search.restore(); + + // mock the search, return 0 new jobs + const zeroHits = { hits: { hits: [] } }; + const searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve(zeroHits)); + const worker = new Worker(mockQueue, 'test', noop); + function waitForSearch() { + return new Promise((resolve) => { + worker.once(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, () => { + resolve() + }); + }); + } + + function waitForPoller() { + return new Promise((resolve) => { + worker.once(constants.EVENT_WORKER_JOB_POLLING_READY, () => { + resolve() + }); + }); + } + // move the clock a couple times, test for searches each time - sinon.assert.notCalled(searchSpy); - clock.tick(defaults.interval); - sinon.assert.calledOnce(searchSpy); - clock.tick(defaults.interval); - sinon.assert.calledTwice(searchSpy); + sinon.assert.notCalled(searchStub); - // destroy the worker, move the clock, make sure another search doesn't happen - worker.destroy(); + const firstWait = waitForSearch(); clock.tick(defaults.interval); - sinon.assert.calledTwice(searchSpy); - // manually call job poller, move the clock, make sure another search doesn't happen - worker._startJobPolling(); - clock.tick(defaults.interval); - sinon.assert.calledTwice(searchSpy); + return firstWait + .then(() => { + sinon.assert.calledOnce(searchStub); + return waitForPoller(); + }) + .then(() => { + const secondWait = waitForSearch(); + clock.tick(defaults.interval); + return secondWait; + }) + .then(() => { + sinon.assert.calledTwice(searchStub); + return waitForPoller(); + }) + .then(() => { + // destroy the worker, move the clock, make sure another search doesn't happen + worker.destroy(); + + clock.tick(defaults.interval); + sinon.assert.calledTwice(searchStub); + + // manually call job poller, move the clock, make sure another search doesn't happen + worker._startJobPolling(); + clock.tick(defaults.interval); + sinon.assert.calledTwice(searchStub); + }); }); }); From 4813c5bfa37b3b4fec1dbaf1ad39d944c53b6966 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 20:12:14 -0700 Subject: [PATCH 7/9] update the changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 66957bd..3469b5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,12 @@ Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions. +## v3.0.0 + +- support for node v4 or earlier is no longer tested +- update several dependencies +- fix issue where job poller would not wait for ES response + ## v2.0.2 - Fix issue where creating a worker would not use the queue's doctype by default From bb898e26c267eea552a7fb6f56aadf4cfab6880e Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 20:16:05 -0700 Subject: [PATCH 8/9] add 20x delay when search results in an error --- src/worker.js | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/worker.js b/src/worker.js index 7be854a..4cee588 100644 --- a/src/worker.js +++ b/src/worker.js @@ -256,7 +256,8 @@ export default class Worker extends events.EventEmitter { } this._poller.timer = setTimeout(() => { - this._getPendingJobs().then((jobs) => { + this._getPendingJobs() + .then((jobs) => { if (!this._poller.running) return; const foundJobs = (!jobs || jobs.length === 0); @@ -266,6 +267,16 @@ export default class Worker extends events.EventEmitter { this._poller.running = false; this._startJobPolling(); }); + }, () => { + // if the search failed for some reason, back off the polling + // we assume errors came from a busy cluster + // TODO: check what error actually happened + const multiplier = 20; + + setTimeout(() => { + this._poller.running = false; + this._startJobPolling(); + }, this.checkInterval * multiplier); }); } , this.checkInterval); From 6620a5de39a1aa038640bd7b34690cebde9420ed Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Tue, 23 May 2017 20:16:56 -0700 Subject: [PATCH 9/9] update changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3469b5d..8da527d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes wh - support for node v4 or earlier is no longer tested - update several dependencies - fix issue where job poller would not wait for ES response +- when job polling search fails, wait for a 20x interval before searching again ## v2.0.2