Timer + Producer-Consumer

I am new to threading and I am trying to learn different concepts.

Right now I am implementing a Producer/Consumer pattern driven by a Timer thread. The problem is that I have no idea how to check whether all producer and consumer threads have finished their work before letting the Timer thread schedule its next tick, or how to dispose of all the producer and consumer threads that were created so that the next tick starts clean.

I would like to ask for your help and guidance on how to solve this.

Here is my sample code:

/// <summary>
/// Runs one producer/consumer batch per timer tick. Each tick creates the worker
/// threads, waits until every one of them has finished, and only then re-arms the
/// one-shot timer for the next tick.
/// </summary>
public class WorkerThread
{
    /// <summary>Number of producer threads created per tick.</summary>
    private const int ProducerCount = 1;

    /// <summary>Number of consumer threads created per tick.</summary>
    private const int ConsumerCount = 2;

    /// <summary>Bounded queue shared by the producers and consumers of a tick.</summary>
    public BlockingQueue<Item> collection = new BlockingQueue<Item>(100);

    private Timer TimerThread { get; set; }

    /// <summary>
    /// Creates the timer so that <see cref="StartMonitor"/> fires once after 500 ms.
    /// The period is <see cref="Timeout.Infinite"/>, so ticks never overlap: the next
    /// one is scheduled explicitly at the end of <see cref="StartMonitor"/>.
    /// </summary>
    public void ThreadTimer()
    {
        this.TimerThread = new Timer(new TimerCallback(StartMonitor), null, 500, Timeout.Infinite);
    }

    /// <summary>
    /// Timer callback: runs one complete batch and blocks until all producer and
    /// consumer threads have terminated, then schedules the next tick in 5 seconds.
    /// </summary>
    /// <param name="state">Timer state object; not used.</param>
    public void StartMonitor(object state)
    {
        try
        {
            // The previous tick closed the queue to release its consumers. Reopen it
            // before any worker of this tick exists; Thread.Start publishes the write.
            this.collection.closing = false;

            List<Thread> producers = new List<Thread>(ProducerCount);
            List<Thread> consumers = new List<Thread>(ConsumerCount);

            for (int i = 0; i < ProducerCount; i++)
            {
                producers.Add(new Thread(() => RunProducers(this.collection)));
            }

            for (int i = 0; i < ConsumerCount; i++)
            {
                consumers.Add(new Thread(() => RunConsumers(this.collection)));
            }

            foreach (Thread producer in producers)
            {
                producer.Start();
            }

            foreach (Thread consumer in consumers)
            {
                consumer.Start();
            }

            // Producers terminate on their own once every item has been queued.
            foreach (Thread producer in producers)
            {
                producer.Join();
            }

            // No more items will arrive: closing lets the consumers drain what is left
            // and then return instead of waiting forever on an empty queue.
            this.collection.Close();

            foreach (Thread consumer in consumers)
            {
                consumer.Join();
            }

            // Thread is not IDisposable; once joined, the threads have terminated and
            // the lists going out of scope is all the clean-up that is needed.
        }
        finally
        {
            // Re-arm even when the batch failed. The null check covers a direct call
            // to StartMonitor without a prior ThreadTimer().
            Timer timer = this.TimerThread;
            if (timer != null)
            {
                timer.Change(5000, Timeout.Infinite);
            }
        }
    }

    /// <summary>
    /// Producer body: creates the item list and adds every item to the queue,
    /// blocking in <c>Add</c> whenever the queue is full.
    /// </summary>
    /// <param name="collection">Queue that receives the items.</param>
    public void RunProducers(BlockingQueue<Item> collection)
    {
        List<Item> lsItems = CreateListOfItems();

        foreach (var item in lsItems)
        {
            collection.Add(item);
        }
    }

    /// <summary>
    /// Consumer body: processes items until the queue has been closed and is empty.
    /// </summary>
    /// <param name="collection">Queue to take items from.</param>
    public void RunConsumers(BlockingQueue<Item> collection)
    {
        Item item;

        // TryDequeue returns false only once the queue is both closed and drained,
        // which gives the consumer thread a way to terminate.
        while (collection.TryDequeue(out item))
        {
            Console.WriteLine("Processed[{0}] : {1}", item.ID, item.Name);
        }
    }

    /// <summary>Builds the 10,000 sample items (IDs 0 through 9999).</summary>
    /// <returns>A new list of items named <c>Item[ID]</c>.</returns>
    public List<Item> CreateListOfItems()
    {
        List<Item> lsItems = new List<Item>(10000);
        for (int i = 0; i <= 9999; i++)
        {
            lsItems.Add(new Item() { ID = i, Name = "Item[" + i + "]" });
        }
        return lsItems;
    }
}

BlockingCollection implementation (since our environment is .NET 3.5, we can't use libraries from higher versions).

/// <summary>
/// Bounded FIFO queue guarded by a monitor: <see cref="Add"/> blocks while the queue
/// is full, <see cref="Take"/> and <see cref="TryDequeue"/> block while it is empty.
/// </summary>
/// <typeparam name="T">Type of the queued items.</typeparam>
public class BlockingQueue<T>
{
    // The queue instance doubles as the lock object and the wait/pulse target.
    private readonly Queue<T> queue = new Queue<T>();
    private readonly int MaxSize;

    /// <summary>
    /// Set by <see cref="Close"/>; once true, <see cref="TryDequeue"/> returns false
    /// instead of waiting when the queue is empty.
    /// </summary>
    public bool closing;

    /// <summary>Creates a queue that holds at most <paramref name="maxSize"/> items.</summary>
    /// <param name="maxSize">Capacity; must be at least 1.</param>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="maxSize"/> is less than 1 (every <see cref="Add"/> would block forever).
    /// </exception>
    public BlockingQueue(int maxSize)
    {
        if (maxSize < 1)
        {
            throw new ArgumentOutOfRangeException("maxSize", "The capacity must be at least 1.");
        }

        this.MaxSize = maxSize;
    }

    /// <summary>Appends an item, blocking while the queue is at capacity.</summary>
    /// <param name="item">Item to enqueue.</param>
    public void Add(T item)
    {
        lock (queue)
        {
            while (queue.Count >= this.MaxSize)
            {
                Monitor.Wait(queue);
            }

            queue.Enqueue(item);

            // Empty -> non-empty transition: wake any waiting consumers.
            if (queue.Count == 1)
            {
                Monitor.PulseAll(queue);
            }
        }
    }

    /// <summary>
    /// Removes and returns the oldest item, blocking while the queue is empty.
    /// This method does not look at <see cref="closing"/>; it keeps waiting after <see cref="Close"/>.
    /// </summary>
    /// <returns>The dequeued item.</returns>
    public T Take()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }

            return DequeueAndSignal();
        }
    }

    /// <summary>Marks the queue as closing and wakes every waiting thread.</summary>
    public void Close()
    {
        lock (queue)
        {
            closing = true;
            Monitor.PulseAll(queue);
        }
    }

    /// <summary>
    /// Removes the oldest item, blocking while the queue is empty unless the queue
    /// has been closed.
    /// </summary>
    /// <param name="value">The dequeued item, or <c>default(T)</c> when false is returned.</param>
    /// <returns>True if an item was dequeued; false if the queue is closed and empty.</returns>
    public bool TryDequeue(out T value)
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                if (closing)
                {
                    value = default(T);
                    return false;
                }

                Monitor.Wait(queue);
            }

            value = DequeueAndSignal();
            return true;
        }
    }

    // Dequeues one item and wakes waiting producers on the full -> not-full
    // transition. The caller must hold the lock and have checked Count > 0.
    private T DequeueAndSignal()
    {
        T item = queue.Dequeue();
        if (queue.Count == MaxSize - 1)
        {
            Monitor.PulseAll(queue);
        }

        return item;
    }
}

1 answer

  • answered 2018-11-08 08:06 Alexander Kiselev

    You can just check the IsAlive property of all the worker threads. The code is not very elegant, but it works:

    // Timer callback that waits for the worker threads by polling Thread.IsAlive
    // every 50 ms and then re-arms the timer to fire again in 5 seconds.
    // NOTE(review): nothing in this snippet starts the threads (see the TODO below);
    // a thread that was never started is not alive, so the wait loop ends after its
    // first pass until the Start() calls are filled in.
    public void StartMonitor(object state)
    {
        List<Thread> producers = new List<Thread>();
        for (int p = 0; p < 1; p++)
        {
            producers.Add(new Thread(() => RunProducers(this.collection)));
        }

        //TODO: Start all producer threads...

        List<Thread> consumers = new List<Thread>();
        for (int c = 0; c < 2; c++)
        {
            consumers.Add(new Thread(() => RunConsumers(this.collection)));
        }

        //TODO: Let Thread wait until all worker threads are done
        List<Thread> pending = new List<Thread>(producers);
        pending.AddRange(consumers);

        // Sleep first, then drop every thread that has terminated; repeat until
        // no live thread is left.
        do
        {
            Thread.Sleep(50);
            pending.RemoveAll(worker => !worker.IsAlive);
        }
        while (pending.Count > 0);

        //TODO: Dispose Threads

        TimerThread.Change(5000, Timeout.Infinite);
    }
    

    Or, a somewhat better way:

        // Number of worker threads of the current tick that have not finished yet.
        // It is modified only through Interlocked: "Counter++" on a property is a
        // separate read and write, so concurrent updates could be lost even though
        // each accessor is individually synchronized.
        private int _counter;
        private int Counter
        {
            get
            {
                // CompareExchange(x, 0, 0) never changes the value; it is used as an
                // atomic, fenced read.
                return Interlocked.CompareExchange(ref _counter, 0, 0);
            }
            set
            {
                Interlocked.Exchange(ref _counter, value);
            }
        }

        // Timer callback: starts the workers, waits (polling every 50 ms) until the
        // counter of unfinished workers drops to zero, then re-arms the timer for 5 s.
        // NOTE(review): the loop only ends if RunProducers and RunConsumers return.
        public void StartMonitor(object state)
        {
            List<Thread> producers = new List<Thread>();
            List<Thread> consumers = new List<Thread>();
            Counter = 0;

            for (int i = 0; i < 1; i++)
            {
                // Count the worker before it starts; if the worker counted itself,
                // the wait loop below could observe 0 before any worker had run.
                Interlocked.Increment(ref _counter);
                producers.Add(new Thread(() =>
                {
                    // finally: a throwing worker must still be counted as finished.
                    try { RunProducers(this.collection); }
                    finally { Interlocked.Decrement(ref _counter); }
                }));
            }

            for (int i = 0; i < 2; i++)
            {
                Interlocked.Increment(ref _counter);
                consumers.Add(new Thread(() =>
                {
                    try { RunConsumers(this.collection); }
                    finally { Interlocked.Decrement(ref _counter); }
                }));
            }

            foreach (Thread producer in producers)
            {
                producer.Start();
            }

            foreach (Thread consumer in consumers)
            {
                consumer.Start();
            }

            // Wait until every worker has finished.
            while (Counter > 0)
            {
                Thread.Sleep(50);
            }

            //TODO: Dispose Threads

            TimerThread.Change(5000, Timeout.Infinite);
        }
    

    And to avoid using Sleep(), you can use the Barrier class (note that Barrier requires .NET Framework 4 or later):

        // Timer callback: starts the workers and uses a Barrier as the rendezvous
        // point. Every worker signals the barrier when its work is done, and this
        // thread is released once all of them have arrived. The timer is then
        // re-armed to fire again in 5 seconds.
        // NOTE(review): Barrier was introduced in .NET Framework 4. The rendezvous
        // only completes if RunProducers and RunConsumers return.
        public void StartMonitor(object state)
        {
            // Single source of truth for the thread counts: the barrier's participant
            // count must match the number of threads actually created.
            const int producer_cnt = 1;
            const int consumer_cnt = 2;

            List<Thread> producers = new List<Thread>(producer_cnt);
            List<Thread> consumers = new List<Thread>(consumer_cnt);

            // +1 participant for this (timer callback) thread.
            using (Barrier b = new Barrier(producer_cnt + consumer_cnt + 1))
            {
                for (int i = 0; i < producer_cnt; i++)
                {
                    producers.Add(new Thread(() => { try { RunProducers(this.collection); } finally { b.SignalAndWait(); } }));
                }

                for (int i = 0; i < consumer_cnt; i++)
                {
                    consumers.Add(new Thread(() => { try { RunConsumers(this.collection); } finally { b.SignalAndWait(); } }));
                }

                foreach (Thread producer in producers)
                {
                    producer.Start();
                }

                foreach (Thread consumer in consumers)
                {
                    consumer.Start();
                }

                // Blocks until every worker has reached its own SignalAndWait().
                b.SignalAndWait();

                // Barrier.Dispose is not safe to call while other threads are still
                // inside SignalAndWait, so wait for the workers to exit first.
                foreach (Thread producer in producers)
                {
                    producer.Join();
                }

                foreach (Thread consumer in consumers)
                {
                    consumer.Join();
                }
            }

            TimerThread.Change(5000, Timeout.Infinite);
        }