Merge pull request #1 from w33ble/develop

Fix issue with slow responses from ES
This commit is contained in:
Joe Fleming
2017-08-08 11:02:43 -07:00
committed by GitHub
7 changed files with 107 additions and 33 deletions

View File

@@ -1 +1 @@
4.x
6.x

1
.npmrc
View File

@@ -1 +0,0 @@
save-prefix='~'

View File

@@ -2,6 +2,13 @@
Notable changes to the esqueue project. Pay attention to `[BREAKING]` changes when upgrading in pre-1.0 versions. As of 1.0, breaking changes will only happen in major versions.
## v3.0.0
- support for node v4 or earlier is no longer tested
- update several dependencies
- fix issue where job poller would not wait for ES response
- when job polling search fails, wait for a 20x interval before searching again
## v2.0.2
- Fix issue where creating a worker would not use the queue's doctype by default

View File

@@ -29,20 +29,21 @@
"@elastic/eslint-config-kibana": "^0.3.0",
"babel-cli": "^6.23.0",
"babel-core": "^6.23.1",
"babel-eslint": "^7.1.1",
"babel-eslint": "6.1.2",
"babel-plugin-add-module-exports": "^0.2.1",
"babel-preset-es2015": "^6.22.0",
"elasticsearch": "^12.0.0",
"eslint": "^3.16.1",
"eslint-plugin-mocha": "^4.8.0",
"eslint-plugin-react": "^6.10.0",
"elasticsearch": "^13.0.1",
"eslint": "3.11.1",
"eslint-plugin-babel": "4.0.0",
"eslint-plugin-mocha": "4.7.0",
"eslint-plugin-react": "^7.0.1",
"expect.js": "~0.3.1",
"lodash": "^4.17.4",
"mocha": "^3.2.0",
"nyc": "^10.1.2",
"proxyquire": "^1.7.4",
"retire": "^1.2.12",
"sinon": "^1.17.3"
"sinon": "^2.3.1"
},
"peerDependencies": {
"elasticsearch": ">=11.0.1"

View File

@@ -5,6 +5,8 @@ export default {
EVENT_JOB_CREATE_ERROR: 'job:creation error',
EVENT_WORKER_COMPLETE: 'worker:job complete',
EVENT_WORKER_JOB_CLAIM_ERROR: 'worker:claim job error',
EVENT_WORKER_JOB_POLLING_READY: 'worker:job poller ready',
EVENT_WORKER_JOB_SEARCH_COMPLETE: 'worker:pending jobs returned',
EVENT_WORKER_JOB_SEARCH_ERROR: 'worker:pending jobs error',
EVENT_WORKER_JOB_UPDATE_ERROR: 'worker:update job error',
EVENT_WORKER_JOB_FAIL: 'worker:job failed',

View File

@@ -34,14 +34,18 @@ export default class Worker extends events.EventEmitter {
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
this._checker = false;
this._running = true;
this._poller = {
timer: false,
enabled: true,
running: false,
};
this.debug(`Created worker for job type ${this.jobtype}`);
this._startJobPolling();
}
destroy() {
this._running = false;
this._poller.enabled = false;
this._stopJobPolling();
}
@@ -247,24 +251,45 @@ export default class Worker extends events.EventEmitter {
}
_startJobPolling() {
if (!this._running) {
if (!this._poller.enabled || this._poller.running) {
return;
}
this._checker = setInterval(() => {
this._poller.timer = setTimeout(() => {
this._getPendingJobs()
.then((jobs) => this._claimPendingJobs(jobs));
.then((jobs) => {
if (!this._poller.running) return;
const foundJobs = (!jobs || jobs.length === 0);
const task = foundJobs ? Promise.resolve() : this._claimPendingJobs(jobs);
task.then(() => {
this._poller.running = false;
this._startJobPolling();
});
}, () => {
// if the search failed for some reason, back off the polling
// we assume errors came from a busy cluster
// TODO: check what error actually happened
const multiplier = 20;
setTimeout(() => {
this._poller.running = false;
this._startJobPolling();
}, this.checkInterval * multiplier);
});
} , this.checkInterval);
this._poller.running = true;
this.emit(constants.EVENT_WORKER_JOB_POLLING_READY);
}
_stopJobPolling() {
clearInterval(this._checker);
this._poller.running = false;
clearTimeout(this._poller.timer);
}
_claimPendingJobs(jobs) {
if (!jobs || jobs.length === 0) return;
this._stopJobPolling();
_claimPendingJobs(jobs = []) {
let claimed = false;
// claim a single job, stopping after first successful claim
@@ -290,10 +315,8 @@ export default class Worker extends events.EventEmitter {
this.debug(`Claimed job ${claimedJob._id}`);
return this._performJob(claimedJob);
})
.then(() => this._startJobPolling())
.catch((err) => {
this.debug('Error claiming jobs', err);
this._startJobPolling();
});
}
@@ -338,7 +361,10 @@ export default class Worker extends events.EventEmitter {
})
.then((results) => {
const jobs = results.hits.hits;
this.debug(`${jobs.length} outstanding jobs returned`);
this.emit(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, jobs);
return jobs;
})
.catch((err) => {

View File

@@ -187,24 +187,63 @@ describe('Worker class', function () {
});
it('should not poll once destroyed', function () {
// remove the search spy
mockQueue.client.search.restore();
// mock the search, return 0 new jobs
const zeroHits = { hits: { hits: [] } };
const searchStub = sinon.stub(mockQueue.client, 'search', () => Promise.resolve(zeroHits));
const worker = new Worker(mockQueue, 'test', noop);
// move the clock a couple times, test for searches each time
sinon.assert.notCalled(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledOnce(searchSpy);
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
function waitForSearch() {
return new Promise((resolve) => {
worker.once(constants.EVENT_WORKER_JOB_SEARCH_COMPLETE, () => {
resolve()
});
});
}
function waitForPoller() {
return new Promise((resolve) => {
worker.once(constants.EVENT_WORKER_JOB_POLLING_READY, () => {
resolve()
});
});
}
// move the clock a couple times, test for searches each time
sinon.assert.notCalled(searchStub);
const firstWait = waitForSearch();
clock.tick(defaults.interval);
return firstWait
.then(() => {
sinon.assert.calledOnce(searchStub);
return waitForPoller();
})
.then(() => {
const secondWait = waitForSearch();
clock.tick(defaults.interval);
return secondWait;
})
.then(() => {
sinon.assert.calledTwice(searchStub);
return waitForPoller();
})
.then(() => {
// destroy the worker, move the clock, make sure another search doesn't happen
worker.destroy();
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
sinon.assert.calledTwice(searchStub);
// manually call job poller, move the clock, make sure another search doesn't happen
worker._startJobPolling();
clock.tick(defaults.interval);
sinon.assert.calledTwice(searchSpy);
sinon.assert.calledTwice(searchStub);
});
});
});