Quellcode für vyra_base.com.external.grpc.grpc_client
"""
gRPC Client for vyra_base
High-level wrapper for gRPC communication.
Provides simple interface for unary RPC calls.
Example:
>>> client = GrpcClient(target="localhost:50051")
>>> await client.connect()
>>> response = await client.call_method("/service/Method", request_bytes)
>>> await client.close()
"""
from __future__ import annotations
import asyncio
import logging
from pathlib import Path
from typing import Any, Optional
from vyra_base.helper.error_handler import ErrorTraceback
logger = logging.getLogger(__name__)
# Check if grpcio is available
try:
import grpc
from grpc import aio
GRPC_AVAILABLE = True
except ImportError:
GRPC_AVAILABLE = False
[Doku]
class GrpcClient:
    """
    High-level gRPC Client wrapper.

    Provides a simple interface for gRPC communication without requiring
    proto files or code generation: requests and responses are exchanged
    as already-serialized bytes.

    Features:
        - Unary RPC (request-response)
        - Automatic connection management (also usable as an async
          context manager)
        - SSL/TLS support

    Args:
        target: gRPC server target (e.g., "localhost:50051"), or a
            ``pathlib.Path`` pointing at a Unix domain socket.
        credentials: Optional SSL credentials. When given, a secure
            channel is created; otherwise an insecure one.
        options: Optional channel options.
        timeout: Default timeout (seconds) for connecting and for calls.

    Example:
        >>> client = GrpcClient(target="192.168.1.10:50051")
        >>> await client.connect()
        >>>
        >>> # Unary call
        >>> response = await client.call_method("/service/Method", request_bytes)
        >>>
        >>> await client.close()
    """

    @ErrorTraceback.w_check_error_exist
    def __init__(
        self,
        target: str | Path,
        credentials: Optional[Any] = None,
        options: Optional[list] = None,
        timeout: float = 30.0,
    ) -> None:
        """
        Initialize gRPC client.

        Args:
            target: Server target (host:port) or a ``Path`` to a Unix
                domain socket.
            credentials: Optional SSL credentials.
            options: Optional channel options.
            timeout: Default timeout in seconds.

        Raises:
            ImportError: If ``grpcio`` is not installed.
        """
        if not GRPC_AVAILABLE:
            raise ImportError("grpcio not installed. Install with: pip install grpcio")

        if isinstance(target, Path):
            socket_path = target.as_posix()
            # gRPC only accepts "unix://<path>" for absolute paths (giving
            # three slashes in total); a relative path must use "unix:<path>".
            if target.is_absolute():
                target = f"unix://{socket_path}"
            else:
                target = f"unix:{socket_path}"

        self.target = target
        self.credentials = credentials
        self.options = options or []
        self.timeout = timeout
        self._channel: Optional[aio.Channel] = None
        self._connected = False

    @property
    def is_connected(self) -> bool:
        """Check if client is connected."""
        return self._connected

    @property
    def channel(self) -> Optional[aio.Channel]:
        """Get the gRPC channel (``None`` while disconnected)."""
        return self._channel

    async def _discard_channel(self) -> None:
        """Close the current channel, if any, and reset connection state."""
        # Reset the state first so the client is consistently "disconnected"
        # even when closing the channel itself raises.
        channel, self._channel = self._channel, None
        self._connected = False
        if channel is not None:
            await channel.close()

    @ErrorTraceback.w_check_error_exist
    async def connect(self) -> None:
        """
        Establish connection to gRPC server.

        A channel left over from an earlier ``connect()`` is closed before
        the new one is created, so repeated calls do not leak channels.

        Raises:
            ConnectionError: If the channel does not become ready within
                ``self.timeout`` seconds.
        """
        if self._channel is not None:
            await self._discard_channel()

        try:
            logger.info(f"🔌 Connecting to gRPC server: {self.target}")

            # Create channel
            if self.credentials is not None:
                self._channel = aio.secure_channel(
                    self.target,
                    self.credentials,
                    options=self.options,
                )
            else:
                self._channel = aio.insecure_channel(
                    self.target,
                    options=self.options,
                )

            # Wait for channel to be ready (with timeout)
            try:
                await asyncio.wait_for(
                    self._channel.channel_ready(),
                    timeout=self.timeout,
                )
            except asyncio.TimeoutError as exc:
                raise ConnectionError(
                    f"Connection timeout to {self.target}"
                ) from exc

            self._connected = True
            logger.info(f"✅ Connected to gRPC server: {self.target}")
        except asyncio.CancelledError:
            # Cancellation is not an ``Exception`` on Python >= 3.8; release
            # the half-open channel here so it is not leaked.
            await self._discard_channel()
            raise
        except Exception as e:
            logger.error(f"❌ Failed to connect to gRPC server: {e}")
            await self._discard_channel()
            raise

    @ErrorTraceback.w_check_error_exist
    async def close(self) -> None:
        """Close connection to gRPC server (no-op when no channel exists)."""
        if self._channel is None:
            return

        logger.info(f"🔌 Closing gRPC connection: {self.target}")
        await self._discard_channel()
        logger.info("✅ gRPC connection closed")

    def _ensure_connected(self) -> None:
        """
        Ensure client is connected.

        Raises:
            ConnectionError: If ``connect()`` has not completed successfully.
        """
        if not self._connected or not self._channel:
            raise ConnectionError("gRPC client not connected. Call connect() first.")

    @ErrorTraceback.w_check_error_exist
    async def call_method(
        self,
        method: str,
        request: bytes,
        timeout: Optional[float] = None,
        metadata: Optional[list] = None,
    ) -> bytes:
        """
        Call a unary gRPC method.

        Args:
            method: Method path (e.g., "/service.Service/Method")
            request: Serialized request bytes
            timeout: Optional timeout override in seconds. ``None`` selects
                the client's default timeout; an explicit ``0`` is passed
                through rather than replaced by the default.
            metadata: Optional metadata tuples

        Returns:
            Serialized response bytes

        Raises:
            ConnectionError: If the client is not connected.
            grpc.RpcError: If the RPC fails (logged, then re-raised).
        """
        self._ensure_connected()
        channel = self._channel
        if channel is None:
            # Not reachable after _ensure_connected(); kept as a type guard.
            raise RuntimeError("gRPC channel is not initialized")

        # ``timeout or self.timeout`` would silently discard an explicit 0.
        effective_timeout = self.timeout if timeout is None else timeout

        try:
            logger.debug(f"📤 Calling gRPC method: {method}")

            # Create unary-unary call
            rpc = channel.unary_unary(
                method,
                request_serializer=None,  # Already serialized
                response_deserializer=lambda x: x,  # Keep as bytes
            )

            # Make call
            response = await rpc(
                request,
                timeout=effective_timeout,
                metadata=metadata,
            )

            logger.debug(f"✅ gRPC call completed: {method}")
            return response
        except grpc.RpcError as e:
            logger.error(f"❌ gRPC call failed: {method} - {e.code()}: {e.details()}")
            raise
        except Exception as e:
            logger.error(f"❌ gRPC call failed: {method} - {e}")
            raise

    async def __aenter__(self) -> GrpcClient:
        """Async context manager entry: connect and return the client."""
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit: close the connection."""
        await self.close()