* [LIBOMV-506] Complete rewrite of CapsBase to use HttpWebRequest instead of our homebrewed WebRequest hack. We lose the upload progress callback but gain IOCP thread instead of System.Thread usage and Keep-Alive support

* Content-Types described in http://tools.ietf.org/html/draft-hamrick-llsd-00 are used for CAPS requests. This *may* be incompatible with the current SL grid, needs testing
* Modified CapsClient requests to require OSDFormat enum and timeout values

git-svn-id: http://libopenmetaverse.googlecode.com/svn/libopenmetaverse/trunk@2680 52acb1d6-8a22-11de-b505-999d5b087335
This commit is contained in:
John Hurliman
2009-05-01 06:04:32 +00:00
parent 6d6af6f3f8
commit c5409af63f
19 changed files with 508 additions and 877 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2007-2008, openmetaverse.org
* Copyright (c) 2009, openmetaverse.org
* All rights reserved.
*
* - Redistribution and use in source and binary forms, with or without
@@ -27,529 +27,202 @@
using System;
using System.Net;
using System.IO;
using System.Text;
using System.Threading;
using System.Security.Cryptography.X509Certificates;
namespace OpenMetaverse.Http
{
public class CapsBase
public static class CapsBase
{
#region Callback Data Classes
public delegate void OpenWriteEventHandler(HttpWebRequest request);
public delegate void DownloadProgressEventHandler(HttpWebRequest request, HttpWebResponse response, int bytesReceived, int totalBytesToReceive);
public delegate void RequestCompletedEventHandler(HttpWebRequest request, HttpWebResponse response, byte[] responseData, Exception error);
public class OpenWriteCompletedEventArgs
private class RequestState
{
public Stream Result;
public Exception Error;
public bool Cancelled;
public object UserState;
public HttpWebRequest Request;
public byte[] UploadData;
public int MillisecondsTimeout;
public OpenWriteEventHandler OpenWriteCallback;
public DownloadProgressEventHandler DownloadProgressCallback;
public RequestCompletedEventHandler CompletedCallback;
public OpenWriteCompletedEventArgs(Stream result, Exception error, bool cancelled, object userState)
public RequestState(HttpWebRequest request, byte[] uploadData, int millisecondsTimeout, OpenWriteEventHandler openWriteCallback,
DownloadProgressEventHandler downloadProgressCallback, RequestCompletedEventHandler completedCallback)
{
Result = result;
Error = error;
Cancelled = cancelled;
UserState = userState;
Request = request;
UploadData = uploadData;
MillisecondsTimeout = millisecondsTimeout;
OpenWriteCallback = openWriteCallback;
DownloadProgressCallback = downloadProgressCallback;
CompletedCallback = completedCallback;
}
}
public class UploadDataCompletedEventArgs
public static HttpWebRequest UploadDataAsync(Uri address, X509Certificate2 clientCert, string contentType, byte[] data,
int millisecondsTimeout, OpenWriteEventHandler openWriteCallback, DownloadProgressEventHandler downloadProgressCallback,
RequestCompletedEventHandler completedCallback)
{
public byte[] Result;
public Exception Error;
public bool Cancelled;
public object UserState;
public UploadDataCompletedEventArgs(byte[] result, Exception error, bool cancelled, object userState)
{
Result = result;
Error = error;
Cancelled = cancelled;
UserState = userState;
}
}
public class DownloadDataCompletedEventArgs
{
public byte[] Result;
public Exception Error;
public bool Cancelled;
public object UserState;
}
public class DownloadStringCompletedEventArgs
{
public Uri Address;
public string Result;
public Exception Error;
public bool Cancelled;
public object UserState;
public DownloadStringCompletedEventArgs(Uri address, string result, Exception error, bool cancelled, object userState)
{
Address = address;
Result = result;
Error = error;
Cancelled = cancelled;
UserState = userState;
}
}
public class DownloadProgressChangedEventArgs
{
public long BytesReceived;
public int ProgressPercentage;
public long TotalBytesToReceive;
public object UserState;
public DownloadProgressChangedEventArgs(long bytesReceived, long totalBytesToReceive, object userToken)
{
BytesReceived = bytesReceived;
ProgressPercentage = (int)(((float)bytesReceived / (float)totalBytesToReceive) * 100f);
TotalBytesToReceive = totalBytesToReceive;
UserState = userToken;
}
}
public class UploadProgressChangedEventArgs
{
public long BytesReceived;
public long BytesSent;
public int ProgressPercentage;
public long TotalBytesToReceive;
public long TotalBytesToSend;
public object UserState;
public UploadProgressChangedEventArgs(long bytesReceived, long totalBytesToReceive, long bytesSent, long totalBytesToSend, object userState)
{
BytesReceived = bytesReceived;
TotalBytesToReceive = totalBytesToReceive;
ProgressPercentage = (int)(((float)bytesSent / (float)totalBytesToSend) * 100f);
BytesSent = bytesSent;
TotalBytesToSend = totalBytesToSend;
UserState = userState;
}
}
#endregion Callback Data Classes
public delegate void OpenWriteCompletedEventHandler(object sender, OpenWriteCompletedEventArgs e);
public delegate void UploadDataCompletedEventHandler(object sender, UploadDataCompletedEventArgs e);
public delegate void DownloadStringCompletedEventHandler(object sender, DownloadStringCompletedEventArgs e);
public delegate void DownloadProgressChangedEventHandler(object sender, DownloadProgressChangedEventArgs e);
public delegate void UploadProgressChangedEventHandler(object sender, UploadProgressChangedEventArgs e);
public event OpenWriteCompletedEventHandler OpenWriteCompleted;
public event UploadDataCompletedEventHandler UploadDataCompleted;
public event DownloadStringCompletedEventHandler DownloadStringCompleted;
public event DownloadProgressChangedEventHandler DownloadProgressChanged;
public event UploadProgressChangedEventHandler UploadProgressChanged;
public WebHeaderCollection Headers = new WebHeaderCollection();
public IWebProxy Proxy;
public X509Certificate2 ClientCertificate;
public Uri Location { get { return location; } }
public bool IsBusy { get { return isBusy; } }
public WebHeaderCollection ResponseHeaders { get { return responseHeaders; } }
protected WebHeaderCollection responseHeaders;
protected Uri location;
protected bool isBusy;
protected Thread asyncThread;
protected System.Text.Encoding encoding = System.Text.Encoding.Default;
public CapsBase(Uri location, X509Certificate2 clientCert)
{
this.location = location;
ClientCertificate = clientCert;
}
public void OpenWriteAsync(Uri address)
{
OpenWriteAsync(address, null, null);
}
public void OpenWriteAsync(Uri address, string method)
{
OpenWriteAsync(address, method, null);
}
public void OpenWriteAsync(Uri address, string method, object userToken)
{
if (address == null)
throw new ArgumentNullException("address");
SetBusy();
asyncThread = new Thread(
delegate()
{
WebRequest request = null;
try
{
request = SetupRequest(address);
Stream stream = request.GetRequestStream();
OnOpenWriteCompleted(new OpenWriteCompletedEventArgs(
stream, null, false, userToken));
}
catch (ThreadInterruptedException)
{
if (request != null)
request.Abort();
OnOpenWriteCompleted(new OpenWriteCompletedEventArgs(
null, null, true, userToken));
}
catch (Exception e)
{
OnOpenWriteCompleted(new OpenWriteCompletedEventArgs(
null, e, false, userToken));
}
}
);
asyncThread.IsBackground = true;
asyncThread.Start();
}
public void UploadDataAsync(Uri address, byte[] data)
{
UploadDataAsync(address, null, data, null);
}
public void UploadDataAsync(Uri address, string method, byte[] data)
{
UploadDataAsync(address, method, data, null);
}
public void UploadDataAsync(Uri address, string method, byte[] data, object userToken)
{
if (address == null)
throw new ArgumentNullException("address");
if (data == null)
throw new ArgumentNullException("data");
SetBusy();
asyncThread = new Thread(delegate(object state)
{
object[] args = (object[])state;
byte[] data2;
try
{
data2 = UploadDataCore((Uri)args[0], (string)args[1], (byte[])args[2], args[3]);
OnUploadDataCompleted(
new UploadDataCompletedEventArgs(data2, null, false, args[3]));
}
catch (ThreadInterruptedException)
{
OnUploadDataCompleted(
new UploadDataCompletedEventArgs(null, null, true, args[3]));
}
catch (Exception e)
{
OnUploadDataCompleted(
new UploadDataCompletedEventArgs(null, e, false, args[3]));
}
});
object[] cbArgs = new object[] { address, method, data, userToken };
asyncThread.IsBackground = true;
asyncThread.Start(cbArgs);
}
public void DownloadStringAsync(Uri address)
{
DownloadStringAsync(address, null);
}
public void DownloadStringAsync(Uri address, object userToken)
{
if (address == null)
throw new ArgumentNullException("address");
SetBusy();
asyncThread = new Thread(
delegate()
{
try
{
string data = encoding.GetString(DownloadDataCore(address, userToken));
OnDownloadStringCompleted(
new DownloadStringCompletedEventArgs(location, data, null, false, userToken));
}
catch (ThreadInterruptedException)
{
OnDownloadStringCompleted(
new DownloadStringCompletedEventArgs(location, null, null, true, userToken));
}
catch (Exception e)
{
OnDownloadStringCompleted(
new DownloadStringCompletedEventArgs(location, null, e, false, userToken));
}
}
);
asyncThread.Start();
}
public void CancelAsync()
{
if (asyncThread == null)
return;
Thread t = asyncThread;
CompleteAsync();
t.Interrupt();
}
protected void CompleteAsync()
{
isBusy = false;
asyncThread = null;
}
protected void SetBusy()
{
CheckBusy();
isBusy = true;
}
protected void CheckBusy()
{
if (isBusy)
throw new NotSupportedException("CapsBase does not support concurrent I/O operations.");
}
protected Stream ProcessResponse(WebResponse response)
{
responseHeaders = response.Headers;
return response.GetResponseStream();
}
protected byte[] ReadAll(Stream stream, int length, object userToken, bool uploading)
{
MemoryStream ms = null;
bool nolength = (length == -1);
int size = ((nolength) ? 8192 : length);
if (nolength)
ms = new MemoryStream();
long total = 0;
int nread = 0;
int offset = 0;
byte[] buffer = new byte[size];
while ((nread = stream.Read(buffer, offset, size)) != 0)
{
if (nolength)
{
ms.Write(buffer, 0, nread);
}
else
{
offset += nread;
size -= nread;
}
if (uploading)
{
if (UploadProgressChanged != null)
{
total += nread;
UploadProgressChanged(this, new UploadProgressChangedEventArgs(nread, length, 0, 0, userToken));
}
}
else
{
if (DownloadProgressChanged != null)
{
total += nread;
DownloadProgressChanged(this, new DownloadProgressChangedEventArgs(nread, length, userToken));
}
}
}
if (nolength)
return ms.ToArray();
return buffer;
}
protected WebRequest SetupRequest(Uri uri)
{
HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(uri);
if (request == null)
throw new ArgumentException("Could not create an HttpWebRequest from the given Uri", "address");
location = uri;
if (Proxy != null)
request.Proxy = Proxy;
if (ClientCertificate != null)
request.ClientCertificates.Add(ClientCertificate);
// Create the request
HttpWebRequest request = SetupRequest(address, clientCert);
request.ContentLength = data.Length;
if (!String.IsNullOrEmpty(contentType))
request.ContentType = contentType;
request.Method = "POST";
if (Headers != null && Headers.Count != 0)
{
string expect = Headers["Expect"];
string contentType = Headers["Content-Type"];
string accept = Headers["Accept"];
string connection = Headers["Connection"];
string userAgent = Headers["User-Agent"];
string referer = Headers["Referer"];
// Create an object to hold all of the state for this request
RequestState state = new RequestState(request, data, millisecondsTimeout, openWriteCallback,
downloadProgressCallback, completedCallback);
if (!String.IsNullOrEmpty(expect))
request.Expect = expect;
// Start the request for a stream to upload to
IAsyncResult result = request.BeginGetRequestStream(OpenWrite, state);
// Register a timeout for the request
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, state, millisecondsTimeout, true);
if (!String.IsNullOrEmpty(accept))
request.Accept = accept;
return request;
}
if (!String.IsNullOrEmpty(contentType))
request.ContentType = contentType;
public static HttpWebRequest DownloadStringAsync(Uri address, X509Certificate2 clientCert, int millisecondsTimeout,
DownloadProgressEventHandler downloadProgressCallback, RequestCompletedEventHandler completedCallback)
{
// Create the request
HttpWebRequest request = SetupRequest(address, clientCert);
request.Method = "GET";
if (!String.IsNullOrEmpty(connection))
request.Connection = connection;
// Create an object to hold all of the state for this request
RequestState state = new RequestState(request, null, millisecondsTimeout, null, downloadProgressCallback,
completedCallback);
if (!String.IsNullOrEmpty(userAgent))
request.UserAgent = userAgent;
// Start the request for the remote server response
IAsyncResult result = request.BeginGetResponse(GetResponse, state);
// Register a timeout for the request
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, state, millisecondsTimeout, true);
if (!String.IsNullOrEmpty(referer))
request.Referer = referer;
}
return request;
}
// Disable keep-alive by default
request.KeepAlive = false;
// Set the closed connection (idle) time to one second
request.ServicePoint.MaxIdleTime = 1000;
static HttpWebRequest SetupRequest(Uri address, X509Certificate2 clientCert)
{
if (address == null)
throw new ArgumentNullException("address");
// Create the request
HttpWebRequest request = (HttpWebRequest)HttpWebRequest.Create(address);
// Add the client certificate to the request if one was given
if (clientCert != null)
request.ClientCertificates.Add(clientCert);
// Leave idle connections to this endpoint open for up to 60 seconds
request.ServicePoint.MaxIdleTime = 1000 * 60;
// Disable stupid Expect-100: Continue header
request.ServicePoint.Expect100Continue = false;
// Crank up the max number of connections (default is 2!)
request.ServicePoint.ConnectionLimit = Int32.MaxValue;
// Crank up the max number of connections per endpoint (default is 2!)
request.ServicePoint.ConnectionLimit = 20;
// Caps requests are never sent as trickles of data, so Nagle's
// coalescing algorithm won't help us
request.ServicePoint.UseNagleAlgorithm = false;
return request;
}
protected WebRequest SetupRequest(Uri uri, string method)
static void OpenWrite(IAsyncResult ar)
{
WebRequest request = SetupRequest(uri);
request.Method = method;
return request;
}
protected byte[] UploadDataCore(Uri address, string method, byte[] data, object userToken)
{
HttpWebRequest request = (HttpWebRequest)SetupRequest(address);
// Mono insists that if you have Content-Length set, Keep-Alive must be true.
// Otherwise the unhelpful exception of "Content-Length not set" will be thrown.
// The Linden Lab event queue server breaks HTTP 1.1 by always replying with a
// Connection: Close header, which will confuse the Windows .NET runtime and throw
// a "Connection unexpectedly closed" exception. This is our cross-platform hack
if (Utils.GetRunningRuntime() == Utils.Runtime.Mono)
request.KeepAlive = true;
RequestState state = (RequestState)ar.AsyncState;
try
{
// Content-Length
int contentLength = data.Length;
request.ContentLength = contentLength;
// Get the stream to write our upload to
Stream uploadStream = state.Request.EndGetRequestStream(ar);
using (Stream stream = request.GetRequestStream())
{
// Most uploads are very small chunks of data, use an optimized path for these
if (contentLength < 4096)
{
stream.Write(data, 0, contentLength);
}
else
{
// Upload chunks directly instead of buffering to memory
request.AllowWriteStreamBuffering = false;
// Fire the callback for successfully opening the stream
if (state.OpenWriteCallback != null)
state.OpenWriteCallback(state.Request);
MemoryStream ms = new MemoryStream(data);
// Write our data to the upload stream
uploadStream.Write(state.UploadData, 0, state.UploadData.Length);
byte[] buffer = new byte[checked((uint)Math.Min(4096, (int)contentLength))];
int bytesRead = 0;
while ((bytesRead = ms.Read(buffer, 0, buffer.Length)) != 0)
{
stream.Write(buffer, 0, bytesRead);
if (UploadProgressChanged != null)
{
UploadProgressChanged(this, new UploadProgressChangedEventArgs(0, 0, bytesRead, contentLength, userToken));
}
}
ms.Close();
}
}
HttpWebResponse response = (HttpWebResponse)request.GetResponse();
Stream st = ProcessResponse(response);
contentLength = (int)response.ContentLength;
return ReadAll(st, contentLength, userToken, true);
}
catch (ThreadInterruptedException)
{
if (request != null)
request.Abort();
throw;
}
}
protected byte[] DownloadDataCore(Uri address, object userToken)
{
WebRequest request = null;
try
{
request = SetupRequest(address, "GET");
WebResponse response = request.GetResponse();
Stream st = ProcessResponse(response);
return ReadAll(st, (int)response.ContentLength, userToken, false);
}
catch (ThreadInterruptedException)
{
if (request != null)
request.Abort();
throw;
// Start the request for the remote server response
IAsyncResult result = state.Request.BeginGetResponse(GetResponse, state);
// Register a timeout for the request
ThreadPool.RegisterWaitForSingleObject(result.AsyncWaitHandle, TimeoutCallback, state,
state.MillisecondsTimeout, true);
}
catch (Exception ex)
{
throw new WebException("An error occurred performing a WebClient request.", ex);
if (state.CompletedCallback != null)
state.CompletedCallback(state.Request, null, null, ex);
}
}
protected virtual void OnOpenWriteCompleted(OpenWriteCompletedEventArgs args)
static void GetResponse(IAsyncResult ar)
{
CompleteAsync();
if (OpenWriteCompleted != null)
OpenWriteCompleted(this, args);
RequestState state = (RequestState)ar.AsyncState;
HttpWebResponse response = null;
byte[] responseData = null;
Exception error = null;
try
{
response = (HttpWebResponse)state.Request.EndGetResponse(ar);
// Get the stream for downloading the response
Stream responseStream = response.GetResponseStream();
#region Read the response
// If Content-Length is set we create a buffer of the exact size, otherwise
// a MemoryStream is used to receive the response
bool nolength = (response.ContentLength <= 0);
int size = (nolength) ? 8192 : (int)response.ContentLength;
MemoryStream ms = (nolength) ? new MemoryStream() : null;
byte[] buffer = new byte[size];
int bytesRead = 0;
int offset = 0;
while ((bytesRead = responseStream.Read(buffer, offset, size)) != 0)
{
if (nolength)
{
ms.Write(buffer, 0, bytesRead);
}
else
{
offset += bytesRead;
size -= bytesRead;
}
// Fire the download progress callback for each chunk of received data
if (state.DownloadProgressCallback != null)
state.DownloadProgressCallback(state.Request, response, bytesRead, size);
}
if (nolength)
responseData = ms.ToArray();
else
responseData = buffer;
#endregion Read the response
}
catch (Exception ex)
{
error = ex;
}
if (state.CompletedCallback != null)
state.CompletedCallback(state.Request, response, responseData, error);
}
protected virtual void OnUploadDataCompleted(UploadDataCompletedEventArgs args)
static void TimeoutCallback(object state, bool timedOut)
{
CompleteAsync();
if (UploadDataCompleted != null)
UploadDataCompleted(this, args);
}
protected virtual void OnDownloadStringCompleted(DownloadStringCompletedEventArgs args)
{
CompleteAsync();
if (DownloadStringCompleted != null)
DownloadStringCompleted(this, args);
if (timedOut)
{
RequestState requestState = state as RequestState;
if (requestState != null && requestState.Request != null)
requestState.Request.Abort();
}
}
}
}

View File

@@ -34,272 +34,154 @@ namespace OpenMetaverse.Http
{
public class CapsClient
{
public delegate void ProgressCallback(CapsClient client, long bytesReceived, long bytesSent,
long totalBytesToReceive, long totalBytesToSend);
public delegate void DownloadProgressCallback(CapsClient client, int bytesReceived, int totalBytesToReceive);
public delegate void CompleteCallback(CapsClient client, OSD result, Exception error);
public event ProgressCallback OnProgress;
public event DownloadProgressCallback OnDownloadProgress;
public event CompleteCallback OnComplete;
public IWebProxy Proxy;
public object UserData;
protected CapsBase _Client;
protected Uri _Address;
protected byte[] _PostData;
protected X509Certificate2 _ClientCert;
protected string _ContentType;
protected HttpWebRequest _Request;
protected OSD _Response;
protected AutoResetEvent _ResponseEvent = new AutoResetEvent(false);
public CapsClient(Uri capability)
: this(capability, null)
{
Init(capability, null);
}
public CapsClient(Uri capability, X509Certificate2 clientCert)
{
Init(capability, clientCert);
_Address = capability;
_ClientCert = clientCert;
}
void Init(Uri capability, X509Certificate2 clientCert)
public void BeginGetResponse(int millisecondsTimeout)
{
_Client = new CapsBase(capability, clientCert);
_Client.DownloadProgressChanged += new CapsBase.DownloadProgressChangedEventHandler(Client_DownloadProgressChanged);
_Client.UploadProgressChanged += new CapsBase.UploadProgressChangedEventHandler(Client_UploadProgressChanged);
_Client.UploadDataCompleted += new CapsBase.UploadDataCompletedEventHandler(Client_UploadDataCompleted);
_Client.DownloadStringCompleted += new CapsBase.DownloadStringCompletedEventHandler(Client_DownloadStringCompleted);
BeginGetResponse(null, null, millisecondsTimeout);
}
public void BeginGetResponse()
public void BeginGetResponse(OSD data, OSDFormat format, int millisecondsTimeout)
{
BeginGetResponse(null, null);
}
byte[] postData;
string contentType;
public void BeginGetResponse(OSD data)
{
byte[] postData = OSDParser.SerializeLLSDXmlBytes(data);
BeginGetResponse(postData, null);
}
public void BeginGetResponse(OSD data, bool json)
{
if (json)
switch (format)
{
byte[] postData = System.Text.Encoding.UTF8.GetBytes(OSDParser.SerializeJsonString(data));
BeginGetResponse(postData, "application/json");
}
else
{
BeginGetResponse(data);
case OSDFormat.Xml:
postData = OSDParser.SerializeLLSDXmlBytes(data);
contentType = "application/llsd+xml";
break;
case OSDFormat.Binary:
postData = OSDParser.SerializeLLSDBinary(data);
contentType = "application/llsd+binary";
break;
case OSDFormat.Json:
default:
postData = System.Text.Encoding.UTF8.GetBytes(OSDParser.SerializeJsonString(data));
contentType = "application/llsd+json";
break;
}
BeginGetResponse(postData, contentType, millisecondsTimeout);
}
public void BeginGetResponse(byte[] postData)
{
BeginGetResponse(postData, null);
}
public void BeginGetResponse(byte[] postData, string contentType)
public void BeginGetResponse(byte[] postData, string contentType, int millisecondsTimeout)
{
_PostData = postData;
_ContentType = contentType;
if (_Client.IsBusy)
_Client.CancelAsync();
_Client.Headers.Clear();
// Proxy
if (Proxy != null)
_Client.Proxy = Proxy;
// Content-Type
if (!String.IsNullOrEmpty(contentType))
_Client.Headers.Add(HttpRequestHeader.ContentType, contentType);
else
_Client.Headers.Add(HttpRequestHeader.ContentType, "application/xml");
if (_Request != null)
{
_Request.Abort();
_Request = null;
}
if (postData == null)
_Client.DownloadStringAsync(_Client.Location);
{
// GET
Logger.Log.Debug("[CapsClient] GET " + _Address);
_Request = CapsBase.DownloadStringAsync(_Address, _ClientCert, millisecondsTimeout, DownloadProgressHandler,
RequestCompletedHandler);
}
else
_Client.UploadDataAsync(_Client.Location, postData);
{
// POST
Logger.Log.Debug("[CapsClient] POST (" + postData.Length + " bytes) " + _Address);
_Request = CapsBase.UploadDataAsync(_Address, _ClientCert, contentType, postData, millisecondsTimeout, null,
DownloadProgressHandler, RequestCompletedHandler);
}
}
public OSD GetResponse(int millisecondsTimeout)
{
OSD response = null;
AutoResetEvent waitEvent = new AutoResetEvent(false);
OnComplete += delegate(CapsClient client, OSD result, Exception error) { response = result; waitEvent.Set(); };
BeginGetResponse();
waitEvent.WaitOne(millisecondsTimeout, false);
return response;
BeginGetResponse(millisecondsTimeout);
_ResponseEvent.WaitOne(millisecondsTimeout, false);
return _Response;
}
public OSD GetResponse(OSD data, int millisecondsTimeout)
public OSD GetResponse(OSD data, OSDFormat format, int millisecondsTimeout)
{
OSD response = null;
AutoResetEvent waitEvent = new AutoResetEvent(false);
OnComplete += delegate(CapsClient client, OSD result, Exception error) { response = result; waitEvent.Set(); };
BeginGetResponse(data);
waitEvent.WaitOne(millisecondsTimeout, false);
return response;
}
public OSD GetResponse(OSD data, bool json, int millisecondsTimeout)
{
OSD response = null;
AutoResetEvent waitEvent = new AutoResetEvent(false);
OnComplete += delegate(CapsClient client, OSD result, Exception error) { response = result; waitEvent.Set(); };
BeginGetResponse(data, json);
waitEvent.WaitOne(millisecondsTimeout, false);
return response;
}
public OSD GetResponse(byte[] postData, int millisecondsTimeout)
{
OSD response = null;
AutoResetEvent waitEvent = new AutoResetEvent(false);
OnComplete += delegate(CapsClient client, OSD result, Exception error) { response = result; waitEvent.Set(); };
BeginGetResponse(postData);
waitEvent.WaitOne(millisecondsTimeout, false);
return response;
BeginGetResponse(data, format, millisecondsTimeout);
_ResponseEvent.WaitOne(millisecondsTimeout, false);
return _Response;
}
public OSD GetResponse(byte[] postData, string contentType, int millisecondsTimeout)
{
OSD response = null;
AutoResetEvent waitEvent = new AutoResetEvent(false);
OnComplete += delegate(CapsClient client, OSD result, Exception error) { response = result; waitEvent.Set(); };
BeginGetResponse(postData, contentType);
waitEvent.WaitOne(millisecondsTimeout, false);
return response;
BeginGetResponse(postData, contentType, millisecondsTimeout);
_ResponseEvent.WaitOne(millisecondsTimeout, false);
return _Response;
}
public void Cancel()
{
if (_Client.IsBusy)
_Client.CancelAsync();
if (_Request != null)
_Request.Abort();
}
#region Callback Handlers
private void Client_DownloadProgressChanged(object sender, CapsBase.DownloadProgressChangedEventArgs e)
void DownloadProgressHandler(HttpWebRequest request, HttpWebResponse response, int bytesReceived, int totalBytesToReceive)
{
if (OnProgress != null)
_Request = request;
if (OnDownloadProgress != null)
{
try { OnProgress(this, e.BytesReceived, 0, e.TotalBytesToReceive, 0); }
try { OnDownloadProgress(this, bytesReceived, totalBytesToReceive); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
}
private void Client_UploadProgressChanged(object sender, CapsBase.UploadProgressChangedEventArgs e)
void RequestCompletedHandler(HttpWebRequest request, HttpWebResponse response, byte[] responseData, Exception error)
{
if (OnProgress != null)
_Request = request;
OSD result = null;
if (responseData != null)
{
try { OnProgress(this, e.BytesReceived, e.BytesSent, e.TotalBytesToReceive, e.TotalBytesToSend); }
try { result = OSDParser.Deserialize(responseData); }
catch (Exception ex) { error = ex; }
}
FireCompleteCallback(result, error);
}
private void FireCompleteCallback(OSD result, Exception error)
{
CompleteCallback callback = OnComplete;
if (callback != null)
{
try { callback(this, result, error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
_Response = result;
_ResponseEvent.Set();
}
private void Client_UploadDataCompleted(object sender, CapsBase.UploadDataCompletedEventArgs e)
{
if (OnComplete != null && !e.Cancelled)
{
if (e.Error == null)
{
OSD result = OSDParser.Deserialize(e.Result);
try { OnComplete(this, result, e.Error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
else
{
// Some error occurred, try to figure out what happened
HttpStatusCode code = HttpStatusCode.OK;
if (e.Error is WebException && ((WebException)e.Error).Response != null)
code = ((HttpWebResponse)((WebException)e.Error).Response).StatusCode;
if (code == HttpStatusCode.BadGateway)
{
// This is not good (server) protocol design, but it's normal.
// The CAPS server is a proxy that connects to a Squid
// cache which will time out periodically. The CAPS server
// interprets this as a generic error and returns a 502 to us
// that we ignore
BeginGetResponse(_PostData, _ContentType);
}
else if (code != HttpStatusCode.OK)
{
// Status code was set to something unknown, this is a failure
Logger.Log.DebugFormat("Caps error at {0}: {1}", _Client.Location, code);
try { OnComplete(this, null, e.Error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
else
{
// Status code was not set, some other error occurred. This is a failure
Logger.Log.DebugFormat("Caps error at {0}: {1}", _Client.Location, e.Error.Message);
try { OnComplete(this, null, e.Error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
}
}
else if (e.Cancelled)
{
Logger.Log.Debug("Capability action at " + _Client.Location + " cancelled");
}
}
private void Client_DownloadStringCompleted(object sender, CapsBase.DownloadStringCompletedEventArgs e)
{
if (OnComplete != null && !e.Cancelled)
{
if (e.Error == null)
{
OSD result = OSDParser.DeserializeLLSDXml(e.Result);
try { OnComplete(this, result, e.Error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
else
{
// Some error occurred, try to figure out what happened
HttpStatusCode code = HttpStatusCode.OK;
if (e.Error is WebException && ((WebException)e.Error).Response != null)
code = ((HttpWebResponse)((WebException)e.Error).Response).StatusCode;
if (code == HttpStatusCode.BadGateway)
{
// This is not good (server) protocol design, but it's normal.
// The CAPS server is a proxy that connects to a Squid
// cache which will time out periodically. The CAPS server
// interprets this as a generic error and returns a 502 to us
// that we ignore
BeginGetResponse(_PostData, _ContentType);
}
else if (code != HttpStatusCode.OK)
{
// Status code was set to something unknown, this is a failure
Logger.Log.DebugFormat("Caps error at {0}: {1}", _Client.Location, code);
try { OnComplete(this, null, e.Error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
else
{
// Status code was not set, some other error occurred. This is a failure
Logger.Log.DebugFormat("Caps error at {0}: {1}", _Client.Location, e.Error.Message);
try { OnComplete(this, null, e.Error); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
}
}
else if (e.Cancelled)
{
Logger.Log.Debug("Capability action at " + _Client.Location + " cancelled");
}
}
#endregion Callback Handlers
}
}

View File

@@ -33,41 +33,39 @@ namespace OpenMetaverse.Http
{
public class EventQueueClient
{
/// <summary>
///
/// </summary>
/// <summary>=</summary>
public const int REQUEST_TIMEOUT = 1000 * 120;
public delegate void ConnectedCallback();
/// <summary>
///
/// </summary>
/// <param name="eventName"></param>
/// <param name="body"></param>
public delegate void EventCallback(string eventName, OSDMap body);
/// <summary></summary>
public ConnectedCallback OnConnected;
/// <summary></summary>
public EventCallback OnEvent;
public IWebProxy Proxy;
public bool Running { get { return _Running; } }
public bool Running { get { return _Running && _Client.IsBusy; } }
protected CapsBase _Client;
protected Uri _Address;
protected bool _Dead;
protected bool _Running;
protected HttpWebRequest _Request;
public EventQueueClient(Uri eventQueueLocation)
{
_Client = new CapsBase(eventQueueLocation, null);
_Client.OpenWriteCompleted += new CapsBase.OpenWriteCompletedEventHandler(Client_OpenWriteCompleted);
_Client.UploadDataCompleted += new CapsBase.UploadDataCompletedEventHandler(Client_UploadDataCompleted);
_Address = eventQueueLocation;
}
public void Start()
{
_Dead = false;
_Client.OpenWriteAsync(_Client.Location);
// Create an EventQueueGet request
OSDMap request = new OSDMap();
request["ack"] = new OSD();
request["done"] = OSD.FromBoolean(false);
byte[] postData = OSDParser.SerializeLLSDXmlBytes(request);
_Request = CapsBase.UploadDataAsync(_Address, null, "application/xml", postData, REQUEST_TIMEOUT, OpenWriteHandler, null, RequestCompletedHandler);
}
public void Stop(bool immediate)
@@ -77,60 +75,71 @@ namespace OpenMetaverse.Http
if (immediate)
_Running = false;
if (_Client.IsBusy)
_Client.CancelAsync();
if (_Request != null)
_Request.Abort();
}
#region Callback Handlers
private void Client_OpenWriteCompleted(object sender, CapsBase.OpenWriteCompletedEventArgs e)
void OpenWriteHandler(HttpWebRequest request)
{
bool raiseEvent = false;
_Running = true;
_Request = request;
if (!_Dead)
Logger.Log.Debug("Capabilities event queue connected");
// The event queue is starting up for the first time
if (OnConnected != null)
{
if (!_Running) raiseEvent = true;
// We are connected to the event queue
_Running = true;
}
// Create an EventQueueGet request
OSDMap request = new OSDMap();
request["ack"] = new OSD();
request["done"] = OSD.FromBoolean(false);
byte[] postData = OSDParser.SerializeLLSDXmlBytes(request);
_Client.UploadDataAsync(_Client.Location, postData);
if (raiseEvent)
{
Logger.Log.Debug("Capabilities event queue connected");
// The event queue is starting up for the first time
if (OnConnected != null)
{
try { OnConnected(); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
try { OnConnected(); }
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
}
private void Client_UploadDataCompleted(object sender, CapsBase.UploadDataCompletedEventArgs e)
void RequestCompletedHandler(HttpWebRequest request, HttpWebResponse response, byte[] responseData, Exception error)
{
// We don't care about this request now that it has completed
_Request = null;
OSDArray events = null;
int ack = 0;
if (e.Error != null)
if (responseData != null)
{
// Got a response
OSDMap result = OSDParser.DeserializeLLSDXml(responseData) as OSDMap;
if (result != null)
{
events = result["events"] as OSDArray;
ack = result["id"].AsInteger();
}
else
{
Logger.Log.Warn("Got an unparseable response from the event queue: \"" +
System.Text.Encoding.UTF8.GetString(responseData) + "\"");
}
}
else if (error != null)
{
#region Error handling
HttpStatusCode code = HttpStatusCode.OK;
if (e.Error is WebException && ((WebException)e.Error).Response != null)
code = ((HttpWebResponse)((WebException)e.Error).Response).StatusCode;
if (error is WebException)
{
WebException webException = (WebException)error;
if (webException.Response != null)
code = ((HttpWebResponse)webException.Response).StatusCode;
else if (webException.Status == WebExceptionStatus.RequestCanceled)
goto HandlingDone;
}
if (error is WebException && ((WebException)error).Response != null)
code = ((HttpWebResponse)((WebException)error).Response).StatusCode;
if (code == HttpStatusCode.NotFound || code == HttpStatusCode.Gone)
{
Logger.Log.InfoFormat("Closing event queue at {0} due to missing caps URI", _Client.Location);
Logger.Log.InfoFormat("Closing event queue at {0} due to missing caps URI", _Address);
_Running = false;
_Dead = true;
@@ -143,55 +152,50 @@ namespace OpenMetaverse.Http
// interprets this as a generic error and returns a 502 to us
// that we ignore
}
else if (!e.Cancelled)
else
{
// Try to log a meaningful error message
if (code != HttpStatusCode.OK)
{
Logger.Log.WarnFormat("Unrecognized caps connection problem from {0}: {1}",
_Client.Location, code);
_Address, code);
}
else if (e.Error.InnerException != null)
else if (error.InnerException != null)
{
Logger.Log.WarnFormat("Unrecognized caps exception from {0}: {1}",
_Client.Location, e.Error.InnerException.Message);
_Address, error.InnerException.Message);
}
else
{
Logger.Log.WarnFormat("Unrecognized caps exception from {0}: {1}",
_Client.Location, e.Error.Message);
_Address, error.Message);
}
}
}
else if (!e.Cancelled && e.Result != null)
{
// Got a response
OSD result = OSDParser.DeserializeLLSDXml(e.Result);
if (result != null && result.Type == OSDType.Map)
{
// Parse any events returned by the event queue
OSDMap map = (OSDMap)result;
events = (OSDArray)map["events"];
ack = map["id"].AsInteger();
}
#endregion Error handling
}
else if (e.Cancelled)
else
{
// Connection was cancelled
Logger.Log.Debug("Cancelled connection to event queue at " + _Client.Location);
Logger.Log.Warn("No response from the event queue but no reported error either");
}
HandlingDone:
#region Resume the connection
if (_Running)
{
OSDMap request = new OSDMap();
if (ack != 0) request["ack"] = OSD.FromInteger(ack);
else request["ack"] = new OSD();
request["done"] = OSD.FromBoolean(_Dead);
OSDMap osdRequest = new OSDMap();
if (ack != 0) osdRequest["ack"] = OSD.FromInteger(ack);
else osdRequest["ack"] = new OSD();
osdRequest["done"] = OSD.FromBoolean(_Dead);
byte[] postData = OSDParser.SerializeLLSDXmlBytes(request);
byte[] postData = OSDParser.SerializeLLSDXmlBytes(osdRequest);
_Client.UploadDataAsync(_Client.Location, postData);
// Resume the connection. The event handler for the connection opening
// just sets class _Request variable to the current HttpWebRequest
CapsBase.UploadDataAsync(_Address, null, "application/xml", postData, REQUEST_TIMEOUT,
delegate(HttpWebRequest newRequest) { _Request = newRequest; }, null, RequestCompletedHandler);
// If the event queue is dead at this point, turn it off since
// that was the last thing we want to do
@@ -202,6 +206,10 @@ namespace OpenMetaverse.Http
}
}
#endregion Resume the connection
#region Handle incoming events
if (OnEvent != null && events != null && events.Count > 0)
{
// Fire callbacks for each event received
@@ -214,8 +222,8 @@ namespace OpenMetaverse.Http
catch (Exception ex) { Logger.Log.Error(ex.Message, ex); }
}
}
}
#endregion Callback Handlers
#endregion Handle incoming events
}
}
}