From 6bb96a6e28323f4da73b7c9d814dc5763d46e68e Mon Sep 17 00:00:00 2001 From: John Hurliman Date: Fri, 31 Jul 2009 22:39:37 +0000 Subject: [PATCH] Improved OpenMetaverse.Parallel by only using a single AutoResetEvent per loop (fewer context switches into kernel space) and allowing exceptions in method bodies to propagate back to the caller git-svn-id: http://libopenmetaverse.googlecode.com/svn/libopenmetaverse/trunk@3043 52acb1d6-8a22-11de-b505-999d5b087335 --- OpenMetaverseTypes/Parallel.cs | 61 +++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 24 deletions(-) diff --git a/OpenMetaverseTypes/Parallel.cs b/OpenMetaverseTypes/Parallel.cs index ca698aa7..62b2cbfa 100644 --- a/OpenMetaverseTypes/Parallel.cs +++ b/OpenMetaverseTypes/Parallel.cs @@ -57,35 +57,40 @@ namespace OpenMetaverse /// Method body to run for each iteration of the loop public static void For(int threadCount, int fromInclusive, int toExclusive, Action body) { - AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount]; + int counter = threadCount; + AutoResetEvent threadFinishEvent = new AutoResetEvent(false); + Exception exception = null; + --fromInclusive; for (int i = 0; i < threadCount; i++) { - threadFinishEvent[i] = new AutoResetEvent(false); - ThreadPool.QueueUserWorkItem( delegate(object o) { int threadIndex = (int)o; - while (true) + while (exception == null) { int currentIndex = Interlocked.Increment(ref fromInclusive); if (currentIndex >= toExclusive) break; - body(currentIndex); + try { body(currentIndex); } + catch (Exception ex) { exception = ex; break; } } - threadFinishEvent[threadIndex].Set(); + if (Interlocked.Decrement(ref counter) == 0) + threadFinishEvent.Set(); }, i ); } - for (int i = 0; i < threadCount; i++) - threadFinishEvent[i].WaitOne(); + threadFinishEvent.WaitOne(); + + if (exception != null) + throw exception; } /// @@ -108,20 +113,20 @@ namespace OpenMetaverse /// Method body to run for each object in the collection public static void ForEach(int threadCount, IEnumerable enumerable, Action body) { - AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount]; + int counter = threadCount; + AutoResetEvent threadFinishEvent = new AutoResetEvent(false); IEnumerator enumerator = enumerable.GetEnumerator(); object syncRoot = new Object(); + Exception exception = null; for (int i = 0; i < threadCount; i++) { - threadFinishEvent[i] = new AutoResetEvent(false); - ThreadPool.QueueUserWorkItem( delegate(object o) { int threadIndex = (int)o; - while (true) + while (exception == null) { T entry; @@ -132,16 +137,20 @@ namespace OpenMetaverse entry = (T)enumerator.Current; // Explicit typecast for Mono's sake } - body(entry); + try { body(entry); } + catch (Exception ex) { exception = ex; break; } } - threadFinishEvent[threadIndex].Set(); + if (Interlocked.Decrement(ref counter) == 0) + threadFinishEvent.Set(); }, i ); } - for (int i = 0; i < threadCount; i++) - threadFinishEvent[i].WaitOne(); + threadFinishEvent.WaitOne(); + + if (exception != null) + throw exception; } /// @@ -160,35 +169,39 @@ namespace OpenMetaverse /// A series of method bodies to execute public static void Invoke(int threadCount, params Action[] actions) { - AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount]; + int counter = threadCount; + AutoResetEvent threadFinishEvent = new AutoResetEvent(false); int index = -1; + Exception exception = null; for (int i = 0; i < threadCount; i++) { - threadFinishEvent[i] = new AutoResetEvent(false); - ThreadPool.QueueUserWorkItem( delegate(object o) { int threadIndex = (int)o; - while (true) + while (exception == null) { int currentIndex = Interlocked.Increment(ref index); if (currentIndex >= actions.Length) break; - actions[currentIndex](); + try { actions[currentIndex](); } + catch (Exception ex) { exception = ex; break; } } - threadFinishEvent[threadIndex].Set(); + if (Interlocked.Decrement(ref counter) == 0) + threadFinishEvent.Set(); }, i ); } - for (int i = 0; i < threadCount; i++) - threadFinishEvent[i].WaitOne(); + threadFinishEvent.WaitOne(); + + if (exception != null) + throw exception; } } }