first attempt at packets using the message template. This includes a header dictionary (frequency, num to name), reverse header dictionary (name to packet header frequency,num), creating a packet list that has all packets that are available, with their body blocks, as well as what data are in the blocks

This commit is contained in:
locklainn.linden
2008-07-15 20:12:19 +00:00
committed by Salad Dais
parent 23ac671be7
commit 87f96c9313
4 changed files with 405 additions and 0 deletions

View File

@@ -0,0 +1,82 @@
import re
import packet
import pprint
def parse_packets():
dic = {}
count = 0
lines = open("../../linden/scripts/messages/message_template.msg", ).xreadlines()
#results = re.match("^\t([^\t{}]+.+)",line) #gets packet headers
#results = re.match("^\t\t([^{}]+.+)",line) #gets packet blocks
#results = re.match("^\t\t([{}]+.+)",line) #gets block data
current_packet = None
current_block = None
current_var = None
#we have to go through all the packets and parse them
while(True):
try:
line = lines.next()
except StopIteration:
break
#get packet header, starting a new packet
packet_header = re.match("^\t([^\t{}]+.+)",line) #gets packet headers
if packet_header != None:
parts = packet_header.group(1)
parts = parts.split()
current_packet = packet.Packet(parts)
dic[current_packet.name] = current_packet
block_header = re.match("^\t\t([^{}]+.+)",line) #gets packet block header
if block_header != None:
parts = block_header.group(1)
parts = parts.split()
current_block = packet.PacketBlock(parts)
current_packet.addBlock(current_block)
block_data = re.match("^\t\t([{}]+.+)",line) #gets block data
if block_data != None:
parts = block_data.group(1)
parts = parts.split()
parts.remove('{')
parts.remove('}')
current_var = packet.PacketBlockVariable(parts[0], parts[1])
current_block.addVar(current_var)
return dic
def print_packet_list(packet_list):
for packet in packet_list:
print '======================================'
print packet
for block in packet_list[packet].blocks:
print '\t' + block.name
for var in block.vars:
print '\t\t' + var.name + ' ' + var.lltype
def get_all_types(packet_list):
type_set = set([])
for packet in packet_list:
for block in packet_list[packet].blocks:
for var in block.vars:
type_set.add(var.lltype)
type_list = list(type_set)
type_list.sort()
return type_list
def main():
p_list = parse_packets()
#print_packet_list(p_list)
p_typelist = get_all_types(p_list)
pprint.pprint(p_typelist)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,56 @@
#!/usr/bin/python
"""
@file makepacketdict.py
@author Lawson English
@date 2008-06-13
@brief Iniitializes path directories
$LicenseInfo:firstyear=2008&license=apachev2$
Copyright 2008, Linden Research, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
$/LicenseInfo$
"""
import re
def makereversepacketdict():
rev_dict = {}
for line in open("../../linden/scripts/messages/message_template.msg", ).xreadlines():
results = re.match("^\t([^\t{}]+.+)",line)
if results:
aline = results.group(1)
aline = aline.split()
if aline[1] == "Fixed":
rev_dict[aline[0]] = (aline[1],int("0x"+aline[2][8:],16))
else:
rev_dict[aline[0]] = (aline[1],int(aline[2]))
return rev_dict
def makepacketdict():
dict = {}
for line in open("../../linden/scripts/messages/message_template.msg", ).xreadlines():
results = re.match("^\t([^\t{}]+.+)",line)
if results:
aline = results.group(1)
aline = aline.split()
if aline[1] == "Fixed":
dict[(aline[1],int("0x"+aline[2][8:],16))] = (aline[0],aline[3], aline[4])
else:
dict[(aline[1],int(aline[2]))] = (aline[0],aline[3], aline[4])
return dict

161
pyogp/lib/base/packet.py Normal file
View File

@@ -0,0 +1,161 @@
#!/usr/bin/python
"""
@file packet.py
@author Linden Lab
@date 2008-06-13
@brief Iniitializes path directories
$LicenseInfo:firstyear=2008&license=apachev2$
Copyright 2008, Linden Research, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
$/LicenseInfo$
"""
from makepacketdict import makepacketdict, makereversepacketdict
import struct
import pprint
mypacketdictionary = makepacketdict()
myreversedictionary = makereversepacketdict()
""" can construct and deconstruct packet headers. Has nothing to
do with the packet payload, yet. """
LL_ZERO_CODE_FLAG = '\x80'
LL_RELIABLE_FLAG = '\x40'
LL_RESENT_FLAG = '\x20'
LL_ACK_FLAG = '\x10'
LL_NONE = '\x00'
FIXED_FREQUENCY_MESSAGE = '\xFF\xFF\xFF'
LOW_FREQUENCY_MESSAGE = '\xFF\xFF'
MEDIUM_FREQUENCY_MESSAGE = '\xFF'
HIGH_FREQUENCY_MESSAGE = ''
class PacketBlockVariable():
def __init__(self, name, tp):
self.name = name
self.lltype = tp
class PacketBlock():
def __init__(self, header):
self.header = header
self.name = header[0]
self.vars = []
def addVar(self, var):
self.vars.append(var)
def getVar(self, index):
return self.vars[index]
def getVarByName(self, name):
for var in self.vars:
if var.name == name:
return var
return None
class Packet():
def __init__(self, header):
self.header = header
self.name = header[0]
self.blocks = []
def addBlock(self, block):
self.blocks.append(block)
def getBlockByName(self, name):
for block in self.blocks:
if block.name == name:
return block
return None
def decodeHeaderPair(frequency, num):
return mypacketdictionary[(frequency, num)]
def decodeFrequency(header):
#if it is not a high
if header[0] == '\xFF':
#if it is not a medium frequency message
if header[1] == '\xFF':
#if it is a Fixed frequency message
if header[2] == '\xFF':
return 'Fixed'
#then it is low
else:
return 'Low'
#then it is medium
else:
return 'Medium'
#then it is high
else:
return 'High'
return None
def decodeNum(header):
frequency = decodeFrequency(header)
if frequency == 'Low':
return struct.unpack('B', header[2:4])[0] #int("0x"+ByteToHex(header[2:4]).replace(' ', ''),16)
elif frequency == 'Medium':
return struct.unpack('B', header[1:2])[0] #int("0x"+ByteToHex(header[1:2]).replace(' ', ''),16)
elif frequency == 'High':
return struct.unpack('B', header[0])[0] #int("0x"+ByteToHex(header[0]), 16)
elif frequency == 'Fixed':
return struct.unpack('B', header[0:4])[0] #int("0x"+ByteToHex(header[0:4]).replace(' ', ''), 16)
else:
return None
def decodeHeader(header):
frequency = decodeFrequency(header)
num = decodeNum(header)
return decodeHeaderPair(frequency, num)
def encodePacketID(frequency, num):
if frequency == 'Low':
frequencyData = LOW_FREQUENCY_MESSAGE
packedNum = struct.pack('>H', num)
elif frequency == 'Medium':
frequencyData = MEDIUM_FREQUENCY_MESSAGE
packedNum = struct.pack('>B', num)
elif frequency == 'High':
frequencyData = HIGH_FREQUENCY_MESSAGE
packedNum = struct.pack('>B', num)
elif frequency == 'Fixed':
frequencyData = FIXED_FREQUENCY_MESSAGE
packedNum = struct.pack('>B', num)
return frequencyData + packedNum
def encodeHeaderName(ack, sequenceNumber, packetName):
header_tuple = myreversedictionary[packetName]
frequency = header_tuple[0]
num = header_tuple[1]
return encodeHeader(ack, sequenceNumber, frequency, num)
def encodeHeader(ack, sequenceNumber, frequency, num):
packetID = encodePacketID(frequency, num)
return ack + struct.pack('>LB', sequenceNumber, 0) + packetID

View File

@@ -0,0 +1,106 @@
import unittest
import packet
class PacketTest(unittest.TestCase):
#testing each component
def test_freqLow(self):
assert packet.decodeFrequency('\xFF\xFF\x01') == 'Low', '0xFFFF01 is supposed to be "Low"'
def test_freqMedium(self):
assert packet.decodeFrequency('\xFF\x01') == 'Medium', '0xFF01 is supposed to be "Medium"'
def test_freqHigh(self):
assert packet.decodeFrequency('\x01') == 'High', '0x01 is supposed to be "High"'
def test_numLow(self):
num = packet.decodeNum('\xFF\xFF\x01')
assert num == 1, 'Outcome: ' + str(num) + ' Expected: 1'
def test_numMedium(self):
num = packet.decodeNum('\xFF\x01')
assert num == 1, 'Outcome: ' + str(num) + ' Expected: 1'
def test_numHigh(self):
num = packet.decodeNum('\x01')
assert num == 1, 'Outcome: ' + str(num) + ' Expected: 1'
#pass cases
def test_DecodeLow(self):
assert packet.decodeHeader('\xFF\xFF\x01')[0] == 'TestMessage', 'wrong packet for 0xFFFF01: expected TestMessage'
def test_DecodeMedium(self):
assert packet.decodeHeader('\xFF\x01')[0] == 'ObjectAdd', 'wrong packet for 0xFF01: expected ObjectAdd'
def test_DecodeHigh(self):
assert packet.decodeHeader('\x01')[0] == 'StartPingCheck', 'wrong packet for 0x01: expected StartPingCheck'
def test_DecodeLow(self):
assert packet.decodeHeaderPair('Low', 1)[0] == 'TestMessage', 'wrong packet for ("Low", 1): expected TestMessage'
def test_DecodePairMedium(self):
assert packet.decodeHeaderPair('Medium', 1)[0] == 'ObjectAdd', 'wrong packet for ("Medium", 1): expected ObjectAdd'
def test_DecodePairHigh(self):
assert packet.decodeHeaderPair('High', 1)[0] == 'StartPingCheck', 'wrong packet for ("High", 1): expected StartPingCheck'
#fail cases
def test_DecodeLowFail(self):
assert packet.decodeHeader('\xFF\xFF\x01')[0] != 'TestMessage', 'wrong packet for 0xFFFF01: expected TestMessage'
def test_DecodeMediumFail(self):
assert packet.decodeHeader('\x01')[0] != 'ObjectAdd', 'wrong packet for 0x01: expected ObjectAdd'
def test_DecodeHighFail(self):
assert packet.decodeHeader('\xFF\x01')[0] != 'StartPingCheck', 'wrong packet for 0xFF01: expected StartPingCheck'
def test_DecodeLowFail(self):
assert packet.decodeHeaderPair('Medium', 1)[0] != 'TestMessage', 'wrong packet for ("Medium", 1): expected TestMessage'
def test_DecodePairMediumFail(self):
assert packet.decodeHeaderPair('High', 1)[0] != 'ObjectAdd', 'wrong packet for ("High", 1): expected ObjectAdd'
def test_DecodePairHighFail(self):
assert packet.decodeHeaderPair('Low', 1)[0] != 'StartPingCheck', 'wrong packet for ("Low", 1): expected StartPingCheck'
#test encode packetID
def test_encodePackIDLow(self):
pID = packet.encodePacketID('Low', 1)
assert pID == '\xFF\xFF\x00\x01', 'Outcome: ' + repr(pID) + ' Expected: ' + r'\xFF\xFF\x00\x01'
def test_encodePackIDMedium(self):
pID = packet.encodePacketID('Medium', 1)
assert pID == '\xFF\x01', 'Outcome: ' + repr(pID) + ' Expected: ' + r'\xFF\x01'
def test_encodePackIDHigh(self):
pID = packet.encodePacketID('High', 1)
assert pID == '\x01', 'Outcome: ' + repr(pID) + ' Expected: ' + r'x01'
def test_encodeHeaderLow(self):
header = packet.encodeHeader(packet.LL_NONE, 1, 'Low', 1)
assert header == '\x00\x00\x00\x00\x01\x00\xff\xff\x00\x01', 'Outcome: ' + repr(header) + ' Expected: ' + r'\x00\x00\x00\x00\x01\x00\xff\xff\x00\x01'
def test_encodeHeaderMedium(self):
header = packet.encodeHeader(packet.LL_NONE, 1, 'Medium', 1)
assert header == '\x00\x00\x00\x00\x01\x00\xff\x01', 'Outcome: ' + repr(header) + ' Expected: ' + r'\x00\x00\x00\x00\x01\x00\xff\x01'
def test_encodeHeaderHigh(self):
header = packet.encodeHeader(packet.LL_NONE, 1, 'High', 1)
assert header == '\x00\x00\x00\x00\x01\x00\x01', 'Outcome: ' + repr(header) + ' Expected: ' + r'\x00\x00\x00\x00\x01\x00\x01'
def test_encodeHeaderNameLow(self):
header = packet.encodeHeaderName(packet.LL_NONE, 1, 'TestMessage')
assert header == '\x00\x00\x00\x00\x01\x00\xff\xff\x00\x01', 'Outcome: ' + repr(header) + ' Expected: ' + r'\x00\x00\x00\x00\x01\x00\xff\xff\x00\x01'
def test_encodeHeaderNameMedium(self):
header = packet.encodeHeaderName(packet.LL_NONE, 1, 'ObjectAdd')
assert header == '\x00\x00\x00\x00\x01\x00\xff\x01', 'Outcome: ' + repr(header) + ' Expected: ' + r'\x00\x00\x00\x00\x01\x00\xff\x01'
def test_encodeHeaderNameHigh(self):
header = packet.encodeHeaderName(packet.LL_NONE, 1, 'StartPingCheck')
assert header == '\x00\x00\x00\x00\x01\x00\x01', 'Outcome: ' + repr(header) + ' Expected: ' + r'\x00\x00\x00\x00\x01\x00\x01'
if __name__ == "__main__":
unittest.main()