[docs]def__init__(self,impl:Optional[HttpRequestBridge]=None,*,extensions:Optional[dict]=None,**kwargs,):super().__init__(extensions=extensions)ifimplisNone:from.tests.stubsimportHttpRequestStubBridgeimpl=HttpRequestStubBridge(**kwargs)else:iflen(kwargs)>0:raiseValueError("Cannot pass both an implementation and non-explicit keyword arguments.")self._impl=impl# Initialize properties from the implementation bridgeself.method=self._impl.get_method()self.path=self._impl.get_path()self.query=self._impl.get_query()self.server_ipaddr=self._impl.get_server_ipaddr()self.server_port=self._impl.get_server_port()self._headers=self._impl.get_headers()self._stream=self._impl.get_stream()
@propertydefstream(self):returnself._stream@stream.setterdefstream(self,stream:AsyncByteStream):self._stream=streamifhasattr(self,"_body"):delattr(self,"_body")@propertydefheaders(self)->CIMultiDict:returnself._headers@headers.setterdefheaders(self,headers):self._headers=CIMultiDict(headers)@propertydefcookies(self)->dict:cookies={}forheaderinself.headers.getall("cookie",[]):cookies.update(parse_cookie(header))returncookies@propertydefbasic_auth(self)->tuple[str,str]|None:"""Parse and returns basic auth from headers."""forheaderinself.headers.getall("authorization",[]):try:_type,_auth=header.split(" ",1)exceptValueError:continueif_type.lower()=="basic":try:user,passwd=b64decode(_auth).decode("utf-8").split(":",1)except(binascii.Error,ValueError):continuereturnuser,passwd@propertydefbody(self)->bytes:"""Returns the previously read body of the request. Raises a RuntimeError if the body has not been read yet, you must await the `read()` asynchronous method first, which cannot be done here because properties are synchronous, so we let the user explicitely call it before."""ifnothasattr(self,"_body"):raiseRuntimeError("Request body has not been read yet, please await `read()` first.")returnself._body
[docs]asyncdefaread(self)->bytes:"""Read all chunks from request. We may want to be able to read partial body later, but for now it's all or nothing. This method does nothing if the body has already been read."""ifnothasattr(self,"_body"):self._body=b"".join([partasyncforpartinself._stream])# self.headers["content-length"] = str(len(self.body))ifnotisinstance(self._stream,ByteStream):self._stream=ByteStream(self._body)returnself.body