/* * Copyright (c) 2008, openmetaverse.org * All rights reserved. * * - Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: * * - Redistributions of source code must retain the above copyright notice, this * list of conditions and the following disclaimer. * - Neither the name of the openmetaverse.org nor the names * of its contributors may be used to endorse or promote products derived from * this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE * POSSIBILITY OF SUCH DAMAGE. */ using System; using System.Collections.Generic; using System.Net; using System.IO; using System.Threading; using OpenMetaverse.StructuredData; using HttpServer; namespace OpenMetaverse.Http { public class EventQueueEvent { public string Name; public OSDMap Body; public EventQueueEvent(string name, OSDMap body) { Name = name; Body = body; } } public class EventQueueServer { /// The number of milliseconds to wait before the connection times out /// and an empty response is sent to the client. This value should be higher /// than BATCH_WAIT_INTERVAL for the timeout to function properly const int CONNECTION_TIMEOUT = 120000; /// This interval defines the amount of time to wait, in milliseconds, /// for new events to show up on the queue before sending a response to the /// client and completing the HTTP request. The interval also specifies the /// maximum time that can pass before the queue shuts down after Stop() or the /// class destructor is called const int BATCH_WAIT_INTERVAL = 200; /// Since multiple events can be batched together and sent in the same /// response, this prevents the event queue thread from infinitely dequeueing /// events and never sending a response if there is a constant stream of new /// events const int MAX_EVENTS_PER_RESPONSE = 5; WebServer server; BlockingQueue eventQueue = new BlockingQueue(); int currentID = 1; bool running = true; public EventQueueServer(WebServer server) { this.server = server; } ~EventQueueServer() { Stop(); } public void Stop() { running = false; } public void SendEvent(string eventName, OSDMap body) { SendEvent(new EventQueueEvent(eventName, body)); } public void SendEvent(EventQueueEvent eventQueueEvent) { if (!running) throw new InvalidOperationException("Cannot add event while the queue is stopped"); eventQueue.Enqueue(eventQueueEvent); } public void SendEvents(IList events) { if (!running) throw new InvalidOperationException("Cannot add event while the queue is stopped"); for (int i = 0; i < events.Count; i++) eventQueue.Enqueue(events[i]); } public bool EventQueueHandler(IHttpClientContext context, IHttpRequest request, IHttpResponse response) { // Decode the request OSD osdRequest = null; try { osdRequest = OSDParser.DeserializeLLSDXml(request.Body); } catch (Exception) { } if (request != null && osdRequest.Type == OSDType.Map) { OSDMap requestMap = (OSDMap)osdRequest; int ack = requestMap["ack"].AsInteger(); bool done = requestMap["done"].AsBoolean(); if (ack != currentID - 1 && ack != 0) { Logger.Log.WarnFormat("[EventQueue] Received an ack for id {0}, last id sent was {1}", ack, currentID - 1); } if (!done) { StartEventQueueThread(context, request, response); // Tell HttpServer to leave the connection open return false; } else { Logger.Log.InfoFormat("[EventQueue] Shutting down the event queue {0} at the client's request", request.Uri); Stop(); response.Connection = request.Connection; return true; } } else { Logger.Log.WarnFormat("[EventQueue] Received a request with invalid or missing LLSD at {0}, closing the connection", request.Uri); response.Connection = request.Connection; response.Status = HttpStatusCode.BadRequest; return true; } } void StartEventQueueThread(IHttpClientContext context, IHttpRequest request, IHttpResponse response) { // Spawn a new thread to hold the connection open and return from our precious IOCP thread Thread thread = new Thread(new ThreadStart( delegate() { EventQueueEvent eventQueueEvent = null; int totalMsPassed = 0; while (running) { if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL, ref eventQueueEvent)) { // An event was dequeued totalMsPassed = 0; List eventsToSend = new List(); eventsToSend.Add(eventQueueEvent); DateTime start = DateTime.Now; int batchMsPassed = 0; // Wait BATCH_WAIT_INTERVAL milliseconds looking for more events, // or until the size of the current batch equals MAX_EVENTS_PER_RESPONSE while (batchMsPassed < BATCH_WAIT_INTERVAL && eventsToSend.Count < MAX_EVENTS_PER_RESPONSE) { if (eventQueue.Dequeue(BATCH_WAIT_INTERVAL - batchMsPassed, ref eventQueueEvent)) eventsToSend.Add(eventQueueEvent); batchMsPassed = (int)(DateTime.Now - start).TotalMilliseconds; } SendResponse(context, request, response, eventsToSend); return; } else { // BATCH_WAIT_INTERVAL milliseconds passed with no event. Check if the connection // has timed out yet. totalMsPassed += BATCH_WAIT_INTERVAL; if (totalMsPassed >= CONNECTION_TIMEOUT) { Logger.Log.DebugFormat( "[EventQueue] {0}ms passed without an event, timing out the event queue", totalMsPassed); SendResponse(context, request, response, null); return; } } } Logger.Log.Info("[EventQueue] Handler thread is no longer running"); } )); thread.Start(); } void SendResponse(IHttpClientContext context, IHttpRequest request, IHttpResponse response, List eventsToSend) { response.Connection = request.Connection; if (eventsToSend != null) { OSDArray responseArray = new OSDArray(eventsToSend.Count); // Put all of the events in an array for (int i = 0; i < eventsToSend.Count; i++) { EventQueueEvent currentEvent = eventsToSend[i]; OSDMap eventMap = new OSDMap(2); eventMap.Add("message", OSD.FromString(currentEvent.Name)); eventMap.Add("body", currentEvent.Body); responseArray.Add(eventMap); } // Create a map containing the events array and the id of this response OSDMap responseMap = new OSDMap(2); responseMap.Add("events", responseArray); responseMap.Add("id", OSD.FromInteger(currentID++)); // Serialize the events and send the response byte[] buffer = OSDParser.SerializeLLSDXmlBytes(responseMap); response.ContentType = "application/llsd+xml"; response.ContentLength = buffer.Length; response.Body.Write(buffer, 0, buffer.Length); response.Body.Flush(); } else { // The 502 response started as a bug in the LL event queue server implementation, // but is now hardcoded into the protocol as the code to use for a timeout response.Status = HttpStatusCode.BadGateway; } response.Send(); } } }