* Heralding in a new era of botdom with [LIBOMV-343], rate limiting for outgoing packets

* Converted BlockingQueue to a generic collection

git-svn-id: http://libopenmetaverse.googlecode.com/svn/trunk@2048 52acb1d6-8a22-11de-b505-999d5b087335
This commit is contained in:
John Hurliman
2008-08-02 01:49:20 +00:00
parent 09340efe47
commit e2986fbf06
6 changed files with 151 additions and 93 deletions

View File

@@ -422,8 +422,6 @@ namespace OpenMetaverse
if (download.TimeSinceLastPacket > 5000)
{
Logger.DebugLog("Image download refresh timer is re-requesting texture " + download.ID.ToString());
// FIXME: This will probably break on baked textures and doesn't preserve the originally requested discardlevel
download.TimeSinceLastPacket = 0;
RequestImage(download.ID, ImageType.Normal, 1013010.0f, 0);
@@ -652,8 +650,8 @@ namespace OpenMetaverse
if (Single.IsNaN(percentComplete))
percentComplete = 0f;
Logger.DebugLog(String.Format("Updating priority on image transfer {0}, ({1:#.#}% complete",
imageID, percentComplete));
Logger.DebugLog(String.Format("Updating priority on image transfer {0}, {1}% complete",
imageID, Math.Round(percentComplete, 2)));
}
// Build and send the request packet

View File

@@ -25,6 +25,7 @@
*/
using System;
using System.Collections.Generic;
using System.Threading;
using OpenMetaverse;
@@ -34,28 +35,19 @@ namespace System.Collections
/// Same as Queue except Dequeue function blocks until there is an object to return.
/// Note: This class does not need to be synchronized
/// </summary>
public class BlockingQueue : Queue
public class BlockingQueue<T> : Queue<T>
{
private object SyncRoot;
private bool open;
/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="col">The System.Collections.ICollection to copy elements from</param>
public BlockingQueue(ICollection col)
public BlockingQueue(IEnumerable<T> col)
: base(col)
{
open = true;
}
/// <summary>
/// Create new BlockingQueue.
/// </summary>
/// <param name="capacity">The initial number of elements that the queue can contain</param>
/// <param name="growFactor">The factor by which the capacity of the queue is expanded</param>
public BlockingQueue(int capacity, float growFactor)
: base(capacity, growFactor)
{
SyncRoot = new object();
open = true;
}
@@ -66,6 +58,7 @@ namespace System.Collections
public BlockingQueue(int capacity)
: base(capacity)
{
SyncRoot = new object();
open = true;
}
@@ -75,6 +68,7 @@ namespace System.Collections
public BlockingQueue()
: base()
{
SyncRoot = new object();
open = true;
}
@@ -89,9 +83,9 @@ namespace System.Collections
/// <summary>
/// Remove all objects from the Queue.
/// </summary>
public override void Clear()
public new void Clear()
{
lock (base.SyncRoot)
lock (SyncRoot)
{
base.Clear();
}
@@ -102,11 +96,11 @@ namespace System.Collections
/// </summary>
public void Close()
{
lock (base.SyncRoot)
lock (SyncRoot)
{
open = false;
base.Clear();
Monitor.PulseAll(base.SyncRoot); // resume any waiting threads
Monitor.PulseAll(SyncRoot); // resume any waiting threads
}
}
@@ -114,7 +108,7 @@ namespace System.Collections
/// Removes and returns the object at the beginning of the Queue.
/// </summary>
/// <returns>Object in queue.</returns>
public override object Dequeue()
public new T Dequeue()
{
return Dequeue(Timeout.Infinite);
}
@@ -124,7 +118,7 @@ namespace System.Collections
/// </summary>
/// <param name="timeout">time to wait before returning</param>
/// <returns>Object in queue.</returns>
public NetworkManager.IncomingPacket Dequeue(TimeSpan timeout)
public T Dequeue(TimeSpan timeout)
{
return Dequeue(timeout.Milliseconds);
}
@@ -134,51 +128,54 @@ namespace System.Collections
/// </summary>
/// <param name="timeout">time to wait before returning (in milliseconds)</param>
/// <returns>Object in queue.</returns>
public NetworkManager.IncomingPacket Dequeue(int timeout)
public T Dequeue(int timeout)
{
lock (base.SyncRoot)
lock (SyncRoot)
{
while (open && (base.Count == 0))
{
if (!Monitor.Wait(base.SyncRoot, timeout))
if (!Monitor.Wait(SyncRoot, timeout))
throw new InvalidOperationException("Timeout");
}
if (open)
return (NetworkManager.IncomingPacket)base.Dequeue();
return base.Dequeue();
else
throw new InvalidOperationException("Queue Closed");
}
}
public bool Dequeue(int timeout, ref NetworkManager.IncomingPacket packet)
public bool Dequeue(int timeout, ref T obj)
{
lock (base.SyncRoot)
lock (SyncRoot)
{
while (open && (base.Count == 0))
{
if (!Monitor.Wait(base.SyncRoot, timeout))
if (!Monitor.Wait(SyncRoot, timeout))
return false;
}
if (open)
{
packet = (NetworkManager.IncomingPacket)base.Dequeue();
obj = base.Dequeue();
return true;
}
else
{
obj = default(T);
return false;
}
}
}
/// <summary>
/// Adds an object to the end of the Queue.
/// Adds an object to the end of the Queue
/// </summary>
/// <param name="obj">Object to put in queue</param>
public override void Enqueue(object obj)
public new void Enqueue(T obj)
{
lock (base.SyncRoot)
lock (SyncRoot)
{
base.Enqueue(obj);
Monitor.Pulse(base.SyncRoot);
Monitor.Pulse(SyncRoot);
}
}
@@ -187,7 +184,7 @@ namespace System.Collections
/// </summary>
public void Open()
{
lock (base.SyncRoot)
lock (SyncRoot)
{
open = true;
}

View File

@@ -72,15 +72,42 @@ namespace OpenMetaverse
#region Structs
/// <summary>
/// Holds a simulator reference and a packet, these structs are put in
/// the packet inbox for decoding
/// Holds a simulator reference and a decoded packet, these structs are put in
/// the packet inbox for event handling
/// </summary>
public struct IncomingPacket
{
/// <summary>Reference to the simulator that this packet came from</summary>
public Simulator Simulator;
/// <summary>The packet that needs to be processed</summary>
/// <summary>Packet that needs to be processed</summary>
public Packet Packet;
public IncomingPacket(Simulator simulator, Packet packet)
{
Simulator = simulator;
Packet = packet;
}
}
/// <summary>
/// Holds a simulator reference and an encoded packet, these structs are put in
/// the packet outbox for sending
/// </summary>
public struct OutgoingPacket
{
/// <summary>Reference to the simulator this packet is destined for</summary>
public Simulator Simulator;
/// <summary>Packet that needs to be processed</summary>
public Packet Packet;
/// <summary>True if the sequence number needs to be set, otherwise false</summary>
public bool SetSequence;
public OutgoingPacket(Simulator simulator, Packet packet, bool setSequence)
{
Simulator = simulator;
Packet = packet;
SetSequence = setSequence;
}
}
#endregion Structs
@@ -212,6 +239,8 @@ namespace OpenMetaverse
public bool Connected { get { return connected; } }
/// <summary>Number of packets in the incoming queue</summary>
public int InboxCount { get { return PacketInbox.Count; } }
/// <summary>Number of packets in the outgoing queue</summary>
public int OutboxCount { get { return PacketOutbox.Count; } }
#endregion Properties
@@ -223,7 +252,9 @@ namespace OpenMetaverse
/// <summary>Handlers for incoming packets</summary>
internal PacketEventDictionary PacketEvents;
/// <summary>Incoming packets that are awaiting handling</summary>
internal BlockingQueue PacketInbox = new BlockingQueue(Settings.PACKET_INBOX_SIZE);
internal BlockingQueue<IncomingPacket> PacketInbox = new BlockingQueue<IncomingPacket>(Settings.PACKET_INBOX_SIZE);
/// <summary>Outgoing packets that are awaiting handling</summary>
internal BlockingQueue<OutgoingPacket> PacketOutbox = new BlockingQueue<OutgoingPacket>(Settings.PACKET_INBOX_SIZE);
private GridClient Client;
private Timer DisconnectTimer;
@@ -328,31 +359,6 @@ namespace OpenMetaverse
simulator.SendPacket(packet, true);
}
/// <summary>
/// Send a raw byte array as a packet to the current simulator
/// </summary>
/// <param name="payload">Byte array containing a packet</param>
/// <param name="setSequence">Whether to set the second, third, and fourth
/// bytes of the payload to the current sequence number</param>
public void SendPacket(byte[] payload, bool setSequence)
{
if (CurrentSim != null)
CurrentSim.SendPacket(payload, setSequence);
}
/// <summary>
/// Send a raw byte array as a packet to the specified simulator
/// </summary>
/// <param name="payload">Byte array containing a packet</param>
/// <param name="simulator">Simulator to send the packet to</param>
/// <param name="setSequence">Whether to set the second, third, and fourth
/// bytes of the payload to the current sequence number</param>
public void SendPacket(byte[] payload, Simulator simulator, bool setSequence)
{
if (simulator != null)
simulator.SendPacket(payload, setSequence);
}
/// <summary>
/// Connect to a simulator
/// </summary>
@@ -403,12 +409,17 @@ namespace OpenMetaverse
// Mark that we are connecting/connected to the grid
connected = true;
// restart the blocking queue in case of re-connect
// Open the queues in case this is a reconnect and they were shut down
PacketInbox.Open();
PacketOutbox.Open();
// Start the packet decoding thread
Thread decodeThread = new Thread(new ThreadStart(PacketHandler));
Thread decodeThread = new Thread(new ThreadStart(IncomingPacketHandler));
decodeThread.Start();
// Start the packet sending thread
Thread sendThread = new Thread(new ThreadStart(OutgoingPacketHandler));
sendThread.Start();
}
// Fire the OnSimConnecting event
@@ -605,6 +616,7 @@ namespace OpenMetaverse
// Clear out all of the packets that never had time to process
PacketInbox.Close();
PacketOutbox.Close();
connected = false;
@@ -649,7 +661,38 @@ namespace OpenMetaverse
}
}
private void PacketHandler()
private void OutgoingPacketHandler()
{
OutgoingPacket outgoingPacket = new OutgoingPacket();
Simulator simulator = null;
Packet packet = null;
int now;
int lastPacketTime = Environment.TickCount;
while (connected)
{
if (PacketOutbox.Dequeue(100, ref outgoingPacket))
{
simulator = outgoingPacket.Simulator;
packet = outgoingPacket.Packet;
// Very primitive rate limiting, keeps a fixed buffer of time between each packet
now = Environment.TickCount;
int ms = now - lastPacketTime;
if (ms < 75)
{
Logger.DebugLog(String.Format("Rate limiting, last packet was {0}ms ago", ms));
Thread.Sleep(75 - ms);
}
lastPacketTime = now;
simulator.SendPacketUnqueued(packet, outgoingPacket.SetSequence);
}
}
}
private void IncomingPacketHandler()
{
IncomingPacket incomingPacket = new IncomingPacket();
Packet packet = null;
@@ -660,7 +703,7 @@ namespace OpenMetaverse
// Reset packet to null for the check below
packet = null;
if (PacketInbox.Dequeue(200, ref incomingPacket))
if (PacketInbox.Dequeue(100, ref incomingPacket))
{
packet = incomingPacket.Packet;
simulator = incomingPacket.Simulator;

View File

@@ -890,7 +890,8 @@ namespace OpenMetaverse
Thread th = new Thread(delegate()
{
int y, x;
int count = 0, y, x;
for (y = 0; y < 64; y++)
{
for (x = 0; x < 64; x++)
@@ -903,11 +904,22 @@ namespace OpenMetaverse
Client.Parcels.PropertiesRequest(simulator,
(y + 1) * 4.0f, (x + 1) * 4.0f,
y * 4.0f, x * 4.0f, 0, false);
// Pause after every request to avoid flooding the sim
System.Threading.Thread.Sleep(msDelay);
// Simulators can be sliced up in 4x4 chunks, but even the smallest
// parcel is 16x16. There's no reason to send out 4096 packets. If
// we sleep for a little while here it is likely the response
// packet will come back and nearby parcel information is filled in
// so we can skip a lot of unnecessary sends
Thread.Sleep(100);
++count;
}
}
}
Logger.Log(String.Format(
"Requested full simulator parcel information. Sent {0} parcel requests. Current outgoing queue: {1}",
count, Client.Network.OutboxCount), Helpers.LogLevel.Info);
});
th.Start();

View File

@@ -572,16 +572,35 @@ namespace OpenMetaverse
}
}
/// <summary>
/// Sends a packet
/// </summary>
/// <param name="packet">Packet to be sent</param>
/// <param name="incrementSequence">Increment sequence number?</param>
///
/// <param name="setSequence">True to set the sequence number, false to
/// leave it as is</param>
public void SendPacket(Packet packet, bool setSequence)
{
// Send ACK and logout packets directly, everything else goes through the queue
if (packet.Type == PacketType.PacketAck ||
packet.Header.AppendedAcks ||
packet.Type == PacketType.LogoutRequest)
{
SendPacketUnqueued(packet, setSequence);
}
else
{
Network.PacketOutbox.Enqueue(new NetworkManager.OutgoingPacket(this, packet, setSequence));
}
}
public void SendPacket(Packet packet, bool incrementSequence)
/// <summary>
/// Sends a packet directly to the simulator without queuing
/// </summary>
/// <param name="packet">Packet to be sent</param>
/// <param name="setSequence">True to set the sequence number, false to
/// leave it as is</param>
public void SendPacketUnqueued(Packet packet, bool setSequence)
{
byte[] buffer;
int bytes;
@@ -589,7 +608,7 @@ namespace OpenMetaverse
// Keep track of when this packet was sent out
packet.TickCount = Environment.TickCount;
if (incrementSequence)
if (setSequence)
{
// Reset to zero if we've hit the upper sequence number limit
Interlocked.CompareExchange(ref Sequence, 0, Settings.MAX_SEQUENCE);
@@ -693,7 +712,7 @@ namespace OpenMetaverse
/// <param name="payload">The packet payload</param>
/// <param name="setSequence">Whether the second, third, and fourth bytes
/// should be modified to the current stream sequence number</param>
public void SendPacket(byte[] payload, bool setSequence)
public void SendPacketUnqueued(byte[] payload, bool setSequence)
{
try
{
@@ -730,16 +749,6 @@ namespace OpenMetaverse
}
}
/// <summary>
/// Send a prepared <code>UDPPacketBuffer</code> object as a packet
/// </summary>
/// <param name="buffer">The prepared packet structure to be sent out</param>
public void SendPacket(UDPPacketBuffer buffer)
{
try { AsyncBeginSend(buffer); }
catch (Exception e) { Logger.Log(e.Message, Helpers.LogLevel.Error, Client, e); }
}
/// <summary>
///
/// </summary>
@@ -921,7 +930,6 @@ namespace OpenMetaverse
/// <summary>
/// Resend unacknowledged packets
/// </summary>
///
private void ResendUnacked()
{
lock (NeedAck)
@@ -932,7 +940,7 @@ namespace OpenMetaverse
// Resend packets
foreach (Packet packet in NeedAck.Values)
{
if (now - packet.TickCount > Client.Settings.RESEND_TIMEOUT)
if (packet.TickCount != 0 && now - packet.TickCount > Client.Settings.RESEND_TIMEOUT)
{
if (packet.ResendCount < Client.Settings.MAX_RESEND_COUNT)
{
@@ -944,10 +952,10 @@ namespace OpenMetaverse
packet.Header.Sequence, packet.GetType(), now - packet.TickCount), Client);
}
packet.TickCount = 0;
packet.Header.Resent = true;
++Stats.ResentPackets;
++packet.ResendCount;
packet.TickCount = now;
SendPacket(packet, false);
}

View File

@@ -36,7 +36,7 @@ namespace OpenMetaverse.TestClient
if (Client.Network.CurrentSim.IsParcelMapFull())
ParcelsDownloaded.Set();
if (ParcelsDownloaded.WaitOne(20000, false) && Client.Network.Connected)
if (ParcelsDownloaded.WaitOne(30000, false) && Client.Network.Connected)
{
sb.AppendFormat("Downloaded {0} Parcels in {1} " + System.Environment.NewLine,
Client.Network.CurrentSim.Parcels.Count, Client.Network.CurrentSim.Name);