from datetime import datetime, timedelta from typing import Optional from jose import JWTError, jwt from passlib.context import CryptContext from cryptography.fernet import Fernet import base64 import hashlib from config import settings pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def get_password_hash(password: str) -> str: return pwd_context.hash(password) def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password) def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str: to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM) return encoded_jwt def decode_access_token(token: str) -> Optional[str]: try: payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM]) username: str = payload.get("sub") if username is None: return None return username except JWTError: return None def _get_fernet_key() -> bytes: key = hashlib.sha256(settings.SECRET_KEY.encode()).digest() return base64.urlsafe_b64encode(key) def encrypt_api_key(api_key: str) -> str: if not api_key: return "" fernet = Fernet(_get_fernet_key()) encrypted = fernet.encrypt(api_key.encode()) return encrypted.decode() def decrypt_api_key(encrypted_key: str) -> Optional[str]: if not encrypted_key: return None try: fernet = Fernet(_get_fernet_key()) decrypted = fernet.decrypt(encrypted_key.encode()) return decrypted.decode() except Exception: return None