译自:
在多线程应用程序中,要求消息输入队列和消息输出队列顺序要求保持一致,而忽略多线程并发处理的顺序,这种情况是比较难处理的。在本文中,作者设计了一种新型解决方案:PRQueue(预留位置队列),较很好的解决这个问题。
PRQueue是使用c++的两个STL的deque还有pthread线程库实现的,并且在例子中使用了两个简单的类-Mutex和Lock来展示这个逻辑。StringMsg类表现一个样本消息,QueueTest类用来测试。
我选择STL的deque是因为deque拥有很多必要的操作(包括operator[])来实现PRQueue。特别的,有一点很重要的是push_back和pop_front()操作对于deque的元素的指针或引用来说都是有效地。
这里有一个简单的例子来展示PRQueue的作用。首先,我们需要记录大量的多域的消息流。转化文本字符串的数字域是一个慢的且并不关键的过程,因此我们决定将这份任务分派给能够生成日志的线程来做。处理的流程如下图所示:
因为消息的核心处理过程是发生在多线程中,消息的就绪顺序也许跟原始的输入队列不同。例如:如果一个线程从输入队列中拿走了一个报文消息后进入休眠状态,另外一个线程取出下一个报文消息,在第一个线程之前就运行完成处理这个报文消息,并把这个报文放入输出队列中。因此,这输出日志也许会无序了(我们假设消息被处理完之后才日志)。
使用PRQueue 则以上这种场景就能避免,它将会确保在输出队列中报文的顺序和输入队列中保持一致,而不管线程处理报文的顺序如何。PRQueue的基本逻辑比较简单,当下一个报文从输入队列中取出的时候,仍然放入锁中,下一个push_back的位置。然后释放锁,并且继续处理。在这个消息报文完全处理完之后,先前请求的位置用来把这个消息放入输出队列。
PRQueue 使用两个队列构造:'data’ 和'filled’。
'filled’队列的一个元素使用数据填充并且能够从PRQueue弹出。一个封装类DataQueue是'data’和'filled’队列的装载器。 这种设计允许我们在确切的实现过程中分离线程安全代码,因此用户不需要关心任何锁/解锁的逻辑。
下面让我们更详细的讨论PRQUEUE。
PRQueue方法主要做两件事情:它从输入队列中弹出数据,并且在输出队列中保留一个位置。PUSH方法使用之前保存的位置在输出队列中保存数据。
为了使用多线程测试PRQUEUE,PROCESS_MSG执行。它从输入队列中取出一个StringMsg,通过调用StringMsg::process()方法来处理这个消息并且push这个报文。
//------------static void* process_msg( void* arg){ int thidx = ++Thidx; QueueTest* quetest =(QueueTest*)arg; Msg* msg; PRQueue< Msg*>::position pos; cout << "Input thread=" << thidx << " started" << endl; for( ;;) { // Wait for next message appeared in input queue, // pop it up and get push position allocated in output queue quetest->input_que.pop( msg, quetest->output_que, pos); // Process message msg->process( thidx); // Push processed message into output queue using acquired position quetest->output_que.push( msg, pos); } return NULL;}//POP方法不仅等待输入队列中下一个报文的到来,也通过查看’filled’队列中元素来检查这个报文是否准备弹出了。如果数据还没有填充,pop将继续阻塞等待
POP方法的处理逻辑:
1. 锁定输入队列
2. 如果输入队列非空并且顶层元素填充了数据,则pop它(否则释放掉锁并继续睡眠)
3. 锁住输出队列
4. 保留输出队列底部位置
5. 解锁输出队列
6. 解锁输入队列
代码段如下:
// Pop data from input queue and reserve position in the output queue void pop( DT& data, PRQueue& outque, position& pos) { Lock lk( m_mux); while( true) { if( m_que.pop( data)) break; wait_while_empty(); } outque.reserve_pos( pos); }PUSH方法拷贝数据到输出队列的保留位置并且设置'filled’指示为真。它也通过发送一个通知信号来释放掉等待一个条件变量的线程。
代码段如下:
// Simple push void push( const DT& data) { Lock lk( m_mux); m_que.push( data); notify_not_empty(); }
现在,这个消息报文按序的到达了输出队列,如果我们想要更深的扩展我们的处理链的话,可以在后面再加上一个PRQueue。在以上的测试用例中我们不会这样做:我们使用一个单一的线程简单的从输出队列中读取处理完的报文并将它们打印出来。在最后的一步,只简单的使用了pop方法(未使用第二、第三个参数:指向输出队列和保留的位置的值)。
//------------static void* print_msg( void* arg){ QueueTest* quetest =(QueueTest*)arg; Msg* msg; cout << "Output thread started" << endl; for( ;;) { quetest->output_que.pop( msg); msg->print(); delete msg; } return NULL;}//总结
在多线程应用程序中,当处理的消息流顺序需要保证的时候,本文所说的预留位置队列将会是有用的。PRQueue将会确保输出队列中报文顺序同输入队列保持一致,因为在输出队列中下一个push_back的位置在输入队列取出报文的时候就同步的保留了。当报文消息处理完成之后,所保留的位置随后将会被数据填充。
注:完整代码于此处下载:(译此文时,时间较仓促,因此译文很粗糙,待时间较宽松时再细细校验)。