• 用C++手写消息队列,让自己更多的了解消息消费机制
  • 发布于 2个月前
  • 352 热度
    0 评论
不允许用现成的消息队列比如rabbitmq等,非要造轮子!

一. 生产者-消费者模式
生产者:这里由于是通过txt文件来进行交互,相当于txt文件的内容就是生产者,同时还需要实时监控txt文件,将其新消息放入队列。
消费者:从队列中取消息,并需要告诉队列该条消息已经被消费。

断点续传:需要考虑到程序崩溃之后,知道从哪开始消费。
1.1 多消费者——多线程
这里如果如果是IO操作较多的话,推荐使用多线程来创建消费者。具体创建和消费的过程如下:

我们可以看到其实这里的多消费者其实是串联的形式来进行消费,因此如果是CPU资源低,IO操作多的话,推荐这种形式。

多线程之间的内存变量交互很友善,不像多进程(哎,难受啊)。

1.2 多消费者——多进程
考虑到这里是需要像CPU请求较多资源,甚至是需要使用GPU的资源和利用CUDA加速(深度学习模型占用资源较多),因此我这里使用的是多进程来构建消费者。具体创建和消费过程如下:

二. 过程的实现
定时器:每隔一段时间扫描txt文件,并将文件中新加入的消息存放入队列中。
// 堆代码 duidaima.com
// timer扫描
int wait_sec = 1;
// 单独启动一个线程持续扫描文件(每5秒)
string path = "video.txt";
Timer timer1;
timer1.start(2000, std::bind(getVideoFromTxt, path, wait_sec, &output));
中间件:需要存放已经消费的消息,这样知道消费的具体位置,且支持程序崩溃/断掉之后,重启后知道在哪开始消费,同样用txt进行保存到本地。

多消费者:使用多进程创建消费者,这里考虑到进程中间的共享内存不好交互,直接使用txt来交互数据(反正进程之间的内存交互其实也是通过一个共享文件映射来完成的)。这里直接使用调用命令行的方式来构建消费者。
void gen_multiProcess(int id, string input_txt,string output_txt) {
  
STARTUPINFO si;
si.cb = sizeof(si);
PROCESS_INFORMATION pi;
ZeroMemory(&si, sizeof(si));
ZeroMemory(&pi, sizeof(pi));
string cmdLine = "D:/vs_project/setTimer/x64/Debug/setTimer.exe " + input_txt + " " + output_txt;
cout << cmdLine << endl;
wstring str = StringToWString(cmdLine);
  
BOOL bSuccess = CreateProcess(NULL,
const_cast<LPWSTR>(str.c_str()),
NULL, NULL, FALSE, 0, NULL, NULL, &si, &pi);
  
if (bSuccess) {
//handleOfProcess[id] = pi.hProcess;
cout << "Process-" << id << "completed!" << endl;
}
else {
cout << "Error:" << id << endl;
}
}

三. 收获
其实手写消息队列,帮助自己更多的了解消息消费机制,以及多进程和多线程的使用,当然了也更了解了C++的标准库。所以推荐大家还是亲手动手手写一哈。哎,作为一个python调包侠突然写这种偏底层,有点难受好吧。
用户评论