• C语言线程间通信

    C11 标准为线程间通信提供了条件变量(condition variable)。线程可以使用条件变量,以等待来自另一个线程的通知,通知告知了指定的条件已被满足。例如,这类通知可能代表某些数据已经准备好进行处理。

    条件变量由类型为 cnd_t 的对象表示,并配合互斥一起使用。一般过程如下:线程获得互斥,然后测试条件。如果条件不满足,则线程继续等待条件变量(释放互斥),直到另一个线程再次唤醒它,然后该线程再次获得互斥,并再次测试条件,重复上述过程,直到条件满足。

    头文件 threads.h 定义了使用条件变量的函数,它们如下所示:

    int cnd_init(cnd_t*cond);

    初始化 cond 引用的条件变量。

    void cnd_destroy(cnd_t*cond);

    释放指定条件变量使用的所有资源。

    int cnd_signal(cnd_t*cond);

    在等待指定条件变量的任意数量的线程中,唤醒其中一个线程。

    int cnd_broadcast(cnd_t*cond);

    唤醒所有等待指定条件变量的线程。

    int cnd_wait(cnd_t*cond,mtx_t*mtx);

    阻塞正在调用的线程,并释放指定的互斥。在调用 cnd_wait()之前,线程必须持有互斥。如果另一线程通过发送一个信号解除当前线程的阻塞(也就是说,通过指定同样的条件变量作为参数调用 cond_signal()或 cnd_broadcast()),那么调用 cnd_wait()的线程在 cnd_wait()返回之前会再次获得互斥。

    int cnd_timedwait(cnd_t*restrict cond,mtx_t*restrict mtx,const struct timespec*restrict ts);

    与 cnd_wait()类似,cnd_timedwait()阻塞调用它们的线程,但仅维持由参数 ts 指定的时间。可以通过调用函数 timespec_get()获得一个 struct timespec 对象,它表示当前时间。

    除 cnd_destroy()以外的所有条件变量函数,如果它们引发错误,则返回值 thrd_error,否则返回值 thrd_success。当时间达到限定值时,函数 cnd_timedwait()也会返回值 thrd_timedout。

    例 1 与例 2 中的程序展示了在常见的“生产者-消费者”模型中使用条件变量。程序为每个生产者和消费者开启一个新线程。生产者将一个新产品(在我们的示例中,新产品为一个 int 变量)放入一个环形缓冲区中,假设这个缓冲区没有满,然后通知等待的消费者:产品已经准备好。每个消费者从该缓冲区中取出产品,然后将实际情况通知给正在等待的生产者。

    在任一特定时间,只有一个线程可以修改环形缓冲器。因此,在函数 bufPut()和 bufGet()间将存在线程同步问题,函数 bufPut()将一个元素插入到缓冲区,函数 buf-Get()将一个元素从缓冲区移除。

    有两个条件变量:生产者等待其中一个条件变量,以判断缓冲器是否满了;消费者等待另一个条件变量,以判断缓冲器是否空了。缓冲区的所有必需元素都包括在结构 Buffer 中。函数 bufInit()初始化具有指定大小的 Buffer 对象,而函数 bufDestroy()销毁 Buffer 对象。

    【例1】用于“生产者-消费者”模型的环形缓冲区

    /* buffer.h
    * 用于线程安全缓冲区的所有声明
    */
    #include <stdbool.h>
    #include <threads.h>
    
    typedef struct Buffer
    {
        int *data;                          // 指向数据数组的指针
        size_t size, count;                 // 元素数量的最大值和当前值
        size_t tip, tail;                   // tip = 下一个空点的索引
        mtx_t mtx;                          // 一个互斥
        cnd_t cndPut, cndGet;               // 两个条件变量
    } Buffer;
    
    bool bufInit( Buffer *bufPtr, size_t size );
    void bufDestroy(Buffer *bufPtr);
    
    bool bufPut(Buffer *bufPtr, int data);
    bool bufGet(Buffer *bufPtr, int *dataPtr, int sec);
    /* -------------------------------------------------------------
    * buffer.c
    * 定义用于处理Buffer的函数
    */
    #include "buffer.h"
    #include <stdlib.h>                       // 为了使用malloc()和free()
    bool bufInit( Buffer *bufPtr, size_t size)
    {
        if ((bufPtr->data = malloc( size * sizeof(int))) == NULL)
           return false;
        bufPtr->size = size;
        bufPtr->count = 0;
        bufPtr->tip = bufPtr->tail = 0;
        return    mtx_init( &bufPtr->mtx, mtx_plain) == thrd_success
               && cnd_init( &bufPtr->cndPut) == thrd_success
               && cnd_init( &bufPtr->cndGet) == thrd_success;
    }
    
    void bufDestroy(Buffer *bufPtr)
    {
        cnd_destroy( &bufPtr->cndGet );
        cnd_destroy( &bufPtr->cndPut );
        mtx_destroy( &bufPtr->mtx );
        free( bufPtr->data );
    }
    
    // 在缓冲区中插入一个新元素
    bool bufPut(Buffer *bufPtr, int data)
    {
        mtx_lock( &bufPtr->mtx );
    
        while (bufPtr->count == bufPtr->size)
           if (cnd_wait( &bufPtr->cndPut, &bufPtr->mtx ) != thrd_success)
              return false;
    
        bufPtr->data[bufPtr->tip] = data;
        bufPtr->tip = (bufPtr->tip + 1) % bufPtr->size;
        ++bufPtr->count;
    
        mtx_unlock( &bufPtr->mtx );
        cnd_signal( &bufPtr->cndGet );
    
        return true;
    }
    
    // 从缓冲区中移除一个元素
    // 如果缓冲区是空的,则等待不超过sec秒
    bool bufGet(Buffer *bufPtr, int *dataPtr, int sec)
    {
        struct timespec ts;
        timespec_get( &ts, TIME_UTC );             // 当前时间
        ts.tv_sec += sec;                              // + sec秒延时
    
        mtx_lock( &bufPtr->mtx );
        while ( bufPtr->count == 0 )
           if (cnd_timedwait(&bufPtr->cndGet,
                             &bufPtr->mtx, &ts) != thrd_success)
              return false;
    
        *dataPtr = bufPtr->data[bufPtr->tail];
        bufPtr->tail = (bufPtr->tail + 1) % bufPtr->size;
        --bufPtr->count;
    
        mtx_unlock( &bufPtr->mtx );
        cnd_signal( &bufPtr->cndPut );
    
        return true;
    }

    例 2 中的 main()函数创建了一个缓冲区,并启动了若干个生产者和消费者线程,给予每个线程一个识别号码和一个指向缓冲区的指针。每个生产者线程创建一定数量的“产品”,然后用一个 return 语句退出。一个消费者线程如果在给定延时期间无法获得产品以进行消费,则直接返回。

    【例2】启动生产者和消费者线程

    // producer_consumer.c
    #include "buffer.h"
    #include <stdio.h>
    #include <stdlib.h>
    
    #define NP 2                             // 生产者的数量
    #define NC 3                             // 消费者的数量
    
    int producer(void *);                    // 线程函数
    int consumer(void *);
    
    struct Arg { int id; Buffer *bufPtr; };          // 线程函数的参数
    _Noreturn void errorExit(const char* msg)
    {
        fprintf(stderr, "%s\n", msg); exit(0xff);
    }
    
    int main(void)
    {
        printf("Producer-Consumer Demo\n\n");
        Buffer buf;                                  // 为5个产品创建一个缓冲区
        bufInit( &buf, 5 );
    
        thrd_t prod[NP], cons[NC];           // 线程
        struct Arg prodArg[NP], consArg[NC]; // 线程的参数
        int i = 0, res = 0;
    
        for ( i = 0; i < NP; ++i )                // 启动生产者
        {
            prodArg[i].id = i+1, prodArg[i].bufPtr = &buf;
            if (thrd_create( &prod[i], producer, &prodArg[i] ) != thrd_success)
               errorExit("Thread error.");
        }
    
        for ( i = 0; i < NC; ++i )                // 启动消费者
        {
           consArg[i].id = i+1, consArg[i].bufPtr = &buf;
           if ( thrd_create( &cons[i], consumer, &consArg[i] ) != thrd_success)
              errorExit("Thread error.");
        }
    
        for ( i = 0; i < NP; ++i )                // 等待线程结束
          thrd_join(prod[i], &res),
          printf("\nProducer %d ended with result %d.\n", prodArg[i].id, res);
    
          for ( i = 0; i < NC; ++i )
             thrd_join(cons[i], &res),
             printf("Consumer %d ended with result %d.\n", consArg[i].id, res);
          bufDestroy( &buf );
          return 0;
    }
    
    int producer(void *arg)                         // 生产者线程函数
    {
        struct Arg *argPtr = (struct Arg *)arg;
        int id = argPtr->id;
        Buffer *bufPtr = argPtr->bufPtr;
        int count = 0;
        for (int i = 0; i < 10; ++i)
        {
            int data = 10*id + i;
            if (bufPut( bufPtr, data ))
                printf("Producer %d produced %d\n", id, data), ++count;
            else
            { fprintf( stderr,
                     "Producer %d: error storing %d\n", id, data);
              return -id;
            }
        }
        return count;
    }
    
    int consumer(void *arg)                         // 消费者线程函数
    {
        struct Arg *argPtr = (struct Arg *)arg;
        int id = argPtr->id;
        Buffer *bufPtr = argPtr->bufPtr;
      
        int count = 0;
        int data = 0;
        while (bufGet( bufPtr, &data, 2 ))
        {
            ++count;
            printf("Consumer %d consumed %d\n", id, data);
        }
        return count;
    }

更多...

加载中...