import inspect
import logging
from abc import ABCMeta, abstractmethod
from collections.abc import Callable, Coroutine
from contextlib import AbstractAsyncContextManager, AbstractContextManager
from types import TracebackType
from typing import (
Any,
Generic,
Literal,
TypeVar,
overload,
)
import httpx
from httpx import URL
from httpx._models import HeaderTypes # type: ignore[attr-defined]
from ._types import TrueFalseT
from .auth import Authenticator
from .exceptions import (
BadRequest,
NetworkError,
NotFoundError,
NotResponding,
RatelimitError,
RequestError,
ServerError,
Unauthorized,
UnexpectedError,
)
from .json_provider import JSONDecodeError, get_json_provider
from .localization import LOCALE_TEMPLATES, Locale
# Logger shared by every client defined in this module.
logger = logging.getLogger("audible.client")

# Constrained type variable: a client wraps either an async or a sync
# httpx session.
ClientT = TypeVar("ClientT", httpx.AsyncClient, httpx.Client)

# Parameter names accepted by ``httpx.Client.request``. Request helpers use
# this set to tell transport options apart from query parameters.
httpx_client_request_args = frozenset(
    inspect.signature(httpx.Client.request).parameters
)
[docs]
def default_response_callback(resp: httpx.Response) -> Any:
    """Validate the response status, then return its converted content.

    Args:
        resp: The response to process.

    Returns:
        Whatever :func:`convert_response_content` returns for ``resp``.

    Raises:
        Any exception raised by :func:`raise_for_status` for ``resp``.
    """
    raise_for_status(resp)
    content = convert_response_content(resp)
    return content
[docs]
def raise_for_status(resp: httpx.Response) -> None:
    """Translate an httpx status error into one of this package's exceptions.

    Args:
        resp: The response whose status should be checked.

    Raises:
        BadRequest: On status 400.
        Unauthorized: On status 401 or 403.
        NotFoundError: On status 404.
        RatelimitError: On status 429.
        ServerError: On status 503.
        UnexpectedError: On any other status httpx reports as an error.
    """
    try:
        resp.raise_for_status()
    except httpx.HTTPStatusError as status_error:
        status = resp.status_code
        payload = convert_response_content(resp)
        known_errors = {
            400: BadRequest,
            401: Unauthorized,  # invalid credentials
            403: Unauthorized,
            404: NotFoundError,
            429: RatelimitError,
            503: ServerError,  # maintenance
        }
        error_cls = known_errors.get(status)
        if error_cls is None:
            # Unknown codes keep the httpx error as their cause.
            raise UnexpectedError(resp, payload) from status_error
        raise error_cls(resp, payload) from None
[docs]
def convert_response_content(resp: httpx.Response) -> Any:
    """Return the response body decoded as JSON, or as raw text on failure.

    Args:
        resp: The response whose text should be decoded.

    Returns:
        The object produced by the configured JSON provider, or
        ``resp.text`` unchanged when the text is not valid JSON.
    """
    try:
        decoded = get_json_provider().loads(resp.text)
    except JSONDecodeError as exc:
        # Not JSON: log the failure at debug level and hand back the text.
        logger.debug("JSON parsing failed: %s", exc, exc_info=True)
        return resp.text
    else:
        return decoded
[docs]
class BaseClient(Generic[ClientT], metaclass=ABCMeta):
    """Shared base of the sync and async API clients.

    Holds the marketplace API url, the underlying httpx session and the
    default response callback. Subclasses provide the concrete session
    (:meth:`_get_session`) and the request methods.
    """

    _API_URL_TEMP = "https://api.audible."
    _API_VERSION = "1.0"
    _REQUEST_LOG = "{method} {url} has received {text}, has returned {status}"

    def __init__(
        self,
        auth: Authenticator,
        country_code: str | None = None,
        headers: HeaderTypes | None = None,
        timeout: int = 10,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **session_kwargs: Any,
    ):
        """Create the session and select the marketplace.

        Args:
            auth: Authenticator installed as the session's ``auth``.
            country_code: Marketplace to use. If falsy, ``auth.locale``
                is used instead.
            headers: Extra headers merged over the default JSON headers.
            timeout: Timeout passed to the session.
            response_callback: Callable applied to every response by the
                request methods. Defaults to
                :func:`default_response_callback`.
            **session_kwargs: Further keyword arguments forwarded to
                :meth:`_get_session`.

        Raises:
            Exception: If the resolved locale is not a ``Locale`` instance.
        """
        locale = Locale(country_code.lower()) if country_code else auth.locale
        if not isinstance(locale, Locale):
            raise Exception("Authenticator has no `Locale` class set.")
        self._api_url = httpx.URL(self._API_URL_TEMP + locale.domain)

        default_headers = httpx.Headers(
            {
                "Accept": "application/json",
                "Accept-Charset": "utf-8",
                "Content-Type": "application/json",
            }
        )
        if headers is not None:
            default_headers.update(headers)

        self.session: ClientT = self._get_session(
            headers=default_headers, timeout=timeout, auth=auth, **session_kwargs
        )

        if response_callback is None:
            response_callback = default_response_callback
        self._response_callback = response_callback

    @abstractmethod
    def _get_session(self, *args: Any, **kwargs: Any) -> ClientT:
        """Build the underlying httpx session."""
        ...

    @abstractmethod
    def _request(
        self,
        method: str,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a request to an API path and process the response."""
        ...

    def switch_marketplace(self, country_code: str) -> None:
        """Point the client at the API of another marketplace.

        Args:
            country_code: Country code of the marketplace (lower-cased
                before the locale lookup).
        """
        locale = Locale(country_code.lower())
        self._api_url = httpx.URL(self._API_URL_TEMP + locale.domain)

    @property
    def marketplace(self) -> str:
        """Country code of the current marketplace, or ``"unknown"``."""
        # The domain is whatever follows the fixed url prefix.
        domain = str(self._api_url)[len(self._API_URL_TEMP):]
        for country in LOCALE_TEMPLATES.values():
            if domain == country["domain"]:
                return country["country_code"]
        return "unknown"

    @property
    def auth(self) -> Authenticator:
        """The session's authenticator.

        Raises:
            Exception: If ``session.auth`` is not an ``Authenticator``.
        """
        if not isinstance(self.session.auth, Authenticator):
            auth_type = type(self.session.auth)
            raise Exception(
                f"`session.auth` has type {auth_type}, expected `Authenticator`."
            )
        return self.session.auth

    def switch_user(
        self, auth: Authenticator, switch_to_default_marketplace: bool = False
    ) -> None:
        """Replace the session's authenticator.

        Args:
            auth: The authenticator to install on the session.
            switch_to_default_marketplace: If ``True``, first switch the
                marketplace to ``auth.locale.country_code``.

        Raises:
            Exception: If a marketplace switch is requested but
                ``auth.locale`` is not a ``Locale`` instance.
        """
        if switch_to_default_marketplace:
            if not isinstance(auth.locale, Locale):
                raise Exception("Authenticator has no `Locale` class set.")
            self.switch_marketplace(auth.locale.country_code)
        self.session.auth = auth

    def get_user_profile(self) -> dict[str, Any]:
        """Refresh the access token and return the authenticator's user profile."""
        self.auth.refresh_access_token()
        return self.auth.user_profile()

    @property
    def user_name(self) -> str:
        """The ``name`` entry of the user profile.

        Raises:
            Exception: If the profile has no ``name`` key or the value is
                not a string.
        """
        user_profile = self.get_user_profile()
        if "name" not in user_profile:
            raise Exception("user profile has no key `name`.")
        user_name = user_profile["name"]
        if not isinstance(user_name, str):
            user_name_type = type(user_name)
            raise Exception(f"username has type {user_name_type}, expected `str`.")
        return user_name

    def _prepare_api_path(self, path: str) -> httpx.URL:
        """Turn an API path into a full request url.

        Absolute urls are returned as-is. Relative paths are prefixed with
        the API version unless they already start with it (or with
        ``0.0``) and are then placed on the current marketplace url.
        """
        url = httpx.URL(path)
        if url.is_absolute_url:
            return url

        path = path.removeprefix("/")
        if not path.startswith((self._API_VERSION, "0.0")):
            path = f"{self._API_VERSION}/{path}"
        return self._api_url.copy_with(raw_path=f"/{path}".encode())

    @overload
    def raw_request(
        self: "BaseClient[httpx.Client]",
        method: str,
        url: str,
        *,
        stream: Literal[False] = ...,
        apply_auth_flow: bool = ...,
        apply_cookies: bool = ...,
        **kwargs: Any,
    ) -> httpx.Response: ...

    @overload
    def raw_request(
        self: "BaseClient[httpx.Client]",
        method: str,
        url: str,
        *,
        stream: Literal[True],
        apply_auth_flow: bool = ...,
        apply_cookies: bool = ...,
        **kwargs: Any,
    ) -> AbstractContextManager[httpx.Response]: ...

    @overload
    def raw_request(
        self: "BaseClient[httpx.AsyncClient]",
        method: str,
        url: str,
        *,
        stream: Literal[False] = ...,
        apply_auth_flow: bool = ...,
        apply_cookies: bool = ...,
        **kwargs: Any,
    ) -> Coroutine[Any, Any, httpx.Response]: ...

    @overload
    def raw_request(
        self: "BaseClient[httpx.AsyncClient]",
        method: str,
        url: str,
        *,
        stream: Literal[True],
        apply_auth_flow: bool = ...,
        apply_cookies: bool = ...,
        **kwargs: Any,
    ) -> AbstractAsyncContextManager[httpx.Response]: ...

    def raw_request(
        self,
        method: str,
        url: str,
        *,
        stream: TrueFalseT = False,
        apply_auth_flow: bool = False,
        apply_cookies: bool = False,
        **kwargs: Any,
    ) -> (
        httpx.Response
        | Coroutine[Any, Any, httpx.Response]
        | AbstractContextManager[httpx.Response]
        | AbstractAsyncContextManager[httpx.Response]
    ):
        """Sends a raw request with the underlying httpx Client.

        This method ignores the configured api_url and allows sending
        requests to custom hosts. The raw httpx response will be returned.

        Args:
            method: The http request method.
            url: The url to make requests to.
            stream: If `True`, streams the response
            apply_auth_flow: If `True`, the :meth:`Authenticator.auth_flow`
                will be applied to the request.
            apply_cookies: If `True`, website cookies from
                :attr:`Authenticator.website_cookies` will be added to
                request headers.
            **kwargs: keyword args supported by :class:`httpx.AsyncClient.stream`,
                :class:`httpx.Client.stream`, :class:`httpx.AsyncClient.request`,
                :class:`httpx.Client.request`.

        Returns:
            An unprepared httpx Response object.

        .. versionadded:: v0.5.1
        """
        request_params: dict[str, Any] = {"method": method, "url": url, **kwargs}

        if apply_cookies:
            # Start from the website cookies; caller-supplied cookies win.
            cookies = httpx.Cookies(self.auth.website_cookies)
            cookies.update(kwargs.get("cookies", {}))
            request_params["cookies"] = cookies

        if apply_auth_flow:
            request_params["auth"] = self.auth

        if stream:
            return self.session.stream(**request_params)
        return self.session.request(**request_params)

    @staticmethod
    def _prepare_params(kwargs: dict[str, Any]) -> None:
        """Move non-httpx keyword arguments into ``kwargs["params"]``.

        Every key of ``kwargs`` that is not a parameter of
        ``httpx.Client.request`` is removed and stored as a query
        parameter instead. ``kwargs`` is modified in place.

        A ``params`` dict supplied by the caller is copied before extra
        keys are added, so the caller's own dict is never mutated. An
        explicit ``params=None`` combined with extra keys is treated as an
        empty dict.
        """
        params = kwargs.pop("params", {})
        extra_keys = [key for key in kwargs if key not in httpx_client_request_args]
        if extra_keys:
            if params is None:
                params = {}
            elif isinstance(params, dict):
                params = dict(params)
            for key in extra_keys:
                params[key] = kwargs.pop(key)
        kwargs["params"] = params

    @abstractmethod
    def get(
        self,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a GET request to an API path."""
        ...

    @abstractmethod
    def post(
        self,
        path: str,
        body: Any,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a POST request with ``body`` to an API path."""
        ...

    @abstractmethod
    def delete(
        self,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a DELETE request to an API path."""
        ...

    @abstractmethod
    def put(
        self,
        path: str,
        body: Any,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a PUT request with ``body`` to an API path."""
        ...
[docs]
class Client(BaseClient[httpx.Client]):
    """Synchronous API client backed by :class:`httpx.Client`.

    Usable as a context manager; leaving the ``with`` block closes the
    session.
    """

    def _get_session(self, *args: Any, **kwargs: Any) -> httpx.Client:
        """Create the synchronous httpx session."""
        return httpx.Client(*args, **kwargs)

    def __enter__(self) -> "Client":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        self.close()

    def __repr__(self) -> str:
        return f"<Sync Client for *{self.marketplace}* marketplace>"

    def close(self) -> None:
        """Close the underlying session."""
        self.session.close()

    def _request(
        self,
        method: str,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a request and return the response callback's result.

        Args:
            method: The http request method.
            path: API path or absolute url, resolved by
                ``_prepare_api_path``.
            response_callback: Callable applied to the response. Falls
                back to the client's default callback when ``None``.
            **kwargs: Keyword arguments forwarded to the session's
                ``request`` method.

        Returns:
            Whatever the response callback returns.

        Raises:
            NotResponding: On an httpx connect, read, write or pool timeout.
            NetworkError: On an ``httpx.NetworkError``.
            RequestError: On any other ``httpx.RequestError``.
        """
        url = self._prepare_api_path(path)
        if response_callback is None:
            response_callback = self._response_callback

        # Stays ``None`` when the request itself fails, so the cleanup
        # below knows there is nothing to close.
        resp: httpx.Response | None = None
        try:
            resp = self.session.request(method, url, **kwargs)

            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    self._REQUEST_LOG.format(
                        method=method,
                        url=resp.url,
                        text=resp.text,
                        status=resp.status_code,
                    )
                )

            return response_callback(resp)
        except (
            httpx.ConnectTimeout,
            httpx.ReadTimeout,
            httpx.WriteTimeout,
            httpx.PoolTimeout,
        ):
            raise NotResponding from None
        except httpx.NetworkError:
            raise NetworkError from None
        except httpx.RequestError as exc:
            raise RequestError(exc) from None
        finally:
            if resp is not None:
                resp.close()

    def get(
        self,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a GET request to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return self._request(
            method="GET", path=path, response_callback=response_callback, **kwargs
        )

    def post(
        self,
        path: str,
        body: Any,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a POST request with ``body`` as JSON to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return self._request(
            method="POST",
            path=path,
            response_callback=response_callback,
            json=body,
            **kwargs,
        )

    def delete(
        self,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a DELETE request to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return self._request(
            method="DELETE", path=path, response_callback=response_callback, **kwargs
        )

    def put(
        self,
        path: str,
        body: Any,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a PUT request with ``body`` as JSON to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return self._request(
            method="PUT",
            path=path,
            response_callback=response_callback,
            json=body,
            **kwargs,
        )
[docs]
class AsyncClient(BaseClient[httpx.AsyncClient]):
    """Asynchronous API client backed by :class:`httpx.AsyncClient`.

    Usable as an async context manager; leaving the ``async with`` block
    closes the session.
    """

    def _get_session(self, *args: Any, **kwargs: Any) -> httpx.AsyncClient:
        """Create the asynchronous httpx session."""
        return httpx.AsyncClient(*args, **kwargs)

    async def __aenter__(self) -> "AsyncClient":
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None = None,
        exc_value: BaseException | None = None,
        traceback: TracebackType | None = None,
    ) -> None:
        await self.close()

    def __repr__(self) -> str:
        return f"<AsyncClient for *{self.marketplace}* marketplace>"

    async def close(self) -> None:
        """Close the underlying session."""
        await self.session.aclose()

    async def _request(
        self,
        method: str,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a request and return the response callback's result.

        Args:
            method: The http request method.
            path: API path or absolute url, resolved by
                ``_prepare_api_path``.
            response_callback: Callable applied to the response. It is
                called synchronously. Falls back to the client's default
                callback when ``None``.
            **kwargs: Keyword arguments forwarded to the session's
                ``request`` method.

        Returns:
            Whatever the response callback returns.

        Raises:
            NotResponding: On an httpx connect, read, write or pool timeout.
            NetworkError: On an ``httpx.NetworkError``.
            RequestError: On any other ``httpx.RequestError``.
        """
        url = self._prepare_api_path(path)
        if response_callback is None:
            response_callback = self._response_callback

        # Stays ``None`` when the request itself fails, so the cleanup
        # below knows there is nothing to close.
        resp: httpx.Response | None = None
        try:
            resp = await self.session.request(method, url, **kwargs)

            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    self._REQUEST_LOG.format(
                        method=method,
                        url=resp.url,
                        text=resp.text,
                        status=resp.status_code,
                    )
                )

            return response_callback(resp)
        except (
            httpx.ConnectTimeout,
            httpx.ReadTimeout,
            httpx.WriteTimeout,
            httpx.PoolTimeout,
        ):
            raise NotResponding from None
        except httpx.NetworkError:
            raise NetworkError from None
        except httpx.RequestError as exc:
            raise RequestError(exc) from None
        finally:
            if resp is not None:
                await resp.aclose()

    async def get(
        self,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a GET request to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return await self._request(
            method="GET", path=path, response_callback=response_callback, **kwargs
        )

    async def post(
        self,
        path: str,
        body: Any,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a POST request with ``body`` as JSON to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return await self._request(
            method="POST",
            path=path,
            response_callback=response_callback,
            json=body,
            **kwargs,
        )

    async def delete(
        self,
        path: str,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a DELETE request to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return await self._request(
            method="DELETE", path=path, response_callback=response_callback, **kwargs
        )

    async def put(
        self,
        path: str,
        body: Any,
        response_callback: Callable[[httpx.Response], Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        """Send a PUT request with ``body`` as JSON to an API path.

        Keyword arguments that ``httpx.Client.request`` does not accept
        are sent as query parameters.
        """
        self._prepare_params(kwargs)
        return await self._request(
            method="PUT",
            path=path,
            response_callback=response_callback,
            json=body,
            **kwargs,
        )