Batch Ingestion
Overview
Batch Ingestion is the OR platform's configuration-driven data loader for slow-moving reference and geospatial datasets. While the real-time pipeline handles live sensor feeds and streaming data, Batch Ingestion manages the periodic update of static assets — traffic signal locations, road features, device registries, and geospatial reference data — that form the foundational layer the platform operates on. These datasets change infrequently (daily, weekly, or monthly) but must be kept current and correctly structured for consumption by the rest of the platform.
The service operates in two modes: as a CRON-controlled automated loader that runs on a pre-defined schedule without human intervention, and as a live microservice with GraphQL API integration for ad-hoc data loading during development or initial deployment. It collects data from multiple sources — client-provided APIs, flat files stored in S3, and open data hubs — and transforms it into OR-compliant structures before loading into the PostgreSQL reference datastore.
Batch Ingestion is controlled through a digraph configuration file that defines the data pipeline for each source. Each source has its own schedule, transformation logic, and target tables, making the service highly extensible for new data types without code changes.
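A digraph entry for a single source might look like the following sketch. All key names, paths, and the overall layout here are assumptions for illustration, not the platform's actual configuration schema:

```yaml
# Hypothetical digraph entry for one source; key names are illustrative.
sources:
  traffic_lights:
    schedule: "0 3 * * *"          # daily at 03:00, standard CRON syntax
    input:
      type: s3
      bucket: reference-data
      key: traffic_lights/latest.csv
    transform:
      - sql/traffic_lights/stage.sql      # load raw rows into temp tables
      - sql/traffic_lights/normalise.sql  # reshape into the target schema
    target_tables:
      - ref_traffic_lights
    depends_on: []                 # digraph edges to upstream pipeline steps
```

The `depends_on` list is what makes the configuration a digraph: multi-step pipelines are expressed as edges between ingestion steps rather than in code.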
Architecture
- Language: Julia
- Scaling: CRON-scheduled pods (one pod per ingestion job)
- Dependencies: PostgreSQL (RDS), S3, Experiment Manager (GraphQL)
Key Components
- Digraph Configuration — Each data source is defined in a YAML config that specifies the source location (API or S3), transformation SQL scripts, target tables, and scheduling parameters. The digraph structure allows complex multi-step ingestion pipelines.
- CRON Scheduler — Kubernetes CronJobs trigger ingestion pods at configured intervals. Each data source runs on its own schedule (daily, weekly, etc.).
- GraphQL API — Exposes an `executeBatchIngestion` mutation for ad-hoc manual triggers during development and initial deployment.
- SQL Transformation Scripts — Raw data loaded into temporary tables is transformed into the platform's target schema using SQL scripts before being committed to the main database.
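The stage-transform-commit pattern used by the SQL transformation scripts can be sketched with SQLite standing in for PostgreSQL. Table and column names are placeholders, not the platform's real schema:

```python
import sqlite3

# Sketch of the load -> transform -> commit pattern Batch Ingestion applies.
# Table and column names here are illustrative, not the platform's schema.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# 1. Load raw source rows into a temporary staging table.
cur.execute("CREATE TEMP TABLE raw_sites (source_id TEXT, lat REAL, lon REAL)")
cur.executemany(
    "INSERT INTO raw_sites VALUES (?, ?, ?)",
    [
        ("SC-001", -37.81, 144.96),
        ("SC-002", -37.82, 144.97),
        ("SC-002", -37.82, 144.97),  # duplicate row from the source feed
    ],
)

# 2. Transform: deduplicate and reshape into the target schema.
cur.execute("CREATE TABLE sites (source_id TEXT PRIMARY KEY, lat REAL, lon REAL)")
cur.execute("INSERT INTO sites SELECT DISTINCT source_id, lat, lon FROM raw_sites")

# 3. Commit only once the transformation has succeeded.
conn.commit()
print(cur.execute("SELECT COUNT(*) FROM sites").fetchone()[0])  # 2 distinct sites
```

Keeping raw rows in a temporary table means a failed transformation never leaves partial data in the target tables.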
Connections
| Direction | Service | Purpose |
|---|---|---|
| In | Client APIs | External REST APIs for device and asset data |
| In | AWS S3 | Flat files (CSV, JSON) for reference data |
| In | Vic Open Data Hub | Open geospatial datasets |
| Out | PostgreSQL (RDS) | Target reference datastore |
| Out | Experiment Manager (GraphQL) | API integration for manual triggers |
Data Flow

```
Data Sources
├── Client APIs (SCATS, STREAMS, device registries)
├── S3 flat files (reference data, clearways, restrictions)
└── Vic Open Data Hub (open geospatial data)
        ↓ (CRON schedule or manual GraphQL trigger)
Batch Ingestion Service
├── Load raw data into temporary tables
├── Apply SQL transformation scripts
└── Commit transformed data to main database
        ↓
PostgreSQL (RDS) — Reference tables
        ↓
Platform services (real-time pipeline, frontend, spatial)
```

Ingested Data Sources
Batch Ingestion manages a wide catalogue of reference data sources:
Devices
| Source | Description |
|---|---|
| SCATS sites | Traffic signal intersection controllers |
| STREAMS detectors | Freeway and arterial traffic detectors |
| Pump Stations | Drainage and flood management pumps |
| AAWS | Adverse weather warning stations |
| AddInsight sites and links | Private traffic analytics network |
Road Features
| Source | Description |
|---|---|
| VMS | Variable Message Signs |
| CCTV cameras | Video surveillance camera locations |
| LUMS | Lane Use Management Signs |
| ESLS | Electronic Speed Limit Signs |
| Ramps | Freeway on/off ramp definitions |
| Ice Stations | Road ice detection stations |
| Clearways | Timed clearway zone definitions |
| Declared roads | Client-managed declared road network |
| Height clearance | Bridge and overpass height restrictions |
| Parking restrictions | Parking zone definitions |
| Turn restrictions | Intersection turn restrictions |
INFO
Live data for these sources is handled by the real-time ingestion pipeline (Data Ingestion, Data Stream Ingestion), not Batch Ingestion. Batch Ingestion only manages the static reference and geospatial attributes.
LUMS Mapping
Batch Ingestion includes a specialised mapping pipeline for LUMS (Lane Use Management Signs) that ensures signs do not overlap when mapped onto the OSM ways used for the platform's road network links.
The mapping uses a two-pass nearest-node approach:
1. Retrieve STREAMS links — Get all STREAMS links associated with LUMS devices from the STREAMS Gateway
2. Hidden Markov Method (HMM) mapping — Use the OSM network to find the best-fitting path of OSM ways for each STREAMS link's coordinates
3. Overlap check — Lower-triangle-style comparison to detect and remove overlapping way assignments (FIFO removal to prevent overlap at road segment boundaries)
4. First pass: nearest way — Map each LUMS to the leading edge of its nearest way using the first node, ensuring a minimum of one way per device
5. Second pass: remaining ways — Map any unmapped ways to their nearest LUMS device by minimum distance
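The two assignment passes can be sketched in isolation. This simplifies way geometry to 1-D positions and omits the HMM path matching and overlap removal entirely; all names and the distance metric are illustrative:

```python
# Minimal sketch of the two-pass nearest assignment. Positions are 1-D
# stand-ins for real geometry; names are illustrative.
def two_pass_mapping(lums_positions, way_positions):
    """Return {lums_id: [way_ids]}, giving every LUMS at least one way
    (while ways last) and assigning every way to some LUMS."""
    mapping = {lid: [] for lid in lums_positions}
    unmapped = dict(way_positions)

    # First pass: each LUMS claims its nearest still-unmapped way.
    for lid, lpos in lums_positions.items():
        if not unmapped:
            break  # more LUMS than ways: remaining devices stay unmapped
        wid = min(unmapped, key=lambda w: abs(unmapped[w] - lpos))
        mapping[lid].append(wid)
        del unmapped[wid]

    # Second pass: leftover ways go to their nearest LUMS device.
    for wid, wpos in unmapped.items():
        lid = min(lums_positions, key=lambda l: abs(lums_positions[l] - wpos))
        mapping[lid].append(wid)
    return mapping

print(two_pass_mapping({"L1": 0.0, "L2": 10.0}, {"W1": 1.0, "W2": 9.0, "W3": 11.0}))
# → {'L1': ['W1'], 'L2': ['W2', 'W3']}
```

The early `break` in the first pass is the "more LUMS than ways" edge case below: once ways run out, the remaining (furthest) devices get no mapping.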
Edge Cases
- No link assigned to LUMS — Device cannot be mapped without a STREAMS link association
- HMM mapping failure — Link coordinates cannot be resolved to OSM ways
- More LUMS than ways — Only as many LUMS as there are ways can be mapped; the closest devices are prioritised
Manual Trigger
For development, testing, or initial deployment, Batch Ingestion can be triggered manually via GraphQL:
```graphql
mutation executeBatchIngestion {
  executeBatchIngestion(sourceName: "TRAFFIC_LIGHTS") {
    message
  }
}
```

In production, Batch Ingestion should only run on its CRON schedule.
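The same mutation can be fired from a script against the Experiment Manager's GraphQL endpoint. The endpoint URL and the `String!` variable typing below are assumptions; only the mutation and `sourceName` argument come from the documentation:

```python
import json
import urllib.request

# Mutation documented above, parameterised with a GraphQL variable.
# The variable type (String!) is an assumption about the schema.
MUTATION = """
mutation executeBatchIngestion($sourceName: String!) {
  executeBatchIngestion(sourceName: $sourceName) {
    message
  }
}
"""

def build_payload(source_name: str) -> bytes:
    """Build the JSON body for a standard GraphQL POST request."""
    body = {"query": MUTATION, "variables": {"sourceName": source_name}}
    return json.dumps(body).encode()

def trigger(source_name: str, url: str = "http://experiment-manager:5100/graphql"):
    # The URL is hypothetical, derived from the documented GraphQL port :5100.
    req = urllib.request.Request(
        url,
        data=build_payload(source_name),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)

# trigger("TRAFFIC_LIGHTS")  # development/initial deployment only
```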
Troubleshooting
Failed Ingestion Load
Symptoms: Duplicate data (multiple `envObjId` values for the same `sObjSourceId`), or CRON job logs show correct data but the database state is wrong.
Common cause: Primary Key–Foreign Key relationship invalidated during the update step. Most likely for the Traffic Signals dataset due to strongly connected event data that can arrive during the delete step.
Resolution:
- Rerun the update manually:

```graphql
mutation updateScats {
  executeBatchIngestion(sourceName: "SCATS_INTERSECTION_GROUPS") {
    message
  }
}
```

- If it fails again: wait and retry, or stop Data Recorder pods and rerun. Restart Data Recorder after completion.
WARNING
Stopping Data Recorder will create a gap in playback data for the duration it is unavailable.
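The duplicate symptom can be confirmed with a grouped count before rerunning anything. A minimal sketch using SQLite (the `env_objects` table name is a placeholder; column names follow the symptom description):

```python
import sqlite3

# Check for the duplicate symptom: more than one envObjId mapped to a
# single sObjSourceId. The table name env_objects is a placeholder.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE env_objects (envObjId INTEGER, sObjSourceId TEXT)")
conn.executemany(
    "INSERT INTO env_objects VALUES (?, ?)",
    [(1, "SC-001"), (2, "SC-002"), (3, "SC-002")],  # SC-002 is duplicated
)

dupes = conn.execute(
    """
    SELECT sObjSourceId, COUNT(DISTINCT envObjId) AS n
    FROM env_objects
    GROUP BY sObjSourceId
    HAVING COUNT(DISTINCT envObjId) > 1
    """
).fetchall()
print(dupes)  # [('SC-002', 2)]
```

An empty result means the reference table is consistent and no rerun is needed.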
Stalled Update (30+ minutes)
Action: Do not restart or terminate the container. Investigate long-running database processes and identify the stalled query. Only cancel the process if you confirm it cannot self-resolve.
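To find the long-running process, the standard PostgreSQL activity view can be queried directly; `pg_stat_activity` and `pg_cancel_backend` are built-in Postgres facilities:

```sql
-- Surface active queries ordered by how long they have been running.
SELECT pid,
       now() - query_start AS runtime,
       state,
       query
FROM pg_stat_activity
WHERE state <> 'idle'
ORDER BY runtime DESC;

-- Only after confirming the query cannot self-resolve:
-- SELECT pg_cancel_backend(<pid>);
```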
Monitoring Database Table Size
From Batch Ingestion v0.6.24 onwards, run the following to check database table sizes:
```graphql
mutation executeBatchIngestion {
  executeBatchIngestion(sourceName: "TURN_RESTRICTIONS") {
    message
  }
}
```

Results are visible on the Batch Ingestion Grafana dashboard page.
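Table sizes can also be checked directly in Postgres, independently of the dashboard, using the built-in `pg_total_relation_size` and `pg_size_pretty` functions:

```sql
-- Largest user tables by total size (table + indexes + TOAST).
SELECT relname,
       pg_size_pretty(pg_total_relation_size(relid)) AS total_size
FROM pg_catalog.pg_statio_user_tables
ORDER BY pg_total_relation_size(relid) DESC
LIMIT 20;
```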
Related Services
- Data Ingestion — Handles live/real-time data ingestion; Batch Ingestion handles the complementary static reference data
- Data Stream Ingestion — Streaming data counterpart for continuous feeds
- Data Loader — Loads initial reference data on stack spin-up; Batch Ingestion handles ongoing updates
- Data Recorder — Writes live data snapshots; can conflict with Batch Ingestion during FK-constrained updates
- Experiment Manager — Central coordination service (GraphQL on `:5100`); provides the API for manual batch triggers
- Post Monitoring — Downstream consumer of reference data via Redshift ETL
- Data Exporter — Exports data that depends on correctly ingested reference datasets
