require('dotenv').config(); const fs = require('fs'); const debug=require('debug')('debug'); const db = require('./src/db'); const tUO = require('./src/trip-update-obj'); const gtfsRealtimeBindings = require('gtfs-realtime-bindings'); const file=process.env.file||'file'; debug('file: '+file) debug('start...'); //initialize array of trip update objects const rryOfTripUpdateObjcs = new Array(); const buffer=fs.readFileSync(file); if(buffer instanceof Buffer){ //TODO clean up debug('buffer is instance of Buffer'); const feed = gtfsRealtimeBindings.transit_realtime.FeedMessage.decode(buffer); //TODO clean up let countTrip = 0; let countTripId = 0; let countRouteId = 0; let countTimestamp = 0; let countDelay = 0; feed.entity.forEach(entity => { const entityTripUpdate = entity.tripUpdate; if (entityTripUpdate !== null && entityTripUpdate !== undefined) { const tuo = new tUO.TripUpdate(); const trip = entityTripUpdate.trip; if (trip !== null && trip !== undefined) { countTrip++ const t = new tUO.Trip(); const tripId = trip.tripId; if (tripId !== null && tripId !== undefined) { countTripId++; t.tripId = tripId; } const routeId = trip.routeId; if (routeId !== null && routeId !== undefined) { countRouteId++; t.routeId = routeId; //TODO Why is the suffic '_X' concatenated to the id? if (typeof routeId === 'string') { const routeIdLngth = routeId.length; if (routeIdLngth > 5) { const indexOf = routeId.indexOf('_'); if (indexOf === 5) { t.routeId = routeId.substring(0, 5); } } } } //create a copy of an object, you can use the spread operator tuo.trip = {...t}; } const timestamp = entityTripUpdate.timestamp; if (timestamp !== null && timestamp !== undefined) { countTimestamp++; tuo.timestamp = timestamp; } const delay = entityTripUpdate.delay; if (delay !== null && delay !== undefined) { countDelay++; tuo.delay = delay; } rryOfTripUpdateObjcs.push({...tuo}); } }); //TODO clean up debug('index: rryOfTripUpdateObjcs.length: ' + rryOfTripUpdateObjcs.length); /** debug('index: countTrip: ' + countTrip); debug('index: countTripId: ' + countTripId); debug('index: countRouteId: ' + countRouteId); debug('index: countTimeStamp: ' + countTimestamp); debug('index: countDelay: ' + countDelay); */ }else{ console.error('ERROR: buffer is NOT instance of Buffer'); } async function run() { debug('index:run(): started...'); //select trip updates of today from db as array of objects const schema = process.env.DB_SCHEMA || 'schema'; //TODO some GTFSR feeds do not provide timestamp const query = 'SELECT * FROM ' + schema + '.trip_updates WHERE timestamp_gtfsr >= current_date'; //NOTE: https://www.postgresql.org/docs/current/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT const query = 'SELECT * FROM ' + schema + '.trip_updates WHERE timestamp_pgsql >= current_date'; //TODO clean up debug('query: '+query); const rsp = await db.query(query); debug('rsp.length: '+rsp.length); if (rsp !== null && rsp !== undefined && rsp.length < 0) { //TODO clean up debug('rsp[0]: ' + JSON.stringify(rsp[0])); } //TODO only continue if rsp is neither null or undefined //transform array of objects into map [trip_trip_id, trip update object] //TODO handle rsp NOT available const mapDbTripUpdates = new Map(); rsp.forEach(element => { mapDbTripUpdates.set(element.trip_trip_id, element); }); debug('mapDbTripUpdates.size: ' + mapDbTripUpdates.size); //compare map from db with array of trip update objcs from GTFS Realtime feed //TODO clean up let countRryElem = 0; let countInsert = 0; //TODO https://stackoverflow.com/a/37576787/15078958 //TODO according to this link forEach() has challenges for async/await for (const element of rryOfTripUpdateObjcs) { const value = mapDbTripUpdates.get(element.trip.tripId); let query = ''; let rspPsql = null; if (value === undefined) { query = `INSERT INTO ` + schema + `.trip_updates (trip_trip_id, trip_route_id, timestamp_gtfsr, delay) VALUES ('` + element.trip.tripId + `', '` + element.trip.routeId + `', to_timestamp(` + element.timestamp + `), '` + element.delay + `');`; rspPsql = await db.query(query); countInsert++; if (countRryElem === 0) { debug('value: ' + JSON.stringify(value)); debug('query: ' + query); debug('respDb: ' + JSON.stringify(rspPsql)); } countRryElem++; } } debug('countInsert: ' + countInsert); //other debug('index:run(): done.'); }; run().catch(err => { console.error('ERROR: '); console.log(err) }); debug('index: done.');