Teil 3: Praktische Implementierung
Schritt-für-Schritt-Anleitung zur Integration von OPC UA und MQTT
Praktische Implementierung: So gelingt die Integration Schritt für Schritt
PRAXISLEITFADEN
Die erfolgreiche Implementierung einer Shopfloor-to-Cloud-Architektur mit OPC UA und MQTT erfordert eine strukturierte Vorgehensweise. Die folgenden fünf Schritte führen von der Konzeption bis zur produktiven Inbetriebnahme.
1 Datenquellen identifizieren
Bevor Hardware beschafft oder Code geschrieben wird, muss klar sein: Welche Maschinendaten sind für welche Cloud-Anwendung relevant?
Systematische Datenerfassung
- Bestandsaufnahme der Maschinen: Welche Anlagen sollen angebunden werden? Welche Steuerungen (SPS-Typen, Baujahre) sind verbaut?
- Use-Case-Definition: Was soll mit den Daten geschehen?
- Predictive Maintenance: Vibrationsdaten, Temperaturen, Betriebsstunden
- Qualitätsüberwachung: Prozessparameter, Toleranzabweichungen, Ausschuss-Raten
- OEE-Berechnung: Maschinenzustände (Run/Stop/Error), Taktzahlen, Stillstandsursachen
- Energiemanagement: Leistungsaufnahme, Druckluftverbrauch
- Datenprioritäten festlegen: Nicht alle Daten sind gleich wichtig. Ein Maschinenstillstand ist kritischer als eine Temperaturänderung von 0,5°C.
Beispiel: Datenmatrix für CNC-Fräsmaschine
| Datenpunkt | Quelle | Sampling | Use Case |
|---|---|---|---|
| Spindeldrehzahl | SPS Siemens S7-1500 | 1 Hz | Qualität, OEE |
| Spindeltemperatur | Pt100-Sensor via Modbus | 0.1 Hz | Predictive Maint. |
| Vibration X/Y/Z | IIoT-Sensor (wireless) | 10 Hz | Predictive Maint. |
| Maschinenstatus | SPS Siemens S7-1500 | Event-based | OEE, Monitoring |
| Energieverbrauch | Stromzähler via M-Bus | 0.017 Hz (1/min) | Energie-Mgmt. |
2 Edge Gateway auswählen und konfigurieren
Das Edge Gateway ist die zentrale Komponente der Architektur. Für KMU gibt es drei grundlegende Ansätze:
Hardware-Optionen
Option A: Industrieller Edge Computer
Geräte wie Siemens SIMATIC IPC127E, Beckhoff CX-Serie oder Wago Edge Controller bieten robuste Hardware für raue Industrieumgebungen.
- Vorteile: Zertifiziert für industrielle Umgebung (EMV, Temperatur), vorinstallierte Software-Stacks
- Nachteile: Höherer Preis (1500–5000 Euro), teilweise proprietäre Ökosysteme
- Für wen: Unternehmen mit hohen Verfügbarkeitsanforderungen, wenig IT-Expertise
Option B: Industrie-PC mit Open-Source-Software
Standard-Industrie-PCs (z.B. OnLogic Helix-Serie, Advantech UNO-Serie) mit Linux und Open-Source-Tools (Node-RED, Ignition Edge, Eclipse IoT).
- Vorteile: Flexibel, kostengünstig (500–1500 Euro), große Community, keine Vendor Lock-ins
- Nachteile: Mehr Konfigurationsaufwand, weniger Out-of-the-Box-Support
- Für wen: Unternehmen mit IT-Expertise, individuelle Anforderungen
Option C: Raspberry Pi als Budget-Lösung
Raspberry Pi 4 (8 GB RAM) mit Hutschienen-Gehäuse und industrieller microSD-Karte.
- Vorteile: Sehr kostengünstig (150–300 Euro komplett), ideal für Prototypen und kleine Installationen
- Nachteile: Keine industrielle Zertifizierung, begrenzte Rechenleistung, SD-Karte als Single-Point-of-Failure
- Für wen: Pilotprojekte, Proof-of-Concept, bis 10 Maschinen
Software-Stack für das Gateway
Ein typischer Open-Source-Stack umfasst:
# Gateway Software-Komponenten
# Betriebssystem
Ubuntu Server 22.04 LTS (oder Debian 12)
# OPC UA Client Library
pip install opcua # Python-Bibliothek
# Alternative: open62541 (C), node-opcua (Node.js)
# MQTT Client Library
pip install paho-mqtt # Python-Bibliothek
# Lokale Datenspeicherung (optional)
sudo apt install influxdb # Time-series Database
# oder SQLite für einfache Pufferung
# Orchestrierung & Monitoring
sudo snap install node-red # Flow-basierte Programmierung
sudo apt install prometheus # Metriken & Monitoring
3 OPC UA Server am Shopfloor einrichten
Je nach Maschinenausstattung gibt es verschiedene Wege zur OPC UA-Anbindung:
Variante 1: Native OPC UA-Unterstützung der SPS
Moderne SPSen (Siemens S7-1500, Beckhoff CX, B&R X20) haben integrierte OPC UA Server. Die Konfiguration erfolgt über das Engineering-Tool:
- OPC UA Server in der SPS-Konfiguration aktivieren
- Variablen für OPC UA freigeben (Address Space definieren)
- Sicherheitseinstellungen konfigurieren (Zertifikate, Benutzerrollen)
- Endpunkt-URL notieren (z.B.
opc.tcp://10.0.1.50:4840)
Beispiel: Siemens TIA Portal
- TIA Portal öffnen → Geräte & Netze → S7-1500 auswählen
- Eigenschaften → OPC UA → „OPC UA Server aktivieren“ ankreuzen
- Server-Interfaces → Port 4840 freigeben, Sicherheitsmodus auswählen
- Variablentabelle → gewünschte Variablen markieren → „OPC UA-sichtbar“ aktivieren
- Projekt kompilieren und auf SPS laden
Variante 2: Retrofit mit OPC UA Gateway
Für ältere Maschinen ohne OPC UA-Unterstützung dienen Protokoll-Gateways als Adapter:
- Modbus TCP → OPC UA: Geräte wie Anybus X-Gateway oder Software-Lösungen (KEPServerEX, Ignition)
- PROFINET → OPC UA: Siemens Industrial Edge, Softing Industrial Gateway
- Analoge Signale → OPC UA: Wago 750-Serie mit OPC UA Firmware
Sicherheitseinrichtung (Best Practice)
# OPC UA Client mit Zertifikat-Authentifizierung (Python)
from opcua import Client
from opcua.crypto import security_policies
client = Client("opc.tcp://10.0.1.50:4840")
# Sicherheitsmodus: Sign & Encrypt
client.set_security(
security_policies.SecurityPolicyBasic256Sha256,
certificate="client-cert.der",
private_key="client-key.pem",
server_certificate="server-cert.der"
)
# Benutzer-Authentifizierung
client.set_user("gateway_user")
client.set_password("secure_password_123")
client.connect()
print("Sicher verbunden mit OPC UA Server")
# Daten lesen
root = client.get_root_node()
objects = client.get_objects_node()
machine = objects.get_child(["2:Machine", "2:Spindle"])
temperature = machine.get_child(["2:Temperature"])
print(f"Temperatur: {temperature.get_value()}°C")
client.disconnect()
4 MQTT Broker und Topics definieren
Die Wahl des MQTT-Brokers hängt von Skalierung und Budget ab:
Broker-Optionen
Managed Cloud-Broker (empfohlen für Einstieg)
- HiveMQ Cloud: Kostenloser Tier bis 100 Clients, Pay-as-you-grow
- AWS IoT Core: Tief integriert in AWS-Ökosystem, serverless
- Azure IoT Hub: Microsoft-Stack, Device Management inklusive
Vorteil: Keine Infrastruktur-Verwaltung, automatische Skalierung, hohe Verfügbarkeit
Self-Hosted Open-Source-Broker
- Mosquitto: Leichtgewichtig, einfach, aber begrenzte Skalierung
- EMQX: Hochskalierbar, Cluster-fähig, Web-Dashboard
- VerneMQ: Erlang-basiert, sehr stabil, für große Installationen
Vorteil: Volle Kontrolle, keine Cloud-Abhängigkeit, DSGVO-konform on-premise
Topic-Struktur definieren
Eine durchdachte Topic-Hierarchie erleichtert spätere Skalierung:
# Empfohlene Topic-Struktur (nach ISA-95)
company/site/area/line/machine/datatype
# Konkrete Beispiele:
acme-gmbh/wiesbaden/assembly/line-a/cnc-05/temperature
acme-gmbh/wiesbaden/assembly/line-a/cnc-05/status
acme-gmbh/wiesbaden/assembly/line-a/cnc-05/vibration
acme-gmbh/wiesbaden/assembly/line-a/cnc-05/energy
# Wildcard-Subscriptions:
acme-gmbh/wiesbaden/+/+/+/temperature # Alle Temperaturen in Wiesbaden
acme-gmbh/# # Alle Daten des Unternehmens
⚠️ Häufige Fehler bei Topic-Design
- Zu flache Hierarchie:
machine05_tempskaliert nicht bei 100 Maschinen - Umlaute und Sonderzeichen: Besser
cnc-05stattCNC_Maschine_5_(Halle_A) - Keine Versionierung: Bei Payload-Änderungen besser
/v1/temperature,/v2/temperature
Payload-Format & Quality of Service
# JSON-Payload mit Metadaten (Best Practice)
{
"timestamp": "2025-11-08T14:35:22Z",
"machine_id": "cnc-05",
"site": "wiesbaden",
"data": {
"temperature": 72.3,
"unit": "celsius",
"sensor_id": "PT100-A5"
},
"quality": "good",
"source": "gateway-01"
}
# QoS-Empfehlungen:
# QoS 0: Nicht-kritische Telemetrie (Energie, Temperatur)
# QoS 1: Standard-Maschinendaten (Drehzahl, Status) ← Standard
# QoS 2: Steuerbefehle, Alarm-Bestätigungen
5 Datenfluss implementieren und testen
Vollständige Gateway-Implementierung (Python)
#!/usr/bin/env python3
"""
OPC UA zu MQTT Gateway
Liest Maschinendaten via OPC UA und publiziert sie via MQTT in die Cloud
"""
import time
import json
from datetime import datetime
from opcua import Client as OPCClient
import paho.mqtt.client as mqtt
import logging
# Logging konfigurieren
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Konfiguration
OPC_SERVER = "opc.tcp://10.0.1.50:4840"
MQTT_BROKER = "mqtt.mein-unternehmen.cloud"
MQTT_PORT = 8883
MQTT_TOPIC_BASE = "acme-gmbh/wiesbaden/assembly/line-a"
class DataGateway:
def __init__(self):
# OPC UA Client
self.opc_client = OPCClient(OPC_SERVER)
# MQTT Client
self.mqtt_client = mqtt.Client(client_id="gateway-01")
self.mqtt_client.on_connect = self.on_mqtt_connect
self.mqtt_client.on_disconnect = self.on_mqtt_disconnect
self.mqtt_client.tls_set(ca_certs="ca-cert.pem")
self.mqtt_client.username_pw_set("gateway-01", "secure_pw")
# Datenpuffer bei Offline-Betrieb
self.offline_buffer = []
self.is_mqtt_connected = False
def on_mqtt_connect(self, client, userdata, flags, rc):
if rc == 0:
logger.info("MQTT verbunden")
self.is_mqtt_connected = True
self.flush_offline_buffer()
else:
logger.error(f"MQTT Verbindung fehlgeschlagen: {rc}")
def on_mqtt_disconnect(self, client, userdata, rc):
logger.warning("MQTT getrennt")
self.is_mqtt_connected = False
def connect(self):
# OPC UA verbinden
try:
self.opc_client.connect()
logger.info(f"OPC UA verbunden: {OPC_SERVER}")
except Exception as e:
logger.error(f"OPC UA Fehler: {e}")
raise
# MQTT verbinden
try:
self.mqtt_client.connect(MQTT_BROKER, MQTT_PORT, keepalive=60)
self.mqtt_client.loop_start()
except Exception as e:
logger.error(f"MQTT Fehler: {e}")
raise
def read_opc_data(self, machine_id, node_ids):
"""Liest mehrere OPC UA Nodes"""
data = {}
for key, node_id in node_ids.items():
try:
node = self.opc_client.get_node(node_id)
data[key] = node.get_value()
except Exception as e:
logger.warning(f"Fehler beim Lesen {key}: {e}")
data[key] = None
return data
def publish_to_cloud(self, machine_id, data):
"""Publiziert Daten via MQTT"""
payload = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"machine_id": machine_id,
"site": "wiesbaden",
"data": data,
"source": "gateway-01"
}
topic = f"{MQTT_TOPIC_BASE}/{machine_id}/telemetry"
if self.is_mqtt_connected:
result = self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
if result.rc == mqtt.MQTT_ERR_SUCCESS:
logger.info(f"Gesendet: {machine_id}")
else:
logger.error(f"Publish fehlgeschlagen: {result.rc}")
else:
# Offline: in Puffer speichern
self.offline_buffer.append((topic, payload))
logger.warning(f"Offline - gepuffert: {len(self.offline_buffer)} Nachrichten")
def flush_offline_buffer(self):
"""Sendet gepufferte Nachrichten nach Reconnect"""
if not self.offline_buffer:
return
logger.info(f"Sende {len(self.offline_buffer)} gepufferte Nachrichten")
for topic, payload in self.offline_buffer:
self.mqtt_client.publish(topic, json.dumps(payload), qos=1)
self.offline_buffer.clear()
def run(self):
"""Hauptschleife"""
# Node IDs für CNC-Maschine
cnc05_nodes = {
"temperature": "ns=2;s=Machine.Spindle.Temperature",
"rpm": "ns=2;s=Machine.Spindle.RPM",
"vibration": "ns=2;s=Machine.Vibration.RMS",
"status": "ns=2;s=Machine.Status"
}
logger.info("Gateway läuft...")
try:
while True:
# Daten von OPC UA lesen
data = self.read_opc_data("cnc-05", cnc05_nodes)
# Einfache Edge-Filterung: nur senden wenn Maschine läuft
if data.get("status") == "Running":
self.publish_to_cloud("cnc-05", data)
time.sleep(5) # 0.2 Hz Sampling
except KeyboardInterrupt:
logger.info("Gateway gestoppt")
finally:
self.disconnect()
def disconnect(self):
self.opc_client.disconnect()
self.mqtt_client.loop_stop()
self.mqtt_client.disconnect()
if __name__ == "__main__":
gateway = DataGateway()
gateway.connect()
gateway.run()
Testen und Validieren
Test-Checkliste
- OPC UA Verbindung: Können Daten von der SPS gelesen werden?
- MQTT Verbindung: Erreichen Nachrichten den Broker?
- Datenqualität: Sind Werte plausibel? Timestamps korrekt?
- Offline-Verhalten: Werden Daten gepuffert bei Netzausfall?
- Performance: CPU/RAM-Auslastung des Gateways im Normalbetrieb
- Latenz: Wie lange dauert Sensor → Cloud?
- Sicherheit: TLS aktiv? Zertifikate gültig?
MQTT-Test mit Mosquitto-Tools:
# MQTT Subscriber zum Testen (Linux/Mac)
mosquitto_sub -h mqtt.mein-unternehmen.cloud -p 8883 \
--cafile ca-cert.pem \
-u gateway-01 -P secure_pw \
-t "acme-gmbh/wiesbaden/#" \
-v
# Ausgabe (Beispiel):
# acme-gmbh/wiesbaden/assembly/line-a/cnc-05/telemetry
# {"timestamp":"2025-11-08T14:35:22Z","machine_id":"cnc-05",...}
Wichtige Sicherheitshinweise
🔒 Security Best Practices
1. Netzwerksegmentierung
Das Gateway sollte in einer DMZ (Demilitarized Zone) zwischen Produktionsnetzwerk und Internet stehen. Firewalls erlauben nur:
- Inbound: OPC UA Port 4840 von SPS-Netz
- Outbound: MQTT Port 8883 zur Cloud
- Management: SSH Port 22 nur aus Admin-Netz
2. Authentifizierung und Autorisierung
- OPC UA: Zertifikatsbasierte Authentifizierung + Benutzerpasswort (2-Faktor)
- MQTT: Username/Password + TLS Client-Zertifikat (optional)
- Keine Passwörter im Code: Nutzung von Umgebungsvariablen oder Secrets-Management (HashiCorp Vault, AWS Secrets Manager)
# Passwörter aus Umgebung laden (sicherer)
import os
MQTT_USER = os.environ.get("MQTT_USERNAME")
MQTT_PASS = os.environ.get("MQTT_PASSWORD")
if not MQTT_USER or not MQTT_PASS:
raise ValueError("MQTT Credentials fehlen in Umgebung")
3. Verschlüsselung über die gesamte Kette
- OPC UA: Security Mode „SignAndEncrypt“ (nicht „None“!)
- MQTT: TLS 1.2 minimum (besser TLS 1.3), valide Zertifikate
- Payload: Bei hochsensiblen Daten zusätzliche End-to-End-Verschlüsselung (z.B. AES256 auf Anwendungsebene)
4. Regelmäßige Updates und Monitoring
- Gateway-Software und Bibliotheken monatlich aktualisieren
- Intrusion Detection (Fail2ban, Snort) auf Gateway-System
- Logging aller Verbindungsversuche (erfolgreiche + fehlgeschlagene)
- Zertifikate rechtzeitig erneuern (Monitoring mit 30 Tage Vorlauf)
5. Incident Response Plan
Was passiert bei einem Sicherheitsvorfall?
- Gateway vom Netz trennen:
sudo systemctl stop gateway - Logs sichern:
sudo journalctl -u gateway > incident.log - Zertifikate widerrufen und neu ausstellen
- Passwörter ändern (OPC UA, MQTT, SSH)
- Forensische Analyse durch IT-Sicherheitsbeauftragten
Monitoring und Fehlerbehandlung
Ein produktives Gateway benötigt Monitoring, um Ausfälle frühzeitig zu erkennen:
# Erweiterung: Health-Check via MQTT
def publish_health_status(self):
"""Sendet Gateway-Status"""
import psutil
health = {
"timestamp": datetime.utcnow().isoformat() + "Z",
"gateway_id": "gateway-01",
"status": "healthy",
"metrics": {
"cpu_percent": psutil.cpu_percent(),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"uptime_seconds": time.time() - self.start_time
},
"connections": {
"opc_ua": self.opc_client.is_connected(),
"mqtt": self.is_mqtt_connected
}
}
self.mqtt_client.publish(
f"{MQTT_TOPIC_BASE}/gateway-01/health",
json.dumps(health),
qos=1
)
# In run()-Loop alle 60 Sekunden aufrufen
if time.time() % 60 < 5:
self.publish_health_status()
Cloud-seitig können diese Health-Nachrichten überwacht werden (z.B. mit AWS CloudWatch Alarms, Grafana Alerts), um bei Gateway-Ausfällen sofort benachrichtigt zu werden.
Schreibe einen Kommentar