Source code for MONet.auth

from __future__ import annotations

import datetime
import json
import os

import jwt
import requests
from jwt import decode


[docs] def get_token(username: str, password: str) -> str: """ Obtain an authentication token from the KTH IAM OpenID Connect service. Parameters ---------- username : str The username for authentication. password : str The password for authentication. Returns ------- str The authentication token as a string. Raises ------ requests.HTTPError If the request to the authentication service fails. """ url = "https://iam.cloud.cbh.kth.se/realms/cloud/protocol/openid-connect/token" data = {"grant_type": "password", "username": username, "password": password, "client_id": "monailabel-app"} response = requests.post(url, data=data) response.raise_for_status() return response.json()
[docs] def verify_valid_token_exists(username: str) -> bool: """ Checks if a valid authentication token exists for the specified user. This function looks for an authentication token file in the user's home directory, loads the token, and verifies its validity by decoding the JWT token. It also prints the token's expiration time if available. Parameters ---------- username : str The username for which to check the authentication token. Returns ------- bool True if a valid token exists and is not expired, False otherwise. Raises ------ None """ home = os.path.expanduser("~") auth_path = os.path.join(home, ".monet", f"{username}_auth.json") if not os.path.exists(auth_path): return False with open(auth_path, "r") as token_file: token_data = json.load(token_file) token = token_data.get("access_token") if not token: return False try: decode(token, options={"verify_signature": False}) print(f"Token for {username} is valid.") expiration = decode(token, options={"verify_signature": False}).get("exp") if expiration: expires_at = datetime.datetime.fromtimestamp(expiration, tz=datetime.timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") print(f"Token expires at: {expires_at}") # Check if token is expired if expiration and datetime.datetime.now(datetime.timezone.utc).timestamp() > expiration: print("Token is expired.") # Try to refresh the token using refresh_token if available refresh_token = token_data.get("refresh_token") if refresh_token: url = "https://iam.cloud.cbh.kth.se/realms/cloud/protocol/openid-connect/token" data = {"grant_type": "refresh_token", "refresh_token": refresh_token, "client_id": "monailabel-app"} try: response = requests.post(url, data=data) response.raise_for_status() new_token_data = response.json() # Save new token data to file with open(auth_path, "w") as token_file: json.dump(new_token_data, token_file) print("Token refreshed successfully.") return True except requests.RequestException as e: print(f"Failed to refresh token: {e}") return False return True except jwt.ExpiredSignatureError: return False except jwt.DecodeError: return False
[docs] def welcome_message(token: str) -> str: """ Generates a welcome message using the preferred username from a JWT token. Decodes the provided JWT token without verifying its signature, extracts the expiration time, prints when the token expires, and returns a welcome message including the preferred username. Parameters ---------- token : str The JWT token as a string. Returns ------- str A welcome message containing the preferred username from the token, or 'User' if not present. """ decoded_token = decode(token, options={"verify_signature": False}) expiration = decoded_token.get("exp") if expiration: expires_at = datetime.datetime.utcfromtimestamp(expiration).strftime("%Y-%m-%d %H:%M:%S UTC") print(f"Token expires at: {expires_at}") return f"Welcome {decoded_token.get('preferred_username', 'User')}!"