生产者-消费者方案是多线程应用程序开发中最常用的构造之一 ― 因此困难也在于此。因为在一个应用程序中可以多次重复生产者-消费者行为,其代码也可以如此。软件开发人员 Ze'ev Bubis 和 Saffi Hartal 创建了 Consumer 类,该类通过在一些多线程应用程序中促进代码重用以及简化代码调试和维护来解决这个问题。请通过单击本文顶部或底部的 讨论来参与本文的 论坛,与作者和其他读者分享您的想法。
多线程应用程序通常利用生产者-消费者编程方案,其中由生产者线程创建重复性作业,将其传递给作业队列,然后由消费者线程处理作业。虽然这种编程方法很有用,但是它通常导致重复的代码,这对于调试和维护可能是真正的问题。
为了解决这个问题并促进代码重用,我们创建了 Consumer 类。 Consumer 类包含所有用于作业队列和消费者线程的代码,以及使这两者能够结合在一起的逻辑。这使我们可以专注于业务逻辑 ― 关于应该如何处理作业的细节 ― 而不是专注于编写大量冗余的代码。同时,它还使得调试多线程应用程序的任务变得更为容易。
在本文中,我们将简单观察一下多线程应用程序开发中公共线程用法,同时,解释一下生产者-消费者编程方案,并研究一个实际的示例来向您演示 Consumer 类是如何工作的。请注意,对于多线程应用程序开发或消费者-生产者方案,本文不作深入介绍;有关那些主题,请参阅 参考资料获取文章的清单。
多线程基础知识
多线程是一种使应用程序能同时处理多个操作的编程技术。通常有两种不同类型的多线程操作使用多个线程:
适时事件,当作业必须在特定的时间或在特定的间隔内调度执行时
后台处理,当后台事件必须与当前执行流并行处理或执行时
(本文来源于图老师网站,更多请访问http://m.tulaoshi.com/bianchengyuyan/)适时事件的示例包括程序提醒、超时事件以及诸如轮询和刷新之类的重复性操作。后台处理的示例包括等待发送的包或等待处理的已接收的消息。
生产者-消费者关系
生产者-消费者方案很适合于后台处理类别的情况。这些情况通常围绕一个作业生产者方和一个作业消费者方。当然,关于作业并行执行还有其它考虑事项。在大多数情况下,对于使用同一资源的作业,应以先来先服务的方式按顺序处理,这可以通过使用单线程的消费者轻松实现。通过使用这种方法,我们使用单个线程来访问单个资源,而不是用多个线程来访问单个资源。
要启用标准消费者,当作业到来时创建一个作业队列来存储所有作业。生产者线程通过将新对象添加到消费者队列来交付这个要处理的新对象。然后消费者线程从队列取出每个对象,并依次处理。当队列为空时,消费者进入休眠。当新的对象添加到空队列时,消费者会醒来并处理该对象。因为大多数应用程序喜欢顺序处理方式,所以消费者通常是单线程的。
问题:代码重复
因为生产者-消费者方案很常用,所以在构建应用程序时它可能会出现几次,这导致了代码重复。我们认识到,这显示了在应用程序开发过程期间多次使用了生产者-消费者方案的问题。
当第一次需要生产者-消费者行为时,通过编写一个采用一个线程和一个队列的类来实现该行为。当第二次需要这种行为时,我们着手从头开始实现它,但是接着认识到以前已经做过这件事了。我们复制了代码并修改了处理对象的方式。当第三次在该应用程序中实现生产者-消费者行为时,很明显我们复制了太多代码。我们决定,需要一个适用的 Consumer 类,它将处理我们所有的生产者-消费者方案。
我们的解决方案:Consumer 类
我们创建 Consumer 类的目的是:在我们的应用程序中,消除这种代码重复 ― 为每个生产者-消费者实例编写一个新作业队列和消费者线程来解决这个问题。有了适当的 Consumer 类,我们所必须做的只是编写专门用于作业处理(业务逻辑)的代码。这使得我们的代码更清晰、更易于维护以及更改起来更灵活。
我们对 Consumer 类有如下需求:
重用:我们希望这个类包括所有东西。一个线程、一个队列以及使这两者结合在一起的所有逻辑。这将使我们只须编写队列中消费特定作业的代码。(因而,例如,程序员使用 Consumer 类时,将重载 onConsume(ObjectjobToBeConsumed) 方法。)
队列选项:我们希望能够设置将由 Consumer 对象使用的队列实现。但是,这意味着我们必须确保队列是线程安全的或使用一个不会与消费操作冲突的单线程生产者。无论使用哪种方法,都必须将队列设计成允许不同的进程能访问其方法。
Consumer 线程优先级:我们希望能够设置 Consumer 线程运行的优先级。
Consumer 线程命名:线程拥有一个有意义的名称会比较方便,当然这的确有助于调试。例如,如果您向 Java 虚拟机发送了一个信号,它将生成一个完整的线程转储 ― 所有线程及其相应堆栈跟踪的快照。要在 Windows 平台上生成这个线程转储,您必须在 Java 程序运行的窗口中按下键序列 ctrlbreak ,或者单击窗口上的关闭按钮。有关如何使用完整的线程转储来诊断 Java 软件问题的更多信息,请参阅 参考资料。
类代码
在 getThread() 方法中,我们使用惰性创建来创建 Consumer 的线程,如清单 1 所示:
清单 1. 创建 Consumer 的线程
/**
* Lazy creation of the Consumer's thread.
*
* @return the Consumer's thread
*/
private Thread getThread()
{
if (_thread==null)
{
_thread = new Thread()
{
public void run()
{
Consumer.this.run();
}
};
}
return _thread;
该线程的 run() 方法运行 Consumer 的 run() 方法,它是主消费者循环,如清单 2 所示:
(本文来源于图老师网站,更多请访问http://m.tulaoshi.com/bianchengyuyan/)清单 2. run() 方法是主 Consumer 循环
/**
* Main Consumer's thread method.
*/
private void run()
{
while (!_isTerminated)
{
// job handling loop
while (true)
{
Object o;
synchronized (_queue)
{
if (_queue.isEmpty())
break;
o = _queue.remove();
}
if (o == null)
break;
onConsume(o);
}
// if we are not terminated and the queue is still empty
// then wait until new jobs arrive.
synchronized(_waitForJobsMonitor)
{
if (_isTerminated)
break;
if(_queue.isEmpty())
{
try
{
_waitForJobsMonitor.wait();
}
catch (InterruptedException ex)
{
}
}
}
}
}// run()
基本上, Consumer 的线程一直运行,直到队列中不再有等待的作业为止。然后它进入休眠,只在第一次调用 add(Object) 时醒来,该方法向队列添加一个新作业并踢醒该线程。
使用 wait() 和 notify() 机制来完成睡眠和踢。实际的消费者工作由 OnConsume(Object) 方法处理,如清单 3 所示:
清单 3. 唤醒和通知 Consumer
/**
* Add an object to the Consumer.
* This is the entry point for the producer.
* After the item is added, the Consumer's thread
* will be notified.
*
* @param the object to be 'consumed' by this consumer
*/
public void add(Object o)
{
_queue.add(o);
kickThread();
}
/**
* Wake up the thread (without adding new stuff to consume)
*
*/
public void kickThread()
{
if (!this._thread.isInterrupted())
{
synchronized(_waitForJobsMonitor)
{
_waitForJobsMonitor.notify();
}
}
}
示例:MessagesProcessor
为了向您展示 Consumer 类是如何工作的,我们将使用一个简单示例。 MessagesProcessor 类以异步方式处理进入的消息(也就是说,不干扰调用线程)。其工作是在每个消息到来时打印它。 MessagesProcessor 具有一个处理到来的消息作业的内部 Consumer 。当新作业进入空队列时, Consumer 调用 processMessage(String) 方法来处理它,如清单 4 所示: