# etoile_polaire/server/app/services/agent_service.py
# 2025-04-01 16:33:54 +02:00
#
# 75 lines
# No EOL
# 2.4 KiB
# Python

import uuid
from typing import List, Optional
from datetime import datetime
from ..models.agent import Agent, AgentCreate, AgentUpdate, AgentStatus
class AgentService:
    """In-memory registry of agents with async CRUD and status lookups.

    Agents are kept in a plain list on the instance, so nothing is persisted
    once the service object is discarded.
    """

    def __init__(self):
        """Start with an empty agent registry."""
        # TODO: Replace with a real database.
        self.agents: List[Agent] = []

    async def list_agents(self) -> List[Agent]:
        """Return every registered agent.

        Returns:
            A new list holding the registered agents in insertion order.
            It is a shallow copy: adding to or removing from it does not
            alter the registry, but the agent objects themselves are shared.
        """
        return list(self.agents)

    async def create_agent(self, agent: AgentCreate) -> Agent:
        """Create a new agent and add it to the registry.

        Args:
            agent: Creation payload; its ``name``, ``host``, ``port`` and
                ``token`` are copied onto the new agent.

        Returns:
            The stored agent, with a random UUID4 string as ``id``, status
            ``"offline"``, version ``"1.0.0"`` and all three timestamps set
            to the same instant.
        """
        # Capture the clock once so last_seen, created_at and updated_at are
        # exactly equal on a freshly created agent.
        now = datetime.now()
        new_agent = Agent(
            id=str(uuid.uuid4()),
            name=agent.name,
            host=agent.host,
            port=agent.port,
            token=agent.token,
            status="offline",
            version="1.0.0",
            last_seen=now,
            created_at=now,
            updated_at=now,
        )
        self.agents.append(new_agent)
        return new_agent

    async def get_agent(self, agent_id: str) -> Optional[Agent]:
        """Look up an agent by its ID.

        Args:
            agent_id: Identifier to search for.

        Returns:
            The first registered agent whose ``id`` equals ``agent_id``, or
            ``None`` when there is no match.
        """
        for candidate in self.agents:
            if candidate.id == agent_id:
                return candidate
        return None

    async def update_agent(self, agent_id: str, agent_update: AgentUpdate) -> Optional[Agent]:
        """Apply a partial update to an existing agent.

        Args:
            agent_id: Identifier of the agent to modify.
            agent_update: Update payload; only the fields it reports as
                explicitly set are written to the agent.

        Returns:
            The updated agent (modified in place, with ``updated_at``
            refreshed), or ``None`` when no agent has that ID.
        """
        agent = await self.get_agent(agent_id)
        if not agent:
            return None

        # Use `model_dump` when the payload provides it (the pydantic v2
        # name) and fall back to `.dict()` otherwise, so either API works.
        dump = getattr(agent_update, "model_dump", None) or agent_update.dict
        for field_name, new_value in dump(exclude_unset=True).items():
            setattr(agent, field_name, new_value)

        agent.updated_at = datetime.now()
        return agent

    async def delete_agent(self, agent_id: str) -> bool:
        """Remove an agent from the registry.

        Args:
            agent_id: Identifier of the agent to remove.

        Returns:
            ``True`` when an agent was found and removed, ``False`` when no
            agent has that ID.
        """
        agent = await self.get_agent(agent_id)
        if not agent:
            return False
        self.agents.remove(agent)
        return True

    async def get_agent_status(self, agent_id: str) -> Optional[AgentStatus]:
        """Return the status report for an agent.

        Args:
            agent_id: Identifier of the agent to inspect.

        Returns:
            An ``AgentStatus`` built from the agent's stored ``status``,
            ``version`` and ``last_seen``, or ``None`` when no agent has
            that ID. The container count and system information are
            hard-coded placeholders, not values read from the agent.
        """
        agent = await self.get_agent(agent_id)
        if not agent:
            return None

        # TODO: Implement the real status check.
        return AgentStatus(
            status=agent.status,
            version=agent.version,
            last_seen=agent.last_seen,
            containers_count=0,
            system_info={
                "os": "Linux",
                "arch": "x86_64",
                "docker_version": "20.10.0",
            },
        )