74 lines
No EOL
2.5 KiB
Python
74 lines
No EOL
2.5 KiB
Python
import uuid
|
|
from typing import List, Optional, Dict
|
|
from datetime import datetime, timedelta
|
|
from ..models import Agent, AgentRegistration
|
|
from .docker import DockerService
|
|
|
|
class AgentService:
    """Keep track of registered agents in an in-memory dictionary.

    Agents are stored in ``self.agents`` keyed by a generated UUID string.
    Nothing here persists the registry, so its contents live only as long
    as this service instance does.
    """

    def __init__(self):
        """Create an empty registry and the Docker helper used for containers."""
        self.agents: Dict[str, Agent] = {}
        self.docker_service = DockerService()

    async def register_agent(self, registration: AgentRegistration) -> Agent:
        """Register a new agent and return it.

        A fresh UUID4 string is generated as the agent id. The name,
        hostname, IP address and Docker version are copied from
        ``registration``; the agent starts with status ``"active"``, the
        current UTC time as ``last_seen`` and an empty container list.
        """
        new_id = str(uuid.uuid4())
        record = Agent(
            id=new_id,
            name=registration.name,
            hostname=registration.hostname,
            ip_address=registration.ip_address,
            docker_version=registration.docker_version,
            status="active",
            last_seen=datetime.utcnow(),
            containers=[],
        )
        self.agents[new_id] = record
        return record

    async def update_agent_status(self, agent_id: str) -> Optional[Agent]:
        """Refresh an agent's ``last_seen`` and mark it ``"active"``.

        Returns the updated agent, or ``None`` when ``agent_id`` is not
        registered.
        """
        try:
            record = self.agents[agent_id]
        except KeyError:
            return None

        record.last_seen = datetime.utcnow()
        record.status = "active"
        return record

    async def get_agent(self, agent_id: str) -> Optional[Agent]:
        """Return the agent registered under ``agent_id``, or ``None``."""
        registry = self.agents
        return registry[agent_id] if agent_id in registry else None

    async def list_agents(self) -> List[Agent]:
        """Return a new list containing every registered agent."""
        return [*self.agents.values()]

    async def remove_agent(self, agent_id: str) -> bool:
        """Delete an agent from the registry.

        Returns ``True`` when the agent existed and was removed, ``False``
        when ``agent_id`` was not registered.
        """
        try:
            del self.agents[agent_id]
        except KeyError:
            return False
        return True

    async def cleanup_inactive_agents(self, timeout_minutes: int = 5) -> List[str]:
        """Remove agents not seen for more than ``timeout_minutes`` minutes.

        Returns the ids of the agents that were removed.
        """
        reference = datetime.utcnow()

        # Collect the ids first: the registry must not be mutated while it
        # is being iterated.
        stale_ids: List[str] = []
        for key, record in self.agents.items():
            idle_for = reference - record.last_seen
            if idle_for > timedelta(minutes=timeout_minutes):
                stale_ids.append(key)

        for key in stale_ids:
            await self.remove_agent(key)

        return stale_ids

    async def update_agent_containers(self, agent_id: str) -> Optional[Agent]:
        """Replace an agent's container list and refresh its ``last_seen``.

        Returns the updated agent, or ``None`` when ``agent_id`` is not
        registered.
        """
        try:
            record = self.agents[agent_id]
        except KeyError:
            return None

        # NOTE(review): list_containers() is called on this service's own
        # DockerService with no agent identifier -- confirm that it really
        # returns the containers belonging to this agent.
        record.containers = await self.docker_service.list_containers()
        record.last_seen = datetime.utcnow()
        return record