Quickstart — Building a VYRA Module

This guide walks you through building a minimal VYRA module from scratch. It covers every layer of vyra_base you will use day-to-day.

Note

For a complete, working module see the vyra_module_template repository. This guide focuses on explaining why and how each piece fits together.

Prerequisites

pip install vyra_base

# Transport (choose at least one):
pip install eclipse-zenoh   # Zenoh — recommended default
pip install redis            # Redis transport

# Database (SQLAlchemy is included with vyra_base):
# SQLite works out of the box — no extra install needed

Step 1 — Load the Entity (_base_.py)

Every VYRA module has a _base_.py that loads the central VyraEntity. The entity is the glue between your component, the communication layer, and the VYRA framework.

# src/my_module/my_module/_base_.py
from vyra_base.core.entity import VyraEntity, build_entity
from vyra_base.defaults.entries import ModuleEntry

PROJECT_SETTINGS = ModuleEntry(
    module_name="my_module",
    module_version="0.1.0",
    module_id="my_module_abc123",
)

async def build_base() -> VyraEntity:
    """Create and configure the VyraEntity for this module."""
    entity = await build_entity(PROJECT_SETTINGS)
    base_interfaces = await _create_base_interfaces()
    await entity.set_interfaces(base_interfaces)   # registers all @remote_service methods
    return entity

async def _create_base_interfaces():
    """Return configuration for built-in feeders and base services."""
    # ... see module template for full example
    return []

Important

Always call entity.set_interfaces() before using any communication. This is what binds your decorated methods to live ROS2/Zenoh/Redis interfaces.

Step 2 — Communication Interfaces

Use the four decorators to expose your component’s methods over the network.

Services (request / response)

from vyra_base.com import remote_service, ProtocolType, AccessLevel

class MyComponent:
    @remote_service(
        name="get_status",
        protocols=[ProtocolType.ZENOH],
        namespace="my_module",
        access_level=AccessLevel.PUBLIC,
    )
    async def get_status(self, request, response=None):
        return {"status": "running", "uptime": 42}

Publisher (fire and forget)

from vyra_base.com import remote_publisher, ProtocolType

class MyComponent:
    @remote_publisher(name="events", protocols=[ProtocolType.ZENOH], namespace="my_module")
    async def publish_event(self, event_type: str, data: dict):
        return {"type": event_type, "data": data}

Subscriber (callback)

from vyra_base.com import remote_subscriber, ProtocolType

class MyComponent:
    @remote_subscriber(name="commands", protocols=[ProtocolType.ZENOH], namespace="my_module")
    async def on_command(self, message: dict):
        print(f"Command received: {message}")

Action Server (long-running task)

from vyra_base.com import remote_actionServer, IActionHandler, IGoalHandle, ProtocolType

class MyComponent(IActionHandler):
    @remote_actionServer.on_goal(name="run_task", protocols=[ProtocolType.ZENOH])
    async def on_goal(self, goal_request) -> bool:
        return goal_request.get("target") is not None  # accept/reject

    @remote_actionServer.execute(name="run_task")
    async def execute(self, goal_handle: IGoalHandle):
        for i in range(100):
            await goal_handle.publish_feedback({"progress": i})
        goal_handle.succeed()
        return {"result": "done"}

    @remote_actionServer.on_cancel(name="run_task")
    async def on_cancel(self, goal_handle: IGoalHandle) -> bool:
        return True

See also

Communication (com) — Full communication module reference

Step 3 — Storage (Database & Redis)

SQLAlchemy Database

vyra_base ships a full SQLAlchemy async ORM layer. All table models must use the tb_ prefix.

from vyra_base.storage import Base, DbAccess, DbManipulator, DBReturnValue, DBTYPE
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Float, DateTime
import uuid
from datetime import datetime

# ── 1. Define your table ──────────────────────────────────────────────────
class tb_sensor_data(Base):
    __tablename__ = "tb_sensor_data"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    sensor_id: Mapped[str] = mapped_column(String(64), index=True)
    value: Mapped[float] = mapped_column(Float)
    unit: Mapped[str] = mapped_column(String(16))
    measured_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

# ── 2. Open the database ──────────────────────────────────────────────────
db = DbAccess()
await db.connect(
    db_type=DBTYPE.SQLITE,
    db_path="/workspace/storage/my_module.db",
)
await db.create_tables([tb_sensor_data])     # creates tables if they don't exist

# ── 3. Write data ────────────────────────────────────────────────────────
manipulator = DbManipulator(db)

row = tb_sensor_data(sensor_id="sensor_01", value=23.4, unit="°C")
result: DBReturnValue = await manipulator.insert(row)

# ── 4. Read data ─────────────────────────────────────────────────────────
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async with db.session() as session:
    stmt = select(tb_sensor_data).where(tb_sensor_data.sensor_id == "sensor_01")
    rows = (await session.execute(stmt)).scalars().all()
    for row in rows:
        print(row.value, row.unit)

Supported database backends:

Backend

DBTYPE

Notes

SQLite (default)

DBTYPE.SQLITE

No extra install; file-based; ideal for single-node modules

MySQL

DBTYPE.MYSQL

Requires aiomysql

PostgreSQL

DBTYPE.POSTGRESQL

Requires asyncpg

Redis Key-Value / Pub-Sub

from vyra_base.storage import RedisClient, REDIS_TYPE

redis = RedisClient(
    host="redis",
    port=6379,
    ssl=True,
    ssl_ca_certs="/workspace/storage/certificates/redis/ca-cert.pem",
)
await redis.connect()

# Store a value
await redis.set("my_module:config:threshold", "42")

# Read a value
value = await redis.get("my_module:config:threshold")

# Publish
await redis.publish("my_module:events", '{"type": "started"}')

See also

Storage - Data Storage — Complete storage module reference

Step 4 — Logging

Use Python’s standard logging module.

import logging

logger = logging.getLogger(__name__)

logger.info("✅ Module started")
logger.warning("⚠  Low disk space")
logger.error("❌ Connection failed: %s", error)

Log configuration is loaded from config/core_logging.json in your module directory. The default format includes timestamp, module name, level, and message.

Tip

Use structured log messages for better filtering in production:

logger.info("Request processed: service=%s, duration_ms=%d", service_name, duration)

Step 5 — State Machine

Every VYRA component inherits its lifecycle from OperationalStateMachine. The metaclass handles state transitions automatically — never set state manually.

Three-Layer Architecture

Layer

States

Controls

Lifecycle

UNINITIALIZED INITIALIZING READY DECOMMISSIONING DECOMMISSIONED

Module existence and boot sequence

Operational

IDLE RUNNING PAUSED STOPPED

Runtime activity

Health

HEALTHY DEGRADED FAILED CRITICAL_FAILURE

Diagnostics and error escalation

Using OperationalStateMachine

from vyra_base.state import OperationalStateMachine

class MyComponent(OperationalStateMachine):
    async def initialize(self, request=None, response=None) -> bool:
        # Called automatically when entity transitions to READY
        await self._setup()
        return True   # returning True triggers READY → RUNNING

    async def stop(self, request=None, response=None) -> bool:
        await self._cleanup()
        return True
from vyra_base.state import UnifiedStateMachine, LifecycleState, OperationalState

# Direct state machine (lower level):
sm = UnifiedStateMachine(component_id="my_module")
await sm.transition_lifecycle(LifecycleState.READY)
await sm.transition_operational(OperationalState.RUNNING)

print(sm.lifecycle_state)    # LifecycleState.READY
print(sm.operational_state)  # OperationalState.RUNNING

See also

State Machine — Complete state machine documentation

Step 6 — Security

VYRA provides a built-in 5-level security model for inter-module communication.

Security Levels

Level

SecurityLevel

Description

1

NONE

No checks — development only

2

BASIC_AUTH

Module ID verification

3

TIMESTAMP

ID + timestamp validation (replay protection)

4

HMAC

HMAC-SHA256 message signatures

5

DIGITAL_SIGNATURE

Certificate-based PKI

Server Side (SecurityManager)

from vyra_base.security import SecurityManager, SecurityLevel
import rclpy

class MyModule(rclpy.node.Node, SecurityManager):
    def __init__(self):
        super().__init__("my_module")
        SecurityManager.__init__(self, max_security_level=SecurityLevel.HMAC)

Client Side (SecurePublisher)

from vyra_base.security import SecurePublisher, create_security_context, SecurityLevel

ctx = create_security_context(
    module_id="my_module_abc123",
    level=SecurityLevel.HMAC,
    token="shared_secret",
    hmac_key=b"super_secret_key",
)
pub = SecurePublisher(node, MsgType, "/topic", ctx)
pub.publish(msg)

See also

Security Framework — Complete security framework reference

Step 7 — Communication Overview

The communication module is organized into four sub-systems:

Transport Layer

Low-latency protocols for in-process or DDS-based communication.

Protocol

ProtocolType

Use case

Zenoh (default)

ZENOH

High-performance pub/sub; recommended for all new modules

ROS2

ROS2

Robotics / DDS ecosystem; requires ROS2 installation

Redis

REDIS

Pub/sub + key-value; good for dashboard ↔ module communication

UDS

UDS

Unix Domain Sockets; zero-dependency local IPC

Automatic fallback chain: Zenoh ROS2 Redis UDS

External Layer

Cross-service protocols for integration with external systems.

Protocol

Use case

gRPC (GRPC)

High-performance RPC over Unix sockets; backend-to-backend

MQTT (MQTT)

IoT devices; constrained networks; QoS 0/1/2

REST (REST)

HTTP API integration; web services

WebSocket (WEBSOCKET)

Real-time browser clients; bidirectional streaming

Shared Memory (SHARED_MEMORY)

Zero-copy local IPC; sub-millisecond latency

Industrial Layer

Northbound integration with industrial automation systems.

Protocol

Use case

Modbus (MODBUS)

PLC/SCADA integration; TCP or RTU

OPC UA (OPCUA)

MES/SCADA northbound; rich data modelling

Feeders

Feeders automatically publish internal module state over the transport layer. They are set up in _base_.py and run in the background.

Feeder

Publishes

StateFeeder

LifecycleState + OperationalState changes

NewsFeeder

Informational messages (entity.publish_news("…"))

ErrorFeeder

Error reports (entity.publish_error("…"))

AvailableModuleFeeder

Heartbeat / module availability discovery

# Use feeders through the entity (simplest way):
entity.publish_news("Module started successfully")
entity.publish_error("Redis connection lost")

See also

Communication (com) — Full communication module with all transport, external and industrial details