feat: working data download and indexing
This commit is contained in:
@@ -1,3 +1,43 @@
|
||||
export default function() {
|
||||
// es6 module code goes here
|
||||
import elasticsearch from 'elasticsearch';
|
||||
import logger from './lib/logger.mjs';
|
||||
import { createIndex, bulkInsert } from './lib/data-source.mjs';
|
||||
import getData from './lib/get-data.mjs';
|
||||
|
||||
const positionSourceMap = ['ADS-B', 'ASTERIX', 'MLAT'];
|
||||
|
||||
export default async function(indexName, opts = {}) {
|
||||
const client = new elasticsearch.Client({
|
||||
host: opts.elasticsearch.host,
|
||||
log: opts.elasticsearch.log,
|
||||
});
|
||||
|
||||
// make sure we can connect to the node
|
||||
await client.ping();
|
||||
|
||||
// create the target index
|
||||
const index = await createIndex(client, indexName);
|
||||
|
||||
const records = await getData();
|
||||
logger.debug(`ADS-B records:, ${records.length}`);
|
||||
|
||||
// index all the data
|
||||
const documents = records.map(rec => ({
|
||||
transponder: rec[0],
|
||||
callsign: rec[1],
|
||||
origin_country: rec[2],
|
||||
time_position: rec[3],
|
||||
last_contact: rec[4],
|
||||
location: rec[5] && rec[6] ? `${rec[6]},${rec[5]}` : null,
|
||||
baro_altitude: rec[7],
|
||||
geo_altitude: rec[13],
|
||||
on_ground: rec[8],
|
||||
velocity: rec[9],
|
||||
vertical_rate: rec[11],
|
||||
squawk: rec[14],
|
||||
spi: rec[15],
|
||||
position_source: positionSourceMap[rec[16]],
|
||||
}));
|
||||
|
||||
await bulkInsert(client, index, documents);
|
||||
logger.debug(`Successfully indexed ${records.length} records to ${index}`);
|
||||
}
|
||||
|
||||
80
src/lib/data-source.mjs
Normal file
80
src/lib/data-source.mjs
Normal file
@@ -0,0 +1,80 @@
|
||||
import logger from './logger.mjs';
|
||||
import { BadRequest } from './es-errors.mjs';
|
||||
import zeroPad from './zero-pad.mjs';
|
||||
|
||||
const doctype = 'doc';
|
||||
|
||||
// helper for time-based indices
|
||||
function getIndexName(index) {
|
||||
const d = new Date();
|
||||
const [year, month, day] = [
|
||||
d.getFullYear(),
|
||||
zeroPad(d.getMonth() + 1, 2),
|
||||
zeroPad(d.getDate(), 2),
|
||||
];
|
||||
return `${index}-${year}.${month}.${day}`;
|
||||
}
|
||||
|
||||
export async function createIndex(client, index) {
|
||||
const realIndex = getIndexName(index);
|
||||
return client.indices
|
||||
.create({
|
||||
index: realIndex,
|
||||
body: {
|
||||
settings: {},
|
||||
mappings: {
|
||||
[doctype]: {
|
||||
properties: {
|
||||
transponder: { type: 'keyword' },
|
||||
callsign: { type: 'keyword' },
|
||||
origin_country: { type: 'keyword' },
|
||||
time_position: { type: 'date' },
|
||||
last_contact: { type: 'date' },
|
||||
location: { type: 'geo_point' },
|
||||
baro_altitude: { type: 'float' },
|
||||
geo_altitude: { type: 'float' },
|
||||
on_ground: { type: 'boolean' },
|
||||
velocity: { type: 'float' },
|
||||
vertical_rate: { type: 'float' },
|
||||
squawk: { type: 'keyword' },
|
||||
spi: { type: 'boolean' },
|
||||
position_source: { type: 'keyword' },
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
.then(res => {
|
||||
logger.debug(`Index created: ${res.index}`);
|
||||
return res.index;
|
||||
})
|
||||
.catch(err => {
|
||||
// check for existing index
|
||||
if (err instanceof BadRequest) {
|
||||
logger.debug(`Index exists: ${realIndex}`);
|
||||
return client.indices.get({ index: realIndex }).then(() => realIndex);
|
||||
}
|
||||
|
||||
throw err;
|
||||
});
|
||||
}
|
||||
|
||||
export async function createDocument(client, index, body) {
|
||||
client.index({
|
||||
index,
|
||||
type: doctype,
|
||||
body,
|
||||
});
|
||||
}
|
||||
|
||||
export async function bulkInsert(client, index, docs) {
|
||||
const body = docs.reduce((collection, doc) => {
|
||||
collection.push({ index: { _index: index, _type: doctype } });
|
||||
collection.push(doc);
|
||||
return collection;
|
||||
}, []);
|
||||
|
||||
client.bulk({
|
||||
body,
|
||||
});
|
||||
}
|
||||
3
src/lib/es-errors.mjs
Normal file
3
src/lib/es-errors.mjs
Normal file
@@ -0,0 +1,3 @@
|
||||
import elasticsearch from 'elasticsearch';
|
||||
|
||||
export const { BadRequest, NotFound, Forbidden, Conflict } = elasticsearch.errors;
|
||||
13
src/lib/fetch.mjs
Normal file
13
src/lib/fetch.mjs
Normal file
@@ -0,0 +1,13 @@
|
||||
import axios from 'axios';
|
||||
|
||||
const api = 'https://opensky-network.org/api';
|
||||
|
||||
export default axios.create({
|
||||
baseURL: api,
|
||||
timeout: 3000,
|
||||
responseType: 'json',
|
||||
headers: {
|
||||
'X-Custom-Header': 'foobar',
|
||||
'Content-Type': 'application/json; charset=utf-8',
|
||||
},
|
||||
});
|
||||
6
src/lib/get-data.mjs
Normal file
6
src/lib/get-data.mjs
Normal file
@@ -0,0 +1,6 @@
|
||||
import fetch from './fetch.mjs';
|
||||
|
||||
export default async function getData() {
|
||||
const res = await fetch.get('/states/all');
|
||||
return res.data.states;
|
||||
}
|
||||
35
src/lib/logger.mjs
Normal file
35
src/lib/logger.mjs
Normal file
@@ -0,0 +1,35 @@
|
||||
/* eslint no-console: 0 */
|
||||
import zeroPad from './zero-pad.mjs';
|
||||
|
||||
const getTimestamp = () => {
|
||||
const d = new Date();
|
||||
return `${[d.getFullYear(), zeroPad(d.getMonth() + 1, 2), zeroPad(d.getDate(), 2)].join('/')} ${[
|
||||
zeroPad(d.getHours(), 2),
|
||||
zeroPad(d.getMinutes(), 2),
|
||||
zeroPad(d.getSeconds(), 2),
|
||||
].join(':')}`;
|
||||
};
|
||||
|
||||
const wrapMessage = msg => `[${getTimestamp()}]: ${msg.map(m => JSON.stringify(m)).join(' ')}`;
|
||||
|
||||
const logger = {
|
||||
log(...args) {
|
||||
console.log(wrapMessage(args));
|
||||
},
|
||||
warn(...args) {
|
||||
console.warn(wrapMessage(args));
|
||||
},
|
||||
error(...args) {
|
||||
if (args[0] instanceof Error) {
|
||||
console.error(getTimestamp(), args[0].stack);
|
||||
return;
|
||||
}
|
||||
console.error(wrapMessage(args));
|
||||
},
|
||||
debug(...args) {
|
||||
if (!process.env.DEBUG) return;
|
||||
console.log(wrapMessage(args));
|
||||
},
|
||||
};
|
||||
|
||||
export default logger;
|
||||
9
src/lib/zero-pad.mjs
Normal file
9
src/lib/zero-pad.mjs
Normal file
@@ -0,0 +1,9 @@
|
||||
export default function zeroPad(str, len) {
|
||||
let output = `${str}`;
|
||||
if (!len || len <= output.length) return str;
|
||||
|
||||
while (output.length < len) {
|
||||
output = `0${output}`;
|
||||
}
|
||||
return output;
|
||||
}
|
||||
Reference in New Issue
Block a user