# Source code for audible.crypto_provider.pycryptodome_provider

"""Pycryptodome crypto provider using native C extensions.

This module implements the crypto protocols using the pycryptodome library,
which provides high-performance cryptographic operations via C extensions.

This is significantly faster than the pure Python implementations but requires
pycryptodome to be installed as an optional dependency.
"""

from __future__ import annotations

import logging
import warnings
from collections.abc import Callable
from typing import TYPE_CHECKING, Any


if TYPE_CHECKING:
    from .protocols import HashAlgorithm


# Optional import - only available if pycryptodome is installed
try:
    from Crypto.Cipher import AES
    from Crypto.Hash import (
        MD5,
        SHA1,
        SHA224,
        SHA256,
        SHA384,
        SHA512,
    )
    from Crypto.Protocol.KDF import PBKDF2
    from Crypto.PublicKey import RSA
    from Crypto.Signature import pkcs1_15
    from Crypto.Util.Padding import pad, unpad

    PYCRYPTODOME_AVAILABLE = True
except ImportError:
    PYCRYPTODOME_AVAILABLE = False


# Module-level logger shared by the helpers in this file. The logger name is a
# fixed string rather than ``__name__``.
logger = logging.getLogger("audible.crypto_provider.pycryptodome")


def _load_rsa_private_key_pycryptodome(pem_data: str) -> Any:
    """Parse a PEM-encoded RSA private key with pycryptodome.

    A debug message is logged on every call. Nothing is cached here: each
    call parses the PEM text again, so callers that need to reuse the key
    should hold on to the returned object themselves.

    Args:
        pem_data: RSA private key as PEM text.

    Returns:
        The object produced by ``RSA.import_key`` for the given key.
    """
    logger.debug("Loading RSA private key (pycryptodome provider)")
    # The PEM text is handed to pycryptodome as UTF-8 encoded bytes.
    pem_bytes = pem_data.encode("utf-8")
    return RSA.import_key(pem_bytes)


class PycryptodomeAESProvider:
    """AES-CBC encryption and decryption backed by pycryptodome.

    Both operations validate the IV and key lengths, build a fresh CBC cipher
    with ``AES.new`` and optionally apply or strip PKCS7 padding through
    pycryptodome's ``pad`` / ``unpad`` helpers.
    """

    @staticmethod
    def _check_key_and_iv(key: bytes, iv: bytes) -> None:
        """Reject IV and key lengths that AES-CBC cannot work with.

        The IV is checked before the key, so an invalid IV is reported first
        when both are wrong.

        Args:
            key: The AES key.
            iv: The initialization vector.

        Raises:
            ValueError: If the IV is not ``AES.block_size`` bytes long or the
                key is not 16, 24 or 32 bytes long.
        """
        if len(iv) != AES.block_size:
            raise ValueError(f"IV must be {AES.block_size} bytes, got {len(iv)}")
        if len(key) not in (16, 24, 32):
            raise ValueError(f"Key must be 16, 24, or 32 bytes, got {len(key)}")

    def encrypt(
        self, key: bytes, iv: bytes, data: str, padding: str = "default"
    ) -> bytes:
        """Encrypt text with AES-CBC.

        The text is encoded as UTF-8 before encryption.

        Args:
            key: The AES encryption key (16, 24 or 32 bytes).
            iv: The initialization vector (one AES block long).
            data: The plaintext to encrypt.
            padding: ``"default"`` applies PKCS7 padding, ``"none"`` encrypts
                the encoded bytes as they are.

        Returns:
            The ciphertext bytes.

        Raises:
            ValueError: If the key or IV length is invalid, or the padding
                mode is neither ``"default"`` nor ``"none"``.
        """
        self._check_key_and_iv(key, iv)

        cbc = AES.new(key, AES.MODE_CBC, iv)
        payload = data.encode("utf-8")

        if padding == "none":
            # Caller opted out of padding; the bytes go to the cipher unchanged.
            pass
        elif padding == "default":
            payload = pad(payload, AES.block_size, style="pkcs7")
        else:
            raise ValueError(f"Unknown padding mode: {padding}")

        return cbc.encrypt(payload)

    def decrypt(
        self, key: bytes, iv: bytes, encrypted_data: bytes, padding: str = "default"
    ) -> str:
        """Decrypt AES-CBC ciphertext and return it as text.

        Args:
            key: The AES decryption key (16, 24 or 32 bytes).
            iv: The initialization vector (one AES block long).
            encrypted_data: The ciphertext to decrypt.
            padding: ``"default"`` strips PKCS7 padding after decryption,
                ``"none"`` leaves the decrypted bytes untouched.

        Returns:
            The decrypted bytes decoded as UTF-8.

        Raises:
            ValueError: If the key or IV length is invalid, the padding mode
                is unknown, the PKCS7 padding cannot be removed, or the
                result is not valid UTF-8. The last two are re-raised with
                the underlying exception attached as the cause.
        """
        self._check_key_and_iv(key, iv)

        plain_bytes = AES.new(key, AES.MODE_CBC, iv).decrypt(encrypted_data)

        if padding == "none":
            # No padding to strip.
            pass
        elif padding == "default":
            try:
                plain_bytes = unpad(plain_bytes, AES.block_size, style="pkcs7")
            except ValueError as exc:
                raise ValueError(
                    "Invalid PKCS7 padding - possible decryption key/IV mismatch or "
                    "corrupted ciphertext"
                ) from exc
        else:
            raise ValueError(f"Unknown padding mode: {padding}")

        try:
            return plain_bytes.decode("utf-8")
        except UnicodeDecodeError as exc:
            raise ValueError(
                "Failed to decode decrypted data as UTF-8 - possible decryption "
                "key/IV mismatch or corrupted ciphertext"
            ) from exc
class PycryptodomePBKDF2Provider:
    """PBKDF2 key derivation backed by pycryptodome's ``PBKDF2`` function."""

    def derive_key(
        self,
        password: str,
        salt: bytes,
        iterations: int,
        key_size: int,
        hashmod: Callable[..., HashAlgorithm],
    ) -> bytes:
        """Derive a key from a password with PBKDF2.

        The hash is selected by calling ``hashmod()`` once and reading the
        ``name`` attribute of the object it returns; that name is then
        translated to the matching pycryptodome hash module.

        Args:
            password: The password to derive the key from.
            salt: Salt for the derivation.
            iterations: Requested iteration count. Values above 65535 are
                reduced to 65535.
            key_size: Length of the derived key in bytes.
            hashmod: Zero-argument callable returning a hash object that
                exposes a ``name`` attribute.

        Returns:
            The derived key.

        Raises:
            ValueError: If ``hashmod`` cannot be called without arguments,
                its result has no ``name`` attribute, or the name is not one
                of the supported algorithms.
        """
        try:
            algorithm_name = hashmod().name
        except (AttributeError, TypeError) as exc:
            raise ValueError(
                "Invalid hashmod: must be a callable returning a hash object with 'name' attribute"
            ) from exc

        # Algorithm name -> pycryptodome hash module.
        supported_hashes = {
            "sha256": SHA256,
            "sha1": SHA1,
            "sha512": SHA512,
            "sha224": SHA224,
            "sha384": SHA384,
            "md5": MD5,
        }
        digest_module = supported_hashes.get(algorithm_name)
        if digest_module is None:
            raise ValueError(f"Unsupported hash algorithm: {algorithm_name}")

        # Iteration count is capped at 65535, as stated in the docstring.
        capped_iterations = min(iterations, 65535)
        return PBKDF2(
            password,
            salt,
            key_size,
            count=capped_iterations,
            hmac_hash_module=digest_module,
        )
class PycryptodomeRSAProvider:
    """RSA key loading and PKCS#1 v1.5 signing backed by pycryptodome."""

    def load_private_key(self, pem_data: str) -> Any:
        """Parse a PEM-encoded RSA private key.

        This is a thin wrapper around the module-level loader. The key is
        parsed again on every call, so callers that want to reuse it should
        keep the returned object.

        Args:
            pem_data: RSA private key as PEM text.

        Returns:
            The parsed key object.
        """
        return _load_rsa_private_key_pycryptodome(pem_data)

    def sign(self, key: Any, data: bytes, algorithm: str = "SHA-256") -> bytes:
        """Sign data with an RSA key using PKCS#1 v1.5 over SHA-256.

        The key is validated before the algorithm, so an unusable key is
        reported even when the algorithm is also unsupported.

        Args:
            key: A parsed RSA key; it must expose ``n`` and ``e`` attributes.
            data: The bytes to sign.
            algorithm: Hash algorithm name. Only ``"SHA-256"`` is accepted.

        Returns:
            The signature bytes.

        Raises:
            TypeError: If ``key`` lacks the ``n`` or ``e`` attribute.
            ValueError: If ``algorithm`` is anything other than ``"SHA-256"``.
        """
        # Duck-typed check: an RSA key carries a modulus and public exponent.
        looks_like_rsa_key = hasattr(key, "n") and hasattr(key, "e")
        if not looks_like_rsa_key:
            raise TypeError(
                f"Expected RSA.RsaKey object with required attributes, "
                f"got {type(key).__name__}"
            )

        if algorithm != "SHA-256":
            raise ValueError(f"Unsupported algorithm: {algorithm}")

        digest = SHA256.new(data)
        signer = pkcs1_15.new(key)
        return signer.sign(digest)
class PycryptodomeHashProvider:
    """SHA-256 and SHA-1 digests computed with pycryptodome."""

    # Set on the instance's class after the first SHA-1 deprecation warning so
    # the warning is issued only once for that class.
    _sha1_warning_shown = False

    def sha256(self, data: bytes) -> bytes:
        """Return the SHA-256 digest of ``data``.

        Args:
            data: The bytes to hash.

        Returns:
            The raw digest bytes.
        """
        hasher = SHA256.new(data)
        return hasher.digest()

    def sha1(self, data: bytes) -> bytes:
        """Return the SHA-1 digest of ``data``.

        The first call for a given class emits a ``DeprecationWarning``
        attributed to the caller; later calls stay silent.

        Args:
            data: The bytes to hash.

        Returns:
            The raw digest bytes.

        Note:
            SHA-1 is cryptographically broken. Use only for legacy
            compatibility.
        """
        owner = self.__class__
        if not owner._sha1_warning_shown:
            warnings.warn(
                "SHA-1 is deprecated and should only be used for legacy compatibility",
                DeprecationWarning,
                stacklevel=2,
            )
            owner._sha1_warning_shown = True

        hasher = SHA1.new(data)
        return hasher.digest()
class PycryptodomeProvider:
    """Unified crypto provider built on pycryptodome.

    On construction it creates one AES, PBKDF2, RSA and hash provider and
    exposes each of them through a read-only property.

    Speed-ups over the pure Python implementations, as quoted by the project
    (not measured in this module):
        - 5-10x faster AES operations
        - 10-20x faster RSA operations
        - 3-5x faster PBKDF2 key derivation
        - 5-10x faster hashing operations

    Raises:
        ImportError: If pycryptodome is not installed.

    Example:
        >>> from audible.crypto_provider import get_crypto_providers, PycryptodomeProvider  # doctest: +SKIP
        >>> providers = get_crypto_providers(PycryptodomeProvider)  # doctest: +SKIP
        >>> providers.provider_name  # doctest: +SKIP
        'pycryptodome'
    """

    def __init__(self) -> None:
        # Fail early when the optional dependency could not be imported.
        if not PYCRYPTODOME_AVAILABLE:
            install_hint = (
                "pycryptodome is not installed. Install with: pip install "
                "audible[pycryptodome] (or audible[cryptography,pycryptodome] "
                "for full coverage)."
            )
            raise ImportError(install_hint)

        self._aes = PycryptodomeAESProvider()
        self._pbkdf2 = PycryptodomePBKDF2Provider()
        self._rsa = PycryptodomeRSAProvider()
        self._hash = PycryptodomeHashProvider()

    @property
    def aes(self) -> PycryptodomeAESProvider:
        """The AES provider created for this instance."""
        return self._aes

    @property
    def pbkdf2(self) -> PycryptodomePBKDF2Provider:
        """The PBKDF2 provider created for this instance."""
        return self._pbkdf2

    @property
    def rsa(self) -> PycryptodomeRSAProvider:
        """The RSA provider created for this instance."""
        return self._rsa

    @property
    def hash(self) -> PycryptodomeHashProvider:
        """The hash provider created for this instance."""
        return self._hash

    @property
    def provider_name(self) -> str:
        """The fixed identifier of this provider, ``"pycryptodome"``."""
        return "pycryptodome"
# Names exported by ``from ... import *``.
__all__ = [
    "PYCRYPTODOME_AVAILABLE",
    "PycryptodomeAESProvider",
    "PycryptodomeHashProvider",
    "PycryptodomePBKDF2Provider",
    "PycryptodomeProvider",
    "PycryptodomeRSAProvider",
]