撰写文章时,需清晰阐述关键点,并引发读者兴趣。今天咱们来探讨QT中多线程接收数据的实时处理方法,特别是如何通过循环队列实现线程间的信息交流!
QT多线程处理困境
在QT软件中,多线程技术被频繁应用。特别是在涉及数据接收与处理的场合,常用socket进行数据传输。然而,当线程处理数据速度较慢且数据量庞大时,问题便随之而来。若处理不及时,接收缓冲区将迅速填满,新数据将取代旧数据,从而可能导致关键数据丢失,这对后续的数据分析和系统运行效率造成严重影响。
例如,一套自动化的监控系统,每一秒钟都会产出大量的环境信息。如果处理数据的线程速度赶不上接收的速度,这些信息就会如同沙粒般逐渐消失。这样一来,系统所收集到的环境资料就可能存在误差。
线程通信问题凸显
解决上文问题的关键在于线程通信。我们通常在操作系统的教材中了解到,有线程同步、互斥、信号量这三种机制。然而,在QT的多线程数据接收处理过程中,这些方法可能不够高效,或者使用起来较为复杂。因此,我们急需一种既简单又有效的方法,以便将接收到的数据暂时保存,让处理线程能够逐步处理。这便是循环队列技术得以应用的原因。
在网络直播过程中,由于音视频数据传输量巨大,若各线程间无法实现有效沟通,画面会频繁出现卡顿,声音也会出现延迟现象。
循环队列基本原理
循环队列的设计颇为巧妙,它采用全局变量来定义一个大数组,并设立读写指针。这两个指针并非一般意义上的指针,而是用来标记读写位置的特定标志。借助这些指针,我们便构建起了循环队列的基础框架。
在火车调度站,每条轨道犹如数组中的一个元素。调度员通过调度指针来指引火车进出,确保了调度的有序性,防止了混乱。这个指针负责引导数据的存储与提取,确保了数据的有序排列。
数据存储与读取流程
数据接收到后,先存入循环队列,写指针随之移动。另一线程则负责移动读指针,持续从队列中取出数据并处理。这个过程就像接力赛,接收数据是第一环节,存入队列是第二环节,处理数据是第三环节,确保整个接力过程数据流畅,不出现中断或混乱。
在高速交易领域,每一秒钟都有大量交易信息在传输与处理。采用循环队列的方式,可以显著提升交易处理的效率和精确度。
MyThread.h
#ifndef MYTHREAD_H
#define MYTHREAD_H
#include
#include
#include "basetype.h"
#include "mythreadstore.h"
/**
* @brief The MyThreadRecv class
* 该类负责接收从tcp读取到的数据。并将数据存储到缓冲区中
*/
class MyThreadRecv
{
public:
MyThreadRecv();
~MyThreadRecv(){};
void RecvMsg(const BYTE *data, int len); ///< 接收数据,存入循环队列
};
/**
* @brief The MyThread class
* 处理数据的线程
*/
class MyThread: public QThread
{
public:
public:
MyThread();
~MyThread(){};
void init();
void run(); ///< 任务执行
private:
volatile bool stopped;
int Flag;
MythreadStore *mythreadstore;
};
#endif // MYTHREAD_H
关键类的作用解析
代码里包含两个重要的类:MyThreadRecv和MyThread。MyThreadRecv类负责将socket接收到的数据存储起来,并将这些数据放入循环队列中,这个过程就像仓库管理员对货物进行分类存放。而MyThread类则负责从队列中读取数据并进行处理,它会根据读指针的位置精确地取出数据,就像质检员对货物进行检查和处理。
图书馆的书籍管理系统中设有两个角色,一个专门负责新书入库,另一个则负责对书籍进行分类和上架,以保证系统运作的有序性。
MyThread.cpp
#include "mythread.h"
BYTE Queue1[(1024 * 500)] = {0}; ///< 循环消息队列
int wReadPoint = 0; ///< 读指针
int wWritePoint = 0; ///< 写指针
MyThreadRecv::MyThreadRecv()
{
}
void MyThreadRecv::RecvMsg(const BYTE *data, int len)
{
qDebug()<<"I will gointo for";
for(int iNum = 0; iNum < len; iNum++) {
/**
* @brief tempWReadPoint
* 存储,到程序执行到此处的时候,wReadPoint的值,因为线程一直在执行,很有可能执行到这步的时候,wReadPoint的值被改变。
*/
int tempWReadPoint = wReadPoint;
if((wWritePoint + 1) % (1024 * 500) == tempWReadPoint) {
/**
* 队列已满
*/
continue;
}
/**
* 处理队列不满的情况
*/
Queue1[wWritePoint % (1024 * 500)] = data[iNum];
wWritePoint = (wWritePoint +1) % (1024 * 500);
}
qDebug()<<"After for";
}
void MyThread::init()
{
start();
}
MyThread::MyThread()
{
stopped = false;
Flag = 0;
mythreadstore = new MythreadStore();
}
void MyThread::run()
{
qDebug()<<"In run";
int iFlag = 0; ///< 标志位
int iNum = 0;
BYTE NeedDealdata[200] = {0};
while(!stopped) {
/**
* @brief itempWritePoint
* 存储,到程序执行到此处的时候,wWritePoint的值,因为线程一直在执行,很有可能执行到这步的时候,wWritePoint的值被改变。
*/
int itempWritePoint = wWritePoint;
if((wReadPoint) % (1024 * 500) != itempWritePoint) {
/**
* 队列不空
*/
if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0x54 == Queue1[(wReadPoint) % (1024 * 500)])) {
/**
* 找帧头
*/
iNum = 0;
NeedDealdata[iNum++] = Queue1[(wReadPoint -1) % (1024 * 500)];
NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
wReadPoint = (wReadPoint + 1) % (1024 * 500);
iFlag = 1;
}
if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0xFE == Queue1[(wReadPoint) % (1024 * 500)]) && (1 == iFlag)) {
NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
wReadPoint = (wReadPoint + 1) % (1024 * 500);
qDebug()<StoreMsgDeal(NeedDealdata, iNum);
memset(NeedDealdata, '', sizeof(NeedDealdata));
iFlag = 0;
iNum = 0;
}
if(1 == iFlag) {
NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
wReadPoint = (wReadPoint + 1) % (1024 * 500);
} else {
wReadPoint = (wReadPoint + 1) % (1024 * 500);
}
}
usleep(10);
}
}
循环队列设计要点
设计循环队列需注意其何时达到最大容量、何时为空。这是确保队列正常运作的核心。当队列满载,需暂停或舍弃新增数据;而当队列为空,处理线程需等待新数据的到来。
以水坝蓄水为例,当水位触及最高点,便需停止蓄水或进行泄洪。若水位降至较低水平,则需等待水源补给。这样能确保水坝系统持续稳定运作。在构建循环队列时,必须针对实际应用环境进行详尽分析,以保证其稳定性和高效性。
在使用QT进行多线程数据处理时,你是否遇到过数据丢失的情况?不妨尝试一下这种循环队列的通信方法。觉得这篇文章对你有帮助的话,请点赞并分享!