"""
Security Client Components for VYRA Framework
==============================================

This module provides client-side security components:

- SecurePublisher: Wrapper for ROS2 publishers with automatic SafetyMetadata
- SecureServiceClient: Wrapper for ROS2 service clients with security
- security_required: Decorator for service callbacks requiring authentication
- SecureMessageBuilder: Helper for creating secure messages

These components automatically handle:

- SafetyMetadata field population
- HMAC signature generation (Level 4)
- Digital signature generation (Level 5)
- Session token management
- Nonce generation

Author: VYRA Framework Team

License: Proprietary
"""
from __future__ import annotations
import importlib.util
import time
import uuid
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Optional, Type
from functools import wraps
from dataclasses import dataclass
try:
import rclpy as _rclpy_check # noqa: F401
_ROS2_AVAILABLE = True
except ImportError:
_ROS2_AVAILABLE = False
if TYPE_CHECKING or _ROS2_AVAILABLE:
import rclpy
from rclpy.node import Node
from rclpy.publisher import Publisher
from rclpy.client import Client
from vyra_base.security.security_levels import (
SecurityLevel,
AlgorithmId,
SecurityError,
)
from vyra_base.helper.crypto_helper import (
generate_nonce,
create_hmac_payload,
sign_message_rsa,
load_private_key,
)
import logging
logger = logging.getLogger(__name__)
[docs]
@dataclass
class SecurityContext:
    """
    Identity and credential bundle a client module uses on a secured link.

    The four leading fields are required; the credential fields that follow
    are optional and default to ``None``.

    :ivar module_name: Name of the client module
    :ivar module_id: UUID of the client module
    :ivar security_level: Security level in use
    :ivar session_token: Session token from RequestAccess
    :ivar hmac_key: HMAC key (for Level 4+)
    :ivar certificate: Client certificate (for Level 5)
    :ivar private_key_path: Path to private key (for Level 5)
    """

    # --- required: who is talking and at which level ---
    module_name: str
    module_id: str
    security_level: int
    session_token: str

    # --- optional: credentials only needed at the higher levels ---
    hmac_key: Optional[str] = None
    certificate: Optional[str] = None
    private_key_path: Optional[str] = None
[docs]
class SecurePublisher:
    """
    ROS2 publisher wrapper that stamps SafetyMetadata onto outgoing messages.

    A regular publisher is created on the supplied node. Each call to
    :meth:`publish` fills the message's ``safety_metadata`` field from the
    dictionary returned by ``SafetyMetadataBuilder.build`` for the stored
    security context and then hands the message to that publisher.

    Usage::

        publisher = SecurePublisher(
            node,
            MessageType,
            'topic_name',
            security_context,
            qos_profile
        )
        publisher.publish(msg)

    :ivar publisher: Underlying ROS2 publisher
    :ivar security_context: Security context for this publisher
    :ivar node: Node the underlying publisher was created on
    """

    def __init__(
        self,
        node: Node,
        msg_type: Type,
        topic: str,
        security_context: SecurityContext,
        qos_profile: Any = 10
    ):
        """
        Create the underlying publisher and remember the security context.

        :param node: ROS2 node
        :param msg_type: Message type class
        :param topic: Topic name
        :param security_context: Security context
        :param qos_profile: QoS profile (passed straight to ``create_publisher``)
        """
        self.publisher = node.create_publisher(msg_type, topic, qos_profile)
        self.security_context = security_context
        # Kept so destroy() can release the publisher on the same node.
        self.node = node

        logger.debug(f"SecurePublisher created for topic '{topic}' with SL{security_context.security_level}")

    def publish(self, msg: Any, additional_data: str = "") -> None:
        """
        Fill in ``msg.safety_metadata`` and publish the message.

        Any exception raised while building the metadata, copying it onto
        the message or publishing is logged and re-raised unchanged.

        :param msg: Message to publish (must have safety_metadata field)
        :param additional_data: Additional data to include in signature
        :raises SecurityError: If message doesn't support SafetyMetadata
        """
        if not hasattr(msg, 'safety_metadata'):
            raise SecurityError(
                f"Message type {type(msg).__name__} does not have safety_metadata field. "
                "Ensure the message definition includes SafetyMetadata safety_metadata."
            )

        try:
            built = SafetyMetadataBuilder.build(self.security_context, additional_data)
            target = msg.safety_metadata

            # Scalar fields that precede the timestamp.
            for field_name in ('security_level', 'sender_id'):
                setattr(target, field_name, built[field_name])

            # The timestamp is a nested structure with sec / nanosec parts.
            stamp = built['timestamp']
            target.timestamp.sec = stamp['sec']
            target.timestamp.nanosec = stamp['nanosec']

            # Remaining scalar fields.
            for field_name in ('nonce', 'security_payload', 'algorithm_id'):
                setattr(target, field_name, built[field_name])

            self.publisher.publish(msg)
        except Exception as e:
            logger.error(f"Failed to publish secure message: {e}")
            raise

    def destroy(self) -> None:
        """Release the wrapped publisher through the node that created it."""
        self.node.destroy_publisher(self.publisher)
[docs]
class SecureServiceClient:
    """
    Wrapper for ROS2 Service Client with automatic SafetyMetadata.

    Similar to SecurePublisher, but for service calls: before a request is
    sent, its ``safety_metadata`` field is filled from the dictionary
    returned by ``SafetyMetadataBuilder.build`` for the stored security
    context.

    Usage::

        client = SecureServiceClient(
            node,
            ServiceType,
            'service_name',
            security_context
        )
        response = await client.call_async(request)

    :ivar client: Underlying ROS2 service client
    :ivar security_context: Security context for this client
    :ivar node: Node the underlying client was created on
    """

    def __init__(
        self,
        node: Node,
        srv_type: Type,
        service_name: str,
        security_context: SecurityContext
    ):
        """
        Initialize SecureServiceClient.

        :param node: ROS2 node
        :param srv_type: Service type class
        :param service_name: Service name
        :param security_context: Security context
        """
        self.client = node.create_client(srv_type, service_name)
        self.security_context = security_context
        # Kept so destroy() can release the client on the same node.
        self.node = node

        logger.debug(f"SecureServiceClient created for service '{service_name}' with SL{security_context.security_level}")

    async def call_async(
        self,
        request: Any,
        additional_data: str = "",
        *,
        timeout_sec: float = 5.0
    ) -> Any:
        """
        Call service asynchronously with automatic SafetyMetadata.

        Any exception raised while building the metadata, waiting for the
        service or performing the call is logged and re-raised unchanged.

        :param request: Service request (must have safety_metadata field)
        :param additional_data: Additional data to include in signature
        :param timeout_sec: Value forwarded to ``wait_for_service`` as its
            ``timeout_sec`` argument; defaults to the previously hard-coded
            5.0 so existing callers behave as before
        :return: Service response
        :raises SecurityError: If request doesn't support SafetyMetadata, or
            if ``wait_for_service`` reports the service as unavailable
        """
        if not hasattr(request, 'safety_metadata'):
            raise SecurityError(
                f"Request type {type(request).__name__} does not have safety_metadata field."
            )

        try:
            # Build and attach metadata
            metadata_dict = SafetyMetadataBuilder.build(self.security_context, additional_data)
            metadata = request.safety_metadata

            # Convert dict to message fields
            metadata.security_level = metadata_dict['security_level']
            metadata.sender_id = metadata_dict['sender_id']
            metadata.timestamp.sec = metadata_dict['timestamp']['sec']
            metadata.timestamp.nanosec = metadata_dict['timestamp']['nanosec']
            metadata.nonce = metadata_dict['nonce']
            metadata.security_payload = metadata_dict['security_payload']
            metadata.algorithm_id = metadata_dict['algorithm_id']

            # Wait for service.
            # NOTE(review): rclpy documents wait_for_service as a blocking
            # call - confirm that blocking inside this coroutine for up to
            # timeout_sec is acceptable for the executor in use.
            if not self.client.wait_for_service(timeout_sec=timeout_sec):
                raise SecurityError(f"Service not available: {self.client.srv_name}")

            # Call service
            future = self.client.call_async(request)
            return await future
        except Exception as e:
            logger.error(f"Failed to call secure service: {e}")
            raise

    def destroy(self) -> None:
        """Destroy the underlying client via the node that created it."""
        self.node.destroy_client(self.client)
[docs]
def security_required(
    security_level: SecurityLevel = SecurityLevel.BASIC_AUTH,
    validate_metadata: bool = True
):
    """
    Decorator for service callbacks requiring authentication.

    The wrapped callback must have the signature
    ``(self, request, response)``. Before it is invoked the wrapper checks:

    1. SafetyMetadata is present in the request
    2. The session token in the metadata resolves to a session via
       ``security_manager.get_session``
    3. The session's granted security level is at least ``security_level``

    If a check fails, ``response.success`` is set to ``False``,
    ``response.message`` describes the reason, and ``response`` is returned
    without calling the wrapped callback.

    The security manager is looked up as ``self.entity.security_manager``
    (falling back to ``self.security_manager`` when ``self`` has no
    ``entity`` attribute). When no security manager is found, or when
    ``validate_metadata`` is ``False``, the callback is invoked without any
    checks.

    The returned wrapper carries the attribute ``_required_access_level``
    (the integer value of ``security_level``) for introspection.

    Usage with @remote_service::

        @remote_service()
        @security_required(security_level=SecurityLevel.HMAC)
        def my_service_callback(self, request, response):
            # Service implementation
            return response

    Configuration in interface JSON::

        {
            "functionname": "my_service_callback",
            "access_level": 4,  // SecurityLevel.HMAC
            ...
        }

    :param security_level: Minimum required security level (1-5)
    :param validate_metadata: Whether to validate SafetyMetadata
    :return: Decorator function
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(self, request, response):
            if not validate_metadata:
                return func(self, request, response)

            # Check if security is enabled on entity
            entity = getattr(self, 'entity', self)
            security_manager = getattr(entity, 'security_manager', None)

            if security_manager is None:
                # Deliberate pass-through: without a security manager there
                # is nothing to validate the session against.
                logger.debug(
                    f"Security not enabled for {func.__name__}, "
                    f"skipping access level check (required: {security_level.value})"
                )
                return func(self, request, response)

            # Check if request has SafetyMetadata
            if not hasattr(request, 'safety_metadata'):
                # Logger.warn is a deprecated alias; use warning() like the
                # access-denied branch below.
                logger.warning(f"Service call to {func.__name__} missing SafetyMetadata")
                response.success = False
                response.message = "Missing SafetyMetadata - authentication required"
                return response

            metadata = request.safety_metadata

            # Validate session token and get granted level.
            # NOTE(review): this reads `session_token` and `module_id` from
            # the metadata, while the client wrappers in this module populate
            # `sender_id` / `security_payload` - confirm the SafetyMetadata
            # message definition actually provides these two fields.
            session_token = metadata.session_token
            session = security_manager.get_session(session_token)

            if session is None:
                logger.error(
                    f"No valid session found for service call to {func.__name__} "
                    f"(module_id: {metadata.module_id})"
                )
                response.success = False
                response.message = "Invalid or expired session"
                return response

            # Check if session's granted level is sufficient
            granted_level = session.security_level
            if granted_level < security_level.value:
                logger.warning(
                    f"Access denied: Module {metadata.module_id} (granted level {granted_level}) "
                    f"attempted to call {func.__name__} (requires level {security_level.value})"
                )
                response.success = False
                response.message = (
                    f"Insufficient access level: {SecurityLevel.get_name(granted_level)} "
                    f"(required: {SecurityLevel.get_name(security_level.value)})"
                )
                return response

            logger.debug(
                f"Access granted: Module {metadata.module_id} (level {granted_level}) "
                f"calling {func.__name__} (requires {security_level.value})"
            )
            return func(self, request, response)

        # Mark the required access level for introspection
        setattr(wrapper, '_required_access_level', security_level.value)
        return wrapper

    return decorator
[docs]
def create_security_context(
    module_name: str,
    module_id: str,
    security_level: int,
    session_token: str,
    hmac_key: Optional[str] = None,
    certificate: Optional[str] = None,
    private_key_path: Optional[str] = None
) -> SecurityContext:
    """
    Build a :class:`SecurityContext` from individual values.

    Every argument is forwarded unchanged to the field of the same name.

    :param module_name: Module name (human-readable)
    :param module_id: Module UUID
    :param security_level: Security level (1-5)
    :param session_token: Session token from RequestAccess
    :param hmac_key: HMAC key (for Level 4+)
    :param certificate: Certificate (for Level 5)
    :param private_key_path: Path to private key (for Level 5)
    :return: SecurityContext instance
    """
    # Optional credentials are grouped so the required identity fields stand
    # out in the constructor call below.
    optional_credentials = {
        "hmac_key": hmac_key,
        "certificate": certificate,
        "private_key_path": private_key_path,
    }
    context = SecurityContext(
        module_name=module_name,
        module_id=module_id,
        security_level=security_level,
        session_token=session_token,
        **optional_credentials,
    )
    return context