From df9508808b0959d1c0efd5657882e961514140b1 Mon Sep 17 00:00:00 2001 From: Joe Fleming Date: Thu, 12 May 2016 14:08:33 -0700 Subject: [PATCH] use jobtype in worker, update query also simplify the query a bit and use constant_score --- src/worker.js | 40 ++++++++++++++++++++++------------------ 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/worker.js b/src/worker.js index d90587a..b0628b3 100644 --- a/src/worker.js +++ b/src/worker.js @@ -2,7 +2,7 @@ import events from 'events'; import Puid from 'puid'; import moment from 'moment'; import logger from './helpers/logger'; -import { jobStatuses } from './helpers/constants'; +import constants from './helpers/constants'; import { WorkerTimeoutError } from './helpers/errors'; const puid = new Puid(); @@ -18,10 +18,11 @@ export default class Job extends events.EventEmitter { this.id = puid.generate(); this.queue = queue; this.client = this.queue.client; - this.type = type; + this.jobtype = type; this.workerFn = workerFn; this.checkInterval = opts.interval || 1500; this.checkSize = opts.size || 10; + this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE; this.debug = (...msg) => debug(...msg, `id: ${this.id}`); @@ -50,7 +51,7 @@ export default class Job extends events.EventEmitter { attempts: attempts, started_at: startTime, process_expiration: expirationTime, - status: jobStatuses.JOB_STATUS_PROCESSING, + status: constants.JOB_STATUS_PROCESSING, }; return this.client.update({ @@ -76,7 +77,7 @@ export default class Job extends events.EventEmitter { const completedTime = moment().toISOString(); const doc = { - status: jobStatuses.JOB_STATUS_FAILED, + status: constants.JOB_STATUS_FAILED, completed_at: completedTime, }; @@ -136,7 +137,7 @@ export default class Job extends events.EventEmitter { const docOutput = this._formatOutput(output); const doc = { - status: jobStatuses.JOB_STATUS_COMPLETED, + status: constants.JOB_STATUS_COMPLETED, completed_at: completedTime, output: docOutput }; @@ -226,20 +227,23 @@ export default class Job extends events.EventEmitter { _getPendingJobs() { const nowTime = moment().toISOString(); - const dateFilter = { - range: { - process_expiration: { - lte: nowTime - } - } - }; const query = { query: { - bool: { - should: [ - { bool: { must: [{ term: { status: 'pending'} }] }}, - { bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } } - ] + constant_score: { + filter: { + bool: { + must: { term: { jobtype: this.jobtype } }, + should: [ + { term: { status: 'pending'} }, + { bool: + { must: [ + { term: { status: 'processing' } }, + { range: { process_expiration: { lte: nowTime } } } + ] } + } + ] + } + } } }, sort: [ @@ -253,7 +257,7 @@ export default class Job extends events.EventEmitter { return this.client.search({ index: `${this.queue.index}-*`, - type: this.type, + type: this.doctype, version: true, body: query })