from typing import AsyncGenerator, Dict, List import docker import json from datetime import datetime from fastapi import WebSocket from app.core.config import settings class LogsService: def __init__(self): self.client = docker.from_env() self.active_connections: Dict[str, List[WebSocket]] = {} async def get_container_logs(self, container_id: str, follow: bool = True) -> AsyncGenerator[str, None]: """Récupère les logs d'un conteneur en temps réel.""" try: container = self.client.containers.get(container_id) for log in container.logs(stream=True, follow=follow, timestamps=True): log_entry = { "timestamp": datetime.fromisoformat(log.decode().split()[0].decode()), "container_id": container_id, "container_name": container.name, "message": log.decode().split(b' ', 1)[1].decode(), "stream": "stdout" if log.decode().split(b' ', 1)[1].decode().startswith("stdout") else "stderr" } yield json.dumps(log_entry) except docker.errors.NotFound: yield json.dumps({ "error": f"Conteneur {container_id} non trouvé" }) except Exception as e: yield json.dumps({ "error": str(e) }) async def connect(self, websocket: WebSocket, container_id: str = None): """Connecte un client WebSocket pour recevoir les logs.""" await websocket.accept() if container_id not in self.active_connections: self.active_connections[container_id] = [] self.active_connections[container_id].append(websocket) def disconnect(self, websocket: WebSocket, container_id: str = None): """Déconnecte un client WebSocket.""" if container_id in self.active_connections: self.active_connections[container_id].remove(websocket) if not self.active_connections[container_id]: del self.active_connections[container_id] async def broadcast_log(self, log: str, container_id: str = None): """Diffuse un log à tous les clients connectés.""" if container_id in self.active_connections: for connection in self.active_connections[container_id]: try: await connection.send_text(log) except Exception: self.disconnect(connection, container_id) logs_service = LogsService()