diff --git a/src/index.js b/src/index.js index 1b04193..683e2be 100644 --- a/src/index.js +++ b/src/index.js @@ -19,6 +19,7 @@ export default class Elastique extends events.EventEmitter { timeout: 10000, }, omit(options, [ 'client' ])); this.client = createClient(options.client || {}); + this.workers = []; this._initTasks().catch((err) => this.emit('error', err)); } @@ -42,12 +43,16 @@ export default class Elastique extends events.EventEmitter { timeout: this.settings.timeout }, opts); - const job = new Job(this.client, index, type, payload, options); - return job; + return new Job(this.client, index, type, payload, options); } - registerWorker(type, workerFn) { - const worker = new Worker(this, type, workerFn); + registerWorker(type, workerFn, opts) { + const worker = new Worker(this, type, workerFn, opts); + this.workers.push(worker); return worker; } + + destroy() { + this.workers.forEach((worker) => worker.destroy()); + } } diff --git a/test/src/index.js b/test/src/index.js index 7f65e93..093f78e 100644 --- a/test/src/index.js +++ b/test/src/index.js @@ -1,6 +1,7 @@ import events from 'events'; import expect from 'expect.js'; import sinon from 'sinon'; +import { noop } from 'lodash'; import elasticsearchMock from '../fixtures/elasticsearch'; import Elastique from '../../lib/index'; @@ -47,4 +48,17 @@ describe('Elastique class', function () { }); }); + describe('Registering workers', function () { + it('should keep track of workers', function () { + const queue = new Elastique('elastique', { client }); + expect(queue.workers).to.eql([]); + expect(queue.workers).to.have.length(0); + + queue.registerWorker('test', noop); + queue.registerWorker('test', noop); + queue.registerWorker('test2', noop); + expect(queue.workers).to.have.length(3); + }); + }); + }); \ No newline at end of file