Quickstart: Building a VYRA Module
This guide walks you through building a minimal VYRA module from scratch.
It covers every layer of vyra_base you will use day-to-day.
Note
For a complete, working module see the vyra_module_template repository.
This guide focuses on explaining why and how each piece fits together.
—
Prerequisites
pip install vyra_base
# Transport (choose at least one):
pip install eclipse-zenoh # Zenoh — recommended default
pip install redis # Redis transport
# Database (SQLAlchemy is included with vyra_base):
# SQLite works out of the box — no extra install needed
—
Step 1: Load the Entity (_base_.py)
Every VYRA module has a _base_.py that loads the central VyraEntity.
The entity is the glue between your component, the communication layer,
and the VYRA framework.
# src/my_module/my_module/_base_.py
from vyra_base.core.entity import VyraEntity, build_entity
from vyra_base.defaults.entries import ModuleEntry

PROJECT_SETTINGS = ModuleEntry(
    module_name="my_module",
    module_version="0.1.0",
    module_id="my_module_abc123",
)


async def build_base() -> VyraEntity:
    """Create and configure the VyraEntity for this module."""
    entity = await build_entity(PROJECT_SETTINGS)
    base_interfaces = await _create_base_interfaces()
    await entity.set_interfaces(base_interfaces)  # registers all @remote_service methods
    return entity


async def _create_base_interfaces():
    """Return configuration for built-in feeders and base services."""
    # ... see module template for full example
    return []
Important
Always call entity.set_interfaces() before using any communication.
This is what binds your decorated methods to live ROS2/Zenoh/Redis interfaces.
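To build intuition for why set_interfaces() has to run first, the following plain-Python sketch shows one common way decorator-driven registration can work: the decorator only tags a method, and a later scan turns the tags into a registry. This is not vyra_base's implementation; `mark_remote`, `collect_marked`, and `DemoComponent` are hypothetical names invented for this illustration.

```python
import inspect
from typing import Any, Callable, Dict


def mark_remote(name: str) -> Callable:
    """Hypothetical stand-in for a decorator such as @remote_service."""
    def decorator(func: Callable) -> Callable:
        # Only tag the function; nothing is bound to a transport yet.
        func._remote_name = name
        return func
    return decorator


def collect_marked(component: Any) -> Dict[str, Callable]:
    """Scan a component and return {interface name: bound method}."""
    found: Dict[str, Callable] = {}
    for _, method in inspect.getmembers(component, predicate=inspect.ismethod):
        remote_name = getattr(method, "_remote_name", None)
        if remote_name is not None:
            found[remote_name] = method
    return found


class DemoComponent:
    @mark_remote("get_status")
    def get_status(self) -> dict:
        return {"status": "running"}

    def helper(self) -> None:
        # Undecorated, so the scan never exposes it.
        return None


registry = collect_marked(DemoComponent())
```

Until the scan runs, the decorated method is an ordinary method with an extra attribute, which is the behaviour the note above warns about: decorating alone does not make anything reachable.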
—
Step 2: Communication Interfaces
Use the four decorators (remote_service, remote_publisher, remote_subscriber, and remote_actionServer) to expose your component’s methods over the network.
Services (request / response)

from vyra_base.com import remote_service, ProtocolType, AccessLevel

class MyComponent:
    @remote_service(
        name="get_status",
        protocols=[ProtocolType.ZENOH],
        namespace="my_module",
        access_level=AccessLevel.PUBLIC,
    )
    async def get_status(self, request, response=None):
        return {"status": "running", "uptime": 42}
Publisher (fire and forget)

from vyra_base.com import remote_publisher, ProtocolType

class MyComponent:
    @remote_publisher(name="events", protocols=[ProtocolType.ZENOH], namespace="my_module")
    async def publish_event(self, event_type: str, data: dict):
        return {"type": event_type, "data": data}
Subscriber (callback)

from vyra_base.com import remote_subscriber, ProtocolType

class MyComponent:
    @remote_subscriber(name="commands", protocols=[ProtocolType.ZENOH], namespace="my_module")
    async def on_command(self, message: dict):
        print(f"Command received: {message}")
Action Server (long-running task)

from vyra_base.com import remote_actionServer, IActionHandler, IGoalHandle, ProtocolType

class MyComponent(IActionHandler):
    @remote_actionServer.on_goal(name="run_task", protocols=[ProtocolType.ZENOH])
    async def on_goal(self, goal_request) -> bool:
        return goal_request.get("target") is not None  # accept/reject

    @remote_actionServer.execute(name="run_task")
    async def execute(self, goal_handle: IGoalHandle):
        for i in range(100):
            await goal_handle.publish_feedback({"progress": i})
        goal_handle.succeed()
        return {"result": "done"}

    @remote_actionServer.on_cancel(name="run_task")
    async def on_cancel(self, goal_handle: IGoalHandle) -> bool:
        return True
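The execute handler's feedback loop can be exercised without any transport by passing it a test double. The sketch below is one way to do that; `FakeGoalHandle` is a hypothetical class that mirrors only the two calls used in this guide (publish_feedback and succeed), and the standalone `execute` function copies the handler body without the decorator. The real IGoalHandle may expose more than this.

```python
import asyncio
from typing import List, Tuple


class FakeGoalHandle:
    """Hypothetical test double; not part of vyra_base."""

    def __init__(self) -> None:
        self.feedback: List[dict] = []
        self.succeeded = False

    async def publish_feedback(self, payload: dict) -> None:
        # Record feedback instead of sending it anywhere.
        self.feedback.append(payload)

    def succeed(self) -> None:
        self.succeeded = True


async def execute(goal_handle, steps: int = 100) -> dict:
    """Same shape as the execute() handler in this guide, minus the decorator."""
    for i in range(steps):
        await goal_handle.publish_feedback({"progress": i})
    goal_handle.succeed()
    return {"result": "done"}


def run_demo(steps: int = 5) -> Tuple[FakeGoalHandle, dict]:
    """Run the handler once and return the handle plus the result."""
    handle = FakeGoalHandle()
    result = asyncio.run(execute(handle, steps))
    return handle, result
```

Calling `run_demo(3)` records three feedback payloads and marks the goal as succeeded, which makes the ordering (feedback first, then succeed, then the return value) easy to assert in a unit test.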
See also
Communication (com) — Full communication module reference
—
Step 3: Storage (Database & Redis)
SQLAlchemy Database
vyra_base ships a full SQLAlchemy async ORM layer.
All table models must use the tb_ prefix.
from vyra_base.storage import Base, DbAccess, DbManipulator, DBReturnValue, DBTYPE
from sqlalchemy.orm import Mapped, mapped_column
from sqlalchemy import String, Float, DateTime
import uuid
from datetime import datetime

# ── 1. Define your table ──────────────────────────────────────────────────
class tb_sensor_data(Base):
    __tablename__ = "tb_sensor_data"

    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid.uuid4()))
    sensor_id: Mapped[str] = mapped_column(String(64), index=True)
    value: Mapped[float] = mapped_column(Float)
    unit: Mapped[str] = mapped_column(String(16))
    measured_at: Mapped[datetime] = mapped_column(DateTime, default=datetime.utcnow)

# NOTE: the await statements in steps 2-4 must run inside an async function.

# ── 2. Open the database ──────────────────────────────────────────────────
db = DbAccess()
await db.connect(
    db_type=DBTYPE.SQLITE,
    db_path="/workspace/storage/my_module.db",
)
await db.create_tables([tb_sensor_data])  # creates tables if they don't exist

# ── 3. Write data ────────────────────────────────────────────────────────
manipulator = DbManipulator(db)
row = tb_sensor_data(sensor_id="sensor_01", value=23.4, unit="°C")
result: DBReturnValue = await manipulator.insert(row)

# ── 4. Read data ─────────────────────────────────────────────────────────
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

async with db.session() as session:
    stmt = select(tb_sensor_data).where(tb_sensor_data.sensor_id == "sensor_01")
    rows = (await session.execute(stmt)).scalars().all()
    for row in rows:
        print(row.value, row.unit)
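For readers who want to see the relational shape behind the model, here is a standalone sketch that uses only Python's standard-library sqlite3 module, with no vyra_base or SQLAlchemy involved. The column types are an approximation of the model in this guide, the index name `ix_sensor_id` is made up, and the DDL that SQLAlchemy actually emits may differ.

```python
import sqlite3
import uuid
from datetime import datetime, timezone
from typing import List, Tuple


def demo_sensor_table() -> List[Tuple[float, str]]:
    """Create an in-memory table, insert one reading, and query it back."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            """
            CREATE TABLE tb_sensor_data (
                id TEXT PRIMARY KEY,
                sensor_id TEXT,
                value REAL,
                unit TEXT,
                measured_at TEXT
            )
            """
        )
        # Roughly what index=True on sensor_id asks for.
        conn.execute("CREATE INDEX ix_sensor_id ON tb_sensor_data (sensor_id)")
        conn.execute(
            "INSERT INTO tb_sensor_data VALUES (?, ?, ?, ?, ?)",
            (
                str(uuid.uuid4()),
                "sensor_01",
                23.4,
                "°C",
                datetime.now(timezone.utc).isoformat(),
            ),
        )
        # Plain-SQL counterpart of the select(...).where(...) query.
        return conn.execute(
            "SELECT value, unit FROM tb_sensor_data WHERE sensor_id = ?",
            ("sensor_01",),
        ).fetchall()
    finally:
        conn.close()
```

The parameterised `?` placeholders play the same role as the bound expression in the ORM query: the sensor ID is passed as data, never spliced into the SQL string.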
Supported database backends:
| Backend | | Notes |
|---|---|---|
| SQLite (default) | | No extra install; file-based; ideal for single-node modules |
| MySQL | | Requires |
| PostgreSQL | | Requires |
Redis Key-Value / Pub-Sub

from vyra_base.storage import RedisClient, REDIS_TYPE

redis = RedisClient(
    host="redis",
    port=6379,
    ssl=True,
    ssl_ca_certs="/workspace/storage/certificates/redis/ca-cert.pem",
)
await redis.connect()

# Store a value
await redis.set("my_module:config:threshold", "42")

# Read a value
value = await redis.get("my_module:config:threshold")

# Publish
await redis.publish("my_module:events", '{"type": "started"}')
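The keys and payloads in the Redis example follow a colon-separated naming scheme and carry JSON text. Building both by hand invites typos, so a pair of small helpers can keep them consistent. This is a minimal sketch using only the standard library; `make_key` and `encode_event` are hypothetical helpers, not part of vyra_base, and the rule that segments may not contain a colon is an assumption made for this example.

```python
import json


def make_key(module: str, *parts: str) -> str:
    """Join segments into a key such as 'my_module:config:threshold'."""
    segments = (module, *parts)
    if any(not segment for segment in segments):
        raise ValueError("key segments must be non-empty")
    if any(":" in segment for segment in segments):
        raise ValueError("key segments must not contain ':'")
    return ":".join(segments)


def encode_event(event_type: str, **data) -> str:
    """Serialise an event to JSON text with a stable key order."""
    return json.dumps({"type": event_type, **data}, sort_keys=True)
```

With these in place, the string literals from the example become `make_key("my_module", "config", "threshold")` and `encode_event("started")`, and malformed segments fail early with a ValueError instead of producing a silently wrong key.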
See also
Storage - Data Storage — Complete storage module reference
—
Step 4: Logging
Use Python’s standard logging module.
import logging
logger = logging.getLogger(__name__)
logger.info("✅ Module started")
logger.warning("⚠ Low disk space")
logger.error("❌ Connection failed: %s", error)
Log configuration is loaded from config/core_logging.json in your module directory.
The default format includes timestamp, module name, level, and message.
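The contents of core_logging.json are not shown in this guide. As an illustration of how a format with timestamp, module name, level, and message can be expressed, the sketch below passes an equivalent dictionary to the standard library's logging.config.dictConfig. The formatter name, handler name, and format string are assumptions, not the shipped VYRA configuration.

```python
import io
import logging
import logging.config


def build_demo_logger(stream) -> logging.Logger:
    """Configure a 'my_module' logger that writes to the given stream."""
    config = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "default": {
                # timestamp, logger (module) name, level, message
                "format": "%(asctime)s %(name)s %(levelname)s %(message)s",
            },
        },
        "handlers": {
            "stream": {
                "class": "logging.StreamHandler",
                "formatter": "default",
                "stream": stream,
            },
        },
        "loggers": {
            "my_module": {
                "handlers": ["stream"],
                "level": "INFO",
                "propagate": False,
            },
        },
    }
    logging.config.dictConfig(config)
    return logging.getLogger("my_module")


def demo_log_line() -> str:
    """Emit one message into an in-memory buffer and return the text."""
    buffer = io.StringIO()
    logger = build_demo_logger(buffer)
    logger.info("Request processed: service=%s, duration_ms=%d", "get_status", 12)
    return buffer.getvalue()
```

A JSON file with the same structure can be loaded with json.load and handed to dictConfig unchanged, apart from the stream, which a file would typically name as "ext://sys.stdout".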
Tip
Use consistent key=value fields in log messages so they are easy to filter in production:
logger.info("Request processed: service=%s, duration_ms=%d", service_name, duration)
—
Step 5: State Machine
Every VYRA component inherits its lifecycle from OperationalStateMachine.
The metaclass handles state transitions automatically — never set state manually.
Three-Layer Architecture

| Layer | States | Controls |
|---|---|---|
| Lifecycle | | Module existence and boot sequence |
| Operational | | Runtime activity |
| Health | | Diagnostics and error escalation |
Using OperationalStateMachine

from vyra_base.state import OperationalStateMachine

class MyComponent(OperationalStateMachine):
    async def initialize(self, request=None, response=None) -> bool:
        # Called automatically when entity transitions to READY
        await self._setup()
        return True  # returning True triggers READY → RUNNING

    async def stop(self, request=None, response=None) -> bool:
        await self._cleanup()
        return True

from vyra_base.state import UnifiedStateMachine, LifecycleState, OperationalState

# Direct state machine (lower level):
sm = UnifiedStateMachine(component_id="my_module")
await sm.transition_lifecycle(LifecycleState.READY)
await sm.transition_operational(OperationalState.RUNNING)
print(sm.lifecycle_state)    # LifecycleState.READY
print(sm.operational_state)  # OperationalState.RUNNING
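The advice never to set state manually is easier to follow once it is clear what a guarded transition buys you. The sketch below is a deliberately tiny, generic state machine in plain Python; it is not the vyra_base implementation. READY and RUNNING are borrowed from this guide, while the STOPPED state, the `ALLOWED` table, and the `TinyStateMachine` class are hypothetical.

```python
from enum import Enum


class OpState(Enum):
    READY = "ready"
    RUNNING = "running"
    STOPPED = "stopped"  # hypothetical; not taken from vyra_base


# Each state maps to the set of states it may move to.
ALLOWED = {
    OpState.READY: {OpState.RUNNING},
    OpState.RUNNING: {OpState.STOPPED},
    OpState.STOPPED: set(),
}


class TinyStateMachine:
    def __init__(self) -> None:
        self._state = OpState.READY

    @property
    def state(self) -> OpState:
        # Read-only view: callers cannot assign to .state directly.
        return self._state

    def transition(self, target: OpState) -> None:
        """Move to target, or raise if the transition is not allowed."""
        if target not in ALLOWED[self._state]:
            raise ValueError(
                f"illegal transition {self._state.name} -> {target.name}"
            )
        self._state = target
```

Because every change goes through `transition`, an out-of-order request such as RUNNING back to READY fails loudly instead of leaving the component in an inconsistent state.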
See also
State Machine — Complete state machine documentation
—
Step 6: Security
VYRA provides a built-in 5-level security model for inter-module communication.
Security Levels

| Level | | Description |
|---|---|---|
| 1 | | No checks — development only |
| 2 | | Module ID verification |
| 3 | | ID + timestamp validation (replay protection) |
| 4 | | HMAC-SHA256 message signatures |
| 5 | | Certificate-based PKI |
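Levels 3 and 4 combine two generic techniques: rejecting stale timestamps and verifying an HMAC-SHA256 signature. The following standard-library sketch shows how the two fit together. It does not reproduce VYRA's wire format; the message layout, the 30-second freshness window, and the function names are assumptions made for this example.

```python
import hashlib
import hmac
import json
import time
from typing import Optional

MAX_AGE_SECONDS = 30.0  # assumed freshness window


def _canonical(module_id: str, timestamp: float, payload: dict) -> bytes:
    """Serialise the signed fields deterministically."""
    body = {"module_id": module_id, "timestamp": timestamp, "payload": payload}
    return json.dumps(body, sort_keys=True, separators=(",", ":")).encode("utf-8")


def sign_message(key: bytes, module_id: str, payload: dict,
                 now: Optional[float] = None) -> dict:
    """Return a message dict carrying an HMAC-SHA256 signature."""
    timestamp = time.time() if now is None else now
    digest = hmac.new(key, _canonical(module_id, timestamp, payload),
                      hashlib.sha256).hexdigest()
    return {"module_id": module_id, "timestamp": timestamp,
            "payload": payload, "signature": digest}


def verify_message(key: bytes, message: dict, now: Optional[float] = None,
                   max_age: float = MAX_AGE_SECONDS) -> bool:
    """Check freshness first, then the signature in constant time."""
    current = time.time() if now is None else now
    if abs(current - message["timestamp"]) > max_age:
        return False  # too old or too far in the future
    expected = hmac.new(
        key,
        _canonical(message["module_id"], message["timestamp"], message["payload"]),
        hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(expected, message["signature"])
```

The timestamp is part of the signed data, so an attacker cannot refresh an old message without the key. A time window on its own only narrows the replay opportunity; rejecting duplicates inside the window would need additional state such as a nonce cache.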
Server Side (SecurityManager)

from rclpy.node import Node
from vyra_base.security import SecurityManager, SecurityLevel

class MyModule(Node, SecurityManager):
    def __init__(self):
        super().__init__("my_module")
        SecurityManager.__init__(self, max_security_level=SecurityLevel.HMAC)
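Client Side (SecurePublisher)

from vyra_base.security import SecurePublisher, create_security_context, SecurityLevel

# Example values only; load real tokens and keys from secure configuration.
ctx = create_security_context(
    module_id="my_module_abc123",
    level=SecurityLevel.HMAC,
    token="shared_secret",
    hmac_key=b"super_secret_key",
)
# node, MsgType, and msg are placeholders for your node instance,
# message class, and message object.
pub = SecurePublisher(node, MsgType, "/topic", ctx)
pub.publish(msg)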
See also
Security Framework — Complete security framework reference
—
Step 7: Communication Overview
The communication module is organized into four sub-systems:
Transport Layer
Low-latency protocols for in-process or DDS-based communication.

| Protocol | | Use case |
|---|---|---|
| Zenoh (default) | | High-performance pub/sub; recommended for all new modules |
| ROS2 | | Robotics / DDS ecosystem; requires ROS2 installation |
| Redis | | Pub/sub + key-value; good for dashboard ↔ module communication |
| UDS | | Unix Domain Sockets; zero-dependency local IPC |
Automatic fallback chain: Zenoh → ROS2 → Redis → UDS
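The fallback chain can be read as "use the first transport in this order that is actually available". A minimal sketch of that selection rule is shown below; `pick_transport` and the lowercase transport names are hypothetical, and how vyra_base detects availability is not covered here.

```python
from typing import Iterable, Optional, Sequence

# Preference order taken from the fallback chain in this guide.
FALLBACK_ORDER = ("zenoh", "ros2", "redis", "uds")


def pick_transport(available: Iterable[str],
                   order: Sequence[str] = FALLBACK_ORDER) -> Optional[str]:
    """Return the most preferred available transport, or None."""
    available_set = {name.lower() for name in available}
    for name in order:
        if name in available_set:
            return name
    return None
```

For example, a host where only Redis and UDS are installed resolves to "redis", and an empty set resolves to None, which a caller would treat as a configuration error.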
External Layer
Cross-service protocols for integration with external systems.

| Protocol | Use case |
|---|---|
| gRPC ( | High-performance RPC over Unix sockets; backend-to-backend |
| MQTT ( | IoT devices; constrained networks; QoS 0/1/2 |
| REST ( | HTTP API integration; web services |
| WebSocket ( | Real-time browser clients; bidirectional streaming |
| Shared Memory ( | Zero-copy local IPC; sub-millisecond latency |
Industrial Layer
Northbound integration with industrial automation systems.

| Protocol | Use case |
|---|---|
| Modbus ( | PLC/SCADA integration; TCP or RTU |
| OPC UA ( | MES/SCADA northbound; rich data modelling |
Feeders
Feeders automatically publish internal module state over the transport layer.
They are set up in _base_.py and run in the background.

| Feeder | Publishes |
|---|---|
| | |
| | Informational messages ( |
| | Error reports ( |
| | Heartbeat / module availability discovery |
# Use feeders through the entity (simplest way):
entity.publish_news("Module started successfully")
entity.publish_error("Redis connection lost")
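To make "run in the background" concrete, here is a generic asyncio sketch of a heartbeat loop that publishes at a fixed interval until it is told to stop. It illustrates the pattern only; the `heartbeat` coroutine, the message fields, and the `publish` callback are hypothetical and do not describe how vyra_base's feeders are built.

```python
import asyncio
from typing import Callable, List


async def heartbeat(publish: Callable[[dict], None], interval: float,
                    stop: asyncio.Event) -> None:
    """Call publish() every `interval` seconds until `stop` is set."""
    seq = 0
    while not stop.is_set():
        publish({"type": "heartbeat", "seq": seq})
        seq += 1
        try:
            # Sleep for one interval, but wake immediately if stop is set.
            await asyncio.wait_for(stop.wait(), timeout=interval)
        except asyncio.TimeoutError:
            pass


async def demo(beats: int = 3) -> List[dict]:
    """Collect a fixed number of heartbeats, then stop the loop."""
    sent: List[dict] = []
    stop = asyncio.Event()

    def publish(message: dict) -> None:
        sent.append(message)
        if len(sent) >= beats:
            stop.set()

    await heartbeat(publish, interval=0.01, stop=stop)
    return sent
```

Waiting on the stop event rather than calling asyncio.sleep lets the loop shut down promptly, which matters when a module has to stop cleanly during a lifecycle transition.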
See also
Communication (com) — Full communication module with all transport, external and industrial details