TagEngine — Design & Architecture
This document describes the design and architecture of the TagEngine, which is a backend state machine with connectors to other services. It covers the TagEngine's data model, its pub/sub mechanism, the adapter and configuration layers, and how higher-level features like forwarding, alarms, and historian are built on top of it.
┌──────────────────────────────────────────────────────┐
│                                                      │
│                                                      │
│  ┌────────────────────────────────────────────────┐  │
│  │     TagEngine (in-memory store + pub/sub)      │  │
│  └──────────────────┬─────────────────────────────┘  │
│       ┌─────────────┼──────────────┐                 │
│       ▼             ▼              ▼                 │
│   Adapters    Configuration   Subscribers            │
│   (OPC UA,        Layer       (Forwarding,           │
│    MQTT,      (hot-reload)     Alarms,               │
│    Kafka, …)                   Historian, …)         │
│                                                      │
│            i3X API (REST, SSE, WebSocket)            │
└──────────────────────────────────────────────────────┘
1. Tag
A Tag is the fundamental unit of data in the system. It represents a single named value — a temperature reading, a motor status, an OEE (Overall Equipment Effectiveness) score — bundled with its type, quality, timestamp, and metadata.
type Tag struct {
    Path      string                 // hierarchical: "site/plant-1/line-1/temperature"
    Value     interface{}            // typed runtime value
    DataType  string                 // float64 | int64 | string | bool | map
    Quality   string                 // good | bad | uncertain | stale
    Timestamp time.Time
    Metadata  map[string]interface{} // engineering unit, deadband, description, …
}
Snapshot semantics
A Tag is always an immutable snapshot. When you read a tag, you get a copy of its state at that moment — it won't change under your feet. The TagEngine never hands out references to internal state. This eliminates a whole class of concurrency bugs and makes it safe to pass tags across goroutines without coordination.
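A minimal sketch of how such a copy could be produced. The `snapshot` helper is hypothetical and not part of the TagEngine API described here. It assumes the `Metadata` map is the only shared reference that needs copying, and it copies only the top level of that map.

```go
package main

import (
	"fmt"
	"time"
)

// Tag mirrors the struct shown above.
type Tag struct {
	Path      string
	Value     interface{}
	DataType  string
	Quality   string
	Timestamp time.Time
	Metadata  map[string]interface{}
}

// snapshot (hypothetical helper) returns a copy of t whose Metadata map is
// independent of the original. Only the top level of the map is copied;
// nested maps or map-typed values would need a deep copy as well.
func snapshot(t Tag) Tag {
	out := t // copies all scalar fields
	if t.Metadata != nil {
		out.Metadata = make(map[string]interface{}, len(t.Metadata))
		for k, v := range t.Metadata {
			out.Metadata[k] = v
		}
	}
	return out
}

func main() {
	internal := Tag{
		Path:      "site/plant-1/line-1/temperature",
		Value:     21.5,
		DataType:  "float64",
		Quality:   "good",
		Timestamp: time.Now(),
		Metadata:  map[string]interface{}{"engineering_unit": "°C"},
	}
	snap := snapshot(internal)
	snap.Metadata["engineering_unit"] = "K" // mutates the copy only
	fmt.Println(internal.Metadata["engineering_unit"]) // prints "°C"
}
```

Without the map copy, a consumer that edits `Metadata` on its snapshot would silently change the stored tag, which is exactly the class of bug snapshot semantics are meant to rule out.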
A subscription can optionally maintain a snapshot queue — a bounded buffer of recent snapshots that enables store-and-forward when a downstream consumer is temporarily unavailable. We'll cover how this works in the TagEngine's subscription mechanism below.
Type system
A tag's type can be declared explicitly in its definition, but it doesn't have to be. When no type is specified, the TagEngine infers it from the first value that arrives — this is part of the tag discovery system, where new data sources can be explored without upfront configuration. Once established (either explicitly or by inference), the type is enforced on all subsequent writes, with one exception: int64 can widen to float64, but not the reverse. This catches misconfiguration early — if an MQTT adapter suddenly sends a string where a float is expected, the write is rejected rather than silently corrupting downstream calculations.
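The inference and enforcement rules can be sketched as below. The function names `inferType` and `coerce` are hypothetical. The sketch also assumes one particular reading of the widening rule: an int64 value written to a float64 tag is converted to float64, while a float64 value written to an int64 tag is rejected.

```go
package main

import (
	"errors"
	"fmt"
)

var errTypeMismatch = errors.New("type mismatch")

// inferType (hypothetical) maps a Go runtime value to one of the DataType
// names used in the Tag struct.
func inferType(v interface{}) (string, error) {
	switch v.(type) {
	case float64:
		return "float64", nil
	case int64:
		return "int64", nil
	case string:
		return "string", nil
	case bool:
		return "bool", nil
	case map[string]interface{}:
		return "map", nil
	default:
		return "", fmt.Errorf("unsupported value type %T", v)
	}
}

// coerce checks v against the established type. An empty established type
// means "not yet known" and is inferred from v. int64 widens to float64;
// every other mismatch is rejected.
func coerce(established string, v interface{}) (string, interface{}, error) {
	got, err := inferType(v)
	if err != nil {
		return established, nil, err
	}
	switch {
	case established == "":
		return got, v, nil // first value fixes the type
	case established == got:
		return established, v, nil
	case established == "float64" && got == "int64":
		return established, float64(v.(int64)), nil // widening
	default:
		return established, nil,
			fmt.Errorf("%w: tag is %s, value is %s", errTypeMismatch, established, got)
	}
}

func main() {
	typ, _, _ := coerce("", 21.5)
	fmt.Println(typ) // prints "float64"

	_, v, _ := coerce(typ, int64(3))
	fmt.Println(v) // prints "3" (now a float64)

	_, _, err := coerce(typ, "hot")
	fmt.Println(err) // prints "type mismatch: tag is float64, value is string"
}
```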
Quality
Every tag carries a quality indicator: good, bad, uncertain, or stale. This matters enormously in industrial settings — a temperature reading of 0.0 means something very different when the quality is good vs bad. Quality propagates through derived tags: if any input is bad, the output is bad too. This prevents derived tags from silently propagating invalid data.
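One way to express this propagation is a "worst input wins" rule. Only the statement that a bad input makes the output bad comes from the text above. The relative ranking of uncertain and stale, and the name `derivedQuality`, are assumptions of this sketch.

```go
package main

import "fmt"

// rank orders qualities from best to worst. The position of "uncertain"
// relative to "stale" is an assumption, not something the design specifies.
var rank = map[string]int{"good": 0, "uncertain": 1, "stale": 2, "bad": 3}

// derivedQuality returns the worst quality among the inputs.
// Unknown quality strings are treated as bad.
func derivedQuality(inputs ...string) string {
	worst := "good"
	for _, q := range inputs {
		r, ok := rank[q]
		if !ok {
			return "bad"
		}
		if r > rank[worst] {
			worst = q
		}
	}
	return worst
}

func main() {
	// OEE derived from availability, performance and quality metrics.
	fmt.Println(derivedQuality("good", "good", "good")) // prints "good"
	fmt.Println(derivedQuality("good", "bad", "good"))  // prints "bad"
}
```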
Metadata
Metadata is an open key-value map attached to each tag, designed to provide a flexible way to extend tags with additional information that may vary across applications. It carries context that isn't part of the value itself but matters to consumers — for example, engineering_unit (e.g. "°C", "bar"), deadband (minimum change before a new value is considered significant), description, or domain-specific flags like equipment_type or safety_critical. Adapters and subscribers can use metadata for filtering, formatting, and decision-making — a historian subscriber, for instance, could select only tags where equipment_type is "sensor".
2. TagEngine
The TagEngine is the centralized state manager for all operational data — a thread-safe, in-memory key-value store where every key is a tag path and every value is a Tag. It is the single source of truth: every component in the system reads from and writes to this one structure, rather than passing messages between independent services and hoping the state stays consistent. On top of the store sits a pub/sub layer — any component can subscribe to tag changes using glob patterns and get notified in real time.
The interface is minimal:
- Set(path, value, timestamp): update a tag and notify matching subscribers
- Get(path): read an immutable snapshot
- Subscribe(pattern, callback): register a glob-pattern listener
- Meta(path, metadata): merge metadata into a tag
- SetQuality(path, quality): update the quality indicator
The TagEngine itself doesn't know about Kafka, MQTT, WebSockets, or any other adapters. It only knows about tags, values, and subscriptions. Everything else plugs in through these operations.
Subscriptions
Subscribe(pattern, callback) is the main way data flows out of the TagEngine. A subscriber registers a glob pattern (e.g. site/plant-1/*/temperature) and a callback function. Whenever a matching tag changes, the callback fires with the new snapshot and a subscription handle:
engine.Subscribe("site/plant-1/*/temperature", func(snapshot Tag, sub Subscription) error {
    // read the snapshot
    // optionally write to child paths via sub.Set()
    return nil // a non-nil error signals a failed delivery
})
The subscription handle allows writing to child paths of the subscribed tag — for example, sub.Set("alarm/state", "active", timestamp) creates a tag at temperature/alarm/state. This is the mechanism used for attaching alarm status, derived metadata, or any subscriber-produced state to existing tags. Because writes go through the sub handle rather than engine.Set(), the TagEngine knows which subscription produced which child tag. At registration time, it adds these relationships to the same dependency graph used for derived tags and checks for cycles — if a circular dependency would be introduced, the registration is rejected.
Subscriptions can optionally request a snapshot queue by specifying a buffer size:
engine.Subscribe("site/plant-1/*/temperature", callback, WithQueueSize(1000))
When a callback returns an error (because the destination is offline, a database write failed, or a network connection dropped), the TagEngine starts buffering snapshots for that subscriber instead of discarding them. On a periodic retry, if the next delivery succeeds, the queue drains in order. If the buffer fills, the oldest snapshots are dropped. The buffer size determines how long an outage the subscriber can tolerate: a queue of 1000 fed by a single tag that updates once per second covers 1000 seconds, or just under 17 minutes, of downtime. A pattern that matches several tags fills the queue proportionally faster. Subscribers that don't request a queue simply miss updates during failures, which is the right default for real-time consumers like dashboards.
Error handling. Callback errors are the only errors the TagEngine actively manages — by buffering or dropping, depending on the queue configuration. Other errors follow standard Go conventions: a Set() rejected due to type mismatch, a Subscribe() rejected due to a circular dependency, or a failed adapter registration all return an error to the caller. The TagEngine does not stop or crash on individual errors; each subscriber fails independently without affecting others.
Hierarchical namespace
Tags are organized as paths: site/plant-1/line-1/temperature. This isn't cosmetic — the folder-like structure enables:
- Glob subscriptions: site/plant-1/*/temperature matches all lines in plant-1
- Browsable APIs: Clients can navigate the tag tree like a filesystem
- Configuration mapping: The folder structure on disk maps directly to tag paths
This mirrors how industrial systems have always organized data (think OPC UA address spaces), but with the simplicity of file paths.
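Segment-wise pattern matching over such paths might look like the sketch below. The exact glob semantics are not specified in this document, so the sketch assumes that `*` matches exactly one path segment and `**` matches zero or more segments. The function names are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// matchPath reports whether a tag path matches a pattern, segment by segment.
// Assumed semantics: "*" matches exactly one segment, "**" matches zero or
// more segments.
func matchPath(pattern, path string) bool {
	return matchSegs(strings.Split(pattern, "/"), strings.Split(path, "/"))
}

func matchSegs(pat, segs []string) bool {
	if len(pat) == 0 {
		return len(segs) == 0
	}
	if pat[0] == "**" {
		// Let "**" consume 0..len(segs) segments and try the rest.
		for i := 0; i <= len(segs); i++ {
			if matchSegs(pat[1:], segs[i:]) {
				return true
			}
		}
		return false
	}
	if len(segs) == 0 {
		return false
	}
	if pat[0] != "*" && pat[0] != segs[0] {
		return false
	}
	return matchSegs(pat[1:], segs[1:])
}

func main() {
	fmt.Println(matchPath("site/plant-1/*/temperature", "site/plant-1/line-1/temperature")) // prints "true"
	fmt.Println(matchPath("site/plant-1/*/temperature", "site/plant-2/line-1/temperature")) // prints "false"
	fmt.Println(matchPath("site/**", "site/plant-1/line-1/temperature"))                    // prints "true"
}
```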
Thread safety
The TagEngine uses a single sync.RWMutex. Reads take a read-lock, writes take a write-lock. But there's a subtlety in how subscriptions fire:
1. Acquire write lock
2. Update the tag value
3. Copy the list of matching subscriptions
4. Release the lock
5. Fire callbacks outside the lock
If we fired callbacks while holding the lock, a subscriber that calls back into the TagEngine would deadlock. By copying and releasing first, we keep the critical section minimal and let callbacks do whatever they need — including reading other tags.
Each callback also runs with panic recovery, so one misbehaving subscriber can't take down the entire notification pipeline.
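The five steps and the panic recovery can be sketched as follows. This is a stripped-down stand-in, not the TagEngine itself: it stores bare values instead of Tag snapshots and matches subscriptions by exact path rather than by glob pattern.

```go
package main

import (
	"fmt"
	"sync"
)

type callback func(path string, value interface{})

type subscription struct {
	pattern string // exact path here; glob matching omitted for brevity
	fn      callback
}

type engine struct {
	mu     sync.RWMutex
	values map[string]interface{}
	subs   []subscription
}

func newEngine() *engine { return &engine{values: map[string]interface{}{}} }

func (e *engine) Subscribe(pattern string, fn callback) {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.subs = append(e.subs, subscription{pattern, fn})
}

func (e *engine) Get(path string) (interface{}, bool) {
	e.mu.RLock()
	defer e.mu.RUnlock()
	v, ok := e.values[path]
	return v, ok
}

func (e *engine) Set(path string, value interface{}) {
	// Steps 1-4: lock, update, copy matching callbacks, unlock.
	e.mu.Lock()
	e.values[path] = value
	var matched []callback
	for _, s := range e.subs {
		if s.pattern == path {
			matched = append(matched, s.fn)
		}
	}
	e.mu.Unlock()

	// Step 5: fire callbacks outside the lock, each with panic recovery.
	for _, fn := range matched {
		func() {
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("subscriber panicked:", r)
				}
			}()
			fn(path, value)
		}()
	}
}

func main() {
	e := newEngine()
	e.Subscribe("line-1/temperature", func(string, interface{}) { panic("boom") })
	e.Subscribe("line-1/temperature", func(p string, _ interface{}) {
		v, _ := e.Get(p) // safe: Set no longer holds the lock
		fmt.Println("got", v)
	})
	e.Set("line-1/temperature", 21.5)
}
```

The second subscriber still runs after the first one panics, and its call to `Get` does not deadlock because the write lock was released before any callback fired.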
Derived tags
Some tags don't come from external sources — their values are computed from other tags. A temperature in Fahrenheit derived from a Celsius reading. An OEE score calculated from availability, performance, and quality metrics.
From the TagEngine's perspective, a derived tag is just a subscriber that writes back. When an input tag changes, the expression recalculates and calls Set() on the output tag, which in turn notifies its own subscribers. The derivation chain is just the pub/sub mechanism applied recursively.
This raises an obvious question: what happens with circular dependencies? If tag A derives from tag B and tag B derives from tag A, the chain would loop forever. The TagEngine maintains a dependency graph that tracks all relationships between tags — derived tags, sub.Set() child writes, and any other subscription that produces output. When a new relationship is registered, the graph is checked for cycles. If a circular dependency is detected, the registration is rejected and the user is notified. Catching cycles at registration time surfaces configuration mistakes early and gives the user a chance to fix the dependency structure.
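Registration-time cycle detection can be sketched as a reachability check on a directed graph. The type `depGraph` and the method `AddDependency` are hypothetical names. The sketch assumes each relationship is recorded as an edge from an input tag to the tag computed from it.

```go
package main

import "fmt"

// depGraph records which tags are computed from which inputs.
type depGraph struct {
	out map[string][]string // input tag -> tags derived from it
}

func newDepGraph() *depGraph { return &depGraph{out: map[string][]string{}} }

// reachable reports whether target can be reached from start by following
// edges (iterative depth-first search).
func (g *depGraph) reachable(start, target string) bool {
	seen := map[string]bool{}
	stack := []string{start}
	for len(stack) > 0 {
		n := stack[len(stack)-1]
		stack = stack[:len(stack)-1]
		if n == target {
			return true
		}
		if seen[n] {
			continue
		}
		seen[n] = true
		stack = append(stack, g.out[n]...)
	}
	return false
}

// AddDependency registers "output is computed from input". The edge is
// rejected if input is already downstream of output, because adding it
// would close a cycle.
func (g *depGraph) AddDependency(input, output string) error {
	if g.reachable(output, input) {
		return fmt.Errorf("circular dependency: %s -> %s would close a cycle", input, output)
	}
	g.out[input] = append(g.out[input], output)
	return nil
}

func main() {
	g := newDepGraph()
	fmt.Println(g.AddDependency("temp/celsius", "temp/fahrenheit")) // prints "<nil>"
	fmt.Println(g.AddDependency("temp/fahrenheit", "temp/celsius")) // prints the cycle error
}
```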
3. Adapters
Adapters are the bridge between the TagEngine and the outside world. Each adapter implements a connection to an external component and interacts with the TagEngine through its operations: Set() to write values, Subscribe() to read changes, Meta() to attach metadata, and SetQuality() to update quality indicators.
type Adapter interface {
    Start(engine *TagEngine) error
    Stop() error
}
To illustrate, here are some adapters that exist or are planned:
| Adapter | Direction | Protocol |
|---|---|---|
| Kafka | Source / Destination | Protobuf over Kafka |
| MQTT | Source / Destination | JSON/binary over MQTT |
| OPC UA | Source / Destination | OPC UA |
| TimescaleDB | Source / Destination | SQL |
| HTTP | Source / Destination | REST polling / webhooks |
| Random | Source | Generated test data |
The adapter interface is deliberately simple, so adding new ones is straightforward. Each adapter runs as an independent goroutine, and none of them know about each other.
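To give a feel for how small an adapter can be, here is a sketch of a random-data source in the spirit of the Random adapter in the table. The `RandomAdapter` type, its fields, and the stub `TagEngine` (which only records writes) are assumptions made for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// TagEngine is a stand-in that only records writes.
type TagEngine struct {
	mu     sync.Mutex
	values map[string]interface{}
}

func (e *TagEngine) Set(path string, value interface{}, ts time.Time) error {
	e.mu.Lock()
	defer e.mu.Unlock()
	if e.values == nil {
		e.values = map[string]interface{}{}
	}
	e.values[path] = value
	return nil
}

type Adapter interface {
	Start(engine *TagEngine) error
	Stop() error
}

// RandomAdapter (hypothetical) writes a random float64 to Path at a fixed
// interval from its own goroutine.
type RandomAdapter struct {
	Path     string
	Interval time.Duration
	done     chan struct{}
	wg       sync.WaitGroup
}

func (a *RandomAdapter) Start(engine *TagEngine) error {
	if a.Interval <= 0 {
		return fmt.Errorf("random adapter: interval must be positive")
	}
	a.done = make(chan struct{})
	a.wg.Add(1)
	go func() {
		defer a.wg.Done()
		ticker := time.NewTicker(a.Interval)
		defer ticker.Stop()
		for {
			_ = engine.Set(a.Path, rand.Float64()*100, time.Now())
			select {
			case <-ticker.C: // next value
			case <-a.done:
				return
			}
		}
	}()
	return nil
}

// Stop signals the goroutine and waits for it to exit.
func (a *RandomAdapter) Stop() error {
	close(a.done)
	a.wg.Wait()
	return nil
}

func main() {
	eng := &TagEngine{}
	var a Adapter = &RandomAdapter{Path: "test/random", Interval: 10 * time.Millisecond}
	if err := a.Start(eng); err != nil {
		fmt.Println("start failed:", err)
		return
	}
	time.Sleep(35 * time.Millisecond)
	_ = a.Stop()
	fmt.Println("last value written:", eng.values["test/random"])
}
```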
Demand-driven connections: Adapter connections follow actual demand. A source adapter connects during discovery to browse the device and build tags.json, then disconnects. It only reconnects when a subscriber registers for matching tags, and disconnects again when the last subscriber goes away. Subscribers similarly don't initiate outbound connections until data actually flows. You could have 50 adapters configured but only 3 actively connected — because only 3 have subscribers. When a source adapter closes its connection, it marks all its tags as stale — so the TagEngine never silently serves outdated values with good quality.
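The connect-on-first-subscriber, disconnect-on-last behavior amounts to reference counting. A minimal sketch, in which the `demandConn` type and its three hooks are hypothetical:

```go
package main

import (
	"fmt"
	"sync"
)

// demandConn (hypothetical) opens a connection when the first subscriber
// arrives and closes it when the last one leaves.
type demandConn struct {
	mu          sync.Mutex
	subscribers int
	connect     func() error // opens the external connection
	disconnect  func()       // closes it
	markStale   func()       // sets the adapter's tags to quality "stale"
}

// Acquire registers one subscriber and connects on the 0 -> 1 transition.
func (d *demandConn) Acquire() error {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.subscribers == 0 {
		if err := d.connect(); err != nil {
			return err
		}
	}
	d.subscribers++
	return nil
}

// Release removes one subscriber and disconnects on the 1 -> 0 transition.
func (d *demandConn) Release() {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.subscribers == 0 {
		return // nothing to release
	}
	d.subscribers--
	if d.subscribers == 0 {
		d.disconnect()
		d.markStale()
	}
}

func main() {
	c := &demandConn{
		connect:    func() error { fmt.Println("connect"); return nil },
		disconnect: func() { fmt.Println("disconnect") },
		markStale:  func() { fmt.Println("mark tags stale") },
	}
	_ = c.Acquire() // first subscriber: connects
	_ = c.Acquire() // second subscriber: already connected
	c.Release()
	c.Release() // last subscriber gone: disconnects, then marks tags stale
}
```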
4. Configuration Layer
The TagEngine starts empty. It needs to be told which tags exist, where their data comes from, and where it should go. This is the job of the configuration layer.
Configuration lives in a watched directory tree. The config manager watches the entire tree for changes and hot-reloads without restarting the service — update a connection, add a tag file, or drop in a new subscriber, and the changes take effect within seconds.
config/
├── adapters/
│   ├── plc-line3.yaml        # OPC UA connection
│   └── cloud-broker.yaml     # MQTT cloud connection
├── tags/
│   └── site/plant-1/
│       ├── tags.json         # explicit tag definitions
│       └── discovery.json    # auto-discovery rules
└── subscribers/
    └── cloud-forward.json    # forward tags to cloud
adapters/
The adapters/ folder contains YAML files with connection parameters — host, port, protocol, credentials. An adapter config is just a connection definition; it doesn't say anything about which tags flow through it or in which direction. To illustrate, here are two typical configs:
# adapters/plc-line3.yaml
protocol: opcua
host: 192.168.1.50
port: 4840
security_mode: sign_and_encrypt

# adapters/cloud-broker.yaml
protocol: mqtt
host: cloud.example.com
port: 8883
tls: true
sparkplug:
  group_id: "factory-1"
  edge_node_id: "line-3"
tags/
The tags/ folder defines which tags exist and where their data comes from. Both discovery.json and tags.json reference adapters by name rather than embedding connection parameters directly — this means an adapter's host, port, or credentials can be changed without touching any tag definitions.
The system supports two modes of tag creation. A discovery.json file defines rules for automatically discovering tags from incoming data — useful during initial setup or when exploring a new data source. The discovered tags are written out to a tags.json file and discovery.json is ignored from that point on. Removing tags.json re-enters discovery mode. This gives you the convenience of auto-discovery without the unpredictability of unbounded tag creation in production.
It's the tags.json that the configuration layer uses to register tags in the TagEngine — discovery is a one-time step that produces this file. The configuration layer reads tags.json, creates the corresponding tags, and wires them to the appropriate adapters.
subscribers/
The subscribers/ folder defines what happens when tag values change — forwarding data to another system, triggering alarms, recording to a historian. Each subscriber config specifies a tag pattern to watch and what to do with the matched values.
MongoDB
Configuration can also be stored in MongoDB. It's a practical option in containerized environments where mounting a filesystem might not always be straightforward. MongoDB Change Streams provide the same live-reload behavior as the filesystem watcher.
5. Forwarding Data
Forwarding data from any adapter to any other is one of the most common applications of the TagEngine. Any data that flows into the TagEngine from one source can be forwarded to one or more destinations — regardless of protocol. The TagEngine doesn't care about the direction; it's all just Subscribe() calls wired to adapter write paths.
A subscriber declares what tags it cares about (via selectors) and which adapter to use for delivery. The same tag can feed multiple subscribers independently. This decoupling means new data flows can be added without modifying existing tag definitions or adapters.
Detailed example: Device to MQTT
This example reads tags from an OPC UA device on the factory floor and forwards them to a cloud MQTT broker using Sparkplug B. We already defined both adapters (plc-line3 and cloud-broker) in the configuration layer above.
Step 1 — discover the device tags. A discovery.json references the OPC UA adapter and defines how to map device paths to TagEngine paths:
{
  "adapter": "plc-line3",
  "browse": { "root_node": "ns=2;s=Line3" },
  "tag_mapping": {
    "path_template": "plant-1/line-3/{{ node_path }}"
  }
}
The adapter connects to the device, browses its address space starting from the root node, and writes the results to tags.json — a concrete list of tag definitions with their mapped paths and inferred types:
{
  "adapter": "plc-line3",
  "tags": [
    {
      "path": "plant-1/line-3/motor-1/temperature",
      "source_path": "ns=2;s=Line3.Motor1.Temperature",
      "type": "float64"
    },
    {
      "path": "plant-1/line-3/motor-1/speed",
      "source_path": "ns=2;s=Line3.Motor1.Speed",
      "type": "int64"
    }
  ]
}
From this point on, the configuration layer loads tags from tags.json into the TagEngine like any other tag definition.
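Loading such a file could look like the sketch below. The Go type names and the `parseTagsFile` helper are hypothetical. The JSON field names are taken from the tags.json example above, and the only validation shown is that an adapter name is present.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// TagDef and TagsFile mirror the tags.json layout shown above.
type TagDef struct {
	Path       string `json:"path"`
	SourcePath string `json:"source_path"`
	Type       string `json:"type"`
}

type TagsFile struct {
	Adapter string   `json:"adapter"`
	Tags    []TagDef `json:"tags"`
}

// parseTagsFile decodes a tags.json document and checks that it names the
// adapter its tags are wired to.
func parseTagsFile(data []byte) (TagsFile, error) {
	var f TagsFile
	if err := json.Unmarshal(data, &f); err != nil {
		return TagsFile{}, fmt.Errorf("tags.json: %w", err)
	}
	if f.Adapter == "" {
		return TagsFile{}, errors.New("tags.json: missing adapter")
	}
	return f, nil
}

func main() {
	data := []byte(`{
	  "adapter": "plc-line3",
	  "tags": [
	    {"path": "plant-1/line-3/motor-1/temperature",
	     "source_path": "ns=2;s=Line3.Motor1.Temperature",
	     "type": "float64"}
	  ]
	}`)
	f, err := parseTagsFile(data)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	for _, t := range f.Tags {
		// A real configuration layer would register the tag in the
		// TagEngine here and wire it to the named adapter.
		fmt.Printf("%s <- %s:%s (%s)\n", t.Path, f.Adapter, t.SourcePath, t.Type)
	}
}
```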
Step 2 — forward to the cloud. A subscriber watches the discovered tags and forwards them to the cloud broker:
{
  "adapter": "cloud-broker",
  "protocol": "sparkplug-b",
  "selector": {
    "paths": ["plant-1/line-3/**"]
  },
  "tag_mapping": {
    "metric_name": "{{ last_segment(path) }}"
  }
}
The subscriber watches all tags under plant-1/line-3/ and, whenever a value changes, publishes it as a Sparkplug B metric to the cloud broker. Its tag_mapping is the outbound counterpart of the discovery step: discovery mapped device node paths onto TagEngine paths, and this mapping turns TagEngine paths into Sparkplug metric names.
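The template function used in the mapping could be as simple as the sketch below. The Go name `lastSegment` is an assumption; only the template name `last_segment(path)` appears in the configuration.

```go
package main

import (
	"fmt"
	"strings"
)

// lastSegment returns the part of a tag path after the final "/".
// A path without "/" is returned unchanged.
func lastSegment(path string) string {
	if i := strings.LastIndex(path, "/"); i >= 0 {
		return path[i+1:]
	}
	return path
}

func main() {
	fmt.Println(lastSegment("plant-1/line-3/motor-1/temperature")) // prints "temperature"
	fmt.Println(lastSegment("plant-1/line-3/motor-1/speed"))       // prints "speed"
}
```

One consequence worth keeping in mind: tags such as motor-1/temperature and motor-2/temperature would map to the same metric name, so a selector that spans several devices may need a mapping that keeps more of the path.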
6. Alarms
In industrial environments, abnormal conditions need immediate attention — a temperature rising above a safe threshold, a pressure drop indicating a leak, or a motor drawing excessive current. Alarms turn raw data into actionable notifications so operators can respond before a minor anomaly becomes a costly failure.
The alarm logic lives entirely in the alarm adapter. It subscribes to tags, evaluates conditions, and manages the full alarm lifecycle; the TagEngine itself knows nothing about alarms. The alarm adapter can have different implementations, and this document does not prescribe its design. What follows are the expectations we have for any implementation.
We expect an alarm adapter to support different alarm types — threshold alarms with severity levels, rate-of-change, deviation from setpoint, quality-based (sensor failure) — and to track each alarm through a state machine: normal → active → acknowledged → normal. The alarm state can be exposed as a tag in the TagEngine (site/plant-1/line-1/temperature/alarm/state), so dashboards and other subscribers can react to alarm changes the same way they react to process data.
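The lifecycle can be written as a transition table. This sketch encodes only the sequence named above (normal, active, acknowledged, back to normal). The event names `raise`, `acknowledge`, and `clear` are hypothetical, and a real implementation may well allow further transitions, such as an alarm clearing before it is acknowledged.

```go
package main

import "fmt"

type AlarmState string
type AlarmEvent string

const (
	Normal       AlarmState = "normal"
	Active       AlarmState = "active"
	Acknowledged AlarmState = "acknowledged"

	Raise       AlarmEvent = "raise"       // condition became true
	Acknowledge AlarmEvent = "acknowledge" // operator confirmed the alarm
	Clear       AlarmEvent = "clear"       // condition returned to normal
)

// transitions lists the allowed moves; anything absent is rejected.
var transitions = map[AlarmState]map[AlarmEvent]AlarmState{
	Normal:       {Raise: Active},
	Active:       {Acknowledge: Acknowledged},
	Acknowledged: {Clear: Normal},
}

// next returns the state that follows s on event e, or an error if the
// transition is not allowed. On error the state is left unchanged.
func next(s AlarmState, e AlarmEvent) (AlarmState, error) {
	if to, ok := transitions[s][e]; ok {
		return to, nil
	}
	return s, fmt.Errorf("invalid transition: %q in state %q", e, s)
}

func main() {
	s := Normal
	for _, e := range []AlarmEvent{Raise, Acknowledge, Clear} {
		s, _ = next(s, e)
		// Each new state could be published via sub.Set("alarm/state", ...).
		fmt.Println(s)
	}
	// prints "active", "acknowledged", "normal"
}
```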
We also expect notification capabilities — email, SMS, or other channels — when an alarm transitions to a new state. Different alarm subscribers can subscribe to different tags with different rules and notification strategies, so a critical safety alarm can page an operator immediately while a minor deviation just logs a warning.
7. Historian
A historian records tag values over time for trend analysis, reporting, and compliance. In its simplest form, a historian is just a subscriber with a database:
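engine.Subscribe("site/**", func(snapshot Tag, sub Subscription) error {
    // returning the error lets the snapshot queue buffer the value and retry
    return db.Insert(snapshot.Path, snapshot.Value, snapshot.Timestamp)
}, WithQueueSize(1000))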
Every tag change is written to a time-series database (TimescaleDB, InfluxDB, QuestDB). If the database is temporarily unavailable, the snapshot queue buffers values and drains them when the connection recovers — the existing snapshot queue mechanism handles store-and-forward. Optimizations like deadband filtering or max-interval writes can be added later in the historian adapter if write volume becomes a problem.
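As an illustration of the deadband optimization mentioned above, a filter could compare each value against the last one actually written. The `deadbandFilter` type is hypothetical. The sketch assumes numeric values and an absolute deadband, such as the `deadband` metadata entry described earlier.

```go
package main

import (
	"fmt"
	"math"
)

// deadbandFilter accepts a value only if it differs from the last accepted
// value by at least the deadband. The first value is always accepted.
type deadbandFilter struct {
	deadband float64
	last     float64
	has      bool
}

// Accept reports whether v should be written, and remembers it if so.
func (f *deadbandFilter) Accept(v float64) bool {
	if !f.has || math.Abs(v-f.last) >= f.deadband {
		f.last = v
		f.has = true
		return true
	}
	return false
}

func main() {
	f := &deadbandFilter{deadband: 0.5}
	for _, v := range []float64{20.0, 20.25, 20.75, 21.0} {
		fmt.Println(v, f.Accept(v))
	}
	// prints:
	// 20 true
	// 20.25 false
	// 20.75 true
	// 21 false
}
```

Comparing against the last written value, rather than the last seen value, prevents a slow drift from going unrecorded indefinitely.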
Querying bypasses the TagEngine entirely — historical reads go straight to the database. The TagEngine is a real-time system; it facilitated the original writes, but time-range queries, aggregations, and interpolation are the database's job.
8. i3X Compliance and TagEngine
The Industrial Information Interoperability eXchange (i3X) is an open specification that defines how industrial data systems expose their data to external clients. It standardizes endpoints for browsing namespaces, reading and writing values, and subscribing to real-time changes — using REST and Server-Sent Events (SSE).
TagEngine maps naturally to i3X: tag paths are namespaces, Get() and Set() are reads and writes, and Subscribe() is real-time streaming. The API layer implements the i3X specification on top of these primitives.
REST API
The i3X REST API provides endpoints for browsing the tag namespace, reading and writing individual tag values, and querying metadata. It follows standard HTTP semantics — GET to read, PUT to write — so any HTTP client can interact with the TagEngine without a specialized SDK.
Server-Sent Events (SSE)
For real-time streaming, i3X uses SSE. A client opens a persistent HTTP connection and receives tag updates as they happen. SSE is simpler than WebSocket — it's one-directional (server to client), works through proxies and firewalls without special configuration, and requires no handshake beyond a standard HTTP request.
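A sketch of what an SSE endpoint looks like with Go's standard library. The wire format (`event:` and `data:` lines terminated by a blank line) is standard SSE, but the event name `tag`, the JSON payload shape, and the `/stream` path are illustrative assumptions and are not taken from the i3X specification.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// formatSSE renders one SSE message with a JSON payload.
func formatSSE(event string, payload interface{}) (string, error) {
	data, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("event: %s\ndata: %s\n\n", event, data), nil
}

// sseHandler streams updates from the channel until the client disconnects
// or the channel is closed.
func sseHandler(updates <-chan map[string]interface{}) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		flusher, ok := w.(http.Flusher)
		if !ok {
			http.Error(w, "streaming unsupported", http.StatusInternalServerError)
			return
		}
		w.Header().Set("Content-Type", "text/event-stream")
		w.Header().Set("Cache-Control", "no-cache")
		for {
			select {
			case <-r.Context().Done():
				return
			case u, open := <-updates:
				if !open {
					return
				}
				msg, err := formatSSE("tag", u)
				if err != nil {
					continue // skip values that cannot be encoded
				}
				fmt.Fprint(w, msg)
				flusher.Flush() // push the event to the client immediately
			}
		}
	}
}

func main() {
	updates := make(chan map[string]interface{}, 1)
	updates <- map[string]interface{}{"path": "site/plant-1/line-1/temperature", "value": 21.5}
	close(updates)

	// Exercise the handler in-process; a real server would use
	// http.Handle("/stream", sseHandler(updates)) and http.ListenAndServe.
	rec := httptest.NewRecorder()
	sseHandler(updates)(rec, httptest.NewRequest(http.MethodGet, "/stream", nil))
	fmt.Print(rec.Body.String())
}
```

In the TagEngine, the channel would be fed by a `Subscribe()` callback, so each connected client is just one more subscriber.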
Beyond the spec
Where the i3X specification doesn't cover a use case or a simpler alternative exists, TagEngine extends it. For example, we also expose a WebSocket interface for bidirectional communication — clients can subscribe, read, write, and browse the tag tree over a single connection. This is particularly useful for interactive dashboards that need both real-time updates and user-initiated writes.
The i3X project also provides a web-based user interface built on top of the i3X specification. It allows users to browse the tag namespace, inspect live values, and plot tags on charts. Because TagEngine implements the i3X API, this interface is compatible with the TagEngine without additional integration work.