// gtfsr2pgsql/index.js
//
// 178 lines
// 4.7 KiB
// JavaScript

require('dotenv').config();
const fs = require('fs');
const debug=require('debug')('debug');
const db = require('./src/db');
const tUO = require('./src/trip-update-obj');
const gtfsRealtimeBindings = require('gtfs-realtime-bindings');
// Path of the file to import: taken from the `file` environment variable,
// falling back to the literal name 'file'.
const file = process.env.file || 'file';
debug('file: ' + file);
debug('start...');

/**
 * Tells whether a decoded field carries a value.
 *
 * @param {*} value - any value
 * @returns {boolean} false for null and undefined, true otherwise
 */
function isSet(value) {
    return value !== null && value !== undefined;
}

/**
 * Cuts a route id down to its first five characters when the first '_' of
 * the id sits at index 5 (e.g. '12345_X' becomes '12345').
 * Non-string ids and ids without an underscore at that position are
 * returned unchanged.
 *
 * TODO Why is the suffix '_X' concatenated to the id?
 *
 * @param {*} routeId - route id as found in the feed
 * @returns {*} the shortened id, or the value that was passed in
 */
function stripRouteIdSuffix(routeId) {
    if (typeof routeId === 'string' && routeId.indexOf('_') === 5) {
        return routeId.substring(0, 5);
    }
    return routeId;
}

/**
 * Builds a plain trip update object from the `tripUpdate` of a feed entity.
 * Only fields that are neither null nor undefined are copied; everything
 * else keeps whatever `tUO.TripUpdate` / `tUO.Trip` initialise it to.
 *
 * @param {object} entityTripUpdate - `tripUpdate` member of a feed entity
 * @returns {object} shallow copy of the populated `tUO.TripUpdate`
 */
function toTripUpdateObj(entityTripUpdate) {
    const tuo = new tUO.TripUpdate();

    const trip = entityTripUpdate.trip;
    if (isSet(trip)) {
        const t = new tUO.Trip();
        const tripId = trip.tripId;
        if (isSet(tripId)) {
            t.tripId = tripId;
        }
        const routeId = trip.routeId;
        if (isSet(routeId)) {
            t.routeId = stripRouteIdSuffix(routeId);
        }
        // the spread operator stores a shallow copy instead of the instance
        tuo.trip = {...t};
    }

    const timestamp = entityTripUpdate.timestamp;
    if (isSet(timestamp)) {
        tuo.timestamp = timestamp;
    }

    const delay = entityTripUpdate.delay;
    if (isSet(delay)) {
        tuo.delay = delay;
    }

    return {...tuo};
}

// Trip update objects taken from the feed; filled below and read by run().
const rryOfTripUpdateObjcs = [];
const buffer = fs.readFileSync(file);
if (buffer instanceof Buffer) {
    const feed = gtfsRealtimeBindings.transit_realtime.FeedMessage.decode(buffer);
    // entities without a tripUpdate are ignored
    for (const entity of feed.entity) {
        if (isSet(entity.tripUpdate)) {
            rryOfTripUpdateObjcs.push(toTripUpdateObj(entity.tripUpdate));
        }
    }
    debug('index: rryOfTripUpdateObjcs.length: ' + rryOfTripUpdateObjcs.length);
} else {
    console.error('ERROR: buffer is NOT instance of Buffer');
}
/**
 * Stores the trip updates of the decoded feed that the database does not
 * know yet.
 *
 * Steps:
 * 1. select every row of `<schema>.trip_updates` whose `timestamp_pgsql` is
 *    not older than `current_date`,
 * 2. index those rows by `trip_trip_id`,
 * 3. issue one INSERT (awaited one after another) for each element of
 *    `rryOfTripUpdateObjcs` whose trip id is missing from that index.
 *
 * The schema name is read from the DB_SCHEMA environment variable and falls
 * back to 'schema'.
 *
 * @returns {Promise<void>} resolves after the last INSERT has been awaited
 * @throws {Error} when the SELECT yields null/undefined or when a timestamp
 *     is neither null nor numeric; rejections of db.query() propagate
 */
async function run() {
    debug('index:run(): started...');

    // Quotes a value as SQL text. Embedded single quotes are doubled so that a
    // value coming from the feed cannot terminate the literal early.
    // NOTE(review): relies on standard_conforming_strings=on (PostgreSQL
    // default) - confirm; bind parameters would be preferable if db.query()
    // accepts them.
    const sqlText = (value) => `'${String(value).replace(/'/g, "''")}'`;

    // to_timestamp() receives its argument unquoted, hence only plain numeric
    // text (or null) may be embedded into the statement.
    const sqlNumber = (value) => {
        if (value === null) {
            return 'null';
        }
        const text = String(value);
        if (!/^-?\d+(\.\d+)?$/.test(text)) {
            throw new Error('index:run(): timestamp is not numeric: ' + text);
        }
        return text;
    };

    const schema = process.env.DB_SCHEMA || 'schema';
    // Today's rows are selected by timestamp_pgsql instead of timestamp_gtfsr
    // because some GTFSR feeds do not provide a timestamp.
    // NOTE: https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
    const selectQuery = `SELECT * FROM ${schema}.trip_updates WHERE timestamp_pgsql >= current_date`;
    const rsp = await db.query(selectQuery);
    // Without the stored rows the comparison below is meaningless: stop here.
    if (rsp === null || rsp === undefined) {
        throw new Error('index:run(): db.query() returned no result for: ' + selectQuery);
    }
    debug('rsp.length: ' + rsp.length);
    if (rsp.length > 0) {
        debug('rsp[0]: ' + JSON.stringify(rsp[0]));
    }

    // map [trip_trip_id, row] of what the database already holds
    const mapDbTripUpdates = new Map();
    rsp.forEach((row) => {
        mapDbTripUpdates.set(row.trip_trip_id, row);
    });
    debug('mapDbTripUpdates.size: ' + mapDbTripUpdates.size);

    // Compare the feed with the map. A for...of loop is used so that every
    // INSERT is really awaited, see https://stackoverflow.com/a/37576787/15078958
    let countInsert = 0;
    for (const element of rryOfTripUpdateObjcs) {
        const trip = element.trip;
        if (trip === null || trip === undefined) {
            // no trip means no trip id to compare or to store
            debug('index:run(): skipping trip update without trip');
            continue;
        }
        const value = mapDbTripUpdates.get(trip.tripId);
        if (value !== undefined) {
            continue;
        }
        const insertQuery = `INSERT INTO ${schema}`
            + '.trip_updates (trip_trip_id, trip_route_id, timestamp_gtfsr, delay) VALUES ('
            + `${sqlText(trip.tripId)}, ${sqlText(trip.routeId)}, `
            + `to_timestamp(${sqlNumber(element.timestamp)}), ${sqlText(element.delay)});`;
        const rspPsql = await db.query(insertQuery);
        // trace the first insert only
        if (countInsert === 0) {
            debug('value: ' + JSON.stringify(value));
            debug('query: ' + insertQuery);
            debug('respDb: ' + JSON.stringify(rspPsql));
        }
        countInsert++;
    }
    debug('countInsert: ' + countInsert);
    debug('index:run(): done.');
}
// Start the import. A rejection of run() is reported on stderr and flagged
// through the exit code so that callers (shell scripts, cron) can detect the
// failure; the process is not terminated forcibly.
run().catch((err) => {
    console.error('ERROR: ');
    console.error(err);
    process.exitCode = 1;
});
// NOTE: run() is not awaited, so this line is logged as soon as the
// synchronous part of the script has finished, not when run() has settled.
debug('index: done.');