* Moved BlockingQueue, DoubleDictionary, and ExpiringCache into OpenMetaverseTypes.dll

* First attempt at an EventQueueServer implementation, untested
* Implemented a capabilities server that can route capabilities to local callbacks or remote URIs
* Modified HttpServer.HttpRequestCallback to return a bool: true to close the connection, false to leave it open
* Removed all locks from HttpServer and added try/catch around HttpListenerContext operations
* Added Color4.FromHSV()

git-svn-id: http://libopenmetaverse.googlecode.com/svn/libopenmetaverse/trunk@2379 52acb1d6-8a22-11de-b505-999d5b087335
This commit is contained in:
John Hurliman
2008-12-15 19:13:24 +00:00
parent 674d4ea653
commit 7f8cffafd2
10 changed files with 1209 additions and 103 deletions

View File

@@ -25,25 +25,231 @@
*/
using System;
using System.Collections.Generic;
using System.Net;
using System.IO;
using System.Threading;
using System.Xml;
using OpenMetaverse.StructuredData;
namespace OpenMetaverse.Capabilities
{
public class EventQueueEvent
{
public string Name;
public OSDMap Body;
public EventQueueEvent(string name, OSDMap body)
{
Name = name;
Body = body;
}
}
public class EventQueueServer
{
public EventQueueServer(HttpServer server, string path)
/// <summary>The number of milliseconds to wait before the connection times out
/// and an empty response is sent to the client. This value should be higher
/// than BATCH_WAIT_INTERVAL for the timeout to function properly</summary>
const int CONNECTION_TIMEOUT = 120000;
/// <summary>This interval defines the amount of time to wait, in milliseconds,
/// for new events to show up on the queue before sending a response to the
/// client and completing the HTTP request. The interval also specifies the
/// maximum time that can pass before the queue shuts down after Stop() or the
/// class destructor is called</summary>
const int BATCH_WAIT_INTERVAL = 100;
/// <summary>Since multiple events can be batched together and sent in the same
/// response, this prevents the event queue thread from infinitely dequeueing
/// events and never sending a response if there is a constant stream of new
/// events</summary>
const int MAX_EVENTS_PER_RESPONSE = 5;
HttpServer server;
HttpServer.HttpRequestHandler handler;
BlockingQueue<EventQueueEvent> eventQueue = new BlockingQueue<EventQueueEvent>();
int currentID = 0;
bool running = true;
public EventQueueServer(HttpServer server, HttpServer.HttpRequestHandler handler)
{
HttpRequestSignature signature = new HttpRequestSignature();
signature.Method = "post";
signature.ContentType = String.Empty;
signature.Path = path;
HttpServer.HttpRequestCallback callback = new HttpServer.HttpRequestCallback(EventQueueHandler);
HttpServer.HttpRequestHandler handler = new HttpServer.HttpRequestHandler(signature, callback);
server.AddHandler(handler);
this.server = server;
this.handler = handler;
}
protected void EventQueueHandler(HttpRequestSignature signature, ref HttpListenerContext context)
~EventQueueServer()
{
Stop();
}
public void Stop()
{
running = false;
try { server.RemoveHandler(handler); }
catch (Exception) { }
}
public void SendEvent(string eventName, OSDMap body)
{
SendEvent(new EventQueueEvent(eventName, body));
}
public void SendEvent(EventQueueEvent eventQueueEvent)
{
eventQueue.Enqueue(eventQueueEvent);
}
public void SendEvents(IList<EventQueueEvent> events)
{
for (int i = 0; i < events.Count; i++)
eventQueue.Enqueue(events[i]);
}
public bool EventQueueHandler(ref HttpListenerContext context)
{
// Decode the request
OSD request = null;
try { request = OSDParser.DeserializeLLSDXml(new XmlTextReader(context.Request.InputStream)); }
catch (Exception) { }
if (request != null && request.Type == OSDType.Map)
{
OSDMap requestMap = (OSDMap)request;
int ack = requestMap["ack"].AsInteger();
bool done = requestMap["done"].AsBoolean();
if (ack != currentID - 1)
{
Logger.Log(String.Format("[EventQueue] Received an ack for id {0}, last id sent was {1}",
ack, currentID - 1), Helpers.LogLevel.Warning);
}
if (!done)
{
StartEventQueueThread(context);
// Tell HttpServer to leave the connection open
return false;
}
else
{
Logger.Log(String.Format("[EventQueue] Shutting down the event queue {0} at the client's request",
context.Request.Url), Helpers.LogLevel.Info);
Stop();
context.Response.KeepAlive = false;
return true;
}
}
else
{
Logger.Log(String.Format("[EventQueue] Received a request with invalid or missing LLSD at {0}, closing the connection",
context.Request.Url), Helpers.LogLevel.Warning);
context.Response.KeepAlive = false;
context.Response.StatusCode = (int)HttpStatusCode.BadRequest;
return true;
}
}
void StartEventQueueThread(HttpListenerContext httpContext)
{
// Spawn a new thread to hold the connection open and return from our precious IOCP thread
Thread thread = new Thread(new ThreadStart(
delegate()
{
EventQueueEvent eventQueueEvent = null;
int totalMsPassed = 0;
while (running)
{
if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent))
{
// An event was dequeued
totalMsPassed = 0;
List<EventQueueEvent> eventsToSend = new List<EventQueueEvent>();
eventsToSend.Add(eventQueueEvent);
DateTime start = DateTime.Now;
int batchMsPassed = 0;
// Wait BATCH_WAIT_INTERVAL milliseconds looking for more events,
// or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE
while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE)
{
if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent))
eventsToSend.Add(eventQueueEvent);
batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds;
}
SendResponse(httpContext, eventsToSend);
return;
}
else
{
// BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection
// has timed out yet.
totalMsPassed += BATCH_WAIT_INTERVAL;
if (totalMsPassed >= CONNECTION_TIMEOUT)
{
Logger.DebugLog(String.Format(
"[EventQueue] {0}ms passed without an event, timing out the event queue",
totalMsPassed));
SendResponse(httpContext, null);
return;
}
}
}
}
));
thread.Start();
}
void SendResponse(HttpListenerContext httpContext, List<EventQueueEvent> eventsToSend)
{
if (eventsToSend != null)
{
OSDArray responseArray = new OSDArray(eventsToSend.Count);
// Put all of the events in an array
for (int i = 0; i < eventsToSend.Count; i++)
{
EventQueueEvent currentEvent = eventsToSend[i];
OSDMap eventMap = new OSDMap(2);
eventMap.Add("body", currentEvent.Body);
eventMap.Add("message", OSD.FromString(currentEvent.Name));
responseArray.Add(eventMap);
}
// Create a map containing the events array and the id of this response
OSDMap responseMap = new OSDMap(2);
responseMap.Add("events", responseArray);
responseMap.Add("id", OSD.FromInteger(currentID++));
// Serialize the events and send the response
byte[] buffer = OSDParser.SerializeLLSDXmlBytes(responseMap);
httpContext.Response.KeepAlive = true;
httpContext.Response.ContentType = "application/xml";
httpContext.Response.ContentLength64 = buffer.Length;
httpContext.Response.OutputStream.Write(buffer, 0, buffer.Length);
httpContext.Response.OutputStream.Close();
httpContext.Response.Close();
}
else
{
// The 502 response started as a bug in the LL event queue server implementation,
// but is now hardcoded into the protocol as the code to use for a timeout
httpContext.Response.StatusCode = (int)HttpStatusCode.BadGateway;
httpContext.Response.KeepAlive = true;
httpContext.Response.Close();
}
}
}
}