fromtypingimportCallable,Optional,SelffromwhistleimportEventfromharpimportget_loggerfromharp.httpimportBaseHttpMessage,HttpError,HttpResponsefromharp.http.requestsimportHttpRequestfromharp.modelsimportTransactionfromharp.views.jsonimportserializelogger=get_logger(__name__)#: Event fired when a transaction is created.EVENT_TRANSACTION_STARTED="proxy.transaction.started"#: Event fired when a message is sent to a transaction (either request or response).EVENT_TRANSACTION_MESSAGE="proxy.transaction.message"#: Event fired when a transaction is finished, before the response is sent back to the caller.EVENT_TRANSACTION_ENDED="proxy.transaction.ended"#: Event fired when an incoming request is ready to be filtered, for example by the rules application.EVENT_FILTER_PROXY_REQUEST="proxy.filter.request"#: Event fired when an outgoing response is ready to be filtered, for example by the rules application.EVENT_FILTER_PROXY_RESPONSE="proxy.filter.response"#: An error happened.EVENT_PROXY_ERROR="proxy.error"
[docs]defset_response(self,response:HttpResponse):ifresponseisnotNoneandnotisinstance(response,HttpResponse):raiseValueError(f"Response must be an instance of HttpResponse, got {response!r} ({type(response).__module__}.{type(response).__qualname__}).")self.response=response
[docs]defupdate(self,mixed:Optional[dict|HttpResponse|Self])->Self:ifmixedisNone:returnselfifisinstance(mixed,ProxyFilterEvent):returnmixedifisinstance(mixed,dict):mixed=HttpResponse(serialize(mixed),content_type="application/json")ifnotisinstance(mixed,HttpResponse):raiseValueError(f"Response must be an instance of HttpResponse or a dict, got {mixed!r} ({type(mixed).__module__}.{type(mixed).__qualname__}).")self.set_response(mixed)returnself