etoile_polaire/server/app/services/docker_service.py
2025-04-01 16:33:54 +02:00

77 lines
No EOL
3.3 KiB
Python

import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

import docker

from ..models.container import Container, ContainerLog, ContainerStats
class DockerService:
    """Service layer exposing Docker containers, logs, updates and statistics.

    All public coroutines delegate to the synchronous Docker SDK client created
    in ``__init__``.

    NOTE(review): the SDK calls are not awaited, so each coroutine blocks the
    event loop for the duration of the Docker request. Consider offloading the
    calls to a worker thread if that becomes a problem.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self):
        """Create a Docker client from the environment configuration."""
        self.client = docker.from_env()

    # ------------------------------------------------------------------
    # Pure helpers (no Docker access)
    # ------------------------------------------------------------------

    @staticmethod
    def _parse_docker_timestamp(value: Any) -> datetime:
        """Convert a Docker timestamp into a timezone-aware UTC datetime.

        Args:
            value: Epoch seconds (int/float) or an RFC 3339 string such as
                ``2025-04-01T14:33:54.123456789Z``. A string without an
                explicit offset is treated as UTC.

        Returns:
            The corresponding timezone-aware datetime, expressed in UTC.

        Raises:
            ValueError: If a string value is not a valid timestamp.
        """
        if isinstance(value, (int, float)):
            return datetime.fromtimestamp(value, tz=timezone.utc)

        text = str(value).strip()
        # ``fromisoformat`` only understands a trailing "Z" from Python 3.11 on.
        if text[-1:] in ("Z", "z"):
            text = text[:-1]

        # Docker emits up to nanosecond precision with trailing zeros trimmed;
        # normalise the fraction to exactly six digits (microseconds).
        head, dot, rest = text.partition(".")
        if dot:
            digit_count = len(rest) - len(rest.lstrip("0123456789"))
            fraction = rest[:digit_count][:6].ljust(6, "0")
            text = f"{head}.{fraction}{rest[digit_count:]}"

        parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @classmethod
    def _split_log_line(cls, line: str) -> tuple:
        """Split a ``<timestamp> <message>`` log line.

        Returns:
            A ``(timestamp, message)`` tuple where the timestamp is a naive
            datetime holding the UTC wall-clock value and the message is the
            untouched remainder of the line (possibly empty).

        Raises:
            ValueError: If the line does not start with a valid timestamp.
        """
        stamp, _, message = line.rstrip("\r").partition(" ")
        moment = cls._parse_docker_timestamp(stamp).replace(tzinfo=None)
        return moment, message

    @staticmethod
    def _port_bindings(host_config: Dict[str, Any]) -> Dict[str, Any]:
        """Translate ``HostConfig.PortBindings`` into a ``containers.run`` ports map.

        Each container port maps to a host port, an ``(address, port)`` tuple,
        ``None`` (random host port) or a list of those when several bindings
        exist for the same container port.
        """
        ports: Dict[str, Any] = {}
        for container_port, bindings in (host_config.get("PortBindings") or {}).items():
            targets = []
            for binding in bindings or []:
                host_port = binding.get("HostPort") or None
                host_ip = binding.get("HostIp")
                targets.append((host_ip, host_port) if host_ip else host_port)
            if not targets:
                continue
            ports[container_port] = targets[0] if len(targets) == 1 else targets
        return ports

    @staticmethod
    def _cpu_percent(stats: Dict[str, Any]) -> float:
        """Return container CPU time as a percentage of system CPU time.

        Computed from the deltas between ``cpu_stats`` and ``precpu_stats``;
        returns 0.0 when the counters are missing or did not advance.
        """
        current = stats.get("cpu_stats") or {}
        previous = stats.get("precpu_stats") or {}
        cpu_delta = ((current.get("cpu_usage") or {}).get("total_usage") or 0) - (
            (previous.get("cpu_usage") or {}).get("total_usage") or 0
        )
        system_delta = (current.get("system_cpu_usage") or 0) - (
            previous.get("system_cpu_usage") or 0
        )
        if system_delta <= 0 or cpu_delta < 0:
            return 0.0
        return (cpu_delta / system_delta) * 100.0

    @staticmethod
    def _network_totals(stats: Dict[str, Any]) -> tuple:
        """Return ``(rx_bytes, tx_bytes)`` summed over every reported interface."""
        received = sent = 0
        for counters in (stats.get("networks") or {}).values():
            received += counters.get("rx_bytes", 0)
            sent += counters.get("tx_bytes", 0)
        return received, sent

    @staticmethod
    def _blkio_totals(stats: Dict[str, Any]) -> tuple:
        """Return ``(read_bytes, write_bytes)`` summed over every block device."""
        entries = (stats.get("blkio_stats") or {}).get("io_service_bytes_recursive") or []
        read = written = 0
        for entry in entries:
            # The operation name casing differs between cgroup versions.
            operation = str(entry.get("op", "")).lower()
            if operation == "read":
                read += entry.get("value", 0)
            elif operation == "write":
                written += entry.get("value", 0)
        return read, written

    def _image_label(self, container) -> str:
        """Return the first tag of the container image, or a stable fallback.

        Falls back to the image id when the image is untagged, and to the image
        reference recorded in the container configuration when the image can no
        longer be resolved.
        """
        try:
            image = container.image
        except docker.errors.ImageNotFound:
            image = None
        if image is None:
            return (container.attrs.get("Config") or {}).get("Image") or ""
        return image.tags[0] if image.tags else image.id

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def list_containers(self) -> List["Container"]:
        """List every Docker container, running or not.

        ``created_at`` is a naive datetime in local time so that it is
        comparable with ``updated_at`` (``datetime.now()``).
        """
        result = []
        for container in self.client.containers.list(all=True):
            created = self._parse_docker_timestamp(container.attrs["Created"])
            result.append(
                Container(
                    id=container.id,
                    name=container.name,
                    image=self._image_label(container),
                    status=container.status,
                    ports=container.ports,
                    created_at=created.astimezone().replace(tzinfo=None),
                    updated_at=datetime.now(),
                )
            )
        return result

    async def get_container_logs(self, container_id: str, tail: int = 100) -> List["ContainerLog"]:
        """Return the most recent log lines of a container.

        Args:
            container_id: Id or name of the container.
            tail: Maximum number of log entries to return.

        Returns:
            Entries ordered by timestamp; ``stream`` is ``"stdout"`` or
            ``"stderr"`` and ``timestamp`` is a naive datetime in UTC.

        Raises:
            ValueError: If a log line does not start with a valid timestamp.
        """
        container = self.client.containers.get(container_id)

        # A log line is "<timestamp> <message>" and carries no stream marker,
        # so each stream is fetched on its own and tagged here.
        entries = []
        for stream_name in ("stdout", "stderr"):
            raw = container.logs(
                tail=tail,
                timestamps=True,
                stdout=stream_name == "stdout",
                stderr=stream_name == "stderr",
            )
            for line in raw.decode("utf-8", errors="replace").split("\n"):
                if not line.strip():
                    continue
                moment, message = self._split_log_line(line)
                entries.append((moment, stream_name, message))

        # Stable sort: interleave both streams chronologically, then keep the
        # newest ``tail`` entries overall.
        entries.sort(key=lambda entry: entry[0])
        if tail > 0:
            entries = entries[-tail:]

        return [
            ContainerLog(timestamp=moment, stream=stream_name, message=message)
            for moment, stream_name, message in entries
        ]

    async def update_container(self, container_id: str) -> bool:
        """Recreate a container from the latest version of its image.

        The image reference stored in the container configuration is pulled,
        then the container is stopped, removed and started again (detached)
        with the same name, port bindings, environment and restart policy.

        NOTE(review): volumes, networks, command, labels and other settings
        are not carried over to the new container.

        Returns:
            True on success, False if any step failed (the error is logged).
        """
        try:
            container = self.client.containers.get(container_id)

            # Read everything needed before touching the container, so a
            # malformed inspect payload cannot fail after the removal.
            config = container.attrs["Config"]
            host_config = container.attrs["HostConfig"]
            image_ref = config["Image"]
            name = container.name
            ports = self._port_bindings(host_config)
            environment = config.get("Env")
            policy = host_config.get("RestartPolicy") or {}
            restart_policy = {
                key: policy[key] for key in ("Name", "MaximumRetryCount") if key in policy
            }

            try:
                self.client.images.pull(image_ref)
            except docker.errors.APIError as pull_error:
                # Best effort: locally built images cannot be pulled; fall back
                # to whatever the reference resolves to locally.
                self._logger.warning(
                    "Impossible de récupérer l'image %s, réutilisation de l'image locale : %s",
                    image_ref,
                    pull_error,
                )

            container.stop()
            container.remove()
            # detach=True is required: a foreground run would block until the
            # new container exits.
            self.client.containers.run(
                image=image_ref,
                name=name,
                detach=True,
                ports=ports or None,
                environment=environment,
                restart_policy=restart_policy or None,
            )
            return True
        except Exception as e:
            self._logger.exception("Erreur lors de la mise à jour du conteneur : %s", e)
            return False

    async def get_container_stats(self, container_id: str) -> "ContainerStats":
        """Return a one-shot snapshot of a container's resource usage.

        Counters missing from the stats payload are reported as 0. Network and
        block I/O figures are summed over all interfaces and devices.
        """
        container = self.client.containers.get(container_id)
        stats = container.stats(stream=False)

        memory = stats.get("memory_stats") or {}
        network_rx, network_tx = self._network_totals(stats)
        block_read, block_write = self._blkio_totals(stats)

        return ContainerStats(
            cpu_percent=self._cpu_percent(stats),
            memory_usage=memory.get("usage", 0),
            memory_limit=memory.get("limit", 0),
            network_rx=network_rx,
            network_tx=network_tx,
            block_read=block_read,
            block_write=block_write,
            timestamp=datetime.now(),
        )