Improved OpenMetaverse.Parallel by only using a single AutoResetEvent per loop (fewer context switches into kernel space) and allowing exceptions in method bodies to propagate back to the caller

git-svn-id: http://libopenmetaverse.googlecode.com/svn/libopenmetaverse/trunk@3043 52acb1d6-8a22-11de-b505-999d5b087335
This commit is contained in:
John Hurliman
2009-07-31 22:39:37 +00:00
parent ee7bb90907
commit 6bb96a6e28

View File

@@ -57,35 +57,40 @@ namespace OpenMetaverse
/// <param name="body">Method body to run for each iteration of the loop</param>
public static void For(int threadCount, int fromInclusive, int toExclusive, Action<int> body)
{
AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount];
int counter = threadCount;
AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
Exception exception = null;
--fromInclusive;
for (int i = 0; i < threadCount; i++)
{
threadFinishEvent[i] = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(
delegate(object o)
{
int threadIndex = (int)o;
while (true)
while (exception == null)
{
int currentIndex = Interlocked.Increment(ref fromInclusive);
if (currentIndex >= toExclusive)
break;
body(currentIndex);
try { body(currentIndex); }
catch (Exception ex) { exception = ex; break; }
}
threadFinishEvent[threadIndex].Set();
if (Interlocked.Decrement(ref counter) == 0)
threadFinishEvent.Set();
}, i
);
}
for (int i = 0; i < threadCount; i++)
threadFinishEvent[i].WaitOne();
threadFinishEvent.WaitOne();
if (exception != null)
throw exception;
}
/// <summary>
@@ -108,20 +113,20 @@ namespace OpenMetaverse
/// <param name="body">Method body to run for each object in the collection</param>
public static void ForEach<T>(int threadCount, IEnumerable<T> enumerable, Action<T> body)
{
AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount];
int counter = threadCount;
AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
IEnumerator<T> enumerator = enumerable.GetEnumerator();
object syncRoot = new Object();
Exception exception = null;
for (int i = 0; i < threadCount; i++)
{
threadFinishEvent[i] = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(
delegate(object o)
{
int threadIndex = (int)o;
while (true)
while (exception == null)
{
T entry;
@@ -132,16 +137,20 @@ namespace OpenMetaverse
entry = (T)enumerator.Current; // Explicit typecast for Mono's sake
}
body(entry);
try { body(entry); }
catch (Exception ex) { exception = ex; break; }
}
threadFinishEvent[threadIndex].Set();
if (Interlocked.Decrement(ref counter) == 0)
threadFinishEvent.Set();
}, i
);
}
for (int i = 0; i < threadCount; i++)
threadFinishEvent[i].WaitOne();
threadFinishEvent.WaitOne();
if (exception != null)
throw exception;
}
/// <summary>
@@ -160,35 +169,39 @@ namespace OpenMetaverse
/// <param name="actions">A series of method bodies to execute</param>
public static void Invoke(int threadCount, params Action[] actions)
{
AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount];
int counter = threadCount;
AutoResetEvent threadFinishEvent = new AutoResetEvent(false);
int index = -1;
Exception exception = null;
for (int i = 0; i < threadCount; i++)
{
threadFinishEvent[i] = new AutoResetEvent(false);
ThreadPool.QueueUserWorkItem(
delegate(object o)
{
int threadIndex = (int)o;
while (true)
while (exception == null)
{
int currentIndex = Interlocked.Increment(ref index);
if (currentIndex >= actions.Length)
break;
actions[currentIndex]();
try { actions[currentIndex](); }
catch (Exception ex) { exception = ex; break; }
}
threadFinishEvent[threadIndex].Set();
if (Interlocked.Decrement(ref counter) == 0)
threadFinishEvent.Set();
}, i
);
}
for (int i = 0; i < threadCount; i++)
threadFinishEvent[i].WaitOne();
threadFinishEvent.WaitOne();
if (exception != null)
throw exception;
}
}
}