Feature/eta prediction #19
ETA Prediction Module
This Pull Request is divided into five parts, each associated with one commit of the same name, and each one depending on the previous one.
PR1: GTFS Ingestion Stack
Summary
This commit introduces the data pipeline used for the subsequent parts of the full prediction module: the Django project (gtfs-rt-pipeline/). This project is responsible for ingesting, normalizing, and storing both static GTFS Schedule and real-time GTFS-RT (Vehicle Positions and Trip Updates) data into a PostgreSQL database. This Django project depends on PostgreSQL 17, Redis, and Celery (not to mention Django, of course).
Note: the port the database connects to may conflict with one already in use on the machine where the project is tested. For that reason, containerizing this project via Docker for better reproducibility is somewhat urgent; it will be done soon.
Purpose
This stack provides the data source required by all downstream models and services.
Core Components Added
- gtfs-rt-pipeline/: Full Django project
  - rt_pipeline/ (realtime ingestion)
  - sch_pipeline/ (schedule ingestion)
  - README.md, pyproject.toml, uv.lock
Architecture
Celery workers fetch GTFS and GTFS-RT feeds and bulk upsert normalized entities into PostgreSQL.
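For illustration, a minimal sketch of this fetch-and-upsert pattern, assuming a hypothetical VehiclePosition model and feed URL (the real tasks live under rt_pipeline/):

```python
# Minimal sketch of the fetch -> parse -> bulk-upsert pattern. The model,
# field names, and FEED_URL are assumptions, not the project's actual code.
import requests
from celery import shared_task
from google.transit import gtfs_realtime_pb2

from rt_pipeline.models import VehiclePosition  # hypothetical model

FEED_URL = "https://example.com/gtfs-rt/vehicle-positions.pb"  # placeholder

@shared_task
def ingest_vehicle_positions():
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(requests.get(FEED_URL, timeout=10).content)
    rows = [
        VehiclePosition(
            vehicle_id=e.vehicle.vehicle.id,
            trip_id=e.vehicle.trip.trip_id,
            lat=e.vehicle.position.latitude,
            lon=e.vehicle.position.longitude,
            ts=e.vehicle.timestamp,
        )
        for e in feed.entity
        if e.HasField("vehicle")
    ]
    # Upsert: insert new rows, update changed fields on key conflict (PostgreSQL)
    VehiclePosition.objects.bulk_create(
        rows,
        update_conflicts=True,
        unique_fields=["vehicle_id", "ts"],
        update_fields=["trip_id", "lat", "lon"],
    )
```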
Ingestion Diagnostics
Below are some diagnostics of the database used during development of the project, obtained via the analytics suite and the diagnostics.py script.
Schedule Data (GTFS)
Top Routes by Trip Count
Realtime Data (GTFS-RT)
Histogram of Number of Scheduled Trips per Route
Reviewer Smoke Test
```bash
cd gtfs-rt-pipeline
uv sync
uv run python manage.py check
uv run python manage.py makemigrations rt_pipeline
uv run python manage.py makemigrations sch_pipeline
uv run python manage.py migrate
```
Associated Issue(s)
PR2: Feature Engineering Module
Summary
This commit introduces the Feature Engineering Module (feature_engineering/), transforming raw GTFS-RT telemetry into structured, model-ready datasets. It includes a test suite to be run with pytest, located under the tests directory.
Scope
- feature_engineering/datasets/ (Parquet outputs)
Architecture & Flow
Note: weather features are not working properly due to the quotas imposed by the Open-Meteo API, which was used to obtain them. Until this problem is fixed, it is better to dispense with the weather features.
Key Features
Dataset Builder
This module contains the dataset_builder.py script, which queries the DB to obtain the necessary GTFS and GTFS-RT information, computes the relevant features and training target, and creates a .parquet dataset ready to be trained on.
During development it was used to construct several test datasets via the build_eta_sample command of the gtfs-rt-pipeline project (found in /eta_prediction/gtfs-rt-pipeline/rt_pipeline/management/commands/build_eta_sample.py). An example usage of this command:
```bash
cd gtfs-rt-pipeline
uv run python manage.py build_eta_sample --out sample.parquet --top-routes 2 --max-stops-ahead 1 --no-weather
```
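As a rough illustration of the builder's core join-and-target step (column names and the join key are invented here, not the project's actual schema):

```python
# Illustrative sketch only: column names and the join key are assumptions,
# not the actual schema used by dataset_builder.py.
import pandas as pd

def build_samples(vp: pd.DataFrame, stop_times: pd.DataFrame) -> pd.DataFrame:
    # Pair each vehicle-position snapshot with the stops of its trip
    df = vp.merge(stop_times, on="trip_id", how="inner")
    # Training target: seconds from the snapshot to the actual arrival
    df["eta_seconds"] = (df["arrival_time"] - df["vp_timestamp"]).dt.total_seconds()
    df = df[df["eta_seconds"] > 0]  # keep only stops still ahead of the vehicle
    df.to_parquet("sample.parquet", index=False)
    return df
```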
Dependency
Given this section's dependency on the schemas and data collected in PR1, the build_eta_sample command may fail if the database is not set up properly. For that reason, a sample dataset is included under /eta_prediction/datasets so that the subsequent PRs can still be tested.
Reviewer Smoke Test
Associated Issues
PR3: Models Training, Evaluation and Registry
Summary
Introduces a full Model Training Workspace (models/) supporting training, evaluation, and registry-based persistence.
Scope
- models/common/ utilities
- evaluation/
- trained/ registry
Workflow
Model Families
Performance Benchmarks
Plenty of tests were run with several different dataset configurations, but the final, most comprehensive one consisted of 15 different routes with a varied distribution of observations per route (from 60k down to a few hundred). Below is a chart showing the MAE (Mean Absolute Error) results for each trained model on each route.
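Here, MAE is the standard mean absolute error over a route's $n$ evaluation samples, where $y_i$ is the observed and $\hat{y}_i$ the predicted arrival time in seconds:

$$\mathrm{MAE} = \frac{1}{n}\sum_{i=1}^{n}\left|y_i - \hat{y}_i\right|$$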
And the following shows the distribution of the number of trips/observations per route:
Conclusion
It is clear that both the polynomial regression models and the XGBoost model (especially the latter) significantly outperform the baseline models for the great majority of routes. The relative model performance trend is essentially the same for each route, apart from a couple of outliers.
Model Training and Inference
Each model family has a similar structure for conducting training and inference, making it easy to add new models over time. They can be trained and compared together via the train_all_models.py script; a rough sketch of the shared structure follows.
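This is a hypothetical sketch of that shared structure (class and method names are illustrative, not the project's actual API):

```python
# Hypothetical sketch of the shared model-family contract; the real classes
# live under models/ and may differ in names and signatures.
import numpy as np

class BaseETAModel:
    """Common train/predict contract each model family implements."""

    def fit(self, X: np.ndarray, y: np.ndarray) -> "BaseETAModel":
        raise NotImplementedError

    def predict(self, X: np.ndarray) -> np.ndarray:
        raise NotImplementedError

class MeanBaseline(BaseETAModel):
    """Trivial baseline: predicts the mean training ETA for every sample."""

    def fit(self, X, y):
        self.mean_ = float(np.mean(y))
        return self

    def predict(self, X):
        return np.full(len(X), self.mean_)
```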
Model Registry
The Model Registry (models/common/registry.py) is the centralized metadata store for all trained artifacts. It serves two primary functions: recording metadata for every trained artifact, and resolving which artifact to load at inference time.
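For illustration, the kind of lookup the registry enables might look like the following (the registry.json layout and field names here are assumptions, not the real implementation):

```python
# Hypothetical registry lookup; the actual registry.json layout may differ.
import json
from pathlib import Path

REGISTRY_PATH = Path("models/trained/registry.json")

def best_model_for_route(route_id: str) -> dict | None:
    """Return metadata of the lowest-MAE trained artifact for a route."""
    entries = json.loads(REGISTRY_PATH.read_text())
    candidates = [m for m in entries if m.get("route_id") == route_id]
    return min(candidates, key=lambda m: m["mae"], default=None)
```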
Reviewer Smoke Test
Associated Issues
PR4: ETA Runtime Service
Summary
This commit introduces the ETA Runtime Service (eta_service/), providing an interface for generating real-time arrival time predictions. It serves as the consumer layer, bridging live vehicle telemetry streams (e.g., MQTT, Redis Streams) with pre-trained machine learning models.
Scope
- eta_service/: Core runtime module
- estimator.py: Primary prediction logic via estimate_stop_times()
- test_estimator.py: Comprehensive test suite with mocked Vehicle Position, Stop Sequence, and Shape inputs
Architecture
The service is centered around the function estimate_stop_times(), designed to be invoked by a stream-processing layer (e.g., Bytewax, Flink) whenever a new Vehicle Position (VP) event is received. The service dynamically invokes Layer 2 feature logic to compute the required inference features using the live VP data and the upcoming stop sequence.
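Schematically, the consumer contract looks like this (the argument names passed to estimate_stop_times() are assumptions; the real signature lives in eta_service/estimator.py):

```python
# Sketch of how a stream processor would invoke the estimator; the arguments
# passed to estimate_stop_times() are assumptions, not the actual signature.
from eta_service.estimator import estimate_stop_times

def on_vehicle_position(vp_event: dict, upcoming_stops: list[dict]) -> list[dict]:
    """Invoked by the stream layer (e.g., Bytewax) per incoming VP event."""
    # Returns ETA predictions, in seconds, for the next N upcoming stops
    return estimate_stop_times(vp_event, upcoming_stops)
```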
The service queries the model registry to select the appropriate model. The selected model generates ETA predictions (in seconds) for the next N upcoming stops.
Current Status & Phase 2 Roadmap
- The estimate_stop_times() logic, dynamic feature extraction, multi-stop predictions, and error handling are complete.
- Models are resolved through the trained/ registry.
Dependency Note:
This layer critically depends on the Layer 2 feature logic (PR2) and on the trained models and registry produced in PR3.
Reviewer Smoke Test
Associated Issues
PR5: Redis2Redis Bytewax Prediction Flow
Summary
Implements an end-to-end Bytewax flow that bridges MQTT vehicle telemetry to low-latency ETA predictions entirely in-memory. The runtime polls vehicle snapshots from Redis, enriches them with cached stops and shapes, invokes estimate_stop_times(), and publishes results to predictions:* keys for downstream consumers.
Scope
- bytewax/mqtt2redis.py, subscriber/ package: MQTT → Redis bridge with validation & TTL management
- mock_stops_and_shapes.py: seeds mock stop sequences and shapes into Redis
- pred2redis.py: Bytewax dataflow orchestrating enrichment, inference, caching, and persistence
- test_redis_predictions.py: Redis monitor to inspect live predictions
- README.md, pyproject.toml, uv.lock: developer guide + isolated runtime envs
- eta_service/estimator.py: zero-DB inference path, shape-aware fallbacks
- models/common/registry.py: filesystem-friendly registry initialization & lookups for the streaming job
Architecture & Flow
MQTT → Redis Bridge
bytewax/subscriber/mqtt2redis.py subscribes to transit/vehicles/bus/#, validates payloads, normalizes timestamps/headings, and caches the enriched JSON at vehicle:<vehicle_id> with a 5-minute TTL.
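A minimal sketch of this bridge pattern, assuming the paho-mqtt 1.x callback API and reducing validation to a single required-field check:

```python
# Minimal sketch of the MQTT -> Redis bridge (paho-mqtt 1.x callback API
# assumed; real validation and normalization are more involved).
import json

import paho.mqtt.client as mqtt
import redis

r = redis.Redis(host="localhost", port=6379, decode_responses=True)
TTL_SECONDS = 300  # the 5-minute TTL described above

def on_message(client, userdata, msg):
    payload = json.loads(msg.payload)
    vehicle_id = payload.get("vehicle_id")
    if vehicle_id is None:  # reject payloads missing the key field
        return
    # Cache the enriched JSON at vehicle:<vehicle_id> with a 5-minute TTL
    r.setex(f"vehicle:{vehicle_id}", TTL_SECONDS, json.dumps(payload))

client = mqtt.Client()
client.on_message = on_message
client.connect("localhost", 1883)
client.subscribe("transit/vehicles/bus/#")
client.loop_forever()
```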
Static Context Cache
mock_stops_and_shapes.py creates deterministic stop sequences (route_stops:<route_id>) and GTFS shapes (route_shape:<route_id>) so the Bytewax job never hits Postgres. Shapes are stored as encoded polylines compatible with ShapePolyline.
Bytewax Dataflow (pred2redis.py)
- A FixedPartitionedSource polls Redis for fresh vehicle keys at configurable intervals.
- A ShapeCache maintains an in-memory LRU per worker; shapes are deserialized once and reused across batches.
- Predictions are generated via estimate_stop_times() (now capable of using pre-loaded shapes or fallbacks) and written back to Redis under predictions:<vehicle_id> with TTL + metadata for observability (sketched below).
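Schematically, the per-vehicle enrichment and inference step performed by the dataflow looks like this (key names match the Redis schema below; the estimate_stop_times() argument list and the TTL value are assumptions):

```python
# Sketch of the per-vehicle enrichment + inference step of the dataflow.
# Key names match the Redis schema; the estimate_stop_times() arguments
# and the TTL value are assumptions.
import json

import redis

from eta_service.estimator import estimate_stop_times

r = redis.Redis(decode_responses=True)

def predict_for_vehicle(vehicle_key: str) -> None:
    vp = json.loads(r.get(vehicle_key))
    route_id = vp["route_id"]
    stops = json.loads(r.get(f"route_stops:{route_id}"))
    shape = r.get(f"route_shape:{route_id}")  # encoded polyline string
    etas = estimate_stop_times(vp, stops, shape)
    # Publish under predictions:<vehicle_id>; TTL value here is illustrative
    r.setex(f"predictions:{vp['vehicle_id']}", 300, json.dumps(etas))
```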
Monitoring Utilities
test_redis_predictions.py can check the current predictions in the cache, and can monitor all incoming predictions or focus on a single vehicle, formatting the JSON into a table for quick reviewer validation.
Redis Schema
- vehicle:<vehicle_id>
- route_stops:<route_id>
- route_shape:<route_id>
- predictions:<vehicle_id>
Reviewer Smoke Tests
Note: it is important that the models/trained directory contains the registry of trained models (registry.json) as well as the trained models themselves in order for this flow to work. Make sure that is the case by running the smoke tests of PR3.