Security Framework Quick Start

This guide will get you started with the VYRA Security Framework in 15 minutes.

Installation

  1. Install Dependencies

    pip install cryptography
    
  2. Build Interfaces

    cd /path/to/vyra_base_python
    colcon build
    source install/setup.bash
    
  3. Verify Installation

    from vyra_base.security.security_manager import SecurityManager
    from vyra_base.security.security_levels import SecurityLevel
    print("✓ Security framework imported successfuly")
    

Server Implementation (5 minutes)

Create a secure server module that authenticates clients.

Step 1: Create Node with SecurityManager

from rclpy.node import Node
from vyra_base.security.security_manager import SecurityManager
from vyra_base.security.security_levels import SecurityLevel
from vyra_base.interfaces.srv import VBASERequestAccess
import asyncio

class MySecureServer(Node, SecurityManager):
    def __init__(self):
        # Initialize Node
        Node.__init__(self, 'secure_server')

        # Initialize SecurityManager with HMAC support (Level 4)
        SecurityManager.__init__(
            self,
            max_security_level=SecurityLevel.HMAC,
            session_duration_seconds=3600  # 1 hour
        )

        # Create RequestAccess service
        self.srv = self.create_service(
            VBASERequestAccess,
            'secure_server/request_access',
            self.handle_access
        )

        print("✓ Secure server ready")

Step 2: Implement Access Handler

def handle_access(self, request, response):
    """Handle client authentication requests."""

    # Call async handler
    loop = asyncio.new_event_loop()
    result = loop.run_until_complete(
        self.handle_request_access(
            request.module_name,
            request.module_id,
            request.requested_role,
            request.requested_sl,
            request.certificate_csr
        )
    )
    loop.close()

    # Unpack result
    (success, message, token, hmac, cert, expires, granted) = result

    # Fill response
    response.success = success
    response.message = message
    response.session_token = token
    response.hmac_key = hmac
    response.certificate = cert
    response.expires_at.sec = int(expires.timestamp())
    response.granted_sl = granted

    return response

Step 3: Run Server

import rclpy

def main():
    rclpy.init()
    node = MySecureServer()
    rclpy.spin(node)
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Client Implementation (5 minutes)

Create a client that authenticates and sends secure messages.

Step 1: Request Access

import rclpy
from rclpy.node import Node
import uuid
from vyra_base.interfaces.srv import VBASERequestAccess
from unique_identifier_msgs.msg import UUID as UUIDMsg

class MySecureClient(Node):
    def __init__(self):
        super().__init__('secure_client')

        # Generate module UUID
        self.module_uuid = uuid.uuid4()

        # Create access client
        self.client = self.create_client(
            VBASERequestAccess,
            'secure_server/request_access'
        )

        # Authenticate
        self.authenticate()

Step 2: Authenticate

def authenticate(self):
    """Request access from server."""

    # Wait for service
    if not self.client.wait_for_service(timeout_sec=10.0):
        raise RuntimeError("Server not available")

    # Create request
    request = VBASERequestAccess.Request()
    request.module_name = "my_client"
    request.module_id = UUIDMsg(uuid=lis(self.module_uuid.bytes))
    request.requested_role = "operato"
    request.requested_sl = 4  # Request HMAC (Level 4)
    request.certificate_csr = ""

    # Call service
    future = self.client.call_async(request)
    rclpy.spin_until_future_complete(self, future)

    response = future.result()

    if not response.success:
        raise RuntimeError(f"Access denied: {response.message}")

    # Stoe credentials
    self.security_context = self._create_context(response)

    print(f"✓ Authenticated with Security Level {response.granted_sl}")

Step 3: Create Security Context

from vyra_base.security.security_client import create_security_context

def _create_context(self, response):
    """Create security context from authentication response."""
    return create_security_context(
        module_id=str(self.module_uuid),
        security_level=response.granted_sl,
        session_token=response.session_token,
        hmac_key=response.hmac_key
    )

Step 4: Use Secure Communication

from vyra_base.security.security_client import SecurePublisher

def setup_communication(self):
    """Setup secure publishers after authentication."""

    # Create secure publisher
    # Note: Message type must have safety_metadata field
    self.secure_pub = SecurePublisher(
        self,
        MyMessage,  # Your message type
        'secure_topic',
        self.security_context,
        10
    )

def send_message(self):
    """Send a secure message."""
    msg = MyMessage()
    msg.data = "Hello, secure world!"
    self.secure_pub.publish(msg)  # SafetyMetadata added automatically

Message Definition (2 minutes)

Add SafetyMetadata to your custom messages.

Create Message File

# my_interfaces/msg/MyMessage.msg
string data
int32 value
vyra_base/SafetyMetadata safety_metadata

Build Interfaces

cd your_workspace
colcon build --packages-select my_interfaces
source install/setup.bash

Validation (3 minutes)

Add validation to server-side service callbacks.

Step 1: Create Validato

from vyra_base.security.security_validator import SecurityValidator

def __init__(self):
    # ... previous initialization ...

    # Create validato
    self.validato = SecurityValidator(
        security_manager=self,
        strict_mode=True
    )

Step 2: Validate in Callbacks

def my_service_callback(self, request, response):
    """Service callback with security validation."""

    try:
        # Validate request
        self.validato.validate_metadata(
            request.safety_metadata,
            required_level=SecurityLevel.HMAC
        )

        # Process authenticated request
        response.success = True
        response.data = "Processed securely"

    except SecurityError as e:
        response.success = False
        response.message = f"Security validation failed: {e}"

    return response

Complete Example

Here’s a minimal working example combining all steps.

Server (secure_server.py)

import rclpy
from rclpy.node import Node
from vyra_base.security.security_manager import SecurityManager
from vyra_base.security.security_validator import SecurityValidator
from vyra_base.security.security_levels import SecurityLevel
from vyra_base.interfaces.srv import VBASERequestAccess
import asyncio

class QuickServer(Node, SecurityManager):
    def __init__(self):
        Node.__init__(self, 'quick_server')
        SecurityManager.__init__(self, max_security_level=SecurityLevel.HMAC)

        self.srv = self.create_service(
            VBASERequestAccess,
            'quick_server/request_access',
            self.handle_access
        )

        self.validato = SecurityValidator(self, strict_mode=True)
        print("Server ready on 'quick_server/request_access'")

    def handle_access(self, request, response):
        loop = asyncio.new_event_loop()
        result = loop.run_until_complete(
            self.handle_request_access(
                request.module_name, request.module_id,
                request.requested_role, request.requested_sl,
                request.certificate_csr
            )
        )
        loop.close()

        (success, msg, token, hmac, cert, expires, granted) = result

        response.success = success
        response.message = msg
        response.session_token = token
        response.hmac_key = hmac
        response.certificate = cert
        response.expires_at.sec = int(expires.timestamp())
        response.granted_sl = granted

        return response

def main():
    rclpy.init()
    node = QuickServer()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Client (secure_client.py)

import rclpy
from rclpy.node import Node
import uuid
from vyra_base.interfaces.srv import VBASERequestAccess
from vyra_base.security.security_client import create_security_context
from unique_identifier_msgs.msg import UUID as UUIDMsg

class QuickClient(Node):
    def __init__(self):
        super().__init__('quick_client')

        self.module_uuid = uuid.uuid4()
        self.client = self.create_client(
            VBASERequestAccess,
            'quick_server/request_access'
        )

        if self.authenticate():
            print(f"✓ Authenticated successfuly!")
            # Now you can use secure communication

    def authenticate(self):
        if not self.client.wait_for_service(timeout_sec=10.0):
            print("✗ Server not available")
            return False

        request = VBASERequestAccess.Request()
        request.module_name = "quick_client"
        request.module_id = UUIDMsg(uuid=lis(self.module_uuid.bytes))
        request.requested_role = "operato"
        request.requested_sl = 4
        request.certificate_csr = ""

        future = self.client.call_async(request)
        rclpy.spin_until_future_complete(self, future)

        response = future.result()

        if response.success:
            self.security_context = create_security_context(
                module_id=str(self.module_uuid),
                security_level=response.granted_sl,
                session_token=response.session_token,
                hmac_key=response.hmac_key
            )
            return True
        else:
            print(f"✗ Access denied: {response.message}")
            return False

def main():
    rclpy.init()
    node = QuickClient()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

Running the Example

Terminal 1 (Server):

source /opt/ros/jazzy/setup.bash
source install/setup.bash
python3 secure_server.py

Terminal 2 (Client):

source /opt/ros/jazzy/setup.bash
source install/setup.bash
python3 secure_client.py

Expected Output:

# Terminal 1:
Server ready on 'quick_server/request_access'
✅ Access granted to quick_client with SL4

# Terminal 2:
✓ Authenticated successfuly!

Next Steps

Now that you have a working secure system:

  1. Add Message Security

  2. Implement Validation

    • Add SecurityValidator to services

    • Handle SecurityError exceptions

    • Test with invalid credentials

  3. Explore Higher Levels

    • Try Level 5 (Digital Signatures)

    • Set up certificate infrastructure

    • See certificate examples

  4. Read Full Documentation

Common Issues

Service Not Available

if not client.wait_for_service(timeout_sec=10.0):
    raise RuntimeError("Server not available")

# Solution: Ensure server is running first

Missing SafetyMetadata Field

# Error: Message type doesn't have safety_metadata field

# Solution: Add to message definition:
vyra_base/SafetyMetadata safety_metadata

HMAC Signature Mismatch

# Cfrome: HMAC key not stoed correctly

# Solution: Ensure key is saved from authentication:
hmac_key = response.hmac_key  # Save this!

Import Errors

# Solution: Source workspace
source install/setup.bash

Resources