QT多线程数据丢失?循环队列轻松解决通信难题

撰写文章时,需清晰阐述关键点,并引发读者兴趣。今天咱们来探讨QT中多线程接收数据的实时处理方法,特别是如何通过循环队列实现线程间的信息交流!

QT多线程处理困境

在QT软件中,多线程技术被频繁应用。特别是在涉及数据接收与处理的场合,常用socket进行数据传输。然而,当线程处理数据速度较慢且数据量庞大时,问题便随之而来。若处理不及时,接收缓冲区将迅速填满,新数据将取代旧数据,从而可能导致关键数据丢失,这对后续的数据分析和系统运行效率造成严重影响。

例如,一套自动化的监控系统,每一秒钟都会产出大量的环境信息。如果处理数据的线程速度赶不上接收的速度,这些信息就会如同沙粒般逐渐消失。这样一来,系统所收集到的环境资料就可能存在误差。

线程通信问题凸显

解决上文问题的关键在于线程通信。我们通常在操作系统的教材中了解到,有线程同步、互斥、信号量这三种机制。然而,在QT的多线程数据接收处理过程中,这些方法可能不够高效,或者使用起来较为复杂。因此,我们急需一种既简单又有效的方法,以便将接收到的数据暂时保存,让处理线程能够逐步处理。这便是循环队列技术得以应用的原因。

在网络直播过程中,由于音视频数据传输量巨大,若各线程间无法实现有效沟通,画面会频繁出现卡顿,声音也会出现延迟现象。

循环队列基本原理

实时处理数据点_实时处理数据的软件_实时数据处理

循环队列的设计颇为巧妙,它采用全局变量来定义一个大数组,并设立读写指针。这两个指针并非一般意义上的指针,而是用来标记读写位置的特定标志。借助这些指针,我们便构建起了循环队列的基础框架。

在火车调度站,每条轨道犹如数组中的一个元素。调度员通过调度指针来指引火车进出,确保了调度的有序性,防止了混乱。这个指针负责引导数据的存储与提取,确保了数据的有序排列。

数据存储与读取流程

数据接收到后,先存入循环队列,写指针随之移动。另一线程则负责移动读指针,持续从队列中取出数据并处理。这个过程就像接力赛,接收数据是第一环节,存入队列是第二环节,处理数据是第三环节,确保整个接力过程数据流畅,不出现中断或混乱。

在高速交易领域,每一秒钟都有大量交易信息在传输与处理。采用循环队列的方式,可以显著提升交易处理的效率和精确度。

MyThread.h
#ifndef MYTHREAD_H
#define MYTHREAD_H
#include 
#include 
#include "basetype.h"
#include "mythreadstore.h"
/**
 * @brief The MyThreadRecv class
 * 该类负责接收从tcp读取到的数据。并将数据存储到缓冲区中
 */
class MyThreadRecv
{
public:
    MyThreadRecv();
    ~MyThreadRecv(){};
    void RecvMsg(const BYTE *data, int len);            ///< 接收数据,存入循环队列
};
/**
 * @brief The MyThread class
 * 处理数据的线程
 */
class MyThread: public QThread
{
public:
public:
    MyThread();
    ~MyThread(){};
    void init();
    void run();                 ///< 任务执行
private:
    volatile bool stopped;
    int Flag;
    MythreadStore *mythreadstore;
};
#endif // MYTHREAD_H

关键类的作用解析

代码里包含两个重要的类:MyThreadRecv和MyThread。MyThreadRecv类负责将socket接收到的数据存储起来,并将这些数据放入循环队列中,这个过程就像仓库管理员对货物进行分类存放。而MyThread类则负责从队列中读取数据并进行处理,它会根据读指针的位置精确地取出数据,就像质检员对货物进行检查和处理。

图书馆的书籍管理系统中设有两个角色,一个专门负责新书入库,另一个则负责对书籍进行分类和上架,以保证系统运作的有序性。

MyThread.cpp
#include "mythread.h"
BYTE Queue1[(1024 * 500)] = {0};            ///< 循环消息队列
int wReadPoint = 0;                         ///< 读指针
int wWritePoint = 0;                        ///< 写指针
MyThreadRecv::MyThreadRecv()
{
}
void MyThreadRecv::RecvMsg(const BYTE *data, int len)
{
    qDebug()<<"I will gointo for";
    for(int iNum = 0; iNum < len; iNum++) {
        /**
         * @brief tempWReadPoint
         * 存储,到程序执行到此处的时候,wReadPoint的值,因为线程一直在执行,很有可能执行到这步的时候,wReadPoint的值被改变。
         */
        int tempWReadPoint = wReadPoint;
        if((wWritePoint + 1) % (1024 * 500) == tempWReadPoint) {
            /**
             * 队列已满
             */
            continue;
        }
        /**
         * 处理队列不满的情况
         */
        Queue1[wWritePoint % (1024 * 500)] = data[iNum];
        wWritePoint = (wWritePoint +1) % (1024 * 500);
    }
    qDebug()<<"After for";
}
void MyThread::init()
{
    start();
}
MyThread::MyThread()
{
    stopped = false;
    Flag = 0;
    mythreadstore = new MythreadStore();
}
void MyThread::run()
{
    qDebug()<<"In run";
    int iFlag = 0;              ///< 标志位
    int iNum = 0;
    BYTE NeedDealdata[200] = {0};
    while(!stopped) {
        /**
         * @brief itempWritePoint
         * 存储,到程序执行到此处的时候,wWritePoint的值,因为线程一直在执行,很有可能执行到这步的时候,wWritePoint的值被改变。
         */
        int itempWritePoint = wWritePoint;
        if((wReadPoint) % (1024 * 500) != itempWritePoint) {
            /**
             * 队列不空
             */
            if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0x54 == Queue1[(wReadPoint) % (1024 * 500)])) {
                /**
                 * 找帧头
                 */
                iNum = 0;
                NeedDealdata[iNum++] = Queue1[(wReadPoint -1) % (1024 * 500)];
                NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
                wReadPoint = (wReadPoint + 1) % (1024 * 500);
                iFlag = 1;
            }
            if((0 != Queue1[(wReadPoint - 2) % (1024 * 500)]) && (0x5A == Queue1[(wReadPoint - 1) % (1024 * 500)]) && (0xFE == Queue1[(wReadPoint) % (1024 * 500)]) && (1 == iFlag)) {
                NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
                wReadPoint = (wReadPoint + 1) % (1024 * 500);
                qDebug()<StoreMsgDeal(NeedDealdata, iNum);
                memset(NeedDealdata, '', sizeof(NeedDealdata));
                iFlag = 0;
                iNum = 0;
            }
            if(1 == iFlag) {
                NeedDealdata[iNum++] = Queue1[(wReadPoint) % (1024 * 500)];
                wReadPoint = (wReadPoint + 1) % (1024 * 500);
            } else {
                wReadPoint = (wReadPoint + 1) % (1024 * 500);
            }
        }
        usleep(10);
    }
}

循环队列设计要点

设计循环队列需注意其何时达到最大容量、何时为空。这是确保队列正常运作的核心。当队列满载,需暂停或舍弃新增数据;而当队列为空,处理线程需等待新数据的到来。

以水坝蓄水为例,当水位触及最高点,便需停止蓄水或进行泄洪。若水位降至较低水平,则需等待水源补给。这样能确保水坝系统持续稳定运作。在构建循环队列时,必须针对实际应用环境进行详尽分析,以保证其稳定性和高效性。

在使用QT进行多线程数据处理时,你是否遇到过数据丢失的情况?不妨尝试一下这种循环队列的通信方法。觉得这篇文章对你有帮助的话,请点赞并分享!

发表评论