402 changes: 402 additions & 0 deletions CONTRIBUTING.md

Large diffs are not rendered by default.

165 changes: 164 additions & 1 deletion README.md
@@ -225,4 +225,167 @@ Developed by [Simovi Lab](https://github.com/simovilab) for processing and manag

---

For more information about GTFS, visit the [General Transit Feed Specification](https://gtfs.org/) website.



---

## Reproducible Sample Data

This module includes **small, deterministic GTFS-Realtime fixtures** for testing and documentation purposes.
They allow developers to run the system and its unit tests without relying on live MBTA feeds or external network calls.

These fixtures capture a **minimal snapshot of TripUpdate, VehiclePosition, and Alert entities**, and can be regenerated at any time from the local database.

---

### Fixture Location

The reproducible sample data is stored under:

    gtfs/fixtures/
    ├── trip_update_fixture.json
    ├── vehicle_position_fixture.json
    └── alert_fixture.json


Each file contains a few representative rows from the respective realtime tables, exported as JSON.
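
As a rough illustration of how a test might read one of these files, the sketch below loads a fixture and always returns a list of rows. The helper name `load_fixture_rows` is hypothetical and not part of the project; it only assumes the fixtures are plain JSON, as described above.

```python
import json
from pathlib import Path


def load_fixture_rows(path):
    """Read a JSON fixture and return its rows as a list of dicts.

    Hypothetical helper for illustration; not part of the project.
    """
    with Path(path).open(encoding="utf-8") as handle:
        data = json.load(handle)
    # Row-style fixtures are JSON arrays; wrap a single object so that
    # callers can always iterate over the result.
    return data if isinstance(data, list) else [data]
```

A test could then call `load_fixture_rows("gtfs/fixtures/vehicle_position_fixture.json")` and assert on individual fields of each row.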

---

### Regeneration Script

Fixtures can be rebuilt at any time using the script:

```bash
python -m gtfs.scripts.regenerate_fixtures
```
---

## Running the Realtime Streamer (MBTA)

After installation, no additional database configuration is required: the project uses **SQLite** by default for testing and development.
Once dependencies are installed and migrations have run, you can start streaming live data directly from the MBTA GTFS-Realtime feeds.

Run the following command from the project root:

```bash
python -m gtfs.scripts.stream_mbta_feeds
```

---

## Minimal Producers & Consumers (GTFS-Realtime)

This section documents the minimal producer and consumer patterns already implemented for GTFS-Realtime, based on:

- `tests/test_realtime.py`
- `gtfs/scripts/stream_mbta_feeds.py`
- `gtfs/scripts/regenerate_fixtures.py`

### Producer (Serialization Example)

The project already includes minimal producer patterns in `tests/test_realtime.py` and in the fixture generator `regenerate_fixtures.py`.
The following snippet, taken directly from the serialization test, shows how a `FeedMessage` is built and converted into a Protobuf binary:

```python
feed = gtfs_realtime_pb2.FeedMessage()
self._add_header(feed)

entity = feed.entity.add(id="test_entity_1")
trip_update = entity.trip_update
trip_update.trip.trip_id = self.test_data["trip_id"]
trip_update.trip.route_id = self.test_data["route_id"]

stop_time = trip_update.stop_time_update.add()
stop_time.stop_sequence = 1
stop_time.arrival.delay = 60

serialized = feed.SerializeToString()
```

A deterministic producer is also used when regenerating fixtures:
`realtime.build_trip_updates_bytewax()`

This function internally constructs a reproducible TripUpdates feed and writes both JSON and `.pb` files.
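
The idea of a reproducible feed can be sketched without Bytewax or Protobuf. The example below is not `build_trip_updates_bytewax()`; it is a simplified stand-in, with hypothetical names (`build_trip_update_feed`, `dump_feed`), that builds the JSON form of a TripUpdates feed from explicit inputs only. It assumes that reproducibility comes from passing the timestamp in rather than reading the clock, and it does not write a `.pb` file.

```python
import json


def build_trip_update_feed(base_timestamp, stop_offsets):
    """Build a TripUpdates feed as a plain dict from explicit inputs only.

    stop_offsets is a list of (stop_id, seconds_after_base) pairs.
    Hypothetical helper for illustration; not the project's producer.
    """
    stop_time_updates = [
        {"stop_id": stop_id, "arrival": {"time": base_timestamp + offset}}
        for stop_id, offset in stop_offsets
    ]
    return {
        "header": {
            "gtfs_realtime_version": "2.0",
            "incrementality": "FULL_DATASET",
            "timestamp": base_timestamp,
        },
        "entity": [
            {
                "id": "trip_FAKE_TRIP_001",
                "trip_update": {
                    "timestamp": base_timestamp,
                    "trip": {
                        "trip_id": "FAKE_TRIP_001",
                        "route_id": "FAKE_ROUTE_1",
                    },
                    "stop_time_update": stop_time_updates,
                },
            }
        ],
    }


def dump_feed(feed):
    """Serialize with sorted keys and a fixed indent so the text is stable."""
    return json.dumps(feed, indent=2, sort_keys=True)
```

Calling `dump_feed(build_trip_update_feed(1762132640, [("STOP_A", 60), ("STOP_B", 120)]))` twice yields identical text, which is the property a fixture generator needs.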


### Consumer (Parsing Example)

The project also includes minimal consumer patterns that read GTFS-Realtime Protobuf messages and parse them into `FeedMessage` objects.

A typical consumer is shown in `stream_mbta_feeds.py`, where the MBTA feeds are fetched and parsed:

```python
response = requests.get(url, timeout=20)
response.raise_for_status()

feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(response.content)
```

The unit tests also demonstrate how a local `.pb` file is parsed:

```python
feed = gtfs_rt.FeedMessage()
feed.ParseFromString(content)
```

Both patterns follow the usual way of decoding GTFS-Realtime messages:
load the binary, call `ParseFromString()`, and then iterate over `feed.entity`.
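
The iteration step is not shown in the excerpts above, so here is a minimal sketch of it. The function name `summarize_trip_updates` is hypothetical; it relies only on the standard GTFS-Realtime message fields (`entity`, `trip_update`, `trip.trip_id`, `stop_time_update`, `arrival.delay`) and on the Protobuf `HasField()` method, and it accepts any already-parsed `FeedMessage`.

```python
def summarize_trip_updates(feed):
    """Return (trip_id, stop_sequence, arrival_delay) tuples from a parsed feed.

    Hypothetical helper for illustration; `feed` is a parsed FeedMessage.
    """
    rows = []
    for entity in feed.entity:
        # A FeedEntity may carry a trip_update, a vehicle, or an alert;
        # only trip updates are summarized here.
        if not entity.HasField("trip_update"):
            continue
        trip_id = entity.trip_update.trip.trip_id
        for stop_time in entity.trip_update.stop_time_update:
            # arrival is optional, so report None when it is absent.
            delay = stop_time.arrival.delay if stop_time.HasField("arrival") else None
            rows.append((trip_id, stop_time.stop_sequence, delay))
    return rows
```

With the feed built in the producer example, a call such as `summarize_trip_updates(feed)` would produce one tuple per stop time update.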

### Error Handling Patterns

The existing modules already include simple and practical error-handling patterns for GTFS-Realtime processing.
These patterns can be reused by developers who implement their own producers or consumers.

#### Network and fetch validation (`stream_mbta_feeds.py`)
```python
response = requests.get(url, timeout=20)
response.raise_for_status()
```

If the feed cannot be retrieved, the fetcher logs the error and skips processing:
```python
# Excerpt: handler for the try block that wraps the fetch above.
except Exception as e:
    print(f"[ERROR] Failed to fetch {url}: {e}")
    return None
```
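
Put together, the fetch and its error handling might look like the sketch below. The function `fetch_feed_bytes` and its `http_get` parameter are hypothetical and not taken from `stream_mbta_feeds.py`; passing the HTTP function in (for example `requests.get`) is an assumption made here so the error path can be tested without network access. It also swaps `print` for the standard `logging` module.

```python
import logging

logger = logging.getLogger(__name__)


def fetch_feed_bytes(url, http_get, timeout=20):
    """Return the raw feed bytes, or None if the request fails.

    Hypothetical helper for illustration. http_get is any callable with
    the same shape as requests.get(url, timeout=...).
    """
    try:
        response = http_get(url, timeout=timeout)
        response.raise_for_status()
    except Exception as exc:  # broad catch, mirroring the streamer excerpt
        logger.error("Failed to fetch %s: %s", url, exc)
        return None
    return response.content
```

In the streamer this would be called as `fetch_feed_bytes(url, requests.get)`, and a `None` result would mean the feed is skipped for that cycle.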

#### Protobuf parsing

Both the streamer and the tests rely on `ParseFromString()`:
```python
feed = gtfs_realtime_pb2.FeedMessage()
feed.ParseFromString(response.content)
```
If the binary is corrupted or truncated, `ParseFromString()` raises `google.protobuf.message.DecodeError`.

#### Structural validation (`test_realtime.py`)

```python
if not feed.header.gtfs_realtime_version:
    return False
if not feed.entity:
    return False
```

These checks ensure the feed includes the required GTFS-Realtime fields before being processed.
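
When a feed is rejected it can help to know which check failed. The sketch below is a variation on the same two checks that returns reasons instead of a boolean; the name `feed_problems` is hypothetical and does not appear in `test_realtime.py`.

```python
def feed_problems(feed):
    """Return a list of problems found in a parsed feed.

    Hypothetical helper for illustration; an empty list means both
    structural checks passed.
    """
    problems = []
    if not feed.header.gtfs_realtime_version:
        problems.append("header.gtfs_realtime_version is missing")
    if not feed.entity:
        problems.append("feed contains no entities")
    return problems
```

A consumer could log the returned list and skip the feed whenever it is non-empty.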

### References

- GTFS-Realtime Specification
https://gtfs.org/realtime/reference/

- Google Protocol Buffers
https://developers.google.com/protocol-buffers

- SimoviLab Contribution Guidelines
https://github.com/simovilab/.github/blob/main/CONTRIBUTING.md

- Bytewax (stream processing engine used for deterministic TripUpdates)
https://docs.bytewax.io/stable/guide/index.html
48 changes: 48 additions & 0 deletions feed/files/trip_updates_bytewax.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"header": {
"gtfs_realtime_version": "2.0",
"incrementality": "FULL_DATASET",
"timestamp": 1763932780
},
"entity": [
{
"id": "trip_FAKE_TRIP_001",
"trip_update": {
"timestamp": 1763932780,
"trip": {
"trip_id": "FAKE_TRIP_001",
"route_id": "FAKE_ROUTE_1",
"direction_id": 0,
"start_time": "21:19:40",
"start_date": "20251123",
"schedule_relationship": "SCHEDULED"
},
"vehicle": {
"id": "V001",
"label": "Test Vehicle",
"license_plate": "TEST123"
},
"stop_time_update": [
{
"stop_id": "STOP_A",
"arrival": {
"time": 1763932949
}
},
{
"stop_id": "STOP_B",
"arrival": {
"time": 1763932870
}
},
{
"stop_id": "STOP_C",
"arrival": {
"time": 1763932995
}
}
]
}
}
]
}
Binary file added feed/files/trip_updates_bytewax.pb
Binary file not shown.
86 changes: 86 additions & 0 deletions gtfs/fixtures/realtime_schema.json
@@ -0,0 +1,86 @@
{
"entities": {
"feed_message": {
"primary_key": "id",
"fields": {
"id": "string",
"header": "feed_header",
"entity": "array<feed_entity>"
}
},
"feed_header": {
"primary_key": "header_id",
"fields": {
"header_id": "string",
"gtfs_realtime_version": "string",
"incrementality": "string",
"timestamp": "integer"
}
},
"feed_entity": {
"primary_key": "entity_id",
"fields": {
"entity_id": "string",
"is_deleted": "boolean",
"vehicle": "vehicle_position",
"trip_update": "trip_update",
"alert": "alert"
}
},
"vehicle_position": {
"primary_key": "id",
"fields": {
"id": "string",
"trip_id": "string",
"vehicle_id": "string",
"vehicle_label": "string",
"vehicle_license": "string",
"position_lat": "float",
"position_lon": "float",
"position_bearing": "float",
"timestamp": "integer",
"stop_id": "string"
},
"foreign_keys": {
"trip_id": "trip_update.trip_id"
}
},
"trip_update": {
"primary_key": "trip_update_id",
"fields": {
"trip_update_id": "string",
"trip_id": "string",
"vehicle_id": "string",
"stop_time_updates": "array<stop_time_update>",
"timestamp": "integer",
"delay": "integer"
}
},
"stop_time_update": {
"primary_key": "stop_time_update_id",
"fields": {
"stop_time_update_id": "string",
"stop_sequence": "integer",
"stop_id": "string",
"arrival_time": "integer",
"departure_time": "integer",
"schedule_relationship": "string"
}
},
"alert": {
"primary_key": "alert_id",
"fields": {
"alert_id": "string",
"active_period": "array<period>",
"informed_entity": "array<entity_selector>",
"cause": "string",
"effect": "string",
"url": "string",
"header_text": "string",
"description_text": "string"
}
}
},
"version": "1.0.0",
"spec": "GTFS Realtime v2.0"
}
10 changes: 10 additions & 0 deletions gtfs/fixtures/sample_alerts.json
@@ -0,0 +1,10 @@
[
{
"alert_id": "A001",
"route_id": "R01",
"header_text": "Service interruption on R01",
"description_text": "Maintenance from 14:00 to 18:00",
"severity": "moderate",
"timestamp": 1762132640
}
]
48 changes: 48 additions & 0 deletions gtfs/fixtures/sample_trip_updates.json
@@ -0,0 +1,48 @@
{
"header": {
"gtfs_realtime_version": "2.0",
"incrementality": "FULL_DATASET",
"timestamp": 1762132640
},
"entity": [
{
"id": "trip_FAKE_TRIP_001",
"trip_update": {
"timestamp": 1762132640,
"trip": {
"trip_id": "FAKE_TRIP_001",
"route_id": "FAKE_ROUTE_1",
"direction_id": 0,
"start_time": "01:17:20",
"start_date": "20251103",
"schedule_relationship": "SCHEDULED"
},
"vehicle": {
"id": "V001",
"label": "Test Vehicle",
"license_plate": "TEST123"
},
"stop_time_update": [
{
"stop_id": "STOP_A",
"arrival": {
"time": 1762132702
}
},
{
"stop_id": "STOP_B",
"arrival": {
"time": 1762132749
}
},
{
"stop_id": "STOP_C",
"arrival": {
"time": 1762132867
}
}
]
}
}
]
}
10 changes: 10 additions & 0 deletions gtfs/fixtures/sample_vehicle_positions.json
@@ -0,0 +1,10 @@
[
{
"vehicle_id": "V001",
"trip_id": "FAKE_TRIP_001",
"latitude": 9.93,
"longitude": -84.08,
"speed": 15,
"timestamp": 1762132640
}
]