/*
* BlockingQueue.h
*
* Created on: Apr 19, 2013
* Author: archy_yu
*/
#ifndef BLOCKINGQUEUE_H_
#define BLOCKINGQUEUE_H_
#include <pthread.h>
#include <queue>
/* Opaque element type stored in the queue; the queue never dereferences it. */
typedef void* CommonItem;

/*
 * Queue of CommonItem pointers built from two std::queue instances
 * (_write_queue and _read_queue) plus a pthread mutex and condition variable.
 */
class BlockingQueue
{
public:
    /* Initialises the mutex and the condition variable. */
    BlockingQueue();

    /* Destroys the mutex and the condition variable. */
    virtual ~BlockingQueue();

    /*
     * Consumer-side read; may block until append() has supplied data.
     * Despite the name, items are removed from the queue. Returns 0.
     */
    int peek(CommonItem &item);

    /* Producer-side write: enqueues item and signals the condition variable. Returns 0. */
    int append(CommonItem item);

private:
    /*
     * Declared but intentionally not defined: a copy would duplicate the
     * pthread mutex / condition variable, which POSIX leaves undefined.
     */
    BlockingQueue(const BlockingQueue &);
    BlockingQueue &operator=(const BlockingQueue &);

    pthread_mutex_t _mutex;              /* locked around _write_queue accesses */
    pthread_cond_t _cond;                /* signalled by append() after a push */
    std::queue<CommonItem> _read_queue;  /* items staged for peek() */
    std::queue<CommonItem> _write_queue; /* items pushed by append() */
};
#endif /* BLOCKINGQUEUE_H_ */
// BlockingQueue.cpp file contents
// The code is as follows:
/*
* BlockingQueue.cpp
*
* Created on: Apr 19, 2013
* Author: archy_yu
*/
#include "BlockingQueue.h"
/*
 * Initialises the mutex and the condition variable with default attributes
 * (NULL attribute pointers). The two queues are default-constructed empty.
 */
BlockingQueue::BlockingQueue()
{
    pthread_mutex_init(&this->_mutex, NULL);
    pthread_cond_init(&this->_cond, NULL);
}
/*
 * Releases the mutex and the condition variable created by the constructor.
 * Items still held in the queues are not freed here; only the containers
 * themselves are destroyed.
 */
BlockingQueue::~BlockingQueue()
{
    pthread_mutex_destroy(&this->_mutex);
    pthread_cond_destroy(&this->_cond);
}
/*
 * Removes the oldest available item and stores it in `item`.
 *
 * Items are served from _read_queue. When that queue is empty the call takes
 * the mutex, waits on the condition variable until _write_queue is non-empty,
 * and moves everything from _write_queue into _read_queue before serving the
 * front element. Previously the refill path returned without assigning
 * `item`, so the caller got a "successful" call with no data.
 *
 * _read_queue is accessed without holding the mutex, so this looks designed
 * for a single consumer thread -- NOTE(review): confirm against callers.
 *
 * @param item  receives the dequeued element.
 * @return always 0.
 */
int BlockingQueue::peek(CommonItem &item)
{
    if( this->_read_queue.empty() )
    {
        pthread_mutex_lock(&this->_mutex);
        /* Loop rather than a single wait: condition waits may wake spuriously. */
        while(this->_write_queue.empty())
        {
            pthread_cond_wait(&this->_cond, &this->_mutex);
        }
        /* Drain the whole write queue so later calls can skip the lock. */
        while(!this->_write_queue.empty())
        {
            this->_read_queue.push(this->_write_queue.front());
            this->_write_queue.pop();
        }
        pthread_mutex_unlock(&this->_mutex);
    }

    /* _read_queue is guaranteed non-empty here on both paths. */
    item = this->_read_queue.front();
    this->_read_queue.pop();
    return 0;
}
/*
 * Pushes `item` onto _write_queue while holding the mutex, then signals the
 * condition variable so a thread waiting in peek() can proceed.
 *
 * @param item  element to enqueue; stored as-is.
 * @return always 0.
 */
int BlockingQueue::append(CommonItem item)
{
    pthread_mutex_lock(&this->_mutex);
    this->_write_queue.push(item);
    pthread_cond_signal(&this->_cond);
    pthread_mutex_unlock(&this->_mutex);
    return 0;
}
// Test code
// The code is as follows:
/* Queue instance shared by process(), which appends to it, and main(), which reads from it. */
BlockingQueue _queue;
/*
 * Thread entry point for the producer side of the test.
 *
 * Loops forever, heap-allocating an int that holds the running counter
 * (0, 1, 2, ...) and appending its address to the global _queue. `arg` is
 * not used. The trailing return is never reached because the loop has no exit.
 */
void* process(void* arg)
{
    for(int counter = 0; ; ++counter)
    {
        int *boxed = new int(counter);
        _queue.append(static_cast<void *>(boxed));
    }
    return NULL;
}
int main(int argc,char** argv)
{
pthread_t pid;
pthread_create(&pid,0,process,0);
long long int start = get_os_system_time();
int i = 0;
while(true)
{
int* j = NULL;
_queue.peek((void* &)j);
i ++;
if(j != NULL && (*j) == 100000)
{
long long int end = get_os_system_time();
printf("consume %dn",end - start);
break;
}
}
return 0;
}