using System; using System.Collections; using System.Collections.Generic; using System.Net; using System.Threading; using ExtensionLoader; using OpenMetaverse; using OpenMetaverse.Packets; namespace Simian { public struct IncomingPacket { public UDPClient Client; public Packet Packet; } public class OutgoingPacket { public Packet Packet; /// Number of times this packet has been resent public int ResendCount; /// Environment.TickCount when this packet was last sent over the wire public int TickCount; /// Category this packet belongs to public PacketCategory Category; public OutgoingPacket(Packet packet, PacketCategory category) { Packet = packet; Category = category; } } public class UDPClient { /// public Agent Agent; /// public IPEndPoint Address; /// Sequence numbers of packets we've received (for duplicate checking) public Queue PacketArchive = new Queue(); /// Packets we have sent that need to be ACKed by the client public Dictionary NeedAcks = new Dictionary(); /// ACKs that are queued up, waiting to be sent to the client public SortedList PendingAcks = new SortedList(); /// Current packet sequence number public int CurrentSequence = 0; Timer ackTimer; UDPServer udpServer; public UDPClient(UDPServer server, Agent agent, IPEndPoint address) { udpServer = server; Agent = agent; Address = address; ackTimer = new Timer(new TimerCallback(AckTimer_Elapsed), null, Settings.NETWORK_TICK_INTERVAL, Settings.NETWORK_TICK_INTERVAL); } public void Shutdown() { ackTimer.Dispose(); } private void AckTimer_Elapsed(object obj) { udpServer.SendAcks(this); udpServer.ResendUnacked(this); } } public class UDPManager : IExtension, IUDPProvider { Simian server; UDPServer udpServer; public UDPManager() { } public void Start(Simian server) { this.server = server; udpServer = new UDPServer(server.UDPPort, server); } public void Stop() { udpServer.Stop(); } public void AddClient(Agent agent, IPEndPoint endpoint) { udpServer.AddClient(agent, endpoint); } public bool RemoveClient(Agent agent) { return udpServer.RemoveClient(agent); } public uint CreateCircuit(Agent agent) { return udpServer.CreateCircuit(agent); } public void SendPacket(UUID agentID, Packet packet, PacketCategory category) { udpServer.SendPacket(agentID, packet, category); } public void BroadcastPacket(Packet packet, PacketCategory category) { udpServer.BroadcastPacket(packet, category); } public void RegisterPacketCallback(PacketType type, PacketCallback callback) { udpServer.RegisterPacketCallback(type, callback); } } public class UDPServer : UDPBase { /// This is only used to fetch unassociated agents, which will /// be exposed through a login interface at some point Simian server; /// Handlers for incoming packets PacketEventDictionary packetEvents = new PacketEventDictionary(); /// Incoming packets that are awaiting handling BlockingQueue packetInbox = new BlockingQueue(Settings.PACKET_INBOX_SIZE); /// DoubleDictionary clients = new DoubleDictionary(); /// Dictionary unassociatedAgents = new Dictionary(); /// int currentCircuitCode = 0; public UDPServer(int port, Simian server) : base(port) { this.server = server; Start(); // Start the incoming packet processing thread Thread incomingThread = new Thread(new ThreadStart(IncomingPacketHandler)); incomingThread.Start(); } public void RegisterPacketCallback(PacketType type, PacketCallback callback) { packetEvents.RegisterEvent(type, callback); } public void AddClient(Agent agent, IPEndPoint endpoint) { UDPClient client = new UDPClient(this, agent, endpoint); clients.Add(agent.AgentID, endpoint, client); } public bool RemoveClient(Agent agent) { UDPClient client; if (clients.TryGetValue(agent.AgentID, out client)) { client.Shutdown(); lock (server.Agents) server.Agents.Remove(agent.AgentID); return clients.Remove(agent.AgentID, client.Address); } else return false; } public uint CreateCircuit(Agent agent) { uint circuitCode = (uint)Interlocked.Increment(ref currentCircuitCode); // Put this client in the list of clients that have not been associated with an IPEndPoint yet lock (unassociatedAgents) unassociatedAgents[circuitCode] = agent; Logger.Log("Created a circuit for " + agent.FirstName, Helpers.LogLevel.Info); return circuitCode; } public void BroadcastPacket(Packet packet, PacketCategory category) { clients.ForEach( delegate(UDPClient client) { SendPacket(client, new OutgoingPacket(packet, category)); }); } public void SendPacket(UUID agentID, Packet packet, PacketCategory category) { // Look up the UDPClient this is going to UDPClient client; if (!clients.TryGetValue(agentID, out client)) { Logger.Log("Attempted to send a packet to unknown UDP client " + agentID.ToString(), Helpers.LogLevel.Warning); return; } SendPacket(client, new OutgoingPacket(packet, category)); } void SendPacket(UDPClient client, OutgoingPacket outgoingPacket) { Packet packet = outgoingPacket.Packet; byte[] buffer; int bytes; // Update the sent time for this packet outgoingPacket.TickCount = Environment.TickCount; if (!packet.Header.Resent) { // Reset to zero if we've hit the upper sequence number limit Interlocked.CompareExchange(ref client.CurrentSequence, 0, 0xFFFFFF); // Increment and fetch the current sequence number uint sequence = (uint)Interlocked.Increment(ref client.CurrentSequence); packet.Header.Sequence = sequence; if (packet.Header.Reliable) { // Add this packet to the list of ACK responses we are waiting on from this client lock (client.NeedAcks) client.NeedAcks[sequence] = outgoingPacket; // This packet is reliable and not a resend, check if the conditions are favorable // to ACK appending if (packet.Type != PacketType.PacketAck) { lock (client.PendingAcks) { int count = client.PendingAcks.Count; if (count > 0 && count < 10) { // Append all of the queued up outgoing ACKs to this packet packet.Header.AckList = new uint[count]; for (int i = 0; i < count; i++) packet.Header.AckList[i] = client.PendingAcks.Values[i]; client.PendingAcks.Clear(); packet.Header.AppendedAcks = true; } } } } } else { // This packet has already been sent out once, strip any appended ACKs // off it and reinsert them into the outgoing ACK queue under the // assumption that this packet will continually be rejected from the // client or that the appended ACKs are possibly making the delivery fail if (packet.Header.AckList.Length > 0) { Logger.DebugLog(String.Format("Purging ACKs from packet #{0} ({1}) which will be resent.", packet.Header.Sequence, packet.GetType())); lock (client.PendingAcks) { foreach (uint ack in packet.Header.AckList) { if (!client.PendingAcks.ContainsKey(ack)) client.PendingAcks[ack] = ack; } } packet.Header.AppendedAcks = false; packet.Header.AckList = new uint[0]; } } // Serialize the packet buffer = packet.ToBytes(); bytes = buffer.Length; //Stats.SentBytes += (ulong)bytes; //++Stats.SentPackets; UDPPacketBuffer buf = new UDPPacketBuffer(client.Address); // Zerocode if needed if (packet.Header.Zerocoded) bytes = Helpers.ZeroEncode(buffer, bytes, buf.Data); else Buffer.BlockCopy(buffer, 0, buf.Data, 0, bytes); buf.DataLength = bytes; AsyncBeginSend(buf); } void QueueAck(UDPClient client, uint ack) { // Add this packet to the list of ACKs that need to be sent out lock (client.PendingAcks) client.PendingAcks[ack] = ack; // Send out ACKs if we have a lot of them if (client.PendingAcks.Count >= 10) SendAcks(client); } void ProcessAcks(UDPClient client, List acks) { lock (client.NeedAcks) { foreach (uint ack in acks) client.NeedAcks.Remove(ack); } } void SendAck(UDPClient client, uint ack) { PacketAckPacket acks = new PacketAckPacket(); acks.Header.Reliable = false; acks.Packets = new PacketAckPacket.PacketsBlock[1]; acks.Packets[0] = new PacketAckPacket.PacketsBlock(); acks.Packets[0].ID = ack; SendPacket(client, new OutgoingPacket(acks, PacketCategory.Overhead)); } public void SendAcks(UDPClient client) { PacketAckPacket acks = null; lock (client.PendingAcks) { int count = client.PendingAcks.Count; if (count > 250) { Logger.Log("Too many ACKs queued up!", Helpers.LogLevel.Error); return; } else if (count > 0) { acks = new PacketAckPacket(); acks.Header.Reliable = false; acks.Packets = new PacketAckPacket.PacketsBlock[count]; for (int i = 0; i < count; i++) { acks.Packets[i] = new PacketAckPacket.PacketsBlock(); acks.Packets[i].ID = client.PendingAcks.Values[i]; } client.PendingAcks.Clear(); } } if (acks != null) SendPacket(client, new OutgoingPacket(acks, PacketCategory.Overhead)); } public void ResendUnacked(UDPClient client) { if (client.NeedAcks.Count > 0) { OutgoingPacket[] array; int now = Environment.TickCount; lock (client.NeedAcks) { // Create a temporary copy of the outgoing packets array to iterate over array = new OutgoingPacket[client.NeedAcks.Count]; client.NeedAcks.Values.CopyTo(array, 0); } // Resend packets for (int i = 0; i < array.Length; i++) { OutgoingPacket outgoing = array[i]; // FIXME: Make 4000 and 3 .ini settings if (outgoing.TickCount != 0 && now - outgoing.TickCount > 4000) { if (outgoing.ResendCount < 3) { Logger.DebugLog(String.Format("Resending packet #{0} ({1}), {2}ms have passed", outgoing.Packet.Header.Sequence, outgoing.Packet.GetType(), now - outgoing.TickCount)); // The TickCount will be set to the current time when the packet // is actually sent out again outgoing.TickCount = 0; outgoing.Packet.Header.Resent = true; ++outgoing.ResendCount; //++Stats.ResentPackets; SendPacket(client, outgoing); } else { Logger.Log(String.Format("Dropping packet #{0} ({1}) after {2} failed attempts", outgoing.Packet.Header.Sequence, outgoing.Packet.GetType(), outgoing.ResendCount), Helpers.LogLevel.Warning); lock (client.NeedAcks) client.NeedAcks.Remove(outgoing.Packet.Header.Sequence); //Disconnect an agent if no packets are received for some time //FIXME: Make 60000 an .ini setting if (Environment.TickCount - client.Agent.TickLastPacketReceived > 60000) { Logger.Log(String.Format("Ack timeout for {0}, disconnecting", client.Agent.Avatar.Name), Helpers.LogLevel.Warning); server.Avatars.Disconnect(client.Agent); return; } } } } } } protected override void PacketReceived(UDPPacketBuffer buffer) { UDPClient client = null; Packet packet = null; int packetEnd = buffer.DataLength - 1; IPEndPoint address = (IPEndPoint)buffer.RemoteEndPoint; // Decoding try { packet = Packet.BuildPacket(buffer.Data, ref packetEnd, buffer.ZeroData); } catch (MalformedDataException) { Logger.Log(String.Format("Malformed data, cannot parse packet:\n{0}", Utils.BytesToHexString(buffer.Data, buffer.DataLength, null)), Helpers.LogLevel.Error); } // Fail-safe check if (packet == null) { Logger.Log("Couldn't build a message from the incoming data", Helpers.LogLevel.Warning); return; } //Stats.RecvBytes += (ulong)buffer.DataLength; //++Stats.RecvPackets; if (packet.Type == PacketType.UseCircuitCode) { UseCircuitCodePacket useCircuitCode = (UseCircuitCodePacket)packet; Agent agent; if (CompleteAgentConnection(useCircuitCode.CircuitCode.Code, out agent)) { // FIXME: Sanity check that the agent isn't already logged in here AddClient(agent, address); if (clients.TryGetValue(agent.AgentID, out client)) { Logger.Log("Activated UDP circuit " + useCircuitCode.CircuitCode.Code, Helpers.LogLevel.Info); } else { Logger.Log("Failed to locate newly created UDPClient", Helpers.LogLevel.Error); return; } } else { Logger.Log("Received a UseCircuitCode packet for an unrecognized circuit: " + useCircuitCode.CircuitCode.Code.ToString(), Helpers.LogLevel.Warning); return; } } else { // Determine which agent this packet came from if (!clients.TryGetValue(address, out client)) { Logger.Log("Received UDP packet from an unrecognized source: " + address.ToString(), Helpers.LogLevel.Warning); return; } } client.Agent.TickLastPacketReceived = Environment.TickCount; // Reliable handling if (packet.Header.Reliable) { // Queue up this sequence number for acknowledgement QueueAck(client, (uint)packet.Header.Sequence); //if (packet.Header.Resent) ++Stats.ReceivedResends; } // Inbox insertion IncomingPacket incomingPacket; incomingPacket.Client = client; incomingPacket.Packet = packet; // TODO: Prioritize the queue packetInbox.Enqueue(incomingPacket); } protected override void PacketSent(UDPPacketBuffer buffer, int bytesSent) { } void IncomingPacketHandler() { IncomingPacket incomingPacket = new IncomingPacket(); Packet packet = null; UDPClient client = null; while (IsRunning) { // Reset packet to null for the check below packet = null; if (packetInbox.Dequeue(100, ref incomingPacket)) { packet = incomingPacket.Packet; client = incomingPacket.Client; if (packet != null) { #region ACK accounting // Check the archives to see whether we already received this packet lock (client.PacketArchive) { if (client.PacketArchive.Contains(packet.Header.Sequence)) { if (packet.Header.Resent) { Logger.DebugLog("Received resent packet #" + packet.Header.Sequence); } else { Logger.Log(String.Format("Received a duplicate of packet #{0}, current type: {1}", packet.Header.Sequence, packet.Type), Helpers.LogLevel.Warning); } // Avoid firing a callback twice for the same packet continue; } else { // Keep the PacketArchive size within a certain capacity while (client.PacketArchive.Count >= Settings.PACKET_ARCHIVE_SIZE) { client.PacketArchive.Dequeue(); client.PacketArchive.Dequeue(); client.PacketArchive.Dequeue(); client.PacketArchive.Dequeue(); } client.PacketArchive.Enqueue(packet.Header.Sequence); } } #endregion ACK accounting #region ACK handling // Handle appended ACKs if (packet.Header.AppendedAcks) { lock (client.NeedAcks) { for (int i = 0; i < packet.Header.AckList.Length; i++) client.NeedAcks.Remove(packet.Header.AckList[i]); } } // Handle PacketAck packets if (packet.Type == PacketType.PacketAck) { PacketAckPacket ackPacket = (PacketAckPacket)packet; lock (client.NeedAcks) { for (int i = 0; i < ackPacket.Packets.Length; i++) client.NeedAcks.Remove(ackPacket.Packets[i].ID); } } #endregion ACK handling packetEvents.BeginRaiseEvent(packet.Type, packet, client.Agent); } } } } bool CompleteAgentConnection(uint circuitCode, out Agent agent) { lock (unassociatedAgents) { if (unassociatedAgents.TryGetValue(circuitCode, out agent)) { unassociatedAgents.Remove(circuitCode); lock (server.Agents) server.Agents[agent.AgentID] = agent; return true; } else { return false; } } } } }