Files
Hippolyzer/pyogp/lib/base/event_queue.py

349 lines
13 KiB
Python

# standard python libs
from logging import getLogger, ERROR, WARNING, INFO, DEBUG
import traceback
# related
from eventlet import api, util
# the following makes socket calls nonblocking. magic
util.wrap_socket_with_coroutine_socket()
# pyogp
from pyogp.lib.base.exc import RegionCapNotAvailable
# messaging
from pyogp.lib.base.message.message_handler import MessageHandler
from pyogp.lib.base.message.message import Message, Block, Variable
from pyogp.lib.base.message.template_dict import TemplateDictionary
# initialize logging
logger = getLogger('pyogp.lib.base.event_queue')
log = logger.log
class EventQueueClient(object):
""" handles an event queue of either an agent domain or a simulator
Initialize the event queue client class
>>> client = EventQueueClient()
The event queue client requires an event queue capability
>>> from pyogp.lib.base.caps import Capability
>>> cap = Capability('EventQueue', http://localhost:12345/cap)
>>> event_queue = EventQueueClient(cap)
>>> event_queue.start()
Sample implementations: region.py
Tests: tests/test_event_queue.py
"""
def __init__(self, capability = None, settings = None, message_handler = None, region = None):
""" set up the event queue attributes """
# allow the settings to be passed in
# otherwise, grab the defaults
if settings != None:
self.settings = settings
else:
from pyogp.lib.base.settings import Settings
self.settings = Settings()
# allow the packet_handler to be passed in
# otherwise, grab the defaults
# otherwise, let's just use our own
if message_handler != None:
self.message_handler = message_handler
else:
self.message_handler = MessageHandler()
self.region = region
self.cap = capability
#self.type = eq_type # specify 'agentdomain' or 'region'
self.type = 'typeNotSpecified'
self._running = False # this class controls this value
self.stopped = False # client can pause the event queue
self.last_id = -1
# if applicable, initialize data to post to the event queue
self.data = {}
# stores the result of the post to the event queue capability
self.result = None
# enables proper packet parsing in event queue responses
self.template_dict = TemplateDictionary()
self.current_template = None
def start(self):
""" spawns a coroutine connecting to the event queue on the target """
try:
if self.cap.name == 'event_queue':
if self.settings.LOG_COROUTINE_SPAWNS:
log(INFO, "Spawning a coroutine for event queue in the agent domain context.")
self._processADEventQueue()
return True
elif self.cap.name == 'EventQueueGet':
if self.settings.LOG_COROUTINE_SPAWNS:
log(INFO, "Spawning a coroutine for event queue in the simulator context for %s." % (str(self.region.sim_ip) + ':' + str(self.region.sim_port)))
self._processRegionEventQueue()
return True
else:
# ToDo handle this as an exception
log(WARNING, 'Unable to start event queue polling due to %s' % ('unknown queue type'))
return False
except Exception, error:
log(ERROR, "Problem starting event queue for %s with cap of %s, with an error of: %s" % (str(self.region.sim_ip) + ':' + str(self.region.sim_port), str(self.cap), error))
return False
def stop(self):
""" trigger the event queue to stop communicating with the simulator """
log(INFO, "Stopping event queue.")
self.stopped = True
def stop_monitor(self, interval, times):
""" monitors the stopping of the event queue client connection """
for i in range(0, times):
api.sleep(interval)
if self._running == False:
log(INFO,
"Stopped event queue processing for %s",
self.region.SimName)
return
log(WARNING,
"Failed to stop event queue for %s after %s seconds",
self.region.SimName,
str(interval * times))
api.spawn(stop_monitor, self, self.settings.REGION_EVENT_QUEUE_POLL_INTERVAL, 10)
def _processRegionEventQueue(self):
if self.cap.name != 'EventQueueGet':
raise RegionCapNotAvailable('EventQueueGet')
# well then get it...?
else:
self._running = True
while not self.stopped:
try:
api.sleep(self.settings.REGION_EVENT_QUEUE_POLL_INTERVAL)
self.data = {}
if self.last_id != -1:
self.data = {'ack':self.last_id}
if self.settings.ENABLE_EQ_LOGGING:
if self.settings.ENABLE_HOST_LOGGING:
host_string = ' to (%s)' % (str(self.region.sim_ip) + ':' + str(self.region.sim_port))
else:
host_string = ''
log(DEBUG, 'Posting to the event queue%s: %s' % (host_string, self.data))
try:
self.result = self.cap.POST(self.data)
except Exception, error:
if self.settings.ENABLE_HOST_LOGGING:
host_string = ' from (%s)' % (str(self.region.sim_ip) + ':' + str(self.region.sim_port))
else:
host_string = ''
log(INFO, "Received an error we ought not care about%s: %s" % (host_string, error))
if self.result != None:
self.last_id = self.result['id']
else:
self.last_id = -1
self._parse_result(self.result)
except Exception, error:
log(WARNING, "Error in a post to the event queue. Error was: %s" % (error))
if self.last_id != -1:
# Need to ack the last message received, otherwise it will be
# resent if we re-connect to the same queue
self.data = {'ack':self.last_id, 'done':True}
self.cap.POST(self.data)
if self.last_id != -1:
# Need to ack the last message received, otherwise it will be
# resent if we re-connect to the same queue
self.data = {'ack':self.last_id, 'done':True}
self.cap.POST(self.data)
self._running = False
log(DEBUG, "Stopped event queue processing for %s" % (self.region.SimName))
def _processADEventQueue(self):
""" connects to an agent domain's event queue """
if self.cap.name != 'event_queue':
raise RegionCapNotAvailable('event_queue')
# change the exception here (add a new one)
else:
self._running = True
while not self.stopped:
api.sleep(self.settings.agentdomain_event_queue_interval)
self.result = self.capabilities['event_queue'].POST(self.data)
if self.result != None: self.last_id = self.result['id']
self._parse_result(self.result)
self._running = False
def _parse_result(self, data):
""" tries to parse the llsd response from an event queue request.
if successful, the event queue passes messages through the message_handler for evaluation"""
# if there are subscribers to the event queue and packet handling is enabled
if self.settings.HANDLE_PACKETS: # and (len(self.handler) > 0):
try:
if data != None:
if self.settings.ENABLE_EQ_LOGGING:
if self.settings.ENABLE_HOST_LOGGING:
host_string = ' from (%s)' % (str(self.region.sim_ip) + ':' + str(self.region.sim_port))
else:
host_string = ''
log(DEBUG, 'Event Queue result%s: %s' % (host_string, data))
# if we are handling packets, handle the packet so any subscribers can get the data
if self.settings.HANDLE_PACKETS:
# this returns packets
parsed_data = self._decode_eq_result(data)
for packet in parsed_data:
self.message_handler.handle(packet)
except Exception, error:
traceback.print_exc()
if self.settings.ENABLE_HOST_LOGGING:
host_string = ' from (%s)' % (str(self.region.sim_ip) + ':' + str(self.region.sim_port))
else:
host_string = ''
log(WARNING, "Error parsing even queue results%s. Error: %s. Data was: %s" % (host_string, error, data))
def _decode_eq_result(self, data=None):
""" parse the event queue data, return a list of packets
the building of packets borrows absurdly much from UDPDeserializer.__decode_data()
"""
# ToDo: this is returning packets, but perhaps we want to return packet class instances?
if data != None:
messages = []
if data.has_key('events'):
for i in data:
if i == 'id':
#last_id = data[i]
pass
else:
for message in data[i]:
# move this to a proper solution, for now, append to some list eq events
# or some dict mapping name to action to take
in_template = self.template_dict.get_template(message['message'])
if in_template:
# this is a message found in the message_template
#self.current_template = self.template_dict.get_template(message['message'])
new_message = Message(message['message'])
new_message.event_queue_id = self.last_id
new_message.host = self.region.host
for block_name in message['body']:
# block_data keys off of block.name, which here is the body attribute
block = Block(block_name)
new_message.add_block(block)
for items in message['body'][block_name]:
for variable in items:
var_data = Variable(variable, items[variable], -1)
block.add_variable(var_data)
#packet.blocks = message.blocks
messages.append(new_message)
else:
# this is e.g. EstablishAgentCommunication or ChatterBoxInvitation, etc
new_message = Message(message['message'])
new_message.event_queue_id = self.last_id
new_message.host = self.region.host
# faux block with a name of Message_Data
block = Block('Message_Data')
new_message.add_block(block)
for var in message['body']:
var_data = Variable(var, message['body'][var], -1)
block.add_variable(var_data)
messages.append(new_message)
return messages
"""
Contributors can be viewed at:
http://svn.secondlife.com/svn/linden/projects/2008/pyogp/CONTRIBUTORS.txt
$LicenseInfo:firstyear=2008&license=apachev2$
Copyright 2009, Linden Research, Inc.
Licensed under the Apache License, Version 2.0 (the "License").
You may obtain a copy of the License at:
http://www.apache.org/licenses/LICENSE-2.0
or in
http://svn.secondlife.com/svn/linden/projects/2008/pyogp/LICENSE.txt
$/LicenseInfo$
"""