diff --git a/OpenMetaverse.Http/EventQueueServer.cs b/OpenMetaverse.Http/EventQueueServer.cs index d403e87c..f21cf3a0 100644 --- a/OpenMetaverse.Http/EventQueueServer.cs +++ b/OpenMetaverse.Http/EventQueueServer.cs @@ -68,13 +68,20 @@ namespace OpenMetaverse.Http const int MAX_EVENTS_PER_RESPONSE = 5; HttpListener server; - BlockingQueue eventQueue = new BlockingQueue(); - int currentID = 1; - bool running = true; + BlockingQueue eventQueue; + int currentID; + bool running; + bool threadRunning; + IHttpClientContext context; + IHttpRequest request; + IHttpResponse response; public EventQueueServer(HttpListener server) { this.server = server; + eventQueue = new BlockingQueue(); + running = true; + currentID = 1; } ~EventQueueServer() @@ -131,7 +138,25 @@ namespace OpenMetaverse.Http if (!done) { - StartEventQueueThread(context, request, response); + if (threadRunning) + { + Logger.Log.Info("[EventQueue] New connection opened to the event queue while a previous connection is open. Closing old connection"); + // Kill the previous handler thread before starting a new one + SendEvent(null); + + while (threadRunning && running) + Thread.Sleep(50); + + Logger.Log.Info("[EventQueue] Old connection closed"); + } + + this.context = context; + this.request = request; + this.response = response; + + // Spawn a new thread to hold the connection open and return from our precious IOCP thread + Thread thread = new Thread(new ThreadStart(EventQueueThread)); + thread.Start(); // Tell HttpServer to leave the connection open return false; @@ -157,63 +182,78 @@ namespace OpenMetaverse.Http } } - void StartEventQueueThread(IHttpClientContext context, IHttpRequest request, IHttpResponse response) + void EventQueueThread() { - // Spawn a new thread to hold the connection open and return from our precious IOCP thread - Thread thread = new Thread(new ThreadStart( - delegate() + threadRunning = true; + EventQueueEvent eventQueueEvent = null; + int totalMsPassed = 0; + + while (running) + { + if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent)) { - EventQueueEvent eventQueueEvent = null; - int totalMsPassed = 0; + // An event was dequeued + totalMsPassed = 0; + List eventsToSend = null; - while (running) + if (eventQueueEvent != null) { - if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent)) + eventsToSend = new List(); + eventsToSend.Add(eventQueueEvent); + + DateTime start = DateTime.Now; + int batchMsPassed = 0; + + // Wait BATCH_WAIT_INTERVAL milliseconds looking for more events, + // or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE + while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE) { - // An event was dequeued - totalMsPassed = 0; + if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent) && eventQueueEvent != null) + eventsToSend.Add(eventQueueEvent); - List eventsToSend = new List(); - eventsToSend.Add(eventQueueEvent); - - DateTime start = DateTime.Now; - int batchMsPassed = 0; - - // Wait BATCH_WAIT_INTERVAL milliseconds looking for more events, - // or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE - while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE) - { - if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent)) - eventsToSend.Add(eventQueueEvent); - - batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds; - } - - SendResponse(context, request, response, eventsToSend); - return; - } - else - { - // BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection - // has timed out yet. - totalMsPassed += BATCH_WAIT_INTERVAL; - - if (totalMsPassed >= CONNECTION_TIMEOUT) - { - Logger.Log.DebugFormat( - "[EventQueue] {0}ms passed without an event, timing out the event queue", - totalMsPassed); - SendResponse(context, request, response, null); - return; - } + batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds; } } + else + { + Logger.Log.Info("[EventQueue] Dequeued a signal to close the handler thread"); + } - Logger.Log.Info("[EventQueue] Handler thread is no longer running"); + // Make sure we can actually send the events right now + if (context.Stream == null || !context.Stream.CanWrite) + { + Logger.Log.Info("[EventQueue] Connection is closed, requeuing events and closing the handler thread"); + if (eventsToSend != null) + { + for (int i = 0; i < eventsToSend.Count; i++) + eventQueue.Enqueue(eventsToSend[i]); + } + goto ThreadDone; + } + + SendResponse(context, request, response, eventsToSend); + goto ThreadDone; } - )); + else + { + // BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection + // has timed out yet. + totalMsPassed += BATCH_WAIT_INTERVAL; - thread.Start(); + if (totalMsPassed >= CONNECTION_TIMEOUT) + { + Logger.Log.DebugFormat( + "[EventQueue] {0}ms passed without an event, timing out the event queue", + totalMsPassed); + SendResponse(context, request, response, null); + goto ThreadDone; + } + } + } + + ThreadDone: + threadRunning = false; + Logger.Log.Debug("[EventQueue] Handler thread is exiting"); } void SendResponse(IHttpClientContext context, IHttpRequest request, IHttpResponse response, List eventsToSend) @@ -241,20 +281,19 @@ namespace OpenMetaverse.Http responseMap.Add("id", OSD.FromInteger(currentID++)); // Serialize the events and send the response - byte[] buffer = OSDParser.SerializeLLSDXmlBytes(responseMap); - response.ContentType = "application/llsd+xml"; - response.ContentLength = buffer.Length; - response.Body.Write(buffer, 0, buffer.Length); - response.Body.Flush(); + string responseBody = OSDParser.SerializeLLSDXmlString(responseMap); + + Logger.Log.Debug("[EventQueue] Sending " + responseArray.Count + " events over the event queue"); + context.Respond(HttpHelper.HTTP11, HttpStatusCode.OK, "OK", responseBody, "application/xml"); } else { + Logger.Log.Debug("[EventQueue] Sending a timeout response over the event queue"); + // The 502 response started as a bug in the LL event queue server implementation, // but is now hardcoded into the protocol as the code to use for a timeout - response.Status = HttpStatusCode.BadGateway; + context.Respond(HttpHelper.HTTP10, HttpStatusCode.BadGateway, "Upstream error:", "Upstream error:", null); } - - response.Send(); } } }