From 142f2e42cad7def219b07f31c816dce4dc2844fe Mon Sep 17 00:00:00 2001 From: Salad Dais Date: Thu, 4 Jan 2024 19:08:09 +0000 Subject: [PATCH] Clean up message template code --- hippolyzer/apps/proxy_gui.py | 2 +- hippolyzer/lib/base/message/msgtypes.py | 32 +++++------- hippolyzer/lib/base/message/template.py | 33 ++++-------- hippolyzer/lib/base/message/template_dict.py | 28 +++++----- .../lib/base/message/template_parser.py | 52 ++++++++++--------- hippolyzer/lib/base/message/udpserializer.py | 2 +- tests/base/test_template_parser.py | 30 +++++------ 7 files changed, 81 insertions(+), 98 deletions(-) diff --git a/hippolyzer/apps/proxy_gui.py b/hippolyzer/apps/proxy_gui.py index 927af72..83b50ff 100644 --- a/hippolyzer/apps/proxy_gui.py +++ b/hippolyzer/apps/proxy_gui.py @@ -576,7 +576,7 @@ class MessageBuilderWindow(QtWidgets.QMainWindow): message_names = sorted(x.name for x in self.templateDict) for message_name in message_names: - if self.templateDict[message_name].msg_trust: + if self.templateDict[message_name].trusted: self.comboTrusted.addItem(message_name) else: self.comboUntrusted.addItem(message_name) diff --git a/hippolyzer/lib/base/message/msgtypes.py b/hippolyzer/lib/base/message/msgtypes.py index e646d98..951a04c 100644 --- a/hippolyzer/lib/base/message/msgtypes.py +++ b/hippolyzer/lib/base/message/msgtypes.py @@ -47,7 +47,6 @@ class MsgBlockType: MBT_SINGLE = 0 MBT_MULTIPLE = 1 MBT_VARIABLE = 2 - MBT_String_List = ['Single', 'Multiple', 'Variable'] class PacketFlags(enum.IntFlag): @@ -64,28 +63,23 @@ class PacketFlags(enum.IntFlag): # = '\xFF\xFF' # = '\xFF' # = '' -class MsgFrequency: - FIXED_FREQUENCY_MESSAGE = -1 # marking it - LOW_FREQUENCY_MESSAGE = 4 - MEDIUM_FREQUENCY_MESSAGE = 2 - HIGH_FREQUENCY_MESSAGE = 1 +class MsgFrequency(enum.IntEnum): + FIXED = -1 # marking it + LOW = 4 + MEDIUM = 2 + HIGH = 1 -class MsgTrust: - LL_NOTRUST = 0 - LL_TRUSTED = 1 +class MsgEncoding(enum.IntEnum): + UNENCODED = 0 + ZEROCODED = 1 -class MsgEncoding: - LL_UNENCODED = 0 - LL_ZEROCODED = 1 - - -class MsgDeprecation: - LL_DEPRECATED = 0 - LL_UDPDEPRECATED = 1 - LL_UDPBLACKLISTED = 2 - LL_NOTDEPRECATED = 3 +class MsgDeprecation(enum.IntEnum): + DEPRECATED = 0 + UDPDEPRECATED = 1 + UDPBLACKLISTED = 2 + NOTDEPRECATED = 3 # message variable types diff --git a/hippolyzer/lib/base/message/template.py b/hippolyzer/lib/base/message/template.py index 85da700..33990d2 100644 --- a/hippolyzer/lib/base/message/template.py +++ b/hippolyzer/lib/base/message/template.py @@ -21,7 +21,7 @@ Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. import typing -from .msgtypes import MsgType, MsgBlockType +from .msgtypes import MsgType, MsgBlockType, MsgFrequency from ..datatypes import UUID @@ -105,26 +105,19 @@ class MessageTemplateBlock: return self.variable_map[name] -class MessageTemplate(object): - frequency_strings = {-1: 'fixed', 1: 'high', 2: 'medium', 4: 'low'} # strings for printout - deprecation_strings = ["Deprecated", "UDPDeprecated", "UDPBlackListed", "NotDeprecated"] # using _as_string methods - encoding_strings = ["Unencoded", "Zerocoded"] # etc - trusted_strings = ["Trusted", "NotTrusted"] # etc LDE 24oct2008 - +class MessageTemplate: def __init__(self, name): self.blocks: typing.List[MessageTemplateBlock] = [] self.block_map: typing.Dict[str, MessageTemplateBlock] = {} - # this is the function or object that will handle this type of message - self.received_count = 0 - self.name = name - self.frequency = None - self.msg_num = 0 - self.msg_freq_num_bytes = None - self.msg_trust = None - self.msg_deprecation = None - self.msg_encoding = None + self.frequency: typing.Optional[MsgFrequency] = None + self.num = 0 + # Frequency + msg num as bytes + self.freq_num_bytes = None + self.trusted = False + self.deprecation = None + self.encoding = None def add_block(self, block): self.block_map[block.name] = block @@ -134,12 +127,6 @@ class MessageTemplate(object): return self.block_map[name] def get_msg_freq_num_len(self): - if self.frequency == -1: + if self.frequency == MsgFrequency.FIXED: return 4 return self.frequency - - def get_frequency_as_string(self): - return MessageTemplate.frequency_strings[self.frequency] - - def get_deprecation_as_string(self): - return MessageTemplate.deprecation_strings[self.msg_deprecation] diff --git a/hippolyzer/lib/base/message/template_dict.py b/hippolyzer/lib/base/message/template_dict.py index 16d1840..39d8bd8 100644 --- a/hippolyzer/lib/base/message/template_dict.py +++ b/hippolyzer/lib/base/message/template_dict.py @@ -68,32 +68,32 @@ class TemplateDictionary: # do a mapping of type to a string for easier reference frequency_str = '' - if template.frequency == MsgFrequency.FIXED_FREQUENCY_MESSAGE: + if template.frequency == MsgFrequency.FIXED: frequency_str = "Fixed" - elif template.frequency == MsgFrequency.LOW_FREQUENCY_MESSAGE: + elif template.frequency == MsgFrequency.LOW: frequency_str = "Low" - elif template.frequency == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE: + elif template.frequency == MsgFrequency.MEDIUM: frequency_str = "Medium" - elif template.frequency == MsgFrequency.HIGH_FREQUENCY_MESSAGE: + elif template.frequency == MsgFrequency.HIGH: frequency_str = "High" self.message_dict[(frequency_str, - template.msg_num)] = template + template.num)] = template def build_message_ids(self): for template in list(self.message_templates.values()): frequency = template.frequency num_bytes = None - if frequency == MsgFrequency.FIXED_FREQUENCY_MESSAGE: + if frequency == MsgFrequency.FIXED: # have to do this because Fixed messages are stored as a long in the template - num_bytes = b'\xff\xff\xff' + struct.pack("B", template.msg_num) - elif frequency == MsgFrequency.LOW_FREQUENCY_MESSAGE: - num_bytes = b'\xff\xff' + struct.pack("!H", template.msg_num) - elif frequency == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE: - num_bytes = b'\xff' + struct.pack("B", template.msg_num) - elif frequency == MsgFrequency.HIGH_FREQUENCY_MESSAGE: - num_bytes = struct.pack("B", template.msg_num) - template.msg_freq_num_bytes = num_bytes + num_bytes = b'\xff\xff\xff' + struct.pack("B", template.num) + elif frequency == MsgFrequency.LOW: + num_bytes = b'\xff\xff' + struct.pack("!H", template.num) + elif frequency == MsgFrequency.MEDIUM: + num_bytes = b'\xff' + struct.pack("B", template.num) + elif frequency == MsgFrequency.HIGH: + num_bytes = struct.pack("B", template.num) + template.freq_num_bytes = num_bytes def get_template_by_name(self, template_name) -> typing.Optional[MessageTemplate]: return self.message_templates.get(template_name) diff --git a/hippolyzer/lib/base/message/template_parser.py b/hippolyzer/lib/base/message/template_parser.py index b61344f..e8c8f58 100644 --- a/hippolyzer/lib/base/message/template_parser.py +++ b/hippolyzer/lib/base/message/template_parser.py @@ -22,7 +22,7 @@ import struct import re from . import template -from .msgtypes import MsgFrequency, MsgTrust, MsgEncoding +from .msgtypes import MsgFrequency, MsgEncoding from .msgtypes import MsgDeprecation, MsgBlockType, MsgType from ..exc import MessageTemplateParsingError, MessageTemplateNotFound @@ -112,67 +112,69 @@ class MessageTemplateParser: frequency = None freq_str = match.group(2) if freq_str == 'Low': - frequency = MsgFrequency.LOW_FREQUENCY_MESSAGE + frequency = MsgFrequency.LOW elif freq_str == 'Medium': - frequency = MsgFrequency.MEDIUM_FREQUENCY_MESSAGE + frequency = MsgFrequency.MEDIUM elif freq_str == 'High': - frequency = MsgFrequency.HIGH_FREQUENCY_MESSAGE + frequency = MsgFrequency.HIGH elif freq_str == 'Fixed': - frequency = MsgFrequency.FIXED_FREQUENCY_MESSAGE + frequency = MsgFrequency.FIXED new_template.frequency = frequency msg_num = int(match.group(3), 0) - if frequency == MsgFrequency.FIXED_FREQUENCY_MESSAGE: + if frequency == MsgFrequency.FIXED: # have to do this because Fixed messages are stored as a long in the template msg_num &= 0xff msg_num_bytes = struct.pack('!BBBB', 0xff, 0xff, 0xff, msg_num) - elif frequency == MsgFrequency.LOW_FREQUENCY_MESSAGE: + elif frequency == MsgFrequency.LOW: msg_num_bytes = struct.pack('!BBH', 0xff, 0xff, msg_num) - elif frequency == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE: + elif frequency == MsgFrequency.MEDIUM: msg_num_bytes = struct.pack('!BB', 0xff, msg_num) - elif frequency == MsgFrequency.HIGH_FREQUENCY_MESSAGE: + elif frequency == MsgFrequency.HIGH: msg_num_bytes = struct.pack('!B', msg_num) else: raise Exception("don't know about frequency %s" % frequency) - new_template.msg_num = msg_num - new_template.msg_freq_num_bytes = msg_num_bytes + new_template.num = msg_num + new_template.freq_num_bytes = msg_num_bytes - msg_trust = None msg_trust_str = match.group(4) if msg_trust_str == 'Trusted': - msg_trust = MsgTrust.LL_TRUSTED + msg_trust = True elif msg_trust_str == 'NotTrusted': - msg_trust = MsgTrust.LL_NOTRUST + msg_trust = False + else: + raise ValueError(f"Invalid trust {msg_trust_str}") - new_template.msg_trust = msg_trust + new_template.trusted = msg_trust - msg_encoding = None msg_encoding_str = match.group(5) if msg_encoding_str == 'Unencoded': - msg_encoding = MsgEncoding.LL_UNENCODED + msg_encoding = MsgEncoding.UNENCODED elif msg_encoding_str == 'Zerocoded': - msg_encoding = MsgEncoding.LL_ZEROCODED + msg_encoding = MsgEncoding.ZEROCODED + else: + raise ValueError(f"Invalid encoding {msg_encoding_str}") - new_template.msg_encoding = msg_encoding + new_template.encoding = msg_encoding msg_dep = None msg_dep_str = match.group(7) if msg_dep_str: if msg_dep_str == 'Deprecated': - msg_dep = MsgDeprecation.LL_DEPRECATED + msg_dep = MsgDeprecation.DEPRECATED elif msg_dep_str == 'UDPDeprecated': - msg_dep = MsgDeprecation.LL_UDPDEPRECATED + msg_dep = MsgDeprecation.UDPDEPRECATED elif msg_dep_str == 'UDPBlackListed': - msg_dep = MsgDeprecation.LL_UDPBLACKLISTED + msg_dep = MsgDeprecation.UDPBLACKLISTED elif msg_dep_str == 'NotDeprecated': - msg_dep = MsgDeprecation.LL_NOTDEPRECATED + msg_dep = MsgDeprecation.NOTDEPRECATED else: - msg_dep = MsgDeprecation.LL_NOTDEPRECATED + msg_dep = MsgDeprecation.NOTDEPRECATED if msg_dep is None: raise MessageTemplateParsingError("Unknown msg_dep field %s" % match.group(0)) - new_template.msg_deprecation = msg_dep + new_template.deprecation = msg_dep return new_template diff --git a/hippolyzer/lib/base/message/udpserializer.py b/hippolyzer/lib/base/message/udpserializer.py index e5617a0..3a67679 100644 --- a/hippolyzer/lib/base/message/udpserializer.py +++ b/hippolyzer/lib/base/message/udpserializer.py @@ -69,7 +69,7 @@ class UDPMessageSerializer: # frequency and message number. The template stores it because it doesn't # change per template. body_writer = se.BufferWriter("<") - body_writer.write_bytes(current_template.msg_freq_num_bytes) + body_writer.write_bytes(current_template.freq_num_bytes) body_writer.write_bytes(msg.extra) # We're going to pop off keys as we go, so shallow copy the dict. diff --git a/tests/base/test_template_parser.py b/tests/base/test_template_parser.py index ae0b3f4..922399d 100644 --- a/tests/base/test_template_parser.py +++ b/tests/base/test_template_parser.py @@ -27,7 +27,7 @@ from hippolyzer.lib.base.message.data import msg_tmpl from hippolyzer.lib.base.message.template import MessageTemplate, MessageTemplateBlock, MessageTemplateVariable from hippolyzer.lib.base.message.template_dict import TemplateDictionary from hippolyzer.lib.base.message.template_parser import MessageTemplateParser -from hippolyzer.lib.base.message.msgtypes import MsgFrequency, MsgTrust, MsgEncoding, \ +from hippolyzer.lib.base.message.msgtypes import MsgFrequency, MsgEncoding, \ MsgDeprecation, MsgBlockType, MsgType @@ -45,8 +45,8 @@ class TestDictionary(unittest.TestCase): msg_dict = TemplateDictionary(self.template_list) packet = msg_dict.get_template_by_name('ConfirmEnableSimulator') assert packet is not None, "get_packet failed" - assert packet.frequency == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE, "Incorrect frequency" - assert packet.msg_num == 8, "Incorrect message number for ConfirmEnableSimulator" + assert packet.frequency == MsgFrequency.MEDIUM, "Incorrect frequency" + assert packet.num == 8, "Incorrect message number for ConfirmEnableSimulator" def test_get_packet_pair(self): msg_dict = TemplateDictionary(self.template_list) @@ -76,29 +76,29 @@ class TestTemplates(unittest.TestCase): template = self.msg_dict['CompletePingCheck'] name = template.name freq = template.frequency - num = template.msg_num - trust = template.msg_trust - enc = template.msg_encoding + num = template.num + trust = template.trusted + enc = template.encoding assert name == 'CompletePingCheck', "Expected: CompletePingCheck Returned: " + name - assert freq == MsgFrequency.HIGH_FREQUENCY_MESSAGE, "Expected: High Returned: " + freq + assert freq == MsgFrequency.HIGH, "Expected: High Returned: " + freq assert num == 2, "Expected: 2 Returned: " + str(num) - assert trust == MsgTrust.LL_NOTRUST, "Expected: NotTrusted Returned: " + trust - assert enc == MsgEncoding.LL_UNENCODED, "Expected: Unencoded Returned: " + enc + assert not trust, "Expected: NotTrusted Returned: " + trust + assert enc == MsgEncoding.UNENCODED, "Expected: Unencoded Returned: " + enc def test_deprecated(self): template = self.msg_dict['ObjectPosition'] - dep = template.msg_deprecation - assert dep == MsgDeprecation.LL_DEPRECATED, "Expected: Deprecated Returned: " + str(dep) + dep = template.deprecation + assert dep == MsgDeprecation.DEPRECATED, "Expected: Deprecated Returned: " + str(dep) def test_template_fixed(self): template = self.msg_dict['PacketAck'] - num = template.msg_num + num = template.num assert num == 251, "Expected: 251 Returned: " + str(num) def test_blacklisted(self): template = self.msg_dict['TeleportFinish'] - self.assertEqual(template.msg_deprecation, - MsgDeprecation.LL_UDPBLACKLISTED) + self.assertEqual(template.deprecation, + MsgDeprecation.UDPBLACKLISTED) def test_block(self): block = self.msg_dict['OpenCircuit'].get_block('CircuitInfo') @@ -167,7 +167,7 @@ class TestTemplates(unittest.TestCase): frequency_counter = {"low": 0, 'medium': 0, "high": 0, 'fixed': 0} for template in list(self.msg_dict.message_templates.values()): - frequency_counter[template.get_frequency_as_string()] += 1 + frequency_counter[template.frequency.name.lower()] += 1 self.assertEqual(low_count, frequency_counter["low"]) self.assertEqual(medium_count, frequency_counter["medium"]) self.assertEqual(high_count, frequency_counter["high"])