[docs]classHttpRequestAsgiBridge(HttpRequestBridge):"""Actually implements the getters required by HttpRequest using the asgi scope and receive callable. It is still an early implementation and will need to support streaming requests in the future. """
[docs]def__init__(self,scope:HTTPScope,receive:ASGIReceiveCallable):""" :param scope: see https://asgi.readthedocs.io/en/latest/specs/www.html#http-connection-scope :param receive: """self.asgi_scope=scopeself.asgi_receive=receiveself._closed=False
[docs]defget_server_ipaddr(self)->Optional[str]:"""Get the server IP address from asgi scope."""try:return(self.asgi_scope.get("server",_default_host)or_default_host)[0]exceptIndexError:returnNone
[docs]defget_server_port(self)->Optional[int]:"""Get the server port from asgi scope."""try:return(self.asgi_scope.get("server",_default_host)or_default_host)[1]exceptIndexError:returnNone
[docs]defget_method(self)->str:"""Get the HTTP method from asgi scope."""returnself.asgi_scope.get("method",None)
[docs]defget_path(self)->str:"""Get the HTTP path from asgi scope (/foo/bar), without the query string part."""if"raw_path"inself.asgi_scope:returnself.asgi_scope["raw_path"].decode("utf-8")returnself.asgi_scope.get("path","/")
[docs]defget_query(self)->MultiDict:"""Get the query string from asgi scope, as a multidict."""returnMultiDict(parse_qsl(self.asgi_scope.get("query_string",b"").decode("utf-8"),keep_blank_values=True,))
[docs]defget_headers(self)->CIMultiDict:"""Get the headers from asgi scope, as a case-insensitive multidict."""headers=CIMultiDict()forname,valueinself.asgi_scope.get("headers",[]):headers.add(name.decode("utf-8"),value.decode("utf-8"))returnheaders
[docs]defget_stream(self)->AsyncByteStream:"""Get the request body stream from asgi receive callable."""returnAsyncStreamFromAsgiReceive(self.asgi_receive)