多线程条件变量
main.cpp
xthread.h
xthread.cpp
xmsg_server.h
xmsg_server.cpp
#include "xmsg_server.h"
#include <sstream>
#include <iostream>
using namespace std;
int main(int argc, char* argv[])
{
XMsgServer server;
server.Start();
for (int i = 0; i < 10; i++)
{
stringstream ss;
ss << " msg : " << i + 1;
server.SendMsg(ss.str());
this_thread::sleep_for(500ms);
}
server.Stop();
cout << "Server stoped!" << endl;
return 0;
}
#pragma once
#include <thread>
class XThread
{
public:
//启动线程
virtual void Start();
//设置线程退出标志 并等待
virtual void Stop();
//等待线程退出(阻塞)
virtual void Wait();
//线程是否退出
bool is_exit();
protected:
bool is_exit_ = false;
private:
//线程入口
virtual void Main() = 0;
std::thread th_;
};
#include "xthread.h"
using namespace std;
//启动线程
void XThread::Start()
{
is_exit_ = false;
th_ = thread(&XThread::Main, this);
}
//设置线程退出标志 并等待
void XThread::Stop()
{
is_exit_ = true;
Wait();
}
//等待线程退出(阻塞)
void XThread::Wait()
{
// 判断线程是否调用过join
if (th_.joinable())
// 主线程需要等待该线程执行完成之后再结束
th_.join();
}
//线程是否退出
bool XThread::is_exit()
{
return is_exit_;
}
#pragma once
#include "xthread.h"
#include <string>
#include <list>
#include <mutex>
class XMsgServer:public XThread
{
public:
//给当前线程发消息
void SendMsg(std::string msg);
void Stop() override;
private:
//处理消息的线程入口函数
void Main() override;
//消息队列缓冲
std::list<std::string> msgs_;
//互斥访问消息队列
std::mutex mux_;
std::condition_variable cv_;
};
#include "xmsg_server.h"
#include <iostream>
using namespace std;
using namespace this_thread;
void XMsgServer::Stop()
{
is_exit_ = true;
cv_.notify_all();
Wait();
}
//处理消息的线程入口函数
void XMsgServer::Main()
{
while (!is_exit())
{
//sleep_for(10ms);
unique_lock<mutex> lock(mux_);
cv_.wait(lock, [this]
{
cout << "wait cv" << endl;
if (is_exit())return true;
return !msgs_.empty();
});
while (!msgs_.empty())
{
//消息处理业务逻辑
cout << "recv : " << msgs_.front() << endl;
msgs_.pop_front();
}
}
}
//给当前线程发消息
void XMsgServer::SendMsg(std::string msg)
{
unique_lock<mutex> lock(mux_);
msgs_.push_back(msg);
cout << "send:" << endl;
lock.unlock();
cv_.notify_one();
}