template parser now doing the header decomposition. Message types added and now used (no more strings). Sizes also added to the variables.

This commit is contained in:
locklainn.linden
2008-07-22 15:18:35 +00:00
committed by Salad Dais
parent 195a8ac3c9
commit fd3dbd5530
7 changed files with 273 additions and 121 deletions

View File

@@ -25,7 +25,6 @@ $/LicenseInfo$
#standard libraries
import struct
import string
import re
import pprint
@@ -107,12 +106,15 @@ class MsgVariableData():
def get_type(self):
return self.lltype
class MessageTemplateVariable():
def __init__(self, name, tp):
def __init__(self, name, tp, size):
self.name = name
self.lltype = tp
self.size = size
def get_size(self):
return self.size
def get_name(self):
return self.name
@@ -121,18 +123,14 @@ class MessageTemplateVariable():
return self.lltype
class MessageTemplateBlock():
def __init__(self, header):
def __init__(self, name):
self.variables = {}
self.name = header[0]
self.blockType = header[1]
if self.blockType == 'Multiple':
self.number = int(header[2])
else:
self.number = 0
self.name = name
self.block_type = None
self.number = 0
def get_block_type(self):
return self.blockType
return self.block_type
def get_block_number(self):
return self.number
@@ -149,35 +147,24 @@ class MessageTemplateBlock():
def get_variable(self, name):
return self.variables[name]
class MessageTemplate():
def __init__(self, header):
self.blocks = {}
def set_type(self, tp):
self.block_type = tp
def set_number(self, num):
self.number = num
class MessageTemplate():
def __init__(self, name):
self.blocks = {}
#this is the function or object that will handle this type of message
self.handler = None
self.name = header[0]
self.frequency = header[1]
self.msg_num = string.atoi(header[2],0)
if self.frequency == 'Fixed':
#have to do this because Fixed messages are stored as a long in the template
binTemp = struct.pack('>L', string.atol(header[2],0))
self.msg_num_hex = binTemp
self.msg_num = struct.unpack('>h','\x00' + binTemp[3])[0]
elif self.frequency == 'Low':
self.msg_num_hex = struct.pack('>BBh',0xff,0xff, self.msg_num)
elif self.frequency == 'Medium':
self.msg_num_hex = struct.pack('>BB',0xff, self.msg_num)
elif self.frequency == 'High':
self.msg_num_hex = struct.pack('>B', self.msg_num)
self.msg_trust = header[3]
self.msg_encoding = header[4]
if len(header) > 5:
self.msg_deprecation = header[5]
else:
self.msg_deprecation = ''
self.name = name
self.frequency = None
self.msg_num = 0
self.msg_num_hex = None
self.msg_trust = None
self.msg_deprecation = None
def get_handler(self):
return self.handler
@@ -216,7 +203,24 @@ class MessageTemplate():
def get_block(self, name):
return self.blocks[name]
def set_frequency(self, freq):
self.frequency = freq
def set_message_number(self, num):
self.msg_num = num
def set_message_hex_num(self, num):
self.msg_num_hex = num
def set_trust(self, trust):
self.msg_trust = trust
def set_encoding(self, enc):
self.msg_encoding = enc
def set_deprecation(self, dep):
self.msg_deprecation = dep
#these remain unformatted (by standard) because they are going to be moved
def decodeHeaderPair(frequency, num):
return mypacketdictionary[(frequency, num)]

View File

@@ -1,4 +1,5 @@
from pyogp.lib.base.message_template import MsgData, MsgBlockData, MsgVariableData
from pyogp.lib.base.message_template import MsgData, MsgBlockData, \
MsgVariableData
class MessageTemplateBuilder():
""" This class builds messages at its high level, that is, keeping

View File

@@ -1,4 +1,5 @@
from pyogp.lib.base.data import msg_tmpl
from pyogp.lib.base.message_types import MsgFrequency
class TemplateDictionary():
def __init__(self, template_list):
@@ -14,12 +15,24 @@ class TemplateDictionary():
def buildDictionaries(self, template_list):
for template in template_list:
self.message_templates[template.get_name()] = template
self.message_dict[(template.get_frequency(), \
#do a mapping of type to a string for easier reference
frequency_str = ''
if template.get_frequency() == MsgFrequency.FIXED_FREQUENCY_MESSAGE:
frequency_str = "Fixed"
elif template.get_frequency() == MsgFrequency.LOW_FREQUENCY_MESSAGE:
frequency_str = "Low"
elif template.get_frequency() == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE:
frequency_str = "Medium"
elif template.get_frequency() == MsgFrequency.HIGH_FREQUENCY_MESSAGE:
frequency_str = "High"
self.message_dict[(frequency_str, \
template.get_message_number())] = template
def get_template(self, template_name):
return self.message_templates[template_name]
def get_template_by_pair(self, frequency, num):
return self.message_dict[(frequency, num)]

View File

@@ -1,10 +1,14 @@
#standard libraries
import struct
import string
import re
import pprint
#local libraries
import message_template
from pyogp.lib.base.data import msg_tmpl
from pyogp.lib.base.message_types import MsgFrequency, MsgTrust, \
MsgEncoding, MsgDeprecation, MsgBlockType, MsgType
class MessageTemplateParser():
def __init__(self, template_file):
@@ -40,7 +44,7 @@ class MessageTemplateParser():
#results = re.match("^\t\t([^{}]+.+)",line) #gets packet blocks
#results = re.match("^\t\t([{}]+.+)",line) #gets block data
current_packet = None
current_template = None
current_block = None
#we have to go through all the packets and parse them
@@ -58,7 +62,6 @@ class MessageTemplateParser():
parts = version_test.group(1)
parts = parts.split()
self.version = float(parts[0])
#get packet header, starting a new packet
packet_header = re.match("^\t([^\t{}]+.+)",line) #gets packet headers
@@ -66,16 +69,87 @@ class MessageTemplateParser():
parts = packet_header.group(1)
parts = parts.split()
current_packet = message_template.MessageTemplate(parts)
self.__add_template(current_packet)
current_template = message_template.MessageTemplate(parts[0])
frequency = None
if parts[1] == 'Low':
frequency = MsgFrequency.LOW_FREQUENCY_MESSAGE
elif parts[1] == 'Medium':
frequency = MsgFrequency.MEDIUM_FREQUENCY_MESSAGE
elif parts[1] == 'High':
frequency = MsgFrequency.HIGH_FREQUENCY_MESSAGE
elif parts[1] == 'Fixed':
frequency = MsgFrequency.FIXED_FREQUENCY_MESSAGE
current_template.set_frequency(frequency)
msg_num = string.atoi(parts[2],0)
if frequency == MsgFrequency.FIXED_FREQUENCY_MESSAGE:
#have to do this because Fixed messages are stored as a long in the template
binTemp = struct.pack('>L', string.atol(parts[2],0))
msg_num_hex = binTemp
msg_num = struct.unpack('>h','\x00' + binTemp[3])[0]
elif frequency == MsgFrequency.LOW_FREQUENCY_MESSAGE:
msg_num_hex = struct.pack('>BBh',0xff,0xff, msg_num)
elif frequency == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE:
msg_num_hex = struct.pack('>BB',0xff, msg_num)
elif frequency == MsgFrequency.HIGH_FREQUENCY_MESSAGE:
msg_num_hex = struct.pack('>B', msg_num)
current_template.set_message_number(msg_num)
current_template.set_message_hex_num(msg_num_hex)
msg_trust = None
if parts[3] == 'Trusted':
msg_trust = MsgTrust.LL_TRUSTED
elif parts[3] == 'NotTrusted':
msg_trust = MsgTrust.LL_NOTRUST
current_template.set_trust(msg_trust)
msg_encoding = None
if parts[4] == 'Unencoded':
msg_encoding = MsgEncoding.LL_UNENCODED
elif parts[4] == 'Zerocoded':
msg_encoding = MsgEncoding.LL_ZEROCODED
current_template.set_encoding(msg_encoding)
msg_dep = None
if len(parts) > 5:
if parts[5] == 'Deprecated':
msg_dep = MsgDeprecation.LL_DEPRECATED
elif parts[5] == 'UDPDeprecated':
msg_dep = MsgDeprecation.LL_UDPDEPRECATED
elif parts[5] == 'NotDeprecated':
msg_dep = MsgDeprecation.LL_NOTDEPRECATED
else:
msg_dep = MsgDeprecation.LL_NOTDEPRECATED
current_template.set_deprecation(msg_dep)
self.__add_template(current_template)
block_header = re.match("^\t\t([^{}]+.+)",line) #gets packet block header
if block_header != None:
parts = block_header.group(1)
parts = parts.split()
current_block = message_template.MessageTemplateBlock(parts)
current_packet.add_block(current_block)
current_block = message_template.MessageTemplateBlock(parts[0])
block_type = None
block_num = 0
if parts[1] == 'Single':
block_type = MsgBlockType.MBT_SINGLE
elif parts[1] == 'Multiple':
block_type = MsgBlockType.MBT_MULTIPLE
block_num = int(parts[2])
elif parts[1] == 'Variable':
block_type = MsgBlockType.MBT_VARIABLE
current_block.set_type(block_type)
current_block.set_number(block_num)
current_template.add_block(current_block)
block_data = re.match("^\t\t([{}]+.+)",line) #gets block data
if block_data != None:
@@ -83,7 +157,75 @@ class MessageTemplateParser():
parts = parts.split()
parts.remove('{')
parts.remove('}')
current_var = message_template.MessageTemplateVariable(parts[0], parts[1])
type_string = parts[1]
var_type = None
var_size = -1
if type_string == 'U8':
var_type = MsgType.MVT_U8
var_size = 1
elif type_string == 'U16':
var_type = MsgType.MVT_U16
var_size = 2
elif type_string == 'U32':
var_type = MsgType.MVT_U32
var_size = 4
elif type_string == 'U64':
var_type = MsgType.MVT_U64
var_size = 8
elif type_string == 'S8':
var_type = MsgType.MVT_S8
var_size = 1
elif type_string == 'S16':
var_type = MsgType.MVT_S16
var_size = 2
elif type_string == 'S32':
var_type = MsgType.MVT_S32
var_size = 4
elif type_string == 'S64':
var_type = MsgType.MVT_S64
var_size = 8
elif type_string == 'F32':
var_type = MsgType.MVT_F32
var_size = 4
elif type_string == 'F64':
var_type = MsgType.MVT_F64
var_size = 8
elif type_string == 'LLVector3':
var_type = MsgType.MVT_LLVector3
var_size = 12
elif type_string == 'LLVector3d':
var_type = MsgType.MVT_LLVector3d
var_size = 24
elif type_string == 'LLVector4':
var_type = MsgType.MVT_LLVector4
var_size = 16
elif type_string == 'LLQuaternion':
var_type = MsgType.MVT_LLQuaternion
var_size = 12
elif type_string == 'LLUUID':
var_type = MsgType.MVT_LLUUID
var_size = 16
elif type_string == 'BOOL':
var_type = MsgType.MVT_BOOL
var_size = 1
elif type_string == 'IPADDR':
var_type = MsgType.MVT_IP_ADDR
var_size = 4
elif type_string == 'IPPORT':
var_type = MsgType.MVT_IP_PORT
var_size = 2
elif type_string == 'Fixed' or type_string == 'Variable':
if type_string == 'Fixed':
var_type = MsgType.MVT_FIXED
elif type_string == 'Variable':
var_type = MsgType.MVT_VARIABLE
var_size = int(parts[2])
if var_size <= 0:
raise Exception('Bad variable size')
current_var = message_template.MessageTemplateVariable(parts[0], var_type, var_size)
current_block.add_variable(current_var)
self.template_file.seek(0)

View File

@@ -1,3 +1,8 @@
class MsgBlockType():
MBT_SINGLE, \
MBT_MULTIPLE, \
MBT_VARIABLE = range(3)
#pack flags
class PackFlags():
LL_ZERO_CODE_FLAG = '\x80'
@@ -13,32 +18,45 @@ class MsgFrequency():
MEDIUM_FREQUENCY_MESSAGE = '\xFF'
HIGH_FREQUENCY_MESSAGE = ''
class MsgTrust():
LL_TRUSTED, \
LL_NOTRUST = range(2)
class MsgEncoding():
LL_UNENCODED, \
LL_ZEROCODED = range(2)
class MsgDeprecation():
LL_DEPRECATED, \
LL_UDPDEPRECATED, \
LL_NOTDEPRECATED = range(3)
#message variable types
class MsgTypes():
class MsgType():
#these are variables that aren't used because they can't be added to the
#builder
#MVT_NULL, \
#MVT_FIXED, \
#MVT_VARIABLE, \
#MVT_U16Vec3, \
#MVT_U16Quat, \
#MVT_S16Array, \
#MVT_EOL, \
#MVT_S64, \
MVT_BOOL, \
MVT_S8, \
MVT_FIXED, \
MVT_VARIABLE, \
MVT_U8, \
MVT_S16, \
MVT_U16, \
MVT_F32, \
MVT_S32, \
MVT_U32, \
MVT_U64, \
MVT_S8, \
MVT_S16, \
MVT_S32, \
MVT_S64, \
MVT_F32, \
MVT_F64, \
MVT_LLVector3, \
MVT_LLVector4, \
MVT_LLVector3d, \
MVT_LLVector4, \
MVT_LLQuaternion, \
MVT_LLUUID, \
MVT_BOOL, \
MVT_IP_ADDR, \
MVT_IP_PORT = range(17)
MVT_IP_PORT = range(20)

View File

@@ -7,7 +7,7 @@ from pyogp.lib.base.message_template import MessageTemplate, MessageTemplateBloc
from pyogp.lib.base.message_template_parser import MessageTemplateParser
from pyogp.lib.base.message_template_builder import MessageTemplateBuilder
from pyogp.lib.base.message_template_dict import TemplateDictionary
from pyogp.lib.base.message_types import MsgTypes
from pyogp.lib.base.message_types import MsgType
class TestTemplateBuilder(unittest.TestCase):
@@ -97,7 +97,7 @@ class TestTemplateBuilder(unittest.TestCase):
t_var = variables['AgentID']
assert t_var != None,"Block doesn't have AgentID variable"
assert t_var.get_name() == 'AgentID', "AgentID name incorrect"
assert t_var.get_type() == MsgTypes.MVT_LLUUID, "AgentID type incorrect"
assert t_var.get_type() == MsgType.MVT_LLUUID, "AgentID type incorrect"
except KeyError:
assert False, "Block doesn't have AgentID variable"
@@ -105,7 +105,7 @@ class TestTemplateBuilder(unittest.TestCase):
t_var = variables['TexturesChanged']
assert t_var != None,"Block doesn't have TexturesChanged variable"
assert t_var.get_name() == 'TexturesChanged', "Name incorrect"
assert t_var.get_type() == message_types.MVT_BOOL, "Type incorrect"
assert t_var.get_type() == MsgType.MVT_BOOL, "Type incorrect"
except KeyError:
assert False, "Block doesn't have TexturesChanged variable"

View File

@@ -6,6 +6,8 @@ from pyogp.lib.base.data import msg_tmpl
from pyogp.lib.base.message_template import MessageTemplate, MessageTemplateBlock, MessageTemplateVariable
from pyogp.lib.base.message_template_dict import TemplateDictionary
from pyogp.lib.base.message_template_parser import MessageTemplateParser
from pyogp.lib.base.message_types import MsgFrequency, MsgTrust, \
MsgEncoding, MsgDeprecation, MsgBlockType, MsgType
class TestDictionary(unittest.TestCase):
def setUp(self):
@@ -27,7 +29,7 @@ class TestDictionary(unittest.TestCase):
msg_dict = TemplateDictionary(self.template_list)
packet = msg_dict.get_template('ConfirmEnableSimulator')
assert packet != None, "get_packet failed"
assert packet.get_frequency() == 'Medium', "Incorrect frequency for ConfirmEnableSimulator"
assert packet.get_frequency() == MsgFrequency.MEDIUM_FREQUENCY_MESSAGE, "Incorrect frequency for ConfirmEnableSimulator"
assert packet.get_message_number() == 8, "Incorrect message number for ConfirmEnableSimulator"
def test_get_packet_pair(self):
@@ -42,6 +44,8 @@ class TestTemplates(unittest.TestCase):
def setUp(self):
self.template_file = msg_tmpl
self.parser = MessageTemplateParser(self.template_file)
self.msg_dict = TemplateDictionary(self.parser.get_template_list())
def test_parser(self):
parser = MessageTemplateParser(self.template_file)
@@ -55,12 +59,11 @@ class TestTemplates(unittest.TestCase):
assert True
def test_parser_version(self):
parser = MessageTemplateParser(self.template_file)
version = parser.get_version()
version = self.parser.get_version()
assert version == 2.0, "Version not correct, expected 2.0, got " + str(version)
def test_template(self):
template = MessageTemplate(('CompletePingCheck', 'High', '2', 'NotTrusted', 'Unencoded'))
template = self.msg_dict['CompletePingCheck']
name = template.get_name()
freq = template.get_frequency()
num = template.get_message_number()
@@ -69,61 +72,57 @@ class TestTemplates(unittest.TestCase):
enc = template.get_encoding()
dep = template.get_deprecation()
assert name == 'CompletePingCheck', "Expected: CompletePingCheck Returned: " + name
assert freq == 'High', "Expected: High Returned: " + freq
assert freq == MsgFrequency.HIGH_FREQUENCY_MESSAGE, "Expected: High Returned: " + freq
assert num == 2, "Expected: 2 Returned: " + str(num)
assert hx == '\x02', "Expected: '\x02' Returned: " + repr(hx)
assert trust == 'NotTrusted', "Expected: NotTrusted Returned: " + trust
assert enc == 'Unencoded', "Expected: Unencoded Returned: " + enc
assert dep == '', "Expected: Returned: " + dep
assert trust == MsgTrust.LL_NOTRUST, "Expected: NotTrusted Returned: " + trust
assert enc == MsgEncoding.LL_UNENCODED, "Expected: Unencoded Returned: " + enc
assert dep == MsgDeprecation.LL_NOTDEPRECATED, "Expected: Returned: " + dep
def test_template_low_deprecated(self):
template = MessageTemplate(('CompletePingCheck', 'Low', '2', 'NotTrusted', 'Unencoded', 'Deprecated'))
def test_template_low(self):
template = self.msg_dict['AddCircuitCode']
hx = template.get_message_hex_num()
dep = template.get_deprecation()
assert hx == '\xff\xff\x00\x02', "Expected: '\xff\xff\x00\x02' Returned: " + repr(hx)
assert dep == 'Deprecated', "Expected: Deprecated Returned: " + dep
def test_deprecated(self):
template = self.msg_dict['ObjectPosition']
dep = template.get_deprecation()
assert dep == MsgDeprecation.LL_DEPRECATED, "Expected: Deprecated Returned: " + dep
def test_template_medium(self):
template = MessageTemplate(('CompletePingCheck', 'Medium', '2', 'NotTrusted', 'Unencoded', 'Deprecated'))
template = self.msg_dict['RequestMultipleObjects']
hx = template.get_message_hex_num()
dep = template.get_deprecation()
assert hx == '\xff\x02', "Expected: " + r'\xff\x02' + " Returned: " + hx
assert hx == '\xff\x03', "Expected: " + r'\xff\x03' + " Returned: " + hx
def test_template_fixed(self):
template = MessageTemplate(('CompletePingCheck', 'Fixed', '0xFFFFFFFB', 'NotTrusted', 'Unencoded'))
template = self.msg_dict['PacketAck']
num = template.get_message_number()
hx = template.get_message_hex_num()
assert num == 251, "Expected: 251 Returned: " + str(num)
assert hx == '\xff\xff\xff\xfb', "Expected: " + r'\xff\xff\xff\xfb' + " Returned: " + repr(hx)
def test_block(self):
block = MessageTemplateBlock(('PingID', 'Single'))
name = block.get_name()
block = self.msg_dict['OpenCircuit'].get_block('CircuitInfo')
tp = block.get_block_type()
num = block.get_block_number()
assert name == 'PingID', "Expected: PingID Returned" + name
assert tp == 'Single', "Expected: Single Returned: " + tp
assert tp == MsgBlockType.MBT_SINGLE, "Expected: Single Returned: " + tp
assert num == 0, "Expected: 0 Returned: " + str(num)
def test_block_multiple(self):
block = MessageTemplateBlock(('CircuitCode', 'Multiple', '2'))
name = block.get_name()
block = self.msg_dict['NeighborList'].get_block('NeighborBlock')
tp = block.get_block_type()
num = block.get_block_number()
assert name == 'CircuitCode', "Expected: CircuitCode Returned" + name
assert tp == 'Multiple', "Expected: Multiple Returned: " + tp
assert num == 2, "Expected: 2 Returned: " + str(num)
assert tp == MsgBlockType.MBT_MULTIPLE, "Expected: Multiple Returned: " + tp
assert num == 4, "Expected: 4 Returned: " + str(num)
def test_variable(self):
variable = MessageTemplateVariable("PingID", "U8")
name = variable.get_name()
variable = self.msg_dict['StartPingCheck'].get_block('PingID').get_variable('PingID')
tp = variable.get_type()
assert name == 'PingID', "Expected: PingID Returned: " + name
assert tp == 'U8', "Expected: U8 Returned: " + tp
assert tp == MsgType.MVT_U8, "Expected: U8 Returned: " + str(tp)
def test_add_get_block(self):
template = MessageTemplate(('CompletePingCheck', 'High', '2', 'NotTrusted', 'Unencoded'))
block = MessageTemplateBlock(('CircuitCode', 'Multiple', '2'))
template = MessageTemplate('CompletePingCheck')
block = MessageTemplateBlock('CircuitCode')
template.add_block(block)
blocks = template.get_blocks()
count = len(blocks)
@@ -131,38 +130,13 @@ class TestTemplates(unittest.TestCase):
assert template.get_block('CircuitCode') != None, "Get block failed"
def test_add_variable(self):
block = MessageTemplateBlock(('CircuitCode', 'Multiple', '2'))
variable = MessageTemplateVariable("PingID", "U8")
block = MessageTemplateBlock('CircuitCode')
variable = MessageTemplateVariable("PingID", MsgType.MVT_U8, 1)
block.add_variable(variable)
var_list = block.get_variables()
assert var_list[0].get_name() == 'PingID', "Add variable failed"
assert block.get_variable('PingID') != None, "Get variable failed"
def test_StartPingCheck(self):
parser = MessageTemplateParser(self.template_file)
template_list = parser.get_template_list()
my_template = template_list[4]
name = my_template.get_name()
frequency = my_template.get_frequency()
num = my_template.get_message_number()
hx = my_template.get_message_hex_num()
trust = my_template.get_trust()
code = my_template.get_encoding()
deprecate = my_template.get_deprecation()
block_count = len(my_template.get_blocks())
my_block = my_template.get_block('PingID')
p_id = my_block.get_variable('PingID')
assert name == 'StartPingCheck', "Expected: StartPingCheck Returned: " + name
assert frequency == 'High', "Expected: High Returned: " + frequency
assert num == 1, "Expected: 1 Returned: " + str(num)
assert hx == '\x01', "Expected: " + r'\x01' + " Returned: " + repr(hx)
assert trust == 'NotTrusted', "Expected: NotTrusted Returned: " + trust
assert code == 'Unencoded', "Expected: Unencoded Returned: " + code
assert deprecate == '', "Expected: Returned: " + str(deprecate)
assert block_count == 1, "Expected: 1 Returned: " + str(block_count)
assert my_block != None, "Getting block PingID failed"
assert p_id != None, "Getting variable PingID failed"
def test_suite():
from unittest import TestSuite, makeSuite
suite = TestSuite()