Quellcode für vyra_base.com.handler.database
"""
Database persistence handler for VYRA feeders.
``DBCommunicationHandler`` writes structured log/feed records to an
async-capable database backend. Any object that exposes an
``async write(record: dict) -> None`` coroutine is accepted as *database*
(e.g. a Redis client, an async SQLite connection, a custom InfluxDB writer).
Record schema
-------------
Every ``dispatch`` call persists the following fields:
.. code-block:: json
{
"timestamp": "<ISO-8601 UTC>",
"level": "INFO",
"source": "<feeder/initiator name>",
"message": "<str(message)>",
"metadata": {}
}
The schema is intentionally flat so it can be stored in Redis hashes, SQL
tables, InfluxDB measurements, or a time-series database without
additional transformation.
"""
from __future__ import annotations
import asyncio
import datetime
import logging
from logging import LogRecord
from typing import Any, Protocol, runtime_checkable
from vyra_base.com.handler.communication import CommunicationHandler
logger = logging.getLogger(__name__)
[Doku]
@runtime_checkable
class DatabaseWriter(Protocol):
"""Structural protocol for database backends.
Any object that implements ``async write(record: dict) -> None``
qualifies — no inheritance required.
"""
[Doku]
async def write(self, record: dict) -> None: # noqa: D102
...
[Doku]
class DBCommunicationHandler(CommunicationHandler):
"""Feeder handler that persists messages to a database backend.
Accepts **any** object that implements the :class:`DatabaseWriter`
protocol (``async write(dict) → None``). This makes the handler
independent of a specific DB technology.
:cvar __handlerName__: Identifies this handler as ``"DatabaseHandler"``.
:param database: Async-capable database writer implementing
:class:`DatabaseWriter`.
:type database: DatabaseWriter
:param source: Label used as ``"source"`` in the persisted record.
Defaults to ``"DBCommunicationHandler"``.
:type source: str, optional
"""
__handlerName__: str = 'DatabaseHandler'
__doc__: str = 'Database persistence handler'
[Doku]
def __init__(self, database: Any, source: str = 'DBCommunicationHandler'):
super().__init__()
self.database = database
self._source = source
try:
self._loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()
except RuntimeError:
self._loop = asyncio.new_event_loop()
# ------------------------------------------------------------------
# IFeederHandler implementation
# ------------------------------------------------------------------
[Doku]
def get_protocol(self) -> str:
"""Return ``"database"``."""
return "database"
[Doku]
def is_available(self) -> bool:
"""Return ``True`` if a database writer is configured."""
return self.database is not None
[Doku]
async def dispatch(self, message: Any) -> None:
"""Persist *message* to the database with a structured schema.
:param message: Domain object or any value to persist. Converted
to string for the ``"message"`` field. If the object has a
``__dict__`` attribute the full dict is stored in
``"metadata"``.
:type message: Any
"""
if self.database is None:
logger.warning("DBCommunicationHandler: no database configured, skipping.")
return
metadata: dict = {}
if hasattr(message, '__dict__'):
try:
metadata = {k: str(v) for k, v in vars(message).items()}
except Exception:
pass
record: dict = {
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"level": "INFO",
"source": self._source,
"message": str(message),
"metadata": metadata,
}
await self._write(record)
# ------------------------------------------------------------------
# logging.Handler bridge — formats the LogRecord as a plain string
# ------------------------------------------------------------------
[Doku]
def emit(self, record: LogRecord) -> None: # type: ignore[override]
"""Persist the formatted log record string to the database.
Unlike transport handlers, the DB handler stores the *formatted*
string so that plain Python log records are human-readable.
:param record: Python log record to persist.
:type record: LogRecord
"""
try:
log_entry = self.format(record)
db_record: dict = {
"timestamp": datetime.datetime.utcnow().isoformat() + "Z",
"level": record.levelname,
"source": record.name,
"message": log_entry,
"metadata": {
"lineno": record.lineno,
"filename": record.filename,
"funcName": record.funcName,
},
}
if self._loop.is_running():
self._loop.create_task(self._write(db_record))
else:
self._loop.run_until_complete(self._write(db_record))
except Exception:
self.handleError(record)
async def _write(self, record: dict) -> None:
"""Internal coroutine that delegates to ``database.write``."""
try:
await self.database.write(record)
except Exception as exc:
logger.error("DBCommunicationHandler: write failed: %s", exc)