178 lines
4.7 KiB
JavaScript
178 lines
4.7 KiB
JavaScript
require('dotenv').config();
|
|
const fs = require('fs');
|
|
const debug=require('debug')('debug');
|
|
|
|
const db = require('./src/db');
|
|
|
|
const tUO = require('./src/trip-update-obj');
|
|
|
|
const gtfsRealtimeBindings = require('gtfs-realtime-bindings');
|
|
|
|
const file=process.env.file||'file';
|
|
debug('file: '+file)
|
|
|
|
debug('start...');
|
|
|
|
//initialize array of trip update objects
|
|
const rryOfTripUpdateObjcs = new Array();
|
|
|
|
const buffer=fs.readFileSync(file);
|
|
if(buffer instanceof Buffer){
|
|
|
|
//TODO clean up debug('buffer is instance of Buffer');
|
|
const feed = gtfsRealtimeBindings.transit_realtime.FeedMessage.decode(buffer);
|
|
|
|
//TODO clean up
|
|
let countTrip = 0;
|
|
let countTripId = 0;
|
|
let countRouteId = 0;
|
|
let countTimestamp = 0;
|
|
let countDelay = 0;
|
|
|
|
feed.entity.forEach(entity => {
|
|
const entityTripUpdate = entity.tripUpdate;
|
|
if (entityTripUpdate !== null && entityTripUpdate !== undefined) {
|
|
|
|
const tuo = new tUO.TripUpdate();
|
|
const trip = entityTripUpdate.trip;
|
|
|
|
if (trip !== null && trip !== undefined) {
|
|
|
|
countTrip++
|
|
const t = new tUO.Trip();
|
|
const tripId = trip.tripId;
|
|
if (tripId !== null && tripId !== undefined) {
|
|
countTripId++;
|
|
t.tripId = tripId;
|
|
}
|
|
|
|
const routeId = trip.routeId;
|
|
if (routeId !== null && routeId !== undefined) {
|
|
countRouteId++;
|
|
t.routeId = routeId;
|
|
//TODO Why is the suffic '_X' concatenated to the id?
|
|
if (typeof routeId === 'string') {
|
|
const routeIdLngth = routeId.length;
|
|
if (routeIdLngth > 5) {
|
|
const indexOf = routeId.indexOf('_');
|
|
if (indexOf === 5) {
|
|
t.routeId = routeId.substring(0, 5);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
//create a copy of an object, you can use the spread operator
|
|
tuo.trip = {...t};
|
|
}
|
|
|
|
const timestamp = entityTripUpdate.timestamp;
|
|
if (timestamp !== null && timestamp !== undefined) {
|
|
countTimestamp++;
|
|
tuo.timestamp = timestamp;
|
|
}
|
|
|
|
const delay = entityTripUpdate.delay;
|
|
if (delay !== null && delay !== undefined) {
|
|
countDelay++;
|
|
tuo.delay = delay;
|
|
}
|
|
rryOfTripUpdateObjcs.push({...tuo});
|
|
}
|
|
});
|
|
|
|
//TODO clean up
|
|
debug('index: rryOfTripUpdateObjcs.length: ' + rryOfTripUpdateObjcs.length);
|
|
/**
|
|
debug('index: countTrip: ' + countTrip);
|
|
debug('index: countTripId: ' + countTripId);
|
|
debug('index: countRouteId: ' + countRouteId);
|
|
debug('index: countTimeStamp: ' + countTimestamp);
|
|
debug('index: countDelay: ' + countDelay);
|
|
*/
|
|
}else{
|
|
console.error('ERROR: buffer is NOT instance of Buffer');
|
|
}
|
|
|
|
|
|
async function run() {
|
|
debug('index:run(): started...');
|
|
//select trip updates of today from db as array of objects
|
|
|
|
const schema = process.env.DB_SCHEMA || 'schema';
|
|
|
|
|
|
//TODO some GTFSR feeds do not provide timestamp const query = 'SELECT * FROM ' + schema + '.trip_updates WHERE timestamp_gtfsr >= current_date';
|
|
|
|
//NOTE: https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
|
|
|
|
const query = 'SELECT * FROM ' + schema + '.trip_updates WHERE timestamp_pgsql >= current_date';
|
|
//TODO clean up debug('query: '+query);
|
|
|
|
const rsp = await db.query(query);
|
|
debug('rsp.length: '+rsp.length);
|
|
|
|
if (rsp !== null && rsp !== undefined && rsp.length < 0) {
|
|
//TODO clean up
|
|
debug('rsp[0]: ' + JSON.stringify(rsp[0]));
|
|
}
|
|
|
|
//TODO only continue if rsp is neither null or undefined
|
|
|
|
//transform array of objects into map [trip_trip_id, trip update object]
|
|
//TODO handle rsp NOT available
|
|
const mapDbTripUpdates = new Map();
|
|
rsp.forEach(element => {
|
|
mapDbTripUpdates.set(element.trip_trip_id, element);
|
|
});
|
|
|
|
debug('mapDbTripUpdates.size: ' + mapDbTripUpdates.size);
|
|
|
|
//compare map from db with array of trip update objcs from GTFS Realtime feed
|
|
//TODO clean up
|
|
let countRryElem = 0;
|
|
let countInsert = 0;
|
|
//TODO https://stackoverflow.com/a/37576787/15078958
|
|
//TODO according to this link forEach() has challenges for async/await
|
|
for (const element of rryOfTripUpdateObjcs) {
|
|
const value = mapDbTripUpdates.get(element.trip.tripId);
|
|
let query = '';
|
|
let rspPsql = null;
|
|
|
|
if (value === undefined) {
|
|
query = `INSERT INTO `
|
|
+ schema
|
|
+ `.trip_updates (trip_trip_id, trip_route_id, timestamp_gtfsr, delay) VALUES ('`
|
|
+ element.trip.tripId
|
|
+ `', '`
|
|
+ element.trip.routeId
|
|
+ `', to_timestamp(`
|
|
+ element.timestamp
|
|
+ `), '`
|
|
+ element.delay
|
|
+ `');`;
|
|
rspPsql = await db.query(query);
|
|
countInsert++;
|
|
|
|
if (countRryElem === 0) {
|
|
debug('value: ' + JSON.stringify(value));
|
|
debug('query: ' + query);
|
|
debug('respDb: ' + JSON.stringify(rspPsql));
|
|
}
|
|
countRryElem++;
|
|
}
|
|
}
|
|
|
|
debug('countInsert: ' + countInsert);
|
|
|
|
//other
|
|
debug('index:run(): done.');
|
|
};
|
|
|
|
run().catch(err => {
|
|
console.error('ERROR: ');
|
|
console.log(err)
|
|
});
|
|
|
|
debug('index: done.');
|