Source code for harp_apps.http_client.transport
from types import TracebackType
from typing import Optional, Self, Type
from httpx import AsyncBaseTransport, Request, Response
from whistle import IAsyncEventDispatcher
from harp import get_logger
from harp_apps.http_client.events import (
EVENT_FILTER_HTTP_CLIENT_REQUEST,
EVENT_FILTER_HTTP_CLIENT_RESPONSE,
HttpClientFilterEvent,
)
logger = get_logger(__name__)
[docs]
class AsyncFilterableTransport(AsyncBaseTransport):
[docs]
def __init__(self, transport: AsyncBaseTransport, dispatcher: IAsyncEventDispatcher):
self._transport = transport
self._dispatcher = dispatcher
[docs]
async def handle_async_request(self, request: Request) -> Response:
event = HttpClientFilterEvent(request)
await self._dispatcher.adispatch(EVENT_FILTER_HTTP_CLIENT_REQUEST, event)
logger.debug(f"▶▶▶ {event.request}", headers=event.request.headers)
if not event.response:
event.response = await self._transport.handle_async_request(event.request)
await self._dispatcher.adispatch(EVENT_FILTER_HTTP_CLIENT_RESPONSE, event)
logger.debug(
f"◀◀◀ {event.response}",
cache_control=event.response.headers.get("cache-control"),
)
return event.response
[docs]
async def aclose(self) -> None:
await self._transport.aclose()
async def __aenter__(self) -> Self:
return self
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]] = None,
exc_value: Optional[BaseException] = None,
traceback: Optional[TracebackType] = None,
) -> None:
await self.aclose()