use jobtype in worker, update query
also simplify the query a bit and use constant_score
This commit is contained in:
@@ -2,7 +2,7 @@ import events from 'events';
|
|||||||
import Puid from 'puid';
|
import Puid from 'puid';
|
||||||
import moment from 'moment';
|
import moment from 'moment';
|
||||||
import logger from './helpers/logger';
|
import logger from './helpers/logger';
|
||||||
import { jobStatuses } from './helpers/constants';
|
import constants from './helpers/constants';
|
||||||
import { WorkerTimeoutError } from './helpers/errors';
|
import { WorkerTimeoutError } from './helpers/errors';
|
||||||
|
|
||||||
const puid = new Puid();
|
const puid = new Puid();
|
||||||
@@ -18,10 +18,11 @@ export default class Job extends events.EventEmitter {
|
|||||||
this.id = puid.generate();
|
this.id = puid.generate();
|
||||||
this.queue = queue;
|
this.queue = queue;
|
||||||
this.client = this.queue.client;
|
this.client = this.queue.client;
|
||||||
this.type = type;
|
this.jobtype = type;
|
||||||
this.workerFn = workerFn;
|
this.workerFn = workerFn;
|
||||||
this.checkInterval = opts.interval || 1500;
|
this.checkInterval = opts.interval || 1500;
|
||||||
this.checkSize = opts.size || 10;
|
this.checkSize = opts.size || 10;
|
||||||
|
this.doctype = opts.doctype || constants.DEFAULT_SETTING_DOCTYPE;
|
||||||
|
|
||||||
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
this.debug = (...msg) => debug(...msg, `id: ${this.id}`);
|
||||||
|
|
||||||
@@ -50,7 +51,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
attempts: attempts,
|
attempts: attempts,
|
||||||
started_at: startTime,
|
started_at: startTime,
|
||||||
process_expiration: expirationTime,
|
process_expiration: expirationTime,
|
||||||
status: jobStatuses.JOB_STATUS_PROCESSING,
|
status: constants.JOB_STATUS_PROCESSING,
|
||||||
};
|
};
|
||||||
|
|
||||||
return this.client.update({
|
return this.client.update({
|
||||||
@@ -76,7 +77,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
|
|
||||||
const completedTime = moment().toISOString();
|
const completedTime = moment().toISOString();
|
||||||
const doc = {
|
const doc = {
|
||||||
status: jobStatuses.JOB_STATUS_FAILED,
|
status: constants.JOB_STATUS_FAILED,
|
||||||
completed_at: completedTime,
|
completed_at: completedTime,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -136,7 +137,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
const docOutput = this._formatOutput(output);
|
const docOutput = this._formatOutput(output);
|
||||||
|
|
||||||
const doc = {
|
const doc = {
|
||||||
status: jobStatuses.JOB_STATUS_COMPLETED,
|
status: constants.JOB_STATUS_COMPLETED,
|
||||||
completed_at: completedTime,
|
completed_at: completedTime,
|
||||||
output: docOutput
|
output: docOutput
|
||||||
};
|
};
|
||||||
@@ -226,20 +227,23 @@ export default class Job extends events.EventEmitter {
|
|||||||
|
|
||||||
_getPendingJobs() {
|
_getPendingJobs() {
|
||||||
const nowTime = moment().toISOString();
|
const nowTime = moment().toISOString();
|
||||||
const dateFilter = {
|
|
||||||
range: {
|
|
||||||
process_expiration: {
|
|
||||||
lte: nowTime
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
const query = {
|
const query = {
|
||||||
query: {
|
query: {
|
||||||
bool: {
|
constant_score: {
|
||||||
should: [
|
filter: {
|
||||||
{ bool: { must: [{ term: { status: 'pending'} }] }},
|
bool: {
|
||||||
{ bool: { must: [{ term: { status: 'processing'}} ], filter: dateFilter } }
|
must: { term: { jobtype: this.jobtype } },
|
||||||
]
|
should: [
|
||||||
|
{ term: { status: 'pending'} },
|
||||||
|
{ bool:
|
||||||
|
{ must: [
|
||||||
|
{ term: { status: 'processing' } },
|
||||||
|
{ range: { process_expiration: { lte: nowTime } } }
|
||||||
|
] }
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
sort: [
|
sort: [
|
||||||
@@ -253,7 +257,7 @@ export default class Job extends events.EventEmitter {
|
|||||||
|
|
||||||
return this.client.search({
|
return this.client.search({
|
||||||
index: `${this.queue.index}-*`,
|
index: `${this.queue.index}-*`,
|
||||||
type: this.type,
|
type: this.doctype,
|
||||||
version: true,
|
version: true,
|
||||||
body: query
|
body: query
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user