270 lines
No EOL
12 KiB
Python
270 lines
No EOL
12 KiB
Python
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect
|
|
from typing import List, Dict, Any, Set
|
|
from ..models.container import Container, ContainerUpdate, ContainerStats
|
|
from ..services.docker import DockerService
|
|
from ..services.redis import RedisService
|
|
import asyncio
|
|
import logging
|
|
import docker
|
|
from weakref import WeakSet
|
|
from contextlib import asynccontextmanager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
router = APIRouter(prefix="/api/containers", tags=["containers"])
|
|
docker_service = DockerService()
|
|
redis_service = RedisService()
|
|
|
|
# Garder une trace des connexions WebSocket actives avec leurs tâches associées
|
|
active_connections: Dict[str, Dict[WebSocket, asyncio.Task]] = {}
|
|
|
|
async def cleanup_connection(container_id: str, websocket: WebSocket):
|
|
"""Nettoie proprement une connexion WebSocket."""
|
|
try:
|
|
if container_id in active_connections and websocket in active_connections[container_id]:
|
|
# Annuler la tâche associée si elle existe
|
|
task = active_connections[container_id][websocket]
|
|
if not task.done():
|
|
task.cancel()
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
# Supprimer la connexion
|
|
del active_connections[container_id][websocket]
|
|
|
|
# Si plus de connexions pour ce conteneur, nettoyer le dictionnaire
|
|
if not active_connections[container_id]:
|
|
del active_connections[container_id]
|
|
|
|
# Fermer la connexion WebSocket
|
|
await websocket.close()
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du nettoyage de la connexion pour {container_id}: {e}")
|
|
|
|
@asynccontextmanager
|
|
async def manage_websocket_connection(websocket: WebSocket, container_id: str):
|
|
"""Gère le cycle de vie d'une connexion WebSocket."""
|
|
try:
|
|
await websocket.accept()
|
|
|
|
# Initialiser le dictionnaire pour ce conteneur si nécessaire
|
|
if container_id not in active_connections:
|
|
active_connections[container_id] = {}
|
|
|
|
yield
|
|
|
|
finally:
|
|
await cleanup_connection(container_id, websocket)
|
|
|
|
@router.get("/", response_model=List[Container])
|
|
async def list_containers():
|
|
"""Liste tous les conteneurs Docker."""
|
|
try:
|
|
logger.info("Début de la récupération de la liste des conteneurs")
|
|
containers = await docker_service.list_containers()
|
|
|
|
if not containers:
|
|
logger.warning("Aucun conteneur trouvé")
|
|
return []
|
|
|
|
logger.info(f"Conteneurs trouvés avec succès : {len(containers)}")
|
|
return containers
|
|
|
|
except docker.errors.APIError as e:
|
|
logger.error(f"Erreur API Docker lors de la récupération des conteneurs : {e}")
|
|
raise HTTPException(status_code=500, detail=f"Erreur Docker: {str(e)}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur inattendue lors de la récupération des conteneurs : {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
def validate_container_id(container_id: str) -> None:
|
|
"""Valide l'ID d'un conteneur."""
|
|
if not container_id or container_id == "undefined":
|
|
logger.error("ID de conteneur invalide ou undefined")
|
|
raise HTTPException(status_code=400, detail="ID de conteneur invalide")
|
|
|
|
@router.get("/{container_id}", response_model=Container)
|
|
async def get_container(container_id: str):
|
|
"""Récupère les informations d'un conteneur spécifique."""
|
|
try:
|
|
validate_container_id(container_id)
|
|
container = await docker_service.get_container(container_id)
|
|
return container
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la récupération du conteneur {container_id} : {e}")
|
|
raise HTTPException(status_code=404, detail="Conteneur non trouvé")
|
|
|
|
@router.get("/{container_id}/logs")
|
|
async def get_container_logs(container_id: str, tail: int = 100):
|
|
"""Récupère les logs d'un conteneur."""
|
|
try:
|
|
validate_container_id(container_id)
|
|
logger.info(f"Requête de logs pour le conteneur {container_id}")
|
|
|
|
# Récupérer les logs
|
|
logs = await docker_service.get_container_logs(container_id, tail=tail)
|
|
|
|
if not logs:
|
|
logger.warning(f"Aucun log disponible pour le conteneur {container_id}")
|
|
return {"message": "Aucun log disponible pour ce conteneur"}
|
|
|
|
logger.info(f"Logs récupérés avec succès pour le conteneur {container_id}")
|
|
return {"logs": logs}
|
|
|
|
except HTTPException:
|
|
raise
|
|
except docker.errors.NotFound:
|
|
logger.error(f"Conteneur {container_id} non trouvé")
|
|
raise HTTPException(status_code=404, detail="Conteneur non trouvé")
|
|
except docker.errors.APIError as e:
|
|
logger.error(f"Erreur API Docker pour le conteneur {container_id}: {e}")
|
|
raise HTTPException(status_code=500, detail=f"Erreur Docker: {str(e)}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la récupération des logs du conteneur {container_id}: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.get("/{container_id}/stats")
|
|
async def get_container_stats(container_id: str):
|
|
"""Récupère les statistiques d'un conteneur."""
|
|
try:
|
|
validate_container_id(container_id)
|
|
logger.info(f"Récupération des statistiques pour le conteneur {container_id}")
|
|
stats = await docker_service.get_container_stats(container_id)
|
|
return stats
|
|
except HTTPException:
|
|
raise
|
|
except docker.errors.NotFound:
|
|
logger.error(f"Conteneur {container_id} non trouvé")
|
|
raise HTTPException(status_code=404, detail=f"Conteneur {container_id} non trouvé")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de la récupération des stats du conteneur {container_id}: {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post("/{container_id}/start")
|
|
async def start_container(container_id: str):
|
|
"""Démarre un conteneur."""
|
|
try:
|
|
validate_container_id(container_id)
|
|
await docker_service.start_container(container_id)
|
|
return {"message": "Conteneur démarré avec succès"}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du démarrage du conteneur {container_id} : {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post("/{container_id}/stop")
|
|
async def stop_container(container_id: str):
|
|
"""Arrête un conteneur."""
|
|
try:
|
|
validate_container_id(container_id)
|
|
await docker_service.stop_container(container_id)
|
|
return {"message": "Conteneur arrêté avec succès"}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'arrêt du conteneur {container_id} : {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.post("/{container_id}/restart")
|
|
async def restart_container(container_id: str):
|
|
"""Redémarre un conteneur."""
|
|
try:
|
|
validate_container_id(container_id)
|
|
await docker_service.restart_container(container_id)
|
|
return {"message": "Conteneur redémarré avec succès"}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du redémarrage du conteneur {container_id} : {e}")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@router.websocket("/{container_id}/logs/ws")
|
|
async def websocket_logs(websocket: WebSocket, container_id: str):
|
|
"""WebSocket pour les logs en temps réel d'un conteneur."""
|
|
if not container_id or container_id == "undefined":
|
|
logger.error("ID de conteneur invalide ou undefined")
|
|
try:
|
|
await websocket.send_json({"error": True, "message": "ID de conteneur invalide"})
|
|
except:
|
|
pass
|
|
return
|
|
|
|
async with manage_websocket_connection(websocket, container_id):
|
|
try:
|
|
# Créer une tâche pour suivre les logs
|
|
task = asyncio.create_task(handle_container_logs(websocket, container_id))
|
|
active_connections[container_id][websocket] = task
|
|
|
|
# Attendre que la tâche se termine ou que le client se déconnecte
|
|
try:
|
|
await task
|
|
except asyncio.CancelledError:
|
|
logger.info(f"Tâche de suivi des logs annulée pour {container_id}")
|
|
except WebSocketDisconnect:
|
|
logger.info(f"Client WebSocket déconnecté pour le conteneur {container_id}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Erreur WebSocket pour le conteneur {container_id}: {e}")
|
|
try:
|
|
await websocket.send_json({"error": True, "message": str(e)})
|
|
except:
|
|
pass
|
|
|
|
async def handle_container_logs(websocket: WebSocket, container_id: str):
|
|
"""Gère l'envoi des logs pour un conteneur."""
|
|
try:
|
|
# Buffer pour accumuler les logs
|
|
log_buffer = []
|
|
last_send_time = asyncio.get_event_loop().time()
|
|
BUFFER_SIZE = 100 # Nombre maximum de logs dans le buffer
|
|
FLUSH_INTERVAL = 0.1 # Intervalle minimum entre les envois (100ms)
|
|
|
|
logger.info(f"Démarrage du suivi des logs pour le conteneur {container_id}")
|
|
|
|
async for log in docker_service.follow_container_logs(container_id):
|
|
# Vérifier si la connexion est toujours active
|
|
if container_id not in active_connections or websocket not in active_connections[container_id]:
|
|
logger.info(f"Connexion WebSocket fermée pour le conteneur {container_id}")
|
|
break
|
|
|
|
# Ajouter le log au buffer
|
|
log_buffer.append(log)
|
|
current_time = asyncio.get_event_loop().time()
|
|
|
|
# Envoyer les logs si le buffer est plein ou si assez de temps s'est écoulé
|
|
if len(log_buffer) >= BUFFER_SIZE or (current_time - last_send_time) >= FLUSH_INTERVAL:
|
|
if log_buffer:
|
|
try:
|
|
await websocket.send_json({"logs": log_buffer})
|
|
logger.debug(f"Envoi de {len(log_buffer)} logs pour le conteneur {container_id}")
|
|
except WebSocketDisconnect:
|
|
logger.info(f"Client WebSocket déconnecté pendant l'envoi des logs pour {container_id}")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'envoi des logs pour {container_id}: {e}")
|
|
break
|
|
log_buffer = []
|
|
last_send_time = current_time
|
|
|
|
# Envoyer les logs restants
|
|
if log_buffer:
|
|
try:
|
|
await websocket.send_json({"logs": log_buffer})
|
|
logger.debug(f"Envoi des {len(log_buffer)} logs restants pour le conteneur {container_id}")
|
|
except WebSocketDisconnect:
|
|
logger.info(f"Client WebSocket déconnecté pendant l'envoi des logs restants pour {container_id}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors de l'envoi des logs restants pour {container_id}: {e}")
|
|
|
|
except asyncio.CancelledError:
|
|
logger.info(f"Suivi des logs annulé pour le conteneur {container_id}")
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Erreur lors du suivi des logs pour {container_id}: {e}")
|
|
raise |