import traceback
from inspect import signature
from asgiref.typing import ASGIReceiveCallable, ASGISendCallable, Scope
from hypercorn.utils import LifespanFailureError
from whistle import AsyncEventDispatcher, Event, IAsyncEventDispatcher
from harp import get_logger
from harp.http import AlreadyHandledHttpResponse, HttpRequest, HttpResponse
from ..utils.performances import performances_observer
from ..utils.types import typeof
from .bridge.requests import HttpRequestAsgiBridge
from .bridge.responses import HttpResponseAsgiBridge
from .events import (
EVENT_CORE_CONTROLLER,
EVENT_CORE_REQUEST,
EVENT_CORE_RESPONSE,
EVENT_CORE_STARTED,
EVENT_CORE_VIEW,
ControllerEvent,
RequestEvent,
ResponseEvent,
ViewEvent,
)
logger = get_logger(__name__)
[docs]
class ASGIKernel:
dispatcher: IAsyncEventDispatcher
[docs]
def __init__(self, *, dispatcher=None, resolver=None, debug=False, handle_errors=True):
from harp.controllers import DefaultControllerResolver
self.dispatcher = dispatcher or AsyncEventDispatcher()
# TODO IControllerResolver ? What contract do we expect ?
self.resolver = resolver or DefaultControllerResolver()
self.started = False
self.debug = debug
self.handle_errors = handle_errors
async def __call__(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
asgi_type = scope.get("type", None)
with performances_observer("kernel", labels={"type": asgi_type}):
if asgi_type == "http":
response = await self.handle_http(scope, receive, send)
if isinstance(response, AlreadyHandledHttpResponse):
return
return await HttpResponseAsgiBridge(response, send).send()
if asgi_type == "lifespan":
await receive()
# TODO: there are more than just the lifespan.startup event, maybe handle all messages possible or at
# least ignore the ones we're not interested in.
# See: https://asgi.readthedocs.io/en/latest/specs/lifespan.html
try:
await self.dispatcher.adispatch(EVENT_CORE_STARTED, Event())
except Exception as exc:
raise LifespanFailureError(EVENT_CORE_STARTED, repr(exc)) from exc
self.started = True
return
if asgi_type == "websocket":
# NOT IMPLEMENTED YET!
# This is ignored here to avoid huge errors in the console.
return
raise RuntimeError(f'Unable to handle request, invalid type "{asgi_type}".')
def _resolve_arguments(self, subject, **candidates):
"""
Dynamicaly resolve arguments by names in prototype, looking at a "candidates" dictionary that
contains the possible arguments used.
"""
while hasattr(subject, "_mock_wraps"):
subject = subject._mock_wraps
try:
return [
(candidates[name] if name in candidates or param.default is param.empty else param.default)
for name, param in signature(subject).parameters.items()
if param.kind is not param.KEYWORD_ONLY
], {}
except KeyError as exc:
raise TypeError(f"Unable to resolve arguments: {exc}")
async def _resolve_http_controller(self, request: HttpRequest):
"""
Resolves the controller for the given request, aka the callable that will handle the request. If we fail to do
so, we will have hard time building a response for the user, and thus we raise an error.
Resolving the controller involves two steps:
- first, we use `self.resolver` to find a controller for the request.
See `harp.controllers.ControllerResolver`
:param request:
:return:
"""
controller = await self.resolver.resolve(request)
if not controller:
raise RuntimeError("Unable to find request controller using resolver.")
event = ControllerEvent(request, controller)
await self.dispatcher.adispatch(EVENT_CORE_CONTROLLER, event)
if not event.controller:
raise RuntimeError("Unable to find request controller after controller event dispatch.")
return event.controller
async def _execute_http_controller(self, controller, request: HttpRequest, send: ASGISendCallable):
"""Executes the given controller and make all efforts to build a http response."""
args, kwargs = self._resolve_arguments(controller, request=request, asgi_send=send)
response = await controller(*args, **kwargs)
# if the controller returned anything that is not a response, maybe some view listener can create one from this
# value (for example for json data, etc.).
if not isinstance(response, HttpResponse):
event = ViewEvent(request, response)
await self.dispatcher.adispatch(EVENT_CORE_VIEW, event)
response = event.response or response
# if after the view event has been dispatched we still don't have a response, we have to raise an error.
if not isinstance(response, HttpResponse):
raise RuntimeError(
f"Response did not start despite the efforts made (controller return value is "
f"{typeof(response)} after controller view event was dispatched)."
)
# the core response event may want to filter the response, for example to add some headers, etc.
event = ResponseEvent(request, response)
await self.dispatcher.adispatch(EVENT_CORE_RESPONSE, event)
return event.response or response
[docs]
async def handle_http(self, scope: Scope, receive: ASGIReceiveCallable, send: ASGISendCallable):
request = HttpRequest(HttpRequestAsgiBridge(scope, receive))
if not self.started:
return HttpResponse(
"Unhandled server error: Cannot access service provider, the lifespan.startup asgi event "
"probably never went through.",
status=500,
content_type="text/plain",
)
try:
return await self.do_handle_http(request, send)
except Exception as exc:
if not self.handle_errors:
raise
# todo refactor this
logger.exception()
if self.debug:
return HttpResponse(
f"<h1>Internal Server Error</h1><h2>{type(exc).__name__}: {exc}</h2> "
f"<pre>{traceback.format_exc()}</pre>",
status=500,
content_type="text/html",
)
return HttpResponse("Internal Server Error", status=500, content_type="text/plain")
[docs]
async def do_handle_http(self, request: HttpRequest, send: ASGISendCallable):
event = RequestEvent(request)
await self.dispatcher.adispatch(EVENT_CORE_REQUEST, event)
if event.controller:
return await self._execute_http_controller(event.controller, request, send)
controller = await self._resolve_http_controller(request)
return await self._execute_http_controller(controller, request, send)