/* * Copyright (c) 2008, openmetaverse.org * All rights reserved. * * - Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * - Neither the name of the openmetaverse.org nor the names * of its contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ using System; using System.Collections.Generic; using System.Net; using System.IO; using System.Threading; using System.Xml; using OpenMetaverse.StructuredData; namespace OpenMetaverse.Capabilities { public class EventQueueEvent { public string Name; public OSDMap Body; public EventQueueEvent(string name, OSDMap body) { Name = name; Body = body; } } public class EventQueueServer { /// The number of milliseconds to wait before the connection times out /// and an empty response is sent to the client. This value should be higher /// than BATCH_WAIT_INTERVAL for the timeout to function properly const int CONNECTION_TIMEOUT = 120000; /// This interval defines the amount of time to wait, in milliseconds, /// for new events to show up on the queue before sending a response to the /// client and completing the HTTP request. The interval also specifies the /// maximum time that can pass before the queue shuts down after Stop() or the /// class destructor is called const int BATCH_WAIT_INTERVAL = 100; /// Since multiple events can be batched together and sent in the same /// response, this prevents the event queue thread from infinitely dequeueing /// events and never sending a response if there is a constant stream of new /// events const int MAX_EVENTS_PER_RESPONSE = 5; HttpServer server; HttpServer.HttpRequestHandler handler; BlockingQueue eventQueue = new BlockingQueue(); int currentID = 0; bool running = true; public EventQueueServer(HttpServer server, HttpServer.HttpRequestHandler handler) { this.server = server; this.handler = handler; } ~EventQueueServer() { Stop(); } public void Stop() { running = false; try { server.RemoveHandler(handler); } catch (Exception) { } } public void SendEvent(string eventName, OSDMap body) { SendEvent(new EventQueueEvent(eventName, body)); } public void SendEvent(EventQueueEvent eventQueueEvent) { eventQueue.Enqueue(eventQueueEvent); } public void SendEvents(IList events) { for (int i = 0; i < events.Count; i++) eventQueue.Enqueue(events[i]); } public bool EventQueueHandler(ref HttpListenerContext context) { // Decode the request OSD request = null; try { request = OSDParser.DeserializeLLSDXml(new XmlTextReader(context.Request.InputStream)); } catch (Exception) { } if (request != null && request.Type == OSDType.Map) { OSDMap requestMap = (OSDMap)request; int ack = requestMap["ack"].AsInteger(); bool done = requestMap["done"].AsBoolean(); if (ack != currentID - 1) { Logger.Log(String.Format("[EventQueue] Received an ack for id {0}, last id sent was {1}", ack, currentID - 1), Helpers.LogLevel.Warning); } if (!done) { StartEventQueueThread(context); // Tell HttpServer to leave the connection open return false; } else { Logger.Log(String.Format("[EventQueue] Shutting down the event queue {0} at the client's request", context.Request.Url), Helpers.LogLevel.Info); Stop(); context.Response.KeepAlive = context.Request.KeepAlive; return true; } } else { Logger.Log(String.Format("[EventQueue] Received a request with invalid or missing LLSD at {0}, closing the connection", context.Request.Url), Helpers.LogLevel.Warning); context.Response.KeepAlive = context.Request.KeepAlive; context.Response.StatusCode = (int)HttpStatusCode.BadRequest; return true; } } void StartEventQueueThread(HttpListenerContext httpContext) { // Spawn a new thread to hold the connection open and return from our precious IOCP thread Thread thread = new Thread(new ThreadStart( delegate() { EventQueueEvent eventQueueEvent = null; int totalMsPassed = 0; while (running) { if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent)) { // An event was dequeued totalMsPassed = 0; List eventsToSend = new List(); eventsToSend.Add(eventQueueEvent); DateTime start = DateTime.Now; int batchMsPassed = 0; // Wait BATCH_WAIT_INTERVAL milliseconds looking for more events, // or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE) { if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent)) eventsToSend.Add(eventQueueEvent); batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds; } SendResponse(httpContext, eventsToSend); return; } else { // BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection // has timed out yet. totalMsPassed += BATCH_WAIT_INTERVAL; if (totalMsPassed >= CONNECTION_TIMEOUT) { Logger.DebugLog(String.Format( "[EventQueue] {0}ms passed without an event, timing out the event queue", totalMsPassed)); SendResponse(httpContext, null); return; } } } } )); thread.Start(); } void SendResponse(HttpListenerContext httpContext, List eventsToSend) { httpContext.Response.KeepAlive = httpContext.Request.KeepAlive; if (eventsToSend != null) { OSDArray responseArray = new OSDArray(eventsToSend.Count); // Put all of the events in an array for (int i = 0; i < eventsToSend.Count; i++) { EventQueueEvent currentEvent = eventsToSend[i]; OSDMap eventMap = new OSDMap(2); eventMap.Add("body", currentEvent.Body); eventMap.Add("message", OSD.FromString(currentEvent.Name)); responseArray.Add(eventMap); } // Create a map containing the events array and the id of this response OSDMap responseMap = new OSDMap(2); responseMap.Add("events", responseArray); responseMap.Add("id", OSD.FromInteger(currentID++)); // Serialize the events and send the response byte[] buffer = OSDParser.SerializeLLSDXmlBytes(responseMap); httpContext.Response.ContentType = "application/xml"; httpContext.Response.ContentLength64 = buffer.Length; httpContext.Response.OutputStream.Write(buffer, 0, buffer.Length); httpContext.Response.OutputStream.Close(); httpContext.Response.Close(); } else { // The 502 response started as a bug in the LL event queue server implementation, // but is now hardcoded into the protocol as the code to use for a timeout httpContext.Response.StatusCode = (int)HttpStatusCode.BadGateway; httpContext.Response.Close(); } } } }