跳转至

十二 多线程与线程池

多线程(Multithreading)是指在一个进程(Process)中同时执行多个线程(Thread),以实现并发处理任务,从而提高程序的执行效率,特别适合处理IO密集型或部分CPU密集型任务。

1. 线程 vs 进程

比较项 进程(Process) 线程(Thread)
定义 资源分配的基本单位 CPU调度的基本单位
拥有资源 独立的内存空间等资源 与同一进程中的其他线程共享资源
通信开销 进程间通信(如管道、消息队列)较复杂 线程间通信简单,因共享内存
创建销毁 开销大 开销小
崩溃影响 一个进程崩溃不影响其他进程 一个线程崩溃可能导致整个进程崩溃

2. C++多线程

2.1 引入头文件

C++
1
#include <thread>

2.2 基本使用

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
#include <iostream>
#include <thread>

void task() {
    std::cout << "线程ID: " << std::this_thread::get_id() << "\n";
}

int main() {
    std::thread t(task);  // 创建线程
    t.join();             // 等待线程执行完成
    return 0;
}
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
using namespace std;

void thread_1()
{
    cout<<"子线程1"<<endl;
}

void thread_2(int x)
{
    cout<<"x:"<<x<<endl;
    cout<<"子线程2"<<endl;
}

int main()
{
  thread first ( thread_1); // 开启线程,调用:thread_1()
  thread second (thread_2,100); // 开启线程,调用:thread_2(100)
  //thread third(thread_2,3);//开启第3个线程,共享thread_2函数。
  std::cout << "主线程\n";

  first.join(); //必须说明添加线程的方式            
  second.join(); 
  std::cout << "子线程结束.\n";//必须join完成
  return 0;
}

3. 线程常用方法

方法 说明
join() 阻塞主线程,等待该线程完成
detach() 线程脱离,后台运行,主线程无需等待
std::this_thread::sleep_for() 当前线程睡眠一段时间
std::this_thread::get_id() 获取当前线程ID

3.1 join与detach

当线程启动后,一定要在和线程相关联的thread销毁前,确定以何种方式等待线程执行结束。比如上例中的join。

  • detach方式,启动的线程自主在后台运行,当前的代码继续往下执行,不等待新线程结束
  • join方式,等待启动的线程完成,才会继续往下执行

可以使用joinable判断是join模式还是detach模式。

if (myThread.joinable()) foo.join();

3.1.1 join举例

下面的代码,join后面的代码不会被执行,除非子线程结束

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>
#include <thread>
using namespace std;
void thread_1()
{
  while(1)
  {
  //cout<<"子线程1111"<<endl;
  }
}
void thread_2(int x)
{
  while(1)
  {
  //cout<<"子线程2222"<<endl;
  }
}
int main()
{
    thread first ( thread_1); // 开启线程,调用:thread_1()
    thread second (thread_2,100); // 开启线程,调用:thread_2(100)

    first.join(); // pauses until first finishes 这个操作完了之后才能destroyed
    second.join(); // pauses until second finishes//join完了之后,才能往下执行。
    while(1)
    {
      std::cout << "主线程\n";
    }
    return 0;
}

3.1.2 detach举例

下列代码中,主线程不会等待子线程结束。如果主线程运行结束,程序则结束

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#include <iostream>
#include <thread>
using namespace std;

void thread_1()
{
  while(1)
  {
      cout<<"子线程1111"<<endl;
  }
}

void thread_2(int x)
{
    while(1)
    {
        cout<<"子线程2222"<<endl;
    }
}

int main()
{
    thread first ( thread_1);  // 开启线程,调用:thread_1()
    thread second (thread_2,100); // 开启线程,调用:thread_2(100)

    first.detach();                
    second.detach();            
    for(int i = 0; i < 10; i++)
    {
        std::cout << "主线程\n";
    }
    return 0;
}

3.2 this_thread

std::this_thread 提供了对 当前线程的控制能力,比如让线程休眠、获取线程 ID、主动让出执行权等

函数 作用
std::this_thread::sleep_for() 当前线程睡一段时间(持续时间)
std::this_thread::sleep_until() 当前线程睡到某个时刻
std::this_thread::yield() 当前线程主动让出 CPU 执行权
std::this_thread::get_id() 获取当前线程的 唯一 ID
3.2.1 sleep_for
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
#include <iostream>
#include <thread>
#include <chrono>

void task() {
    std::cout << "任务开始..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(2)); // 睡 2 秒
    std::cout << "任务结束!" << std::endl;
}

int main() {
    std::thread t(task);
    t.join();
    return 0;
}
3.2.2 get_id
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
#include <iostream>
#include <thread>

void show_thread_id() {
    std::cout << "当前线程 ID: " << std::this_thread::get_id() << std::endl;
}

int main() {
    std::thread t1(show_thread_id);
    std::thread t2(show_thread_id);
    t1.join();
    t2.join();
    return 0;
}
3.2.3 yield

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
#include <iostream>
#include <thread>

void busy_task() {
    for (int i = 0; i < 5; ++i) {
        std::cout << "任务正在执行:" << i << std::endl;
        std::this_thread::yield(); // 让出CPU,让其他线程执行
    }
}

int main() {
    std::thread t(busy_task);
    t.join();
    return 0;
}
yield() 是一种优化建议,系统可能让当前线程暂停,也可能忽略(不是强制让出)

3.2.4 sleep_util
C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
#include <iostream>
#include <thread>
#include <chrono>

int main() {
    using namespace std::chrono;
    auto target = steady_clock::now() + seconds(3); // 3 秒后的时刻
    std::cout << "开始等待...\n";
    std::this_thread::sleep_until(target);
    std::cout << "时间到!\n";
}

4. 线程安全与同步

多线程之间共享资源可能导致数据竞争(data race)和不确定行为,为此需引入同步机制

工具 作用
std::mutex 互斥锁,防止多个线程同时访问同一资源
std::lock_guard 自动加锁解锁,RAII风格
std::unique_lock 更灵活的锁管理
std::condition_variable 条件变量,用于线程等待和唤醒

4.1 mutex

mutex(互斥锁,全称 mutual exclusion)是多线程编程中用来 保护共享资源,避免多个线程同时访问而造成冲突或数据不一致的一种机制

  • 互斥锁:在某段代码执行前加锁(lock),执行完毕后解锁(unlock)。
  • 加锁成功的线程可以继续执行,其他线程会阻塞,直到该锁被释放。

加锁/解锁

  • lock():当前线程尝试获得互斥量,若已经被其他线程锁住,则阻塞等待。

  • unlock():释放锁,允许其他线程进入。

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mutex; //定义一个全局互斥量
int counter = 0;

void increate(){
    for (int i =0; i <1000; ++i){
        mtx.lock();
        ++counter;
        mtx.unlock();
    }
}

int main(){
    std::thread t1(increate);
    std::thread t2(increate);
    t1.join();
    t2.join();
    std::cout << "counter = " << counter << std::endl;
}
由于加锁保护,counter的结果是2000,不会出现竞争导致的少加。

4.3 std::lock_guard

手动lock()/ unlock()容易忘记释放锁,c++推荐用RAII形式的std::lock_guard

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;  // 全局互斥锁
int counter = 0;

void increase() {
    for (int i = 0; i < 10000; ++i) {
        std::lock_guard<std::mutex> lock(mtx); // 自动加锁和解锁
        ++counter;
    }
}// 离开作用域自动解锁

int main() {
    std::thread t1(increase);
    std::thread t2(increase);

    t1.join();
    t2.join();

    std::cout << "Final counter: " << counter << std::endl;
    return 0;
}

4.4 std::unique_lock

std::unique_lock会在构造时获取互斥锁,在析构时自动释放锁,确保不因异常或提前return忘记解锁;相比std::lock_guard,它的特点是可延迟加锁、可提前解锁,可重新加锁,可转移所有权。

C++
1
2
3
4
5
6
#include <mutex>
std::mutex m;

{
    std::unique_lock<std::mutex> lk(m); // 默认立即加锁
} // 出作用域自动解锁
| 构造模式 | 说明 | | ----------------- | ------------------------------------------------------------------------- | | 默认构造(锁对象时立即 lock) | std::unique_lock<std::mutex> lk(m); | | 延迟加锁 | std::unique_lock<std::mutex> lk(m, std::defer_lock);(不加锁,需要手动 lock()) | | 尝试加锁 | std::unique_lock<std::mutex> lk(m, std::try_to_lock);(非阻塞) | | 已加锁 | std::unique_lock<std::mutex> lk(m, std::adopt_lock);(假设外部已经加锁) |

延迟加锁

C++
1
2
3
std::unique_lock<std::mutex> lk(m, std::defer_lock);
// 做一些无关工作
lk.lock();//需要时再加锁

可提前解锁再加锁

C++
1
2
3
4
5
std::unique_lock<std::mutex> lk(m);
do_work();
lk.unlock(); // 提前释放锁
do_other_work();
lk.lock(); // 重新加锁

配合条件变量

std::condition_variable需要std::unique_lock而不是lock_guard, 因为它在wait()时会自动释放锁并在唤醒时重新加锁

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker(){
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return ready; }); // 等待直到ready 为true
}
用法 说明
mutex.lock() 手动加锁(不推荐)
mutex.unlock() 手动解锁(不推荐)
std::lock_guard 推荐方式,自动加解锁
std::unique_lock 更灵活,可延迟加锁、提前解锁,重复加锁
std::try_lock 非阻塞加锁,锁失败立即返回

4.5 condition_variable

std::condition_variable允许一个线程在某个条件满足之前进入等待状态,而由另一个线程在条件满足时通知它继续执行。 常用于 生产者-消费者模型、任务调度 等场景。

  • std::condition_variable:条件变量对象

  • std::mutex:互斥锁,用于保护共享数据

  • wait():阻塞当前线程,直到条件满足

  • notify_one() / notify_all():唤醒一个或所有等待线程

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv;
bool ready = false;

void worker() {
    std::unique_lock<std::mutex> lock(mtx);  // 一定要用 unique_lock
    cv.wait(lock, []{ return ready; });      // 条件不满足就一直阻塞
    std::cout << "Worker is running!\n";
}

int main() {
    std::thread t(worker);

    {
        std::lock_guard<std::mutex> lock(mtx);
        ready = true;           // 设置条件
    }

    cv.notify_one();            // 通知等待的线程
    t.join();
    return 0;
}

wait的三种形式

C++
1
2
3
4
5
cv.wait(lock);  // 等待 notify,被唤醒后要手动检查条件是否满足(可能虚假唤醒)

cv.wait(lock, [] { return ready; });  // 推荐写法,自动循环判断条件

cv.wait_for(lock, timeout);  // 等待一段时间(超时自动返回)

示例:生产者消费者模型

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::queue<int> data_queue;
std::mutex data_mutex;
std::condition_variable data_cv;

void producer() {
    for (int i = 0; i < 5; ++i) {
        {
            std::lock_guard<std::mutex> lock(data_mutex);
            data_queue.push(i);
        }
        data_cv.notify_one();  // 通知消费者
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(data_mutex);
        data_cv.wait(lock, []{ return !data_queue.empty(); }); // 等待非空
        int val = data_queue.front();
        data_queue.pop();
        lock.unlock();
        std::cout << "Consumed: " << val << std::endl;
        if (val == 4) break;
    }
}

注意点 说明
wait() 要配合 std::unique_lock 使用 因为它需要释放锁并在唤醒时重新加锁
要防止 虚假唤醒 wait(lock, predicate) 会自动循环判断,推荐使用
条件变量本身不保存状态 notify_one() 不会“记住”曾经调用过,如果此时没人等待,通知会丢失
避免死锁 修改条件时应先加锁,通知时应解锁或在锁内快速通知

4.6 std::atomic

在 C++ 中,std::atomic 是用于 多线程编程 中的一种类型,提供了原子操作支持,用于避免数据竞争(data race),实现线程安全的数据访问和修改

原子读写

C++
1
2
3
4
std::atomic<int> value(0);

int x = value.load();   // 原子读取
value.store(10);        // 原子写入

原子加减

C++
1
2
value++;   // 或 value.fetch_add(1);
value--;   // 或 value.fetch_sub(1);

CAS(Compare-And-Swap)

C++
1
2
3
4
5
6
7
int expected = 5;
int desired = 10;
if (value.compare_exchange_strong(expected, desired)) {
    // 当前 value == expected,成功替换为 desired
} else {
    // 替换失败,expected 被更新为当前 value 的值
}

示例:多线程安全加法

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <atomic>
#include <thread>

std::atomic<int> counter(0);

void increment() {
    for (int i = 0; i < 1000; ++i) {
        counter++;
    }
}

int main() {
    std::thread t1(increment);
    std::thread t2(increment);
    t1.join();
    t2.join();
    std::cout << "Counter: " << counter << std::endl;
}

若不使用 std::atomic,则会发生数据竞争,导致计数不准确。


常用成员函数(以 std::atomic<int> 为例)

函数名 含义
load() 读取当前值
store(val) 写入新值
exchange(val) 设置新值并返回旧值
compare_exchange_strong(expected, desired) 原子比较并替换
fetch_add(n) / fetch_sub(n) 原子加/减

5. 线程池

如果你频繁地创建和销毁线程,会带来性能开销,因此可以使用线程池(Thread Pool): 一般线程池都会有以下几个部分构成:

1. 任务队列

存放待执行的任务,一般用std::queuestd::mutex保护

2. 工作线程 worker threads, 线程池启动时创建的线程集合,循环从任务队列取任务执行

3. 同步机制 - std::mutex : 保护任务队列读写安全 - std::condition_variable:任务到来时通知线程执行

4. 关闭控制

线程池析构时需要关闭所有线程(设置停止标志,唤醒所有线程,join)

简易实现

C++
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>


class ThreadPool{

public:
    explicit ThreadPool(size_t threads):stop(false){
        for (size_t i = 0; i < threads; ++i){
            workers.emplace_back([this]{
                for (;;){
                    std::function<void()> task;
                    {
                        std:unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock, [this]{
                            return this->stop || !this->tasks.empty();
                        });
                        if (this->stop && this->tasks.empty()){
                            return; // 线程退出条件
                        }
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    task();
                }
            });
        }
    }
    // 加任务
    template<class F, class... Args>
    auto enqueue(F&&f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
    {
        // 把函数和参数绑定成一个 nullary callable
        // 使用 std::bind 会把参数拷贝/移动到内部存储
        typedef typename std::result_of<F(Args...)>::type return_type;
        auto task = std::make_shared<std::packaged_task<return_type()>>(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );

        std::future<return_type> res = task->get_future();
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop)
                throw std::runtime_error("enqueue on stopped ThreadPool");
            tasks.emplace([task](){(*task)();});
        }
        condition.notify_one();
        return res;
    }

    ~ThreadPool(){
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            if (stop) return; // 已经关闭
            stop = true;
        }
        condition.notify_all();
        for (std::thread &worker : workers) {
            if (worker.joinable())
                worker.join();
        }
    }

private:
    std::vector<std::thread> workers; // 线程容器,存储线程池中的线程对象, 线程池里有多少线程
    std::queue<std::function<void()>> tasks; // 任务队列,存放待执行任务,存放由用户提交的任务, 用std::function<void()>包装

    std::mutex queue_mutex; // 保护任务队列的互斥锁
    std::condition_variable condition; // 条件队列,用于线程等待/ 唤醒
    bool stop; // 标志,线程池是否停止运行

};