使用MQ使接口立即返回,避免线程阻塞

个人写的一个MQ类,实现优化接口处理返回时间的问题

Posted by VK on July 29, 2019

使用MQ使接口立即返回,避免线程阻塞

1.1 以下是个人写的一个MQ处理代码(线程加队列),实现接口的立即返回,避免造成接口多层调用时的阻塞问题

#ifndef __INTERNAL_MESSAGE_QUEUE_
#define __INTERNAL_MESSAGE_QUEUE_
#include <unistd.h>

#include <errno.h>
#include <string.h>

#include <atomic>
#include <chrono>
#include <thread>

template<typename T>
class InternalMessageQueue
{
public:
    InternalMessageQueue()
        : queue_ready_(false)
    {
        static_assert(std::is_trivially_copyable<T>::value, "Template must be trivially copyable");
        initializeQueue();
    }

    virtual ~InternalMessageQueue()
    {
    }

    virtual void submitMessage(const T & msg)
    {
        if (false == queue_ready_) {
            logError("write: queue is not ready");
        } else {
            const int ret = write(pipe_fds_[INDEX_WRITE], &msg, sizeof(T));
            if (ret < 0) {
                logError(strerror(errno));
            } else if (sizeof(T) != ret) {
                logError("write: data length does not match the size of template type");
            }
        }
    }

    virtual void handleMessage(const T & msg) = 0;
    virtual void logError(const char * error_msg) = 0;
private:
    const static int INDEX_READ     = 0;
    const static int INDEX_WRITE    = 1;
    const static int INDEX_MAX      = 2;
    int pipe_fds_[INDEX_MAX];
    std::atomic<bool> queue_ready_;

    void initializeQueue()
    {
        if (0 != pipe(pipe_fds_)) {
            logError(strerror(errno));
        } else {
            queue_ready_ = true;
            std::thread t(&InternalMessageQueue::routine, this);
            t.detach();
        }
    }

    void routine()
    {
        int ret = 0;
        T buffer;
        while (true) {
            ret = read(pipe_fds_[INDEX_READ], &buffer, sizeof(T));
            if (ret == sizeof(T)) {
                handleMessage(buffer);
            } else if (ret < 0) {
                logError(strerror(errno));
                std::this_thread::sleep_for(std::chrono::seconds(1));
            } else {
                logError("read: data length does not match the size of template type");
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    }
};
#endif

/*MediaInfoMQ.hpp*/
#prama once
#include <message/InternalMessageQueue.hpp>

#include <SVPLog.h>


class MediaInfoMQ: public InternalMessageQueue<CarIfDataType_AudiocurrentStationInfo>
{
public:
    static MediaInfoMQ & getInstance()
    {
        static MediaInfoMQ instance;
        return instance;
    }

private:
    MediaInfoMQ()
    {
    }

    ~MediaInfoMQ()
    {
    }

    virtual void logError(const char * msg)
    {
        SVP_ERROR("MediaInfoMQ: %s", msg);
    }

    virtual void handleMessage(const CarIfDataType_AudiocurrentStationInfo &msg)
    {
     	CarIfDataType_AudiocurrentStationInfo AudiocurrentStationInfo = msg;
		CarIfError_t ret = carIfSetData(CarIfDataId_AudiocurrentStationInfo,&AudiocurrentStationInfo);
		SVP_INFO("carIfSetData SetBapAudioRadioStation CarIfError_t:%d ",ret);
    }
};

test.cpp

#include "MediaInfoMQ.hpp"
void XXXX::listener(const std::vector<uint8_t>& msg_buf)
{
  Message<AUDIO_INFO_ALL_VALUES, uint8_t, uint8_t, uint8_t> msg;
  msg.fromByteBuffer(msg_buf);
  AudioType_t AudiocurrentStationInfo = {msg.getParam<2, uint8_t>(),msg.getParam<1, uint8_t(),msg.getParam<0, uint8_t>()};
  MediaInfoMQ::getInstance().submitMessage(AudiocurrentStationInfo);
}