使用C++11实现一个半同步半异步线程池

C++11之前我们使用线程需要系统提供API、posix线程库或者使用boost提供的线程库,C++11后就加入了跨平台的线程类std::thread,线程

使用C++11实现一个半同步半异步线程池

C++11之前我们使用线程需要系统提供API、posix线程库或者使用boost提供的线程库,C++11后就加入了跨平台的线程类std::thread,线程同步相关类std::mutex、std::lock_guard、std::condition_variable、std::atomic以及异步操作相关类std::async、std::future、std::promise等等,这使得我们编写跨平台的多线程程序变得容易,线程的一个高级应用就是线程池,使用线程池可以充分利用多核CPU的并行计算能力,以及避免了使用单个线程的创建和销毁的开销,所以线程池在实际项目中用的很广泛,很多RPC框架都是用了线程池来处理事务,比如说 Thrifteasyrpc 等等,接下来我们将使用C++11来实现一个通用的半同步半异步线程池(个人博客也发表了 《使用C++11实现一个半同步半异步线程池》 )。

实现

一个半同步半异步线程池分为三层。

  1. 同步服务层:它处理来自上层的任务请求,上层的请求可能是并发的,这些请求不是马上就会被处理的,而是将这些任务放到一个同步排队层中,等待处理。
  2. 同步排队层: 来自上层的任务请求都会加到排队层中等待处理,排队层实际就是一个std::queue。
  3. 异步服务层: 这一层中会有多个线程同时处理排队层中的任务,异步服务层从同步排队层中取出任务并行的处理。

这三个层次之间需要使用std::mutex、std::condition_variable来进行事件同步,线程池的实现代码如下。

#ifndef _THREADPOOL_H

#define _THREADPOOL_H

#include <vector>

#include <queue>

#include <thread>

#include <mutex>

#include <memory>

#include <functional>

#include <condition_variable>

#include <atomic>

#include <type_traits>

static const std::size_t max_task_quque_size = 100000;

static const std::size_t max_thread_size = 30;

class thread_pool

{

public:

using work_thread_ptr = std::shared_ptr<std::thread>;

using task_t = std::function<void()>;

explicit thread_pool() : _is_stop_threadpool(false) {}

~thread_pool()

{

stop();

}

void init_thread_num(std::size_t num)

{

if (num <= 0 || num > max_thread_size)

{

std::string str = "Number of threads in the range of 1 to " + std::to_string(max_thread_size);

throw std::invalid_argument(str);

}

for (std::size_t i = 0; i < num; ++i)

{

work_thread_ptr t = std::make_shared<std::thread>(std::bind(&thread_pool::run_task, this));

_thread_vec.emplace_back(t);

}

}

// 支持普通全局函数、静态函数、以及lambda表达式

template<typename Function, typename... Args>

void add_task(const Function& func, Args... args)

{

if (!_is_stop_threadpool)

{

// 用lambda表达式来保存函数地址和参数

task_t task = [&func, args...]{ return func(args...); };

add_task_impl(task);

}

}

// 支持函数对象(仿函数)

template<typename Function, typename... Args>

typename std::enable_if<std::is_class<Function>::value>::type add_task(Function& func, Args... args)

{

if (!_is_stop_threadpool)

{

task_t task = [&func, args...]{ return func(args...); };

add_task_impl(task);

}

}

// 支持类成员函数

template<typename Function, typename Self, typename... Args>

void add_task(const Function& func, Self* self, Args... args)

{

if (!_is_stop_threadpool)

{

task_t task = [&func, &self, args...]{ return (*self.*func)(args...); };

add_task_impl(task);

}

}

void stop()

{

// 保证terminate_all函数只被调用一次

std::call_once(_call_flag, [this]{ terminate_all(); });

}

private:

void add_task_impl(const task_t& task)

{

{

// 任务队列满了将等待线程池消费任务队列

std::unique_lock<std::mutex> locker(_task_queue_mutex);

while (_task_queue.size() == max_task_quque_size && !_is_stop_threadpool)

{

_task_put.wait(locker);

}

_task_queue.emplace(std::move(task));

}

// 向任务队列插入了一个任务并提示线程池可以来取任务了

_task_get.notify_one();

}

void terminate_all()

{

_is_stop_threadpool = true;

_task_get.notify_all();

for (auto& iter : _thread_vec)

{

if (iter != nullptr)

{

if (iter->joinable())

{

iter->join();

}

}

}

_thread_vec.clear();

clean_task_queue();

}

void run_task()

{

// 线程池循环取任务

while (true)

{

task_t task = nullptr;

{

// 任务队列为空将等待

std::unique_lock<std::mutex> locker(_task_queue_mutex);

while (_task_queue.empty() && !_is_stop_threadpool)

{

_task_get.wait(locker);

}

if (_is_stop_threadpool)

{

break;

}

if (!_task_queue.empty())

{

task = std::move(_task_queue.front());

_task_queue.pop();

}

}

if (task != nullptr)

{

// 执行任务,并通知同步服务层可以向队列放任务了

task();

_task_put.notify_one();

}

}

}

void clean_task_queue()

{

std::lock_guard<std::mutex> locker(_task_queue_mutex);

while (!_task_queue.empty())

{

_task_queue.pop();

}

}

private:

std::vector<work_thread_ptr> _thread_vec;

std::condition_variable _task_put;

std::condition_variable _task_get;

std::mutex _task_queue_mutex;

std::queue<task_t> _task_queue;

std::atomic<bool> _is_stop_threadpool;

std::once_flag _call_flag;

};

#endif

测试代码

#include<iostream>

#include<string>

#include<chrono>

#include"thread_pool.hpp"

voidtest_task(conststd::string& str)

{

std::cout<<"Current thread id: "<<std::this_thread::get_id() <<", str: "<< str <<std::endl;

std::this_thread::sleep_for(std::chrono::milliseconds(50));

}

classTest

{

public:

voidprint(conststd::string& str,inti)

{

std::cout<<"Test: "<< str <<", i: "<< i <<std::endl;

}

};

classTest2

{

public:

voidoperator()(conststd::string& str,inti)

{

std::cout<<"Test2: "<< str <<", i: "<< i <<std::endl;

}

};

intmain()

{

Test t;

Test2 t2;

thread_pool pool;

// 启动10个线程

pool.init_thread_num(10);

std::stringstr ="Hello world";

for(inti =0; i <1000; ++i)

{

// 支持lambda表达式

pool.add_task([]{ std::cout<<"Hello ThreadPool"<<std::endl; });

// 支持全局函数

pool.add_task(test_task, str);

// 支持函数对象

pool.add_task(t2, str, i);

// 支持类成员函数

pool.add_task(&Test::print, &t, str, i);

}

std::cin.get();

std::cout<<"##############END###################"<<std::endl;

return0;

}

测试程序启动了十个线程并调用add_task函数加入了4000个任务,add_task支持普通全局函数、静态函数、类成员函数、函数对象(仿函数)以及lambda表达式,并且支持函数传入,该线程池的实现以及测试代码我已经放到了 github 上。

未登录用户
全部评论0
到底啦