
Data Stream Ingestion


Overview

Data Stream Ingestion handles event-driven, real-time data feeds that cannot be served effectively through polling. Where Data Ingestion periodically requests data from HTTP endpoints, Data Stream Ingestion subscribes to streaming sources — receiving messages as they are published with minimal latency. This is essential for the OR platform's real-time situational awareness capability, ensuring that operators see traffic conditions as they happen rather than on a polling delay.

The service currently connects to AMQP 1.0 (Advanced Message Queuing Protocol) topics on the client's ESB integration platform, subscribing to data from STREAMS and SCATS and forwarding it directly to the Data Transformer. By receiving published messages rather than polling, the platform achieves very low latency on event-driven data and avoids placing unnecessary load on upstream endpoints.

Data Stream Ingestion is designed to be extensible. While it currently operates with AMQP protocol connections, it can be extended to support other pub-sub and data stream formats from internal, third-party, or external services — making it a flexible component in OR's ingestion layer.

Architecture

  • Port: :5700
  • Language: Python
  • Scaling: Singleton
  • Protocol: AMQPS 1.0 (SSL/TLS encrypted)

Key Components

  • AMQP connection handler — Built on the Apache QPID Proton connector package, extending the MessagingHandler class with Prometheus metric instrumentation on top.
  • Per-topic processes — Each AMQP Topic connection runs on its own process, isolating data sources and preventing one slow topic from blocking others.
  • Configuration file — Defines the AMQPS protocol endpoint and the set of Topic connections to subscribe to. Configs are split between Prod and Non-Prod environments and stored in-code.
  • HTTP health server — The main spawning process runs an HTTP server to handle /ready and /live checks from the Kubernetes control plane.
  • Certificate-based auth — Required certificates are mounted via Kubernetes volume mounts. The system is designed for certificate-based authentication over SSL/TLS, matching the AMQPS endpoint.

Data Flow

Client ESB (AMQP 1.0 Topics)
        ↓ (event-driven push)
Data Stream Ingestion [:5700]
  ├── Topic 1 process (e.g. STREAMS SVO)
  ├── Topic 2 process (e.g. STREAMS SVTT)
  ├── Topic 3 process (e.g. SCATS Detector Counts)
  └── ...
        ↓ (HTTP POST to /data endpoint)
Data Transformer [:5800]

Sources

Data Stream Ingestion currently subscribes to the following AMQP topics from the client ESB:

| Source | Data Provided | Notes |
| --- | --- | --- |
| SCATS Detector Counts | Devices, Ways (traffic volume, flow) | Volume updated every 5 minutes per detector, standardised to 20-second intervals |
| STREAMS SVO | Ways (speed, occupancy, volume, flow, productivity) | Detectors mapped to closest road; updated every 20 seconds |
| STREAMS SVTT | Ways (speed, travel time) | Mapping shared with SVO; instantaneous measures |
| STREAMS TIS | Environment Objects (VMS data) | Variable message sign content |
| STREAMS FCS | Environment Objects (VMS data) | Freeway control system signs |

Configuration

The service is configured through a file that specifies:

  1. AMQPS endpoint — The protocol connection string for the ESB
  2. Topic connections — A list of topic subscriptions, each running as an independent process
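A config file of this shape would satisfy the two requirements above. The keys, endpoint, certificate paths, and topic names are illustrative assumptions; the real Prod and Non-Prod configs differ.

```yaml
# Hypothetical layout only — not the actual configuration.
amqps_endpoint: amqps://esb.example.com:5671
ca_cert: /etc/certs/ca.pem          # mounted via Kubernetes volume
client_cert: /etc/certs/client.pem  # mounted via Kubernetes volume
topics:                             # one independent process each
  - STREAMS.SVO
  - STREAMS.SVTT
  - SCATS.DETECTOR_COUNTS
```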

Each topic handler subscribes to its data source and passes received messages through to the Data Transformer /data endpoint as raw or batched data.
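The hand-off to the Data Transformer could look like this stdlib-only sketch; the in-cluster hostname and the JSON batch shape are assumptions, not the documented contract:

```python
# Sketch: forward a batch of raw messages to the Data Transformer.
import json
import urllib.request

# Assumed in-cluster service name; the port matches the data flow above.
TRANSFORMER_URL = "http://data-transformer:5800/data"

def build_request(messages, url=TRANSFORMER_URL):
    """Build the POST request carrying a batch of raw messages as JSON."""
    return urllib.request.Request(
        url,
        data=json.dumps(messages).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def forward(messages, url=TRANSFORMER_URL):
    """Send the batch to the Data Transformer's /data endpoint."""
    with urllib.request.urlopen(build_request(messages, url), timeout=10) as resp:
        return resp.status
```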

Related Services

  • Data Transformer — Downstream consumer that normalises streamed data
  • Data Ingestion — Sibling ingestion service for HTTP/FTP polling sources
  • Data Redis Ingestion — Sibling ingestion service for Redis-based sources
  • Data Fusion — Fuses transformed data from multiple sources
  • Experiment Manager — Central coordination service (GraphQL on :5100)

User documentation for Optimal Reality