diff --git a/src/index.js b/src/index.js index 9a76b2a..461e677 100644 --- a/src/index.js +++ b/src/index.js @@ -1,7 +1,6 @@ import events from 'events'; import createClient from './helpers/es_client'; import indexTimestamp from './helpers/index_timestamp'; -import createIndex from './helpers/create_index'; import Job from './job.js'; import Worker from './worker.js'; import { omit } from 'lodash'; @@ -22,20 +21,23 @@ export default class Elastique extends events.EventEmitter { } _initTasks() { - const timestamp = indexTimestamp(this.settings.interval); + var initTasks = [ this.client.ping({ timeout: 3000 }), - createIndex(this.client, `${this.index}-${timestamp}`), ]; + return Promise.all(initTasks); } add(type, payload, opts = {}) { + const timestamp = indexTimestamp(this.settings.interval); + const index = `${this.index}-${timestamp}`; + const options = Object.assign({ timeout: this.settings.timeout }, opts); - const job = new Job(this, type, payload, options); + const job = new Job(this.client, index, type, payload, options); return job; } diff --git a/src/job.js b/src/job.js index 446f1f1..567b70b 100644 --- a/src/job.js +++ b/src/job.js @@ -3,20 +3,21 @@ import { isPlainObject, omit } from 'lodash'; import { JOB_STATUS_PENDING } from './helpers/constants'; export default class Job extends events.EventEmitter { - constructor(queue, type, payload, timeout = 10000) { + constructor(client, index, type, payload, timeout = 10000) { if (typeof type !== 'string') throw new Error('Type must be a string'); if (!isPlainObject(payload)) throw new Error('Payload must be a plain object'); super(); - this.queue = queue; + this.client = client; + this.index = index; this.type = type; this.payload = payload; this.timeout = timeout; this.status = JOB_STATUS_PENDING; - this.ready = this.queue.client.index({ - index: this.queue.index, + this.ready = this.client.index({ + index: this.index, type: this.type, body: { payload: this.payload, diff --git a/test/src/job.js b/test/src/job.js index c241c1e..a07c297 100644 --- a/test/src/job.js +++ b/test/src/job.js @@ -7,33 +7,32 @@ import * as elasticsearchMock from '../fixtures/elasticsearch'; import { JOB_STATUS_PENDING } from '../../lib/helpers/constants'; describe('Job Class', function () { - let mockQueue; + let client; + let index; beforeEach(function () { - mockQueue = { - index: 'test', - client: new elasticsearchMock.Client(), - }; + index = 'test'; + client = new elasticsearchMock.Client(); }); it('should be an event emitter', function () { - const job = new Job(mockQueue, 'test', {}); + const job = new Job(client, index, 'test', {}); expect(job).to.be.an(events.EventEmitter); }); describe('invalid construction', function () { it('should throw with a missing type', function () { - const init = () => new Job(mockQueue); + const init = () => new Job(client, index); expect(init).to.throwException(/type.+string/i); }); it('should throw with an invalid type', function () { - const init = () => new Job(mockQueue, { 'not a string': true }); + const init = () => new Job(client, index, { 'not a string': true }); expect(init).to.throwException(/type.+string/i); }); it('should throw with an invalid payload', function () { - const init = () => new Job(mockQueue, 'type1', [1, 2, 3]); + const init = () => new Job(client, index, 'type1', [1, 2, 3]); expect(init).to.throwException(/plain.+object/i); }); }); @@ -53,41 +52,41 @@ describe('Job Class', function () { type = 'type1'; payload = { id: '123' }; timeout = 4567; - sinon.spy(mockQueue.client, 'index'); + sinon.spy(client, 'index'); }); it('should index the payload', function () { - new Job(mockQueue, type, payload); - const newDoc = validateDoc(mockQueue.client.index); - expect(newDoc).to.have.property('index', mockQueue.index); + new Job(client, index, type, payload); + const newDoc = validateDoc(client.index); + expect(newDoc).to.have.property('index', index); expect(newDoc).to.have.property('type', type); expect(newDoc).to.have.property('body'); expect(newDoc.body).to.have.property('payload', payload); }); it('should index timeout value from options', function () { - new Job(mockQueue, type, payload, timeout); - const newDoc = validateDoc(mockQueue.client.index); + new Job(client, index, type, payload, timeout); + const newDoc = validateDoc(client.index); expect(newDoc.body).to.have.property('timeout', timeout); }); it('should set event times', function () { - new Job(mockQueue, type, payload, timeout); - const newDoc = validateDoc(mockQueue.client.index); + new Job(client, index, type, payload, timeout); + const newDoc = validateDoc(client.index); expect(newDoc.body).to.have.property('created'); expect(newDoc.body).to.have.property('started'); expect(newDoc.body).to.have.property('completed'); }); it('should set attempt count', function () { - new Job(mockQueue, type, payload, timeout); - const newDoc = validateDoc(mockQueue.client.index); + new Job(client, index, type, payload, timeout); + const newDoc = validateDoc(client.index); expect(newDoc.body).to.have.property('attempts'); }); it('should set status as pending', function () { - new Job(mockQueue, type, payload, timeout); - const newDoc = validateDoc(mockQueue.client.index); + new Job(client, index, type, payload, timeout); + const newDoc = validateDoc(client.index); expect(newDoc.body).to.have.property('status', JOB_STATUS_PENDING); }); });