Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions eta_prediction/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# GTFS-RT Tools

Utilities for exploring **GTFS-Realtime** feeds such as Vehicle Positions, Trip Updates, and Alerts.
This repository is part of experiments with the **bUCR Realtime feeds** and other open transit data sources.

---

## Installation

For installation and dependency management we are using [uv](https://github.com/astral-sh/uv), a fast drop-in replacement for pip and venv.

1. **Install uv** (if you don’t already have it):

```bash
curl -LsSf https://astral.sh/uv/install.sh | sh
```

2. **Clone this repository**:

```bash
git clone https://github.com/simovilab/gtfs-django.git
cd eta-prediction
```

3. **Install dependencies** from `pyproject.toml`:

```bash
uv sync
```

This will create a virtual environment and install:

- [`gtfs-realtime-bindings`](https://github.com/MobilityData/gtfs-realtime-bindings) (protobuf definitions for GTFS-RT)
- [`requests`](https://docs.python-requests.org/)

---

## Usage

Example: fetch and print the first 10 vehicle positions.

```bash
uv run gtfs_rt_bindings_VP.py
```
---

## Switching Feeds

Inside each script, change the `URL` variable to any GTFS-RT VehiclePositions endpoint.

Examples:
- bUCR (default):
```
https://databus.bucr.digital/feed/realtime/vehicle_positions.pb
```
- MBTA:
```
https://cdn.mbta.com/realtime/VehiclePositions.pb
```
60 changes: 60 additions & 0 deletions eta_prediction/bytewax/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Bytewax ETA Prediction Flow

Low-latency ETA predictions are generated by streaming vehicle telemetry from MQTT into Redis, enriching the data with cached stops/shapes, and running the Bytewax flow that calls `eta_service`. This README explains how to bring all moving pieces online locally.

## Prerequisites
- This repository cloned locally (paths below assume the root is `eta_prediction/`).
- [`uv`](https://github.com/astral-sh/uv) or another tool capable of running the provided `pyproject.toml` environments.
- Docker (or Podman) for running the MQTT + Redis stack provided by [`simovilab/databus-mqtt`](https://github.com/simovilab/databus-mqtt).

## End-to-End Workflow

### 1. Start the MQTT/Redis stack
```bash
git clone https://github.com/simovilab/databus-mqtt.git
cd databus-mqtt
docker compose up
```
> Leave this terminal running; it launches the Mosquitto broker, Redis, and any helper services defined in that repo.

### 2. Publish sample vehicle data
In another terminal inside the `databus-mqtt` repository:
```bash
uv run python publisher/publisher_example
```
Wait for the publisher to log that it is connected—this continuously streams mock vehicle events into the MQTT broker.

### 3. Bridge MQTT → Redis
Back in this repository:
```bash
cd bytewax/subscriber
uv run python mqtt2redis.py
```
This subscriber listens to `transit/vehicles/bus/#`, validates each payload, and caches the latest vehicle state under `vehicle:*` keys in Redis. Keep it running.

### 4. Seed mock stops and shapes
From `eta_prediction/bytewax/`:
```bash
uv run python mock_stops_and_shapes.py
```
This loads placeholder stop sequences (`route_stops:*`) and encoded GTFS shapes (`route_shape:*`) into Redis. The Bytewax flow expects this cache to exist; in production it will be populated by another service.

### 5. Run the Bytewax prediction flow
Still inside `bytewax/`:
```bash
uv run python -m bytewax.run pred2redis
```
The flow polls Redis for new vehicle entries, enriches them with the cached stops/shapes, calls `estimate_stop_times`, and stores the resulting ETAs under `predictions:<vehicle_id>`.

### 6. Monitor predictions
Optionally observe the cached ETAs from another terminal:
```bash
uv run python test_redis_predictions.py --continuous
# or monitor a single vehicle:
uv run python test_redis_predictions.py --vehicle BUS-004 --continuous
```
The monitor subscribes to Redis and pretty-prints each `predictions:*` payload so you can confirm the pipeline is healthy.

## Notes
- All scripts default to `localhost` for MQTT and Redis; adjust the constants in `mqtt2redis.py` or provide environment overrides if your services run elsewhere.
- If you restart Redis you will need to re-run `mock_stops_and_shapes.py` to repopulate the cache before restarting the Bytewax flow.
Loading