Source code for harp_apps.rules.subscribers
from whistle import IAsyncEventDispatcher
from harp import get_logger
from harp_apps.http_client.events import (
EVENT_FILTER_HTTP_CLIENT_REQUEST,
EVENT_FILTER_HTTP_CLIENT_RESPONSE,
HttpClientFilterEvent,
)
from harp_apps.proxy.events import EVENT_FILTER_PROXY_REQUEST, EVENT_FILTER_PROXY_RESPONSE, ProxyFilterEvent
from .models.rulesets import RuleSet
logger = get_logger(__name__)
[docs]
class RulesSubscriber:
[docs]
def __init__(self, ruleset: RuleSet):
self.ruleset = ruleset
[docs]
def subscribe(self, dispatcher: IAsyncEventDispatcher):
dispatcher.add_listener(EVENT_FILTER_PROXY_REQUEST, self.on_filter_event)
dispatcher.add_listener(EVENT_FILTER_HTTP_CLIENT_REQUEST, self.on_filter_event)
dispatcher.add_listener(EVENT_FILTER_HTTP_CLIENT_RESPONSE, self.on_filter_event)
dispatcher.add_listener(EVENT_FILTER_PROXY_RESPONSE, self.on_filter_event)
[docs]
def unsubscribe(self, dispatcher: IAsyncEventDispatcher):
dispatcher.remove_listener(EVENT_FILTER_PROXY_REQUEST, self.on_filter_event)
dispatcher.remove_listener(EVENT_FILTER_HTTP_CLIENT_REQUEST, self.on_filter_event)
dispatcher.remove_listener(EVENT_FILTER_HTTP_CLIENT_RESPONSE, self.on_filter_event)
dispatcher.remove_listener(EVENT_FILTER_PROXY_RESPONSE, self.on_filter_event)
[docs]
def match(self, *args):
return self.ruleset.match(*args)
[docs]
async def on_filter_event(self, event: ProxyFilterEvent | HttpClientFilterEvent):
for script in self.match(*event.criteria):
try:
event.execute_script(script)
except Exception as e:
logger.error(f"Error executing script {script}: {e}", exc_info=True)