# Source code for audible.crypto_provider.cryptography_provider

"""Cryptography provider using the cryptography library.

This module implements the crypto protocols using the cryptography library,
which provides modern, Rust-accelerated cryptographic operations.

The cryptography library is the recommended choice for new projects due to:
- Active maintenance and security updates
- Modern API design
- Rust-based performance optimizations
- Comprehensive cryptographic primitive support

This is the preferred provider when available, selected first during auto-detection.
"""

from __future__ import annotations

import logging
import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING, Any


if TYPE_CHECKING:
    from .protocols import HashAlgorithm


# Optional import - only available if cryptography is installed
try:
    from cryptography.exceptions import InvalidKey, UnsupportedAlgorithm
    from cryptography.hazmat.backends import default_backend
    from cryptography.hazmat.primitives import hashes, serialization
    from cryptography.hazmat.primitives import padding as sym_padding
    from cryptography.hazmat.primitives.asymmetric import padding as asym_padding
    from cryptography.hazmat.primitives.asymmetric import rsa
    from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

    CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
    CRYPTOGRAPHY_AVAILABLE = False


# Module-level logger. The logger name is spelled out as a literal here rather
# than derived from ``__name__``.
logger = logging.getLogger("audible.crypto_provider.cryptography")


def _load_rsa_private_key_cryptography(pem_data: str) -> Any:
    """Parse a PEM-encoded RSA private key with the cryptography library.

    The PEM text is UTF-8 encoded and passed to
    ``serialization.load_pem_private_key`` with ``password=None``. A key that
    loads successfully but is not an ``rsa.RSAPrivateKey`` is rejected.

    Note:
        Nothing is cached here; every call parses ``pem_data`` again. Caching
        is the caller's responsibility.

    Args:
        pem_data: RSA private key in PEM format.

    Returns:
        The loaded RSA private key object.

    Raises:
        ValueError: If loading fails or the loaded key is not an RSA private
            key. The underlying error is logged and attached as the cause.
    """
    try:
        candidate = serialization.load_pem_private_key(
            pem_data.encode("utf-8"),
            password=None,
            backend=default_backend(),
        )
        if isinstance(candidate, rsa.RSAPrivateKey):
            return candidate
        # Raised inside the ``try`` so the handler below logs it and re-raises
        # it under the same generic message as every other load failure.
        raise ValueError("Key is not an RSA private key")
    except (ValueError, TypeError, UnsupportedAlgorithm, InvalidKey) as load_error:
        logger.error("Failed to load RSA private key: %s", load_error)
        raise ValueError("Failed to load RSA private key") from load_error


class CryptographyAESProvider:
    """AES provider using cryptography library.

    Implements AES-CBC encryption/decryption with optional PKCS7 padding.
    Key length, IV length and padding mode are validated before any cipher
    work starts, so ``encrypt`` and ``decrypt`` reject bad arguments the
    same way.
    """

    @staticmethod
    def _validate_arguments(key: bytes, iv: bytes, padding: str) -> None:
        """Raise ValueError for a bad IV length, key length or padding mode."""
        if len(iv) != 16:
            raise ValueError(f"IV must be 16 bytes, got {len(iv)}")
        if len(key) not in (16, 24, 32):
            raise ValueError(f"Key must be 16, 24, or 32 bytes, got {len(key)}")
        if padding not in ("default", "none"):
            raise ValueError(f"Unknown padding mode: {padding}")

    def encrypt(
        self, key: bytes, iv: bytes, data: str, padding: str = "default"
    ) -> bytes:
        """Encrypt data using AES-CBC.

        Args:
            key: AES key (16, 24, or 32 bytes for AES-128/192/256).
            iv: Initialization vector (16 bytes).
            data: Plaintext string to encrypt; it is UTF-8 encoded first.
            padding: "default" for PKCS7 padding, "none" for no padding.

        Returns:
            Encrypted ciphertext as bytes.

        Raises:
            ValueError: If key/IV sizes are invalid or padding mode unknown.
        """
        self._validate_arguments(key, iv, padding)

        plaintext = data.encode("utf-8")
        if padding == "default":
            # PKCS7 takes the block size in bits; the AES block is 128 bits.
            padder = sym_padding.PKCS7(128).padder()
            plaintext = padder.update(plaintext) + padder.finalize()

        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        encryptor = cipher.encryptor()
        return encryptor.update(plaintext) + encryptor.finalize()

    def decrypt(
        self, key: bytes, iv: bytes, encrypted_data: bytes, padding: str = "default"
    ) -> str:
        """Decrypt data using AES-CBC.

        Args:
            key: AES key (16, 24, or 32 bytes).
            iv: Initialization vector (16 bytes).
            encrypted_data: Ciphertext to decrypt.
            padding: "default" for PKCS7 padding, "none" for no padding.

        Returns:
            Decrypted plaintext as string (decoded as UTF-8).

        Raises:
            ValueError: If key/IV sizes invalid, padding mode unknown,
                padding incorrect, or the result is not valid UTF-8.
        """
        # Reject an unknown padding mode up front instead of after a full
        # decryption pass, matching the behaviour of ``encrypt``.
        self._validate_arguments(key, iv, padding)

        cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=default_backend())
        decryptor = cipher.decryptor()
        plaintext = decryptor.update(encrypted_data) + decryptor.finalize()

        if padding == "default":
            unpadder = sym_padding.PKCS7(128).unpadder()
            try:
                plaintext = unpadder.update(plaintext) + unpadder.finalize()
            except ValueError as e:
                msg = (
                    "Invalid PKCS7 padding - possible decryption key/IV mismatch or "
                    "corrupted ciphertext"
                )
                raise ValueError(msg) from e

        try:
            return plaintext.decode("utf-8")
        except UnicodeDecodeError as e:
            msg = (
                "Failed to decode decrypted data as UTF-8 - possible decryption "
                "key/IV mismatch or corrupted ciphertext"
            )
            raise ValueError(msg) from e
class CryptographyPBKDF2Provider:
    """PBKDF2 key derivation backed by the cryptography library.

    Wraps ``PBKDF2HMAC`` and selects the digest from the name reported by a
    caller-supplied hash factory.
    """

    def derive_key(
        self,
        password: str,
        salt: bytes,
        iterations: int,
        key_size: int,
        hashmod: Callable[..., HashAlgorithm],
    ) -> bytes:
        """Derive a key from a password with PBKDF2-HMAC.

        Args:
            password: Password string; it is UTF-8 encoded before derivation.
            salt: Salt bytes.
            iterations: Number of PBKDF2 iterations (1-65535).
            key_size: Desired key length in bytes.
            hashmod: Hash function factory (e.g., hashlib.sha256). It is
                called with no arguments and only its ``name`` is used.

        Returns:
            Derived key bytes.

        Raises:
            ValueError: If iterations is out of range or the hash name is
                not one of the supported algorithms.
        """
        if not (1 <= iterations <= 65535):
            raise ValueError(f"Iterations must be 1-65535, got {iterations}")

        # Translate the factory's reported name into the matching
        # cryptography hash object.
        digest_name = hashmod().name
        supported = {
            "sha256": hashes.SHA256(),
            "sha1": hashes.SHA1(),  # noqa: S303 - legacy compatibility
            "sha224": hashes.SHA224(),
            "sha384": hashes.SHA384(),
            "sha512": hashes.SHA512(),
            "md5": hashes.MD5(),  # noqa: S303 - legacy compatibility
        }
        if digest_name not in supported:
            raise ValueError(f"Unsupported hash algorithm: {digest_name}")

        deriver = PBKDF2HMAC(
            algorithm=supported[digest_name],
            length=key_size,
            salt=salt,
            iterations=iterations,
            backend=default_backend(),
        )
        secret = password.encode("utf-8")
        return deriver.derive(secret)
class CryptographyRSAProvider:
    """RSA operations backed by the cryptography library.

    Loads PEM private keys and produces PKCS#1 v1.5 signatures over SHA-256.
    """

    def load_private_key(self, pem_data: str) -> Any:
        """Load an RSA private key from PEM text.

        Delegates to the module-level loader
        ``_load_rsa_private_key_cryptography``.

        Args:
            pem_data: RSA private key in PEM format.

        Returns:
            Parsed RSA private key object.
        """
        return _load_rsa_private_key_cryptography(pem_data)

    def sign(self, key: Any, data: bytes, algorithm: str = "SHA-256") -> bytes:
        """Sign data with an RSA private key using PKCS#1 v1.5 and SHA-256.

        Args:
            key: RSA private key from load_private_key().
            data: Data to sign.
            algorithm: Hash algorithm; only "SHA-256" is accepted.

        Returns:
            Signature bytes.

        Raises:
            ValueError: If algorithm is not "SHA-256" or key is not an
                RSA private key.
        """
        # The algorithm is checked before the key, so a bad algorithm is
        # reported even when the key is also unusable.
        if algorithm != "SHA-256":
            raise ValueError(f"Only SHA-256 supported, got {algorithm}")

        if isinstance(key, rsa.RSAPrivateKey):
            return key.sign(data, asym_padding.PKCS1v15(), hashes.SHA256())

        raise ValueError("Key must be an RSA private key")
class CryptographyHashProvider:
    """Hash provider using cryptography library.

    Provides SHA-256 and SHA-1 digests. The first SHA-1 call emits a
    ``DeprecationWarning``; later calls stay silent.
    """

    # Class-level flag so the SHA-1 deprecation warning is issued only once.
    _sha1_warning_shown = False

    @staticmethod
    def _compute_digest(algorithm: Any, data: bytes) -> bytes:
        """Hash ``data`` with the given cryptography hash algorithm object."""
        hasher = hashes.Hash(algorithm, backend=default_backend())
        hasher.update(data)
        return hasher.finalize()

    def sha256(self, data: bytes) -> bytes:
        """Compute SHA-256 hash.

        Args:
            data: Data to hash.

        Returns:
            SHA-256 digest bytes (32 bytes).
        """
        return self._compute_digest(hashes.SHA256(), data)

    def sha1(self, data: bytes) -> bytes:
        """Compute SHA-1 hash.

        Args:
            data: Data to hash.

        Returns:
            SHA-1 digest bytes (20 bytes).

        Note:
            SHA-1 is cryptographically broken. Only use for legacy compatibility.
        """
        owner = self.__class__
        if not owner._sha1_warning_shown:
            # stacklevel=2 attributes the warning to the caller of sha1().
            warnings.warn(
                "SHA-1 is deprecated and should only be used for legacy compatibility",
                DeprecationWarning,
                stacklevel=2,
            )
            owner._sha1_warning_shown = True

        return self._compute_digest(hashes.SHA1(), data)  # noqa: S303 - legacy compatibility
class CryptographyProvider:
    """Unified crypto provider built on the cryptography library.

    Bundles one instance each of the AES, PBKDF2, RSA and hash providers and
    exposes them through read-only properties. This is the preferred
    provider when available and is selected first during auto-detection.

    Raises:
        ImportError: If cryptography library is not installed.

    Example:
        >>> from audible.crypto_provider import get_crypto_providers, CryptographyProvider  # doctest: +SKIP
        >>> providers = get_crypto_providers(CryptographyProvider)  # doctest: +SKIP
        >>> providers.provider_name  # doctest: +SKIP
        'cryptography'
    """

    def __init__(self) -> None:
        # Fail early with an install hint when the optional dependency
        # could not be imported at module load time.
        if not CRYPTOGRAPHY_AVAILABLE:
            install_hint = (
                "cryptography is not installed. Install with: pip install "
                "audible[cryptography] (or audible[cryptography,pycryptodome] for full coverage)."
            )
            raise ImportError(install_hint)

        self._aes = CryptographyAESProvider()
        self._pbkdf2 = CryptographyPBKDF2Provider()
        self._rsa = CryptographyRSAProvider()
        self._hash = CryptographyHashProvider()

    @property
    def aes(self) -> CryptographyAESProvider:
        """Return the AES provider instance."""
        return self._aes

    @property
    def pbkdf2(self) -> CryptographyPBKDF2Provider:
        """Return the PBKDF2 provider instance."""
        return self._pbkdf2

    @property
    def rsa(self) -> CryptographyRSAProvider:
        """Return the RSA provider instance."""
        return self._rsa

    @property
    def hash(self) -> CryptographyHashProvider:
        """Return the hash provider instance."""
        return self._hash

    @property
    def provider_name(self) -> str:
        """Return the provider's name, ``"cryptography"``."""
        return "cryptography"
# Names exported by ``from <module> import *``.
__all__ = [
    "CRYPTOGRAPHY_AVAILABLE",
    "CryptographyAESProvider",
    "CryptographyHashProvider",
    "CryptographyPBKDF2Provider",
    "CryptographyProvider",
    "CryptographyRSAProvider",
]