2009-02-11 22:36:48 +00:00
# standard python libs
from logging import getLogger , CRITICAL , ERROR , WARNING , INFO , DEBUG
2009-03-26 20:10:13 +00:00
import sys , traceback
2009-02-11 22:36:48 +00:00
2009-03-03 01:40:52 +00:00
# related
from eventlet import api , util
# the following makes socket calls nonblocking. magic
util . wrap_socket_with_coroutine_socket ( )
2009-02-11 22:36:48 +00:00
2009-03-03 01:40:52 +00:00
# pyogp
2009-02-11 22:36:48 +00:00
from pyogp . lib . base . utilities . events import Event
2009-03-26 20:10:13 +00:00
from pyogp . lib . base . groups import *
2009-02-11 22:36:48 +00:00
2009-03-03 01:40:52 +00:00
# messaging
from pyogp . lib . base . message . packet import UDPPacket
from pyogp . lib . base . message . template_dict import TemplateDictionary
from pyogp . lib . base . message . template import MsgData , MsgBlockData , MsgVariableData
2009-02-11 22:36:48 +00:00
# initialize logging
# `log` is bound to the module logger's log() method, so call sites in this
# file pass the level constant first, e.g. log(INFO, "message").
logger = getLogger ( ' pyogp.lib.base.event_queue ' )
log = logger . log
2009-03-03 01:40:52 +00:00
class EventQueueClient(object):
    """ handles an event queue of either an agent domain or a simulator

    The client is handed a capability object, which it uses through its
    ``name`` attribute and its ``POST`` method.  ``start()`` picks the
    polling loop that matches the capability name and runs it in the
    calling coroutine until the ``stop`` flag becomes true.

    Initialize the event queue client with an event queue capability and
    start polling:

    >>> event_queue = EventQueueClient(cap)
    >>> event_queue.start()

    Sample implementations: region.py
    Tests: tests/test_event_queue.py

    NOTE(review): every string literal in this class (capability names,
    dictionary keys, log formats) is reproduced exactly as it was found,
    including leading and trailing spaces such as ' id '.  Confirm these
    against the wire format before relying on them.
    """

    def __init__(self, capability=None, settings=None, packet_handler=None, region=None, event_queue_handler=None):
        """ set up the event queue attributes

        capability          -- object exposing ``name`` and ``POST(data)``
        settings            -- settings object; a default ``Settings()`` is
                               created when omitted
        packet_handler      -- optional packet handler; created when omitted
                               and ``settings.HANDLE_PACKETS`` is true
        region              -- optional region object, only read for its
                               ``sim_ip``, ``sim_port`` and ``SimName``
                               attributes when logging
        event_queue_handler -- optional handler receiving decoded events;
                               created when omitted and
                               ``settings.HANDLE_EVENT_QUEUE_DATA`` is true
        """

        # allow the settings to be passed in
        # otherwise, grab the defaults
        if settings is not None:
            self.settings = settings
        else:
            from pyogp.lib.base.settings import Settings
            self.settings = Settings()

        # allow the packet_handler to be passed in
        # otherwise, grab the defaults
        # (None when neither supplied nor enabled, so the attribute always exists)
        self.packet_handler = None
        if packet_handler is not None:
            self.packet_handler = packet_handler
        elif self.settings.HANDLE_PACKETS:
            from pyogp.lib.base.message.packethandler import PacketHandler
            self.packet_handler = PacketHandler(settings=self.settings)

        # allow the event_queue_handler to be passed in
        # otherwise, grab the defaults
        # (None when neither supplied nor enabled, so the attribute always exists)
        self.event_queue_handler = None
        if event_queue_handler is not None:
            self.event_queue_handler = event_queue_handler
        elif self.settings.HANDLE_EVENT_QUEUE_DATA:
            self.event_queue_handler = EventQueueHandler(settings=self.settings)

        self.region = region
        self.cap = capability
        #self.type = eq_type # specify 'agentdomain' or 'region'

        self._running = False     # this class controls this value

        # client can pause the event queue by setting this flag to True.
        # NOTE(review): this instance attribute shadows the stop() method
        # defined below, so ``client.stop()`` is not callable on an instance
        # built through __init__.  Kept as is because the polling loops and
        # any existing callers rely on the boolean attribute.
        self.stop = False

        self.last_id = -1

        # if applicable, initialize data to post to the event queue
        self.data = {}

        # stores the result of the post to the event queue capability
        self.result = None

        # enables proper packet parsing in event queue responses
        self.template_dict = TemplateDictionary()
        self.current_template = None

    def _sim_address(self):
        """ return the simulator address used in log messages

        Missing region data is rendered through str(None) instead of
        raising, so logging never breaks the code path that wanted to log.
        """

        sim_ip = getattr(self.region, 'sim_ip', None)
        sim_port = getattr(self.region, 'sim_port', None)

        return str(sim_ip) + ' : ' + str(sim_port)

    def _host_string(self, template):
        """ return `template` filled with the simulator address when host logging is enabled """

        if self.settings.ENABLE_HOST_LOGGING:
            return template % (self._sim_address())

        return ' '

    def start(self):
        """ run the polling loop matching the capability name

        The loop runs in the calling coroutine, so this only returns once
        polling has ended.  Returns True after a loop has run, False when
        the capability name is not recognised or an exception was raised.
        """

        try:

            if self.cap.name == ' event_queue ':

                if self.settings.LOG_COROUTINE_SPAWNS:
                    log(INFO, " Spawning a coroutine for event queue in the agent domain context. ")
                self._processADEventQueue()
                return True

            elif self.cap.name == ' EventQueueGet ':

                if self.settings.LOG_COROUTINE_SPAWNS:
                    log(INFO, " Spawning a coroutine for event queue in the simulator context for %s . " % (self._sim_address()))
                self._processRegionEventQueue()
                return True

            else:

                # ToDo handle this as an exception
                log(WARNING, ' Unable to start event queue polling due to %s ' % (' unknown queue type '))
                return False

        except Exception:

            # report through attributes this class actually owns, so that the
            # error handler itself cannot raise
            traceback.print_exc()
            log(ERROR, " Problem stating event queue for %s with cap of %s " % (self._sim_address(), self.cap))
            return False

    def stop(self):
        """ request the polling loop to end

        Returns True when the loop is seen as no longer running, False
        otherwise (in which case the stop request is withdrawn).

        NOTE(review): __init__ assigns a boolean to ``self.stop``, which
        hides this method on instances; see the comment there.
        """

        # the class has no `type` attribute, label the queue by its cap name
        queue_label = getattr(self.cap, 'name', None)

        log(INFO, " Stopping %s event queue. " % (queue_label))

        self.stop = True

        # ToDo: turn this into a timeout
        # NOTE(review): nothing in this loop yields, so _running is not
        # expected to change between iterations
        for i in range(1, 10):
            if self._running == False:
                return True

        # well, we failed to stop. let's log it and get outta here
        log(WARNING, " Failed to stop %s event queue. " % (queue_label))
        self.stop = False

        return False

    def _handle(self, data):
        """ essentially a case statement to pass packets to event notifiers in the form of self attributes

        NOTE(review): this class never defines ``subscribers``, so the
        lookup raises AttributeError and the method does nothing.
        ``handler`` is not defined in this scope either; decoded events
        are dispatched through ``self.event_queue_handler`` instead.
        """

        try:
            # Handle the event queue result if we have subscribers
            if len(self.subscribers) > 0:
                #log(DEBUG, 'Handling event queue results')
                handler(data)

        except AttributeError:
            #log(INFO, "Received an unhandled packet: %s" % (packet.name))
            pass

    def _processRegionEventQueue(self):
        """ poll the simulator event queue until the stop flag is set

        Every iteration sleeps for REGION_EVENT_QUEUE_POLL_INTERVAL, posts
        the current ack data and hands the response to _parse_result().
        """

        if self.cap.name != ' EventQueueGet ':
            # NOTE(review): `exc` is not among the imports visible in this
            # file; confirm it is in scope.
            raise exc.RegionCapNotAvailable(' EventQueueGet ')
            # well then get it...?

        self._running = True

        while self.stop != True:

            try:
                api.sleep(self.settings.REGION_EVENT_QUEUE_POLL_INTERVAL)

                # acknowledge the last batch we received, if any
                if self.last_id != -1:
                    self.data = {' ack ': self.last_id, ' done ': True}

                if self.settings.ENABLE_EQ_LOGGING:
                    log(DEBUG, ' Posting to the event queue %s : %s ' % (self._host_string(' to ( %s ) '), self.data))

                try:
                    self.result = self.cap.POST(self.data)
                except Exception as error:
                    log(INFO, " Received an error we ought not care about %s : %s " % (self._host_string(' from ( %s ) '), error))
                    # self.result still holds the previous response: keep
                    # last_id so the same ack is sent again, but do not parse
                    # (and dispatch) the old events a second time
                    continue

                if self.result is not None:
                    self.last_id = self.result[' id ']
                else:
                    self.last_id = -1

                self._parse_result(self.result)

            except Exception as error:
                log(WARNING, " Error in a post to the event queue. Error was: %s " % (error))

        self._running = False

        log(DEBUG, " Stopped event queue processing for %s " % (getattr(self.region, 'SimName', None)))

    def _processADEventQueue(self):
        """ poll the agent domain event queue until the stop flag is set """

        if self.cap.name != ' event_queue ':
            # NOTE(review): RegionCapNotAvailable is not among the imports
            # visible in this file; confirm it is in scope.
            raise RegionCapNotAvailable(' event_queue ')
            # change the exception here (add a new one)

        self._running = True

        try:
            while not self.stop:

                # NOTE(review): the other settings read in this file are
                # upper case; confirm this attribute exists on Settings
                api.sleep(self.settings.agentdomain_event_queue_interval)

                # the guard above established that self.cap is the event
                # queue capability; the class has no `capabilities` mapping
                self.result = self.cap.POST(self.data)

                if self.result is not None:
                    self.last_id = self.result[' id ']

                self._parse_result(self.result)
        finally:
            # do not leave the client marked as running when the loop dies
            self._running = False

    def _parse_result(self, data):
        """ decode an event queue response and pass each packet to the event queue handler

        Does nothing when packet handling is disabled or `data` is None.
        Errors are printed and logged rather than raised.
        """

        # if packet handling is enabled
        if not self.settings.HANDLE_PACKETS:    # and (len(self.handler) > 0):
            return

        try:

            if data is None:
                return

            if self.settings.ENABLE_EQ_LOGGING:
                log(DEBUG, ' Event Queue result %s : %s ' % (self._host_string(' from ( %s ) '), data))

            # this returns packets; hand each one to the handler so any
            # subscribers can get the data
            for packet in self._decode_eq_result(data):
                self.event_queue_handler._handle(packet)

        except Exception as error:

            traceback.print_exc()
            log(WARNING, " Error parsing even queue results %s . Error: %s . Data was: %s " % (self._host_string(' from ( %s ) '), error, data))

    def _decode_eq_result(self, data=None):
        """ parse the event queue data, return a list of packets

        The group chat and agent communication messages are wrapped in
        their message classes and passed to the event queue handler
        directly; everything else is rebuilt as a UDPPacket and returned.
        An empty list is returned when there is nothing to decode.

        the building of packets borrows absurdly much from UDPDeserializer.__decode_data()
        """

        # ToDo: this is returning packets, but perhaps we want to return packet class instances?

        packets = []

        if data is None or ' events ' not in data:
            return packets

        for key in data:

            # the id is consumed by the polling loop, everything else is
            # treated as a list of messages
            if key == ' id ':
                continue

            for message in data[key]:

                # move this to a proper solution, for now, append to some list eq events
                # or some dict mapping name to action to take
                if message[' message '] == ' ChatterBoxInvitation ':

                    message[' name '] = message[' message ']
                    group_chat = ChatterBoxInvitation_Message(ChatterBoxInvitation_Data=message[' body '])

                    self.event_queue_handler._handle(group_chat)

                elif message[' message '] == ' ChatterBoxSessionEventReply ':

                    message[' name '] = message[' message ']
                    chat_response = ChatterBoxSessionEventReply_Message(message[' body '])

                    self.event_queue_handler._handle(chat_response)

                elif message[' message '] == ' ChatterBoxSessionAgentListUpdates ':

                    message[' name '] = message[' message ']
                    group_agent_update = ChatterBoxSessionAgentListUpdates_Message(message[' body '])

                    self.event_queue_handler._handle(group_agent_update)

                elif message[' message '] == ' ChatterBoxSessionStartReply ':

                    message[' name '] = message[' message ']
                    group_chat_session_data = ChatterBoxSessionStartReply_Message(message[' body '])

                    self.event_queue_handler._handle(group_chat_session_data)

                elif message[' message '] == ' EstablishAgentCommunication ':

                    #message['name'] == message['message']
                    establish_agent_communication_data = EstablishAgentCommunication_Message(message[' body '])

                    self.event_queue_handler._handle(establish_agent_communication_data)

                else:

                    # this is a UDP packet sent over the event queue
                    self.current_template = self.template_dict.get_template(message[' message '])

                    # in some cases, we will be creating UDPPacket instances
                    # where there is no corresponding entry in the
                    # message_template: fall back to the message's own name
                    # NOTE(review): assumes get_template() returns None for
                    # an unknown message - confirm
                    packet_name = getattr(self.current_template, 'name', message[' message '])

                    message_data = MsgData(packet_name)

                    # treat this like a UDPPacket
                    packet = UDPPacket(message_data)
                    packet.name = packet_name

                    # irrelevant packet attributes since it's from the EQ:
                    # send_flags, packet_id, add_ack, reliable

                    for block_name in message[' body ']:

                        for items in message[' body '][block_name]:

                            # block_data keys off of block.name, which here is the body attribute
                            block_data = MsgBlockData(block_name)

                            message_data.add_block(block_data)

                            for variable in items:
                                var_data = MsgVariableData(variable, items[variable], -1)
                                block_data.add_variable(var_data)

                    packet.message_data = message_data
                    packets.append(packet)

        return packets
2009-03-12 20:48:26 +00:00
class EventQueueHandler(object):
    """ general class handling individual packets

    Keeps one notifier per event name in ``self.handlers`` and forwards
    incoming data to the notifier registered under ``data.name``.
    """

    def __init__(self, settings=None):
        """ store the settings and start with no registered handlers

        settings -- settings object; a default ``Settings()`` is created
                    when omitted
        """

        # allow the settings to be passed in
        # otherwise, grab the defaults
        if settings is not None:
            self.settings = settings
        else:
            from pyogp.lib.base.settings import Settings
            self.settings = Settings()

        # maps an event name to its notifier
        self.handlers = {}

    def _register(self, name):
        """ return the notifier for `name`, creating it on first use """

        if self.settings.LOG_VERBOSE:
            log(DEBUG, ' Creating a monitor for %s ' % (name))

        # only build a notifier when there is none yet, instead of creating
        # one on every call and throwing it away
        if name not in self.handlers:
            self.handlers[name] = EventQueueReceivedNotifier(name, self.settings)

        return self.handlers[name]

    def is_packet_handled(self, name):
        """ if the data is being monitored, return True, otherwise, return False

        this can allow us to skip parsing inbound packets if no one is watching a particular one
        """

        return name in self.handlers

    def _handle(self, data):
        """ essentially a case statement to pass packets to event notifiers in the form of self attributes

        Data whose name has no registered notifier is ignored.  Only the
        lookup is guarded: a KeyError raised by a subscriber propagates to
        the caller instead of being mistaken for an unhandled packet.
        """

        try:
            handler = self.handlers[data.name]
        except KeyError:
            #log(INFO, "Received an unhandled packet: %s" % (packet.name))
            return

        # Handle the packet if we have subscribers
        # Conveniently, this will also enable verbose packet logging
        if len(handler) > 0:
            if self.settings.LOG_VERBOSE:
                log(DEBUG, ' Handling event queue data : %s ' % (data.name))

            handler(data)
class EventQueueReceivedNotifier(object):
    """ wraps an Event so callers can subscribe to one named kind of event queue data

    Calling the notifier, or its received() method, fires the wrapped
    event with the data; len() reports len() of the wrapped event.
    """

    def __init__(self, name, settings):
        """ remember the name and settings, and create the underlying Event """

        self.name = name
        self.settings = settings
        self.event = Event()

    def __len__(self):
        """ delegate to the wrapped event """

        return len(self.event)

    def received(self, data):
        """ fire the wrapped event with `data` """

        self.event(data)

    # calling the notifier is the same as calling received()
    __call__ = received

    def subscribe(self, *args, **kwargs):
        """ pass the subscription straight through to the wrapped event """

        self.event.subscribe(*args, **kwargs)

    def unsubscribe(self, *args, **kwargs):
        """ remove a subscription from the wrapped event, logging it when verbose """

        self.event.unsubscribe(*args, **kwargs)

        if not self.settings.LOG_VERBOSE:
            return

        log(DEBUG, " Removed the monitor for %s by %s " % (args, kwargs))
2009-04-27 22:48:01 +00:00
"""
Contributors can be viewed at :
http : / / svn . secondlife . com / svn / linden / projects / 2008 / pyogp / CONTRIBUTORS . txt
2009-04-01 23:49:16 +00:00
2009-04-27 22:48:01 +00:00
$ LicenseInfo : firstyear = 2008 & license = apachev2 $
2009-04-01 23:49:16 +00:00
2009-04-27 22:48:01 +00:00
Copyright 2009 , Linden Research , Inc .
2009-04-01 23:49:16 +00:00
2009-04-27 22:48:01 +00:00
Licensed under the Apache License , Version 2.0 ( the " License " ) .
You may obtain a copy of the License at :
http : / / www . apache . org / licenses / LICENSE - 2.0
or in
http : / / svn . secondlife . com / svn / linden / projects / 2008 / pyogp / LICENSE . txt
2009-04-01 23:49:16 +00:00
2009-04-27 22:48:01 +00:00
$ / LicenseInfo $
"""
2009-04-01 23:49:16 +00:00