Quellcode für vyra_base.plugin.runtime

"""
vyra_base.plugin.runtime
=========================

Plugin-Runtime-Implementierungen.

Klassen:
    PluginCallError  — Basis-Exception für Fehler beim Plugin-Aufruf
    PluginRuntime    — Abstrakte Basis / WASM-Schnittstelle
    WasmRuntime      — Echter WASM-Executor via wasmtime (aktiv wenn verfügbar)
    StubRuntime      — Python-Stub ohne WASM (Fallback für Tests / Connectivity)

Die WasmRuntime führt beliebige WASM-Plugins aus. Welche Funktionen ein Plugin
anbietet, wird in seiner metadata.json unter ``exports[]`` beschrieben::

    {
        "exports": [
            {
                "name": "init",
                "args": [
                    {"name": "initial_value", "type": "i32"},
                    {"name": "step",          "type": "i32"}
                ]
            },
            {
                "name": "increment",
                "args": [{"name": "step", "type": "i32"}]
            },
            ...
        ]
    }

Die Runtime mappt die Keys eines ``data``-Dicts **in Reihenfolge der args-Definition**
auf i32-Parameter — kein hartkodiertes Wissen über einzelne Plugins nötig.

Nutze create_plugin_runtime() um automatisch die beste verfügbare Runtime zu erhalten.

Integrationsbeispiel (v2_modulemanager)::

    from vyra_base.plugin.runtime import create_plugin_runtime
    from mymodule.host_functions_impl import ModuleHostFunctions

    host_fns = ModuleHostFunctions(plugin_event_publisher, zenoh_session)
    runtime  = create_plugin_runtime(
        plugin_id   = "my-plugin",
        wasm_path   = "/opt/vyra/plugin_pool/my-plugin/1.0.0/logic.wasm",
        host        = host_fns,
        initial_state = {"initial_value": 0, "step": 1},
    )
    await runtime.start()

    result = await runtime.call("increment", {"step": 2})
    # → {"result": 2}   (WasmRuntime: echte WASM-Ausführung)
"""

from __future__ import annotations

import json
import logging
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any

from vyra_base.plugin.host_functions import HostFunctions, NullHostFunctions

logger = logging.getLogger(__name__)

# Prüfen ob wasmtime verfügbar ist (optionale Abhängigkeit)
try:
    from wasmtime import Store, Module as WasmModule, Instance, Linker, Engine  # type: ignore[import]
    _WASMTIME_AVAILABLE = True
except ImportError:
    _WASMTIME_AVAILABLE = False


class PluginCallError(Exception):
    """Wird geworfen, wenn ein Plugin-Aufruf fehlschlägt."""

    def __init__(self, plugin_id: str, function_name: str, reason: str):
        self.plugin_id     = plugin_id
        self.function_name = function_name
        self.reason        = reason
        super().__init__(f"[{plugin_id}] call '{function_name}' failed: {reason}")


class PluginRuntime(ABC):
    """
    Abstrakte Basis-Klasse für Plugin-Runtimes.

    Definiert die gemeinsame API, die sowohl die StubRuntime (aktuell)
    als auch die zukünftige ExtismRuntime (Phase 2) implementieren.
    """

    def __init__(
        self,
        plugin_id: str,
        wasm_path: str | Path,
        host: HostFunctions | None = None,
    ) -> None:
        self.plugin_id = plugin_id
        self.wasm_path = Path(wasm_path)
        self.host      = host or NullHostFunctions()
        self._started  = False

    @abstractmethod
    async def start(self) -> None:
        """Lädt das WASM-Modul und initialisiert die Runtime."""
        ...

    @abstractmethod
    async def stop(self) -> None:
        """Gibt Ressourcen der Runtime frei."""
        ...

    @abstractmethod
    async def call(self, function_name: str, data: dict[str, Any]) -> dict[str, Any]:
        """
        Ruft eine exportierte Funktion des Plugins auf.

        :param function_name: Name der WASM-Funktion (z.B. "increment", "get_state")
        :param data:          Eingabe-Parameter als JSON-serialisierbares Dict
        :returns:             Rückgabe-Wert als Dict
        :raises PluginCallError: Bei Fehler im Plugin oder nicht unterstützter Funktion
        """
        ...

    @abstractmethod
    async def on_event(self, event_name: str, data: dict[str, Any]) -> None:
        """
        Leitet ein externes Event (Zenoh/Redis) an das Plugin weiter.

        :param event_name: Event-Name (z.B. "zenoh.message", "module.state_changed")
        :param data:       Event-Payload
        """
        ...

    def is_running(self) -> bool:
        """Gibt an, ob die Runtime aktiv ist."""
        return self._started

    def __repr__(self) -> str:
        status = "running" if self._started else "stopped"
        return f"<{self.__class__.__name__} plugin_id={self.plugin_id!r} wasm={self.wasm_path.name!r} {status}>"


# ---------------------------------------------------------------------------
# WasmRuntime — führt die echte logic.wasm via wasmtime aus
# ---------------------------------------------------------------------------

[Doku] class WasmRuntime(PluginRuntime): """ Echter WASM-Executor für VYRA-Plugins via wasmtime. Lädt das kompilierte logic.wasm und liest die zur WASM-Datei gehörende metadata.json. Aus ``metadata.json["exports"]`` werden Funktionssignaturen dynamisch gecacht — kein hartkodiertes Wissen über einzelne Plugin-Funktionen. Aufruf-Konvention (Metadata-driven i32): - ``metadata.json`` enthält ``exports[]`` mit Funktionsnamen und arg-Definitionen - ``call(function_name, data)`` mappt ``data``-Keys in Reihenfolge der args auf i32 - Fehlende Keys → 0; überschüssige Keys → ignoriert - Rückgabe: WASM-Funktionen mit i32-Rückgabe → ``{"result": <int>}`` - ``ping`` ist eingebaut (kein WASM-Export nötig) Nur verfügbar wenn ``wasmtime`` installiert ist. Nutze create_plugin_runtime() für automatische Auswahl. """
[Doku] def __init__( self, plugin_id: str, wasm_path: str | Path, host: HostFunctions | None = None, initial_state: dict[str, Any] | None = None, ) -> None: if not _WASMTIME_AVAILABLE: raise ImportError( "wasmtime ist nicht installiert. Installiere es mit: pip install wasmtime" ) super().__init__(plugin_id, wasm_path, host) self._initial_state: dict[str, Any] = initial_state.copy() if initial_state else {} self._store: Any = None self._instance: Any = None # fn_name → wasmtime callable self._exports: dict[str, Any] = {} # fn_name → list of {"name": str, "type": str} (aus metadata.json) self._exports_meta: dict[str, list[dict[str, str]]] = {}
[Doku] async def start(self) -> None: if self._started: logger.warning("[%s] WasmRuntime already started", self.plugin_id) return if not self.wasm_path.exists(): raise FileNotFoundError( f"[{self.plugin_id}] WASM-Datei nicht gefunden: {self.wasm_path}" ) # --- metadata.json laden ----------------------------------------------- meta_path = self.wasm_path.parent / "metadata.json" if meta_path.exists(): try: meta = json.loads(meta_path.read_text()) for export in meta.get("exports", []): fn_name = export.get("name", "") if fn_name: self._exports_meta[fn_name] = export.get("args", []) logger.info( "📋 [%s] metadata.json geladen | exports=%s", self.plugin_id, list(self._exports_meta.keys()), ) except Exception as exc: logger.warning("[%s] metadata.json nicht lesbar: %s", self.plugin_id, exc) else: logger.warning( "[%s] Keine metadata.json neben WASM-Datei gefunden: %s", self.plugin_id, meta_path, ) # --- WASM laden -------------------------------------------------------- engine = Engine() self._store = Store(engine) wasm_module = WasmModule(engine, self.wasm_path.read_bytes()) linker = Linker(engine) self._instance = linker.instantiate(self._store, wasm_module) # Alle aus metadata.json bekannten Funktionen cachen exports_obj = self._instance.exports(self._store) for fn_name in self._exports_meta.keys(): fn = exports_obj.get(fn_name) if fn is not None: self._exports[fn_name] = fn else: logger.warning( "[%s] WASM exportiert '%s' nicht (in metadata.json deklariert)", self.plugin_id, fn_name, ) self._started = True logger.info( "✅ [%s] WasmRuntime gestartet | exports=%s | wasm=%s Bytes", self.plugin_id, list(self._exports.keys()), self.wasm_path.stat().st_size, ) # init aufrufen falls initial_state gesetzt und 'init' exportiert if self._initial_state and "init" in self._exports: await self.call("init", self._initial_state)
[Doku] async def stop(self) -> None: self._started = False self._instance = None self._exports = {} self._exports_meta = {} logger.info("🛑 [%s] WasmRuntime stopped", self.plugin_id)
[Doku] async def call(self, function_name: str, data: dict[str, Any]) -> dict[str, Any]: if not self._started: raise PluginCallError(self.plugin_id, function_name, "Runtime not started") logger.debug("[%s] wasm.call(%s, %s)", self.plugin_id, function_name, data) result = await self._dispatch_wasm(function_name, data) logger.debug("[%s] wasm.call(%s) -> %s", self.plugin_id, function_name, result) return result
[Doku] async def on_event(self, event_name: str, data: dict[str, Any]) -> None: logger.debug("[%s] on_event(%s) — kein WASM-Event-Handler", self.plugin_id, event_name)
def _get_wasm_fn(self, name: str) -> Any: fn = self._exports.get(name) if fn is None: raise PluginCallError( self.plugin_id, name, f"WASM-Funktion '{name}' nicht verfügbar. " f"Bekannte Exports: {list(self._exports.keys())}" ) return fn async def _dispatch_wasm(self, function_name: str, data: dict[str, Any]) -> dict[str, Any]: """ Generischer Metadata-driven WASM-Dispatch. Mappt ``data``-Keys auf i32-Parameter in Reihenfolge der ``exports_meta``-Definition. Gibt ``{"result": <wasm_return>}`` zurück und publiziert generisch via notify_ui. """ # ping ist eingebaut — kein WASM-Export nötig if function_name == "ping": return {"status": "ok", "plugin_id": self.plugin_id, "runtime": "wasm"} # Funktion in metadata bekannt? if function_name not in self._exports_meta: raise PluginCallError( self.plugin_id, function_name, f"Unbekannte Funktion '{function_name}'. " f"In metadata.json definierte Exports: {list(self._exports_meta.keys())}" ) fn = self._get_wasm_fn(function_name) arg_defs = self._exports_meta[function_name] # i32-Argumente in Reihenfolge der Metadaten-Definition bauen args: list[int] = [] for arg_def in arg_defs: arg_name = arg_def.get("name", "") args.append(int(data.get(arg_name, 0))) # WASM aufrufen raw_result = fn(self._store, *args) # Ergebnis normalisieren if isinstance(raw_result, int): result: dict[str, Any] = {"result": raw_result} elif isinstance(raw_result, (list, tuple)): result = {"result": list(raw_result)} elif raw_result is None: result = {} else: result = {"result": raw_result} # Generisches Event an UI/Frontend senden await self.host.notify_ui(f"plugin.{function_name}.result", { "plugin_id": self.plugin_id, "result": result, }) return result
# --------------------------------------------------------------------------- # Factory: wählt automatisch WasmRuntime oder StubRuntime # ---------------------------------------------------------------------------
[Doku] def create_plugin_runtime( plugin_id: str, wasm_path: str | Path, host: HostFunctions | None = None, initial_state: dict[str, Any] | None = None, prefer_stub: bool = False, ) -> "PluginRuntime": """ Factory-Funktion: Gibt WasmRuntime zurück wenn wasmtime installiert ist und die .wasm-Datei existiert. Andernfalls StubRuntime. :param plugin_id: Plugin-ID (z.B. "my-plugin") :param wasm_path: Pfad zur logic.wasm Datei (neben ihr muss metadata.json liegen) :param host: Host-Funktionen-Implementierung (optional, sonst NullHostFunctions) :param initial_state: Startzustand — Keys müssen zu den args des 'init'-Exports passen :param prefer_stub: Erzwinge StubRuntime auch wenn wasmtime verfügbar ist :returns: WasmRuntime oder StubRuntime Instanz Beispiel:: runtime = create_plugin_runtime( plugin_id = "my-plugin", wasm_path = "/opt/vyra/plugin_pool/my-plugin/1.0.0/logic.wasm", host = my_host_functions, initial_state = {"initial_value": 0, "step": 1}, ) await runtime.start() result = await runtime.call("increment", {"step": 2}) """ wasm_path = Path(wasm_path) use_wasm = ( not prefer_stub and _WASMTIME_AVAILABLE and wasm_path.exists() and wasm_path.stat().st_size > 8 ) if use_wasm: logger.info( "🔧 [%s] create_plugin_runtime → WasmRuntime (wasmtime=%s, wasm=%s Bytes)", plugin_id, _WASMTIME_AVAILABLE, wasm_path.stat().st_size ) return WasmRuntime(plugin_id, wasm_path, host, initial_state) else: reason = ( "prefer_stub=True" if prefer_stub else "wasmtime nicht installiert" if not _WASMTIME_AVAILABLE else f"wasm nicht gefunden: {wasm_path}" ) logger.info("🔧 [%s] create_plugin_runtime → StubRuntime (%s)", plugin_id, reason) return StubRuntime(plugin_id, wasm_path, host, initial_state)
# --------------------------------------------------------------------------- # StubRuntime — Pure-Python Fallback ohne WASM-Execution # ---------------------------------------------------------------------------
[Doku] class StubRuntime(PluginRuntime): """ Python-Stub-Implementierung der PluginRuntime. Fallback für Umgebungen ohne wasmtime oder wenn die WASM-Datei fehlt. Eignet sich für Connectivity-Tests und Infrastruktur-Validierung ohne echte Plugin-Logik. Unterstützte Funktionen (generisch für alle Plugins): - ``ping`` — Lebenszeichen zurückgeben - ``get_state`` — Internen State zurückgeben - ``set_state`` — Internen State setzen Für alle anderen Funktionsnamen wird ein ``PluginCallError`` geworfen, da Plugin-spezifische Logik ausschließlich in der WasmRuntime ausgeführt wird. Unterklassen können ``_dispatch()`` erweitern um eigene Test-Stubs hinzuzufügen. """
[Doku] def __init__( self, plugin_id: str, wasm_path: str | Path, host: HostFunctions | None = None, initial_state: dict[str, Any] | None = None, ) -> None: super().__init__(plugin_id, wasm_path, host) self._state: dict[str, Any] = initial_state.copy() if initial_state else {}
# ------------------------------------------------------------------ # Lifecycle # ------------------------------------------------------------------
[Doku] async def start(self) -> None: if self._started: logger.warning("[%s] StubRuntime already started", self.plugin_id) return if not self.wasm_path.exists(): logger.warning( "[%s] WASM-Datei nicht gefunden: %s — Stub läuft ohne WASM", self.plugin_id, self.wasm_path, ) else: logger.info( "[%s] ⚠️ StubRuntime gestartet (WASM vorhanden aber nicht ausgeführt)", self.plugin_id, ) self._started = True logger.info("✅ [%s] StubRuntime ready | state=%s", self.plugin_id, self._state)
[Doku] async def stop(self) -> None: self._started = False logger.info("🛑 [%s] StubRuntime stopped", self.plugin_id)
# ------------------------------------------------------------------ # Kernaufruf # ------------------------------------------------------------------
[Doku] async def call(self, function_name: str, data: dict[str, Any]) -> dict[str, Any]: if not self._started: raise PluginCallError(self.plugin_id, function_name, "Runtime not started") logger.debug("[%s] call(%s, %s)", self.plugin_id, function_name, data) result = await self._dispatch(function_name, data) logger.debug("[%s] call(%s) -> %s", self.plugin_id, function_name, result) return result
[Doku] async def on_event(self, event_name: str, data: dict[str, Any]) -> None: logger.debug("[%s] on_event(%s, %s) — Stub: ignoriert", self.plugin_id, event_name, data)
# ------------------------------------------------------------------ # Dispatch-Tabelle (überschreibbar für plugin-spezifische Stubs) # ------------------------------------------------------------------ async def _dispatch(self, function_name: str, data: dict[str, Any]) -> dict[str, Any]: """ Routing zu generischen Stub-Implementierungen. Unterklassen können diese Methode erweitern oder ersetzen. Plugin-spezifische Funktionen werden nicht unterstützt — dafür WasmRuntime nutzen. """ handlers: dict[str, Any] = { "ping": self._fn_ping, "get_state": self._fn_get_state, "set_state": self._fn_set_state, } handler = handlers.get(function_name) if handler is None: raise PluginCallError( self.plugin_id, function_name, f"Funktion '{function_name}' wird im StubRuntime nicht unterstützt. " f"Generisch verfügbar: {list(handlers.keys())}. " f"Plugin-spezifische Logik erfordert WasmRuntime.", ) return await handler(data) # ------------------------------------------------------------------ # Generische Stub-Implementierungen # ------------------------------------------------------------------ async def _fn_ping(self, _data: dict) -> dict: return {"status": "ok", "plugin_id": self.plugin_id, "runtime": "stub"} async def _fn_get_state(self, _data: dict) -> dict: return dict(self._state) async def _fn_set_state(self, data: dict) -> dict: self._state.update(data) return dict(self._state)