@dotjae commented Nov 28, 2025

ETA Prediction Module

This Pull Request is divided into five layers (PR1–PR5), each associated with one commit of the same name, and each building on the previous one.


PR1: GTFS Ingestion Stack


Summary

This commit introduces the data pipeline used for the subsequent parts of the full prediction module: the Django project (gtfs-rt-pipeline/). This project is responsible for ingesting, normalizing, and storing both static GTFS Schedule and real-time GTFS-RT (Vehicle Positions and Trip Updates) data into a PostgreSQL database.
This Django project depends on PostgreSQL 17, Redis, and Celery (not to mention Django, of course).
Note: the port the database connects to may conflict with a port already in use on the machine where the project is being tested. For that reason, it is somewhat urgent to containerize this project via Docker for better reproducibility. Will be done soon.

Purpose

This stack provides the data source required by all downstream models and services.

Core Components Added

  • gtfs-rt-pipeline/: Full Django project
    • rt_pipeline/ (realtime ingestion)
    • sch_pipeline/ (schedule ingestion)
  • README.md, pyproject.toml, uv.lock

Architecture

Celery workers fetch GTFS and GTFS-RT feeds and bulk upsert normalized entities into PostgreSQL.
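As a rough illustration of this pattern, here is a minimal, hypothetical Celery task; the task name, model fields, and feed URL are illustrative and not the actual rt_pipeline code.

```python
# Hypothetical sketch of the ingestion pattern; task name, model fields,
# and feed URL are illustrative, not the actual rt_pipeline code.
import requests
from celery import shared_task
from google.transit import gtfs_realtime_pb2

from rt_pipeline.models import VehiclePosition  # assumed model

VP_FEED_URL = "https://example.com/gtfs-rt/VehiclePositions.pb"  # placeholder

@shared_task
def ingest_vehicle_positions():
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(requests.get(VP_FEED_URL, timeout=30).content)

    rows = [
        VehiclePosition(
            vehicle_id=entity.vehicle.vehicle.id,
            trip_id=entity.vehicle.trip.trip_id,
            latitude=entity.vehicle.position.latitude,
            longitude=entity.vehicle.position.longitude,
            timestamp=entity.vehicle.timestamp,
        )
        for entity in feed.entity
        if entity.HasField("vehicle")
    ]
    # Bulk upsert: insert new rows, update existing ones on the unique key.
    VehiclePosition.objects.bulk_create(
        rows,
        update_conflicts=True,
        unique_fields=["vehicle_id", "timestamp"],
        update_fields=["trip_id", "latitude", "longitude"],
    )
```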


Ingestion Diagnostics

Below are some diagnostics of the database used during development of the project, obtained with the analytics suite and the diagnostics.py script.

Schedule Data (GTFS)

| Metric | Value |
| --- | --- |
| Trips | 99,840 |
| Routes | 395 |
| Stops | 10,280 |
| StopTimes | 2,445,320 |

Top Routes by Trip Count

  • 109: 30,974
  • 111: 30,462
  • 66: 30,246
  • 23: 28,675
  • Green-D: 26,778

Realtime Data (GTFS-RT)

| Metric | Value |
| --- | --- |
| VehiclePositions | 1,256,283 |
| TripUpdates | 40,722,039 |
| Unique trip_ids | 10,314 |

Histogram of Number of Scheduled Trips per Route

(figure: trip_dist, histogram of scheduled trips per route)

Reviewer Smoke Test

cd gtfs-rt-pipeline
uv sync
uv run python manage.py check
uv run python manage.py makemigrations rt_pipeline
uv run python manage.py makemigrations sch_pipeline
uv run python manage.py migrate

Associated Issue(s)

  • Data ingestion and cleaning plan (ETL) #21

PR2: Feature Engineering Module


Summary

This commit introduces the Feature Engineering Module (feature_engineering/), which transforms raw GTFS-RT telemetry into structured, model-ready datasets. It includes a pytest test suite under the tests directory.

Scope

  • feature_engineering/
  • datasets/ (Parquet outputs)

Architecture & Flow

  1. Ingest VehiclePositions from DB
  2. Stitch with static GTFS context
  3. Construct prediction model targets
  4. Generate spatial & temporal features (a spatial sketch follows this list)
  5. Build training-ready dataset with all necessary columns
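
For step 4, a minimal sketch of how the shape-informed spatial features could be computed with shapely; it assumes the shape coordinates are already projected to a metric CRS, and the actual feature_engineering/ code may differ.

```python
# Illustrative sketch of the shape-informed spatial features; assumes the
# shape points are already projected to a metric CRS (so lengths are meters).
from shapely.geometry import LineString, Point

def spatial_features(shape_points, vp_xy, stop_xy):
    """shape_points: ordered (x, y) tuples of the trip's GTFS shape."""
    shape = LineString(shape_points)
    vp_along = shape.project(Point(vp_xy))      # distance along shape to the VP
    stop_along = shape.project(Point(stop_xy))  # distance along shape to the stop
    return {
        "distance_to_stop": max(stop_along - vp_along, 0.0),
        "progress_ratio": vp_along / shape.length if shape.length else 0.0,
    }
```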

Note: Weather features are not working properly due to the quotas imposed by the OpenMeteo API, which was used to obtain them. Until this problem is fixed, it is better to dispense with the weather features.


Key Features

| Feature Type | Field | Description |
| --- | --- | --- |
| Target | time_to_arrival_seconds | The observed time (in seconds) until the vehicle arrives at a given stop. |
| Spatial | distance_to_stop | Shape-informed distance from the Vehicle Position (VP) to a given stop. |
| Spatial | progress_ratio | Shape-informed progress of a given vehicle along its current trip. |
| Temporal | hour_of_day, day_of_week | Features derived from the VP timestamp (e.g., peak hour, weekend). |
| Temporal | is_peak_hour | Boolean flag identifying defined rush hour periods. |
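
A minimal sketch of how the temporal fields above could be derived from the VP timestamp, assuming a pandas DataFrame with a Unix-seconds timestamp column; the peak-hour windows shown are an assumption, not necessarily the ones used in the module.

```python
# Illustrative temporal feature derivation; the peak-hour definition is assumed.
import pandas as pd

def add_temporal_features(df: pd.DataFrame) -> pd.DataFrame:
    ts = pd.to_datetime(df["timestamp"], unit="s", utc=True)
    df["hour_of_day"] = ts.dt.hour
    df["day_of_week"] = ts.dt.dayofweek  # 0 = Monday
    df["is_peak_hour"] = df["hour_of_day"].isin([7, 8, 9, 16, 17, 18])
    return df
```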

Dataset Builder

This module contains the dataset_builder.py script, which queries the DB to obtain the necessary GTFS and GTFS-RT information, computes the relevant features and the training target, and creates a .parquet dataset ready to be trained on.

During development it was used to construct several test datasets via the build_eta_sample command of the gtfs-rt-pipeline project (found in /eta_prediction/gtfs-rt-pipeline/rt_pipeline/management/commands/build_eta_sample.py).

An example usage of this command is the following:

cd gtfs-rt-pipeline

uv run python manage.py build_eta_sample --out sample.parquet --top-routes 2 --max-stops-ahead 1 --no-weather
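
A quick way to sanity-check the resulting file (column names as listed in the Key Features table above):

```python
# Inspect the generated dataset; assumes the feature/target columns listed above.
import pandas as pd

df = pd.read_parquet("sample.parquet")
print(df.shape)
print(df[["time_to_arrival_seconds", "distance_to_stop", "progress_ratio"]].describe())
```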

Dependency

Given this section's dependency on the schemas and data collected in PR1, the build_eta_sample command may fail during testing if the database is not set up properly. For that reason, a sample dataset was included under /eta_prediction/datasets so the subsequent PRs can still be tested.


Reviewer Smoke Test

uv sync
uv run pytest feature_engineering

Associated Issues

  • Feature engineering module #22

PR3: Models Training, Evaluation and Registry


Summary

Introduces a full Model Training Workspace (models/) supporting training, evaluation, and registry-based persistence.

Scope

  • models/
    • common/ utilities
    • evaluation/
    • Model families
  • trained/ registry

Workflow

  1. Load engineered datasets
  2. Perform splits (temporal / route); a sketch follows this list
  3. Train model candidates
  4. Persist artifacts into registry
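
As a rough sketch of step 2, assuming a pandas DataFrame with timestamp and route_id columns; the actual split utilities in models/common/ may differ.

```python
# Illustrative temporal and per-route splits; not the actual common/ utilities.
import pandas as pd

def temporal_split(df: pd.DataFrame, train_frac: float = 0.8):
    df = df.sort_values("timestamp")
    cut = int(len(df) * train_frac)
    return df.iloc[:cut], df.iloc[cut:]  # train on earlier data, evaluate on later

def route_splits(df: pd.DataFrame):
    return {route_id: group for route_id, group in df.groupby("route_id")}
```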

Model Families

| Model Family | Description |
| --- | --- |
| historical_mean | Baseline model using the simple average of historical travel time as the prediction. |
| polyreg_distance | Polynomial Regression model using only distance-to-stop as the primary feature. |
| polyreg_time | Polynomial Regression model combining distance with temporal and spatial features. |
| ewma | Exponentially Weighted Moving Average model, suitable for online updates and short-term trends. |
| xgboost | Gradient-Boosted Tree Regressor (Best Performer), utilizing all enriched features for non-linear prediction. |
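
For reference, a minimal training sketch for the xgboost family, assuming the engineered dataset and feature names from PR2; this is not the actual train.py code.

```python
# Minimal, illustrative training loop for the xgboost family; paths and
# hyperparameters are assumptions.
import pandas as pd
from xgboost import XGBRegressor

df = pd.read_parquet("datasets/sample.parquet").sort_values("timestamp")
features = ["distance_to_stop", "progress_ratio", "hour_of_day", "day_of_week", "is_peak_hour"]
target = "time_to_arrival_seconds"

cut = int(len(df) * 0.8)                      # simple temporal split
train, val = df.iloc[:cut], df.iloc[cut:]

model = XGBRegressor(n_estimators=300, max_depth=6, learning_rate=0.1)
model.fit(train[features], train[target])

mae = (pd.Series(model.predict(val[features]), index=val.index) - val[target]).abs().mean()
print(f"MAE: {mae:.1f} s")
```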

Performance Benchmarks

Plenty of tests were made with several different dataset configurations, but the final, most comprehensive one consisted of 15 different routes with a varied distribution of observations per route (from 60k down to a few hundred). Below is a chart showing the MAE (Mean Absolute Error) results for each trained model on each route.

(figure: 1_model_mae_grouped_bar, per-route MAE for each trained model)

And the following chart shows the distribution of trips/observations per route:

(figure: dataset_scope, trips/observations per route)

Conclusion

Both the polynomial regression models and the XGBoost model significantly outperform the baseline models (especially XGBoost) for the great majority of routes. The relative model performance trend is essentially the same for each route, except for a couple of outliers.

Model Training and Inference

Each model family follows a similar structure for training and inference, making it easy to add new models over time. They can be trained and compared together via the train_all_models.py script.

| Script Name | Location | Purpose |
| --- | --- | --- |
| train.py | models/<family>/train.py | Entry point for fitting a specific model type (e.g., XGBoost). Handles data loading, fitting, evaluation, and saving the model artifact to the registry. |
| predict.py | models/<family>/predict.py | Handles inference logic for a specific model type. Loads the model from the registry, validates inputs, and produces a prediction. |
| train_all_models.py | models/ root | Master orchestration script that iterates through all defined model families and routes, executing the respective train.py scripts sequentially to populate the registry. |

Model Registry

The Model Registry (models/common/registry.py) is the centralized metadata store for all trained artifacts. It serves two primary functions:

  1. Consistency: It ensures models are stored in a predictable format (.pkl) alongside their metadata (_meta.json), which records the specific features, dataset version, and metrics used during training.
  2. Runtime Lookup: It maintains a core index file (trained/registry.json) that allows the ETA Runtime Service (PR4) to quickly look up the best available model according to a specific metric for any given route.
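
A hypothetical illustration of how an artifact-plus-metadata entry could be written under this layout; the real helper names and metadata fields in models/common/registry.py may differ.

```python
# Illustrative registry write; file layout follows the description above,
# helper names and metadata fields are assumed.
import json
import pickle
from pathlib import Path

REGISTRY_DIR = Path("models/trained")

def register_model(model, family: str, route_id: str, features: list, metrics: dict):
    name = f"{family}_{route_id}"
    with open(REGISTRY_DIR / f"{name}.pkl", "wb") as f:
        pickle.dump(model, f)                      # model artifact (.pkl)
    (REGISTRY_DIR / f"{name}_meta.json").write_text(
        json.dumps({"features": features, "metrics": metrics}, indent=2)
    )
    index_path = REGISTRY_DIR / "registry.json"    # core index used at runtime
    index = json.loads(index_path.read_text()) if index_path.exists() else {}
    index[name] = {"family": family, "route_id": route_id, "metrics": metrics}
    index_path.write_text(json.dumps(index, indent=2))
```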

Reviewer Smoke Test

uv sync

# No-save quick test (using sample dataset)
uv run python models/train_all_models.py --by-route --dataset sample --models xgboost polyreg_time --no-save

# Training + registry save
uv run python models/train_all_models.py --by-route --dataset sample 

# Train global model
uv run python models/train_all_models.py --dataset sample --models polyreg_time 

# Check registry
ls models/trained
uv run python models/check_registry.py

Associated Issues

  • Baseline models: historical median and polynomial regression #23
  • Rolling training and evaluation protocol #24
  • Model registry (PostgreSQL) and Redis cache #25

PR4: ETA Runtime Service


Summary

This final commit introduces the ETA Runtime Service (eta_service/), providing an interface for generating real-time arrival time predictions.
It serves as the consumer layer, bridging live vehicle telemetry streams (e.g., MQTT, Redis Streams) with pre-trained machine learning models.

Scope

  • eta_service/: Core runtime module
    • estimator.py: Primary prediction logic via estimate_stop_times()
    • test_estimator.py: Comprehensive test suite with mocked Vehicle Position, Stop Sequence, and Shape inputs

Architecture

The service is centered around the function estimate_stop_times(), designed to be invoked by a stream-processing layer (e.g., Bytewax, Flink) whenever a new Vehicle Position (VP) event is received.

  1. Feature Generation
    The service dynamically invokes Layer 2 feature logic to compute the required inference features using the live VP data and the upcoming stop sequence.
  2. Model Selection & Loading
    The service queries the model registry to select the appropriate model:
    • Route-Specific Priority: Attempts to load a dedicated, route-specific model.
    • Global Fallback: If no route-specific model is available, the system falls back to a global model trained on aggregated routes.
  3. Inference
    The selected model generates ETA predictions (in seconds) for the next N upcoming stops.
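
A hypothetical invocation following the flow above; the actual signature of estimate_stop_times() in eta_service/estimator.py may differ, and the argument names and payload shapes here are illustrative.

```python
# Hypothetical call pattern; argument names and payload shapes are assumed.
from eta_service.estimator import estimate_stop_times

vehicle_position = {
    "vehicle_id": "1234",
    "route_id": "66",
    "lat": 42.3601,
    "lon": -71.0589,
    "timestamp": 1732800000,
}
upcoming_stops = [
    {"stop_id": "stop-a", "lat": 42.3612, "lon": -71.0571},
    {"stop_id": "stop-b", "lat": 42.3640, "lon": -71.0530},
]

predictions = estimate_stop_times(vehicle_position, upcoming_stops)
for pred in predictions:
    print(pred)  # e.g. {"stop_id": ..., "eta_seconds": ...}
```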

Current Status & Phase 2 Roadmap

| Status | Component | Description |
| --- | --- | --- |
| Complete | Core ETA Inference | estimate_stop_times() logic, dynamic feature extraction, multi-stop predictions, and error handling are complete. |
| Complete | Model Registry Integration | Successful loading of route-specific and global models from the trained/ registry. |
| Planned (Phase 2) | Redis Caching | Caching of stop sequences and prediction outputs planned for Phase 2. |
| Planned | Uncertainty Estimation | Uncertainty estimates for predictions planned for future phases. |

Dependency Note:
This layer critically depends on:

  • Layer 2: Feature Engineering logic
  • Layer 3: Model Training Workspace and Registry

Reviewer Smoke Test

uv sync
uv run python eta_service/test_estimator.py

Associated Issues

  • Inference function estimate_stop_times() #26

PR5: Redis2Redis Bytewax Prediction Flow


Summary

Implements an end-to-end Bytewax flow that bridges MQTT vehicle telemetry to low-latency ETA predictions entirely in-memory. The runtime polls vehicle snapshots from Redis, enriches them with cached stops and shapes, invokes estimate_stop_times(), and publishes results to predictions:* keys for downstream consumers.

Scope

  • bytewax/

    • mqtt2redis.py, subscriber/ package: MQTT → Redis bridge with validation & TTL management
    • mock_stops_and_shapes.py: seeds mock stop sequences and shapes into Redis
    • pred2redis.py: Bytewax dataflow orchestrating enrichment, inference, caching, and persistence
    • test_redis_predictions.py: Redis monitor to inspect live predictions
    • README.md, pyproject.toml, uv.lock: developer guide + isolated runtime envs
  • eta_service/estimator.py: zero-DB inference path, shape-aware fallbacks

  • models/common/registry.py: filesystem-friendly registry initialization & lookups for the streaming job

Architecture & Flow

  1. MQTT → Redis Bridge
    bytewax/subscriber/mqtt2redis.py subscribes to transit/vehicles/bus/#, validates payloads, normalizes timestamps/headings, and caches the enriched JSON at vehicle:<vehicle_id> with a 5-minute TTL.

  2. Static Context Cache
    mock_stops_and_shapes.py creates deterministic stop sequences (route_stops:<route_id>) and GTFS shapes (route_shape:<route_id>) so the Bytewax job never hits Postgres. Shapes are stored as encoded polylines compatible with ShapePolyline.

  3. Bytewax Dataflow (pred2redis.py)

    • Custom FixedPartitionedSource polls Redis for fresh vehicle keys at configurable intervals.
    • Enrichment operator updates each vehicle snapshot with cached stops and ensures stop ordering metadata is present.
    • ShapeCache maintains an in-memory LRU per worker; shapes are deserialized once and reused across batches.
    • Records are routed through estimate_stop_times() (now capable of using pre-loaded shapes or fallbacks) and written back to Redis under predictions:<vehicle_id> with TTL + metadata for observability.
  4. Monitoring Utilities
    test_redis_predictions.py can check the current predictions in the cache, monitor all incoming predictions, or focus on a single vehicle, formatting the JSON into a table for quick reviewer validation.
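
A simplified, non-Bytewax sketch of one enrichment-and-predict step from the flow above; key names follow the Redis schema below, and the estimate_stop_times() arguments are an assumption.

```python
# Simplified single-record version of the flow above; not the actual dataflow code.
import json
import redis

from eta_service.estimator import estimate_stop_times  # signature assumed

r = redis.Redis(decode_responses=True)

def process_vehicle(vehicle_key: str) -> None:
    vehicle = json.loads(r.get(vehicle_key))
    stops = json.loads(r.get(f"route_stops:{vehicle['route_id']}") or "[]")
    shape = r.get(f"route_shape:{vehicle['route_id']}")  # encoded polyline

    predictions = estimate_stop_times(vehicle, stops, shape=shape)  # hypothetical args

    # Publish with a TTL so stale predictions expire on their own.
    r.set(f"predictions:{vehicle['vehicle_id']}", json.dumps(predictions), ex=300)
```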

Redis Schema

| Key Pattern | Producer | Payload |
| --- | --- | --- |
| vehicle:<vehicle_id> | MQTT bridge | Latest vehicle snapshot (lat/lon/speed/route metadata + ingestion timestamps) |
| route_stops:<route_id> | Seeder | Ordered stop definitions with lat/lon + total segments |
| route_shape:<route_id> | Seeder | Encoded polyline & metadata for spatial features |
| predictions:<vehicle_id> | Bytewax flow | ETA array with the respective predictions for each vehicle-stop pair |

Reviewer Smoke Tests

Note: It is important that the models/trained directory contains the registry of trained models (registry.json) as well as the trained models themselves in order for this flow to work. Make sure that is the case by running the Smoke Tests of PR3.

# 1. Bring up infra (external repo) and publish sample telemetry.
git clone https://github.com/simovilab/databus-mqtt.git
cd databus-mqtt && docker compose up
# new terminal inside databus-mqtt
uv run python publisher/publisher_example

# 2. Start MQTT → Redis bridge (subscriber package mimics deployable unit).
cd eta_prediction/bytewax/subscriber
uv run python mqtt2redis.py

# 3. Seed cache + run Bytewax flow.
cd ..
uv run python mock_stops_and_shapes.py
uv run python -m bytewax.run pred2redis

# 4. Monitor predictions from another terminal.
uv run python test_redis_predictions.py --continuous
