Several fixes to the EventQueueServer

* Only allow one open connection per event queue
* Send the invalid HTTP 502 response that the LL client is expecting for a timeout
* Check if the connection is still open before trying to send a response
* Use IHttpClientContext.Response() instead of IHttpResponse.Send(), which doesn't work in long-held connections like the event queue
* More logging

git-svn-id: http://libopenmetaverse.googlecode.com/svn/libopenmetaverse/trunk@2486 52acb1d6-8a22-11de-b505-999d5b087335
This commit is contained in:
John Hurliman
2009-03-16 19:54:59 +00:00
parent e69157a4af
commit 20008e6f6b

View File

@@ -68,13 +68,20 @@ namespace OpenMetaverse.Http
const int MAX_EVENTS_PER_RESPONSE = 5;
HttpListener server;
BlockingQueue<EventQueueEvent> eventQueue = new BlockingQueue<EventQueueEvent>();
int currentID = 1;
bool running = true;
BlockingQueue<EventQueueEvent> eventQueue;
int currentID;
bool running;
bool threadRunning;
IHttpClientContext context;
IHttpRequest request;
IHttpResponse response;
public EventQueueServer(HttpListener server)
{
this.server = server;
eventQueue = new BlockingQueue<EventQueueEvent>();
running = true;
currentID = 1;
}
~EventQueueServer()
@@ -131,7 +138,25 @@ namespace OpenMetaverse.Http
if (!done)
{
StartEventQueueThread(context, request, response);
if (threadRunning)
{
Logger.Log.Info("[EventQueue] New connection opened to the event queue while a previous connection is open. Closing old connection");
// Kill the previous handler thread before starting a new one
SendEvent(null);
while (threadRunning && running)
Thread.Sleep(50);
Logger.Log.Info("[EventQueue] Old connection closed");
}
this.context = context;
this.request = request;
this.response = response;
// Spawn a new thread to hold the connection open and return from our precious IOCP thread
Thread thread = new Thread(new ThreadStart(EventQueueThread));
thread.Start();
// Tell HttpServer to leave the connection open
return false;
@@ -157,63 +182,78 @@ namespace OpenMetaverse.Http
}
}
void StartEventQueueThread(IHttpClientContext context, IHttpRequest request, IHttpResponse response)
void EventQueueThread()
{
// Spawn a new thread to hold the connection open and return from our precious IOCP thread
Thread thread = new Thread(new ThreadStart(
delegate()
threadRunning = true;
EventQueueEvent eventQueueEvent = null;
int totalMsPassed = 0;
while (running)
{
if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent))
{
EventQueueEvent eventQueueEvent = null;
int totalMsPassed = 0;
// An event was dequeued
totalMsPassed = 0;
List<EventQueueEvent> eventsToSend = null;
while (running)
if (eventQueueEvent != null)
{
if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent))
eventsToSend = new List<EventQueueEvent>();
eventsToSend.Add(eventQueueEvent);
DateTime start = DateTime.Now;
int batchMsPassed = 0;
// Wait BATCH_WAIT_INTERVAL milliseconds looking for more events,
// or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE
while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE)
{
// An event was dequeued
totalMsPassed = 0;
if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent) && eventQueueEvent != null)
eventsToSend.Add(eventQueueEvent);
List<EventQueueEvent> eventsToSend = new List<EventQueueEvent>();
eventsToSend.Add(eventQueueEvent);
DateTime start = DateTime.Now;
int batchMsPassed = 0;
// Wait BATCH_WAIT_INTERVAL milliseconds looking for more events,
// or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE
while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE)
{
if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent))
eventsToSend.Add(eventQueueEvent);
batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds;
}
SendResponse(context, request, response, eventsToSend);
return;
}
else
{
// BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection
// has timed out yet.
totalMsPassed += BATCH_WAIT_INTERVAL;
if (totalMsPassed >= CONNECTION_TIMEOUT)
{
Logger.Log.DebugFormat(
"[EventQueue] {0}ms passed without an event, timing out the event queue",
totalMsPassed);
SendResponse(context, request, response, null);
return;
}
batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds;
}
}
else
{
Logger.Log.Info("[EventQueue] Dequeued a signal to close the handler thread");
}
Logger.Log.Info("[EventQueue] Handler thread is no longer running");
// Make sure we can actually send the events right now
if (context.Stream == null || !context.Stream.CanWrite)
{
Logger.Log.Info("[EventQueue] Connection is closed, requeuing events and closing the handler thread");
if (eventsToSend != null)
{
for (int i = 0; i < eventsToSend.Count; i++)
eventQueue.Enqueue(eventsToSend[i]);
}
goto ThreadDone;
}
SendResponse(context, request, response, eventsToSend);
goto ThreadDone;
}
));
else
{
// BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection
// has timed out yet.
totalMsPassed += BATCH_WAIT_INTERVAL;
thread.Start();
if (totalMsPassed >= CONNECTION_TIMEOUT)
{
Logger.Log.DebugFormat(
"[EventQueue] {0}ms passed without an event, timing out the event queue",
totalMsPassed);
SendResponse(context, request, response, null);
goto ThreadDone;
}
}
}
ThreadDone:
threadRunning = false;
Logger.Log.Debug("[EventQueue] Handler thread is exiting");
}
void SendResponse(IHttpClientContext context, IHttpRequest request, IHttpResponse response, List<EventQueueEvent> eventsToSend)
@@ -241,20 +281,19 @@ namespace OpenMetaverse.Http
responseMap.Add("id", OSD.FromInteger(currentID++));
// Serialize the events and send the response
byte[] buffer = OSDParser.SerializeLLSDXmlBytes(responseMap);
response.ContentType = "application/llsd+xml";
response.ContentLength = buffer.Length;
response.Body.Write(buffer, 0, buffer.Length);
response.Body.Flush();
string responseBody = OSDParser.SerializeLLSDXmlString(responseMap);
Logger.Log.Debug("[EventQueue] Sending " + responseArray.Count + " events over the event queue");
context.Respond(HttpHelper.HTTP11, HttpStatusCode.OK, "OK", responseBody, "application/xml");
}
else
{
Logger.Log.Debug("[EventQueue] Sending a timeout response over the event queue");
// The 502 response started as a bug in the LL event queue server implementation,
// but is now hardcoded into the protocol as the code to use for a timeout
response.Status = HttpStatusCode.BadGateway;
context.Respond(HttpHelper.HTTP10, HttpStatusCode.BadGateway, "Upstream error:", "Upstream error:", null);
}
response.Send();
}
}
}