# Source code for vyra_base.storage.db_access

import configparser
import os
import logging

from enum import Enum
from pathlib import Path
from typing import Type, Union, Any

from sqlalchemy import (
    inspect, 
    MetaData, 
    Table
)
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.ext.asyncio import (
    AsyncEngine, 
    async_sessionmaker, 
    create_async_engine
)
from sqlalchemy.orm import sessionmaker

from vyra_base.helper.error_handler import ErrorTraceback
from vyra_base.storage.storage import Storage
from vyra_base.storage.tb_base import Base

# Module-level SQLAlchemy MetaData container.
# NOTE(review): nothing in the visible part of this module reads this object;
# confirm whether external importers rely on it before removing it.
meta = MetaData()

# Logger for this module, named after the module's import path.
logger = logging.getLogger(__name__)


class DBSTATUS(str, Enum):
    """
    Status codes reported by database operations.

    Every member is also a ``str``, so it compares equal to its plain
    string value (``DBSTATUS.SUCCESS == "success"``).

    :cvar SUCCESS: The operation finished without errors.
    :cvar ERROR: The operation failed.
    :cvar NOT_FOUND: The requested resource does not exist.
    :cvar NOT_ALLOWED: The operation is not permitted.
    :cvar NOT_AUTHORIZED: The caller lacks authorization for the operation.
    :cvar CONFLICT: The operation collides with existing data.
    """

    SUCCESS = "success"
    ERROR = "error"
    NOT_FOUND = "not_found"
    NOT_ALLOWED = "not_allowed"
    NOT_AUTHORIZED = "not_authorized"
    CONFLICT = "conflict"
class DBTYPE(str, Enum):
    """
    Database backends this module can build an engine for.

    Every member is also a ``str``, so it compares equal to its plain
    string value and can be used as a key into string-keyed configuration.

    :cvar SQLITE: SQLite database engine.
    :cvar MYSQL: MySQL database engine.
    :cvar POSTGRESQL: PostgreSQL database engine.
    """

    SQLITE = "sqlite"
    MYSQL = "mysql"
    POSTGRESQL = "postgresql"
class DBMESSAGE:
    """
    Namespace for standard database error messages.

    :cvar DEFAULT_ERROR: Generic message for a failed database query.
    """

    DEFAULT_ERROR = (
        'Something went wrong while processing the query. See the details.'
    )
class DbAccess(Storage):
    """
    Base class for database access through an async SQLAlchemy engine.

    An instance reads its connection settings from an ini file or a
    dictionary, builds an ``AsyncEngine`` for the selected backend and offers
    helpers to create, drop and look up tables.
    """

    def __init__(
        self,
        module_name: str,
        db_config_path: Union[str, None] = None,
        db_config: Union[dict, None] = None,
        db_type: DBTYPE = DBTYPE.SQLITE,
    ) -> None:
        """
        Initialize the database object and build its engine.

        :param module_name: The id of the V.Y.R.A. module. Replaces the
            ``${module_name}`` placeholder in the configured database name.
        :type module_name: str
        :param db_config_path: Path to the ini file holding the sqlalchemy
            config. Only used when ``db_config`` is not given.
        :type db_config_path: str, optional
        :param db_config: Dictionary with the database configuration, keyed
            by database type. A list is accepted too; its first element is
            used. Takes precedence over ``db_config_path``.
        :type db_config: dict, optional
        :param db_type: Backend to connect to; selects the config section.
        :type db_type: DBTYPE
        :raises ValueError: If neither ``db_config_path`` nor ``db_config``
            is provided, if ``db_config`` is not a dict, or if ``db_type``
            is not a supported database type.
        :raises KeyError: If the configuration has no section for
            ``db_type`` or the section lacks ``path`` / ``database``.
        """
        self.module_name = module_name

        try:
            if db_config_path is not None and db_config is None:
                # ConfigParser.read() silently skips files it cannot open, so
                # a wrong path only surfaces as a KeyError on the lookups below.
                self._config = configparser.ConfigParser()
                self._config.read(db_config_path)
            elif db_config is not None:
                db_config = db_config[0] if isinstance(db_config, list) else db_config
                if not isinstance(db_config, dict):
                    raise ValueError("db_config must be a dictionary.")
                self._config = db_config
            else:
                raise ValueError("Either db_config_path or db_config must be provided.")

            if db_type not in (item.value for item in DBTYPE):
                raise ValueError(
                    f"Unsupported database type: {db_type}. "
                    f"Supported types: {list(DBTYPE)}"
                )

            self.db_type = db_type

            self._user: str | None = os.environ.get("USER")
            self._path: str | None = self._config[self.db_type]['path']
            self._port: str | None = self._config[self.db_type].get('port', None)
            self._host: str | None = self._config[self.db_type].get('host', None)

            # Expand the placeholders supported in the configuration values.
            self._path = self._path.replace("${user}", str(self._user))
            self._database = self._config[self.db_type]['database']
            self._database = self._database.replace("${module_name}", self.module_name)

            # The path is later concatenated with the database name, so it
            # must end with a separator.
            if not self._path.endswith('/'):
                self._path += '/'

            Path(self._path).mkdir(parents=True, exist_ok=True)

            self.db_engine: AsyncEngine = self._build_engine()

            # Configure the sqlalchemy loggers to use the application logging
            # format. A dedicated local name keeps the module-level ``logger``
            # usable inside this method.
            for name in ['sqlalchemy', 'sqlalchemy.engine', 'sqlalchemy.pool']:
                sa_logger = logging.getLogger(name)
                sa_logger.setLevel(logging.WARNING)  # Reduce verbosity
                sa_logger.propagate = True  # Let root logger handle formatting
                # Remove any default handlers
                for handler in list(sa_logger.handlers):
                    sa_logger.removeHandler(handler)
        finally:
            ErrorTraceback.check_error_exist()

    def _build_engine(self) -> AsyncEngine:
        """
        Build the database engine based on the configuration.

        :returns: An instance of AsyncEngine.
        :rtype: AsyncEngine
        :raises ValueError: If ``self.db_type`` is not a supported type.
        """
        logger.info(
            "Building database engine: "
            f"Type: {self.db_type}, Name: {self._database}, "
            f"Path: {self._path}, Host: {self._host}, "
            f"Port: {self._port}"
        )

        # NOTE(review): SQLite disables echo while MySQL/PostgreSQL enable it;
        # confirm whether statement echoing is intended for those backends.
        if self.db_type == DBTYPE.SQLITE:
            return create_async_engine(
                f"sqlite+aiosqlite:///{self._path}{self._database}",
                echo=False,  # Disable echo - use configured logger instead
            )
        elif self.db_type == DBTYPE.MYSQL:
            return create_async_engine(
                f"mysql+aiomysql://{self._user}@{self._host}:{self._port}/{self._database}",
                echo=True,
            )
        elif self.db_type == DBTYPE.POSTGRESQL:
            return create_async_engine(
                f"postgresql+asyncpg://{self._user}@{self._host}:{self._port}/{self._database}",
                echo=True,
            )
        else:
            raise ValueError(f"Unsupported database type: {self.db_type}")

    def session(self) -> Union[sessionmaker, async_sessionmaker]:
        """
        Create a session factory bound to the database engine.

        :returns: An ``async_sessionmaker`` when the engine is an
            ``AsyncEngine``, otherwise a ``sessionmaker``. Both are
            configured with ``expire_on_commit=False``.
        :rtype: sessionmaker or async_sessionmaker
        """
        if isinstance(self.db_engine, AsyncEngine):
            return async_sessionmaker(self.db_engine, expire_on_commit=False)
        else:
            return sessionmaker(self.db_engine, expire_on_commit=False)

    async def create_all_tables(self) -> str:
        """
        Create all tables registered on the SQLAlchemy ``Base`` metadata.

        Runs ``Base.metadata.create_all`` inside a transaction.

        :returns: Status of the operation.
        :rtype: str (see DBSTATUS class)
        """
        try:
            async with self.db_engine.begin() as conn:
                await conn.run_sync(Base.metadata.create_all)

            logger.info('Successfully created all defined tables')
            return DBSTATUS.SUCCESS
        finally:
            # A return here replaces the result (or a pending exception)
            # whenever ErrorTraceback reports an error.
            if ErrorTraceback.check_error_exist():
                return DBSTATUS.ERROR

    async def create_selected_table(self, table_structs: list[Type[Base]]) -> str:
        """
        Create the database tables for the given declarative classes.

        Tables that already exist are left untouched. An empty list is a
        no-op that reports success.

        :param table_structs: Table configurations as python classes
            (SQLAlchemy declarative base style).
        :type table_structs: list[Type[Base]]
        :returns: Status of the operation.
        :rtype: str (see DBSTATUS class)
        """
        try:
            # Use the Table objects declared on the classes themselves.
            # Reflecting them from the database would fail for exactly the
            # tables that still need to be created.
            tables = [table_struct.__table__ for table_struct in table_structs]

            if not tables:
                logger.info('No tables given, nothing to create')
                return DBSTATUS.SUCCESS

            def create_tables(sync_conn) -> None:
                """
                Create the selected tables in synchronous context.

                :param sync_conn: Synchronous database connection.
                """
                Base.metadata.create_all(sync_conn, tables=tables, checkfirst=True)

            # begin() commits on success, so the DDL is persisted.
            async with self.db_engine.begin() as conn:
                await conn.run_sync(create_tables)

            table_names = ', '.join(table.name for table in tables)
            logger.info(f'Successfully created table {table_names}')
            return DBSTATUS.SUCCESS
        finally:
            if ErrorTraceback.check_error_exist():
                return DBSTATUS.ERROR

    async def drop_table(self, table: Type[Base]) -> str:
        """
        Delete a table from the database.

        :param table: Table class (SQLAlchemy declarative base).
        :type table: Type[Base]
        :returns: ``DBSTATUS.SUCCESS`` if the table was dropped,
            ``DBSTATUS.NOT_FOUND`` if it does not exist, ``DBSTATUS.ERROR``
            if ErrorTraceback reports an error.
        :rtype: str (see DBSTATUS class)
        """
        try:
            table_name = table.__tablename__

            def drop_if_present(sync_conn) -> bool:
                """
                Drop the table in synchronous context if it exists.

                :param sync_conn: Synchronous database connection.
                :return: True if the table existed and was dropped.
                """
                if not inspect(sync_conn).has_table(table_name):
                    return False
                reflected = Table(table_name, MetaData(), autoload_with=sync_conn)
                reflected.drop(sync_conn)
                return True

            # begin() commits on exit. A plain connect() would roll the DROP
            # back on backends with transactional DDL.
            async with self.db_engine.begin() as conn:
                dropped = await conn.run_sync(drop_if_present)

            if not dropped:
                logger.warning(f"Tabelle '{table_name}' existiert nicht.")
                return DBSTATUS.NOT_FOUND

            logger.info(f"Tabelle '{table_name}' wurde gelöscht.")
            return DBSTATUS.SUCCESS
        finally:
            if ErrorTraceback.check_error_exist():
                return DBSTATUS.ERROR

    async def check_table_exists(self, table: Type[Base]) -> bool:
        """
        Check if a table exists in the database.

        :param table: Table class (SQLAlchemy declarative base).
        :type table: Type[Base]
        :returns: True if the table name is among the table names the
            inspector reports, False otherwise or when ErrorTraceback
            reports an error.
        :rtype: bool
        """
        try:
            table_name = table.__tablename__
            async with self.db_engine.connect() as conn:
                tables = await conn.run_sync(
                    lambda sync_conn: inspect(sync_conn).get_table_names()
                )
                return table_name in tables
        finally:
            if ErrorTraceback.check_error_exist():
                return False