diff --git a/OpenMetaverseTypes/Parallel.cs b/OpenMetaverseTypes/Parallel.cs index ca698aa7..62b2cbfa 100644 --- a/OpenMetaverseTypes/Parallel.cs +++ b/OpenMetaverseTypes/Parallel.cs @@ -57,35 +57,40 @@ namespace OpenMetaverse /// Method body to run for each iteration of the loop public static void For(int threadCount, int fromInclusive, int toExclusive, Action body) { - AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount]; + int counter = threadCount; + AutoResetEvent threadFinishEvent = new AutoResetEvent(false); + Exception exception = null; + --fromInclusive; for (int i = 0; i < threadCount; i++) { - threadFinishEvent[i] = new AutoResetEvent(false); - ThreadPool.QueueUserWorkItem( delegate(object o) { int threadIndex = (int)o; - while (true) + while (exception == null) { int currentIndex = Interlocked.Increment(ref fromInclusive); if (currentIndex >= toExclusive) break; - body(currentIndex); + try { body(currentIndex); } + catch (Exception ex) { exception = ex; break; } } - threadFinishEvent[threadIndex].Set(); + if (Interlocked.Decrement(ref counter) == 0) + threadFinishEvent.Set(); }, i ); } - for (int i = 0; i < threadCount; i++) - threadFinishEvent[i].WaitOne(); + threadFinishEvent.WaitOne(); + + if (exception != null) + throw exception; } /// @@ -108,20 +113,20 @@ namespace OpenMetaverse /// Method body to run for each object in the collection public static void ForEach(int threadCount, IEnumerable enumerable, Action body) { - AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount]; + int counter = threadCount; + AutoResetEvent threadFinishEvent = new AutoResetEvent(false); IEnumerator enumerator = enumerable.GetEnumerator(); object syncRoot = new Object(); + Exception exception = null; for (int i = 0; i < threadCount; i++) { - threadFinishEvent[i] = new AutoResetEvent(false); - ThreadPool.QueueUserWorkItem( delegate(object o) { int threadIndex = (int)o; - while (true) + while (exception == null) { T entry; @@ -132,16 +137,20 @@ namespace OpenMetaverse entry = (T)enumerator.Current; // Explicit typecast for Mono's sake } - body(entry); + try { body(entry); } + catch (Exception ex) { exception = ex; break; } } - threadFinishEvent[threadIndex].Set(); + if (Interlocked.Decrement(ref counter) == 0) + threadFinishEvent.Set(); }, i ); } - for (int i = 0; i < threadCount; i++) - threadFinishEvent[i].WaitOne(); + threadFinishEvent.WaitOne(); + + if (exception != null) + throw exception; } /// @@ -160,35 +169,39 @@ namespace OpenMetaverse /// A series of method bodies to execute public static void Invoke(int threadCount, params Action[] actions) { - AutoResetEvent[] threadFinishEvent = new AutoResetEvent[threadCount]; + int counter = threadCount; + AutoResetEvent threadFinishEvent = new AutoResetEvent(false); int index = -1; + Exception exception = null; for (int i = 0; i < threadCount; i++) { - threadFinishEvent[i] = new AutoResetEvent(false); - ThreadPool.QueueUserWorkItem( delegate(object o) { int threadIndex = (int)o; - while (true) + while (exception == null) { int currentIndex = Interlocked.Increment(ref index); if (currentIndex >= actions.Length) break; - actions[currentIndex](); + try { actions[currentIndex](); } + catch (Exception ex) { exception = ex; break; } } - threadFinishEvent[threadIndex].Set(); + if (Interlocked.Decrement(ref counter) == 0) + threadFinishEvent.Set(); }, i ); } - for (int i = 0; i < threadCount; i++) - threadFinishEvent[i].WaitOne(); + threadFinishEvent.WaitOne(); + + if (exception != null) + throw exception; } } }