# standard python libs
import unittest

# related
from indra.base import llsd
from eventlet import api, coros

# pyogp
from pyogp.lib.base.region import Region
from pyogp.lib.base.event_queue import EventQueueClient
from pyogp.lib.base.exc import *
from pyogp.lib.base.caps import Capability

# pyogp tests
import pyogp.lib.base.tests.config


class TestEventQueue(unittest.TestCase):
    """Exercise EventQueueClient: event decoding and the start/stop cycle."""

    # Expected value of ``str(type(packet))`` for every decoded packet.
    # NOTE(review): this is an empty string in the source, which
    # ``str(type(...))`` can never equal -- the real value (presumably a
    # "<class '...'>" repr) appears to have been lost. Restore it here; all
    # decode tests share this single constant.
    EXPECTED_PACKET_TYPE = ''

    def setUp(self):
        """Create a fresh client bound to a default Region for each test."""
        self.eq = EventQueueClient(region = Region())

    def tearDown(self):
        """Nothing to clean up."""
        pass

    def _assert_packet_type(self, packet):
        """Assert that a decoded packet has the expected type string."""
        self.assertEqual(str(type(packet)), self.EXPECTED_PACKET_TYPE)

    def test__decode_eq_result(self):
        """Decode an already-parsed event queue payload of four events."""
        data = {
            'events': [
                {'body': {'SimulatorInfo': [{'IP': '\xd8R R',
                                             'Handle': '\x00\x03\xe4\x00\x00\x03\xe9\x00',
                                             'Port': 13001}]},
                 'message': 'EnableSimulator'},
                {'body': {'SimulatorInfo': [{'IP': '\xd8R\x0f\x06',
                                             'Handle': '\x00\x03\xe5\x00\x00\x03\xe8\x00',
                                             'Port': 13000}]},
                 'message': 'EnableSimulator'},
                {'body': {'SimulatorInfo': [{'IP': '\xd8R R',
                                             'Handle': '\x00\x03\xe4\x00\x00\x03\xe9\x00',
                                             'Port': 13001}]},
                 'message': 'EnableSimulator'},
                {'body': {'SimulatorInfo': [{'IP': '\xd8R\x0f\x06',
                                             'Handle': '\x00\x03\xe5\x00\x00\x03\xe8\x00',
                                             'Port': 13000}]},
                 'message': 'EnableSimulator'},
            ],
            'id': -2054685694,
        }

        packets = self.eq._decode_eq_result(data)

        for packet in packets:
            self._assert_packet_type(packet)

    def test__decode_eq_result2(self):
        """Parse a serialized payload and check the decoded packet names."""
        # NOTE(review): this looks like LLSD XML whose markup is missing;
        # llsd.parse() presumably needs the original tagged document --
        # confirm and restore. Double quotes are required because the payload
        # itself contains an apostrophe.
        llsd_data = "eventsbodyAgentDataAgentIDa517168d-1af5-4854-ba6d-672c8a59e439GroupDataAcceptNotices1Contribution0GroupID4dd70b7f-8b3a-eef9-fc2f-909151d521f6GroupInsigniaIDGroupNameEnus' Construction CrewGroupPowersAAA5ABgBAAA=ListInProfile0AcceptNotices1Contribution0GroupID69fd708c-3f20-a01b-f9b5-b5c4b310e5caGroupInsigniaIDGroupNameEnusBot ArmyGroupPowersAAD5ABgBAAA=ListInProfile0messageAgentGroupDataUpdatebodyAgeVerificationBlockRegionDenyAgeUnverified0MediaDataMediaDescMediaHeight0MediaLoop1MediaTypenone/noneMediaWidth0ObscureMedia1ObscureMusic1ParcelDataAABBMax25625650AABBMin000Area65536AuctionIDAAAAAA==AuthBuyerIDBitmapategory255ClaimDate1088625472ClaimPrice0DescGroupIDGroupPrims0IsGroupOwned0LandingType1LocalID15MaxPrims3750MediaAutoScale0MediaIDMediaURLMusicURLNameOtherCleanTime0OtherCount4096OtherPrims0OwnerIDdd1e79b2-ddfe-4080-8206-242ab63f4a19OwnerPrims44ParcelFlagsegAAAQ==ParcelPrimBonus1PassHours1PassPrice10PublicCount0RegionDenyAnonymous0RegionDenyIdentified0RegionDenyTransacted0RegionPushOverride0RentPrice0RequestResult0SalePrice10000SelectedPrims0SelfCount0SequenceID0SimWideMaxPrims3750SimWideTotalPrims44SnapSelection0SnapshotIDStatus0TotalPrims44UserLocation000UserLookAt000messageParcelPropertiesbodySimulatorInfoHandleAAP8AAADxAA=IP2FISdw==Port13002messageEnableSimulatorid-182442759"

        data = llsd.parse(llsd_data)

        packets = self.eq._decode_eq_result(data)

        packet_names = []

        for packet in packets:
            self._assert_packet_type(packet)
            packet_names.append(packet.name)

        self.assertEqual(packet_names,
                         ['AgentGroupDataUpdate', 'ParcelProperties', 'EnableSimulator'])

    def test__decode_eq_results3(self):
        """Parse a chat-session payload and check every packet's type."""
        # NOTE(review): as in test__decode_eq_result2, this looks like LLSD
        # XML whose markup is missing -- confirm and restore.
        llsd_data = 'eventsbodysession_id6e20d408-2702-ea83-04b4-12cef089a327session_infomoderated_modevoice0session_namePyogptype0voice_enabled1success1temp_session_id6e20d408-2702-ea83-04b4-12cef089a327messageChatterBoxSessionStartReplybodyagent_updates43b8b2d7-99b0-4b60-a935-45896c74be62infocan_voice_chat1is_moderator0transitionENTER4ca88f33-f34a-4ae4-a8f1-7357f883789einfocan_voice_chat1is_moderator0transitionENTER77e48a07-06f1-4ae4-9d33-6eab4d445e2dinfocan_voice_chat1is_moderator0transitionENTER79e7c4ad-3361-4736-bced-1f72e6c3dbd4infocan_voice_chat1is_moderator0transitionENTER7a59bbd8-5175-4cff-8328-f92b3acde98ainfocan_voice_chat1is_moderator0transitionENTER94e1350b-64d6-444b-8d3c-d1da460b259cinfocan_voice_chat1is_moderator0transitionENTER97980bee-d865-4100-80e7-9763e53e8a6ainfocan_voice_chat1is_moderator0transitionENTERa1f21e11-7db3-4eb4-89f3-5a65eea34495infocan_voice_chat1is_moderator0transitionENTERa517168d-1af5-4854-ba6d-672c8a59e439infocan_voice_chat1is_moderator0transitionENTERb2161990-f5c4-4dcb-a73c-e7365a18adfdinfocan_voice_chat1is_moderator0transitionENTERc12cce9d-3308-4517-9a5e-1f3460187e56infocan_voice_chat1is_moderator0transitionENTEReeaedeb6-c702-472a-b725-e4492095c69ainfocan_voice_chat1is_moderator0transitionENTERsession_id6e20d408-2702-ea83-04b4-12cef089a327updates43b8b2d7-99b0-4b60-a935-45896c74be62ENTER4ca88f33-f34a-4ae4-a8f1-7357f883789eENTER77e48a07-06f1-4ae4-9d33-6eab4d445e2dENTER79e7c4ad-3361-4736-bced-1f72e6c3dbd4ENTER7a59bbd8-5175-4cff-8328-f92b3acde98aENTER94e1350b-64d6-444b-8d3c-d1da460b259cENTER97980bee-d865-4100-80e7-9763e53e8a6aENTERa1f21e11-7db3-4eb4-89f3-5a65eea34495ENTERa517168d-1af5-4854-ba6d-672c8a59e439ENTERb2161990-f5c4-4dcb-a73c-e7365a18adfdENTERc12cce9d-3308-4517-9a5e-1f3460187e56ENTEReeaedeb6-c702-472a-b725-e4492095c69aENTERmessageChatterBoxSessionAgentListUpdatesbodyagent_updatesa517168d-1af5-4854-ba6d-672c8a59e439infocan_voice_chat1is_moderator0session_id6e20d408-2702-ea83-04b4-12cef089a327updatesmessageChatterBoxSessionAgentListUpdatesbodySimulatorInfoHandleAAP8AAADxAA=IP2FIT1A==Port12035messageEnableSimulatorid17'

        data = llsd.parse(llsd_data)

        packets = self.eq._decode_eq_result(data)

        # The names are collected but not asserted on; reading ``packet.name``
        # still requires every decoded packet to expose that attribute.
        packet_names = []

        for packet in packets:
            self._assert_packet_type(packet)
            packet_names.append(packet.name)

    def test_start_exception(self):
        """start() with no capability set must not propagate an exception."""
        # self.eq.cap is None so it throws an exception, logging should print error
        self.assertEqual(None, self.eq.cap)

        self.eq.start()

        # Reaching this line means start() returned instead of raising.
        self.assertTrue(True)

    def test_processRegionEventQueue_exception(self):
        """A capability not named 'EventQueueGet' raises RegionCapNotAvailable."""
        self.eq.cap = Capability('foo', 'http://127.0.0.1')

        self.assertRaises(RegionCapNotAvailable, self.eq._processRegionEventQueue)

    def test_start_and_stop(self):
        """Spawn the client, flag it as stopped, and check it winds down."""
        self.eq.cap = Capability('EventQueueGet', 'http://127.0.0.1')

        self.assertFalse(self.eq.stopped)

        api.spawn(self.eq.start)

        api.sleep(1)

        #self.eq.stop()   #stop is broken atm
        # Set the flag directly in place of the stop() call disabled above.
        self.eq.stopped = True

        api.sleep(.1)

        self.assertTrue(self.eq.stopped)

        # Give the spawned coroutine time to observe the flag and exit.
        api.sleep(1)

        self.assertFalse(self.eq._running)


def test_suite():
    """Return a TestSuite containing every TestEventQueue test."""
    suite = unittest.TestSuite()
    suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TestEventQueue))
    return suite


"""
Contributors can be viewed at:
http://svn.secondlife.com/svn/linden/projects/2008/pyogp/CONTRIBUTORS.txt

$LicenseInfo:firstyear=2008&license=apachev2$

Copyright 2009, Linden Research, Inc.

Licensed under the Apache License, Version 2.0 (the "License").
You may obtain a copy of the License at:
    http://www.apache.org/licenses/LICENSE-2.0
or in
    http://svn.secondlife.com/svn/linden/projects/2008/pyogp/LICENSE.txt

$/LicenseInfo$
"""