Component Types
Overview
Every WorkflowTask in the MDK is classified by a component type that determines how the task is executed. The component type controls:
- How input data is prepared
- Where and how execution occurs
- How output data is captured and stored
Component types are how the MDK keeps specialist models independent while making them work together. Each type defines a different execution strategy that the Workflow Execution Manager follows — MODEL tasks invoke a containerised microservice over HTTP, OPERATOR tasks run custom Python logic via PyRunner, and IMPORT_DDK and EXPORT_DDK tasks bridge the workflow to DDK-generated data servers. Because every task speaks through these standardised contracts, teams can mix Python models, Julia simulators, Go services, and custom transformation scripts in a single pipeline without any integration code between them. See Data Flow & Transformations for how the DAL moves data between tasks once each one completes.
There are four component types, and the taxonomy is deliberate: not every workflow task is model inference. Before settling on four types, we considered a simpler model — a single "execute" type that the workflow author configures differently depending on what they want to do. That approach maximises flexibility but removes structure: there is no way to know from the task definition whether it reads from an external system, writes to one, or runs internal computation. The four-type taxonomy makes task intent explicit in the workflow design, which matters for auditing (you can see at a glance which tasks touch external systems), for validation (the WEM enforces type-appropriate configuration), and for the compatibility system (IMPORT_DDK and EXPORT_DDK tasks connect to DDK servers through defined configurations; MODEL and OPERATOR tasks have different input/output resolution paths).
| Component Type | Purpose | Execution Target |
|---|---|---|
| EXPORT_DDK | Export data to external systems | Custom function → external API calls |
| IMPORT_DDK | Import data from external systems | External API calls → custom function |
| OPERATOR | Custom data transformation | Custom function (Python/Julia) |
| MODEL | ML model inference | HTTP POST to containerised model |
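The table can be read as a dispatch table keyed by component type. A minimal sketch of that idea — all Python names here are illustrative, not actual MDK identifiers:

```python
from enum import Enum

class ComponentType(Enum):
    EXPORT_DDK = "EXPORT_DDK"
    IMPORT_DDK = "IMPORT_DDK"
    OPERATOR = "OPERATOR"
    MODEL = "MODEL"

# Hypothetical dispatch table mirroring the taxonomy above; the real
# WEM internals are not described in this document.
EXECUTION_STRATEGY = {
    ComponentType.MODEL: "HTTP POST to containerised model",
    ComponentType.OPERATOR: "run custom function in PyRunner",
    ComponentType.IMPORT_DDK: "query external APIs, then run import function",
    ComponentType.EXPORT_DDK: "run export function, then call external APIs",
}

def strategy_for(task_type: ComponentType) -> str:
    """Resolve the execution strategy for a task's component type."""
    return EXECUTION_STRATEGY[task_type]
```

Because the mapping is total over the four types, the WEM never needs per-task branching logic beyond this single lookup.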
Execution Decision Tree
When the WEM picks up a task for execution, the component type determines the code path:
WorkflowTask
│
├─ ComponentType?
│ │
│ ├─ EXPORT_DDK → Execute custom export function in PyRunner → Call external APIs
│ ├─ IMPORT_DDK → Query external APIs → Execute custom import function in PyRunner
│ ├─ OPERATOR → Execute custom transformation function in PyRunner
│ └─ MODEL → HTTP POST to containerised model endpoint
EXPORT_DDK
Use Case
Export workflow results to an external database or API. Uses an Operator-to-DDK resolver pattern — a custom function formats data, then external API calls push it out.
Execution Flow
Execute Custom Function
- Find file in directory or load function code from `Function.functionContents`
- Execute with data from upstream tasks
- Function formats data according to external API requirements
Call External APIs
- Make HTTP requests to external endpoints
- Configuration from `ModelConfigOutput.outputConfig`
- One API call per output field/entity
Data Flow
[Upstream Tasks] → [Execute Export Function: Format Data]
↓
[HTTP POST /api/cities]
[HTTP POST /api/countries]
↓
[Mark Task Complete]

Configuration Example
Each ModelConfigOutput entry defines an external endpoint and mutation:
```json
// Model.ModelConfigOutputs[0]
{
  "name": "cities",
  "outputConfig": {
    "resolverUrl": "https://api.example.com/graphql",
    "mutation": "mutation CreateCity($name: String!) { ... }"
  }
}
```
The custom function is responsible for transforming upstream data into the structure the external API expects. The output config entries are iterated — one API call per configured output.
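To make the two-step flow concrete, here is a minimal sketch of an export function plus the per-output API loop. The `execute` signature, the dict shapes, and the GraphQL payload format are assumptions for illustration; the network call itself is left commented out:

```python
import json
from urllib import request

def execute(upstream_data):
    # Hypothetical export function: shape upstream rows into the
    # variables each configured mutation expects, keyed by output name.
    return {"cities": [{"name": row["city"]} for row in upstream_data]}

def push_outputs(formatted, output_configs):
    # One API call per configured output entry, as described above.
    requests_out = []
    for cfg in output_configs:
        payload = json.dumps({
            "query": cfg["outputConfig"]["mutation"],
            "variables": formatted[cfg["name"]],
        }).encode()
        req = request.Request(
            cfg["outputConfig"]["resolverUrl"],
            data=payload,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # request.urlopen(req)  # actual network call omitted in this sketch
        requests_out.append(req)
    return requests_out
```

The split keeps responsibilities clean: the custom function only knows about data shapes, while the loop only knows about endpoints.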
IMPORT_DDK
Use Case
Import data from an external GraphQL/REST API. Uses a DDK resolver-to-Operator pattern — external queries retrieve data, then a custom function transforms it for downstream consumption.
Execution Flow
Query External APIs
- Make HTTP requests to external data sources
- Configuration from `ModelConfigInput.inputConfig`
- Typically GraphQL queries or REST API calls
- One query per input source
Execute Custom Function
- Find file in directory or load function code from `Function.functionContents`
- Execute with raw API responses
- Function transforms and validates data
- Outputs structured data for downstream tasks
Data Flow
[Start Task] → [HTTP GET/POST: GraphQL Query 1]
→ [HTTP GET/POST: GraphQL Query 2]
↓
[Execute Import Function: Transform Data]
↓
[Write Structured Output Files]
↓
[Downstream Tasks]

Configuration Example
Each ModelConfigInput entry defines the source endpoint and query:
```json
// Model.ModelConfigInputs[0].inputConfig
{
  "resolverUrl": "https://api.example.com/graphql",
  "resolverConfig": {
    "query": "query GetCities($country: String) { cities(country: $country) { name population } }",
    "variables": {"country": "USA"}
  }
}
```
The import function receives the raw query results and is responsible for parsing, validating, and structuring the data before writing output files.
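A minimal sketch of such an import function, assuming the raw responses are JSON strings shaped like the GraphQL query above; the validation rules and output row shape are illustrative only:

```python
import json

def execute(raw_responses):
    # Hypothetical import function: parse each raw GraphQL response,
    # validate required fields, and return structured rows for
    # downstream tasks.
    cities = []
    for raw in raw_responses:
        body = json.loads(raw)
        for city in body.get("data", {}).get("cities", []):
            if "name" not in city:
                raise ValueError("city row missing required field 'name'")
            cities.append({
                "name": city["name"],
                "population": int(city.get("population", 0)),
            })
    return cities
```

Keeping parsing and validation inside the function means downstream tasks can trust the structure of what they receive.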
OPERATOR
Use Case
Custom data transformation — merge datasets, filter rows, compute derived fields, run statistical operations, or any user-defined logic.
Execution Flow
Execute Custom Function
- Find file in directory or load function code from `Function.functionContents`
- Execute via PyRunner (Python) or Julia runtime
- Inputs sourced from `configValue` and/or upstream task outputs
- Function performs transformation/computation
- Outputs written to filesystem for downstream tasks
Data Flow
[Upstream Task A Output] →
[Upstream Task B Output] → [Execute Operator Function] → [Write Output] → [Downstream Tasks]

paramName Mapping
When an operator has upstream dependencies with `OutputDependency = true`, the `paramName` field on WorkflowTaskDependency specifies which function parameter receives each upstream output:
Task A (output) ──[paramName: "dataset_a"]──> Merge Operator
Task B (output) ──[paramName: "dataset_b"]──> Merge Operator

The function signature would be:
```python
def execute(dataset_a, dataset_b):
    # merge() stands in for whatever user-defined logic combines the inputs
    merged = merge(dataset_a, dataset_b)
    return merged
```

Merge Operator Special Case
When an operator's model path is `/merge_operator`:
- All upstream dependencies are automatically set to `OutputDependency = true`
- This ensures all upstream outputs are collected and available for merging
- No manual dependency configuration needed — the system enforces it
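The paramName mechanism can be sketched end to end: each `WorkflowTaskDependency.paramName` becomes a keyword argument when the operator function is invoked. The merge logic and the `id` key below are illustrative assumptions, not MDK behaviour:

```python
def execute(dataset_a, dataset_b):
    # Illustrative merge: combine two upstream datasets keyed by "id".
    merged = {row["id"]: dict(row) for row in dataset_a}
    for row in dataset_b:
        merged.setdefault(row["id"], {}).update(row)
    return list(merged.values())

# How the WEM might resolve paramName-mapped upstream outputs to kwargs:
upstream_outputs = {
    "dataset_a": [{"id": 1, "name": "Paris"}],   # from Task A
    "dataset_b": [{"id": 1, "country": "France"}],  # from Task B
}
result = execute(**upstream_outputs)
```

Because the mapping is by name rather than by position, reordering upstream dependencies in the workflow does not silently swap the operator's inputs.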
MODEL
Use Case
Run inference on a containerised AI/ML model or microservice deployed in the KinD cluster.
Execution Flow
Prepare Request
- Build request payload from `configValue` and upstream outputs
- Resolve model endpoint from `ModelRegistry`
Execute HTTP Request
- POST to the containerised model's endpoint
- Model processes request and returns predictions
Store Output
- Save model predictions to file system or database
- Make available for downstream tasks
Data Flow
[Upstream Task Outputs] → [Build Request Payload]
↓
[HTTP POST /predict]
↓
[Pod / Docker Container: AI/ML Model]
↓
[Store Predictions]
↓
[Return DAL response]
↓
[Downstream Tasks pick up results from previous DAL response]

Model Deployment
Models run as containerised services:
| Component | Role |
|---|---|
| ModelRegistry | Stores repository URL (appRepository) and port (appPort) |
| ModelRegistryVersion | Manages container tags and versions |
| AWS ECR | Hosts Docker images |
| KinD Pod | Runs the container in the local cluster |
The WEM resolves the model endpoint by combining ModelRegistry.appRepository (or custom image URL) with ModelRegistry.appPort, then sends an ORHttpRequest to that endpoint.
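A sketch of that resolution step, assuming the registry record is a plain dict and that the endpoint is plain HTTP with a `/predict` route (both assumptions for illustration):

```python
def resolve_model_endpoint(registry, route="/predict"):
    # Combine ModelRegistry.appRepository with ModelRegistry.appPort,
    # as described above; scheme and route are illustrative defaults.
    host = registry["appRepository"].rstrip("/")
    return f"http://{host}:{registry['appPort']}{route}"

url = resolve_model_endpoint({"appRepository": "sentiment-model", "appPort": 8080})
```

The resulting URL is where the WEM sends the ORHttpRequest described in the next section.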
OR Types Contract
All MODEL tasks communicate using the OR Types system:
Request: ORHttpRequest → { inputs, params, version, output_path }
Response: ORHttpResponse → { output, metadata, logs }

The model container must implement a handler that:
- Accepts `ORHttpRequest` as POST body
- Processes the `params` and `inputs` fields
- Returns `ORHttpResponse` with results in `output` and logs/metadata as needed
See Schema Guide — OR Types for the full type definitions.
HTTP + JSON was chosen for model invocation because it is the lowest-common-denominator interface for polyglot microservices: any language with an HTTP library can implement the OR contract, and any OR model can be tested with curl. This keeps specialist teams autonomous — a Julia modeller or Python data scientist can build and test models without managing proto definitions or OR-specific toolchain dependencies.
Component Type Comparison
| Aspect | EXPORT_DDK | IMPORT_DDK | OPERATOR | MODEL |
|---|---|---|---|---|
| Direction | Outbound | Inbound | Internal | Internal |
| External API | Yes (output) | Yes (input) | No | No |
| Custom Function | Yes (formatting) | Yes (transformation) | Yes (logic) | No |
| Container | No | No | No | Yes |
| Typical Use | Push data to external DB | Pull data from external API | Transform/merge data | AI/ML inference & data processing |
| Config Source | outputConfig | inputConfig | Python code itself | configValue |
Advanced Topics
Priority-Based Scheduling
When multiple tasks are ready but blocked by a cyclic dependency, the WEM uses the `priority` field on WorkflowTask to determine execution order:
- Each task has a `priority` field (integer)
- Higher-priority tasks are scheduled first
- Default priority: `0`
- Priority is only consulted during idle time, when multiple tasks are ready but cannot run due to cyclic dependencies. The WEM picks the highest-priority task to execute first, which can help resolve and break cycles more efficiently.
Use cases:
- In a cycle between two tasks, setting one to higher priority can ensure it runs first and unblocks the other
- In a cycle with multiple tasks, priority can control the execution order so that critical tasks run first, reducing overall execution time (e.g. a feedback loop where you want the input task to start first and the feedback task to take effect afterwards)
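The selection rule is a one-liner. A sketch, with the task dicts and deterministic tie-breaking by task id being assumptions of this example:

```python
def pick_next_task(ready_tasks):
    # Among tasks that are ready but blocked by a cycle, pick the
    # highest priority (default 0); break ties by task id so the
    # choice is deterministic — an assumption of this sketch.
    return max(ready_tasks, key=lambda t: (t.get("priority", 0), t["id"]))

tasks = [
    {"id": "feedback_loop", "priority": 0},
    {"id": "input_task", "priority": 5},
]
next_task = pick_next_task(tasks)  # "input_task" runs first, breaking the cycle
```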
Function Versioning via Timestamp
Functions in the MDK are mutable (users can edit them). To ensure reproducibility:
- Function saved with `dtTimestamp` = Unix timestamp
- File written as `{function_name}_{dtTimestamp}.py`
- Task references the specific timestamped file
Benefits:
- Immutable function versions after save
- Rollback to previous versions
- Multiple experiments can use different function versions concurrently
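The naming scheme above can be sketched in a few lines; the function name and timestamp value here are illustrative:

```python
import time

def timestamped_filename(function_name, dt_timestamp=None):
    # Freeze an edited function under an immutable name following the
    # {function_name}_{dtTimestamp}.py scheme described above.
    ts = int(time.time()) if dt_timestamp is None else int(dt_timestamp)
    return f"{function_name}_{ts}.py"

name = timestamped_filename("merge_datasets", 1700000000)
```

Because a task references the exact timestamped file, later edits to the function produce a new file rather than mutating the one a running experiment depends on.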
Planning Agent Integration
The MDK supports an AI planning agent that dynamically decides which tools to invoke:
- Detection: Tasks with `Model.path == "/ai_planning_agent"`
- DAG Metadata Injection: Collects all `AI_TOOL` tasks in the workflow and injects metadata (swagger URLs, use cases) into the agent's params:
```json
{
  "DAG": [
    {
      "workflowTaskId": "task_123",
      "x_type": "AI_TOOL",
      "x_use_case": "sentiment_analysis",
      "route": "/analyze",
      "swagger": "http://sentiment-model:8080/docs/schema"
    }
  ]
}
```

See Also
- Data Schema — WorkflowTask and ModelConfig entity structure that configures each component type
- DDK Overview — the data servers that IMPORT_DDK and EXPORT_DDK task types connect to
- Nexus — manages deployment of MODEL component containers into the cluster before the WEM can invoke them
