TexturePipeline._Transfers becomes ConcurrentDictionary

This commit is contained in:
cinder
2025-06-20 15:54:14 -05:00
parent d2064cccba
commit da6a861892

View File

@@ -1,5 +1,6 @@
/*
* Copyright (c) 2006-2016, openmetaverse.co
* Copyright (c) 2025, Sjofn LLC.
* All rights reserved.
*
* - Redistribution and use in source and binary forms, with or without
@@ -27,6 +28,8 @@
//#define DEBUG_TIMING
using System;
using System.Collections.Concurrent;
using System.Collections.Frozen;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
@@ -114,14 +117,14 @@ namespace OpenMetaverse
/// <summary>The time the request was sent to the simulator</summary>
public DateTime NetworkTime;
#endif
/// <summary>An object that maintains the data of an request thats in-process.</summary>
/// <summary>An object that maintains the data of a request that is in-process.</summary>
public ImageDownload Transfer;
}
/// <summary>A dictionary containing all pending and in-process transfer requests where the Key is both the RequestID
/// and also the Asset Texture ID, and the value is an object containing the current state of the request and also
/// the asset data as it is being re-assembled</summary>
private readonly Dictionary<UUID, TaskInfo> _Transfers;
private readonly ConcurrentDictionary<UUID, TaskInfo> _Transfers;
/// <summary>Holds the reference to the <see cref="GridClient"/> client object</summary>
private readonly GridClient _Client;
/// <summary>Maximum concurrent texture requests allowed at a time</summary>
@@ -129,17 +132,14 @@ namespace OpenMetaverse
/// <summary>The primary thread which manages the requests.</summary>
private Thread downloadMaster;
/// <summary>The cancellation token for the TexturePipeline and all child tasks.</summary>
private CancellationTokenSource downloadTokenSource;
private readonly CancellationTokenSource downloadTokenSource;
/// <summary>true if the TexturePipeline is currently running</summary>
bool _Running;
private bool _Running;
/// <summary>A refresh timer used to increase the priority of stalled requests</summary>
private System.Timers.Timer RefreshDownloadsTimer;
/// <summary>Current number of pending and in-process transfers</summary>
public int TransferCount
{
get { lock (_Transfers) return _Transfers.Count; }
}
public int TransferCount => _Transfers.Count;
/// <summary>
/// Default constructor, Instantiates a new copy of the TexturePipeline class
@@ -153,7 +153,7 @@ namespace OpenMetaverse
downloadTokenSource = new CancellationTokenSource();
_Transfers = new Dictionary<UUID, TaskInfo>();
_Transfers = new ConcurrentDictionary<UUID, TaskInfo>();
// Handle client connected and disconnected events
client.Network.LoginProgress += delegate(object sender, LoginProgressEventArgs e) {
@@ -224,42 +224,40 @@ namespace OpenMetaverse
_Client.Network.UnregisterCallback(PacketType.ImageData, ImageDataHandler);
_Client.Network.UnregisterCallback(PacketType.ImagePacket, ImagePacketHandler);
lock (_Transfers)
_Transfers.Clear();
_Transfers.Clear();
_Running = false;
}
private void RefreshDownloadsTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
lock (_Transfers)
var transfers = _Transfers.ToFrozenDictionary();
foreach (TaskInfo transfer in transfers.Values)
{
foreach (TaskInfo transfer in _Transfers.Values)
if (transfer.State != TextureRequestState.Progress) continue;
ImageDownload download = transfer.Transfer;
// Find the first missing packet in the download
ushort packet = 0;
lock (download)
{
if (transfer.State != TextureRequestState.Progress) continue;
ImageDownload download = transfer.Transfer;
if (download.PacketsSeen != null && download.PacketsSeen.Count > 0)
packet = GetFirstMissingPacket(download.PacketsSeen);
}
// Find the first missing packet in the download
ushort packet = 0;
lock (download)
{
if (download.PacketsSeen != null && download.PacketsSeen.Count > 0)
packet = GetFirstMissingPacket(download.PacketsSeen);
}
if (download.TimeSinceLastPacket > 5000)
{
// We're not receiving data for this texture fast enough, bump up the priority by 5%
download.Priority *= 1.05f;
if (download.TimeSinceLastPacket > 5000)
{
// We're not receiving data for this texture fast enough, bump up the priority by 5%
download.Priority *= 1.05f;
download.TimeSinceLastPacket = 0;
RequestImage(download.ID, download.ImageType, download.Priority, download.DiscardLevel, packet);
}
download.TimeSinceLastPacket = 0;
RequestImage(download.ID, download.ImageType, download.Priority, download.DiscardLevel, packet);
}
if (download.TimeSinceLastPacket > _Client.Settings.PIPELINE_REQUEST_TIMEOUT)
{
transfer.TokenSource.Cancel();
}
if (download.TimeSinceLastPacket > _Client.Settings.PIPELINE_REQUEST_TIMEOUT)
{
transfer.TokenSource.Cancel();
}
}
}
@@ -307,42 +305,30 @@ namespace OpenMetaverse
}
else
{
lock (_Transfers)
{
TaskInfo request;
if (_Transfers.TryGetValue(textureID, out request))
_Transfers.AddOrUpdate(textureID,
new TaskInfo
{
request.Callbacks.Add(callback);
}
else
{
request = new TaskInfo
{
State = TextureRequestState.Pending,
RequestID = textureID,
ReportProgress = progressive,
TokenSource = CancellationTokenSource.CreateLinkedTokenSource(downloadTokenSource.Token),
Type = imageType,
Callbacks = new List<TextureDownloadCallback> {callback}
};
ImageDownload downloadParams = new ImageDownload
State = TextureRequestState.Pending,
RequestID = textureID,
ReportProgress = progressive,
TokenSource = CancellationTokenSource.CreateLinkedTokenSource(downloadTokenSource.Token),
Type = imageType,
Callbacks = new List<TextureDownloadCallback> { callback },
Transfer = new ImageDownload
{
ID = textureID,
Priority = priority,
ImageType = imageType,
DiscardLevel = discardLevel
};
request.Transfer = downloadParams;
},
#if DEBUG_TIMING
request.StartTime = DateTime.UtcNow;
StartTime = DateTime.UtcNow
#endif
_Transfers.Add(textureID, request);
}
}
}, (uuid, info) =>
{
info.Callbacks.Add(callback);
return info;
});
}
}
@@ -369,8 +355,7 @@ namespace OpenMetaverse
}
else
{
TaskInfo task;
if (TryGetTransferValue(imageID, out task))
if (_Transfers.TryGetValue(imageID, out var task))
{
if (task.Transfer.Simulator != null)
{
@@ -381,8 +366,8 @@ namespace OpenMetaverse
if (percentComplete > 0f)
{
Logger.DebugLog(string.Format("Updating priority on image transfer {0} to {1}, {2}% complete",
imageID, task.Transfer.Priority, Math.Round(percentComplete, 2)));
Logger.DebugLog(
$"Updating priority on image transfer {imageID} to {task.Transfer.Priority}, {Math.Round(percentComplete, 2)}% complete");
}
}
else
@@ -414,7 +399,8 @@ namespace OpenMetaverse
}
else
{
Logger.Log("Received texture download request for a texture that isn't in the download queue: " + imageID, Helpers.LogLevel.Warning);
Logger.Log($"Received texture download request for a texture that isn't in the download queue: {imageID}",
Helpers.LogLevel.Warning);
}
}
}
@@ -425,8 +411,7 @@ namespace OpenMetaverse
/// <param name="textureID">The texture assets unique ID</param>
public void AbortTextureRequest(UUID textureID)
{
TaskInfo task;
if (!TryGetTransferValue(textureID, out task)) return;
if (!_Transfers.TryGetValue(textureID, out var task)) { return; }
// this means we've actually got the request assigned to the threadpool
if (task.State == TextureRequestState.Progress)
@@ -457,11 +442,11 @@ namespace OpenMetaverse
task.TokenSource.Cancel();
RemoveTransfer(textureID);
_Transfers.TryRemove(textureID, out _);
}
else
{
RemoveTransfer(textureID);
_Transfers.TryRemove(textureID, out _);
foreach (var callback in task.Callbacks)
callback(TextureRequestState.Aborted, new AssetTexture(textureID, Utils.EmptyBytes));
@@ -482,20 +467,17 @@ namespace OpenMetaverse
var pendingTasks = new Queue<TaskInfo>();
lock (_Transfers)
foreach (var request in _Transfers)
{
foreach (var request in _Transfers)
switch (request.Value.State)
{
switch (request.Value.State)
{
case TextureRequestState.Pending:
pendingTasks.Enqueue(request.Value);
break;
case TextureRequestState.Started:
case TextureRequestState.Progress:
++active;
break;
}
case TextureRequestState.Pending:
pendingTasks.Enqueue(request.Value);
break;
case TextureRequestState.Started:
case TextureRequestState.Progress:
++active;
break;
}
}
@@ -510,7 +492,7 @@ namespace OpenMetaverse
++active;
}
// Queue was empty or all download slots are inuse, let's give up some CPU time
// Queue was empty or all download slots are in use, let's give up some CPU time
Thread.Sleep(500);
}
@@ -549,8 +531,8 @@ namespace OpenMetaverse
if (task.TokenSource.Token.WaitHandle.WaitOne())
{
// Timed out
Logger.Log("Worker timeout waiting for texture " + task.RequestID + " to download got " +
task.Transfer.Transferred + " of " + task.Transfer.Size, Helpers.LogLevel.Warning);
Logger.Log($"Worker timeout waiting for texture {task.RequestID} to download got " +
$"{task.Transfer.Transferred} of {task.Transfer.Size}", Helpers.LogLevel.Warning);
AssetTexture texture = new AssetTexture(task.RequestID, task.Transfer.AssetData);
foreach (TextureDownloadCallback callback in task.Callbacks)
@@ -558,7 +540,7 @@ namespace OpenMetaverse
_Client.Assets.FireImageProgressEvent(task.RequestID, task.Transfer.Transferred, task.Transfer.Size);
RemoveTransfer(task.RequestID);
_Transfers.TryRemove(task.RequestID, out _);
}
}
@@ -569,7 +551,7 @@ namespace OpenMetaverse
lock (packetsSeen)
{
bool first = true;
foreach (KeyValuePair<ushort, ushort> packetSeen in packetsSeen)
foreach (var packetSeen in packetsSeen)
{
if (first)
{
@@ -608,9 +590,8 @@ namespace OpenMetaverse
protected void ImageNotInDatabaseHandler(object sender, PacketReceivedEventArgs e)
{
ImageNotInDatabasePacket imageNotFoundData = (ImageNotInDatabasePacket)e.Packet;
TaskInfo task;
if (TryGetTransferValue(imageNotFoundData.ImageID.ID, out task))
if (_Transfers.TryGetValue(imageNotFoundData.ImageID.ID, out var task))
{
// cancel active request and free up the threadpool slot
task.TokenSource.Cancel();
@@ -619,11 +600,12 @@ namespace OpenMetaverse
foreach (TextureDownloadCallback callback in task.Callbacks)
callback(TextureRequestState.NotFound, new AssetTexture(imageNotFoundData.ImageID.ID, Utils.EmptyBytes));
RemoveTransfer(imageNotFoundData.ImageID.ID);
_Transfers.TryRemove(imageNotFoundData.ImageID.ID, out _);
}
else
{
Logger.Log("Received an ImageNotFound packet for an image we did not request: " + imageNotFoundData.ImageID.ID, Helpers.LogLevel.Warning);
Logger.Log($"Received an ImageNotFound packet for an image we did not request: {imageNotFoundData.ImageID.ID}",
Helpers.LogLevel.Warning);
}
}
@@ -635,9 +617,8 @@ namespace OpenMetaverse
protected void ImagePacketHandler(object sender, PacketReceivedEventArgs e)
{
ImagePacketPacket image = (ImagePacketPacket)e.Packet;
TaskInfo task;
if (TryGetTransferValue(image.ImageID.ID, out task))
if (_Transfers.TryGetValue(image.ImageID.ID, out var task))
{
if (task.Transfer.Size == 0)
{
@@ -646,12 +627,12 @@ namespace OpenMetaverse
if (task.Transfer.Size == 0)
{
Logger.Log("Timed out while waiting for the image header to download for " +
task.Transfer.ID, Helpers.LogLevel.Warning, _Client);
Logger.Log($"Timed out while waiting for the image header to download for {task.Transfer.ID}",
Helpers.LogLevel.Warning, _Client);
task.TokenSource.Cancel();
RemoveTransfer(task.Transfer.ID);
_Transfers.TryRemove(task.Transfer.ID, out _);
foreach (TextureDownloadCallback callback in task.Callbacks)
callback(TextureRequestState.Timeout, new AssetTexture(task.RequestID, task.Transfer.AssetData));
@@ -698,7 +679,7 @@ namespace OpenMetaverse
task.Transfer.Success = true;
task.TokenSource.Cancel();
RemoveTransfer(task.Transfer.ID);
_Transfers.TryRemove(task.Transfer.ID, out _);
_Client.Assets.Cache.SaveAssetToCache(task.RequestID, task.Transfer.AssetData);
foreach (var callback in task.Callbacks)
callback(TextureRequestState.Finished, new AssetTexture(task.RequestID, task.Transfer.AssetData));
@@ -729,9 +710,8 @@ namespace OpenMetaverse
protected void ImageDataHandler(object sender, PacketReceivedEventArgs e)
{
ImageDataPacket data = (ImageDataPacket)e.Packet;
TaskInfo task;
if (TryGetTransferValue(data.ImageID.ID, out task))
if (_Transfers.TryGetValue(data.ImageID.ID, out var task))
{
// reset the timeout interval since we got data
task.Transfer.TimeSinceLastPacket = 0;
@@ -775,7 +755,7 @@ namespace OpenMetaverse
#endif
task.Transfer.Success = true;
task.TokenSource.Cancel();
RemoveTransfer(task.RequestID);
_Transfers.TryRemove(task.RequestID, out _);
_Client.Assets.Cache.SaveAssetToCache(task.RequestID, task.Transfer.AssetData);
@@ -802,17 +782,5 @@ namespace OpenMetaverse
}
#endregion
private bool TryGetTransferValue(UUID textureID, out TaskInfo task)
{
lock (_Transfers)
return _Transfers.TryGetValue(textureID, out task);
}
private bool RemoveTransfer(UUID textureID)
{
lock (_Transfers)
return _Transfers.Remove(textureID);
}
}
}