vyra_base.com.feeder package

Submodules

vyra_base.com.feeder.interfaces module

Abstract interface for VYRA feeders.

IFeeder is the contracts all feeders must satisfy. It is kept minimal so that both built-in feeders (StateFeeder, NewsFeeder, ErrorFeeder) and user-defined custom feeders can implement it without inheriting a large base class.

class vyra_base.com.feeder.interfaces.IFeeder[Quellcode]

Bases: ABC

Abstract base class for all VYRA feeders.

A feeder is a publisher that continuously pushes domain-typed data (state changes, news messages, errors, …) over a transport protocol (Zenoh, ROS2, Redis, UDS, …).

The protocol is resolved automatically from the module’s interface config JSON via FeederConfigResolver.

Lifecycle:

feeder = MyFeeder(...)
await feeder.start()          # resolve protocol + create publisher
await feeder.feed(my_data)          # async-safe publish
alive = feeder.is_alive()     # health check
feeder.feed_count             # read metrics
abstractmethod async start()[Quellcode]

Initialise the feeder: resolve the transport protocol, create the publisher, and flush any buffered messages.

Must be awaited before the first feed() call.

Rückgabetyp:

None

abstractmethod async feed(message)[Quellcode]

Publish message immediately.

If called before start() the message is buffered and flushed once the publisher is ready.

Parameter:

message (Any) – Domain object to publish.

Rückgabetyp:

None

abstractmethod feed_sync(message)[Quellcode]

Synchronous version of feed(). Not all feeders support this.

Parameter:

message (Any) – Domain object to publish.

Verursacht:

NotImplementedError – If the feeder does not support synchronous feeding.

Rückgabetyp:

None

abstractmethod get_feeder_name()[Quellcode]

Return the feeder’s name (= functionname in interface config).

Rückgabetyp:

str

get_protocol()[Quellcode]

Return the resolved transport protocol string, or None if the feeder has not been started yet.

Rückgabetyp:

Optional[str]

is_alive()[Quellcode]

Return True if the feeder’s publisher is ready and the backing transport is reachable.

Feeders that do not implement a liveness check always return True.

Rückgabetyp:

bool

is_ready()[Quellcode]

Return True after start() has completed successfully.

Rückgabetyp:

bool

get_buffer()[Quellcode]

Return the internal pre-start message buffer.

Rückgabetyp:

deque

property feed_count: int

Number of messages successfully published since the feeder started.

Rückgabetyp:

int

property error_count: int

Number of publish errors since the feeder started.

Rückgabetyp:

int

vyra_base.com.feeder.feeder module

class vyra_base.com.feeder.feeder.BaseFeeder[Quellcode]

Bases: IFeeder

Concrete base class for all VYRA feeders.

Implements IFeeder and provides:

  • Protocol auto-resolution via FeederConfigResolver — the transport protocol is read from the module’s interface config JSON (functionname matched against feeder_name).

  • Pre-start buffer — messages fed before start() are queued and flushed automatically.

  • Metricsfeed_count, error_count, last_feed_at.

  • Health checkis_alive() probes the backing publisher.

  • Retry policy — configurable max_retries and retry_delay.

Abstract class — subclasses set _feederName, _type, optionally _interface_paths.

__init__()[Quellcode]

Initialize the BaseFeeder.

Rückgabetyp:

None

get_feeder_name()[Quellcode]

Return the feeder name (= functionname in interface config).

Rückgabetyp:

str

get_protocol()[Quellcode]

Return the resolved transport protocol, or None before start.

Rückgabetyp:

Optional[str]

is_alive()[Quellcode]

Return True if the publisher is set and available.

Rückgabetyp:

bool

is_ready()[Quellcode]

Return True after start() has completed successfully.

Rückgabetyp:

bool

get_buffer()[Quellcode]

Return the pre-start message buffer.

Rückgabetyp:

deque

property feed_count: int

Number of successfully published messages.

property error_count: int

Number of publish errors.

property last_feed_at: datetime | None

Timestamp of the last successful feed() call.

property debounced_duplicate_count: int

Number of duplicate messages suppressed by debouncing.

register_condition(condition_function, *, name=None, tag='news', execution_point='ALWAYS', success_message=None, failure_message=None)[Quellcode]

Register a synchronous bool-returning condition callback.

Rückgabetyp:

str

Parameter:
  • condition_function (Callable[[dict[str, Any]], bool])

  • name (str | None)

  • tag (str)

  • execution_point (Literal['BEFORE', 'DURING', 'AFTER', 'ALWAYS'])

  • success_message (str | None)

  • failure_message (str | None)

unregister_condition(name)[Quellcode]

Remove a previously registered condition callback by name.

Rückgabetyp:

bool

Parameter:

name (str)

evaluate_conditions(context, *, rule_names=None, tags=None, execution_point=None)[Quellcode]

Evaluate registered conditions and return resulting messages.

By default all registered conditions are evaluated. Use rule_names and/or tags to limit evaluation to a subset.

Rückgabetyp:

list[tuple[str, str]]

Parameter:
set_interface_paths(paths)[Quellcode]

Override the interface paths used for protocol resolution.

Called by a VyraEntity after constructing the feeder to provide module-specific config paths.

Parameter:

paths (Union[Sequence[str], list[Path]]) – List of directory or JSON file paths.

Rückgabetyp:

None

async create(loggingOn=False)[Quellcode]

Create the publisher using protocol resolved from interface config.

Resolution order:

  1. FeederConfigResolver.resolve(feeder_name, interface_paths) — reads the module’s JSON config, maps tags to a protocol.

  2. If no config found (or interface_paths empty): fall back to InterfaceFactory.create_publisher with the default chain [ZENOH, ROS2, REDIS, UDS].

Verursacht:

FeederException – If no protocol is available at all.

Parameter:

loggingOn (bool) – Emit feeder messages also to the base logger.

Rückgabetyp:

None

async start()[Quellcode]

Start the feeder (implements IFeeder).

Subclasses may override to add extra initialisation before calling await super().start().

Rückgabetyp:

None

async feed(msg)[Quellcode]

Enqueue msg for publishing (implements IFeeder).

If the feeder is not yet ready the message is buffered. Otherwise the publish path is executed synchronously (event-loop aware).

Parameter:

msg (Any) – Message to publish.

Rückgabetyp:

None

feed_sync(msg)[Quellcode]

Sync version of feed() for use in sync contexts.

Rückgabetyp:

None

Parameter:

msg (Any)

add_handler(handler)[Quellcode]

Attach a CommunicationHandler.

Parameter:

handler (CommunicationHandler) – Handler instance to attach.

Rückgabe:

True if added, False if already present.

Rückgabetyp:

bool

add_handler_class(handler_class)[Quellcode]

Register a handler class to be instantiated during create().

Parameter:

handler_class (Type[CommunicationHandler]) – Handler class (must subclass CommunicationHandler).

Rückgabetyp:

None

vyra_base.com.feeder.custom_feeder module

Custom feeder base class for VYRA application developers.

CustomBaseFeeder is the recommended starting point when you need to publish proprietary domain data (e.g. sensor readings, machine parameters, alarm setpoints) over the VYRA transport layer.

Quick start:

from vyra_base.com.feeder.custom_feeder import CustomBaseFeeder
from vyra_base.com.feeder.registry import register_feeder
from vyra_base.defaults.entries import ModuleEntry

@register_feeder("TemperatureFeed")
class TemperatureFeeder(CustomBaseFeeder):
    """Publishes temperature readings from a PLC."""

    def _build_message(self, raw: float) -> dict:
        return {"value": raw, "unit": "°C", "sensor": self._sensor_id}

    def _validate(self, raw: float) -> bool:
        return -50.0 <= raw <= 300.0

# Usage in your module application:
feeder = TemperatureFeeder(
    feeder_name="TemperatureFeed",
    module_entity=my_module_entity,
)
await feeder.start()
await feeder.feed(87.3)     # publishes {"value": 87.3, "unit": "°C", "sensor": "PT-01"}

The protocol is resolved automatically from the interface config JSON — add a "type": "publisher", "functionname": "TemperatureFeed" entry with the appropriate tags (e.g. ["zenoh"]).

class vyra_base.com.feeder.custom_feeder.CustomBaseFeeder(feeder_name, module_entity, message_type=None, node=None, loggingOn=False)[Quellcode]

Bases: BaseFeeder

Base class for user-defined VYRA feeders.

Extends BaseFeeder with two hooks that subclasses can override to implement custom message mapping and validation without touching the transport layer.

Parameter:
  • feeder_name (str) – The feeder’s name. Must match the functionname in the module’s interface config JSON so that the protocol resolver can find the correct transport entry.

  • module_entity (ModuleEntry) – Module configuration (name, uuid, …).

  • message_type (Any) – Optional message class forwarded to the publisher (e.g. a protobuf class). Pass None for dict-based protocols.

  • node (Optional[Any]) – ROS2 node (required only when using the ROS2 transport).

  • loggingOn (bool) – If True, every feed() call is also logged via the standard Python logger.

__init__(feeder_name, module_entity, message_type=None, node=None, loggingOn=False)[Quellcode]

Initialize the BaseFeeder.

Parameter:
async feed(raw)[Quellcode]

Validate, transform, and publish raw.

  1. Call _validate() — skip on False.

  2. Call _build_message() — transform to domain object.

  3. Delegate to feed().

Parameter:

raw (Any) – The raw value to validate, transform, and publish.

Rückgabetyp:

None

async start()[Quellcode]

Resolve protocol from interface config and start the feeder.

Rückgabetyp:

None

monitor(*, tag=None, label=None, severity='INFO', entity=None, during_interval_seconds=0.05)[Quellcode]

Return a runtime-monitoring decorator bound to this feeder.

Rückgabetyp:

Callable

Parameter:
  • tag (str | None)

  • label (str | None)

  • severity (str)

  • entity (Any)

  • during_interval_seconds (float)

vyra_base.com.feeder.config_resolver module

Interface-config based protocol resolver for VYRA feeders.

When a feeder starts, it calls FeederConfigResolver.resolve() with its feeder_name (= functionname in the JSON config). The resolver searches all interface config JSON files for a matching publisher entry and returns which transport protocol (zenoh, ros2, redis, uds, …) to use, which protobuf/message file type to load, and what tags are attached to the interface.

Interface config file schema (array of entries):

[
    {
        "type":         "message",          # only "message" is relevant for feeders
        "functionname": "StateFeed",        # matched against feeder_name
        "tags":         ["zenoh"],          # → ProtocolType.ZENOH
        "filetype":     ["VBASEStateFeed.proto"],
        "displayname":  "State Feed",
        "description":  "...",
        ...
    },
    ...
]

If no matching publisher entry is found the resolver logs an actionable error message and uses fuzzy_match() to propose similar publisher names from all discovered configs.

class vyra_base.com.feeder.config_resolver.FeederResolverResult(feeder_name, protocol, tags=<factory>, file_types=<factory>, display_name='', description='', raw_entry=<factory>, source_file='')[Quellcode]

Bases: object

Result returned by FeederConfigResolver.resolve().

Parameter:
  • feeder_name (str) – The functionname that was matched.

  • protocol (str) – The resolved protocol string (e.g. "zenoh").

  • tags (list[str]) – Full list of tags from the matching entry.

  • file_types (list[str]) – List of interface file names (e.g. ["VBASEStateFeed.proto"]).

  • display_name (str) – Human-readable display name from the config.

  • description (str) – Description from the config.

  • raw_entry (dict) – The full raw JSON entry dict.

  • source_file (str) – Path to the JSON file that contained this entry.

feeder_name: str
protocol: str
tags: list[str]
file_types: list[str]
display_name: str = ''
description: str = ''
raw_entry: dict
source_file: str = ''
__init__(feeder_name, protocol, tags=<factory>, file_types=<factory>, display_name='', description='', raw_entry=<factory>, source_file='')
Parameter:
Rückgabetyp:

None

class vyra_base.com.feeder.config_resolver.FeederConfigResolver[Quellcode]

Bases: object

Resolves a feeder’s transport protocol from interface config JSON files.

The resolver is stateless: every resolve() call re-reads and parses the config files. In practice the files are small and cached by the OS page cache, so this is not a performance concern during module initialisation.

Usage:

from vyra_base.com.feeder.config_resolver import FeederConfigResolver

result = FeederConfigResolver.resolve(
    feeder_name="StateFeed",
    interface_paths=["/workspace/install/my_interfaces/share/my_interfaces/config"],
)
if result is None:
    raise RuntimeError("StateFeed not found in interface configs")

print(result.protocol)   # "zenoh"
print(result.file_types) # ["VBASEStateFeed.proto"]
static resolve(feeder_name, interface_paths, *, use_fallback=True, case_sensitive=False)[Quellcode]

Find the interface config entry for feeder_name and resolve its transport protocol.

Searches all JSON files found (recursively) under each path in interface_paths. Only entries with "type": "publisher" are considered.

Parameter:
  • feeder_name (str) – The functionname to look up (e.g. "StateFeed").

  • interface_paths (Sequence[str | Path]) – Directories or JSON file paths to search.

  • use_fallback (bool) – If True and the matched entry carries no recognised protocol tag, fall back to the default chain [zenoh, ros2, redis, uds] and use the first available protocol. Defaults to True.

  • case_sensitive (bool) – Whether functionname comparison is case-sensitive. Defaults to False.

Rückgabe:

A FeederResolverResult when a match is found, None otherwise. On failure an actionable error is logged with fuzzy-match suggestions.

Rückgabetyp:

Optional[FeederResolverResult]

vyra_base.com.feeder.registry module

Feeder registry for VYRA custom feeders.

FeederRegistry is a thread-safe singleton that keeps track of all registered IFeeder subclasses. Use the register_feeder() decorator to register a feeder class so the framework (and downstream tooling) can discover it by name.

Usage:

from vyra_base.com.feeder.registry import register_feeder, FeederRegistry
from vyra_base.com.feeder.custom_feeder import CustomBaseFeeder

@register_feeder("TemperatureFeed")
class TemperatureFeeder(CustomBaseFeeder):
    ...

# Discover & instantiate
cls = FeederRegistry.get("TemperatureFeed")
feeder = cls(...)
class vyra_base.com.feeder.registry.FeederRegistry[Quellcode]

Bases: object

Thread-safe registry of IFeeder subclasses.

All built-in feeders (StateFeeder, NewsFeeder, ErrorFeeder) are NOT pre-registered here — the registry is exclusively for user-defined custom feeders added via register_feeder().

classmethod register(name, feeder_class)[Quellcode]

Register feeder_class under name.

Parameter:
  • name (str) – Unique key (normally the functionname from the interface config, e.g. "TemperatureFeed").

  • feeder_class (Type[IFeeder]) – The feeder class to register. Must be a subclass of IFeeder.

Verursacht:
  • TypeError – If feeder_class does not subclass IFeeder.

  • ValueError – If name is already registered (use override=True to replace).

Rückgabetyp:

None

classmethod get(name)[Quellcode]

Return the feeder class registered under name, or None.

Parameter:

name (str) – Feeder name to look up.

Rückgabetyp:

Optional[Type[IFeeder]]

classmethod list_feeders()[Quellcode]

Return a sorted list of all registered feeder names.

Rückgabetyp:

list[str]

classmethod unregister(name)[Quellcode]

Remove the feeder registered under name.

Parameter:

name (str) – Feeder name to remove.

Rückgabe:

True if the entry existed and was removed.

Rückgabetyp:

bool

vyra_base.com.feeder.registry.register_feeder(name)[Quellcode]

Class decorator that registers a feeder in FeederRegistry.

The decorated class must be a subclass of IFeeder (or CustomBaseFeeder).

Parameter:

name (str) – The name under which this feeder is registered. Should match the functionname in the module’s interface config JSON so that the framework can select the correct protocol automatically.

Verursacht:

TypeError – If the decorated class is not an IFeeder subclass.

Example:

@register_feeder("TemperatureFeed")
class TemperatureFeeder(CustomBaseFeeder):
    pass

vyra_base.com.feeder.state_feeder module

class vyra_base.com.feeder.state_feeder.StateFeeder(node, module_entity, loggingOn=False)[Quellcode]

Bases: BaseFeeder

Responsible for loading a VYRA Handler and feeding StateEntry elements to this handler.

Parameter:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Verursacht:

FeederException – If the publisher cannot be created.

Variablen:
  • _feederName – Name of the feeder.

  • _doc – Documentation string for the feeder.

  • _level – Logging level.

  • _node – VyraNode instance.

  • _module_entity – Module configuration.

__init__(node, module_entity, loggingOn=False)[Quellcode]

Initializes a StateFeeder instance for sending changes in state of a module.

Parameter:
  • node (Optional[Any]) – The VyraNode instance (optional, None if ROS2 unavailable).

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Verursacht:

FeederException – If the publisher cannot be created.

async start()[Quellcode]

Starts the feeder by initializing handlers.

Automatically resolves the transport protocol from the module’s interface config (reads StateFeed entry, picks tags).

Rückgabetyp:

None

async feed(stateElement)[Quellcode]

Feed a state entry to the feeder.

Validates the input type, then delegates to feed() which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.

Parameter:

stateElement (StateEntry) – The state entry to feed.

Verursacht:

FeederException – If the provided element is not of type StateEntry.

Rückgabetyp:

None

feed_sync(msg)[Quellcode]

Sync version of feed() for use in sync contexts.

Rückgabetyp:

None

Parameter:

msg (Any)

vyra_base.com.feeder.news_feeder module

class vyra_base.com.feeder.news_feeder.NewsFeeder(node, module_entity, loggingOn=False)[Quellcode]

Bases: BaseFeeder

Collection of the news messages.

Parameter:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – Module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Verursacht:

FeederException – If the publisher cannot be created.

__init__(node, module_entity, loggingOn=False)[Quellcode]

Initialize the BaseFeeder.

Parameter:
async start()[Quellcode]

Starts the feeder by initializing handlers.

Automatically resolves the transport protocol from the module’s interface config (reads NewsFeed entry, picks tags).

Rückgabetyp:

None

async feed(newsElement)[Quellcode]

Feed a news entry to the feeder.

Normalises the input to a NewsEntry, then delegates to feed() which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.

Parameter:

newsElement (Union[NewsEntry, str, list]) – The news entry to be fed. Can be a string or list of strings to be processed into a NewsEntry, or a NewsEntry object.

Verursacht:

FeederException – If the type of newsElement is not supported.

Rückgabetyp:

None

feed_sync(msg)[Quellcode]

Sync version of feed() — normalises input to NewsEntry before delegating.

Rückgabetyp:

None

Parameter:

msg (Any)

register_news_condition(condition_function, *, name=None, execution_point='ALWAYS', success_message=None, failure_message=None, tag='news')[Quellcode]

Register a synchronous bool condition for tracked feeder messages.

Rückgabetyp:

str

Parameter:
  • condition_function (Callable[[dict[str, Any]], bool])

  • name (str | None)

  • execution_point (Literal['BEFORE', 'DURING', 'AFTER', 'ALWAYS'])

  • success_message (str | None)

  • failure_message (str | None)

  • tag (str)

monitor(*, tag='news', label=None, severity='INFO', entity=None, during_interval_seconds=0.05)[Quellcode]

Return a runtime-monitoring decorator bound to this feeder.

Rückgabetyp:

Callable

Parameter:
  • tag (str)

  • label (str | None)

  • severity (str)

  • entity (Any)

  • during_interval_seconds (float)

async evaluate_tracked_conditions(context)[Quellcode]

Evaluate registered conditions and publish resulting tracker messages.

Rückgabetyp:

None

Parameter:

context (dict[str, Any])

evaluate_tracked_conditions_sync(context)[Quellcode]

Sync variant of evaluate_tracked_conditions().

Rückgabetyp:

None

Parameter:

context (dict[str, Any])

build_newsfeed(*args)[Quellcode]

Build a well structured newsfeed entry from plain text and module information.

Parameter:

args (Any) – The arguments to be processed into a news entry. Can be a string or list of strings.

Rückgabe:

A structured news entry containing the message, level, timestamp, UUID, module name, module ID, module template, and type.

Rückgabetyp:

NewsEntry

Verursacht:

FeederException – If the type of the message level is not valid.

vyra_base.com.feeder.news_feeder.extract_level_from_msg(message)[Quellcode]

Extract the message level from a given message string.

Parameter:

message (str) – The message string from which to extract the level.

Rückgabe:

The extracted message level if found, otherwise None.

Rückgabetyp:

Optional[MESSAGE_LEVEL]

vyra_base.com.feeder.error_feeder module

class vyra_base.com.feeder.error_feeder.ErrorFeeder(node, module_entity, loggingOn=True)[Quellcode]

Bases: BaseFeeder

Collection of the error messages.

Parameter:
  • node (Optional[Any]) – The VyraNode instance associated with this feeder.

  • module_entity (ModuleEntry) – The module configuration entry.

  • loggingOn (bool) – Flag to enable or disable logging next to feeding. Defaults to False.

Verursacht:

FeederException – If the publisher cannot be created.

__init__(node, module_entity, loggingOn=True)[Quellcode]

Initialize the BaseFeeder.

Parameter:
async start()[Quellcode]

Starts the feeder by initializing handlers.

Automatically resolves the transport protocol from the module’s interface config (reads ErrorFeed entry, picks tags).

Rückgabetyp:

None

async feed(errorElement)[Quellcode]

Feed an error entry to the feeder.

Normalises the input to an ErrorEntry, then delegates to feed() which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.

Parameter:

errorElement (Union[ErrorEntry, dict]) – The error entry to be fed. Can be a dictionary with error details or an ErrorEntry object.

Verursacht:

FeederException – If the type of errorElement is neither a dict nor an ErrorEntry.

Rückgabetyp:

None

feed_sync(errorElement)[Quellcode]

Sync version of feed() with ErrorEntry normalization.

Rückgabetyp:

None

Parameter:

errorElement (ErrorEntry | dict)

monitor(*, tag='error', label=None, severity='WARNING', entity=None, during_interval_seconds=0.05)[Quellcode]

Return an exception-monitoring decorator bound to this feeder.

Rückgabetyp:

Callable

Parameter:
  • tag (str)

  • label (str | None)

  • severity (str)

  • entity (Any)

  • during_interval_seconds (float)

build_errorfeed(errorDict)[Quellcode]

Build an error entry from the given keyword arguments.

Parameter:

errorDict (dict) –

A dictionary containing error details. Keys are:

  • code: int16 - Error code (default: 0x00000000)

  • uuid: UUID - Unique identifier for the error (default: a new UUID)

  • description: str - Description of the error (default: ‚‘)

  • solution: str - Suggested solution for the error (default: ‚‘)

  • miscellaneous: str - Additional information (default: ‚‘)

  • level: ErrorEntry.ERROR_LEVEL - Level of the error (default: ErrorEntry.ERROR_LEVEL.MINOR_FAULT)

Rückgabe:

An instance of vyra_base.defaults.entries.ErrorEntry populated with the provided details.

Rückgabetyp:

ErrorEntry

Module contents

VYRA Feeder package — multi-protocol data publishers.

Available feeders: - StateFeeder - NewsFeeder - ErrorFeeder - CustomBaseFeeder

Protocol resolution: - FeederConfigResolver - FeederResolverResult

Registry: - FeederRegistry - register_feeder()

Interfaces: - IFeeder

class vyra_base.com.feeder.FeederConfigResolver[Quellcode]

Bases: object

Resolves a feeder’s transport protocol from interface config JSON files.

The resolver is stateless: every resolve() call re-reads and parses the config files. In practice the files are small and cached by the OS page cache, so this is not a performance concern during module initialisation.

Usage:

from vyra_base.com.feeder.config_resolver import FeederConfigResolver

result = FeederConfigResolver.resolve(
    feeder_name="StateFeed",
    interface_paths=["/workspace/install/my_interfaces/share/my_interfaces/config"],
)
if result is None:
    raise RuntimeError("StateFeed not found in interface configs")

print(result.protocol)   # "zenoh"
print(result.file_types) # ["VBASEStateFeed.proto"]
static resolve(feeder_name, interface_paths, *, use_fallback=True, case_sensitive=False)[Quellcode]

Find the interface config entry for feeder_name and resolve its transport protocol.

Searches all JSON files found (recursively) under each path in interface_paths. Only entries with "type": "publisher" are considered.

Parameter:
  • feeder_name (str) – The functionname to look up (e.g. "StateFeed").

  • interface_paths (Sequence[str | Path]) – Directories or JSON file paths to search.

  • use_fallback (bool) – If True and the matched entry carries no recognised protocol tag, fall back to the default chain [zenoh, ros2, redis, uds] and use the first available protocol. Defaults to True.

  • case_sensitive (bool) – Whether functionname comparison is case-sensitive. Defaults to False.

Rückgabe:

A FeederResolverResult when a match is found, None otherwise. On failure an actionable error is logged with fuzzy-match suggestions.

Rückgabetyp:

Optional[FeederResolverResult]

class vyra_base.com.feeder.FeederResolverResult(feeder_name, protocol, tags=<factory>, file_types=<factory>, display_name='', description='', raw_entry=<factory>, source_file='')[Quellcode]

Bases: object

Result returned by FeederConfigResolver.resolve().

Parameter:
  • feeder_name (str) – The functionname that was matched.

  • protocol (str) – The resolved protocol string (e.g. "zenoh").

  • tags (list[str]) – Full list of tags from the matching entry.

  • file_types (list[str]) – List of interface file names (e.g. ["VBASEStateFeed.proto"]).

  • display_name (str) – Human-readable display name from the config.

  • description (str) – Description from the config.

  • raw_entry (dict) – The full raw JSON entry dict.

  • source_file (str) – Path to the JSON file that contained this entry.

feeder_name: str
protocol: str
tags: list[str]
file_types: list[str]
display_name: str = ''
description: str = ''
raw_entry: dict
source_file: str = ''
__init__(feeder_name, protocol, tags=<factory>, file_types=<factory>, display_name='', description='', raw_entry=<factory>, source_file='')
Parameter:
Rückgabetyp:

None

class vyra_base.com.feeder.IFeeder[Quellcode]

Bases: ABC

Abstract base class for all VYRA feeders.

A feeder is a publisher that continuously pushes domain-typed data (state changes, news messages, errors, …) over a transport protocol (Zenoh, ROS2, Redis, UDS, …).

The protocol is resolved automatically from the module’s interface config JSON via FeederConfigResolver.

Lifecycle:

feeder = MyFeeder(...)
await feeder.start()          # resolve protocol + create publisher
await feeder.feed(my_data)          # async-safe publish
alive = feeder.is_alive()     # health check
feeder.feed_count             # read metrics
abstractmethod async start()[Quellcode]

Initialise the feeder: resolve the transport protocol, create the publisher, and flush any buffered messages.

Must be awaited before the first feed() call.

Rückgabetyp:

None

abstractmethod async feed(message)[Quellcode]

Publish message immediately.

If called before start() the message is buffered and flushed once the publisher is ready.

Parameter:

message (Any) – Domain object to publish.

Rückgabetyp:

None

abstractmethod feed_sync(message)[Quellcode]

Synchronous version of feed(). Not all feeders support this.

Parameter:

message (Any) – Domain object to publish.

Verursacht:

NotImplementedError – If the feeder does not support synchronous feeding.

Rückgabetyp:

None

abstractmethod get_feeder_name()[Quellcode]

Return the feeder’s name (= functionname in interface config).

Rückgabetyp:

str

get_protocol()[Quellcode]

Return the resolved transport protocol string, or None if the feeder has not been started yet.

Rückgabetyp:

Optional[str]

is_alive()[Quellcode]

Return True if the feeder’s publisher is ready and the backing transport is reachable.

Feeders that do not implement a liveness check always return True.

Rückgabetyp:

bool

is_ready()[Quellcode]

Return True after start() has completed successfully.

Rückgabetyp:

bool

get_buffer()[Quellcode]

Return the internal pre-start message buffer.

Rückgabetyp:

deque

property feed_count: int

Number of messages successfully published since the feeder started.

Rückgabetyp:

int

property error_count: int

Number of publish errors since the feeder started.

Rückgabetyp:

int

class vyra_base.com.feeder.CustomBaseFeeder(feeder_name, module_entity, message_type=None, node=None, loggingOn=False)[Quellcode]

Bases: BaseFeeder

Base class for user-defined VYRA feeders.

Extends BaseFeeder with two hooks that subclasses can override to implement custom message mapping and validation without touching the transport layer.

Parameter:
  • feeder_name (str) – The feeder’s name. Must match the functionname in the module’s interface config JSON so that the protocol resolver can find the correct transport entry.

  • module_entity (ModuleEntry) – Module configuration (name, uuid, …).

  • message_type (Any) – Optional message class forwarded to the publisher (e.g. a protobuf class). Pass None for dict-based protocols.

  • node (Optional[Any]) – ROS2 node (required only when using the ROS2 transport).

  • loggingOn (bool) – If True, every feed() call is also logged via the standard Python logger.

__init__(feeder_name, module_entity, message_type=None, node=None, loggingOn=False)[Quellcode]

Initialize the BaseFeeder.

Parameter:
async feed(raw)[Quellcode]

Validate, transform, and publish raw.

  1. Call _validate() — skip on False.

  2. Call _build_message() — transform to domain object.

  3. Delegate to feed().

Parameter:

raw (Any) – The raw value to validate, transform, and publish.

Rückgabetyp:

None

async start()[Quellcode]

Resolve protocol from interface config and start the feeder.

Rückgabetyp:

None

monitor(*, tag=None, label=None, severity='INFO', entity=None, during_interval_seconds=0.05)[Quellcode]

Return a runtime-monitoring decorator bound to this feeder.

Rückgabetyp:

Callable

Parameter:
  • tag (str | None)

  • label (str | None)

  • severity (str)

  • entity (Any)

  • during_interval_seconds (float)

class vyra_base.com.feeder.FeederRegistry[Quellcode]

Bases: object

Thread-safe registry of IFeeder subclasses.

All built-in feeders (StateFeeder, NewsFeeder, ErrorFeeder) are NOT pre-registered here — the registry is exclusively for user-defined custom feeders added via register_feeder().

classmethod register(name, feeder_class)[Quellcode]

Register feeder_class under name.

Parameter:
  • name (str) – Unique key (normally the functionname from the interface config, e.g. "TemperatureFeed").

  • feeder_class (Type[IFeeder]) – The feeder class to register. Must be a subclass of IFeeder.

Verursacht:
  • TypeError – If feeder_class does not subclass IFeeder.

  • ValueError – If name is already registered (use override=True to replace).

Rückgabetyp:

None

classmethod get(name)[Quellcode]

Return the feeder class registered under name, or None.

Parameter:

name (str) – Feeder name to look up.

Rückgabetyp:

Optional[Type[IFeeder]]

classmethod list_feeders()[Quellcode]

Return a sorted list of all registered feeder names.

Rückgabetyp:

list[str]

classmethod unregister(name)[Quellcode]

Remove the feeder registered under name.

Parameter:

name (str) – Feeder name to remove.

Rückgabe:

True if the entry existed and was removed.

Rückgabetyp:

bool

vyra_base.com.feeder.register_feeder(name)[Quellcode]

Class decorator that registers a feeder in FeederRegistry.

The decorated class must be a subclass of IFeeder (or CustomBaseFeeder).

Parameter:

name (str) – The name under which this feeder is registered. Should match the functionname in the module’s interface config JSON so that the framework can select the correct protocol automatically.

Verursacht:

TypeError – If the decorated class is not an IFeeder subclass.

Example:

@register_feeder("TemperatureFeed")
class TemperatureFeeder(CustomBaseFeeder):
    pass
class vyra_base.com.feeder.FeedConditionRegistry[Quellcode]

Bases: object

__init__()[Quellcode]
register(condition_function, *, name=None, tag='news', execution_point='ALWAYS', success_message=None, failure_message=None)[Quellcode]
Rückgabetyp:

str

Parameter:
  • condition_function (Callable[[dict[str, Any]], bool])

  • name (str | None)

  • tag (str)

  • execution_point (Literal['BEFORE', 'DURING', 'AFTER', 'ALWAYS'])

  • success_message (str | None)

  • failure_message (str | None)

unregister(name)[Quellcode]
Rückgabetyp:

bool

Parameter:

name (str)

evaluate(context, *, rule_names=None, tags=None, execution_point=None)[Quellcode]
Rückgabetyp:

list[tuple[str, str]]

Parameter:
class vyra_base.com.feeder.FeedDebouncer(window_seconds=5.0)[Quellcode]

Bases: object

Parameter:

window_seconds (float)

__init__(window_seconds=5.0)[Quellcode]
Parameter:

window_seconds (float)

evaluate(signature)[Quellcode]
Rückgabetyp:

DebounceHit

Parameter:

signature (str)