Source code for audible.auth

from __future__ import annotations

import base64
import logging
from collections.abc import Callable, Generator, Iterator
from datetime import datetime, timedelta, timezone
from typing import (
    TYPE_CHECKING,
    Any,
    Literal,
    cast,
    overload,
)

import httpx
from httpx import Cookies

from .activation_bytes import get_activation_bytes as get_ab
from .aescipher import AESCipher, detect_file_encryption
from .crypto_provider import get_crypto_providers
from .exceptions import AuthFlowError, FileEncryptionError, NoRefreshToken
from .json_provider import get_json_provider
from .login import external_login, login
from .register import deregister as deregister_
from .register import register as register_
from .utils import test_convert


if TYPE_CHECKING:
    import pathlib

    from ._types import TrueFalseT
    from .crypto_provider.protocols import CryptoProvider
    from .localization import Locale


logger = logging.getLogger("audible.auth")


[docs] def refresh_access_token( refresh_token: str, domain: str, with_username: bool = False ) -> dict[str, Any]: """Refreshes an access token. Args: refresh_token: The refresh token obtained after a device registration. domain: The top level domain of the requested Amazon server (e.g. com). with_username: If ``True`` uses `audible` domain instead of `amazon`. Returns: A dict with the new access token and expiration timestamp. Note: The new access token is valid for 60 minutes. .. versionadded:: v0.8 The with_username argument """ body = { "app_name": "Audible", "app_version": "3.56.2", "source_token": refresh_token, "requested_token_type": "access_token", "source_token_type": "refresh_token", } target_domain = "audible" if with_username else "amazon" resp = httpx.post(f"https://api.{target_domain}.{domain}/auth/token", data=body) resp.raise_for_status() resp_dict = resp.json() expires_in_sec = int(resp_dict["expires_in"]) expires = ( datetime.now(timezone.utc) + timedelta(seconds=expires_in_sec) ).timestamp() return {"access_token": resp_dict["access_token"], "expires": expires}
[docs] def refresh_website_cookies( refresh_token: str, domain: str, cookies_domain: str, with_username: bool = False ) -> dict[str, str]: """Fetches website cookies for a specific domain. Args: refresh_token: The refresh token obtained after a device registration. domain: The top level domain of the requested Amazon server (e.g. com, de, fr). cookies_domain: The top level domain scope for the cookies (e.g. com, de, fr). with_username: If ``True`` uses `audible` domain instead of `amazon`. Returns: The requested cookies for the Amazon and Audible website for the given `cookies_domain` scope. .. versionadded:: v0.8 The with_username argument """ target_domain = "audible" if with_username else "amazon" url = f"https://www.{target_domain}.{domain}/ap/exchangetoken/cookies" body = { "app_name": "Audible", "app_version": "3.56.2", "source_token": refresh_token, "requested_token_type": "auth_cookies", "source_token_type": "refresh_token", "domain": f".{target_domain}.{cookies_domain}", } resp = httpx.post(url, data=body) resp.raise_for_status() resp_dict = resp.json() raw_cookies = resp_dict["response"]["tokens"]["cookies"] website_cookies = {} for domain_cookies in raw_cookies: for cookie in raw_cookies[domain_cookies]: website_cookies[cookie["Name"]] = cookie["Value"].replace(r'"', r"") return website_cookies
[docs] def user_profile(access_token: str, domain: str) -> dict[str, Any]: """Returns user profile from Amazon. Args: access_token: The valid access token for authentication. domain: The top level domain of the requested Amazon server (e.g. com, de, fr). Returns: The Amazon user profile for the authenticated user. Raises: Exception: If the user profile is malformed """ headers = {"Authorization": f"Bearer {access_token}"} resp = httpx.get(f"https://api.amazon.{domain}/user/profile", headers=headers) resp.raise_for_status() profile = resp.json() if not isinstance(profile, dict) and "user_id" not in profile: raise Exception("Malformed user profile response.") return profile
[docs] def user_profile_audible(access_token: str, domain: str) -> dict[str, Any]: """Returns user profile from Audible. Args: access_token: The valid access token for authentication. domain: The top level domain of the requested Audible server (e.g. com, de, fr). Returns: The Audible user profile for the authenticated user. Raises: Exception: If the user profile is malformed """ headers = {"Authorization": f"Bearer {access_token}"} resp = httpx.get(f"https://api.audible.{domain}/user/profile", headers=headers) resp.raise_for_status() profile = resp.json() if not isinstance(profile, dict) and "user_id" not in profile: raise Exception("Malformed user profile response.") return profile
[docs] def sign_request( method: str, path: str, body: bytes, adp_token: str, private_key: str, cached_key: Any = None, crypto_provider: CryptoProvider | type[CryptoProvider] | None = None, ) -> dict[str, str]: """Helper function who creates signed headers for http requests. Args: method: The http request method (GET, POST, DELETE, ...). path: The requested http url path and query. body: The http message body. adp_token: The adp token obtained after a device registration. private_key: The rsa key obtained after device registration. cached_key: Optional pre-loaded RSA key object for performance. If provided, private_key parsing is skipped. crypto_provider: Optional provider override (class or instance). Returns: A dict with the signed headers. """ date = datetime.now(timezone.utc).isoformat("T") + "Z" str_body = body.decode("utf-8") data = f"{method}\n{path}\n{date}\n{str_body}\n{adp_token}" providers = get_crypto_providers(crypto_provider) # Use cached key if provided, otherwise load from PEM if cached_key is None: key = providers.rsa.load_private_key(private_key) else: key = cached_key cipher = providers.rsa.sign(key, data.encode(), "SHA-256") signed_encoded = base64.b64encode(cipher) signature = f"{signed_encoded.decode()}:{date}" return { "x-adp-token": adp_token, "x-adp-alg": "SHA256withRSA:1.0", "x-adp-signature": signature, }
[docs] class Authenticator(httpx.Auth): """Audible Authenticator class. Note: A new class instance have to be instantiated with :meth:`Authenticator.from_login` or :meth:`Authenticator.from_file`. .. versionadded:: v0.8 The with_username attribute. Note: Auth data saved with v0.8 or later can not be loaded with versions less than v0.8! If an auth file for a pre-Amazon account (with_username=True) was created with v0.7.1 or v0.7.2 set `auth.with_username` to `True` and save the data again. After this deregistration, refreshing access tokens and requesting cookies for another domain will work for pre-Amazon accounts. Thread Safety: Authenticator instances are **not thread-safe** and should not be shared across threads. The internal state (access tokens, RSA key cache) can be modified during request signing and token refresh operations, leading to race conditions. For multi-threaded applications, create separate Authenticator instances per thread, or use appropriate locking mechanisms around Authenticator usage. Note: The ``device_private_key`` attribute is monitored for changes. When modified, the cached RSA key is automatically invalidated to ensure the correct key is used for signing. The new key will be loaded and cached on the next signing operation. """ access_token: str | None = None activation_bytes: str | None = None adp_token: str | None = None crypter: AESCipher | None = None customer_info: dict[str, Any] | None = None device_info: dict[str, Any] | None = None device_private_key: str | None = None encryption: str | bool | None = None expires: float | None = None filename: pathlib.Path | None = None locale: Locale | None = None refresh_token: str | None = None store_authentication_cookie: dict[str, Any] | None = None website_cookies: dict[str, Any] | None = None with_username: bool | None = False requires_request_body: bool = True _forbid_new_attrs: bool = True _apply_test_convert: bool = True _cached_rsa_key: Any = None _crypto_provider: CryptoProvider | None = None def __init__( self, *, crypto_provider: CryptoProvider | type[CryptoProvider] | None = None, ) -> None: super().__init__() if crypto_provider is not None: self._crypto_provider = get_crypto_providers(crypto_provider) def __setattr__(self, attr: str, value: Any) -> None: if self._forbid_new_attrs and not hasattr(self, attr): msg = f"{self.__class__.__name__} is frozen, can't add attribute: {attr}." logger.error(msg) raise AttributeError(msg) if self._apply_test_convert: value = test_convert(attr, value) # Invalidate RSA key cache if device_private_key changes if attr == "device_private_key" and hasattr(self, "_cached_rsa_key"): object.__setattr__(self, "_cached_rsa_key", None) logger.debug("Invalidated cached RSA key due to device_private_key change") object.__setattr__(self, attr, value) def __iter__(self) -> Iterator[str]: for i in self.__dict__: if self.__dict__[i] is not None and not i.startswith("_"): yield i def __len__(self) -> int: return len(list(iter(self))) def __repr__(self) -> str: return f"{type(self).__name__}({self.__dict__})" def _update_attrs(self, **kwargs: Any) -> None: for attr, value in kwargs.items(): setattr(self, attr, value) def _get_crypto(self) -> CryptoProvider: """Return the configured crypto provider for this authenticator.""" if self._crypto_provider is not None: return self._crypto_provider self._crypto_provider = get_crypto_providers() return self._crypto_provider
[docs] @classmethod def from_dict( cls, data: dict[str, Any], locale: str | Locale | None = None, *, crypto_provider: CryptoProvider | type[CryptoProvider] | None = None, ) -> Authenticator: """Instantiate an Authenticator from authentication file. .. versionadded:: v0.7.1 Args: data: A dictionary with the authentication data locale: The country code of the Audible marketplace to interact with. If ``None`` the country code from file is used. crypto_provider: Optional provider override (class or instance). Returns: A new Authenticator instance. """ auth = cls(crypto_provider=crypto_provider) locale_code: str | None = data.pop("locale_code", None) auth.locale = cast("Locale", locale or locale_code) if "login_cookies" in data: auth.website_cookies = data.pop("login_cookies") auth._update_attrs(**data) logger.info("load data from dictionary for locale %s", auth.locale.country_code) return auth
[docs] @classmethod def from_file( cls, filename: str | pathlib.Path, password: str | None = None, locale: str | Locale | None = None, encryption: bool | str | None = None, crypto_provider: CryptoProvider | type[CryptoProvider] | None = None, **kwargs: Any, ) -> Authenticator: """Instantiate an Authenticator from authentication file. .. versionadded:: v0.5.0 Args: filename: The name of the file with the authentication data. password: The password of the authentication file. locale: The country code of the Audible marketplace to interact with. If ``None`` the country code from file is used. encryption: The encryption style to use. Can be ``json`` or ``bytes``. If ``None``, encryption will be auto detected. crypto_provider: Optional provider override (class or instance). **kwargs: Keyword arguments are passed to the :class:`~audible.aescipher.AESCipher` class. See below. Keyword Arguments: key_size (int, Optional): salt_marker (Optional[bytes]): kdf_iterations (:obj:`int`, optional): hashmod: mac: Returns: A new Authenticator instance. Raises: FileEncryptionError: If file ist encrypted without providing a password """ auth = cls(crypto_provider=crypto_provider) auth.filename = cast("pathlib.Path", filename) auth.encryption = encryption or detect_file_encryption(auth.filename) if isinstance(auth.encryption, str): if password is None: message = "File is encrypted but no password provided." logger.critical(message) raise FileEncryptionError(message) cipher_kwargs = dict(kwargs) if "crypto_provider" not in cipher_kwargs: cipher_kwargs["crypto_provider"] = auth._get_crypto() auth.crypter = AESCipher(password, **cipher_kwargs) file_data = auth.crypter.from_file(auth.filename, auth.encryption) else: file_data = auth.filename.read_text() json_data = get_json_provider().loads(file_data) locale_code = json_data.pop("locale_code", None) locale = locale or locale_code auth.locale = cast("Locale", locale) # login cookies where renamed to website cookies # old names must be adjusted if "login_cookies" in json_data: auth.website_cookies = json_data.pop("login_cookies") auth._update_attrs(**json_data) logger.info( "load data from file %s for locale %s", auth.filename, auth.locale.country_code, ) return auth
[docs] @classmethod def from_login( cls, username: str, password: str, locale: str | Locale, serial: str | None = None, with_username: bool = False, captcha_callback: Callable[[str], str] | None = None, otp_callback: Callable[[], str] | None = None, cvf_callback: Callable[[], str] | None = None, approval_callback: Callable[[], Any] | None = None, crypto_provider: CryptoProvider | type[CryptoProvider] | None = None, ) -> Authenticator: """Instantiate a new Authenticator with authentication data from login. .. versionadded:: v0.5.0 .. versionadded:: v0.5.4 The serial argument The with_username argument Args: username: The Amazon email address. password: The Amazon password. locale: The ``country_code`` or :class:`audible.localization.Locale` instance for the marketplace to login. serial: The device serial. If ``None`` a custom one will be created. with_username: If ``True`` login with Audible username instead of Amazon account. captcha_callback: A custom callback to handle captcha requests during login. otp_callback: A custom callback to handle one-time password requests during login. cvf_callback: A custom callback to handle verify code requests during login. approval_callback: A custom Callable for handling approval alerts. crypto_provider: Optional provider override (class or instance). Returns: Authenticator: New authenticator populated with registration data. """ auth = cls(crypto_provider=crypto_provider) auth.locale = cast("Locale", locale) login_device = login( username=username, password=password, country_code=auth.locale.country_code, domain=auth.locale.domain, market_place_id=auth.locale.market_place_id, serial=serial, with_username=with_username, captcha_callback=captcha_callback, otp_callback=otp_callback, cvf_callback=cvf_callback, approval_callback=approval_callback, ) logger.info("logged in to Audible as %s", username) register_device = register_(with_username=with_username, **login_device) auth._update_attrs(with_username=with_username, **register_device) logger.info("registered Audible device") return auth
[docs] @classmethod def from_login_external( cls, locale: str | Locale, serial: str | None = None, with_username: bool = False, login_url_callback: Callable[[str], str] | None = None, crypto_provider: CryptoProvider | type[CryptoProvider] | None = None, ) -> Authenticator: """Instantiate a new Authenticator from login with external browser. .. versionadded:: v0.5.1 .. versionadded:: v0.5.4 The serial argument The with_username argument Args: locale: The ``country_code`` or :class:`audible.localization.Locale` instance for the marketplace to login. serial: The device serial. If ``None`` a custom one will be created. with_username: If ``True`` login with Audible username instead of Amazon account. login_url_callback: A custom Callable for handling login with external browsers. crypto_provider: Optional provider override (class or instance). Returns: Authenticator: New authenticator populated with registration data. """ auth = cls(crypto_provider=crypto_provider) auth.locale = cast("Locale", locale) login_device = external_login( country_code=auth.locale.country_code, domain=auth.locale.domain, market_place_id=auth.locale.market_place_id, serial=serial, with_username=with_username, login_url_callback=login_url_callback, ) logger.info("logged in to Audible.") register_device = register_(with_username=with_username, **login_device) auth._update_attrs(with_username=with_username, **register_device) logger.info("registered Audible device") return auth
[docs] def auth_flow( self, request: httpx.Request ) -> Generator[httpx.Request, httpx.Response, None]: """Auth flow to be executed on every request by :mod:`httpx`. Args: request: The request made by ``httpx``. Yields: httpx.Request: The authenticated request with signed headers. Raises: AuthFlowError: If no auth flow is available. """ available_modes = self.available_auth_modes if "signing" in available_modes: self._apply_signing_auth_flow(request) elif "bearer" in available_modes: self._apply_bearer_auth_flow(request) else: message = "signing or bearer auth flow are not available." logger.critical(message) raise AuthFlowError(message) yield request
def _apply_signing_auth_flow(self, request: httpx.Request) -> None: if self.adp_token is None or self.device_private_key is None: raise Exception("No signing data found.") providers = self._get_crypto() # Load and cache RSA key on first use for performance if self._cached_rsa_key is None: self._cached_rsa_key = providers.rsa.load_private_key( self.device_private_key ) logger.debug("Loaded and cached RSA private key") headers = sign_request( method=request.method, path=request.url.raw_path.decode(), body=request.content, adp_token=self.adp_token, private_key=self.device_private_key, cached_key=self._cached_rsa_key, crypto_provider=providers, ) request.headers.update(headers) logger.info("signing auth flow applied to request") def _apply_bearer_auth_flow(self, request: httpx.Request) -> None: if self.access_token_expired: self.refresh_access_token() if self.access_token is None: raise Exception("No access token found.") headers = {"Authorization": "Bearer " + self.access_token, "client-id": "0"} request.headers.update(headers) logger.info("bearer auth flow applied to request") def _apply_cookies_auth_flow(self, request: httpx.Request) -> None: if self.website_cookies is None: raise Exception("No website cookies found.") cookies = self.website_cookies.copy() Cookies(cookies).set_cookie_header(request) logger.info("cookies auth flow applied to request")
[docs] def sign_request(self, request: httpx.Request) -> None: """Sign a request. .. deprecated:: 0.5.0 Use :meth:`self._apply_signing_auth_flow` instead. """ self._apply_signing_auth_flow(request)
@property def available_auth_modes(self) -> list[str]: available_modes = [] if self.adp_token and self.device_private_key: available_modes.append("signing") if self.access_token: if self.access_token_expired and not self.refresh_token: pass else: available_modes.append("bearer") if self.website_cookies: available_modes.append("cookies") return available_modes
[docs] def to_dict(self) -> dict[str, Any]: """Returns authentication data as dict. .. versionadded:: 0.7.1 .. versionadded:: v0.8 The returned dict now contains the `with_username` attribute """ data = { "website_cookies": self.website_cookies, "adp_token": self.adp_token, "access_token": self.access_token, "refresh_token": self.refresh_token, "device_private_key": self.device_private_key, "store_authentication_cookie": self.store_authentication_cookie, "device_info": self.device_info, "customer_info": self.customer_info, "expires": self.expires, "locale_code": self.locale.country_code if self.locale else None, "with_username": self.with_username, "activation_bytes": self.activation_bytes, } return data
[docs] def to_file( self, filename: pathlib.Path | str | None = None, password: str | None = None, encryption: bool | str = "default", indent: int = 4, set_default: bool = True, **kwargs: Any, ) -> None: """Save authentication data to file. .. versionadded:: 0.5.1 Save activation bytes to auth file .. versionadded:: v0.8 The saved file now contains the `with_username` attribute """ if not (filename or self.filename): raise ValueError("No filename provided") if filename: target_file: pathlib.Path = test_convert("filename", filename) elif self.filename: target_file = self.filename else: raise ValueError("No filename provided") if encryption != "default": encryption = test_convert("encryption", encryption) else: encryption = self.encryption or False data = self.to_dict() json_data = get_json_provider().dumps(data, indent=indent) if encryption is False: target_file.write_text(json_data) crypter = None else: if password: cipher_kwargs = dict(kwargs) if "crypto_provider" not in cipher_kwargs: cipher_kwargs["crypto_provider"] = self._get_crypto() crypter = test_convert("crypter", AESCipher(password, **cipher_kwargs)) elif self.crypter: crypter = self.crypter else: raise ValueError("No password provided") crypter.to_file( json_data, filename=target_file, encryption=encryption, # type: ignore[arg-type] indent=indent, ) logger.info("saved data to file %s", target_file) if set_default: self.filename = target_file self.encryption = encryption self.crypter = crypter logger.info("set filename %s as default", target_file)
[docs] def deregister_device(self, deregister_all: bool = False) -> Any: self.refresh_access_token() if self.access_token is None: raise Exception("No access token found.") if self.locale is None: raise Exception("No locale found.") return deregister_( access_token=self.access_token, deregister_all=deregister_all, domain=self.locale.domain, with_username=self.with_username or False, )
[docs] def refresh_access_token(self, force: bool = False) -> None: if force or self.access_token_expired: if self.refresh_token is None: message = "No refresh token found. Can't refresh access token." logger.critical(message) raise NoRefreshToken(message) if self.locale is None: raise Exception("No locale found.") refresh_data = refresh_access_token( refresh_token=self.refresh_token, domain=self.locale.domain, with_username=self.with_username or False, ) self._update_attrs(**refresh_data) else: logger.info( "Access Token not expired. No refresh necessary. " "To force refresh please use force=True" )
[docs] def set_website_cookies_for_country(self, country_code: str) -> None: cookies_domain = test_convert("locale", country_code).domain if self.refresh_token is None: raise Exception("No refresh token found.") if self.locale is None: raise Exception("No locale found.") self.website_cookies = refresh_website_cookies( self.refresh_token, self.locale.domain, cookies_domain, self.with_username or False, )
@overload def get_activation_bytes( self, filename: pathlib.Path | str | None = ..., extract: Literal[True] = ..., force_refresh: bool = ..., ) -> str: ... @overload def get_activation_bytes( self, filename: pathlib.Path | str | None = ..., *, extract: Literal[False], force_refresh: bool = ..., ) -> bytes: ...
[docs] def get_activation_bytes( self, filename: pathlib.Path | str | None = None, extract: TrueFalseT = True, force_refresh: bool = False, ) -> str | bytes: """Get Activation bytes from Audible. Args: filename: [Optional] filename to save the activation blob extract: [Optional] if True, returns the extracted activation bytes otherwise the whole activation blob force_refresh: [Optional] if True, existing activation bytes in auth file will be ignored and new activation bytes will be requested from server. Returns: The activation bytes .. versionadded:: 0.5.1 The ``force_refresh`` argument. Fetched activation bytes are now stored to `Authententicator.activation_bytes`. """ if not force_refresh and extract and self.activation_bytes is not None: logger.debug("Activation bytes already fetched. Returned saved one.") return self.activation_bytes logger.debug("Fetch activation blob from server now.") ab = get_ab(self, filename, extract) # type: ignore[arg-type] if extract: logger.debug( "Extract activation bytes from blob and store value" "activation_bytes attribute." ) logger.debug("Found activation bytes: %s", ab) self.activation_bytes = ab return ab
[docs] def user_profile(self) -> dict[str, Any]: if self.access_token is None: raise Exception("No access token found.") if self.locale is None: raise Exception("No locale found.") return user_profile(access_token=self.access_token, domain=self.locale.domain)
@property def access_token_expires(self) -> timedelta: if self.expires is None: raise Exception("No expires timestamp found.") return datetime.fromtimestamp(self.expires, timezone.utc) - datetime.now( timezone.utc ) @property def access_token_expired(self) -> bool: if self.expires is None: raise Exception("No expires timestamp found.") return datetime.fromtimestamp(self.expires, timezone.utc) <= datetime.now( timezone.utc )