68 lines
No EOL
2.9 KiB
Python
68 lines
No EOL
2.9 KiB
Python
# backend/services/france_travail_auth_service.py
|
|
import httpx
|
|
import logging
|
|
from core.config import settings
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class FranceTravailAuthService:
|
|
_instance = None
|
|
_token_cache = {} # Cache pour stocker le token
|
|
|
|
def __new__(cls):
|
|
if cls._instance is None:
|
|
cls._instance = super(FranceTravailAuthService, cls).__new__(cls)
|
|
return cls._instance
|
|
|
|
async def get_access_token(self):
|
|
# Vérifiez si le token est encore valide dans le cache
|
|
if self._token_cache and self._token_cache.get("expires_at", 0) > httpx._compat.current_time():
|
|
logger.info("Utilisation du token France Travail depuis le cache.")
|
|
return self._token_cache["access_token"]
|
|
|
|
logger.info("Obtention d'un nouveau token France Travail...")
|
|
token_url = settings.FRANCE_TRAVAIL_TOKEN_URL
|
|
client_id = settings.FRANCE_TRAVAIL_CLIENT_ID
|
|
client_secret = settings.FRANCE_TRAVAIL_CLIENT_SECRET
|
|
scope = "o2dsoffre api_offresdemploiv2" # Assurez-vous que ces scopes sont activés pour votre application
|
|
|
|
data = {
|
|
"grant_type": "client_credentials",
|
|
"client_id": client_id,
|
|
"client_secret": client_secret,
|
|
"scope": scope
|
|
}
|
|
|
|
headers = {
|
|
"Content-Type": "application/x-www-form-urlencoded" # C'est très important !
|
|
}
|
|
|
|
try:
|
|
async with httpx.AsyncClient() as client:
|
|
response = await client.post(token_url, data=data, headers=headers)
|
|
response.raise_for_status() # Lève une exception pour les codes d'erreur HTTP
|
|
|
|
token_data = response.json()
|
|
access_token = token_data.get("access_token")
|
|
expires_in = token_data.get("expires_in") # Durée de validité en secondes
|
|
|
|
if not access_token:
|
|
raise ValueError("Le token d'accès n'a pas été trouvé dans la réponse de France Travail.")
|
|
|
|
# Mettre à jour le cache
|
|
self._token_cache = {
|
|
"access_token": access_token,
|
|
"expires_at": httpx._compat.current_time() + expires_in - 60 # 60 secondes de marge de sécurité
|
|
}
|
|
logger.info("Nouveau token France Travail obtenu et mis en cache.")
|
|
return access_token
|
|
|
|
except httpx.HTTPStatusError as e:
|
|
logger.error(f"Erreur HTTP lors de l'obtention du token France Travail: {e.response.status_code} - {e.response.text}")
|
|
# Re-raise une RuntimeError pour que le service appelant puisse la gérer
|
|
raise RuntimeError(f"Erreur d'authentification France Travail: {e.response.text}")
|
|
except Exception as e:
|
|
logger.error(f"Erreur inattendue lors de l'obtention du token France Travail: {e}")
|
|
raise RuntimeError(f"Erreur inattendue lors de l'obtention du token France Travail: {e}")
|
|
|
|
france_travail_auth_service = FranceTravailAuthService() |