Data Stream Ingestion
Overview
Data Stream Ingestion handles event-driven, real-time data feeds that cannot be served effectively through polling. Where Data Ingestion periodically requests data from HTTP endpoints, Data Stream Ingestion subscribes to streaming sources — receiving messages as they are published with minimal latency. This is essential for the OR platform's real-time situational awareness capability, ensuring that operators see traffic conditions as they happen rather than on a polling delay.
The service currently connects to AMQP 1.0 (Advanced Message Queuing Protocol) topics on the client's ESB integration platform, subscribing to data from STREAMS and SCATS and forwarding it directly to the Data Transformer. By receiving published messages rather than polling, the platform achieves very low latency on event-driven data and avoids placing unnecessary load on upstream endpoints.
Data Stream Ingestion is designed to be extensible. While it currently operates with AMQP protocol connections, it can be extended to support other pub-sub and data stream formats from internal, third-party, or external services — making it a flexible component in OR's ingestion layer.
Architecture
- Port: :5700
- Language: Python
- Scaling: Singleton
- Protocol: AMQPS 1.0 (SSL/TLS encrypted)
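As a singleton service, the main process itself answers the Kubernetes probes described under Key Components. A minimal standard-library sketch, assuming the health server listens on the service port :5700 (the handler and function names here are illustrative, not taken from the service):

```python
# Hypothetical sketch of the health endpoints; the service's actual
# handler names and wiring are not documented here.
from http.server import BaseHTTPRequestHandler, HTTPServer


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        # Kubernetes probes hit /ready and /live; anything else is a 404.
        if self.path in ("/ready", "/live"):
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK")
        else:
            self.send_response(404)
            self.end_headers()


def serve_health(port: int = 5700) -> None:
    """Block forever, serving readiness/liveness checks."""
    HTTPServer(("", port), HealthHandler).serve_forever()
```

In the real service this server runs in the spawning process alongside the per-topic worker processes.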
Key Components
- AMQP connection handler — Built on the Apache QPID Proton connector package, extending the MessagingHandler class with Prometheus metric instrumentation on top.
- Per-topic processes — Each AMQP topic connection runs in its own process, isolating data sources and preventing one slow topic from blocking others.
- Configuration file — Defines the AMQPS protocol endpoint and the set of Topic connections to subscribe to. Configs are split between Prod and Non-Prod environments and stored in-code.
- HTTP health server — The main spawning process runs an HTTP server to handle /ready and /live checks from the Kubernetes control plane.
- Certificate-based auth — Required certificates are mounted using Kubernetes volume mounts. The system is designed for certificate-based authentication over TLS.
Data Flow
Client ESB (AMQP 1.0 Topics)
↓ (event-driven push)
Data Stream Ingestion [:5700]
├── Topic 1 process (e.g. STREAMS SVO)
├── Topic 2 process (e.g. STREAMS SVTT)
├── Topic 3 process (e.g. SCATS Detector Counts)
└── ...
↓ (HTTP POST to /data endpoint)
Data Transformer [:5800]
Sources
Data Stream Ingestion currently subscribes to the following AMQP topics from the client ESB:
| Source | Data Provided | Notes |
|---|---|---|
| SCATS Detector Counts | Devices, Ways (traffic volume, flow) | Volume updated every 5 minutes per detector, standardised to 20-second intervals |
| STREAMS SVO | Ways (speed, occupancy, volume, flow, productivity) | Detectors mapped to closest road; updated every 20 seconds |
| STREAMS SVTT | Ways (speed, travel time) | Mapping shared with SVO; instantaneous measures |
| STREAMS TIS | Environment Objects (VMS data) | Variable message sign content |
| STREAMS FCS | Environment Objects (VMS data) | Freeway control system signs |
Configuration
The service is configured through a file that specifies:
- AMQPS endpoint — The protocol connection string for the ESB
- Topic connections — A list of topic subscriptions, each running as an independent process
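Since configs are stored in-code and split by environment, the configuration might look something like the following. The shape, endpoint, and field names here are hypothetical; only the general structure (one AMQPS endpoint, a list of topic subscriptions) comes from this documentation.

```python
# Hypothetical shape of the in-code configuration; the endpoint URL and
# topic addresses are placeholders, not real values.
CONFIG = {
    "amqps_endpoint": "amqps://esb.example.com:5671",
    "topics": [
        {"name": "STREAMS.SVO", "source": "STREAMS SVO"},
        {"name": "STREAMS.SVTT", "source": "STREAMS SVTT"},
        {"name": "SCATS.DETECTOR_COUNTS", "source": "SCATS Detector Counts"},
    ],
}
```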
Each topic handler subscribes to its data source and passes received messages through to the Data Transformer /data endpoint as raw or batched data.
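The hand-off to the Data Transformer can be sketched as a JSON POST. The transformer hostname and payload shape are assumptions; only the :5800 port and the /data endpoint come from this documentation.

```python
# Sketch of forwarding a batch of raw messages to the Data Transformer.
# The service hostname is an assumed Kubernetes service name.
import json
import urllib.request

TRANSFORMER_URL = "http://data-transformer:5800/data"


def forward(messages: list, url: str = TRANSFORMER_URL) -> int:
    """POST a batch of raw messages as JSON; returns the HTTP status code."""
    req = urllib.request.Request(
        url,
        data=json.dumps(messages).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.status
```

Batching is a throughput/latency trade-off: per-message POSTs minimise latency, while batches reduce HTTP overhead on high-volume topics.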
Related Services
- Data Transformer — Downstream consumer that normalises streamed data
- Data Ingestion — Sibling ingestion service for HTTP/FTP polling sources
- Data Redis Ingestion — Sibling ingestion service for Redis-based sources
- Data Fusion — Fuses transformed data from multiple sources
- Experiment Manager — Central coordination service (GraphQL on :5100)
