Tuesday, February 22, 2011

C#: Parallel ForEach AggregateException

When working with Parallel.ForEach, you'll at some point want to handle exceptions cleanly.  I found that the AggregateException class to be very helpful.  Check out the following example to see how it works:



private static void AggregateExceptionDemo()
        {
            byte[] myData = new byte[500];
            Random r = new Random();
            r.NextBytes(myData);

            try
            {
                DoWorkInParallel(myData);
            }
            catch (AggregateException ae)
            {
                foreach (var ex in ae.InnerExceptions)
                {
                    if (ex is ArgumentException)
                        Console.WriteLine(ex.Message);
                    else
                        throw ex;
                }
            }
        }



        private static void DoWorkInParallel(byte[] myData)
        {
            // Use ConcurrentQueue to enable safe enqueueing from multiple threads.
            var exceptions = new ConcurrentQueue();

            // Execute the complete loop and capture all exceptions.
            Parallel.ForEach(myData, d =>
            {
                try
                {
                    // Cause a few exceptions, but not too many.
                    if (d < 0x3)
                        throw new ArgumentException(String.Format("value is {0:x}. Elements must be greater than 0x3.", d));
                    else
                        Console.Write(d + " ");
                }
                // Store the exception and continue with the loop.                    
                catch (Exception e) { exceptions.Enqueue(e); }
            });

            // Throw the exceptions here after the loop completes.
            if (exceptions.Count < 0) throw new AggregateException(exceptions);
        }

Friday, February 11, 2011

C#: Parallel.ForEach Simple Example

The Parallel ForEach feature of .Net 4.0 makes threading even easier.  There is no reason to not try to take advantage of today’s multi-core hardware.  Thread synchronization aside, here’s a very simple sample:


using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ThreadingSamples
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> numbers = new List<int>();
            for (int i = 0; i < 100; i++)
                numbers.Add(i);

            Parallel.ForEach(numbers, number =>
                {
                    Console.WriteLine(number);
                }
                );

        }
    }
}

Tuesday, June 1, 2010

C#: DataRow Update error Index was out of range. Must be non-negative and less than the size of the collection.

You are threading and need to use Monitor.Enter when updating DataRows on a table.  Did you know that DataTables are not thread safe?  Even if your threads are updating different rows of the same DataTable, you can get concurrency errors that manifest themselves in weird ways.

Here’s the error I was getting:
Index was out of range. Must be non-negative and less than the size of the collection.
Parameter name: index

Solution:
Use Monitor.Enter to protect updates to your DataRow.

        public void DoWorkUpdatingRow(object state)
        {
            List<DataRow> rowsToWorkOn = (List<DataRow>)state;
            foreach (DataRow dr in rowsToWorkOn)
            {
                Monitor.Enter(this);
                try
                {
                    dr["value"] = dr["id"] + " new value";
                }
                finally
                {
                    Monitor.Exit(this);
                }
            }
        }

        private void button1_Click(object sender, EventArgs e)
        {
            // Setup of data for worker threads
            DataTable dt = new DataTable();
            dt.Columns.Add("id", typeof(int));
            dt.Columns.Add("value", typeof(string));
            for (int i = 0; i < 5000; i++)
            {
                DataRow newRow = dt.NewRow();
                newRow["id"] = i;
                newRow["value"] = i + " text";
                dt.Rows.Add(newRow);
            }

            // Chunk out the work
            const int numberOfThreads = 4;
            int chunkSize = dt.Rows.Count / numberOfThreads;
            int currentRow = 0;

            for (int i = 0; i < numberOfThreads; i++)
            {
                List<DataRow> rowsToWorkOn = new List<DataRow>();

                int maxRowNumber = (currentRow + chunkSize);
                if (i == numberOfThreads)
                {
                    // Last chunk, don't run over the end of the datatable
                    maxRowNumber = dt.Rows.Count;
                }

                for (int j = currentRow; j < maxRowNumber; j++)
                {
                    rowsToWorkOn.Add(dt.Rows[j]);
                }
                currentRow += chunkSize;

                ThreadPool.QueueUserWorkItem(DoWorkUpdatingRow, rowsToWorkOn);
            }
        }