上次我们使用AutoResetEvent实现了一个生产/消费者队列。这一次我们要使用Wait和Pulse方法来实现一个更强大的版本,它允许多个消费者,每一个消费者都在自己的线程中运行。
我们使用数组来跟踪线程。
Thread[] _workers;
通过跟踪线程可以让我们在所有的线程都结束后再结束我们的队列任务。
每一个消费者线程都执行一个叫做Consume的方法,在一个for循环中,我们可以创建和启动线程。例如:
Public delegate void Action();
为了表示一系列的任务,我们使用Queue<T> 集合,例如:
Queue<Action> _itemQ = new Queue<Action>();
在我们调用生产(EnqueueItem)和消费(Consume)方法前,还是完整的看一看代码吧:
public void Shutdown(bool waitForWorkers)
{
//为每一个线程插入一个null item,可以是每一个worker 退出
foreach (Thread worker in _workers)
EnqueueItem(null);
//等待所有的线程退出。
if (waitForWorkers)
foreach (Thread worker in _workers)
worker.Join();
}
public void EnqueueItem(Action item)
{
lock (_locker)
{
_itemQ.Enqueue(item);
Monitor.Pulse(_locker); //通知等待队列中的线程
}
}
void Consume()
{
while (true)
{
Action item;
lock (_locker)
{
while (_itemQ.Count == 0)
{
Monitor.Wait(_locker); //释放锁,并阻止当前线程,直到其他线程发送pulse信号。 }
item = _itemQ.Dequeue();
}
if (item == null) return; //退出的信号
item();
}
}
}
下面是Main方法。使用两个consumer线程,然后让这两个consumers执行10个委托。
for (int i = 0; i < 10; i++)
{
int itemNumber = i;
q.EnqueueItem(() =>
{
Thread.Sleep(1000); //模拟耗时的工作
Console.WriteLine(" Task " + itemNumber);
});
}
q.Shutdown(true); //等待关闭
Console.WriteLine();
Console.WriteLine("Workers complete!");
}
下面让我们细致的看一看EnqueueItem方法:
因为我们插入了一个新的任务,我们必须修改阻塞条件,也就是调用pulse方法,来唤醒调用了wait方法的线程。
出于对效率的考虑,当插入一个Item的时候使用Pulse来代替PulseAll方法,因为大部分时候每一个Item只需要一个consumer来执行。如果你有一个冰淇淋,你不可能叫30个睡眠的孩子都起来吃它,同样,对于一个item,同时唤醒30个consumers一点好处都没有。
让我们再看看Consumer方法。
我们希望当没什么事情做的时候,线程阻塞就可以了,换句话说,队列中没有item的时候,线程就应该阻塞。因此我们的阻塞条件是_itemQ.Count ==0;
if (item == null) return; //退出的信号
item();
Wait Timeouts
在调用Wait方法的时候可以传递一个毫秒或Timespan的时间来设置超时。如果Wait超时了,那么Wait方法就会返回false。
带有超时功能的Wait方法的主要步骤:
释放锁。
阻塞 直到 pulsed 或者超时。
重新获取锁。
超时就好像CLR 在超时到了的时候自动的调用了 pulse方法一样。
下面是使用超时的Wait的主要代码:
lock(_locker)
while(<阻塞条件>)
Monitor.Wait(_locker,<超时时间>);
Monitor.Wait 方法返回一个bool值来代表是调用了pulse还是已经超时了。
如果是true: 代表调用了pulse。
如果是false:代表超时了。
这对记录日志很有用。
新闻热点
疑难解答