Spaces:
Sleeping
Sleeping
File size: 3,664 Bytes
1d777c4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 |
import logging
import typing
log = logging.getLogger('aiogram.Middleware')
class MiddlewareManager:
    """
    Registry of middlewares bound to one dispatcher.

    Middlewares are kept in registration order in ``applications`` and every
    triggered action is forwarded to each of them, one after another.
    """

    def __init__(self, dispatcher):
        """
        Bind the manager to a dispatcher.

        :param dispatcher: instance of Dispatcher; its ``bot`` and ``storage``
            attributes are copied onto the manager
        """
        self.applications = []
        self.dispatcher = dispatcher
        self.bot = dispatcher.bot
        self.storage = dispatcher.storage

    @property
    def loop(self):
        """
        Loop of the bound dispatcher (read from ``dispatcher.loop`` on each access).
        """
        return self.dispatcher.loop

    def setup(self, middleware):
        """
        Register a middleware with this manager.

        The middleware is appended to ``applications`` and then handed this
        manager through its own ``setup()``.

        :param middleware: instance of BaseMiddleware that is not configured yet
        :return: the same middleware instance
        :raises TypeError: if ``middleware`` is not a BaseMiddleware instance
        :raises ValueError: if ``middleware.is_configured()`` is already true
        """
        is_middleware = isinstance(middleware, BaseMiddleware)
        if not is_middleware:
            raise TypeError(f"`middleware` must be an instance of BaseMiddleware, not {type(middleware)}")

        already_used = middleware.is_configured()
        if already_used:
            raise ValueError('That middleware is already used!')

        self.applications.append(middleware)
        middleware.setup(self)
        log.debug(f"Loaded middleware '{middleware.__class__.__name__}'")
        return middleware

    async def trigger(self, action: str, args: typing.Iterable):
        """
        Forward an action, together with its args list, to every registered middleware.

        Middlewares are awaited sequentially in registration order; the very
        same ``args`` object is passed to each of them.

        :param action: name of the action
        :param args: arguments for the action
        :return: None
        """
        for registered in self.applications:
            await registered.trigger(action, args)
class BaseMiddleware:
    """
    Base class for middleware.

    Handlers are found by name: action ``X`` is dispatched to the attribute
    ``on_X`` (for example ``on_process_message``). The handler is called with
    the unpacked args and its result is awaited, so handlers must be coroutines.
    """

    def __init__(self):
        self._manager = None
        self._configured = False

    @property
    def manager(self) -> 'MiddlewareManager':
        """
        Instance of MiddlewareManager this middleware was set up with.

        :raises RuntimeError: if no manager has been stored yet
        """
        owner = self._manager
        if owner is not None:
            return owner
        raise RuntimeError('Middleware is not configured!')

    def setup(self, manager):
        """
        Store the manager and mark the middleware as configured.

        :param manager: manager to remember
        :return: None
        """
        self._configured = True
        self._manager = manager

    def is_configured(self) -> bool:
        """
        Tell whether ``setup()`` has marked this middleware as configured.

        :return: the configured flag
        """
        return self._configured

    async def trigger(self, action, args):
        """
        Run the handler for an action, if this middleware has one.

        :param action: action name; the handler attribute is ``on_<action>``
        :param args: iterable unpacked into the handler's positional arguments
        :return: None, whether or not a handler was found
        """
        handler = getattr(self, f"on_{action}", None)
        if handler:
            await handler(*args)
        return None
class LifetimeControllerMiddleware(BaseMiddleware):
    """
    Middleware that routes every ``pre_process_*`` action to :meth:`pre_process`
    and every ``post_process_*`` action to :meth:`post_process`.

    Both hooks do nothing by default; subclasses are presumably expected to
    override them. Actions derived from ``skip_patterns`` are ignored.
    """
    # TODO: Rename class

    # Optional iterable of action suffixes to ignore. For each suffix ``item``
    # the actions ``pre_process_<item>``, ``process_<item>`` and
    # ``post_process_<item>`` are skipped.
    skip_patterns = None
    # Cache for ``skip_actions``; stays None until the property is first read.
    _skip_actions = None

    @property
    def skip_actions(self):
        """
        List of full action names to ignore, expanded from ``skip_patterns``.

        Built on first access and cached on the instance, so changing
        ``skip_patterns`` afterwards has no effect on this instance.

        :return: list of action names (empty when ``skip_patterns`` is falsy)
        """
        if self._skip_actions is None:
            # Assign only once the list is complete so a failing iteration
            # never leaves a partially filled cache behind.
            self._skip_actions = [
                f"{stage}_{item}"
                for item in (self.skip_patterns or ())
                for stage in ("pre_process", "process", "post_process")
            ]
        return self._skip_actions

    async def pre_process(self, obj, data, *args):
        """
        Hook awaited for every non-skipped ``pre_process_*`` action.

        :param obj: first element of the triggered args
        :param data: last element of the triggered args
        :param args: elements between the first and the last one
        """
        pass

    async def post_process(self, obj, data, *args):
        """
        Hook awaited for every non-skipped ``post_process_*`` action.

        :param obj: first element of the triggered args
        :param data: last element of the triggered args
        :param args: elements between the first and the last one
        """
        pass

    async def trigger(self, action, args):
        """
        Dispatch an action to :meth:`pre_process` or :meth:`post_process`.

        :param action: action name
        :param args: iterable whose first element is passed as ``obj``, whose
            last element is passed as ``data`` and whose middle elements are
            forwarded positionally
        :return: True if a hook was awaited, False if the action was skipped
            or is neither a ``pre_process_*`` nor a ``post_process_*`` action
        :raises ValueError: if a hook applies but ``args`` has fewer than two elements
        """
        if action in self.skip_actions:
            return False

        if action.startswith('pre_process_'):
            hook = self.pre_process
        elif action.startswith('post_process_'):
            hook = self.post_process
        else:
            # Not handled here: return before touching ``args`` so ignored
            # actions neither fail on short argument lists nor consume a
            # one-shot iterable.
            return False

        # Unpack only once a hook is known to run; ``extra`` avoids shadowing
        # the ``args`` parameter.
        obj, *extra, data = args
        await hook(obj, data, *extra)
        return True
|