vyra_base.com.feeder package
Submodules
vyra_base.com.feeder.interfaces module
Abstract interface for VYRA feeders.
IFeeder is the contract all feeders must satisfy. It is kept minimal
so that both built-in feeders (StateFeeder, NewsFeeder, ErrorFeeder) and
user-defined custom feeders can implement it without inheriting a large base
class.
- class vyra_base.com.feeder.interfaces.IFeeder
Bases: ABC
Abstract base class for all VYRA feeders.
A feeder is a publisher that continuously pushes domain-typed data (state changes, news messages, errors, …) over a transport protocol (Zenoh, ROS2, Redis, UDS, …).
The protocol is resolved automatically from the module’s interface config JSON via FeederConfigResolver.
Lifecycle:
    feeder = MyFeeder(...)
    await feeder.start()          # resolve protocol + create publisher
    await feeder.feed(my_data)    # async-safe publish
    alive = feeder.is_alive()     # health check
    feeder.feed_count             # read metrics
- abstractmethod async start()
Initialise the feeder: resolve the transport protocol, create the publisher, and flush any buffered messages.
Must be awaited before the first feed() call.
- abstractmethod async feed(message)
Publish message immediately.
If called before start(), the message is buffered and flushed once the publisher is ready.
- abstractmethod feed_sync(message)
Synchronous version of feed(). Not all feeders support this.
- Parameters:
message (Any) – Domain object to publish.
- Raises:
NotImplementedError – If the feeder does not support synchronous feeding.
- abstractmethod get_feeder_name()
Return the feeder’s name (= functionname in interface config).
- get_protocol()
Return the resolved transport protocol string, or None if the feeder has not been started yet.
- is_alive()
Return True if the feeder’s publisher is ready and the backing transport is reachable.
Feeders that do not implement a liveness check always return True.
- is_ready()
Return True after start() has completed successfully.
- get_buffer()
Return the internal pre-start message buffer.
- property feed_count: int
Number of messages successfully published since the feeder started.
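The lifecycle described for IFeeder (buffer before start(), flush on start(), count successful publishes) can be illustrated with a small stand-in. This is a minimal sketch, not vyra_base code: InMemoryFeeder and its published list are hypothetical names, and the list stands in for a real transport.

```python
import asyncio
from typing import Any, List


class InMemoryFeeder:
    """Hypothetical stand-in that mimics the documented IFeeder lifecycle."""

    def __init__(self, name: str) -> None:
        self._name = name
        self._ready = False
        self._buffer: List[Any] = []    # messages fed before start()
        self.published: List[Any] = []  # stands in for the real transport
        self._feed_count = 0

    async def start(self) -> None:
        # A real feeder would resolve the protocol and create a publisher here.
        self._ready = True
        pending, self._buffer = self._buffer, []
        for message in pending:
            await self.feed(message)    # flush the pre-start buffer in order

    async def feed(self, message: Any) -> None:
        if not self._ready:
            self._buffer.append(message)  # pre-start: queue only
            return
        self.published.append(message)
        self._feed_count += 1

    def is_ready(self) -> bool:
        return self._ready

    def get_buffer(self) -> List[Any]:
        return self._buffer

    @property
    def feed_count(self) -> int:
        return self._feed_count


async def demo() -> InMemoryFeeder:
    feeder = InMemoryFeeder("DemoFeed")
    await feeder.feed("early")  # buffered, nothing published yet
    await feeder.start()        # flushes "early"
    await feeder.feed("late")   # published directly
    return feeder


feeder = asyncio.run(demo())
print(feeder.published, feeder.feed_count)
```

The message fed before start() ends up first in published, which is the ordering guarantee the pre-start buffer is meant to give.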
vyra_base.com.feeder.feeder module
- class vyra_base.com.feeder.feeder.BaseFeeder
Bases: IFeeder
Concrete base class for all VYRA feeders.
Implements IFeeder and provides:
- Protocol auto-resolution via FeederConfigResolver: the transport protocol is read from the module’s interface config JSON (functionname matched against feeder_name).
- Pre-start buffer: messages fed before start() are queued and flushed automatically.
- Metrics: feed_count, error_count, last_feed_at.
- Health check: is_alive() probes the backing publisher.
- Retry policy: configurable max_retries and retry_delay.
Abstract class: subclasses set _feederName, _type, and optionally _interface_paths.
- __init__()
Initialize the BaseFeeder.
- Return type:
None
- get_feeder_name()
Return the feeder name (= functionname in interface config).
- get_protocol()
Return the resolved transport protocol, or None before start.
- is_alive()
Return True if the publisher is set and available.
- is_ready()
Return True after start() has completed successfully.
- get_buffer()
Return the pre-start message buffer.
- property feed_count: int
Number of successfully published messages.
- property error_count: int
Number of publish errors.
- property debounced_duplicate_count: int
Number of duplicate messages suppressed by debouncing.
- register_condition(condition_function, *, name=None, tag='news', execution_point='ALWAYS', success_message=None, failure_message=None)
Register a synchronous bool-returning condition callback.
- unregister_condition(name)
Remove a previously registered condition callback by name.
- evaluate_conditions(context, *, rule_names=None, tags=None, execution_point=None)
Evaluate registered conditions and return resulting messages.
By default all registered conditions are evaluated. Use rule_names and/or tags to limit evaluation to a subset.
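To make the filtering by rule_names and tags concrete, here is a minimal sketch of a condition registry. It is an illustration under assumptions, not the vyra_base implementation: ConditionSketch and Condition are hypothetical, the context is assumed to be a dict, results are assumed to be plain strings, and execution_point is left out.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional


@dataclass
class Condition:
    name: str
    func: Callable[[Dict[str, Any]], bool]
    tag: str
    success_message: Optional[str]
    failure_message: Optional[str]


class ConditionSketch:
    """Hypothetical stand-in; not the vyra_base implementation."""

    def __init__(self) -> None:
        self._conditions: Dict[str, Condition] = {}

    def register_condition(self, condition_function, *, name=None, tag="news",
                           success_message=None, failure_message=None) -> str:
        key = name or condition_function.__name__
        self._conditions[key] = Condition(key, condition_function, tag,
                                          success_message, failure_message)
        return key

    def unregister_condition(self, name: str) -> bool:
        return self._conditions.pop(name, None) is not None

    def evaluate_conditions(self, context, *, rule_names=None, tags=None) -> List[str]:
        messages = []
        for cond in self._conditions.values():
            if rule_names is not None and cond.name not in rule_names:
                continue  # filtered out by name
            if tags is not None and cond.tag not in tags:
                continue  # filtered out by tag
            text = cond.success_message if cond.func(context) else cond.failure_message
            if text:
                messages.append(text)
        return messages


registry = ConditionSketch()
registry.register_condition(lambda ctx: ctx["temp"] < 80, name="temp_ok",
                            failure_message="temperature too high")
registry.register_condition(lambda ctx: ctx["door"] == "closed", name="door_closed",
                            tag="safety", failure_message="door is open")

context = {"temp": 95, "door": "open"}
all_messages = registry.evaluate_conditions(context)
safety_only = registry.evaluate_conditions(context, tags=["safety"])
print(all_messages, safety_only)
```

With no filter both conditions run; passing tags=["safety"] restricts evaluation to the second one.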
- set_interface_paths(paths)
Override the interface paths used for protocol resolution.
Called by a VyraEntity after constructing the feeder to provide module-specific config paths.
- async create(loggingOn=False)
Create the publisher using the protocol resolved from the interface config.
Resolution order:
1. FeederConfigResolver.resolve(feeder_name, interface_paths) reads the module’s JSON config and maps tags to a protocol.
2. If no config is found (or interface_paths is empty), fall back to InterfaceFactory.create_publisher with the default chain [ZENOH, ROS2, REDIS, UDS].
- Raises:
FeederException – If no protocol is available at all.
- Parameters:
loggingOn (bool) – Also emit feeder messages to the base logger.
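The two-step resolution order can be sketched as a simple "first available" loop. This is a hedged illustration only: create_publisher, the factories mapping, and NoProtocolAvailable are hypothetical stand-ins (for InterfaceFactory.create_publisher and FeederException respectively), and a ConnectionError is assumed to signal an unreachable transport.

```python
from typing import Callable, Dict, List, Optional, Tuple

DEFAULT_CHAIN: List[str] = ["zenoh", "ros2", "redis", "uds"]


class NoProtocolAvailable(Exception):
    """Hypothetical stand-in for FeederException."""


def create_publisher(
    resolved_protocol: Optional[str],
    factories: Dict[str, Callable[[], object]],
) -> Tuple[str, object]:
    """Use the resolved protocol if there is one, else walk the default chain."""
    candidates = [resolved_protocol] if resolved_protocol else DEFAULT_CHAIN
    for protocol in candidates:
        factory = factories.get(protocol)
        if factory is None:
            continue  # transport not installed
        try:
            return protocol, factory()
        except ConnectionError:
            continue  # transport installed but unreachable
    raise NoProtocolAvailable("no transport protocol is available")


def broken_zenoh() -> object:
    raise ConnectionError("zenoh router not reachable")


factories = {"zenoh": broken_zenoh, "redis": lambda: "redis-publisher"}
protocol, publisher = create_publisher(None, factories)
print(protocol, publisher)
```

Here zenoh fails, ros2 is absent, and redis is the first chain entry that yields a publisher.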
- async start()
Start the feeder (implements IFeeder).
Subclasses may override this to add extra initialisation before calling await super().start().
- async feed(msg)
Enqueue msg for publishing (implements IFeeder).
If the feeder is not yet ready, the message is buffered. Otherwise the publish path is executed synchronously (event-loop aware).
- feed_sync(msg)
Sync version of feed() for use in sync contexts.
- add_handler(handler)
Attach a CommunicationHandler.
- Parameters:
handler (CommunicationHandler) – Handler instance to attach.
- Returns:
True if added, False if already present.
- add_handler_class(handler_class)
Register a handler class to be instantiated during create().
- Parameters:
handler_class (Type[CommunicationHandler]) – Handler class (must subclass CommunicationHandler).
vyra_base.com.feeder.custom_feeder module
Custom feeder base class for VYRA application developers.
CustomBaseFeeder is the recommended starting point when you need to
publish proprietary domain data (e.g. sensor readings, machine parameters,
alarm setpoints) over the VYRA transport layer.
Quick start:
    from vyra_base.com.feeder.custom_feeder import CustomBaseFeeder
    from vyra_base.com.feeder.registry import register_feeder
    from vyra_base.defaults.entries import ModuleEntry

    @register_feeder("TemperatureFeed")
    class TemperatureFeeder(CustomBaseFeeder):
        """Publishes temperature readings from a PLC."""

        _sensor_id = "PT-01"  # sensor tag included in every message

        def _build_message(self, raw: float) -> dict:
            return {"value": raw, "unit": "°C", "sensor": self._sensor_id}

        def _validate(self, raw: float) -> bool:
            return -50.0 <= raw <= 300.0

    # Usage in your module application (my_module_entity is a ModuleEntry):
    feeder = TemperatureFeeder(
        feeder_name="TemperatureFeed",
        module_entity=my_module_entity,
    )
    await feeder.start()
    await feeder.feed(87.3)  # publishes {"value": 87.3, "unit": "°C", "sensor": "PT-01"}
The protocol is resolved automatically from the interface config JSON: add
a "type": "publisher", "functionname": "TemperatureFeed" entry with the
appropriate tags (e.g. ["zenoh"]).
- class vyra_base.com.feeder.custom_feeder.CustomBaseFeeder(feeder_name, module_entity, message_type=None, node=None, loggingOn=False)
Bases: BaseFeeder
Base class for user-defined VYRA feeders.
Extends BaseFeeder with two hooks, _validate() and _build_message(), that subclasses can override to implement custom message mapping and validation without touching the transport layer.
- Parameters:
feeder_name (str) – The feeder’s name. Must match the functionname in the module’s interface config JSON so that the protocol resolver can find the correct transport entry.
module_entity (ModuleEntry) – Module configuration (name, uuid, …).
message_type (Any) – Optional message class forwarded to the publisher (e.g. a protobuf class). Pass None for dict-based protocols.
node (Optional[Any]) – ROS2 node (required only when using the ROS2 transport).
loggingOn (bool) – If True, every feed() call is also logged via the standard Python logger.
- __init__(feeder_name, module_entity, message_type=None, node=None, loggingOn=False)
Initialize the BaseFeeder.
- Parameters:
feeder_name (str)
module_entity (ModuleEntry)
message_type (Any)
node (Any | None)
loggingOn (bool)
- async feed(raw)
Validate, transform, and publish raw.
1. Call _validate(); skip the message on False.
2. Call _build_message() to transform raw into the domain object.
3. Delegate to BaseFeeder.feed().
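The three steps of feed() follow the template-method pattern, which a self-contained sketch can show without the real transport. BaseSketch, CustomSketch and TemperatureSketch are hypothetical names, and the default hook behaviour (accept everything, pass the value through) is an assumption rather than documented behaviour.

```python
import asyncio
from typing import Any, List


class BaseSketch:
    """Hypothetical stand-in for the transport-facing base class."""

    def __init__(self) -> None:
        self.published: List[Any] = []

    async def feed(self, message: Any) -> None:
        self.published.append(message)


class CustomSketch(BaseSketch):
    """Template-method pipeline: validate, build, then delegate."""

    def _validate(self, raw: Any) -> bool:
        return True  # assumed default: accept everything

    def _build_message(self, raw: Any) -> Any:
        return raw   # assumed default: pass through unchanged

    async def feed(self, raw: Any) -> None:
        if not self._validate(raw):
            return   # invalid input is skipped, nothing is published
        await super().feed(self._build_message(raw))


class TemperatureSketch(CustomSketch):
    def _validate(self, raw: float) -> bool:
        return -50.0 <= raw <= 300.0

    def _build_message(self, raw: float) -> dict:
        return {"value": raw, "unit": "°C"}


async def demo() -> List[Any]:
    feeder = TemperatureSketch()
    await feeder.feed(87.3)   # valid: published
    await feeder.feed(999.0)  # out of range: skipped
    return feeder.published


published = asyncio.run(demo())
print(published)
```

Only the in-range reading reaches the base class, so subclasses never need to touch the publish path.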
- async start()
Resolve the protocol from the interface config and start the feeder.
vyra_base.com.feeder.config_resolver module
Interface-config based protocol resolver for VYRA feeders.
When a feeder starts, it calls FeederConfigResolver.resolve() with its
feeder_name (= functionname in the JSON config). The resolver
searches all interface config JSON files for a matching publisher entry
and returns which transport protocol (zenoh, ros2, redis, uds, …) to use,
which protobuf/message file type to load, and what tags are attached to the
interface.
Interface config file schema (array of entries):
    [
        {
            "type": "publisher",          # only "publisher" entries are relevant for feeders
            "functionname": "StateFeed",  # matched against feeder_name
            "tags": ["zenoh"],            # → ProtocolType.ZENOH
            "filetype": ["VBASEStateFeed.proto"],
            "displayname": "State Feed",
            "description": "...",
            ...
        },
        ...
    ]
If no matching publisher entry is found the resolver logs an actionable error
message and uses fuzzy_match() to propose similar
publisher names from all discovered configs.
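The lookup and the "did you mean" suggestion can be sketched on already-loaded entries. This is an illustration under assumptions: resolve_protocol and suggest_names are hypothetical helpers, the "service" entry is invented test data, and the standard library's difflib.get_close_matches stands in for fuzzy_match(), whose actual algorithm is not described here.

```python
import difflib
from typing import Dict, List, Optional

KNOWN_PROTOCOLS = ("zenoh", "ros2", "redis", "uds")


def resolve_protocol(feeder_name: str, entries: List[Dict]) -> Optional[str]:
    """Return the first recognised protocol tag of the matching publisher entry."""
    for entry in entries:
        if entry.get("type") != "publisher":
            continue  # non-publisher entries are ignored
        if entry.get("functionname", "").lower() == feeder_name.lower():
            for tag in entry.get("tags", []):
                if tag.lower() in KNOWN_PROTOCOLS:
                    return tag.lower()
            return None  # matched, but no recognised protocol tag
    return None


def suggest_names(feeder_name: str, entries: List[Dict]) -> List[str]:
    """Propose similar publisher names (difflib stands in for fuzzy_match())."""
    names = [e["functionname"] for e in entries
             if e.get("type") == "publisher" and "functionname" in e]
    return difflib.get_close_matches(feeder_name, names, n=3, cutoff=0.6)


entries = [
    {"type": "publisher", "functionname": "StateFeed", "tags": ["zenoh"]},
    {"type": "publisher", "functionname": "NewsFeed", "tags": ["redis"]},
    {"type": "service", "functionname": "GetState", "tags": ["zenoh"]},
]

exact = resolve_protocol("statefeed", entries)  # case-insensitive match
typo = resolve_protocol("StateFed", entries)    # misspelt name: no match
suggestions = suggest_names("StateFed", entries)
print(exact, typo, suggestions)
```

A misspelt name resolves to nothing, and the suggestion list gives the caller something actionable to log.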
- class vyra_base.com.feeder.config_resolver.FeederResolverResult(feeder_name, protocol, tags=<factory>, file_types=<factory>, display_name='', description='', raw_entry=<factory>, source_file='')
Bases: object
Result returned by FeederConfigResolver.resolve().
- Parameters:
feeder_name (str) – The functionname that was matched.
protocol (str) – The resolved protocol string (e.g. "zenoh").
tags (list[str]) – Full list of tags from the matching entry.
file_types (list[str]) – List of interface file names (e.g. ["VBASEStateFeed.proto"]).
display_name (str) – Human-readable display name from the config.
description (str) – Description from the config.
raw_entry (dict) – The full raw JSON entry dict.
source_file (str) – Path to the JSON file that contained this entry.
- feeder_name: str
- protocol: str
- display_name: str = ''
- description: str = ''
- raw_entry: dict
- source_file: str = ''
- __init__(feeder_name, protocol, tags=<factory>, file_types=<factory>, display_name='', description='', raw_entry=<factory>, source_file='')
- class vyra_base.com.feeder.config_resolver.FeederConfigResolver
Bases: object
Resolves a feeder’s transport protocol from interface config JSON files.
The resolver is stateless: every resolve() call re-reads and parses the config files. In practice the files are small and cached by the OS page cache, so this is not a performance concern during module initialisation.
Usage:
    from vyra_base.com.feeder.config_resolver import FeederConfigResolver

    result = FeederConfigResolver.resolve(
        feeder_name="StateFeed",
        interface_paths=["/workspace/install/my_interfaces/share/my_interfaces/config"],
    )
    if result is None:
        raise RuntimeError("StateFeed not found in interface configs")
    print(result.protocol)    # "zenoh"
    print(result.file_types)  # ["VBASEStateFeed.proto"]
- static resolve(feeder_name, interface_paths, *, use_fallback=True, case_sensitive=False)
Find the interface config entry for feeder_name and resolve its transport protocol.
Searches all JSON files found (recursively) under each path in interface_paths. Only entries with "type": "publisher" are considered.
- Parameters:
feeder_name (str) – The functionname to look up (e.g. "StateFeed").
interface_paths (Sequence[str | Path]) – Directories or JSON file paths to search.
use_fallback (bool) – If True and the matched entry carries no recognised protocol tag, fall back to the default chain [zenoh, ros2, redis, uds] and use the first available protocol. Defaults to True.
case_sensitive (bool) – Whether functionname comparison is case-sensitive. Defaults to False.
- Returns:
A FeederResolverResult when a match is found, None otherwise. On failure an actionable error is logged with fuzzy-match suggestions.
- Return type:
Optional[FeederResolverResult]
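The file-discovery half of resolve() (recursive search, directories or single files, optional case sensitivity) can be sketched with pathlib. The helpers iter_json_files and find_entry are hypothetical, and skipping malformed files silently is an assumption about error handling, not documented behaviour.

```python
import json
import tempfile
from pathlib import Path
from typing import Iterator, Optional, Sequence, Tuple, Union


def iter_json_files(paths: Sequence[Union[str, Path]]) -> Iterator[Path]:
    """Yield JSON files from a mix of directories (searched recursively) and files."""
    for raw in paths:
        path = Path(raw)
        if path.is_dir():
            yield from sorted(path.rglob("*.json"))
        elif path.is_file() and path.suffix == ".json":
            yield path


def find_entry(feeder_name: str, paths, *, case_sensitive: bool = False
               ) -> Optional[Tuple[dict, Path]]:
    """Return (entry, source_file) for the first matching publisher entry."""
    wanted = feeder_name if case_sensitive else feeder_name.lower()
    for json_file in iter_json_files(paths):
        try:
            entries = json.loads(json_file.read_text(encoding="utf-8"))
        except (OSError, json.JSONDecodeError):
            continue  # assumption: unreadable or malformed files are skipped
        if not isinstance(entries, list):
            continue
        for entry in entries:
            if not isinstance(entry, dict) or entry.get("type") != "publisher":
                continue
            name = str(entry.get("functionname", ""))
            if (name if case_sensitive else name.lower()) == wanted:
                return entry, json_file
    return None


with tempfile.TemporaryDirectory() as tmp:
    nested = Path(tmp) / "config" / "sub"
    nested.mkdir(parents=True)
    (nested / "feeds.json").write_text(json.dumps([
        {"type": "publisher", "functionname": "StateFeed", "tags": ["zenoh"]},
    ]), encoding="utf-8")

    hit = find_entry("statefeed", [tmp])
    miss = find_entry("statefeed", [tmp], case_sensitive=True)
    found_name = hit[0]["functionname"] if hit else None
    found_file = hit[1].name if hit else None

print(found_name, found_file, miss)
```

The default case-insensitive lookup finds the nested file, while the case-sensitive one does not match the lower-case query.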
vyra_base.com.feeder.registry module
Feeder registry for VYRA custom feeders.
FeederRegistry is a thread-safe singleton that keeps track of all
registered IFeeder subclasses.
Use the register_feeder() decorator to register a feeder class so the
framework (and downstream tooling) can discover it by name.
Usage:
    from vyra_base.com.feeder.registry import register_feeder, FeederRegistry
    from vyra_base.com.feeder.custom_feeder import CustomBaseFeeder

    @register_feeder("TemperatureFeed")
    class TemperatureFeeder(CustomBaseFeeder):
        ...

    # Discover & instantiate
    cls = FeederRegistry.get("TemperatureFeed")
    feeder = cls(...)
- class vyra_base.com.feeder.registry.FeederRegistry
Bases: object
Thread-safe registry of IFeeder subclasses.
The built-in feeders (StateFeeder, NewsFeeder, ErrorFeeder) are not pre-registered here; the registry is exclusively for user-defined custom feeders added via register_feeder().
- classmethod register(name, feeder_class)
Register feeder_class under name.
- Raises:
TypeError – If feeder_class does not subclass IFeeder.
ValueError – If name is already registered (use override=True to replace).
- classmethod get(name)
Return the feeder class registered under name, or None.
- classmethod list_feeders()
Return a sorted list of all registered feeder names.
- classmethod unregister(name)
Remove the feeder registered under name.
- vyra_base.com.feeder.registry.register_feeder(name)
Class decorator that registers a feeder in FeederRegistry.
The decorated class must be a subclass of IFeeder (or CustomBaseFeeder).
- Parameters:
name (str) – The name under which this feeder is registered. Should match the functionname in the module’s interface config JSON so that the framework can select the correct protocol automatically.
- Raises:
TypeError – If the decorated class is not an IFeeder subclass.
Example:
    @register_feeder("TemperatureFeed")
    class TemperatureFeeder(CustomBaseFeeder):
        pass
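A lock-guarded class registry with a decorator front end might look like the sketch below. It is an assumption-laden illustration, not the library's code: FeederBase stands in for IFeeder, RegistrySketch and register are hypothetical names, and the override option mentioned in the ValueError text is not modelled.

```python
import threading
from abc import ABC
from typing import Dict, List, Optional, Type


class FeederBase(ABC):
    """Hypothetical stand-in for IFeeder."""


class RegistrySketch:
    _lock = threading.Lock()
    _feeders: Dict[str, Type[FeederBase]] = {}

    @classmethod
    def register(cls, name: str, feeder_class: type) -> None:
        if not (isinstance(feeder_class, type) and issubclass(feeder_class, FeederBase)):
            raise TypeError(f"{feeder_class!r} is not a FeederBase subclass")
        with cls._lock:  # guard the shared dict against concurrent writers
            if name in cls._feeders:
                raise ValueError(f"{name!r} is already registered")
            cls._feeders[name] = feeder_class

    @classmethod
    def get(cls, name: str) -> Optional[Type[FeederBase]]:
        with cls._lock:
            return cls._feeders.get(name)

    @classmethod
    def list_feeders(cls) -> List[str]:
        with cls._lock:
            return sorted(cls._feeders)


def register(name: str):
    """Class decorator: register the class and return it unchanged."""
    def decorator(feeder_class):
        RegistrySketch.register(name, feeder_class)
        return feeder_class
    return decorator


@register("TemperatureFeed")
class TemperatureFeeder(FeederBase):
    pass


rejected = None
try:
    RegistrySketch.register("NotAFeeder", dict)  # dict is not a feeder class
except TypeError as exc:
    rejected = str(exc)

print(RegistrySketch.list_feeders(), RegistrySketch.get("TemperatureFeed") is TemperatureFeeder)
```

Returning the class unchanged from the decorator keeps the decorated name usable exactly as if it had never been registered.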
vyra_base.com.feeder.state_feeder module
- class vyra_base.com.feeder.state_feeder.StateFeeder(node, module_entity, loggingOn=False)
Bases: BaseFeeder
Responsible for loading a VYRA Handler and feeding StateEntry elements to this handler.
- Parameters:
node (Optional[Any]) – The VyraNode instance associated with this feeder.
module_entity (ModuleEntry) – Module configuration entry.
loggingOn (bool) – Whether to log each message in addition to feeding it. Defaults to False.
- Raises:
FeederException – If the publisher cannot be created.
- Variables:
_feederName – Name of the feeder.
_doc – Documentation string for the feeder.
_level – Logging level.
_node – VyraNode instance.
_module_entity – Module configuration.
- __init__(node, module_entity, loggingOn=False)
Initialize a StateFeeder instance that publishes a module’s state changes.
- Parameters:
node (Optional[Any]) – The VyraNode instance (optional, None if ROS2 is unavailable).
module_entity (ModuleEntry) – Module configuration entry.
loggingOn (bool) – Whether to log each message in addition to feeding it. Defaults to False.
- Raises:
FeederException – If the publisher cannot be created.
- async start()
Start the feeder by initializing handlers.
Automatically resolves the transport protocol from the module’s interface config (reads the StateFeed entry and picks its tags).
- async feed(stateElement)
Feed a state entry to the feeder.
Validates the input type, then delegates to BaseFeeder.feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.
- Parameters:
stateElement (StateEntry) – The state entry to feed.
- Raises:
FeederException – If the provided element is not of type StateEntry.
- feed_sync(msg)
Sync version of feed() for use in sync contexts.
vyra_base.com.feeder.news_feeder module
- class vyra_base.com.feeder.news_feeder.NewsFeeder(node, module_entity, loggingOn=False)
Bases: BaseFeeder
Collection of the news messages.
- Parameters:
node (Optional[Any]) – The VyraNode instance associated with this feeder.
module_entity (ModuleEntry) – Module configuration entry.
loggingOn (bool) – Whether to log each message in addition to feeding it. Defaults to False.
- Raises:
FeederException – If the publisher cannot be created.
- __init__(node, module_entity, loggingOn=False)
Initialize the BaseFeeder.
- Parameters:
node (Any | None)
module_entity (ModuleEntry)
loggingOn (bool)
- async start()
Start the feeder by initializing handlers.
Automatically resolves the transport protocol from the module’s interface config (reads the NewsFeed entry and picks its tags).
- async feed(newsElement)
Feed a news entry to the feeder.
Normalises the input to a NewsEntry, then delegates to BaseFeeder.feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.
- Parameters:
newsElement (Union[NewsEntry, str, list]) – The news entry to be fed. Can be a string or list of strings to be processed into a NewsEntry, or a NewsEntry object.
- Raises:
FeederException – If the type of newsElement is not supported.
- feed_sync(msg)
Sync version of feed(); normalises input to NewsEntry before delegating.
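The input normalisation that feed() and feed_sync() perform can be pictured as a small dispatch on the argument type. This is only a sketch: NewsRecord and UnsupportedNewsType are hypothetical stand-ins for NewsEntry and FeederException, and joining list parts with a space is an assumption about how a list of strings is combined.

```python
from dataclasses import dataclass
from typing import List, Union


@dataclass
class NewsRecord:
    """Hypothetical stand-in for NewsEntry."""
    message: str


class UnsupportedNewsType(Exception):
    """Hypothetical stand-in for FeederException."""


def normalise(news: Union[NewsRecord, str, List[str]]) -> NewsRecord:
    if isinstance(news, NewsRecord):
        return news  # already structured: pass through
    if isinstance(news, str):
        return NewsRecord(message=news)
    if isinstance(news, list) and all(isinstance(part, str) for part in news):
        return NewsRecord(message=" ".join(news))  # assumption: parts are joined
    raise UnsupportedNewsType(f"unsupported news type: {type(news).__name__}")


from_text = normalise("pump started")
from_list = normalise(["pump", "started"])
existing = NewsRecord("already an entry")
same = normalise(existing)
print(from_text, from_list, same is existing)
```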
- register_news_condition(condition_function, *, name=None, execution_point='ALWAYS', success_message=None, failure_message=None, tag='news')
Register a synchronous bool condition for tracked feeder messages.
- monitor(*, tag='news', label=None, severity='INFO', entity=None, during_interval_seconds=0.05)
Return a runtime-monitoring decorator bound to this feeder.
- async evaluate_tracked_conditions(context)
Evaluate registered conditions and publish resulting tracker messages.
- evaluate_tracked_conditions_sync(context)
Sync variant of evaluate_tracked_conditions().
- build_newsfeed(*args)
Build a well-structured newsfeed entry from plain text and module information.
- Parameters:
args (Any) – The arguments to be processed into a news entry. Can be a string or list of strings.
- Returns:
A structured news entry containing the message, level, timestamp, UUID, module name, module ID, module template, and type.
- Raises:
FeederException – If the type of the message level is not valid.
- vyra_base.com.feeder.news_feeder.extract_level_from_msg(message)
Extract the message level from a given message string.
- Parameters:
message (str) – The message string from which to extract the level.
- Returns:
The extracted message level if found, otherwise None.
vyra_base.com.feeder.error_feeder module
- class vyra_base.com.feeder.error_feeder.ErrorFeeder(node, module_entity, loggingOn=True)
Bases: BaseFeeder
Collection of the error messages.
- Parameters:
node (Optional[Any]) – The VyraNode instance associated with this feeder.
module_entity (ModuleEntry) – The module configuration entry.
loggingOn (bool) – Whether to log each message in addition to feeding it. Defaults to True.
- Raises:
FeederException – If the publisher cannot be created.
- __init__(node, module_entity, loggingOn=True)
Initialize the BaseFeeder.
- Parameters:
node (Any | None)
module_entity (ModuleEntry)
loggingOn (bool)
- async start()
Start the feeder by initializing handlers.
Automatically resolves the transport protocol from the module’s interface config (reads the ErrorFeed entry and picks its tags).
- async feed(errorElement)
Feed an error entry to the feeder.
Normalises the input to an ErrorEntry, then delegates to BaseFeeder.feed(), which calls _prepare_entry_for_publish() followed by MessageMapper for protocol-aware conversion.
- Parameters:
errorElement (Union[ErrorEntry, dict]) – The error entry to be fed. Can be a dictionary with error details or an ErrorEntry object.
- Raises:
FeederException – If the type of errorElement is neither a dict nor an ErrorEntry.
- feed_sync(errorElement)
Sync version of feed() with ErrorEntry normalization.
- Parameters:
errorElement (ErrorEntry | dict)
- monitor(*, tag='error', label=None, severity='WARNING', entity=None, during_interval_seconds=0.05)
Return an exception-monitoring decorator bound to this feeder.
- build_errorfeed(errorDict)
Build an error entry from the given dictionary.
- Parameters:
errorDict (dict) – A dictionary containing error details. Keys are:
- code: int16 - Error code (default: 0x00000000)
- uuid: UUID - Unique identifier for the error (default: a new UUID)
- description: str - Description of the error (default: '')
- solution: str - Suggested solution for the error (default: '')
- miscellaneous: str - Additional information (default: '')
- level: ErrorEntry.ERROR_LEVEL - Level of the error (default: ErrorEntry.ERROR_LEVEL.MINOR_FAULT)
- Returns:
An instance of vyra_base.defaults.entries.ErrorEntry populated with the provided details.
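The "missing keys fall back to defaults" behaviour listed for build_errorfeed() maps naturally onto a dataclass with default values. The sketch below is illustrative only: ErrorRecord and ErrorLevel are hypothetical stand-ins for ErrorEntry and ErrorEntry.ERROR_LEVEL, and rejecting unknown keys is a design choice of this example, not documented behaviour.

```python
from dataclasses import dataclass, field, fields
from enum import Enum
from typing import Any, Dict
from uuid import UUID, uuid4


class ErrorLevel(Enum):
    """Hypothetical stand-in for ErrorEntry.ERROR_LEVEL."""
    MINOR_FAULT = 0


@dataclass
class ErrorRecord:
    """Hypothetical stand-in for ErrorEntry with the documented defaults."""
    code: int = 0x0
    uuid: UUID = field(default_factory=uuid4)  # a new UUID per entry
    description: str = ""
    solution: str = ""
    miscellaneous: str = ""
    level: ErrorLevel = ErrorLevel.MINOR_FAULT


def build_error(error_dict: Dict[str, Any]) -> ErrorRecord:
    """Copy known keys from error_dict; missing keys keep their defaults."""
    known = {f.name for f in fields(ErrorRecord)}
    unknown = set(error_dict) - known
    if unknown:
        # Assumption of this sketch: typos in keys should fail loudly.
        raise KeyError(f"unsupported error keys: {sorted(unknown)}")
    return ErrorRecord(**error_dict)


entry = build_error({"code": 0x12, "description": "overheat"})
print(entry.code, entry.description, entry.level)
```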
Module contents
VYRA Feeder package — multi-protocol data publishers.
Available feeders:
- StateFeeder
- NewsFeeder
- ErrorFeeder
- CustomBaseFeeder
Protocol resolution:
- FeederConfigResolver
- FeederResolverResult
Registry:
- FeederRegistry
- register_feeder()
Interfaces:
- IFeeder
- class vyra_base.com.feeder.FeedConditionRegistry
Bases: object
- __init__()
- register(condition_function, *, name=None, tag='news', execution_point='ALWAYS', success_message=None, failure_message=None)
- unregister(name)
- class vyra_base.com.feeder.FeedDebouncer(window_seconds=5.0)
Bases: object
- Parameters:
window_seconds (float)
- __init__(window_seconds=5.0)
- Parameters:
window_seconds (float)
- evaluate(signature)
- Parameters:
signature (str)
- Return type:
DebounceHit
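Since FeedDebouncer is listed without a description, the following is only a guess at the general technique its signature suggests: suppress repeats of the same signature inside a time window. Everything here is hypothetical, including the Hit fields (the real DebounceHit is not documented above), the injectable clock, and the choice to measure the window from the last accepted message.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict


@dataclass
class Hit:
    """Hypothetical stand-in for DebounceHit."""
    is_duplicate: bool
    suppressed: int  # duplicates suppressed for this signature in the current window


class DebouncerSketch:
    def __init__(self, window_seconds: float = 5.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._window = window_seconds
        self._clock = clock  # injectable so the example is deterministic
        self._last_accepted: Dict[str, float] = {}
        self._suppressed: Dict[str, int] = {}

    def evaluate(self, signature: str) -> Hit:
        now = self._clock()
        last = self._last_accepted.get(signature)
        if last is not None and now - last < self._window:
            self._suppressed[signature] += 1
            return Hit(True, self._suppressed[signature])
        # First sighting, or the window has elapsed: accept and restart the window.
        self._last_accepted[signature] = now
        self._suppressed[signature] = 0
        return Hit(False, 0)


fake_times = iter([0.0, 1.0, 6.0])
debouncer = DebouncerSketch(window_seconds=5.0, clock=lambda: next(fake_times))

first = debouncer.evaluate("disk full")   # t=0.0: accepted
second = debouncer.evaluate("disk full")  # t=1.0: inside the window, suppressed
third = debouncer.evaluate("disk full")   # t=6.0: window elapsed, accepted again
print(first, second, third)
```

A counter like suppressed is the kind of value that could feed a metric such as debounced_duplicate_count, though how the library wires this up is not shown here.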