"""OAuth2 token manager for OTLP exporters."""
import logging
import threading
from datetime import datetime, timedelta, timezone
from typing import Optional

import requests

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class OAuthTokenManager:
    """Manage OAuth2 client-credentials tokens with automatic refresh.

    An initial token is fetched when the manager is constructed. A daemon
    thread then checks periodically whether the token is close to expiry and
    fetches a replacement; ``get_headers`` performs the same check on demand.
    Token fetches made by ``get_headers`` and by the background thread are
    serialized with a lock.

    Args:
        client_id: OAuth2 client identifier sent to the token endpoint.
        client_secret: OAuth2 client secret sent to the token endpoint.
        token_url: URL of the OAuth2 token endpoint.
        scopes: Value sent as the ``scope`` form field.
        refresh_buffer: How many seconds before expiry a refresh should be
            attempted. For tokens whose lifetime is shorter than twice this
            value, half of the token lifetime is used instead so that a
            freshly fetched token is never considered stale immediately.

    Raises:
        Exception: Whatever the initial token fetch raises (for example an
            HTTP error from ``raise_for_status`` or ``KeyError`` when the
            response has no ``access_token``).
    """

    # Lifetime assumed when the endpoint omits ``expires_in`` or sends a
    # value that cannot be interpreted as a finite number of seconds.
    _DEFAULT_EXPIRES_IN = 3600.0
    # Seconds the background thread waits between refresh checks.
    _REFRESH_POLL_INTERVAL = 60
    # Timeout, in seconds, for the HTTP request to the token endpoint.
    _REQUEST_TIMEOUT = 10

    # Lifetime in seconds of the current token. Kept as a class-level default
    # so instances that have not fetched a token yet fall back to the plain
    # ``refresh_buffer``.
    _token_lifetime: Optional[float] = None

    def __init__(
        self,
        client_id: str,
        client_secret: str,
        token_url: str,
        scopes: str,
        refresh_buffer: int = 300,  # Refresh 5 minutes before expiry
    ):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_url = token_url
        self.scopes = scopes
        self.refresh_buffer = refresh_buffer

        self._token: Optional[str] = None
        # Timezone-aware (UTC) instant at which the current token expires.
        self._token_expiry: Optional[datetime] = None
        self._token_lifetime = None
        self._lock = threading.Lock()
        self._stop_refresh = threading.Event()

        # Fetch eagerly so that bad credentials surface at construction time.
        self._fetch_token()

        # Start background refresh.
        self._refresh_thread = threading.Thread(
            target=self._refresh_loop, daemon=True, name="oauth-token-refresh"
        )
        self._refresh_thread.start()

    @staticmethod
    def _parse_expires_in(raw) -> float:
        """Convert an ``expires_in`` response value to non-negative seconds.

        Accepts numbers and numeric strings. A missing (``None``),
        non-numeric, NaN or infinite value yields the default lifetime;
        negative values are clamped to ``0.0`` (already expired).
        """
        try:
            seconds = float(raw)
        except (TypeError, ValueError):
            return OAuthTokenManager._DEFAULT_EXPIRES_IN
        # NaN fails both comparisons below, infinity fails the second one.
        if not (float("-inf") < seconds < float("inf")):
            return OAuthTokenManager._DEFAULT_EXPIRES_IN
        return max(seconds, 0.0)

    def _seconds_remaining(self) -> float:
        """Return the seconds until the current token expires.

        Must only be called when ``_token_expiry`` is set.
        """
        return (self._token_expiry - datetime.now(timezone.utc)).total_seconds()

    def _effective_buffer(self) -> float:
        """Return the refresh buffer, capped at half of the token lifetime."""
        if self._token_lifetime is None:
            return self.refresh_buffer
        return min(self.refresh_buffer, self._token_lifetime / 2)

    def _fetch_token(self) -> None:
        """Fetch a new access token from the OAuth2 token endpoint.

        Callers that may run concurrently are expected to hold ``_lock``.

        Raises:
            Exception: HTTP/transport errors from ``requests``, a decoding
                error from ``response.json()``, or ``KeyError`` when the
                response contains no ``access_token``.
        """
        response = requests.post(
            self.token_url,
            data={
                "grant_type": "client_credentials",
                "client_id": self.client_id,
                "client_secret": self.client_secret,
                "scope": self.scopes,
            },
            headers={"Content-Type": "application/x-www-form-urlencoded"},
            timeout=self._REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        token_data = response.json()

        # Parse everything before touching instance state so a malformed
        # response can never leave the token and its expiry out of sync.
        access_token = token_data["access_token"]
        lifetime = self._parse_expires_in(token_data.get("expires_in"))

        self._token = access_token
        self._token_lifetime = lifetime
        self._token_expiry = datetime.now(timezone.utc) + timedelta(seconds=lifetime)
        logger.info("OAuth token fetched, expires in %d seconds", int(lifetime))

    def _should_refresh(self) -> bool:
        """Return True when there is no token or it is inside the refresh window."""
        if self._token is None or self._token_expiry is None:
            return True
        return self._seconds_remaining() <= self._effective_buffer()

    def _refresh_loop(self) -> None:
        """Background loop: refresh the token when due until ``stop`` is called."""
        while not self._stop_refresh.is_set():
            try:
                with self._lock:
                    if self._should_refresh():
                        self._fetch_token()
            except Exception:
                # Keep the thread alive; the next iteration retries.
                logger.exception("Error refreshing OAuth token")
            # Event.wait doubles as an interruptible sleep.
            self._stop_refresh.wait(self._REFRESH_POLL_INTERVAL)

    def get_headers(self) -> dict:
        """Return authorization headers for the OTLP exporter.

        Refreshes the token first when it is missing or inside the refresh
        window. If that refresh fails while the current token has not yet
        expired, the failure is logged and the current token is returned.

        Returns:
            A dict with a single ``Authorization: Bearer <token>`` entry.

        Raises:
            Exception: The refresh error, when no unexpired token is
                available to fall back on.
        """
        with self._lock:
            if self._should_refresh():
                try:
                    self._fetch_token()
                except Exception:
                    has_token = (
                        self._token is not None and self._token_expiry is not None
                    )
                    if not has_token or self._seconds_remaining() <= 0:
                        raise
                    logger.warning(
                        "OAuth token refresh failed; reusing current token "
                        "valid for %d more seconds",
                        self._seconds_remaining(),
                        exc_info=True,
                    )
            return {"Authorization": f"Bearer {self._token}"}

    def stop(self) -> None:
        """Signal the background refresh thread to stop.

        Only sets the stop event; it does not wait for the thread to exit.
        """
        self._stop_refresh.set()