OOP 学习笔记(12)——并发编程与并发设计模式

OOP 学习笔记(12)——并发编程与并发设计模式

并发编程与并发设计模式

并发编程

并发与并行

并发

  • 在一个时间段内几个程序都处于启动到完成之间。
  • 任意时刻一个程序在计算单元上运行
  • 宏观上同时,微观上仍然顺序执行。
  • 比如多个队列交替使用同一台机器就是并发。

并行

  • 在一个时间段内几个程序都处于启动到完成之间。
  • 任意时刻可以有多个程序同时运行。
  • 宏观微观均为同时。
  • 比如多个队列同时使用多台机器就是并行。

两者都是多任务环境下的系统。

一般来说:

  • 支持多任务同时存在,就可认为是并发系统
  • 支持多任务同时执行,就可认为是并行系统
  • 并发、并行主要区别为是否存在多个处理器同时处理多个任务。

为了简化问题,不考虑处理器数量情况下,我们统称为并发

并发编程的必要性

非并发程序执行时间常常受到算法时间复杂度以及计算设备的性能影响。

然而现代计算设备的发展使得 CPU 单核性能难以突破,发展转为核的增长,并且出现了 GPU 等高度并行的计算设备

因此,掌握并发编程技术是充分发挥计算设备运算能力的基础。

进程与线程

进程

  • 计算机中已经运行的程序。

线程

  • 操作系统可进行运算调度的最小单位。
  • 被包含在进程中,是进程中的实际运作单位。
  • 一个进程可以并发多个线程,设备允许的情况下,数个线程可以并发执行不同任务。
  • 同一进程的多条线程共享该进程中的所有系统资源,包括虚拟地址空间、文件描述符、信号处理等。

一般来说,程序通常会在一个进程中运行,而一个进程至少包含一个线程,即主线程

默认情况下,代码都是在进程的主线程中运行,除非程序中创建了新线程。

当系统仅有一个计算内核时,所有进程、线程会分时占用这个内核。

若存在多个,则多个进程、线程可并行在不同内核上。

一般所说的 CPU 核数、线程数就是指 CPU 的物理核心和逻辑核心数量,可以体现处理器的并行能力。

进程与线程是程序运行调度的基础,也是并发编程的操作对象。

thread 与主从模式

需求:计算 $1$ 到 $5000000$ 之间的素数

理论上这个当然可以通过简单的线性筛得出,但这里我们希望通过朴素的枚举因子法判断一个数是否为素数。

也就是判断是否存在其根号以下的非 $1$ 因子。

常规做法耗时巨大,需要约 $12$ 秒。

在不进行算法改进的前提下,可以使用并发编程优化。

thread

其默认构造函数定义为:

thread() noexcept;

创建一个空线程对象,不代表任何可执行的线程。

比如:

#include <thread>

using namespace std;

int main()
{
    thread s1;
    return 0;
}

而初始化构造函数定义为:

template <class Fn, class... Args>
explicit thread(Fn&& fn, Args&& ... args);

使用时需要给定入口函数 fn,并且 fn 函数的参数由 args 给出。

这里的 fn 需要是可调用表达式,包括函数、函数对象、lambda 表达式等。

与普通对象创建不一样的是,这里编译器会创建一个新的操作系统线程,线程启动后会执行入口函数。

注意:线程一旦创建,线程就开始执行。

join()detach()

thread 线程一旦创立,就开始执行。

但我们可能仍需要对当前线程与目标线程进行协调,因此 thread 提供了两种接口来协调当前线程与目标现成:

  • thread::join()
    • 调用此接口,当前线程会一直阻塞,直到目标线程执行完成。
  • thread::detach()
    • 调用此接口时,目标线程将成为守护线程(daemon threads),将完全独立执行。
    • 即使目标线程对应的 thread 对象被销毁也不影响线程的执行。

如:

#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

void test(int seconds)
{
    this_thread::sleep_for(chrono::seconds(seconds));
}

int main()
{
    thread t1(test, 3);
    t1.join();
    // 此处由于 t1.join() 导致主线程阻塞,主线程须等待 t1 执行 3s 后才能继续执行
    this_thread::sleep_for(chrono::seconds(6));
    return 0;
}

可知总共运行约 $9$ 秒。

但如果交换 t1.join() 与后一句的顺序,则由于 t1 指向的线程与主线程会同时进行,运行到 t1.join()t1 指向的线程已经完成,因此不会发生阻塞,总共运行 $6$ 秒。

又比如:

#include <iostream>
#include <thread>
#include <chrono>
using namespace std;

void test(int seconds)
{
    this_thread::sleep_for(chrono::seconds(seconds));
}

int main()
{
    thread t1(test, 3);
    t1.detach();
    // 此时 t1 指向线程与当前线程脱钩,两者独立,当前延时操作也就不会被阻塞
    this_thread::sleep_for(chrono::seconds(6));
    return 0;
}

总共运行 $6$ 秒。

拷贝构造函数

定义为:

thread(const thread &) = delete;

也就是说拷贝构造函数被禁用,因而 thread 不可被拷贝构造。

这是由于线程涉及系统底层,无法拷贝。

移动构造函数

定义为:

thread(thread&& x) noexcept;

有默认的移动构造函数。

调用成功后 x 不指向任何具体执行线程。

比如如下程序可以照常进行:

thread t1;
thread t2(test, 3);
t1 = move(t2);
t1.join();

但若改为 t2.join(),则会产生系统错误。

joinable()

thread::joinable() 可判断 thread 实例指向的线程是否可以 join()detach()

返回 true 表示可以。

三种情况会使得实例返回 false

  • 默认构造函数创建的实例。
  • 被移动构造函数操作过的实例。
  • 已经调用过 join() 或者 detach() 的实例。

特别注意,启动目标线程后,我们必须决定当前线程是要等待目标线程结束(join()),还是让目标线程独立(detach()),必须二选一

如果到对象被销毁时仍未做出选择,析构时将导致程序进程异常退出。

也就是说,thread 析构时,其 joinable() 必须是 false

其他接口

  • this_thread::get_id()
    • 返回当前线程的 id,可以标识不同的线程。
  • this_thread::sleep_for()
    • 前面已经使用,使得当前线程停止执行一段时间。
  • this_thread::sleep_until()
    • 与上面类似,但是以具体时间点为停止参数。
  • this_thread::yield()
    • 如果当前线程任务已经完成,将处理器让给其他任务使用。

实现:任务划分

前面的求素数我们将其等分为四段,分别由四个线程单独解决,合并四个结果即得最终结果。

具体实现:

#include <iostream>
#include <cmath>
#include <vector>
#include <thread>

using namespace std;

thread* threads[4];
int thread_total[4];
int total = 0, mi, mx;

bool check_num(int num)
{
    // ...具体判断略
}

void check(int l, int r, int num) 
{
    thread_total[num] = 0;
    for (int i = l; i < r; i++)
        if (check_num(i))
            thread_total[num]++; 
}

int main()
{
    mi = 1;
    for (int i = 0; i < 4; i++)
    {
        mx = mi + 5000000 / 4;
        if (mx > 5000000) mx = 5000000;
        // 为第 i 个线程分配 [mi, mx) 区间的任务
        threads[i] = new thread(check, mi, mx, i);
        mi = mx; 
    }
    // 阻塞主线程,等待所有子线程完成统计
    for (int i = 0; i < 4; i++)
        threads[i]->join();
    for (int i = 0; i < 4; i++)
    {
        total += thread_total[i];
        delete threads[i];
    }
    cout << total << endl;
    return 0; 
}

最终耗时降为 $4.5$ 秒。

实际上,这就是一种典型的并发设计模式——主从模式

主从模式

主从模式是最常用的并发设计模式。

  • 系统由 Master 和 Worker 两部分组成。
  • Master 负责接收和分配任务。
  • Worker 负责处理子任务。
  • 任务处理过程中:
    • Worker 负责工作。
    • Master 负责监督任务进展和 Worker 的健康状态。
    • Master 将接收 Client 提交的任务,比将任务进展汇总反馈给 Client。

其适用场景要求:

  • 整体人物可被划分为诸多子任务。
  • 子任务之间关联较弱,可以并发执行。
  • 计算设备支持多任务同时执行。

mutex 与互斥锁模式

优化:减少临时变量

前面的实现中我们对于每个子任务通过 thread_total[] 来存储,但如果我们希望直接将结果存入 total 是否可能呢?

实际上直接修改会发现,尽管耗时不变,但是得出的结果减少了。

甚至进行多次运行,得出的结果也不同。

竞争条件与临界区

竞争条件

  • 多个线程同时访问共享数据时,只要有一个任务修改数据,那么就可能发生问题——多个线程同时争相修改数据,导致部分修改失败,这也就是所谓的竞争条件(race condition)

临界区

  • 访问共享数据的代码片段称为临界区(critical section),比如之前修改后的 total++

避免竞争条件需要对临界区进行数据保护,也就是一次只让一个线程访问共享数据,其他的线程需等待其访问完成之后继续访问。

互斥量与锁

C++11 提供了互斥量(mutex来进行数据保护。

其本身是一个类对象,也常被称为

各个线程可尝试用 mutexlock() 接口来对临界区数据加锁。

每次只有一个线程可以成功锁定,成功标志是 lock() 成功返回。

如若失败,则线程会阻塞。

同样还有 unlock() 接口可以解锁互斥量,这时阻塞的其他线程又会继续尝试锁定。

修改:利用 mutex

#include <iostream>
#include <cmath>
#include <vector>
#include <thread>

using namespace std;

thread* threads[4];
int total = 0, mi, mx;

static mutex exclusive;

bool check_num(int num)
{
    // ...具体判断略
}

void check(int l, int r, int num) 
{
    for (int i = l; i < r; i++)
        if (check_num(i))
        {
            exclusive.lock();
            total++;
            exclusive.unlock();
        }
}

int main()
{
    mi = 1;
    for (int i = 0; i < 4; i++)
    {
        mx = mi + 5000000 / 4;
        if (mx > 5000000) mx = 5000000;
        // 为第 i 个线程分配 [mi, mx) 区间的任务
        threads[i] = new thread(check, mi, mx, i);
        mi = mx; 
    }
    // 阻塞主线程,等待所有子线程完成统计
    for (int i = 0; i < 4; i++)
        threads[i]->join();
    for (int i = 0; i < 4; i++)
        delete threads[i];
    cout << total << endl;
    return 0; 
}

当然也可以再加修改(优化):

#include <iostream>
#include <cmath>
#include <vector>
#include <thread>

using namespace std;

thread* threads[4];
int total = 0, mi, mx;

static mutex exclusive;

bool check_num(int num)
{
    // ...具体判断略
}

void check(int l, int r, int num) 
{
    int tmp_total = 0;
    for (int i = l; i < r; i++)
        if (check_num(i))
            tmp_total++;
    exclusive.lock();
    total += tmp_total;
    exclusive.unlock();
}

int main()
{
    mi = 1;
    for (int i = 0; i < 4; i++)
    {
        mx = mi + 5000000 / 4;
        if (mx > 5000000) mx = 5000000;
        // 为第 i 个线程分配 [mi, mx) 区间的任务
        threads[i] = new thread(check, mi, mx, i);
        mi = mx; 
    }
    // 阻塞主线程,等待所有子线程完成统计
    for (int i = 0; i < 4; i++)
        threads[i]->join();
    for (int i = 0; i < 4; i++)
        delete threads[i];
    cout << total << endl;
    return 0; 
}

这样便可以在获得正确结果、耗时不变的前提下,简化代码。

这也就是典型的互斥锁设计模式

互斥锁模式

互斥锁模式是最基本的并行数据处理模式:

  • 访问共享资源之前进行加锁操作。
  • 访问完成之后进行解锁操作。
  • 加锁后,其他试图加锁的线程均会阻塞,就如前面所说,直到当前线程解锁才会继续尝试加锁。

弊端也十分明显,也就是低效,共享资源的读操作往往无需互斥。

解决方案也就是采用读写锁模式,读共享,而写互斥。

其他互斥量

C++11 之后提供了丰富的各类互斥量:

  • timed_mutex
    • 带超时功能,一定时间内未取得锁,则直接返回,不再继续等待。
  • recursive_mutex
    • 能被同一线程递归锁定的互斥量,即在同一个线程中,同一把锁可以被锁定多次。
  • recursive_timed_mutex
    • 上述功能的综合体。
  • shared_mutex
    • 共享互斥量,实际上提供了两把锁,一把共享锁,一把互斥锁,常用于读写锁模式。

更多相关参考书:

  • 《深入理解并行编程》
  • 《C++ Concurrency in Action》

async()futurepromise 与异步

需求:动态判断是否为素数

不断输入整数 $n$,判断 $n$ 是否为素数。

尽管 thread 可以提高素数计数的速度,然而第 $n + 1$ 次输入仍需要等待第 $n$ 次判断结束方可执行。

能否让输入不受判断方法的阻塞?

或者说利用并发编程实现某种意义上的”离线“?

同步与异步

同步(Synchronous)

  • 同步调用一旦开始,调用者必须等到调用返回结果才可继续后续行为。

23

异步(Asynchronous)

  • 异步调用一旦开始,被调用方法就会立即返回,调用者可无阻塞继续后续操作。
  • 被调用方法通常会在另外一个线程中默默运行,整个过程不会阻碍调用者的工作。
  • 被调用方法完成后可以通过特殊机制传递信息给调用者。

24

async()

很多变成语言,比如 JavaScript,都提供了异步执行机制,这也使得耗时的机制不会影响当前线程的整体执行。

而 C++11 中 async() 便有异步的功能。

其定义为:

future async(Fn&& fn, Args&& ... args);
future async(launch policy, Fn&& fn, Args&& ... args);

thread 的入口类似,包括入口函数 fn 以及其参数 args

函数会返回一个 future 对象,用于存储异步任务执行状态和结果。

policy 有三种选择:

  • launch::async:保证异步行为,系统创建一个线程执行对应的函数。
  • launch::deffered:表示延迟调用,在调用 futurewait()get() 函数后,才执行入口函数。
  • launch::async || launch::deffered:默认策略,系统自主决定调用方式。

初步实现:利用 async()

#include <iostream>
#include <future>
#include <cmath>
using namespace std;

bool check_num(int num)
{
    bool flag = true;
    if (num == 1)
        flag = false;
    else
        for (int i = sqrt(num); i > 1; i--)
            if (num % i == 0)
            {
                flag = false;
                break;
            }
    if (flag)
        cout << num << " is a prime number" << endl;
    else
        cout << num << " is not a prime number" << endl;
    return flag;
}

int main()
{
    std::future<bool> fut = async(check_num, 194232491);
    cout << "check the number ..." << endl;
    return 0;
}

运行结果为:

check the number ...
194232491 is a prime number

同理也可以改为 lambda 表达式等其他可调用对象。

future

future 提供访问异步操作结果的接口:

  • wait() 接口,阻塞当前线程,等待异步线程结束。
  • get() 接口,获取异步线程执行结果。
    • future 对象只能被一个线程获取值,并且在调用 get() 过后,就没有可获取的值了。
    • 若多个线程调用同一实例的 get() 造成数据竞争,结果未定义。
    • get() 后如果异步线程没有结束,会一直等待,类似 wait()
  • wait_for(timeout_duration) 如果在指定的超时间隔后仍然无法结束异步线程,则返回当前状态,取消其阻塞。
    • future_status::deferred:仍未启动。
    • future_status::ready:结果就绪。
    • future_status::timeout:超过时限,但异步线程仍在执行。

实现:利用 async()future

#include <future>
#include <chrono>
#include <vector>
#include <thread>
#include <random>
#include <iostream>

using namespace std;

int total = 0;
bool check_num(int num)
{
    // ...具体判断略
}

// 设置一个随机来代替输入
default_random_engine e;
int input()
{
    return e();
}

// 存放异步调用的 future 和输入数值
vector<future<bool> > future_lists;
vector<int> num_lists;

int main()
{
    while (true)
    {
        int num = input();
        future_lists.push_back(async(check_num, num));
        num_lists.push_back(num);

        for (int i = future_lists.size() - 1; i >= 0; i--)
        {
            // 每个 future 等待 0.1 秒来检测状态
            future_status status = future_lists[i].wait_for(chrono::milliseconds(100));
            if (status == future_status::ready)
            {
                if (future_lists[i].get())
                    cout << num_lists[i] << " is a prime number" << endl;
                else
                    cout << num_lists[i] << " is not a prime number" << endl;
                // 删除已经完成任务的 future
                future_lists.erase(future_lists.begin() + i);
                num_lists.erase(num_lists.begin() + i);
            }
        }
    }
    return 0;
}

使用异步线程进行运算,并通过主线程进行状态管理。

主线程不断检查输入状态和异步线程的执行状态,有输入则创建新线程,某一异步线程完成则立刻输出结果。

这种不断消耗极短时间进行检测的方式即为轮询

轮询

轮询是服务器与客户端开发常用范式。

客户端:

  • 客户端向服务器上传,每隔一段时间检测上传是否完成。
  • 未完成时,上传操作不阻塞客户端其他操作,只有检测的微笑瞬间产生阻塞。

服务器端:

  • 服务器为客户端处理耗时较大的任务时,也会开启异步线程在后台处理任务。
  • 间隔一段时间,服务器就会确认任务是否完成。
  • 整个过程也不影响服务器其他操作,比如接受反馈客户端的请求。

当然,管理轮询的线程也可以开启单独线程,而不是利用主线程。

25

promise

为了更好满足跨线程取值需求,C++ 还提供了 promise 来配合 future

future 对象只能被一个线程获取值,并且调用 get() 之后,就无法再获取。

之前 future 对象需要在异步线程完成后返回值,这也十分不方便。

一般流程

  • 在当前线程中创建 promise 对象,并从该 promise 对象中获得对应的 future 对象。
  • promise 对象传给目标线程,目标线程通过 promise 的接口设置特定值,然后可继续执行目标线程自身的工作。
  • 在特定时间,当前线程按需求通过 promise 对应的 future 取值。

优化:利用 promise

void check_num(int num, promise<bool>* res_promise)
{
    if (num == 1)
    {
        res_promise->set_value(false);
        return;
    }
    for (int i = sqrt(num); i > 1; i--)
        if (num % i == 0)
        {
            res_promise->set_value(false);
            return;
        }
    res_promise->set_value(true);
}

int main()
{
    int num = 194232491;
    promise<bool> res_promise;
    future<bool> res_future = res_promise.get_future();

    thread worker(check_num, num, &res_promise);
    worker.detach();

    if (res_future.get())
        cout << num << " is a prime number" << endl;
    else
        cout << num << " is not a prime number" << endl;
    return 0;
}

通过 promiseget_future() 接口获得配对的 future

此处通过 futurepromise 之间的联系,使得结果可以传递,因此可以使用 detach() 使得 worker 线程独立于当前线程。

最后在通过 futureget() 获取结果即可。

这样可以更好的控制在何处获取 future 的结果。

promise 总结

promise 的功能和其自身的翻译承诺类似。

工作线程将结果存入 promise,只有 promise 应允的 future 才能获取到值。

这也就是通过状态统治调用者的异步方法。

而回调方法则暂时不做介绍。

总结

主要介绍的是 C++ 的并发编程技术以及并发设计模式,与前面的设计模式有着很大的不同,这也是专门开一讲来介绍的原因。

 

点赞 0

No Comments

Add your comment