using System; using System.Collections; using System.Collections.Generic; using System.Net; using System.Threading; using OpenMetaverse; using OpenMetaverse.Packets; namespace Simian { public struct IncomingPacket { public Agent Agent; public Packet Packet; } public class UDPServer : UDPBase { /// /// Coupled with RegisterCallback(), this is triggered whenever a packet /// of a registered type is received /// public delegate void PacketCallback(Packet packet, Agent agent); /// This is only used to fetch unassociated agents, which will /// be exposed through a login interface at some point Simian server; /// Handlers for incoming packets PacketEventDictionary packetEvents = new PacketEventDictionary(); /// All of the agents currently connected to this UDP server Dictionary agents = new Dictionary(); /// Incoming packets that are awaiting handling BlockingQueue packetInbox = new BlockingQueue(Settings.PACKET_INBOX_SIZE); public UDPServer(int port, Simian server) : base(port) { this.server = server; Start(); // Start the incoming packet processing thread Thread incomingThread = new Thread(new ThreadStart(IncomingPacketHandler)); incomingThread.Start(); } public void RegisterPacketCallback(PacketType type, PacketCallback callback) { packetEvents.RegisterEvent(type, callback); } protected override void PacketReceived(UDPPacketBuffer buffer) { Agent agent = null; Packet packet = null; int packetEnd = buffer.DataLength - 1; // Decoding try { packet = Packet.BuildPacket(buffer.Data, ref packetEnd, buffer.ZeroData); } catch (MalformedDataException) { Logger.Log(String.Format("Malformed data, cannot parse packet:\n{0}", Utils.BytesToHexString(buffer.Data, buffer.DataLength, null)), Helpers.LogLevel.Error); } // Fail-safe check if (packet == null) { Logger.Log("Couldn't build a message from the incoming data", Helpers.LogLevel.Warning); return; } //Stats.RecvBytes += (ulong)buffer.DataLength; //++Stats.RecvPackets; if (packet.Type == PacketType.UseCircuitCode) { UseCircuitCodePacket useCircuitCode = (UseCircuitCodePacket)packet; if (server.TryGetUnassociatedAgent(useCircuitCode.CircuitCode.Code, out agent)) { agent.Initialize((IPEndPoint)buffer.RemoteEndPoint); lock (agents) agents[(IPEndPoint)buffer.RemoteEndPoint] = agent; Logger.Log("Activated UDP circuit " + useCircuitCode.CircuitCode.Code, Helpers.LogLevel.Info); //agent.SendAck(useCircuitCode.Header.Sequence); } else { Logger.Log("Received a UseCircuitCode packet for an unrecognized circuit: " + useCircuitCode.CircuitCode.Code.ToString(), Helpers.LogLevel.Warning); return; } } else { // Determine which agent this packet came from if (!agents.TryGetValue((IPEndPoint)buffer.RemoteEndPoint, out agent)) { Logger.Log("Received UDP packet from an unrecognized source: " + ((IPEndPoint)buffer.RemoteEndPoint).ToString(), Helpers.LogLevel.Warning); return; } } // Reliable handling if (packet.Header.Reliable) { // Queue up this sequence number for acknowledgement agent.QueueAck((uint)packet.Header.Sequence); //if (packet.Header.Resent) ++Stats.ReceivedResends; } // Inbox insertion IncomingPacket incomingPacket; incomingPacket.Agent = agent; incomingPacket.Packet = packet; // TODO: Prioritize the queue packetInbox.Enqueue(incomingPacket); } protected override void PacketSent(UDPPacketBuffer buffer, int bytesSent) { Logger.DebugLog("Sent " + buffer.DataLength + " byte packet"); } private void IncomingPacketHandler() { IncomingPacket incomingPacket = new IncomingPacket(); Packet packet = null; Agent agent = null; while (IsRunning) { // Reset packet to null for the check below packet = null; if (packetInbox.Dequeue(100, ref incomingPacket)) { packet = incomingPacket.Packet; agent = incomingPacket.Agent; if (packet != null) { #region ACK accounting // Check the archives to see whether we already received this packet lock (agent.packetArchive) { if (agent.packetArchive.Contains(packet.Header.Sequence)) { if (packet.Header.Resent) { Logger.DebugLog("Received resent packet #" + packet.Header.Sequence); } else { Logger.Log(String.Format("Received a duplicate of packet #{0}, current type: {1}", packet.Header.Sequence, packet.Type), Helpers.LogLevel.Warning); } // Avoid firing a callback twice for the same packet continue; } else { // Keep the PacketArchive size within a certain capacity while (agent.packetArchive.Count >= Settings.PACKET_ARCHIVE_SIZE) { agent.packetArchive.Dequeue(); agent.packetArchive.Dequeue(); agent.packetArchive.Dequeue(); agent.packetArchive.Dequeue(); } agent.packetArchive.Enqueue(packet.Header.Sequence); } } #endregion ACK accounting #region ACK handling // Handle appended ACKs if (packet.Header.AppendedAcks) { lock (agent.needAcks) { for (int i = 0; i < packet.Header.AckList.Length; i++) agent.needAcks.Remove(packet.Header.AckList[i]); } } // Handle PacketAck packets if (packet.Type == PacketType.PacketAck) { PacketAckPacket ackPacket = (PacketAckPacket)packet; lock (agent.needAcks) { for (int i = 0; i < ackPacket.Packets.Length; i++) agent.needAcks.Remove(ackPacket.Packets[i].ID); } } #endregion ACK handling packetEvents.BeginRaiseEvent(packet.Type, packet, agent); } } } } } }