Files
Hippolyzer/pyogp/lib/base/tests/test_message_manager.py

135 lines
35 KiB
Python
Raw Normal View History

"""
Contributors can be viewed at:
http://svn.secondlife.com/svn/linden/projects/2008/pyogp/lib/base/trunk/CONTRIBUTORS.txt
$LicenseInfo:firstyear=2008&license=apachev2$
Copyright 2009, Linden Research, Inc.
Licensed under the Apache License, Version 2.0.
You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
or in
http://svn.secondlife.com/svn/linden/projects/2008/pyogp/lib/base/LICENSE.txt
$/LicenseInfo$
"""
# standard python libs
import unittest
import os
# pyogp
from pyogp.lib.base.message_manager import MessageManager
from pyogp.lib.base.caps import Capability
from pyogp.lib.base.message.message import Message, Block
from pyogp.lib.base.tests.mockup_net import MockupUDPServer, MockupUDPClient
from pyogp.lib.base.message.circuit import Host
2009-10-06 18:33:56 +00:00
from pyogp.lib.base.message.udpdispatcher import UDPDispatcher
# pyogp tests
import pyogp.lib.base.tests.config
from eventlet import api
class TestMessageManager(unittest.TestCase):
def setUp(self):
    """Create the host and MessageManager fixtures used by every test.

    The host wraps a MockupUDPServer on port 80, and the manager is given
    a single 'EventQueueGet' capability pointing at http://127.0.0.1.
    """
    mock_address = (MockupUDPServer(), 80)
    self.host = Host(mock_address)

    event_queue_cap = Capability('EventQueueGet', 'http://127.0.0.1')
    self.message_manager = MessageManager(
        self.host,
        capabilities={'EventQueueGet': event_queue_cap})
def tearDown(self):
    """Nothing to clean up after a test; intentionally a no-op."""
    return None
def test_start_stop_monitors(self):
    """Check the running flags after start_monitors() and stop_monitors()."""
    manager = self.message_manager

    # Start: both the manager and its event queue must report running.
    manager.start_monitors()
    # NOTE(review): presumably a zero-length sleep yields to the monitor
    # coroutines so they can start -- confirm against eventlet's api.sleep.
    api.sleep(0)
    self.assertTrue(manager._is_running)
    self.assertTrue(manager.event_queue._running)

    # Stop: after a 2 second wait both must report stopped.
    manager.stop_monitors()
    api.sleep(2)
    self.assertFalse(manager._is_running)
    event_queue = manager.event_queue
    self.assertTrue(event_queue.stopped)
    self.assertFalse(event_queue._running)
def test_enqueue_message(self):
    """Check what sits at the head of outgoing_queue after each enqueue.

    A reliable message is enqueued first; then an unreliable one is
    enqueued with now=True and is expected to be found at index 0.
    Each queue entry is inspected as (message, reliable-flag).
    """
    def build_message(name):
        # Both messages share the same two-block layout; only the name differs.
        return Message(name,
                       Block('TestBlock1',
                             Test1=0),
                       Block('NeighborBlock',
                             Test0=0,
                             Test1=1,
                             Test2=2))

    manager = self.message_manager

    manager.enqueue_message(build_message('TestMessage1'), reliable=True)
    head = manager.outgoing_queue[0]
    self.assertEqual(head[0].name, 'TestMessage1')
    self.assertTrue(head[1])

    manager.enqueue_message(build_message('TestMessage2'),
                            reliable=False,
                            now=True)
    head = manager.outgoing_queue[0]
    self.assertEqual(head[0].name, 'TestMessage2')
    self.assertFalse(head[1])
def test_send_udp_message(self):
    """Send a PacketAck through a mock-backed dispatcher and check the bytes.

    The manager's udp_dispatcher is replaced with a UDPDispatcher built on
    a MockupUDPClient, and the buffer returned by send_udp_message() is
    compared with the expected serialized packet.

    Uses self.assertEqual instead of a bare ``assert`` so the check is not
    stripped when Python runs with -O, and builds the expected buffer once
    rather than repeating the literal in the failure message.
    """
    manager = self.message_manager
    manager.udp_dispatcher = UDPDispatcher(MockupUDPClient(),
                                           manager.settings,
                                           manager.message_handler)

    message = Message('PacketAck',
                      Block('Packets', ID=0x00000003))

    # Kept as the same concatenation of pieces the original assertion used.
    expected = ('\x00' + '\x00\x00\x00\x01' + '\x00' + '\xff\xff\xff\xfb' +
                '\x01' + '\x03\x00\x00\x00')

    buf = manager.send_udp_message(message)

    self.assertEqual(
        buf, expected,
        'Received: ' + repr(buf) + ' ' + 'Expected: ' + repr(expected))
def test_custom_message_template_count(self):
    """A custom message template passed to MessageManager is the one used.

    Opens mock_message_template.msg from this test directory, passes the
    file object as ``message_template`` and checks the number of templates
    the deserializer ends up with. The file is closed in a ``finally``
    clause so the handle is not leaked, whether the assertion passes or not.
    """
    template_path = os.path.join(os.path.dirname(__file__),
                                 'mock_message_template.msg')
    template_file = open(template_path)
    try:
        message_manager = MessageManager(self.host,
                                         message_template=template_file)
        template_list = (message_manager.udp_dispatcher.udp_deserializer
                         .template_dict.template_list)
        # the custom message_template.msg should pass all the way through,
        # and should have 210 messages
        self.assertEqual(len(template_list), 210)
    finally:
        template_file.close()
def test_embedded_message_template_count(self):
    """The manager built in setUp falls back to the embedded template.

    setUp constructs self.message_manager without a ``message_template``
    argument, so that instance is the one inspected here. The previous
    version also built a second manager from mock_message_template.msg
    (copied from the custom-template test) that was never used and left
    its file handle open; that construction has been removed.
    """
    template_list = (self.message_manager.udp_dispatcher.udp_deserializer
                     .template_dict.template_list)
    # the message_template.msg should default to the embedded instance
    # and should have at least 475 messages
    self.assertTrue(len(template_list) >= 475)
def test_custom_message_xml(self):
    """A custom message.xml passed to MessageManager is the one used.

    Opens mock_message.xml from this test directory, passes the file
    object as ``message_xml`` and compares the resulting
    ``message_xml.__dict__`` with the expected parsed contents. The file
    is closed in a ``finally`` clause so the handle is not leaked.
    """
    # Expected attribute dict, reproduced verbatim (including raw_llsd).
    expected = {'messageBans': {'untrusted': {}, 'trusted': {}}, 'messages': {'PacketAck': {'flavor': 'template', 'trusted-sender': False}}, 'serverDefaults': {'simulator': 'template'}, 'parsed_llsd': {'serverDefaults': {'simulator': 'template'}, 'messages': {'PacketAck': {'flavor': 'template', 'trusted-sender': False}}, 'capBans': {'MapLayer': False, 'UploadBakedTexture': True}, 'maxQueuedEvents': 100, 'messageBans': {'untrusted': {}, 'trusted': {}}}, 'maxQueuedEvents': 100, 'raw_llsd': '<?xml version="1.0"?>\n<llsd>\n <map>\n\t\t <key>serverDefaults</key>\n <!--\n\t\t\t\t a map of server names to default message transport\n\t\t\t\t-->\n\t\t <map>\n\t\t\t\t<key>simulator</key>\n\t\t\t\t<string>template</string>\n\n\n\t\t </map>\n\t\t <key>messages</key>\n <!--\n\t\t\t\t a map of individual message names that override defaults\n\t\t\t\t-->\n\t\t <map>\n\t\t\t\t<!--\n\t\t\t\t\tCircuit related messages\n\t\t\t\t-->\n\t\t\t\t<key>PacketAck</key>\n\t\t\t\t<map>\n\t\t\t\t\t<key>flavor</key>\n\t\t\t\t\t<string>template</string>\n\t\t\t\t\t<key>trusted-sender</key>\n\t\t\t\t\t<boolean>false</boolean>\n\t\t\t\t</map>\n\n\t\t </map>\n \t \t<key>capBans</key>\n \t<map>\n\t\t\t<key>MapLayer</key>\n\t\t\t<boolean>false</boolean>\n\n\t\t\t<key>UploadBakedTexture</key>\n\t\t\t<boolean>true</boolean>\n\t\t</map>\n\n\t\t<key>messageBans</key>\n\t\t<map>\n\t\t\t<key>trusted</key>\n\t\t\t<map>\n\t\t\t</map>\n\t\t\n\t\t\t<key>untrusted</key>\n\t\t\t<map>\n\t\t\t</map>\n\t\t</map>\n\n\t\t<key>maxQueuedEvents</key>\n\t\t<integer>100</integer>\n </map>\n</llsd>\n', 'capBans': {'MapLayer': False, 'UploadBakedTexture': True}}

    xml_path = os.path.join(os.path.dirname(__file__), 'mock_message.xml')
    xml_file = open(xml_path)
    try:
        message_manager = MessageManager(self.host,
                                         message_xml=xml_file)
        # the custom message.xml should be used
        self.assertEqual(message_manager.message_xml.__dict__, expected)
    finally:
        xml_file.close()
def test_embedded_message_xml(self):
self.assertEquals(self.message_manager.message_xml.__dict__, {'messageBans': {'untrusted': {}, 'trusted': {}}, 'messages': {'ParcelProperties': {'flavor': 'llsd', 'trusted-sender': True}, 'ImprovedTerseObjectUpdate': {'flavor': 'template', 'trusted-sender': False}, 'EstablishAgentCommunication': {'flavor': 'llsd', 'trusted-sender': True}, 'FetchInventoryDescendents': {'flavor': 'template', 'trusted-sender': False}, 'CrossedRegion': {'flavor': 'llsd', 'trusted-sender': True}, 'ForceCloseChatterBoxSession': {'flavor': 'llsd', 'trusted-sender': True}, 'EconomyDataRequest': {'flavor': 'template', 'trusted-sender': False}, 'CoarseLocationUpdate': {'flavor': 'template', 'trusted-sender': True, 'only-send-latest': True}, 'CompletePingCheck': {'flavor': 'template', 'trusted-sender': False}, 'avatarpicksrequest': {'service_name': 'avatar-pick', 'builder': 'template', 'trusted-sender': False}, 'ParcelVoiceInfo': {'flavor': 'llsd', 'trusted-sender': True}, 'ObjectUpdateCached': {'flavor': 'template', 'trusted-sender': False}, 'ObjectUpdateCompressed': {'flavor': 'template', 'trusted-sender': False}, 'RoutedMoneyBalanceReply': {'flavor': 'llsd', 'trusted-sender': False}, 'EdgeDataPacket': {'flavor': 'template', 'trusted-sender': True}, 'LandStatReply': {'flavor': 'llsd', 'trusted-sender': False}, 'AvatarAppearance': {'flavor': 'template', 'trusted-sender': False}, 'EnableSimulator': {'flavor': 'llsd', 'trusted-sender': True}, 'DirLandReply': {'flavor': 'llsd', 'trusted-sender': True}, 'UseCircuitCode': {'flavor': 'template', 'trusted-sender': False}, 'AvatarAnimation': {'flavor': 'template', 'trusted-sender': False}, 'RequiredVoiceVersion': {'flavor': 'llsd', 'trusted-sender': True}, 'ImagePacket': {'flavor': 'template', 'trusted-sender': False}, 'StartGroupProposal': {'flavor': 'llsd', 'trusted-sender': False}, 'StartPingCheck': {'flavor': 'template', 'trusted-sender': False}, 'AgentUpdate': {'flavor': 'template', 'trusted-sender': False}, 'RequestTrustedCircuit': {'flavor': 
'template', 'trusted-sender': True}, 'ChatterBoxSessionEventReply': {'flavor': 'llsd', 'trusted-sender': True}, 'GroupProposalBallot': {'flavor': 'llsd', 'trusted-sender': False}, 'RpcScriptRequestInboundForward': {'flavor': 'llsd', 'trusted-sender': False}, 'ParcelObjectOwnersReply': {'flavor': 'llsd', 'trusted-sender': True}, 'OpenCircuit': {'flavor': 'template', 'trusted-sender': False}, 'ParcelVoiceInfoRequest': {'flavor': 'llsd', 'trusted-sender': False}, 'CloseCircuit': {'flavor': 'template', 'trusted-sender': False}, 'PacketAck': {'flavor': 'template', 'trusted-sender': False}, 'ChatterBoxInvitation': {'flavor': 'llsd', 'trusted-sender': True}, 'CompleteAgentMovement': {'flavor': 'template', 'trusted-sender': False}, 'ScriptRunningReply': {'flavor': 'llsd', 'trusted-sender': False}, 'avatarclassifiedsrequest': {'service_name': 'avatar-classifieds', 'builder': 'template', 'trusted-sender': False}, 'LayerData': {'flavor': 'template', 'trusted-sender': False}, 'CreateTrustedCircuit': {'flavor': 'template', 'trusted-sender': False}, 'PlacesReply': {'flavor': 'llsd', 'trusted-sender': True}, 'SetCPURatio': {'flavor': 'template', 'trusted-sender': False}, 'RegionHandshakeReply': {'flavor': 'template', 'trusted-sender': False}, 'pickinforequest': {'service_name': 'pick-info', 'builder': 'template', 'trusted-sender': False}, 'ObjectUpdate': {'flavor': 'template', 'trusted-sender': False}, 'ViewerEffect': {'flavor': 'template', 'trusted-sender': False}, 'ChatterBoxSessionUpdate': {'flavor': 'llsd', 'trusted-sender': True}, 'SimulatorLoad': {'flavor': 'template', 'trusted-sender': True}, 'TeleportFailed': {'flavor': 'llsd', 'trusted-sender': True}, 'avatarnotesrequest': {'service_name': 'avatar-notes', 'builder': 'template', 'trusted-sender': False}, 'AgentGroupDataUpdate': {'flavor': 'llsd', 'trusted-sender': True}, 'GodKickUser': {'flavor': 'llsd', 'trusted-sender': False}, 'AddCircuitCode': {'flavor': 'template', 'trusted-sender': True}, 
'ProvisionVoiceAccountRequest': {'flavor': 'llsd', 'trusted-sender': False}, 'AgentDropGroup': {'flavor': 'llsd', 't
def test_suite():
    """Return a TestSuite wrapping all TestMessageManager tests.

    Uses TestLoader.loadTestsFromTestCase rather than unittest.makeSuite:
    makeSuite is a deprecated convenience wrapper around the same loader
    call and is no longer available in recent Python versions. The
    returned object keeps the original shape: an outer TestSuite with the
    loaded suite added to it.
    """
    from unittest import TestLoader, TestSuite
    suite = TestSuite()
    suite.addTest(TestLoader().loadTestsFromTestCase(TestMessageManager))
    return suite