博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
C++线程池实现
阅读量:4297 次
发布时间:2019-05-27

本文共 4782 字,大约阅读时间需要 15 分钟。

最近读了muduo的源码,看了一下其中线程池的是实现。其中互斥量、条件变量都是库里面自己封装的,正好现在C++标准库里面有对应的类,所以就改造了一下,补充了部分注释。同时总结了一下条件变量和锁的使用。代码如下:

ThreadPool.h

#pragma once#include 
#include
#include
#include
#include
#include
#include
namespace muduo{ using namespace std; class CThreadPool { public: // 入参为空的任务类型 typedef function
CTask; // 构造及析构函数 explicit CThreadPool(const string& nameArg = string("ThreadPool")); ~CThreadPool(); // 设置任务队列的数量上限 void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; } // 设置工作线程初次执行时的消息回调 void setThreadInitCallback(const CTask& cb) { threadInitCallback_ = cb; } // 开启指定数量的工作线程 void start(int numThreads); // 退出线程池执行,退出所有的工作线程 void stop(); // 线程池名称 const string& name() const { return name_; } // 任务队列多少 size_t queueSize() const; // 新任务入队,若队列已满,将堵塞接口调用线程 void run(const CTask& f); private: // 禁止拷贝构造 CThreadPool(const CThreadPool&) = delete; CThreadPool& operator=(const CThreadPool&) = delete; // 判断队列是否已满,可采用lambda表达式替代 bool isFull() const; // 完成任务到线程的分发 // 线程池对象维护了任务队列,工作线程在执行时需要从任务队列中获取工作任务, // 所以只能将线程的执行体与线程池对象的接口关联,完成工作任务获取; void runInThread(); // 从任务队列中取任务执行 CThreadPool::CTask take();private: mutable mutex mutex_; // 任务队列的互斥锁 condition_variable notEmpty_; // 当前队列为空的条件变量,工作线程取任务时空则等待; condition_variable notFull_; // 队列已满的条件变量,任务如队时,队列满则等待; string name_; // 线程池名称 CTask threadInitCallback_; // 工作线程初次执行的回调 vector
threads_; // 工作线程集 deque
queue_; // 任务队列 size_t maxQueueSize_; //任务量上限 bool running_; // 线程池是否在执行 };}

ThreadPool.cpp

#include "MyThreadPool.h"#include 
#include
#include
using namespace muduo;CThreadPool::CThreadPool(const string& nameArg) :name_(nameArg), maxQueueSize_(0), running_(false){}CThreadPool::~CThreadPool(){ if (running_) { stop(); }}void CThreadPool::start(int numThreads){ assert(threads_.empty()); running_ = true; threads_.reserve(numThreads); for (int i = 0; i < numThreads; ++i) { // 将线程的执行过程绑定到线程池的runInThread threads_.push_back(new thread(&CThreadPool::runInThread, this)); } if (numThreads == 0) { threadInitCallback_(); }}void CThreadPool::stop(){ { lock_guard
lock(mutex_); running_ = false; // 喚醒所有的工作線程 notEmpty_.notify_all(); } //等待所有执行线程退出 for each (auto ptr_thread in threads_) { ptr_thread->join(); }}size_t CThreadPool::queueSize() const{ lock_guard
lck(mutex_); return queue_.size();}void CThreadPool::run(const CTask& task){ if (!task) { return; } if (threads_.empty()) { // 当前可执行线程数为0,当前主线程子集执行这个任务; task(); } else { unique_lock
lck(mutex_); notFull_.wait(lck, [&]() { return !isFull(); }); // 将task入队,唤醒执行线程 queue_.push_back(task); lck.unlock(); notEmpty_.notify_one(); }}CThreadPool::CTask CThreadPool::take(){ unique_lock
lock(mutex_); // 若线程池需要退出,也不需要再等待 notEmpty_.wait(lock, [&](){ return !queue_.empty() || !running_; }); CTask task; if (!queue_.empty()) { task = queue_.front(); queue_.pop_front(); if (maxQueueSize_ > 0) { //条件变量的分发不需要对互斥量加锁,以免唤醒的线程再次进入wait状态 lock.unlock(); notFull_.notify_one(); } } return task;}bool CThreadPool::isFull() const{ return maxQueueSize_ > 0 && queue_.size() >= maxQueueSize_;}void CThreadPool::runInThread(){ try { if (threadInitCallback_) { threadInitCallback_(); } while (running_) { CTask task(take()); if (task) { task(); } } } catch (...) { fprintf(stderr, "unknown exception caught in CThreadPool %s\n", name_.c_str()); throw; // rethrow }}// 测试代码// 1. 执行函数定义#define Func(index) \void func_##index() \{\ ostringstream ss;\ ss << "=========== thread: " << std::this_thread::get_id() << std::endl;\ std::cout << ss.str() << std::endl;\ std::cout.flush();\}\Func(1)Func(2)//2. 执行任务入队列#define RUN_IN_POOL(index) \do{ tp.run(std::function
(&func_##index)); }while(0)void main(){ CThreadPool tp; tp.start(2); tp.setMaxQueueSize(3); RUN_IN_POOL(1); RUN_IN_POOL(2); // 睡眠一下,否则立即调用stop,部分任务还未执行完 Sleep(1000); tp.stop(); system("pause");}

上述实现存在几个问题:

1. task类为无参的function类型,与其他可执行对象相比,无法保存状态;不能适配需要入参的场景;
2. 所有的成员变量的读写都用同一个互斥量保护,在工作线程较多情况下,运行效率不高;
3. run函数调用,插入待执行任务时,若当前任务队列已满,会将调用线程挂起;此时若调用该线程GUI线程,将导致界面卡死。

几点总结:

1. 条件变量的使用
a. 对共享变量修改的执行顺序
1) 通过lock_guard对互斥量mutex加锁;
2) 加锁完成后对变量进行修改;
3) 调用notify_one或notify_all接口唤醒在std::condition_variable上等待的线程(对外发送消息通知时,不需要持有锁)。
为了能够正确通知条件变量,即便共享变量为原子变量,也必须在对mutex互斥量加锁情况下进行修改。
b. 在条件变量上等待的线程
1) 通过unique_lock对同一个mutex上进行加锁,已保护共享变量;
2) 执行wait、wait_for或者wait_until,此时自动释放mutex,将线程挂起;
3) 某条件变量发出通知后,将当前线程唤醒,并自动获取mutex。同时,必须对条件变量进行判断,以避免为虚假唤醒。
2. lock_guard和unique_lock的差别
lock_guard和unique_lock都能实现互斥量的加锁操作。但unique_lock在guard_lock上做了扩展,支持锁状态的转移,锁状态的自动释放和获取。wait接口调用时,需要传入unique_lock类型的入参。

转载地址:http://hscws.baihongyu.com/

你可能感兴趣的文章
spring boot jpa 实现拦截器
查看>>
jenkins + maven+ gitlab 自动化部署
查看>>
Pull Request流程
查看>>
Lambda 表达式
查看>>
函数式数据处理(一)--流
查看>>
java 流使用
查看>>
java 用流收集数据
查看>>
java并行流
查看>>
CompletableFuture 组合式异步编程
查看>>
mysql查询某一个字段是否包含中文字符
查看>>
Java中equals和==的区别
查看>>
JVM内存管理及GC机制
查看>>
Java:按值传递还是按引用传递详细解说
查看>>
全面理解Java内存模型
查看>>
Java中Synchronized的用法
查看>>
阻塞队列
查看>>
linux的基础知识
查看>>
接口技术原理
查看>>
五大串口的基本原理
查看>>
PCB设计技巧与注意事项
查看>>