Security Framework API Reference

Complete API documentation for the VYRA Security Framework.

Security Levels

Classes

class vyra_base.security.security_levels.SecurityLevel(*values)

Bases: IntEnum

Enumeration of security levels for module communication.

Each level builds upon the previous level, adding additional security mechanisms.

Variables:
  • NONE – No security checks performed

  • BASIC_AUTH – Module ID verification

  • EXTENDED_AUTH – Module ID + password authentication

  • HMAC – HMAC-SHA256 signature validation

  • DIGITAL_SIGNATURE – Certificate-based digital signature

NONE = 1
BASIC_AUTH = 2
EXTENDED_AUTH = 3
HMAC = 4
DIGITAL_SIGNATURE = 5
classmethod is_valid(level)

Check if a given integer is a valid security level.

Parameters:

level (int) – Security level to validate

Return type:

bool

Returns:

True if valid, False otherwise

classmethod get_name(level)

Get the name of a security level.

Parameters:

level (int) – Security level integer

Return type:

str

Returns:

Name of the security level

Raises:

ValueError – If level is invalid

requires_hmac()

Check if this security level requires HMAC validation.

Return type:

bool

requires_certificate()

Check if this security level requires certificate validation.

Return type:

bool
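
The level ordering can be illustrated with a local stand-in enum. The sketch below mirrors the documented member values only; `DemoSecurityLevel` is a hypothetical name, and the method bodies are assumptions (the reference gives signatures, not implementations), chosen so that HMAC is required from level 4 upward and a certificate only at level 5.

```python
from enum import IntEnum


class DemoSecurityLevel(IntEnum):
    """Local stand-in for SecurityLevel; not the vyra_base class."""

    NONE = 1
    BASIC_AUTH = 2
    EXTENDED_AUTH = 3
    HMAC = 4
    DIGITAL_SIGNATURE = 5

    @classmethod
    def is_valid(cls, level: int) -> bool:
        # A level is valid if it matches one of the member values.
        return level in {member.value for member in cls}

    def requires_hmac(self) -> bool:
        # Assumption: levels build on each other, so level 5 also needs HMAC.
        return self >= DemoSecurityLevel.HMAC

    def requires_certificate(self) -> bool:
        # Only the highest level involves certificates.
        return self >= DemoSecurityLevel.DIGITAL_SIGNATURE
```

Because the enum derives from IntEnum, levels compare like integers, which is what makes "minimum required level" checks a plain `>=` comparison.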

class vyra_base.security.security_levels.AccessStatus(*values)

Bases: IntEnum

Status codes for access request results.

Variables:
  • GRANTED – Access was granted successfully

  • DENIED – Access was denied

  • INVALID_REQUEST – Request was malformed or missing required fields

  • EXPIRED – Session or token has expired

  • INVALID_SIGNATURE – Signature validation failed

  • CERTIFICATE_ERROR – Certificate validation failed

  • INTERNAL_ERROR – Internal server error

GRANTED = 0
DENIED = 1
INVALID_REQUEST = 2
EXPIRED = 3
INVALID_SIGNATURE = 4
CERTIFICATE_ERROR = 5
INTERNAL_ERROR = 6
class vyra_base.security.security_levels.AlgorithmId(*values)

Bases: str, Enum

Supported cryptographic algorithms.

Variables:
  • NONE – No algorithm (security level 1-3)

  • HMAC_SHA256 – HMAC with SHA-256 (security level 4)

  • RSA_SHA256 – RSA signature with SHA-256 (security level 5)

  • ECDSA_P256 – ECDSA with P-256 curve (security level 5)

NONE = 'NONE'
HMAC_SHA256 = 'HMAC-SHA256'
RSA_SHA256 = 'RSA-SHA256'
ECDSA_P256 = 'ECDSA-P256'
classmethod for_security_level(level)

Get the default algorithm for a security level.

Parameters:

level (SecurityLevel) – Security level

Return type:

AlgorithmId

Returns:

Default algorithm for that level
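
A level-to-algorithm mapping of this kind might look like the sketch below. `DemoLevel` and `DemoAlgorithmId` are hypothetical stand-ins that copy the documented values. The reference lists two level 5 algorithms without saying which one is the default, so returning RSA_SHA256 here is an assumption.

```python
from enum import Enum, IntEnum


class DemoLevel(IntEnum):
    NONE = 1
    BASIC_AUTH = 2
    EXTENDED_AUTH = 3
    HMAC = 4
    DIGITAL_SIGNATURE = 5


class DemoAlgorithmId(str, Enum):
    """Local stand-in for AlgorithmId; not the vyra_base class."""

    NONE = "NONE"
    HMAC_SHA256 = "HMAC-SHA256"
    RSA_SHA256 = "RSA-SHA256"
    ECDSA_P256 = "ECDSA-P256"

    @classmethod
    def for_security_level(cls, level: DemoLevel) -> "DemoAlgorithmId":
        if level >= DemoLevel.DIGITAL_SIGNATURE:
            # Assumption: RSA is the level 5 default; ECDSA is the alternative.
            return cls.RSA_SHA256
        if level == DemoLevel.HMAC:
            return cls.HMAC_SHA256
        # Levels 1-3 use no cryptographic algorithm.
        return cls.NONE
```

Mixing in `str` means a member compares equal to its wire value, so the result can be written straight into a string field such as `algorithm_id`.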

Exceptions

exception vyra_base.security.security_levels.SecurityError

Bases: Exception

Base exception for security-related errors.

exception vyra_base.security.security_levels.InvalidSecurityLevelError

Bases: SecurityError

Raised when an invalid security level is specified.

exception vyra_base.security.security_levels.SignatureValidationError

Bases: SecurityError

Raised when signature validation fails.

exception vyra_base.security.security_levels.CertificateValidationError

Bases: SecurityError

Raised when certificate validation fails.

exception vyra_base.security.security_levels.TimestampValidationError

Bases: SecurityError

Raised when timestamp validation fails.

exception vyra_base.security.security_levels.SessionExpiredError

Bases: SecurityError

Raised when a session has expired.

exception vyra_base.security.security_levels.NonceValidationError

Bases: SecurityError

Raised when nonce validation fails (replay attack detected).

Security Manager

Classes

class vyra_base.security.security_manager.SecuritySession(module_name, module_id, role, security_level, session_token, hmac_key=None, certificate=None, created_at=<factory>, expires_at=<factory>, nonces_used=<factory>)

Bases: object

Represents an active security session.

Variables:
  • module_name – Name of the authenticated module

  • module_id – UUID of the authenticated module

  • role – Role identifier of the module

  • security_level – Granted security level

  • session_token – Unique session token

  • hmac_key – HMAC key (for Level 4+)

  • certificate – Client certificate (for Level 5)

  • created_at – Session creation timestamp

  • expires_at – Session expiration timestamp

  • nonces_used – Set of nonces used (for replay attack prevention)

module_name: str
module_id: str
role: str
security_level: int
session_token: str
hmac_key: str | None = None
certificate: str | None = None
created_at: datetime
expires_at: datetime
nonces_used: set
is_expired()

Check if the session has expired.

Return type:

bool

add_nonce(nonce)

Add a nonce to the used nonces set.

Return type:

None

Parameters:

nonce (int)

has_nonce(nonce)

Check if a nonce has been used (replay attack detection).

Return type:

bool

Parameters:

nonce (int)
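
The three methods above combine into a simple replay check: a message is accepted only if the session is still valid and its nonce has not been seen before. The sketch below uses a hypothetical `DemoSession` dataclass with a subset of the documented fields. The method bodies, the UTC timestamps, and the `accept_once` helper are assumptions made for illustration.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone


def _utc_now() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class DemoSession:
    """Stand-in with the expiry and nonce fields of SecuritySession."""

    session_token: str
    created_at: datetime = field(default_factory=_utc_now)
    expires_at: datetime = field(
        default_factory=lambda: _utc_now() + timedelta(seconds=3600)
    )
    nonces_used: set = field(default_factory=set)

    def is_expired(self) -> bool:
        return _utc_now() >= self.expires_at

    def add_nonce(self, nonce: int) -> None:
        self.nonces_used.add(nonce)

    def has_nonce(self, nonce: int) -> bool:
        return nonce in self.nonces_used


def accept_once(session: DemoSession, nonce: int) -> bool:
    """Return True the first time a nonce is seen, False on replay or expiry."""
    if session.is_expired() or session.has_nonce(nonce):
        return False
    session.add_nonce(nonce)
    return True
```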

__init__(module_name, module_id, role, security_level, session_token, hmac_key=None, certificate=None, created_at=<factory>, expires_at=<factory>, nonces_used=<factory>)

Return type:

None

class vyra_base.security.security_manager.SecurityManager(max_security_level=SecurityLevel.HMAC, session_duration_seconds=3600, ca_key_path=None, ca_cert_path=None, module_passwords=None)

Bases: object

Manages security sessions and access control for a VYRA module.

This class can be used as a mixin or standalone component to provide security functionality. It handles:

  • Request access service implementation

  • Session token generation and validation

  • HMAC key distribution

  • Certificate signing (for Level 5)

Usage:

class MyModule(Node, SecurityManager):
    def __init__(self):
        Node.__init__(self, 'my_module')
        SecurityManager.__init__(self, max_security_level=SecurityLevel.HMAC)
        self.setup_security_service()

Variables:
  • max_security_level – Maximum security level this module supports

  • sessions – Dictionary mapping session tokens to SecuritySession objects

  • sessions_by_module – Dictionary mapping module IDs to session tokens

  • ca_private_key – CA private key (for Level 5 certificate signing)

  • ca_cert – CA certificate (for Level 5 certificate validation)

Parameters:
  • max_security_level (SecurityLevel)

  • session_duration_seconds (int)

  • ca_key_path (Path | None)

  • ca_cert_path (Path | None)

  • module_passwords (Dict[str, str])

__init__(max_security_level=SecurityLevel.HMAC, session_duration_seconds=3600, ca_key_path=None, ca_cert_path=None, module_passwords=None)

Initialize the SecurityManager.

Parameters:
  • max_security_level (SecurityLevel) – Maximum security level to support

  • session_duration_seconds (int) – Session validity duration

  • ca_key_path (Optional[Path]) – Path to CA private key (required for Level 5)

  • ca_cert_path (Optional[Path]) – Path to CA certificate (required for Level 5)

  • module_passwords (Optional[Dict[str, str]]) – Dictionary mapping module_id to password (for Level 3+)

sessions: Dict[str, SecuritySession]
sessions_by_module: Dict[str, str]
module_passwords: Dict[str, str]
async request_access(request, response)

Remote callable wrapper for request_access_impl.

Parameters:
  • request – Request object with access parameters

  • response – Response object to populate

Return type:

bool

Returns:

True if handled successfully, False otherwise

async request_access_impl(module_name, module_id, requested_role, requested_sl, password='', certificate_csr='')

Handle an access request from another module.

This is the main entry point for the RequestAccess service.

Parameters:
  • module_name (str) – Name of the requesting module

  • module_id (UUID) – UUID of the requesting module

  • requested_role (str) – Role identifier

  • requested_sl (int) – Requested security level (1-5)

  • password (str) – Password for Level 3+ authentication

  • certificate_csr (str) – CSR for Level 5 (base64-encoded PEM)

Return type:

Tuple[bool, str, str, str, str, datetime, int]

Returns:

Tuple of (success, message, session_token, hmac_key, certificate, expires_at, granted_sl)
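
A seven-element positional tuple is easy to misindex, so callers may prefer to wrap it in a named structure. `AccessResult` below is a hypothetical helper, not part of vyra_base. It only reuses the documented field order, and the sample values are made up.

```python
from datetime import datetime, timezone
from typing import NamedTuple


class AccessResult(NamedTuple):
    """Hypothetical named view of the documented 7-tuple."""

    success: bool
    message: str
    session_token: str
    hmac_key: str
    certificate: str
    expires_at: datetime
    granted_sl: int


# Example tuple in the documented field order (values are invented).
raw = (
    True,
    "granted",
    "a1b2",
    "00ff",
    "",
    datetime(2030, 1, 1, tzinfo=timezone.utc),
    4,
)
result = AccessResult(*raw)
```

A NamedTuple stays index-compatible with the plain tuple, so `result[6]` and `result.granted_sl` refer to the same value.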

get_session(session_token)

Retrieve a session by token.

Parameters:

session_token (str) – Session token

Return type:

Optional[SecuritySession]

Returns:

SecuritySession if valid and not expired, None otherwise

async cleanup_expired_sessions()

Remove all expired sessions. Should be called periodically.

Return type:

None
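
Since the manager keeps both a token-to-session map and a module-to-token map, a cleanup pass has to keep the two in sync. The function below is a minimal synchronous sketch of that idea, assuming a simplified session record of `(module_id, expires_at)`. The name `cleanup_expired` and the data layout are illustrative, not the library's implementation.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Tuple

# token -> (module_id, expires_at); a simplified stand-in for SecuritySession.
Sessions = Dict[str, Tuple[str, datetime]]


def cleanup_expired(
    sessions: Sessions, sessions_by_module: Dict[str, str], now: datetime
) -> int:
    """Drop expired sessions from both maps and return how many were removed."""
    # Collect tokens first so the dict is not mutated while iterating.
    expired = [tok for tok, (_, exp) in sessions.items() if exp <= now]
    for tok in expired:
        module_id, _ = sessions.pop(tok)
        # Only remove the reverse entry if it still points at this token.
        if sessions_by_module.get(module_id) == tok:
            del sessions_by_module[module_id]
    return len(expired)
```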

get_session_count()

Get the number of active sessions.

Return type:

int

get_active_modules()

Get list of active authenticated modules.

Return type:

list

Returns:

List of dicts with module info

Security Client

Classes

class vyra_base.security.security_client.SecurityContext(module_name, module_id, security_level, session_token, hmac_key=None, certificate=None, private_key_path=None)

Bases: object

Security context for a client connection.

Variables:
  • module_name – Name of the client module

  • module_id – UUID of the client module

  • security_level – Security level in use

  • session_token – Session token from RequestAccess

  • hmac_key – HMAC key (for Level 4+)

  • certificate – Client certificate (for Level 5)

  • private_key_path – Path to private key (for Level 5)

Parameters:
  • module_name (str)

  • module_id (str)

  • security_level (int)

  • session_token (str)

  • hmac_key (str | None)

  • certificate (str | None)

  • private_key_path (str | None)

module_name: str
module_id: str
security_level: int
session_token: str
hmac_key: str | None = None
certificate: str | None = None
private_key_path: str | None = None
__init__(module_name, module_id, security_level, session_token, hmac_key=None, certificate=None, private_key_path=None)
Parameters:
  • module_name (str)

  • module_id (str)

  • security_level (int)

  • session_token (str)

  • hmac_key (str | None)

  • certificate (str | None)

  • private_key_path (str | None)

Return type:

None

class vyra_base.security.security_client.SafetyMetadataBuilder

Bases: object

Helper class to build SafetyMetadata for messages.

This class handles the creation of SafetyMetadata fields based on the security level and context.

static build(security_context, additional_data='')

Build SafetyMetadata dictionary.

Parameters:
  • security_context (SecurityContext) – Security context with authentication info

  • additional_data (str) – Additional data to include in signature

Return type:

dict

Returns:

Dictionary with SafetyMetadata fields

class vyra_base.security.security_client.SecurePublisher(node, msg_type, topic, security_context, qos_profile=10)

Bases: object

Wrapper for ROS2 Publisher with automatic SafetyMetadata.

This class wraps a standard ROS2 publisher and automatically adds SafetyMetadata to each published message based on the security level.

Usage:

publisher = SecurePublisher(
    node,
    MessageType,
    'topic_name',
    security_context,
    qos_profile
)
publisher.publish(msg)
Variables:
  • publisher – Underlying ROS2 publisher

  • security_context – Security context for this publisher

Parameters:
  • node (Node)

  • msg_type (Type)

  • topic (str)

  • security_context (SecurityContext)

  • qos_profile (Any)

__init__(node, msg_type, topic, security_context, qos_profile=10)

Initialize SecurePublisher.

Parameters:
  • node (Node) – ROS2 node

  • msg_type (Type) – Message type class

  • topic (str) – Topic name

  • security_context (SecurityContext) – Security context

  • qos_profile (Any) – QoS profile

publish(msg, additional_data='')

Publish a message with automatic SafetyMetadata.

Parameters:
  • msg (Any) – Message to publish (must have safety_metadata field)

  • additional_data (str) – Additional data to include in signature

Raises:

SecurityError – If message doesn’t support SafetyMetadata

Return type:

None

destroy()

Destroy the underlying publisher.

Return type:

None

class vyra_base.security.security_client.SecureServiceClient(node, srv_type, service_name, security_context)

Bases: object

Wrapper for ROS2 Service Client with automatic SafetyMetadata.

Similar to SecurePublisher, but for service calls.

Usage:

client = SecureServiceClient(
    node,
    ServiceType,
    'service_name',
    security_context
)
response = await client.call_async(request)
Variables:
  • client – Underlying ROS2 service client

  • security_context – Security context for this client

__init__(node, srv_type, service_name, security_context)

Initialize SecureServiceClient.

Parameters:
  • node (Node) – ROS2 node

  • srv_type (Type) – Service type class

  • service_name (str) – Service name

  • security_context (SecurityContext) – Security context

async call_async(request, additional_data='')

Call service asynchronously with automatic SafetyMetadata.

Parameters:
  • request (Any) – Service request (must have safety_metadata field)

  • additional_data (str) – Additional data to include in signature

Return type:

Any

Returns:

Service response

Raises:

SecurityError – If request doesn’t support SafetyMetadata

destroy()

Destroy the underlying client.

Return type:

None

Functions

vyra_base.security.security_client.security_required(security_level=SecurityLevel.BASIC_AUTH, validate_metadata=True)

Decorator for service callbacks requiring authentication.

This decorator checks that:

  1. SafetyMetadata is present in the request

  2. Security level meets the minimum requirement

  3. Session token is valid (if SecurityManager is available)

  4. Module has sufficient access level (from granted session)

The decorator integrates with entity.security_manager to validate that the calling module has been granted sufficient access level.

Usage with @remote_service:

@remote_service()
@security_required(security_level=SecurityLevel.HMAC)
def my_service_callback(self, request, response):
    # Service implementation
    return response

Configuration in interface JSON:

{
    "functionname": "my_service_callback",
    "access_level": 4,  // SecurityLevel.HMAC
    ...
}
Parameters:
  • security_level (SecurityLevel) – Minimum required security level (1-5)

  • validate_metadata (bool) – Whether to validate SafetyMetadata

Returns:

Decorator function
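
To show the general shape of such a decorator, the sketch below implements only checks 1 and 2 from the list above (metadata present, level high enough). `demo_security_required` and `DemoService` are hypothetical names. Session-token validation against a SecurityManager is deliberately left out, and the rejection message is an assumption.

```python
import functools
from types import SimpleNamespace


def demo_security_required(min_level: int = 2):
    """Hypothetical stand-in showing the shape of a level-checking decorator."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, request, response):
            metadata = getattr(request, "safety_metadata", None)
            # Reject requests without metadata or below the required level.
            if metadata is None or metadata.security_level < min_level:
                response.success = False
                response.message = "insufficient security level"
                return response
            return func(self, request, response)

        return wrapper

    return decorator


class DemoService:
    @demo_security_required(min_level=4)
    def handle(self, request, response):
        response.success = True
        response.message = "ok"
        return response


# Usage: a request carrying level 4 metadata passes the check.
request = SimpleNamespace(safety_metadata=SimpleNamespace(security_level=4))
reply = DemoService().handle(request, SimpleNamespace())
```

Using `functools.wraps` keeps the callback's name and docstring intact, which matters when another decorator further out inspects the function.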

vyra_base.security.security_client.create_security_context(module_name, module_id, security_level, session_token, hmac_key=None, certificate=None, private_key_path=None)

Helper function to create a SecurityContext.

Parameters:
  • module_name (str) – Module name (human-readable)

  • module_id (str) – Module UUID

  • security_level (int) – Security level (1-5)

  • session_token (str) – Session token from RequestAccess

  • hmac_key (Optional[str]) – HMAC key (for Level 4+)

  • certificate (Optional[str]) – Certificate (for Level 5)

  • private_key_path (Optional[str]) – Path to private key (for Level 5)

Return type:

SecurityContext

Returns:

SecurityContext instance

Security Validator

Classes

class vyra_base.security.security_validator.SecurityValidator(security_manager=None, strict_mode=True)

Bases: object

Validates SafetyMetadata in incoming messages and service calls.

This class works in conjunction with SecurityManager to provide comprehensive validation of security metadata.

Usage:

validator = SecurityValidator(security_manager)

# In service callback:
try:
    validator.validate_metadata(request.safety_metadata, required_level=SecurityLevel.HMAC)
    # Process request
except SecurityError as e:
    response.success = False
    response.message = str(e)
    return response
Variables:
  • security_manager – SecurityManager instance for session validation

  • strict_mode – If True, reject messages that fail any validation

__init__(security_manager=None, strict_mode=True)

Initialize SecurityValidator.

Parameters:
  • security_manager (Optional[SecurityManager]) – SecurityManager for session validation

  • strict_mode (bool) – Enable strict validation mode

validate_metadata(metadata, required_level=SecurityLevel.NONE, additional_data='')

Validate SafetyMetadata from an incoming message/service call.

Performs validation based on the security level:

  • Level 1 (NONE): No validation

  • Level 2 (BASIC_AUTH): Module ID check

  • Level 3 (EXTENDED_AUTH): Module ID + password authentication (validated during RequestAccess)

  • Level 4 (HMAC): HMAC signature + timestamp validation

  • Level 5 (DIGITAL_SIGNATURE): Certificate signature validation

Parameters:
  • metadata (Any) – SafetyMetadata message

  • required_level (SecurityLevel) – Minimum required security level

  • additional_data (str) – Additional data that was included in signature

Return type:

Optional[SecuritySession]

Returns:

SecuritySession if validation succeeds

Raises:

SecurityError – If validation fails

class vyra_base.security.security_validator.MessageSecurityFilter(validator, required_level=SecurityLevel.NONE)

Bases: object

Filter for incoming ROS2 messages with automatic security validation.

This class can be used as a subscription callback wrapper to automatically validate SafetyMetadata before processing messages.

Usage:

def my_callback(msg):
    # Process message
    pass

filter = MessageSecurityFilter(validator, SecurityLevel.HMAC)
filtered_callback = filter.wrap_callback(my_callback)

subscription = node.create_subscription(
    MsgType,
    'topic',
    filtered_callback,
    qos_profile
)
Variables:
  • validator – SecurityValidator instance

  • required_level – Minimum required security level

__init__(validator, required_level=SecurityLevel.NONE)

Initialize MessageSecurityFilter.

Parameters:
  • validator (SecurityValidator) – SecurityValidator instance

  • required_level (SecurityLevel) – Minimum required security level

wrap_callback(callback)

Wrap a callback function with security validation.

Parameters:

callback (Callable) – Original callback function

Return type:

Callable

Returns:

Wrapped callback with validation
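
Conceptually, wrapping a callback means running a validation step first and invoking the original callback only if that step passes. The sketch below shows this with a hypothetical `wrap_with_validation` function and a local `DemoSecurityError`. Dropping invalid messages silently is an assumption, since the reference does not say what the real wrapper does on failure.

```python
from typing import Any, Callable


class DemoSecurityError(Exception):
    """Local stand-in for SecurityError."""


def wrap_with_validation(
    callback: Callable[[Any], None], validate: Callable[[Any], None]
) -> Callable[[Any], None]:
    """Return a callback that runs `validate` first and drops failing messages."""

    def wrapped(msg: Any) -> None:
        try:
            validate(msg)
        except DemoSecurityError:
            # Assumption: invalid messages are dropped without further handling.
            return
        callback(msg)

    return wrapped


def only_good(msg: Any) -> None:
    """Toy validator that accepts a single payload."""
    if msg != "good":
        raise DemoSecurityError("validation failed")


received = []
filtered = wrap_with_validation(received.append, only_good)
filtered("good")
filtered("tampered")
```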

Functions

vyra_base.security.security_validator.create_secure_subscription(node, msg_type, topic, callback, validator, required_level=SecurityLevel.NONE, qos_profile=10)

Helper function to create a subscription with automatic security validation.

Parameters:
  • node (Any) – ROS2 node

  • msg_type (type) – Message type

  • topic (str) – Topic name

  • callback (Callable) – Callback function

  • validator (SecurityValidator) – SecurityValidator instance

  • required_level (SecurityLevel) – Minimum required security level

  • qos_profile (Any) – QoS profile

Return type:

Any

Returns:

ROS2 subscription

Crypto Helpers

HMAC Functions

vyra_base.helper.crypto_helper.generate_hmac_key()

Generate a secure random HMAC key.

Return type:

str

Returns:

Hex-encoded HMAC key

vyra_base.helper.crypto_helper.compute_hmac_signature(message, hmac_key)

Compute HMAC-SHA256 signature for a message.

Parameters:
  • message (str) – Message to sign

  • hmac_key (str) – Hex-encoded HMAC key

Return type:

str

Returns:

Hex-encoded HMAC signature

Raises:

ValueError – If hmac_key is invalid

vyra_base.helper.crypto_helper.verify_hmac_signature(message, signature, hmac_key)

Verify HMAC-SHA256 signature for a message.

Parameters:
  • message (str) – Original message

  • signature (str) – Hex-encoded signature to verify

  • hmac_key (str) – Hex-encoded HMAC key

Return type:

bool

Returns:

True if signature is valid, False otherwise

Raises:

SignatureValidationError – If signature format is invalid
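
The sign-and-verify round trip with a hex-encoded key can be reproduced with the Python standard library alone. The `demo_*` functions below are hypothetical stand-ins, not the vyra_base implementations. They assume UTF-8 message encoding and use a constant-time comparison for verification.

```python
import hashlib
import hmac
import secrets


def demo_generate_key() -> str:
    # 32 random bytes, hex-encoded (64 characters).
    return secrets.token_hex(32)


def demo_sign(message: str, hmac_key: str) -> str:
    # bytes.fromhex raises ValueError if the key is not valid hex.
    key = bytes.fromhex(hmac_key)
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).hexdigest()


def demo_verify(message: str, signature: str, hmac_key: str) -> bool:
    expected = demo_sign(message, hmac_key)
    # compare_digest avoids leaking the mismatch position through timing.
    return hmac.compare_digest(expected, signature)


key = demo_generate_key()
sig = demo_sign("hello", key)
```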

vyra_base.helper.crypto_helper.create_hmac_payload(sender_id, timestamp, nonce, hmac_key, additional_data='')

Create HMAC payload for SafetyMetadata.

The message signed includes: sender_id, timestamp, nonce, and optional additional data.

Parameters:
  • sender_id (str) – Sender module ID

  • timestamp (int) – Message timestamp (seconds since epoch)

  • nonce (int) – Random nonce

  • hmac_key (str) – Hex-encoded HMAC key

  • additional_data (str) – Optional additional data to include in signature

Return type:

str

Returns:

Hex-encoded HMAC signature
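
The reference names the signed fields but not how they are serialized. The sketch below assumes they are joined with `|` in the listed order before signing. That delimiter, the field order, and the name `demo_hmac_payload` are illustrative assumptions, so signatures produced here are not expected to match the library's.

```python
import hashlib
import hmac


def demo_hmac_payload(
    sender_id: str,
    timestamp: int,
    nonce: int,
    hmac_key: str,
    additional_data: str = "",
) -> str:
    # Assumption: fields are joined with "|" in this order before signing.
    message = "|".join([sender_id, str(timestamp), str(nonce), additional_data])
    key = bytes.fromhex(hmac_key)
    return hmac.new(key, message.encode("utf-8"), hashlib.sha256).hexdigest()


demo_key = "ab" * 32  # fixed 32-byte key, for illustration only
payload_a = demo_hmac_payload("module-1", 1_700_000_000, 7, demo_key)
payload_b = demo_hmac_payload("module-1", 1_700_000_000, 8, demo_key)
```

Because the nonce is part of the signed message, two otherwise identical messages produce different signatures, which is what lets the receiver detect a replayed one.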

Token Functions

vyra_base.helper.crypto_helper.generate_session_token()

Generate a secure random session token.

Return type:

str

Returns:

Hex-encoded session token

vyra_base.helper.crypto_helper.generate_nonce()

Generate a random nonce for replay attack prevention.

Return type:

int

Returns:

Random 64-bit unsigned integer
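
Tokens and nonces of the documented shapes can be produced with Python's `secrets` module. The helpers below are hypothetical equivalents rather than the library code. The 32-byte default mirrors DEFAULT_TOKEN_LENGTH, and the 64-bit range follows the documented return value.

```python
import secrets


def demo_session_token(num_bytes: int = 32) -> str:
    # token_hex returns two hex characters per random byte.
    return secrets.token_hex(num_bytes)


def demo_nonce() -> int:
    # 64 random bits fit a uint64 message field.
    return secrets.randbits(64)


token = demo_session_token()
nonce = demo_nonce()
```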

RSA Key Functions

vyra_base.helper.crypto_helper.generate_rsa_keypair()

Generate RSA key pair for certificate operations.

Return type:

Tuple[RSAPrivateKey, RSAPublicKey]

Returns:

Tuple of (private_key, public_key)

vyra_base.helper.crypto_helper.save_private_key(private_key, filepath, password=None)

Save RSA private key to file.

Parameters:
  • private_key (RSAPrivateKey) – RSA private key

  • filepath (Path) – Path to save the key

  • password (Optional[str]) – Optional password for encryption

Return type:

None

vyra_base.helper.crypto_helper.load_private_key(filepath, password=None)

Load RSA private key from file.

Parameters:
  • filepath (Path) – Path to the key file

  • password (Optional[str]) – Optional password if key is encrypted

Return type:

RSAPrivateKey

Returns:

RSA private key

Raises:

FileNotFoundError – If key file doesn’t exist

Certificate Functions

vyra_base.helper.crypto_helper.create_csr(module_name, module_id, private_key, organization='VYRA Framework', country='DE')

Create a Certificate Signing Request (CSR).

Parameters:
  • module_name (str) – Name of the module

  • module_id (str) – UUID of the module

  • private_key (RSAPrivateKey) – RSA private key

  • organization (str) – Organization name

  • country (str) – Country code (ISO 3166-1 alpha-2)

Return type:

str

Returns:

Base64-encoded PEM CSR

vyra_base.helper.crypto_helper.sign_csr(csr_pem, ca_private_key, ca_cert, validity_days=365)

Sign a Certificate Signing Request with CA key.

Parameters:
  • csr_pem (str) – Base64-encoded PEM CSR

  • ca_private_key (RSAPrivateKey) – CA private key

  • ca_cert (Certificate) – CA certificate

  • validity_days (int) – Certificate validity in days

Return type:

str

Returns:

PEM-encoded signed certificate

Raises:

CertificateValidationError – If CSR is invalid

vyra_base.helper.crypto_helper.create_self_signed_cert(module_name, private_key, validity_days=365)

Create a self-signed certificate (for CA or testing).

Parameters:
  • module_name (str) – Name for the certificate

  • private_key (RSAPrivateKey) – RSA private key

  • validity_days (int) – Certificate validity in days

Return type:

Certificate

Returns:

Self-signed certificate

vyra_base.helper.crypto_helper.verify_certificate(cert_pem, ca_cert_pem)

Verify a certificate against a CA certificate.

Parameters:
  • cert_pem (str) – PEM-encoded certificate to verify

  • ca_cert_pem (str) – PEM-encoded CA certificate

Return type:

bool

Returns:

True if certificate is valid

Raises:

CertificateValidationError – If certificate is invalid

vyra_base.helper.crypto_helper.sign_message_rsa(message, private_key)

Sign a message with RSA private key.

Parameters:
  • message (str) – Message to sign

  • private_key (RSAPrivateKey) – RSA private key

Return type:

str

Returns:

Base64-encoded signature

vyra_base.helper.crypto_helper.verify_message_rsa(message, signature, cert_pem)

Verify RSA signature of a message.

Parameters:
  • message (str) – Original message

  • signature (str) – Base64-encoded signature

  • cert_pem (str) – PEM-encoded certificate containing public key

Return type:

bool

Returns:

True if signature is valid

Raises:

SignatureValidationError – If signature is invalid

Constants

vyra_base.helper.crypto_helper.DEFAULT_SESSION_DURATION_SECONDS: int = 3600

Default session validity duration (1 hour).

vyra_base.helper.crypto_helper.DEFAULT_TOKEN_LENGTH: int = 32

Default session token length in bytes.

vyra_base.helper.crypto_helper.DEFAULT_NONCE_LENGTH: int = 16

Default nonce length in bytes.

vyra_base.helper.crypto_helper.MAX_TIMESTAMP_DRIFT_SECONDS: int = 300

Maximum allowed timestamp drift (5 minutes).
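
A drift limit like this is typically applied as a symmetric window around the receiver's clock. The check below is a minimal sketch of that idea. The function name and the choice to accept timestamps up to 300 seconds in either direction are assumptions, and the `now` parameter exists only to make the example deterministic.

```python
import time
from typing import Optional

MAX_DRIFT_SECONDS = 300  # mirrors MAX_TIMESTAMP_DRIFT_SECONDS


def timestamp_within_drift(timestamp: int, now: Optional[float] = None) -> bool:
    """Accept timestamps at most MAX_DRIFT_SECONDS from `now`, past or future."""
    current = time.time() if now is None else now
    return abs(current - timestamp) <= MAX_DRIFT_SECONDS
```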

vyra_base.helper.crypto_helper.HMAC_KEY_LENGTH: int = 32

HMAC key length in bytes (256 bits).

vyra_base.helper.crypto_helper.CERT_KEY_SIZE: int = 2048

RSA key size for certificates.

vyra_base.helper.crypto_helper.CERT_VALIDITY_DAYS: int = 365

Default certificate validity period in days.

vyra_base.helper.crypto_helper.MAX_SECURITY_PAYLOAD_SIZE: int = 4096

Maximum size for security payloads (4KB).

vyra_base.helper.crypto_helper.MAX_CERTIFICATE_SIZE: int = 8192

Maximum size for certificates (8KB).

ROS2 Interfaces

VBASERequestAccess Service

Service definition for requesting authenticated access.

Request Fields:

string module_name           # Name of requesting module
UUID module_id              # UUID of requesting module
string requested_role        # Role identifier
uint8 requested_sl          # Requested security level (1-5)
string certificate_csr       # CSR for Level 5 (optional)

Response Fields:

bool success                # Access granted
string message              # Result message
string session_token        # Session token
string hmac_key            # HMAC key (Level 4+)
string certificate          # Signed certificate (Level 5)
Time expires_at            # Token expiration
uint8 granted_sl           # Granted security level

SafetyMetadata Message

Message type containing security metadata.

Fields:

uint8 security_level        # Security level (1-5)
string sender_id           # Sender module UUID
Time timestamp             # Message creation time
uint64 nonce              # Random nonce
string security_payload    # Signature/HMAC
string algorithm_id        # Algorithm identifier

Usage Examples

Authentication

Server Side:

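from rclpy.node import Node
from vyra_base.security.security_levels import SecurityLevel
from vyra_base.security.security_manager import SecurityManager

class MyModule(Node, SecurityManager):
    def __init__(self):
        # Initialize both base classes explicitly (see the SecurityManager usage note)
        Node.__init__(self, 'my_module')
        SecurityManager.__init__(
            self,
            max_security_level=SecurityLevel.HMAC
        )
        # Register the RequestAccess service (see the SecurityManager usage note)
        self.setup_security_service()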

Client Side:

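from vyra_base.security.security_client import create_security_context

# access_client is a RequestAccess service client and request a populated
# request; uuid holds this module's UUID.
response = await access_client.call_async(request)

ctx = create_security_context(
    module_name="my_module",  # required by create_security_context
    module_id=str(uuid),
    security_level=response.granted_sl,
    session_token=response.session_token,
    hmac_key=response.hmac_key
)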

Secure Publishing

from vyra_base.security.security_client import SecurePublisher

pub = SecurePublisher(
    node,
    MessageType,
    'topic_name',
    security_context,
    qos_profile=10
)

msg = MessageType()
msg.data = "Hello"
pub.publish(msg)  # Automatic SafetyMetadata

Validation

from vyra_base.security.security_levels import SecurityError, SecurityLevel
from vyra_base.security.security_validator import SecurityValidator

validator = SecurityValidator(security_manager, strict_mode=True)

def callback(self, request, response):
    try:
        validator.validate_metadata(
            request.safety_metadata,
            required_level=SecurityLevel.HMAC
        )
        # Process request
    except SecurityError as e:
        response.success = False
        response.message = str(e)
    return response

Certificate Operations

from vyra_base.helper.crypto_helper import (
    generate_rsa_keypair,
    create_csr,
    sign_csr
)

# Generate key pair
private_key, public_key = generate_rsa_keypair()

# Create CSR
csr = create_csr("my_module", "uuid-string", private_key)

# Sign CSR (server side)
certificate = sign_csr(csr, ca_private_key, ca_cert)

Type Hints

from typing import Any, Optional, Tuple
from datetime import datetime
import uuid

# SecurityManager
async def request_access_impl(
    self,
    module_name: str,
    module_id: uuid.UUID,
    requested_role: str,
    requested_sl: int,
    password: str = "",
    certificate_csr: str = ""
) -> Tuple[bool, str, str, str, str, datetime, int]:
    ...

# SecurityValidator
def validate_metadata(
    self,
    metadata: Any,
    required_level: SecurityLevel = SecurityLevel.NONE,
    additional_data: str = ""
) -> Optional[SecuritySession]:
    ...

# Crypto helpers
def compute_hmac_signature(
    message: str,
    hmac_key: str
) -> str:
    ...

Error Handling

Exception Handling Pattern

from vyra_base.security.security_levels import (
    SecurityError,
    SecurityLevel,
    SignatureValidationError,
    TimestampValidationError,
    SessionExpiredError
)

try:
    validator.validate_metadata(metadata, SecurityLevel.HMAC)
except SignatureValidationError:
    # Handle invalid signature
    pass
except TimestampValidationError:
    # Handle timestamp issues
    pass
except SessionExpiredError:
    # Re-authenticate
    pass
except SecurityError as e:
    # Handle general security error
    pass

Testing

Unit Test Example

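import uuid

import pytest
from vyra_base.security.security_levels import SecurityLevel
from vyra_base.security.security_manager import SecurityManager

@pytest.mark.asyncio
async def test_authentication():
    manager = SecurityManager(
        max_security_level=SecurityLevel.HMAC
    )

    # request_access_impl is the documented entry point for access requests
    result = await manager.request_access_impl(
        "test_module",
        uuid.uuid4(),
        "tester",
        4,
        certificate_csr=""
    )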

    assert result[0] is True  # success
    assert result[6] == 4     # granted_sl

Mock Testing

from unittest.mock import patch

from vyra_base.security.security_manager import SecurityManager

@patch('vyra_base.helper.crypto_helper.generate_hmac_key')
def test_with_mock_key(mock_key):
    mock_key.return_value = "0" * 64

    manager = SecurityManager()
    # Test with predictable key
    ...

See Also