
171 lines
4.5 KiB

const fs = require('fs');
const debug=require('debug')('debug');
const Pbf = require('pbf');
const db = require('./src/db');
const tUO = require('./src/trip-update-obj');
const gtfsRt=require('../proto2js/js/gtfs-rt.js');
// Path of the feed file: the `file` environment variable, or the literal
// name 'file' when that variable is unset or empty.
const file = process.env.file || 'file';
debug(`file: ${file}`);
// Message type exported by the generated GTFS Realtime module.
const { FeedMessage } = gtfsRt;
// Trip update objects collected from the feed; starts out empty.
const rryOfTripUpdateObjcs = [];
// Content of the feed file, read synchronously.
const buffer = fs.readFileSync(file);
// Decode the GTFS Realtime feed held in `buffer` and collect one trip update
// object per feed entity into rryOfTripUpdateObjcs.
if (buffer instanceof Buffer) {
    debug('buffer is instance of Buffer');
    const pbf = new Pbf(buffer);
    // NOTE(review): assumes ../proto2js/js/gtfs-rt.js was generated by the pbf
    // compiler, whose message objects expose read(pbf) - TODO confirm.
    const feed = FeedMessage.read(pbf);

    // How many entities carried each field; reported once after the loop.
    let countTrip = 0;
    let countTripId = 0;
    let countRouteId = 0;
    let countTimestamp = 0;
    let countDelay = 0;

    const isMissing = (value) => value === null || value === undefined;

    for (const entity of feed.entity) {
        const entityTripUpdate = entity.trip_update;
        if (isMissing(entityTripUpdate)) {
            continue;
        }
        const tuo = new tUO.TripUpdate();

        const trip = entityTripUpdate.trip;
        if (isMissing(trip)) {
            continue;
        }
        countTrip++;
        const t = new tUO.Trip();

        const tripId = trip.trip_id;
        if (isMissing(tripId)) {
            continue;
        }
        countTripId++;
        t.tripId = tripId;

        const routeId = trip.route_id;
        if (isMissing(routeId)) {
            continue;
        }
        countRouteId++;
        t.routeId = routeId;
        // Store a shallow copy (spread operator) of the trip on the update.
        tuo.trip = {...t};

        const timestamp = entityTripUpdate.timestamp;
        if (isMissing(timestamp)) {
            continue;
        }
        countTimestamp++;
        tuo.timestamp = timestamp;

        const delay = entityTripUpdate.delay;
        if (isMissing(delay)) {
            continue;
        }
        countDelay++;
        tuo.delay = delay;

        // Only complete updates are collected: run() reads trip.tripId,
        // trip.routeId, timestamp and delay from every element.
        rryOfTripUpdateObjcs.push(tuo);
    }

    //TODO clean up
    debug('index:intervalFunc(): rryOfTripUpdateObjcs.length: ' + rryOfTripUpdateObjcs.length);
    debug('index:intervalFunc(): countTrip: ' + countTrip);
    debug('index:intervalFunc(): countTripId: ' + countTripId);
    debug('index:intervalFunc(): countRouteId: ' + countRouteId);
    debug('index:intervalFunc(): countTimeStamp: ' + countTimestamp);
    debug('index:intervalFunc(): countDelay: ' + countDelay);
} else {
    console.error('ERROR: buffer is NOT instance of Buffer');
}
/**
 * Inserts into <schema>.trip_updates every element of rryOfTripUpdateObjcs
 * whose trip id is not among the rows selected with
 * `timestamp_pgsql >= current_date`.
 *
 * The schema name is read from the DB_SCHEMA environment variable and falls
 * back to 'schema'.
 *
 * @returns {Promise<void>} settles once every insert has been awaited
 * @throws {Error} when the SELECT yields no result set; errors raised by
 *     db.query propagate unchanged
 */
async function run() {
    debug('index:run(): started...');

    // Doubles single quotes so feed-supplied text cannot terminate the SQL
    // string literal it is embedded in.
    // NOTE(review): switch to bind parameters if db.query supports them.
    const escapeSqlText = (text) => String(text).replace(/'/g, "''");

    //select trip updates of today from db as array of objects
    const schema = process.env.DB_SCHEMA || 'schema';
    //TODO some GTFSR feeds do not provide timestamp const query = 'SELECT * FROM ' + schema + '.trip_updates WHERE timestamp_gtfsr >= current_date';
    const query = 'SELECT * FROM ' + schema + '.trip_updates WHERE timestamp_pgsql >= current_date';
    debug('query: ' + query);
    const rsp = await db.query(query);

    // Without a result set we cannot tell which trips are already stored, so
    // stop here instead of inserting possible duplicates.
    if (rsp === null || rsp === undefined) {
        throw new Error('no result set returned for query: ' + query);
    }
    debug('rsp.length: ' + rsp.length);
    if (rsp.length > 0) {
        //TODO clean up
        debug('rsp[0]: ' + JSON.stringify(rsp[0]));
    }

    //transform array of objects into map [trip_trip_id, trip update object]
    const mapDbTripUpdates = new Map();
    rsp.forEach(element => {
        mapDbTripUpdates.set(element.trip_trip_id, element);
    });
    debug('mapDbTripUpdates.size: ' + mapDbTripUpdates.size);

    //compare map from db with array of trip update objcs from GTFS Realtime feed
    //TODO clean up
    let countRryElem = 0;
    let countInsert = 0;
    // for...of (not forEach) so that each insert is awaited before the next.
    for (const element of rryOfTripUpdateObjcs) {
        const value = mapDbTripUpdates.get(element.trip.tripId);
        let query = '';
        let rspPsql = null;
        if (value === undefined) {
            // Text values are quote-escaped; numeric values go through
            // Number() so they can only render as a number.
            query = `INSERT INTO `
                + schema
                + `.trip_updates (trip_trip_id, trip_route_id, timestamp_gtfsr, delay) VALUES ('`
                + escapeSqlText(element.trip.tripId)
                + `', '`
                + escapeSqlText(element.trip.routeId)
                + `', to_timestamp(`
                + Number(element.timestamp)
                + `), '`
                + Number(element.delay)
                + `');`;
            rspPsql = await db.query(query);
            countInsert++;
        }
        // Detailed output for the first element only.
        if (countRryElem === 0) {
            debug('value: ' + JSON.stringify(value));
            debug('query: ' + query);
            debug('respDb: ' + JSON.stringify(rspPsql));
        }
        countRryElem++;
    }
    debug('countInsert: ' + countInsert);
    debug('index:run(): done.');
}
run().catch(err => {
console.error('ERROR: ');