Security Framework API Reference
Complete API documentation for the VYRA Security Framework.
Security Levels
Classes
- class vyra_base.security.security_levels.SecurityLevel(*values)
Bases: IntEnum
Enumeration of security levels for module communication.
Each level builds upon the previous level, adding additional security mechanisms.
- Variables:
NONE – No security checks performed
BASIC_AUTH – Module ID verification
EXTENDED_AUTH – Module ID + password authentication
HMAC – HMAC-SHA256 signature validation
DIGITAL_SIGNATURE – Certificate-based digital signature
- NONE = 1
- BASIC_AUTH = 2
- EXTENDED_AUTH = 3
- HMAC = 4
- DIGITAL_SIGNATURE = 5
- classmethod is_valid(level)
Check if a given integer is a valid security level.
- classmethod get_name(level)
Get the name of a security level.
- Parameters:
level (int) – Security level integer
- Return type:
str
- Returns:
Name of the security level
- Raises:
ValueError – If level is invalid
- requires_hmac()
Check if this security level requires HMAC validation.
- Return type:
bool
- requires_certificate()
Check if this security level requires certificate validation.
- Return type:
bool
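The enum and its helper methods can be illustrated with a small stand-in. This is a sketch under stated assumptions, not the library source: the member values come from this page, but the bodies of `requires_hmac()` and `requires_certificate()` assume that levels are cumulative (each level includes the checks of the levels below it).

```python
from enum import IntEnum


class SecurityLevel(IntEnum):
    """Stand-in mirroring the documented members; not the library source."""

    NONE = 1
    BASIC_AUTH = 2
    EXTENDED_AUTH = 3
    HMAC = 4
    DIGITAL_SIGNATURE = 5

    @classmethod
    def is_valid(cls, level: int) -> bool:
        # True when the integer matches one of the enum values.
        return level in {member.value for member in cls}

    @classmethod
    def get_name(cls, level: int) -> str:
        # cls(level) raises ValueError for unknown integers, which matches
        # the documented behaviour of get_name.
        return cls(level).name

    def requires_hmac(self) -> bool:
        # Assumption: levels are cumulative, so HMAC and above qualify.
        return self >= SecurityLevel.HMAC

    def requires_certificate(self) -> bool:
        # Assumption: only the highest level needs a certificate.
        return self >= SecurityLevel.DIGITAL_SIGNATURE
```

Because the class derives from `IntEnum`, members compare directly with integers, which is why a `requested_sl` field of type `uint8` can be checked against a member without conversion.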
- class vyra_base.security.security_levels.AccessStatus(*values)
Bases: IntEnum
Status codes for access request results.
- Variables:
GRANTED – Access was granted successfully
DENIED – Access was denied
INVALID_REQUEST – Request was malformed or missing required fields
EXPIRED – Session or token has expired
INVALID_SIGNATURE – Signature validation failed
CERTIFICATE_ERROR – Certificate validation failed
INTERNAL_ERROR – Internal server error
- GRANTED = 0
- DENIED = 1
- INVALID_REQUEST = 2
- EXPIRED = 3
- INVALID_SIGNATURE = 4
- CERTIFICATE_ERROR = 5
- INTERNAL_ERROR = 6
- class vyra_base.security.security_levels.AlgorithmId(*values)
Supported cryptographic algorithms.
- Variables:
NONE – No algorithm (security level 1-3)
HMAC_SHA256 – HMAC with SHA-256 (security level 4)
RSA_SHA256 – RSA signature with SHA-256 (security level 5)
ECDSA_P256 – ECDSA with P-256 curve (security level 5)
- NONE = 'NONE'
- HMAC_SHA256 = 'HMAC-SHA256'
- RSA_SHA256 = 'RSA-SHA256'
- ECDSA_P256 = 'ECDSA-P256'
- classmethod for_security_level(level)
Get the default algorithm for a security level.
- Parameters:
level (SecurityLevel) – Security level
- Return type:
AlgorithmId
- Returns:
Default algorithm for that level
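The level-to-algorithm mapping described above can be sketched as follows. The function name `default_algorithm`, the `str, Enum` base classes, and the choice of RSA as the Level 5 default are all assumptions made for illustration; this page lists both RSA-SHA256 and ECDSA-P256 for Level 5 without saying which one `for_security_level` returns.

```python
from enum import Enum


class AlgorithmId(str, Enum):
    """Stand-in with the documented values; base classes are assumed."""

    NONE = "NONE"
    HMAC_SHA256 = "HMAC-SHA256"
    RSA_SHA256 = "RSA-SHA256"
    ECDSA_P256 = "ECDSA-P256"


def default_algorithm(level: int) -> AlgorithmId:
    """Hypothetical helper: pick an algorithm for an integer level (1-5)."""
    if level not in range(1, 6):
        raise ValueError(f"invalid security level: {level}")
    if level <= 3:
        # Levels 1-3 carry no signature.
        return AlgorithmId.NONE
    if level == 4:
        # Level 4 signs with HMAC-SHA256.
        return AlgorithmId.HMAC_SHA256
    # Assumption: RSA is treated as the Level 5 default in this sketch.
    return AlgorithmId.RSA_SHA256
```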
Exceptions
- exception vyra_base.security.security_levels.SecurityError
Bases: Exception
Base exception for security-related errors.
- exception vyra_base.security.security_levels.InvalidSecurityLevelError
Bases: SecurityError
Raised when an invalid security level is specified.
- exception vyra_base.security.security_levels.SignatureValidationError
Bases: SecurityError
Raised when signature validation fails.
- exception vyra_base.security.security_levels.CertificateValidationError
Bases: SecurityError
Raised when certificate validation fails.
- exception vyra_base.security.security_levels.TimestampValidationError
Bases: SecurityError
Raised when timestamp validation fails.
- exception vyra_base.security.security_levels.SessionExpiredError
Bases: SecurityError
Raised when a session has expired.
- exception vyra_base.security.security_levels.NonceValidationError
Bases: SecurityError
Raised when nonce validation fails (replay attack detected).
Security Manager
Classes
- class vyra_base.security.security_manager.SecuritySession(module_name, module_id, role, security_level, session_token, hmac_key=None, certificate=None, created_at=<factory>, expires_at=<factory>, nonces_used=<factory>)
Bases: object
Represents an active security session.
- Variables:
module_name – Name of the authenticated module
module_id – UUID of the authenticated module
role – Role identifier of the module
security_level – Granted security level
session_token – Unique session token
hmac_key – HMAC key (for Level 4+)
certificate – Client certificate (for Level 5)
created_at – Session creation timestamp
expires_at – Session expiration timestamp
nonces_used – Set of nonces used (for replay attack prevention)
- is_expired()
Check if the session has expired.
- Return type:
bool
- add_nonce(nonce)
Add a nonce to the used nonces set.
- has_nonce(nonce)
Check if a nonce has been used (replay attack detection).
- __init__(module_name, module_id, role, security_level, session_token, hmac_key=None, certificate=None, created_at=<factory>, expires_at=<factory>, nonces_used=<factory>)
- class vyra_base.security.security_manager.SecurityManager(max_security_level=SecurityLevel.HMAC, session_duration_seconds=3600, ca_key_path=None, ca_cert_path=None, module_passwords=None)
Bases: object
Manages security sessions and access control for a VYRA module.
This class can be used as a mixin or as a standalone component to provide security functionality. It handles:
- Request access service implementation
- Session token generation and validation
- HMAC key distribution
- Certificate signing (for Level 5)
Usage:
class MyModule(Node, SecurityManager):
    def __init__(self):
        Node.__init__(self, 'my_module')
        SecurityManager.__init__(self, max_security_level=SecurityLevel.HMAC)
        self.setup_security_service()
- Variables:
max_security_level – Maximum security level this module supports
sessions – Dictionary mapping session tokens to SecuritySession objects
sessions_by_module – Dictionary mapping module IDs to session tokens
ca_private_key – CA private key (for Level 5 certificate signing)
ca_cert – CA certificate (for Level 5 certificate validation)
- __init__(max_security_level=SecurityLevel.HMAC, session_duration_seconds=3600, ca_key_path=None, ca_cert_path=None, module_passwords=None)
Initialize the SecurityManager.
- Parameters:
max_security_level (SecurityLevel) – Maximum security level to support
session_duration_seconds (int) – Session validity duration in seconds
ca_key_path (Optional[Path]) – Path to CA private key (required for Level 5)
ca_cert_path (Optional[Path]) – Path to CA certificate (required for Level 5)
module_passwords (Optional[Dict[str, str]]) – Dictionary mapping module_id to password (for Level 3+)
- sessions: Dict[str, SecuritySession]
- async request_access(request, response)
Remote callable wrapper for request_access_impl.
- Parameters:
request – Request object with access parameters
response – Response object to populate
- Return type:
bool
- Returns:
True if handled successfully, False otherwise
- async request_access_impl(module_name, module_id, requested_role, requested_sl, password='', certificate_csr='')
Handle an access request from another module.
This is the main entry point for the RequestAccess service.
- Parameters:
module_name (str) – Name of the requesting module
module_id (UUID) – UUID of the requesting module
requested_role (str) – Role identifier
requested_sl (int) – Requested security level (1-5)
password (str) – Password for Level 3+ authentication
certificate_csr (str) – CSR for Level 5 (base64-encoded PEM)
- Returns:
Tuple of (success, message, session_token, hmac_key, certificate, expires_at, granted_sl)
- get_session(session_token)
Retrieve a session by token.
- Parameters:
session_token (str) – Session token
- Return type:
Optional[SecuritySession]
- Returns:
SecuritySession if valid and not expired, None otherwise
- async cleanup_expired_sessions()
Remove all expired sessions. Should be called periodically.
- get_session_count()
Get the number of active sessions.
- Return type:
int
- get_active_modules()
Get list of active authenticated modules.
- Returns:
List of dicts with module info
Security Client
Classes
- class vyra_base.security.security_client.SecurityContext(module_name, module_id, security_level, session_token, hmac_key=None, certificate=None, private_key_path=None)
Bases: object
Security context for a client connection.
- Variables:
module_name – Name of the client module
module_id – UUID of the client module
security_level – Security level in use
session_token – Session token from RequestAccess
hmac_key – HMAC key (for Level 4+)
certificate – Client certificate (for Level 5)
private_key_path – Path to private key (for Level 5)
- __init__(module_name, module_id, security_level, session_token, hmac_key=None, certificate=None, private_key_path=None)
- class vyra_base.security.security_client.SafetyMetadataBuilder
Bases: object
Helper class to build SafetyMetadata for messages.
This class handles the creation of SafetyMetadata fields based on the security level and context.
- static build(security_context, additional_data='')
Build SafetyMetadata dictionary.
- Parameters:
security_context (SecurityContext) – Security context with authentication info
additional_data (str) – Additional data to include in signature
- Returns:
Dictionary with SafetyMetadata fields
- class vyra_base.security.security_client.SecurePublisher(node, msg_type, topic, security_context, qos_profile=10)
Bases: object
Wrapper for ROS2 Publisher with automatic SafetyMetadata.
This class wraps a standard ROS2 publisher and automatically adds SafetyMetadata to each published message based on the security level.
Usage:
publisher = SecurePublisher(
    node,
    MessageType,
    'topic_name',
    security_context,
    qos_profile
)
publisher.publish(msg)
- Variables:
publisher – Underlying ROS2 publisher
security_context – Security context for this publisher
- Parameters:
node (Node)
msg_type (Type)
topic (str)
security_context (SecurityContext)
qos_profile (Any)
- __init__(node, msg_type, topic, security_context, qos_profile=10)
Initialize SecurePublisher.
- Parameters:
node (Node) – ROS2 node
msg_type (Type) – Message type class
topic (str) – Topic name
security_context (SecurityContext) – Security context
qos_profile (Any) – QoS profile
- publish(msg, additional_data='')
Publish a message with automatic SafetyMetadata.
- destroy()
Destroy the underlying publisher.
- class vyra_base.security.security_client.SecureServiceClient(node, srv_type, service_name, security_context)
Bases: object
Wrapper for ROS2 Service Client with automatic SafetyMetadata.
Similar to SecurePublisher, but for service calls.
Usage:
client = SecureServiceClient(
    node,
    ServiceType,
    'service_name',
    security_context
)
response = await client.call_async(request)
- Variables:
client – Underlying ROS2 service client
security_context – Security context for this client
- Parameters:
node (Node)
srv_type (Type)
service_name (str)
security_context (SecurityContext)
- __init__(node, srv_type, service_name, security_context)
Initialize SecureServiceClient.
- Parameters:
node (Node) – ROS2 node
srv_type (Type) – Service type class
service_name (str) – Service name
security_context (SecurityContext) – Security context
- async call_async(request, additional_data='')
Call service asynchronously with automatic SafetyMetadata.
- destroy()
Destroy the underlying client.
Functions
- vyra_base.security.security_client.security_required(security_level=SecurityLevel.BASIC_AUTH, validate_metadata=True)
Decorator for service callbacks requiring authentication.
This decorator checks that:
- SafetyMetadata is present in the request
- Security level meets the minimum requirement
- Session token is valid (if SecurityManager is available)
- Module has sufficient access level (from granted session)
The decorator integrates with entity.security_manager to validate that the calling module has been granted a sufficient access level.
Usage with @remote_service:
@remote_service()
@security_required(security_level=SecurityLevel.HMAC)
def my_service_callback(self, request, response):
    # Service implementation
    return response
Configuration in interface JSON:
{
    "functionname": "my_service_callback",
    "access_level": 4,  // SecurityLevel.HMAC
    ...
}
- Parameters:
security_level (SecurityLevel) – Minimum required security level (1-5)
validate_metadata (bool) – Whether to validate SafetyMetadata
- Returns:
Decorator function
- vyra_base.security.security_client.create_security_context(module_name, module_id, security_level, session_token, hmac_key=None, certificate=None, private_key_path=None)
Helper function to create a SecurityContext.
- Return type:
SecurityContext
- Returns:
SecurityContext instance
Security Validator
Classes
- class vyra_base.security.security_validator.SecurityValidator(security_manager=None, strict_mode=True)
Bases: object
Validates SafetyMetadata in incoming messages and service calls.
This class works in conjunction with SecurityManager to provide comprehensive validation of security metadata.
Usage:
validator = SecurityValidator(security_manager)

# In service callback:
try:
    validator.validate_metadata(
        request.safety_metadata,
        required_level=SecurityLevel.HMAC
    )
    # Process request
except SecurityError as e:
    response.success = False
    response.message = str(e)
    return response
- Variables:
security_manager – SecurityManager instance for session validation
strict_mode – If True, reject messages that fail any validation
- Parameters:
security_manager (SecurityManager | None)
strict_mode (bool)
- __init__(security_manager=None, strict_mode=True)
Initialize SecurityValidator.
- Parameters:
security_manager (Optional[SecurityManager]) – SecurityManager for session validation
strict_mode (bool) – Enable strict validation mode
- validate_metadata(metadata, required_level=SecurityLevel.NONE, additional_data='')
Validate SafetyMetadata from an incoming message/service call.
Performs validation based on the security level:
- Level 1 (NONE): No validation
- Level 2 (BASIC_AUTH): Module ID check
- Level 3 (EXTENDED_AUTH): Module ID + password authentication (validated during RequestAccess)
- Level 4 (HMAC): HMAC signature + timestamp validation
- Level 5 (DIGITAL_SIGNATURE): Certificate signature validation
- Return type:
Optional[SecuritySession]
- Returns:
SecuritySession if validation succeeds
- Raises:
SecurityError – If validation fails
- class vyra_base.security.security_validator.MessageSecurityFilter(validator, required_level=SecurityLevel.NONE)
Bases: object
Filter for incoming ROS2 messages with automatic security validation.
This class can be used as a subscription callback wrapper to automatically validate SafetyMetadata before processing messages.
Usage:
def my_callback(msg):
    # Process message
    pass

filter = MessageSecurityFilter(validator, SecurityLevel.HMAC)
filtered_callback = filter.wrap_callback(my_callback)
subscription = node.create_subscription(
    MsgType,
    'topic',
    filtered_callback,
    qos_profile
)
- Variables:
validator – SecurityValidator instance
required_level – Minimum required security level
- Parameters:
validator (SecurityValidator)
required_level (SecurityLevel)
- __init__(validator, required_level=SecurityLevel.NONE)
Initialize MessageSecurityFilter.
- Parameters:
validator (SecurityValidator) – SecurityValidator instance
required_level (SecurityLevel) – Minimum required security level
- wrap_callback(callback)
Wrap a callback function with security validation.
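The wrapping pattern can be sketched without ROS2 or vyra_base installed. Everything here is a stand-in: the local `SecurityError` class, the free function `wrap_callback`, the `validate` callable, and the `rejected` list are hypothetical, and the choice to drop a failing message silently is an assumption about how a filter might behave.

```python
from typing import Any, Callable, List


class SecurityError(Exception):
    """Local stand-in for the framework's SecurityError."""


def wrap_callback(
    callback: Callable[[Any], None],
    validate: Callable[[Any], None],
    rejected: List[Any],
) -> Callable[[Any], None]:
    """Return a callback that runs `validate` before `callback`."""

    def wrapped(msg: Any) -> None:
        try:
            validate(msg)  # expected to raise SecurityError on failure
        except SecurityError:
            rejected.append(msg)  # drop the message but keep a record
            return
        callback(msg)

    return wrapped


def require_signed(msg: Any) -> None:
    # Toy validation rule used only for this illustration.
    if not msg.get("signed"):
        raise SecurityError("unsigned message")


received: List[Any] = []
rejected: List[Any] = []
handler = wrap_callback(received.append, require_signed, rejected)
handler({"signed": True, "data": 1})
handler({"signed": False, "data": 2})
```

After the two calls, only the signed message has reached the inner callback; the unsigned one is recorded in `rejected`.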
Functions
- vyra_base.security.security_validator.create_secure_subscription(node, msg_type, topic, callback, validator, required_level=SecurityLevel.NONE, qos_profile=10)
Helper function to create a subscription with automatic security validation.
- Returns:
ROS2 subscription
Crypto Helpers
HMAC Functions
- vyra_base.helper.crypto_helper.generate_hmac_key()
Generate a secure random HMAC key.
- Return type:
str
- Returns:
Hex-encoded HMAC key
- vyra_base.helper.crypto_helper.compute_hmac_signature(message, hmac_key)
Compute HMAC-SHA256 signature for a message.
- Return type:
str
- Returns:
Hex-encoded HMAC signature
- Raises:
ValueError – If hmac_key is invalid
- vyra_base.helper.crypto_helper.verify_hmac_signature(message, signature, hmac_key)
Verify HMAC-SHA256 signature for a message.
- vyra_base.helper.crypto_helper.create_hmac_payload(sender_id, timestamp, nonce, hmac_key, additional_data='')
Create HMAC payload for SafetyMetadata.
The signed message includes sender_id, timestamp, nonce, and optional additional data.
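The signing and verification steps can be sketched with the Python standard library alone. The function names below are hypothetical, and the `|`-separated message layout is an assumption: this page states which fields are signed but not how they are serialized.

```python
import hashlib
import hmac
import secrets


def generate_key_hex(num_bytes: int = 32) -> str:
    """Return a random key as a hex string (64 characters for 32 bytes)."""
    return secrets.token_hex(num_bytes)


def sign(message: str, key_hex: str) -> str:
    """Compute a hex-encoded HMAC-SHA256 signature."""
    # bytes.fromhex raises ValueError when the key is not valid hex.
    key = bytes.fromhex(key_hex)
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).hexdigest()


def verify(message: str, signature: str, key_hex: str) -> bool:
    """Compare signatures in constant time to avoid timing leaks."""
    return hmac.compare_digest(sign(message, key_hex), signature)


def build_message(
    sender_id: str, timestamp: int, nonce: int, additional_data: str = ""
) -> str:
    # Assumption: fields are joined with '|'; the real layout may differ.
    return "|".join([sender_id, str(timestamp), str(nonce), additional_data])


key = generate_key_hex()
message = build_message("module-uuid", 1700000000, 7)
signature = sign(message, key)
```

`hmac.compare_digest` is used instead of `==` so that the comparison time does not reveal how many leading characters of a forged signature were correct.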
Token Functions
- vyra_base.helper.crypto_helper.generate_session_token()
Generate a secure random session token.
- Return type:
str
- Returns:
Hex-encoded session token
- vyra_base.helper.crypto_helper.generate_nonce()
Generate a random nonce for replay attack prevention.
- Return type:
int
- Returns:
Random 64-bit unsigned integer
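Values with the documented shapes can be produced with Python's `secrets` module. This is a sketch of one way to do it, not a statement of how the helpers are implemented; the function names and the 32-byte token length (taken from the default token length listed under Constants) are assumptions.

```python
import secrets

# Assumption: 32 random bytes per token, hex-encoded to 64 characters.
TOKEN_LENGTH_BYTES = 32


def new_session_token() -> str:
    """Return a hex-encoded token from a cryptographically secure source."""
    return secrets.token_hex(TOKEN_LENGTH_BYTES)


def new_nonce() -> int:
    """Return a random integer in the unsigned 64-bit range."""
    return secrets.randbits(64)


token = new_session_token()
nonce = new_nonce()
```

The nonce fits the `uint64 nonce` field of the SafetyMetadata message because `secrets.randbits(64)` never returns a value of 2**64 or more.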
RSA Key Functions
- vyra_base.helper.crypto_helper.generate_rsa_keypair()
Generate RSA key pair for certificate operations.
- Return type:
Tuple[RSAPrivateKey, RSAPublicKey]
- Returns:
Tuple of (private_key, public_key)
- vyra_base.helper.crypto_helper.save_private_key(private_key, filepath, password=None)
Save RSA private key to file.
- vyra_base.helper.crypto_helper.load_private_key(filepath, password=None)
Load RSA private key from file.
- Return type:
RSAPrivateKey
- Returns:
RSA private key
- Raises:
FileNotFoundError – If key file doesn't exist
Certificate Functions
- vyra_base.helper.crypto_helper.create_csr(module_name, module_id, private_key, organization='VYRA Framework', country='DE')
Create a Certificate Signing Request (CSR).
- vyra_base.helper.crypto_helper.sign_csr(csr_pem, ca_private_key, ca_cert, validity_days=365)
Sign a Certificate Signing Request with CA key.
- Returns:
PEM-encoded signed certificate
- Raises:
CertificateValidationError – If CSR is invalid
- vyra_base.helper.crypto_helper.create_self_signed_cert(module_name, private_key, validity_days=365)
Create a self-signed certificate (for CA or testing).
- vyra_base.helper.crypto_helper.verify_certificate(cert_pem, ca_cert_pem)
Verify a certificate against a CA certificate.
- vyra_base.helper.crypto_helper.sign_message_rsa(message, private_key)
Sign a message with RSA private key.
- vyra_base.helper.crypto_helper.verify_message_rsa(message, signature, cert_pem)
Verify RSA signature of a message.
Constants
- vyra_base.helper.crypto_helper.DEFAULT_SESSION_DURATION_SECONDS: int = 3600
Default session validity duration (1 hour).
- vyra_base.helper.crypto_helper.DEFAULT_TOKEN_LENGTH: int = 32
Default session token length in bytes.
- vyra_base.helper.crypto_helper.MAX_TIMESTAMP_DRIFT_SECONDS: int = 300
Maximum allowed timestamp drift (5 minutes).
- vyra_base.helper.crypto_helper.CERT_VALIDITY_DAYS: int = 365
Default certificate validity period in days.
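A drift limit such as MAX_TIMESTAMP_DRIFT_SECONDS is typically applied by comparing the message timestamp with the receiver's clock. The sketch below is a hypothetical check, not the library's validation code; it assumes timestamps are Unix seconds and that drift is measured in both directions (messages that are too old or too far in the future).

```python
import time
from typing import Optional

# Value copied from the constant documented above.
MAX_TIMESTAMP_DRIFT_SECONDS = 300


def is_timestamp_fresh(
    message_time: float,
    now: Optional[float] = None,
    max_drift: float = MAX_TIMESTAMP_DRIFT_SECONDS,
) -> bool:
    """Return True when the message time is within max_drift of now."""
    if now is None:
        now = time.time()
    # abs() rejects stale messages and messages stamped in the future.
    return abs(now - message_time) <= max_drift
```

Passing `now` explicitly keeps the check deterministic in unit tests; in production code the default of the current wall-clock time would be used.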
ROS2 Interfaces
VBASERequestAccess Service
Service definition for requesting authenticated access.
Request Fields:
string module_name # Name of requesting module
UUID module_id # UUID of requesting module
string requested_role # Role identifier
uint8 requested_sl # Requested security level (1-5)
string certificate_csr # CSR for Level 5 (optional)
Response Fields:
bool success # Access granted
string message # Result message
string session_token # Session token
string hmac_key # HMAC key (Level 4+)
string certificate # Signed certificate (Level 5)
Time expires_at # Token expiration
uint8 granted_sl # Granted security level
SafetyMetadata Message
Message type containing security metadata.
Fields:
uint8 security_level # Security level (1-5)
string sender_id # Sender module UUID
Time timestamp # Message creation time
uint64 nonce # Random nonce
string security_payload # Signature/HMAC
string algorithm_id # Algorithm identifier
Usage Examples
Authentication
Server Side:
from rclpy.node import Node
from vyra_base.security.security_levels import SecurityLevel
from vyra_base.security.security_manager import SecurityManager

class MyModule(Node, SecurityManager):
    def __init__(self):
        Node.__init__(self, 'my_module')
        SecurityManager.__init__(
            self,
            max_security_level=SecurityLevel.HMAC
        )
Client Side:
from vyra_base.security.security_client import create_security_context

# access_client and request are created elsewhere
response = await access_client.call_async(request)
ctx = create_security_context(
    module_name="my_module",  # placeholder; required by the signature
    module_id=str(uuid),
    security_level=response.granted_sl,
    session_token=response.session_token,
    hmac_key=response.hmac_key
)
Secure Publishing
from vyra_base.security.security_client import SecurePublisher

pub = SecurePublisher(
    node,
    MessageType,
    'topic_name',
    security_context,
    qos_profile=10
)
msg = MessageType()
msg.data = "Hello"
pub.publish(msg)  # Automatic SafetyMetadata
Validation
from vyra_base.security.security_levels import SecurityError, SecurityLevel
from vyra_base.security.security_validator import SecurityValidator

validator = SecurityValidator(security_manager, strict_mode=True)

def callback(self, request, response):
    try:
        validator.validate_metadata(
            request.safety_metadata,
            required_level=SecurityLevel.HMAC
        )
        # Process request
    except SecurityError as e:
        response.success = False
        response.message = str(e)
    return response
Certificate Operations
from vyra_base.helper.crypto_helper import (
    generate_rsa_keypair,
    create_csr,
    sign_csr
)

# Generate key pair
private_key, public_key = generate_rsa_keypair()

# Create CSR
csr = create_csr("my_module", "uuid-string", private_key)

# Sign CSR (server side)
certificate = sign_csr(csr, ca_private_key, ca_cert)
Type Hints
from typing import Any, Optional, Tuple
from datetime import datetime
import uuid

from vyra_base.security.security_levels import SecurityLevel
from vyra_base.security.security_manager import SecuritySession

# SecurityManager
async def request_access_impl(
    self,
    module_name: str,
    module_id: uuid.UUID,
    requested_role: str,
    requested_sl: int,
    password: str = "",
    certificate_csr: str = ""
) -> Tuple[bool, str, str, str, str, datetime, int]:
    ...

# SecurityValidator
def validate_metadata(
    self,
    metadata: Any,
    required_level: SecurityLevel = SecurityLevel.NONE,
    additional_data: str = ""
) -> Optional[SecuritySession]:
    ...

# Crypto helpers
def compute_hmac_signature(
    message: str,
    hmac_key: str
) -> str:
    ...
Error Handling
Exception Handling Pattern
from vyra_base.security.security_levels import (
    SecurityError,
    SecurityLevel,
    SignatureValidationError,
    TimestampValidationError,
    SessionExpiredError
)

# Catch the specific subclasses first; SecurityError is their common base.
try:
    validator.validate_metadata(metadata, SecurityLevel.HMAC)
except SignatureValidationError:
    # Handle invalid signature
    pass
except TimestampValidationError:
    # Handle timestamp issues
    pass
except SessionExpiredError:
    # Re-authenticate
    pass
except SecurityError as e:
    # Handle general security error
    pass
Testing
Unit Test Example
import uuid

import pytest
from vyra_base.security.security_levels import SecurityLevel
from vyra_base.security.security_manager import SecurityManager

@pytest.mark.asyncio
async def test_authentication():
    manager = SecurityManager(
        max_security_level=SecurityLevel.HMAC
    )
    result = await manager.request_access_impl(
        "test_module",
        uuid.uuid4(),
        "tester",
        4,
        ""
    )
    assert result[0] is True  # success
    assert result[6] == 4  # granted_sl
Mock Testing
from unittest.mock import Mock, patch
from vyra_base.security.security_manager import SecurityManager

@patch('vyra_base.helper.crypto_helper.generate_hmac_key')
def test_with_mock_key(mock_key):
    mock_key.return_value = "0" * 64
    manager = SecurityManager()
    # Test with predictable key
    ...
See Also
Security Framework Overview - Framework overview
Security Framework Quick Start - Quick start guide
Security Framework Examples - Example implementations
Module index
Index