多线程编程介绍-条件变量
条件变量定义
条件变量是多线程对共享资源数据的变化的通知机制。条件变量与互斥量明显不同为互斥量是对临界资源的保护机制,但条件变量可以理解为一种通信机制。
条件变量的应用场景
设想如下编程场景,我们要实现一个消息接收转发并处理的流程,为了提高程序执行效率。我们启动两个线程一个是接收消息线程,专门负责接收消息,将消息加入到一个共享链表中;而一个线程是工作线程,专门负责等待读取链表中的消息,如果链表为空,则工作线程则进入等待队列,如果有节点插入则工作线程需要被唤醒继续工作。
-
问题1:共享资源消息链表需要保护,怎么做?
互斥量是解决共享资源的典型方法,相信大家都知道,使用mutex锁保护一下对链表的操作就好了。对于链表操作的代码段我们成为临界资源。
-
问题2:如何通知工作线程有消息到达?
方法1:大家应该都能想到,利用现有知识互斥量,工作线程一直轮询去检查链表中是否有消息可以处理,不就可以了,但这样显然效率太低,浪费cpu资源。
方法2:当有资源时接收消息线程通知工作线程一下不就可以了,其他时间工作线程就休眠就好啦。是的,条件变量就是为了达到这个目的的。当工作线程检查没有消息处理时,就主动将自己挂起,等待接收消息线程唤醒。
条件变量功能函数介绍
int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);
pthread_cond_init初始化一个条件变量,需要在其他条件变量执行函数之前执行。
只能被初始化一次,如果初始化两次则结果不可预期。
restrict cond 是一个条件变量地址,使用此函数前需要定义一个条件变量结构体pthread_cond_t
如果需要copy条件变量,只能copy条件变量地址,不能复制这个结构体,否则结果将不可预知。(想想也知道这个条件变量是一个结构体,如果复制一个结构体则信息被复制两份在多线程中一定会出现问题,而复制地址则原内容不变)
-
第二个字段属性字段,一般默认填NULL(其他高级用法待研究,如果有知道的小伙伴还望分享一下啊~)
int pthread_cond_destroy(pthread_cond_t *cond);
-
释放一个条件变量,一般值只在线程结束时调用,用于回收条件变量资源。
int pthread_cond_timedwait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex,const struct timespec *restrict abstime);int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);
这两个函数用于主动将线程阻塞,等待其他线程唤醒。对应场景中工作线程判断没有可以获取的消息时将线程主动放入阻塞队列中(这里的阻塞队列指操作系统中的),即线程将自己挂起。
函数中前两个参数一个是条件变量本身,一个是mutex互斥量。条件变量没什么好说的,因为这组函数就是和他相关的,那mutex呢?mutex是用来保护临界资源使用的mutex,如场景中所说的对于消息队列的访问是需要加锁保护的。之所以要出入mutex是因为线程在阻塞挂起自己之前要先释放锁,不然其他线程也不能获取锁了。
根据上一点分析mutex不但要作为入参传入,而且需要在传入之前先获得锁。
-
当然pthread_cond_timedwait函数的最后一个参数可以指定阻塞的时间,即即使在指定的时间没没有线程唤醒这个阻塞线程,阻塞线程也会自己被唤醒执行。
int pthread_cond_broadcast(pthread_cond_t *cond);int pthread_cond_signal(pthread_cond_t *cond);
以上这两个函数是用来向等待此条件变量的线程发送信号,表示可以继续运行了。对应我们的场景就是接收线程有消息到达,之后将消息插入队列,通知工作线程。
两个函数不同之处,pthread_cond_broadcast唤醒等待中的所有线程。pthread_cond_signal至少唤醒一个等待线程。
如果执行这两个函数的时候没有任何等待此条件变量的线程,则无任何影响,也无任何变化。这一点在后续问题介绍时会再次提到。
条件变量编程实例
#include#include #include #include #define MSG_LEN_MAX 31typedef struct tagNode{ char szMsg[MSG_LEN_MAX+1]; struct tagNode *pstNextNode;}NODE_S;typedef struct tagList{ pthread_mutex_t mutex; pthread_cond_t cond; NODE_S stHead;}LIST_S;LIST_S g_stMsgList = {};void list_Init(LIST_S *pstList){ pstList->stHead.pstNextNode = NULL; pthread_mutex_init(&pstList->mutex, NULL); pthread_cond_init(&pstList->cond, NULL);}/*工作线程用于处理并打印消息*/void * dealMsgThread(void * pVoid){ NODE_S *pstNode = NULL; int count = 0; while (1) { pthread_mutex_lock(&g_stMsgList.mutex); /* 查找链表中的节点的头节点返回 */ if ( NULL == g_stMsgList.stHead.pstNextNode ) { printf("can not find msg,waiting...\n"); pthread_cond_wait(&g_stMsgList.cond, &g_stMsgList.mutex); printf("notify msg, wake up\n"); } if ( NULL != g_stMsgList.stHead.pstNextNode ) { pstNode = g_stMsgList.stHead.pstNextNode; g_stMsgList.stHead.pstNextNode = pstNode->pstNextNode; printf("echo: %s\n", pstNode->szMsg); free(pstNode); count++; } pthread_mutex_unlock(&g_stMsgList.mutex); if ( count >= 10 ) { break; } } return NULL;}/*向消息队列中放入节点,并通知等待的工线程*/void receiveMsg(char *pcMsg, int iLen){ NODE_S *pstNode = NULL; NODE_S* pstTemp = NULL; if ( iLen > MSG_LEN_MAX ) { iLen = MSG_LEN_MAX; } pstNode = (NODE_S*)malloc(sizeof(NODE_S)); memset(pstNode, 0, sizeof(NODE_S)); snprintf(pstNode->szMsg, MSG_LEN_MAX+1, "%s", pcMsg); /* 获取锁 */ pthread_mutex_lock(&g_stMsgList.mutex); pstTemp = &g_stMsgList.stHead; while ( pstTemp ) { if ( NULL == pstTemp->pstNextNode ) { break; } pstTemp = pstTemp->pstNextNode; } pstTemp->pstNextNode = pstNode; printf("recieve msg %s add list, send signal\n", pcMsg); /* 发送通知到工作线程 */ pthread_cond_signal(&g_stMsgList.cond); pthread_mutex_unlock(&g_stMsgList.mutex); return;}/*main函数*/int main(int argc, char **argv){ pthread_t iThreadId; void *ret = NULL; char szMsg[MSG_LEN_MAX+1]; list_Init(&g_stMsgList); pthread_create(&iThreadId, NULL, dealMsgThread, NULL); //sleep(1); for(int i =0 ; i < 10; i++) { sprintf(szMsg, "%d : hello", i); receiveMsg(szMsg, strlen(szMsg)); } pthread_join(iThreadId, &ret); return 0;}
-
输出结果:
recieve msg 0 : hello add list, send signal
recieve msg 1 : hello add list, send signal recieve msg 2 : hello add list, send signal recieve msg 3 : hello add list, send signal recieve msg 4 : hello add list, send signal recieve msg 5 : hello add list, send signal recieve msg 6 : hello add list, send signal recieve msg 7 : hello add list, send signal echo: 0 : hello echo: 1 : hello echo: 2 : hello echo: 3 : hello echo: 4 : hello echo: 5 : hello echo: 6 : hello echo: 7 : hello can not find msg,waiting... recieve msg 8 : hello add list, send signal recieve msg 9 : hello add list, send signal notify msg, wake up echo: 8 : hello echo: 9 : hello
条件变量函数内部实现猜想
1.条件变量内部猜想一:条件变量pthread_cond_timedwait函数内部对于条件变量本身还存在一个锁。
这个锁的用途就是保证条件变量挂起和释放锁是一个原子操作。
设想场景,如果进入pthread_cond_timedwait函数之后,先释放mutex(必须执行释放操作,前文提到),之后再进入等待挂起之前,cpu切换到执行pthread_cond_signal线程,此时由于没有等待线程,从而不会有任何影响,这就导致后续进入wait态的工作线程永远等不到被接收线程唤醒。所以为保证释放和进入wait态不能被打断,所以需要加锁保护。
当然不只是猜想,通过一篇内核态对于pthread_cond_timedwait函数的分析也证实了这一点。链接:
条件变量使用注意事项
条件变量使用前初始化,且初始化一次。
条件变量使用pthread_cond_timedwait类函数时需要在获取mutex锁和释放mutex锁之间。
使用pthread_cond_timedwait函数被唤醒后仍然需要判断队列中的状态,因为可能被其他线程首先抢占了mutex并处理了消息队列消息。即等待的条件变量失效,需要重新等待。
-
条件变量使用pthread_cond_signal类函数时可以在获取mutex锁和释放mutex锁之间,也可以在获取mutex锁和释放mutex锁之后。但建议在获取mutex锁和释放mutex锁之间。
-
pthread_cond_signal函数在mutex lock与unlock之间执行。
缺点:可能导致pthread_cond_wait线程执行后重新进入休眠,因为wait线程需要获取mutex锁,但此时signal线程可能并没有释放,导致频繁的cpu切换。
-
pthread_cond_signal函数在mutex lock,unlock之后执行。
缺点:先unlock操作之后此时低优先级任务可能会占用cpu资源导致wait的高优先级任务得不到调度。因为wait的函数还没有收到signal信号唤醒。
-