使用MQ使接口立即返回,避免线程阻塞
1.1 以下是个人写的一个MQ处理代码(线程加队列),实现接口的立即返回,避免造成接口多层调用时的阻塞问题
#ifndef __INTERNAL_MESSAGE_QUEUE_
#define __INTERNAL_MESSAGE_QUEUE_
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <mutex>
#include <system_error>
#include <thread>
#include <type_traits>
/**
 * @brief Pipe-backed message queue with a single background consumer thread.
 *
 * submitMessage() copies the raw bytes of a message into a POSIX pipe and
 * returns; a worker thread owned by this object reads the messages back one
 * by one and passes each to handleMessage().
 *
 * Lifetime rules:
 *  - The pipe and the worker are created lazily by the first submitMessage()
 *    call, not by the constructor. While the base constructor runs, the
 *    derived object does not exist yet, so neither logError() nor
 *    handleMessage() (both pure virtual) may be called from there.
 *  - The destructor stops and joins the worker. A derived class should call
 *    stopQueue() at the start of its own destructor so that the worker is
 *    guaranteed to be idle before derived members are destroyed; the call in
 *    the base destructor is only a fallback.
 *  - Messages still in the pipe when the queue is stopped are discarded.
 *  - submitMessage() must not be called concurrently with stopQueue() or
 *    with destruction.
 *
 * @tparam T Message type. Must be trivially copyable (it is transported as
 *           raw bytes) and default constructible (the worker keeps one T as
 *           its receive buffer). Messages larger than PIPE_BUF are not
 *           written atomically by the kernel, so concurrent producers should
 *           keep sizeof(T) <= PIPE_BUF.
 */
template<typename T>
class InternalMessageQueue
{
public:
    /** @brief Creates an idle queue; no pipe or thread exists yet. */
    InternalMessageQueue()
        : pipe_fds_{-1, -1}
        , queue_ready_(false)
        , stopping_(false)
    {
        static_assert(std::is_trivially_copyable<T>::value, "Template must be trivially copyable");
    }

    /** @brief Stops the worker (if running) and closes the pipe. */
    virtual ~InternalMessageQueue()
    {
        stopQueue();
    }

    InternalMessageQueue(const InternalMessageQueue &) = delete;
    InternalMessageQueue & operator=(const InternalMessageQueue &) = delete;

    /**
     * @brief Enqueues a copy of @p msg for asynchronous handling.
     *
     * The first call creates the pipe and starts the worker thread. Failures
     * are reported through logError(); the message is dropped in that case.
     *
     * @param msg Message whose bytes are copied into the pipe.
     */
    virtual void submitMessage(const T & msg)
    {
        std::call_once(init_once_, [this] { initializeQueue(); });

        if (!queue_ready_) {
            logError("write: queue is not ready");
            return;
        }

        ssize_t written = 0;
        do {
            written = write(pipe_fds_[INDEX_WRITE], &msg, sizeof(T));
        } while (written < 0 && EINTR == errno);  // interrupted before any byte was written: retry

        if (written < 0) {
            logError(strerror(errno));
        } else if (static_cast<size_t>(written) != sizeof(T)) {
            logError("write: data length does not match the size of template type");
        }
    }

    /** @brief Called on the worker thread for every message received. */
    virtual void handleMessage(const T & msg) = 0;

    /** @brief Receives a description of every error detected by the queue. */
    virtual void logError(const char * error_msg) = 0;

protected:
    /**
     * @brief Stops the worker thread and releases the pipe. Idempotent.
     *
     * Blocks until the worker has returned from any handleMessage() call in
     * progress. After this call every submitMessage() reports
     * "queue is not ready".
     */
    void stopQueue()
    {
        stopping_ = true;
        queue_ready_ = false;

        // Closing the write end makes the worker's blocking read() return 0.
        closeEnd(INDEX_WRITE);
        if (worker_.joinable()) {
            worker_.join();
        }
        closeEnd(INDEX_READ);
    }

private:
    const static int INDEX_READ = 0;
    const static int INDEX_WRITE = 1;
    const static int INDEX_MAX = 2;

    int pipe_fds_[INDEX_MAX];
    std::atomic<bool> queue_ready_;
    std::atomic<bool> stopping_;
    std::once_flag init_once_;
    std::thread worker_;

    /** @brief Closes one end of the pipe if it is open. */
    void closeEnd(int index)
    {
        if (pipe_fds_[index] >= 0) {
            close(pipe_fds_[index]);
            pipe_fds_[index] = -1;
        }
    }

    /** @brief Creates the pipe and starts the worker; runs once via init_once_. */
    void initializeQueue()
    {
        if (0 != pipe(pipe_fds_)) {
            logError(strerror(errno));
            pipe_fds_[INDEX_READ] = -1;
            pipe_fds_[INDEX_WRITE] = -1;
            return;
        }

        try {
            worker_ = std::thread(&InternalMessageQueue::routine, this);
        } catch (const std::system_error & e) {
            // Thread creation failed: release the pipe and stay "not ready".
            logError(e.what());
            closeEnd(INDEX_WRITE);
            closeEnd(INDEX_READ);
            return;
        }
        queue_ready_ = true;
    }

    /**
     * @brief Worker loop: reassembles whole messages from the pipe and
     *        dispatches them until the queue is stopped.
     */
    void routine()
    {
        T buffer;
        char * const raw = reinterpret_cast<char *>(&buffer);
        size_t filled = 0;  // bytes of the current message received so far

        while (true) {
            const ssize_t got = read(pipe_fds_[INDEX_READ], raw + filled, sizeof(T) - filled);

            // Once stopping, the derived object may already be destroyed:
            // make no further virtual calls.
            if (stopping_) {
                break;
            }

            if (got > 0) {
                filled += static_cast<size_t>(got);
                if (sizeof(T) == filled) {
                    filled = 0;
                    handleMessage(buffer);
                }
                // A short read keeps its bytes; the next read() completes the message.
            } else if (0 == got) {
                // End of file: every write end has been closed.
                if (0 != filled) {
                    logError("read: data length does not match the size of template type");
                }
                break;
            } else if (EINTR != errno) {
                logError(strerror(errno));
                std::this_thread::sleep_for(std::chrono::seconds(1));
            }
        }
    }
};
#endif
/*MediaInfoMQ.hpp*/
#pragma once
#include <message/InternalMessageQueue.hpp>
#include <SVPLog.h>
/**
 * Singleton message queue for CarIfDataType_AudiocurrentStationInfo messages.
 *
 * Each message taken off the queue is passed to carIfSetData() under the id
 * CarIfDataId_AudiocurrentStationInfo; queue errors are written to the SVP
 * error log with a "MediaInfoMQ: " prefix.
 */
class MediaInfoMQ: public InternalMessageQueue<CarIfDataType_AudiocurrentStationInfo>
{
private:
    // Construction and destruction are private: the only instance is the one
    // owned by getInstance().
    MediaInfoMQ() = default;
    ~MediaInfoMQ() = default;

    /** Forwards one queued message to carIfSetData() and logs the result code. */
    void handleMessage(const CarIfDataType_AudiocurrentStationInfo &msg) override
    {
        // carIfSetData() is given a pointer to a non-const local copy.
        // NOTE(review): presumably because it takes a non-const pointer; confirm.
        CarIfDataType_AudiocurrentStationInfo station_info(msg);
        const CarIfError_t status = carIfSetData(CarIfDataId_AudiocurrentStationInfo, &station_info);
        SVP_INFO("carIfSetData SetBapAudioRadioStation CarIfError_t:%d ", status);
    }

    /** Writes a queue error to the SVP error log. */
    void logError(const char * msg) override
    {
        SVP_ERROR("MediaInfoMQ: %s", msg);
    }

public:
    /** Returns the single instance, created on first use. */
    static MediaInfoMQ & getInstance()
    {
        static MediaInfoMQ the_queue;
        return the_queue;
    }
};
/*test.cpp*/
#include "MediaInfoMQ.hpp"
/**
 * Decodes an incoming byte buffer into an audio-info message and hands it to
 * the MediaInfoMQ queue, so the caller is not held up by the downstream
 * processing of the message.
 *
 * @param msg_buf Serialized message bytes, decoded with Message::fromByteBuffer().
 */
void XXXX::listener(const std::vector<uint8_t>& msg_buf)
{
    Message<AUDIO_INFO_ALL_VALUES, uint8_t, uint8_t, uint8_t> msg;
    msg.fromByteBuffer(msg_buf);

    // Parameters are taken in the order 2, 1, 0.
    // NOTE(review): presumably to match the field order of AudioType_t; confirm.
    AudioType_t AudiocurrentStationInfo = {
        msg.getParam<2, uint8_t>(),
        msg.getParam<1, uint8_t>(),
        msg.getParam<0, uint8_t>()
    };

    MediaInfoMQ::getInstance().submitMessage(AudiocurrentStationInfo);
}