transitlog-hfp-csv-sink is an application for saving HFP data to Blob Storage in compressed CSV files.
The data is saved in Zstandard-compressed CSV files in which the first row is the header. The columns are in alphabetical order. New columns can be added in the future, so always use the header to find the correct column when parsing the data.
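A minimal sketch of header-based parsing in Python, so that newly added columns do not break the reader. It assumes the third-party `zstandard` package for decompression and UTF-8 encoded files; the function names `open_hfp_csv` and `read_columns` are illustrative and not part of the sink.

```python
import csv
import io


def open_hfp_csv(path):
    """Open a Zstandard-compressed CSV file as a text stream."""
    import zstandard  # third-party package, assumed to be installed

    raw = open(path, "rb")
    reader = zstandard.ZstdDecompressor().stream_reader(raw)
    # ASSUMPTION: the files are UTF-8 encoded
    return io.TextIOWrapper(reader, encoding="utf-8")


def read_columns(text_stream, wanted):
    """Yield one dict per row containing only the wanted columns.

    Columns are looked up by header name, never by position.
    A column that is missing from the file yields None.
    """
    rows = csv.DictReader(text_stream)  # first row is used as the header
    for row in rows:
        yield {name: row.get(name) for name in wanted}
```

For example, passing the stream returned by `open_hfp_csv` to `read_columns(stream, ["tst", "uniqueVehicleId", "latitude", "longitude"])` yields only those four values per row, regardless of where the columns sit in the file.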
Currently used columns are listed here:
| Column | Data format | Notes |
|---|---|---|
| acc | Float | |
| desi | String | |
| dir | Integer | |
| directionId | Integer | |
| dl | Integer | |
| drType | Integer | |
| drst | Boolean | |
| eventType | String | |
| geohashLevel | Integer | |
| hdg | Integer | |
| headsign | String | |
| isOngoing | Boolean | |
| journeyStartTime | String | `hh:mm`. The local time when the journey starts. Note that this value cannot be combined directly with the value of `oday` to get the timestamp when the journey starts, because of the way operating days are used for journeys starting after midnight. |
| journeyType | String | |
| jrn | Integer | |
| latitude | Float | |
| line | Integer | |
| locationQualityMethod | String | |
| longitude | Float | |
| mode | String | |
| nextStopId | String | |
| occu | Integer | |
| oday | String | `yyyy-MM-dd`. Note that this is not the true date when the journey is running but the "schedule date" of the journey (e.g. a journey starting at 2 AM on 2022-01-02 would have an `oday` value of `2022-01-01`). |
| odo | Float | |
| oper | Integer | |
| ownerOperatorId | Integer | Operator ID from the MQTT topic. Can be different from `oper`. See the HFP documentation for more details. |
| receivedAt | ISO 8601 | |
| route | String | |
| routeId | String | |
| seq | Integer | |
| spd | Float | |
| start | String | Same as `journeyStartTime` |
| stop | String | |
| topicLatitude | Float | |
| topicLongitude | Float | |
| topicPrefix | String | |
| topicVersion | String | |
| tsi | Integer | |
| tst | ISO 8601 | |
| uniqueVehicleId | String | Combination of operator ID and vehicle ID from the MQTT topic |
| uuid | String | Unique identifier randomly generated by the sink |
| veh | Integer | |
| vehicleNumber | Integer | |
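
The notes on `journeyStartTime` and `oday` can be illustrated with a rough sketch. It assumes that start times earlier than some operating-day cutoff belong to the calendar day after `oday`. The cutoff of 04:30 and the function name `journey_start_local` are hypothetical; check the HFP documentation for the actual operating-day rules before relying on this.

```python
from datetime import date, datetime, time, timedelta


def journey_start_local(oday, start, cutoff=time(4, 30)):
    """Combine oday (yyyy-MM-dd) and start (hh:mm) into a naive local datetime.

    ASSUMPTION: a start time earlier than `cutoff` belongs to the calendar
    day after oday. The default cutoff is a placeholder, not a documented value.
    """
    calendar_date = date.fromisoformat(oday)
    hours, minutes = (int(part) for part in start.split(":"))
    start_time = time(hours, minutes)
    if start_time < cutoff:
        # Journey starts after midnight but is scheduled on the previous day
        calendar_date += timedelta(days=1)
    return datetime.combine(calendar_date, start_time)
```

With these assumptions, the example from the table (`oday` of `2022-01-01`, start at `02:00`) resolves to 2022-01-02 02:00 local time, while a start at `14:30` stays on 2022-01-01.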
For light priority events, the following columns are also available:
| Column | Data format |
|---|---|
| sid | Integer |
| signalGroupId | Integer |
| tlpAttSeq | Integer |
| tlpDecision | String |
| tlpFrequency | Integer |
| tlpLineConfigId | Integer |
| tlpPointConfigId | Integer |
| tlpPriorityLevel | String |
| tlpProtocol | String |
| tlpReason | String |
| tlpRequestId | Integer |
| tlpRequestType | String |
| tlpSignalGroupNbr | Integer |
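
Because CSV values arrive as strings, a reader usually converts them to the data formats listed in the tables. The sketch below shows one way to do that for a handful of columns. It assumes that booleans are written as `true`/`false` and that missing values are empty strings; neither is confirmed here, and the names `CONVERTERS` and `convert_row` are illustrative.

```python
from datetime import datetime


def parse_bool(value):
    # ASSUMPTION: booleans are serialised as "true" / "false"
    return value.strip().lower() == "true"


def parse_timestamp(value):
    # datetime.fromisoformat rejects a trailing "Z" before Python 3.11
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


# Only a few columns are listed; extend the mapping from the tables as needed
CONVERTERS = {
    "spd": float,
    "latitude": float,
    "longitude": float,
    "dl": int,
    "drst": parse_bool,
    "tst": parse_timestamp,
    "receivedAt": parse_timestamp,
}


def convert_row(row):
    """Return a copy of row with known columns converted to typed values.

    Empty or missing values become None; unknown columns stay strings.
    """
    converted = {}
    for name, value in row.items():
        if value is None or value == "":
            converted[name] = None
        else:
            converted[name] = CONVERTERS.get(name, str)(value)
    return converted
```

Unknown columns are passed through unchanged, which keeps the conversion tolerant of columns added later.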
Pulsar configuration needs some optimisation for running this application. Important settings that need to be adjusted are:
- `backlogQuotaDefaultLimitGB`
  - This option sets the limit for the amount of data that can be stored in the backlog. HFP data is produced at roughly 1 GB per hour on average. Set the backlog quota high enough to hold at least a few days of data, so that no data is lost if the sink stops working.
- `maxUnackedMessagesPerConsumer` and `maxUnackedMessagesPerSubscription`
  - These options limit the number of unacknowledged messages. If the number of unacknowledged messages exceeds the limit, Pulsar stops sending messages to the sink. The sink acknowledges messages only after they have been uploaded to Blob Storage. If the limit is too low, the sink cannot read enough messages to create the CSV files. If the limit is too high, the sink reads too many messages and crashes because of memory usage (the sink must keep the message ID and checksum in memory for every message until it is acknowledged).
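
As a rough aid for choosing `backlogQuotaDefaultLimitGB`, the arithmetic can be sketched as below. The 1 GB per hour figure comes from the text above; the `headroom` factor and the function name `backlog_quota_gb` are assumptions added for illustration.

```python
import math


def backlog_quota_gb(outage_days, gb_per_hour=1.0, headroom=1.5):
    """Estimate a backlog quota in GB that covers an outage of the given length.

    gb_per_hour: average HFP data rate (about 1 GB per hour per the text above).
    headroom: ASSUMED safety factor for peaks above the average rate.
    """
    return math.ceil(outage_days * 24 * gb_per_hour * headroom)
```

With these defaults, covering a three-day outage gives `backlog_quota_gb(3)` = 108 GB, or 72 GB with no headroom.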