change the job scontructor, update tests

This commit is contained in:
2016-04-22 17:03:01 -07:00
parent 3318c775c0
commit 730ec68bfa
3 changed files with 31 additions and 29 deletions

View File

@@ -1,7 +1,6 @@
import events from 'events'; import events from 'events';
import createClient from './helpers/es_client'; import createClient from './helpers/es_client';
import indexTimestamp from './helpers/index_timestamp'; import indexTimestamp from './helpers/index_timestamp';
import createIndex from './helpers/create_index';
import Job from './job.js'; import Job from './job.js';
import Worker from './worker.js'; import Worker from './worker.js';
import { omit } from 'lodash'; import { omit } from 'lodash';
@@ -22,20 +21,23 @@ export default class Elastique extends events.EventEmitter {
} }
_initTasks() { _initTasks() {
const timestamp = indexTimestamp(this.settings.interval);
var initTasks = [ var initTasks = [
this.client.ping({ timeout: 3000 }), this.client.ping({ timeout: 3000 }),
createIndex(this.client, `${this.index}-${timestamp}`),
]; ];
return Promise.all(initTasks); return Promise.all(initTasks);
} }
add(type, payload, opts = {}) { add(type, payload, opts = {}) {
const timestamp = indexTimestamp(this.settings.interval);
const index = `${this.index}-${timestamp}`;
const options = Object.assign({ const options = Object.assign({
timeout: this.settings.timeout timeout: this.settings.timeout
}, opts); }, opts);
const job = new Job(this, type, payload, options); const job = new Job(this.client, index, type, payload, options);
return job; return job;
} }

View File

@@ -3,20 +3,21 @@ import { isPlainObject, omit } from 'lodash';
import { JOB_STATUS_PENDING } from './helpers/constants'; import { JOB_STATUS_PENDING } from './helpers/constants';
export default class Job extends events.EventEmitter { export default class Job extends events.EventEmitter {
constructor(queue, type, payload, timeout = 10000) { constructor(client, index, type, payload, timeout = 10000) {
if (typeof type !== 'string') throw new Error('Type must be a string'); if (typeof type !== 'string') throw new Error('Type must be a string');
if (!isPlainObject(payload)) throw new Error('Payload must be a plain object'); if (!isPlainObject(payload)) throw new Error('Payload must be a plain object');
super(); super();
this.queue = queue; this.client = client;
this.index = index;
this.type = type; this.type = type;
this.payload = payload; this.payload = payload;
this.timeout = timeout; this.timeout = timeout;
this.status = JOB_STATUS_PENDING; this.status = JOB_STATUS_PENDING;
this.ready = this.queue.client.index({ this.ready = this.client.index({
index: this.queue.index, index: this.index,
type: this.type, type: this.type,
body: { body: {
payload: this.payload, payload: this.payload,

View File

@@ -7,33 +7,32 @@ import * as elasticsearchMock from '../fixtures/elasticsearch';
import { JOB_STATUS_PENDING } from '../../lib/helpers/constants'; import { JOB_STATUS_PENDING } from '../../lib/helpers/constants';
describe('Job Class', function () { describe('Job Class', function () {
let mockQueue; let client;
let index;
beforeEach(function () { beforeEach(function () {
mockQueue = { index = 'test';
index: 'test', client = new elasticsearchMock.Client();
client: new elasticsearchMock.Client(),
};
}); });
it('should be an event emitter', function () { it('should be an event emitter', function () {
const job = new Job(mockQueue, 'test', {}); const job = new Job(client, index, 'test', {});
expect(job).to.be.an(events.EventEmitter); expect(job).to.be.an(events.EventEmitter);
}); });
describe('invalid construction', function () { describe('invalid construction', function () {
it('should throw with a missing type', function () { it('should throw with a missing type', function () {
const init = () => new Job(mockQueue); const init = () => new Job(client, index);
expect(init).to.throwException(/type.+string/i); expect(init).to.throwException(/type.+string/i);
}); });
it('should throw with an invalid type', function () { it('should throw with an invalid type', function () {
const init = () => new Job(mockQueue, { 'not a string': true }); const init = () => new Job(client, index, { 'not a string': true });
expect(init).to.throwException(/type.+string/i); expect(init).to.throwException(/type.+string/i);
}); });
it('should throw with an invalid payload', function () { it('should throw with an invalid payload', function () {
const init = () => new Job(mockQueue, 'type1', [1, 2, 3]); const init = () => new Job(client, index, 'type1', [1, 2, 3]);
expect(init).to.throwException(/plain.+object/i); expect(init).to.throwException(/plain.+object/i);
}); });
}); });
@@ -53,41 +52,41 @@ describe('Job Class', function () {
type = 'type1'; type = 'type1';
payload = { id: '123' }; payload = { id: '123' };
timeout = 4567; timeout = 4567;
sinon.spy(mockQueue.client, 'index'); sinon.spy(client, 'index');
}); });
it('should index the payload', function () { it('should index the payload', function () {
new Job(mockQueue, type, payload); new Job(client, index, type, payload);
const newDoc = validateDoc(mockQueue.client.index); const newDoc = validateDoc(client.index);
expect(newDoc).to.have.property('index', mockQueue.index); expect(newDoc).to.have.property('index', index);
expect(newDoc).to.have.property('type', type); expect(newDoc).to.have.property('type', type);
expect(newDoc).to.have.property('body'); expect(newDoc).to.have.property('body');
expect(newDoc.body).to.have.property('payload', payload); expect(newDoc.body).to.have.property('payload', payload);
}); });
it('should index timeout value from options', function () { it('should index timeout value from options', function () {
new Job(mockQueue, type, payload, timeout); new Job(client, index, type, payload, timeout);
const newDoc = validateDoc(mockQueue.client.index); const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('timeout', timeout); expect(newDoc.body).to.have.property('timeout', timeout);
}); });
it('should set event times', function () { it('should set event times', function () {
new Job(mockQueue, type, payload, timeout); new Job(client, index, type, payload, timeout);
const newDoc = validateDoc(mockQueue.client.index); const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('created'); expect(newDoc.body).to.have.property('created');
expect(newDoc.body).to.have.property('started'); expect(newDoc.body).to.have.property('started');
expect(newDoc.body).to.have.property('completed'); expect(newDoc.body).to.have.property('completed');
}); });
it('should set attempt count', function () { it('should set attempt count', function () {
new Job(mockQueue, type, payload, timeout); new Job(client, index, type, payload, timeout);
const newDoc = validateDoc(mockQueue.client.index); const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('attempts'); expect(newDoc.body).to.have.property('attempts');
}); });
it('should set status as pending', function () { it('should set status as pending', function () {
new Job(mockQueue, type, payload, timeout); new Job(client, index, type, payload, timeout);
const newDoc = validateDoc(mockQueue.client.index); const newDoc = validateDoc(client.index);
expect(newDoc.body).to.have.property('status', JOB_STATUS_PENDING); expect(newDoc.body).to.have.property('status', JOB_STATUS_PENDING);
}); });
}); });