Improve MessageHandler resiliency
This commit is contained in:
@@ -19,16 +19,17 @@ along with this program; if not, write to the Free Software Foundation,
|
||||
Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
|
||||
"""
|
||||
import asyncio
|
||||
from logging import getLogger
|
||||
import logging
|
||||
|
||||
logger = getLogger('utilities.events')
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Event:
|
||||
""" an object containing data which will be passed out to all subscribers """
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, name=None):
|
||||
self.subscribers = []
|
||||
self.name = name
|
||||
|
||||
def subscribe(self, handler, *args, one_shot=False, predicate=None, **kwargs):
|
||||
""" establish the subscribers (handlers) to this event """
|
||||
@@ -66,10 +67,19 @@ class Event:
|
||||
unsubscribe = await handler(args, *inner_args, **kwargs)
|
||||
if unsubscribe:
|
||||
_ = self.unsubscribe(handler, *inner_args, **kwargs)
|
||||
asyncio.create_task(_run_handler_wrapper())
|
||||
task = asyncio.create_task(_run_handler_wrapper())
|
||||
task.add_done_callback(self._log_task_failures)
|
||||
else:
|
||||
if handler(args, *inner_args, **kwargs) and not one_shot:
|
||||
self.unsubscribe(handler, *inner_args, **kwargs)
|
||||
try:
|
||||
if handler(args, *inner_args, **kwargs) and not one_shot:
|
||||
self.unsubscribe(handler, *inner_args, **kwargs)
|
||||
except:
|
||||
# One handler failing shouldn't prevent notification of other handlers.
|
||||
LOG.exception(f"Failed in handler for {self.name}")
|
||||
|
||||
def _log_task_failures(self, task: asyncio.Task):
|
||||
if task.exception():
|
||||
LOG.exception(f"Failed in handler for {self.name}", exc_info=task.exception())
|
||||
|
||||
def __len__(self):
|
||||
return len(self.subscribers)
|
||||
|
||||
@@ -75,8 +75,8 @@ class Block:
|
||||
for var_name, val in kwargs.items():
|
||||
self[var_name] = val
|
||||
|
||||
def get_variable(self, var_name):
|
||||
return self.vars.get(var_name)
|
||||
def get(self, var_name, default: Optional[VAR_TYPE] = None) -> Optional[VAR_TYPE]:
|
||||
return self.vars.get(var_name, default)
|
||||
|
||||
def __contains__(self, item):
|
||||
return item in self.vars
|
||||
|
||||
@@ -31,7 +31,7 @@ _T = TypeVar("_T")
|
||||
_K = TypeVar("_K", bound=Hashable)
|
||||
MESSAGE_HANDLER = Callable[[_T], Any]
|
||||
PREDICATE = Callable[[_T], bool]
|
||||
# TODO: Can't do `Iterable[Union[_K, Literal["*"]]` apparently?
|
||||
# TODO: Can't do `Iterable[Union[_K, Literal["*"]]]` apparently?
|
||||
MESSAGE_NAMES = Iterable[Union[_K, str]]
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ class MessageHandler(Generic[_T, _K]):
|
||||
|
||||
def register(self, message_name: _K) -> Event:
|
||||
LOG.debug('Creating a monitor for %s' % message_name)
|
||||
return self.handlers.setdefault(message_name, Event())
|
||||
return self.handlers.setdefault(message_name, Event(message_name))
|
||||
|
||||
def subscribe(self, message_name: Union[_K, Literal["*"]], handler: MESSAGE_HANDLER):
|
||||
notifier = self.register(message_name)
|
||||
|
||||
@@ -37,7 +37,7 @@ class MessageTemplateVariable:
|
||||
return f"{self.__class__.__name__}(name={self.name!r}, tp={self.type!r}, size={self.size!r})"
|
||||
|
||||
@property
|
||||
def probably_binary(self):
|
||||
def probably_binary(self) -> bool:
|
||||
if self._probably_binary is not None:
|
||||
return self._probably_binary
|
||||
|
||||
@@ -49,7 +49,7 @@ class MessageTemplateVariable:
|
||||
return self._probably_binary
|
||||
|
||||
@property
|
||||
def probably_text(self):
|
||||
def probably_text(self) -> bool:
|
||||
if self._probably_text is not None:
|
||||
return self._probably_text
|
||||
|
||||
@@ -97,11 +97,11 @@ class MessageTemplateBlock:
|
||||
self.block_type: MsgBlockType = MsgBlockType.MBT_SINGLE
|
||||
self.number = 0
|
||||
|
||||
def add_variable(self, var):
|
||||
def add_variable(self, var: MessageTemplateVariable):
|
||||
self.variable_map[var.name] = var
|
||||
self.variables.append(var)
|
||||
|
||||
def get_variable(self, name):
|
||||
def get_variable(self, name) -> MessageTemplateVariable:
|
||||
return self.variable_map[name]
|
||||
|
||||
|
||||
@@ -119,11 +119,11 @@ class MessageTemplate:
|
||||
self.deprecation = None
|
||||
self.encoding = None
|
||||
|
||||
def add_block(self, block):
|
||||
def add_block(self, block: MessageTemplateBlock):
|
||||
self.block_map[block.name] = block
|
||||
self.blocks.append(block)
|
||||
|
||||
def get_block(self, name):
|
||||
def get_block(self, name) -> MessageTemplateBlock:
|
||||
return self.block_map[name]
|
||||
|
||||
def get_msg_freq_num_len(self):
|
||||
|
||||
@@ -43,7 +43,7 @@ class TemplateDictionary:
|
||||
|
||||
self.template_list: typing.List[MessageTemplate] = []
|
||||
# maps name to template
|
||||
self.message_templates = {}
|
||||
self.message_templates: typing.Dict[str, MessageTemplate] = {}
|
||||
|
||||
# maps (freq,num) to template
|
||||
self.message_dict = {}
|
||||
|
||||
Reference in New Issue
Block a user