线程同步
问题引入:
2个线程同时修改一个变量x = 1
时,如果不进行同步会怎么样?
假设进行增加操作,增量操作通常分解为3步
- 从内存将变量读入寄存器(cpu)
- 在寄存器中对变量做增量操作
- 写回内存
那么在线程A对x
进行增量操作时,如果不加锁,那么最终结果取决与线程B开始增量操作时获取的值,
互斥量
- 函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);
pthread_mutex_destroy(pthread_mutex_t *mutex);
pthread_mutex_lock(pthread_mutex_t *mutex);
pthread_mutex_unlock(pthread_mutex_t *mutex);
pthread_mutex_trylock(pthread_mutex_t *mutex);
|
- 实例:多线程环境下访问动态分配的对象中,嵌入引用计数,避免对象内存空间不会被释放,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| #include "apue.h" #include <pthread.h>
struct foo { int f_count; pthread_mutex_t f_lock; int f_id; int f_use; }; struct foo * foo_alloc(int id) {
struct foo *fp;
if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = id; if (pthread_mutex_init(&fp->f_lock,NULL) != 0) { free(fp); return NULL; } } return fp; }
void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); }
void foo_rele(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); if (--fp->f_count == 0) { pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); printf("end\n"); free(fp); }
pthread_mutex_unlock(&fp->f_lock); }
void * func(void *arg) { return ((void *)0); }
int main(int argc,char *argv[]) { pthread_t tid[maxn]; struct foo *p = foo_alloc(99); for (int i=0; i<maxn; i++) pthread_create(&tid[i],NULL,func,(void *)p);
for (int i=0; i<maxn; i++) { pthread_join(tid[i],NULL); } free(p); exit(0); }
|
类似c++
类的构造和析构,且要在临界区中更新计数,但此段程序仍有问题: 还剩2个线程时,如果线程A进入foo_rele
此时引用计数- 1 等于 0 且线程B因为mutex
被锁住而被阻塞,那么内存被释放是不对的
死锁
- 线程对同一
mutex
加锁2次,
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| #include "apue.h" #include <pthread.h>
pthread_mutex_t mutex;
void * func(void *arg) { pthread_mutex_lock(&mutex); pthread_mutex_lock(&mutex); }
int main(int argc,char *argv[]) { pthread_t tid; void *ret;
pthread_create(&tid,NULL,func,NULL); pthread_join(tid,&ret); exit(0); }
|
- 线程A给
mutex_A
加锁后试图给mutex_B
加锁,但线程B已经给mutex_B
加锁且想占有mutex_A
,2线程都想请求对方的资源,导致死锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| #include "apue.h" #include <pthread.h>
pthread_mutex_t mutex1,mutex2;
void * func1(void *arg) { pthread_mutex_lock(&mutex1);
sleep(2);
pthread_mutex_lock(&mutex2); }
void * func2(void *arg) { pthread_mutex_lock(&mutex2); sleep(1.5); pthread_mutex_lock(&mutex1); } int main(int argc,char *argv[]) { pthread_t tid1,tid2; pthread_create(&tid1,NULL,func1,NULL); pthread_create(&tid2,NULL,func2,NULL);
pthread_join(tid1,NULL); pthread_join(tid2,NULL); exit(0); }
|
防止死锁规律
- 多线程按照相同的顺序对互斥量进行加锁就不会死锁(可能其他资源导致死锁,不讨论
- 如果一个线程加锁的顺序和另一个线程加锁的顺序相反那么一定会导致死锁
tips:结构体中的互斥量能够保护整个结构的成员
实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
| #include "apue.h" #include <pthread.h>
#define NHASH 29 #define HASH(id) (((unsigned long)id)%NHASH)
struct foo *fh[NHASH];
pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo { int f_count; int f_id; pthread_mutex_t f_lock; struct foo *f_next; };
struct foo * foo_alloc(int id) { struct foo *fp; int index; if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = id; if (pthread_mutex_init(&fp->f_lock,NULL) != 0) { free(fp); return NULL; } index = HASH(id); pthread_mutex_lock(&hashlock); fp->f_next = fh[index]; fh[index] = fp; pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock);
pthread_mutex_unlock(&fp->f_lock); } return fp; }
void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); }
struct foo * foo_find(int id) { struct foo *fp; pthread_mutex_lock(&hashlock); for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) { if (fp->f_id == id) { foo_hold(fp); break; } } pthread_mutex_unlock(&hashlock); return fp; }
void foo_rele(struct foo *fp) { struct foo *tfp; int index;
pthread_mutex_lock(&fp->f_lock); if (fp->f_count == 1) { pthread_mutex_unlock(&fp->f_lock); pthread_mutex_lock(&hashlock); pthread_mutex_lock(&fp->f_lock);
if (fp->f_count != 1) { fp->f_count--; pthread_mutex_unlock(&fp->f_lock); pthread_mutex_unlock(&hashlock); return ; }
index = HASH(fp->f_id); tfp = fh[index]; if (tfp == fp) fh[index] = fp->f_next; else { while (tfp->f_next != fp) tfp = tfp->f_next; tfp->f_next = fp->f_next; } pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else { fp->f_count--; pthread_mutex_unlock(&fp->f_lock); } }
int main(int argc,char *argv[]) { exit(0); }
|
改进
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| #include <stdlib.h> #include <pthread.h>
#define NHASH 29 #define HASH(id) (((unsigned long)id)%NHASH) struct foo *fh[NHASH]; pthread_mutex_t hashlock = PTHREAD_MUTEX_INITIALIZER;
struct foo { int f_count; pthread_mutex_t f_lock; int f_id; struct foo *f_next; };
struct foo * foo_alloc(int id) { struct foo *fp; int index;
if ((fp = malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = id; if (pthread_mutex_init(&fp->f_lock,NULL) != 0) { free(fp); return NULL; } index = HASH(id); pthread_mutex_lock(&hashlock); fp->f_next = fh[index]; fh[index] = fp; pthread_mutex_lock(&fp->f_lock); pthread_mutex_unlock(&hashlock); pthread_mutex_unlock(&fp->f_lock); } return fp; }
void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock); }
struct foo * foo_find(int id) { struct foo *fp; pthread_mutex_lock(&hashlock); for (fp = fh[HASH(id)]; fp != NULL; fp = fp->f_next) { if (fp->f_id == id) { fp->f_count++; break; } } pthread_mutex_unlock(&hashlock); return fp; }
void foo_rele(struct foo *fp) { struct foo *tfp; int index; pthread_mutex_lock(&hashlock); if (--fp->f_count == 0) { index = HASH(fp->f_id); tfp = fh[index]; if (tfp == fp) fh[index] = fp->f_next; else { while (tfp->f_next != fp) tfp = tfp->f_next; tfp->f_next = fp->f_next; } pthread_mutex_unlock(&hashlock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else pthread_mutex_unlock(&hashlock); }
int main(int argc,char *argv[]) { exit(0); }
|
pthread_mutex_timedlock
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,const struct timespec *restrict tsptr);
尝试获得一个已经加锁的互斥量,在允许的绝对时间前阻塞,直到该互斥量的锁释放,否则超过时间返回ETIMEDOUT
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| #include "apue.h" #include <pthread.h> int main(int argc,char *argv[]) { int err; struct timespec tout; struct tm *tmp; char buf[64]; pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_lock(&lock); printf("mutex is locked\n"); clock_gettime(CLOCK_REALTIME,&tout); tmp = localtime(&tout.tv_sec); strftime(buf,sizeof(buf),"%r",tmp); printf("current time is %s\n",buf); tout.tv_sec += 10;
sleep(5); pthread_mutex_unlock(&lock); err = pthread_mutex_timedlock(&lock,&tout); clock_gettime(CLOCK_REALTIME,&tout); tmp = localtime(&tout.tv_sec); strftime(buf,sizeof(buf),"%r",tmp); printf("the time is now %s\n",buf);
if (err == 0) printf("mutex locked again!\n"); else printf("can't lock mvtex again:%s\n",strerror(err)); exit(0); }
|
读写锁
- 原理
共享互斥锁(shared-exclusive-lock
),有3种状态:
- 读模式加锁
- 写模式加锁
- 不加锁
- 读模式加锁就是共享状态,多个线程可以同时占有锁(os实现可能对线程个数有限制,所以要检查
pthread_rwlock_rdlock()
的返回值)
- 写模式加锁就是互斥,只能有一个线程占用锁,
- 读写锁是写模式加锁状态时,在这个锁解锁前,无论是以读还是写的方式试图对这个锁加锁的线程都将阻塞
- 读写锁是读模式加锁状态时,以读方式对读写锁加锁的线程可以获得访问权,但是以写的方式对读写锁加锁的线程会被阻塞(直到所有线程释放读锁为止) —-> 如果此时又有线程以读方式加锁,如果该线程不被阻塞,岂不是以写方式对读写锁加锁的线程一直处于饥饿状态,所以 读写锁会阻塞随后的读模式锁请求,避免读模式锁长期占用,而写模式锁请求等不到满足,
- 适用场景
对数据结构读的次数远大于写的次数
- 使用
初始化与pthread_mutex_t
相同,静态分配的读写锁初始化可使用PTHREAD_RWLOCK_INITIALIZER
,也可使用pthread_rwlock_init()
初始化,动态分配的读写锁在回收内存前要释放资源pthread_rwlock_destroy()
,
1 2 3
| >pthread_rwlock_rdlock(pthread_rwlock_t *rwlock); >pthread_rwlock_wrlock(pthread_rwlock_t *rwlock); >pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108
| #include "apue.h" #include <pthread.h>
struct job { struct job *j_next; struct job *j_prev; pthread_t j_id; };
struct queue { struct job *q_head; struct job *q_tail; pthread_rwlock_t q_lock; };
int queue_init(struct queue *que) { int err; que->q_head = NULL; que->q_tail = NULL; err = pthread_rwlock_init(&que->q_lock,NULL); if (err != 0) return err; return 0; }
void job_insert(struct queue *que,struct job *jp) { pthread_rwlock_wrlock(&que->q_lock); jp->j_next = que->q_head; jp->j_prev = NULL; if (que->q_head != NULL) que->q_head->j_prev = jp; else que->q_tail = jp; que->q_head = jp;
pthread_rwlock_unlock(&que->q_lock); }
void job_append(struct queue *que,struct job *jp) { pthread_rwlock_wrlock(&que->q_lock); jp->j_next = NULL; jp->j_prev = que->q_tail; if (que->q_tail != NULL) que->q_tail->j_next = jp; else que->q_head = jp; que->q_tail = jp; pthread_rwlock_unlock(&que->q_lock); }
void job_remove(struct queue *que,struct job *jp) { pthread_rwlock_wrlock(&que->q_lock);
if (jp == que->q_head) { que->q_head = jp->j_next; if (que->q_tail == jp) que->q_tail == NULL; else jp->j_next->j_prev = jp->j_prev; } else if (jp == que->q_tail) { que->q_tail = jp->j_prev; jp->j_prev->j_next = jp->j_next; } else { jp->j_prev->j_next = jp->j_next; jp->j_next->j_prev = jp->j_prev; } pthread_rwlock_unlock(&que->q_lock); }
struct job * job_find(struct queue *que,pthread_t id) { struct job *jp;
if (pthread_rwlock_rdlock(&que->q_lock) != 0) return NULL;
for (jp = que->q_head; jp != NULL; jp = jp->j_next) if (pthread_equal(id,jp->j_id)) break; pthread_rwlock_unlock(&que->q_lock); return jp; }
int main(int argc,char *argv[]) { exit(0); }
|
条件变量
线程通过共享全局变量进行同步,条件变量分为条件状态和变量(用来通信)pthread_cond_t
条件由互斥量来保护,,线程在改变条件状态前先锁住互斥量,pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex)
,然后此函数把调用线程放入等待条件的线程列表(进程挂起),对互斥量解锁,等pthread_cond_wait
返回后,互斥量再次被锁住
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95
| #include <stdio.h> #include <unistd.h> #include <sys/time.h> #include <stdlib.h> #include <pthread.h>
void maketimeout(struct timespec *tsp,long minutes) { struct timeval now; gettimeofday(&now,NULL); tsp->tv_sec = now.tv_sec; tsp->tv_nsec = now.tv_usec * 1000; tsp->tv_sec += minutes * 60; }
struct msg { struct msg *m_next; };
struct msg *workq;
pthread_cond_t qready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;
void process_msg(void) { struct msg *mp; for (;;) { pthread_mutex_lock(&qlock); while (workq == NULL) pthread_cond_wait(&qready,&qlock); mp = workq; workq = mp->m_next; printf("proc msg \n"); pthread_mutex_unlock(&qlock); } }
void enqueue_msg(struct msg *mp) { pthread_mutex_lock(&qlock); mp->m_next = workq; workq = mp; printf("create \n"); pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready);
}
void * func1(void *arg) { process_msg(); return (void *)0; }
void * func2(void *mp) { enqueue_msg(mp); return (void *)0; }
const int maxn = 100;
int main(int argc,char *argv[]) { pthread_t tid[maxn]; struct msg* p[maxn]; for (int i=0; i<maxn; i++) p[i] = (struct msg *)malloc(sizeof(struct msg)); pthread_t tid1; pthread_create(&tid1,NULL,func1,NULL); for (int i=0; i<maxn; i++) pthread_create(&tid[i],NULL,func2,(void *)p[i]); for (int i=0; i<maxn; i++) pthread_join(tid[i],NULL); for (int i=0; i<maxn; i++) free(p[i]); pthread_join(tid1,NULL); exit(0); }
|
自旋锁(类似互斥量)
- 非抢占式内核中,由于中断处理程序不能抢占已经有锁的线程,所以中断不会导致死锁
- 单核/单cpu环境下使用自旋锁没有用,因为线程A获得锁以后,线程B一直占用CPU使得锁不能释放,知道线程B的时间片用完,
- 获得自旋锁之前一直处于忙等待状态(占用CPU),所以自旋锁被持有的时间短,且线程不希望进行睡眠or唤醒的线程调度
- 尝试获取互斥量可能会先自旋一会(避免线程调度,看具体实现)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| #include "apue.h" #include <pthread.h> pthread_spinlock_t lock;
void * func1(void *arg) { printf("func1 before\n"); pthread_spin_lock(&lock); printf("func1 中间\n"); sleep(1);
printf("func1 end\n"); pthread_spin_unlock(&lock); return (void *)0; }
void * func2(void *arg) { printf("func2 before\n"); pthread_spin_lock(&lock); printf("func2 sleep\n"); sleep(10); printf("func2 end\n"); pthread_spin_unlock(&lock); return (void *)0; }
int main(int argc,char *argv[]) { pthread_spin_init(&lock,PTHREAD_PROCESS_SHARED); pthread_t tid1,tid2;
pthread_create(&tid1,NULL,func1,NULL); pthread_create(&tid2,NULL,func2,NULL);
pthread_join(tid1,NULL); pthread_join(tid2,NULL); pthread_spin_destroy(&lock); exit(0); }
|
屏障
- 多线程并行工作的同步机制,允许每个线程等待,直到所有的合作线程都达到某一点,然后从该点继续执行,(意思是等待一些线程在某点之前必须都执行完毕
pthread_join()
就是一种简单的屏障,等待指定的线程执行完毕
- 使用屏障允许任意数量的线程等待,直到所有线程处理完毕
- 初始化相比其他几个同步少了宏定义,且初始化要声明任意线程的数量(记得主线程
1 2 3 4 5
| > >pthread_barrier_init(pthread_barrier_t *barrier, const pthread_barrierattr_t *attr, unsigned int count); >pthread_barrier_destroy(pthread_barrier_t *barrier);
|
pthread_barrier_wait()
表明此线程完成工作,等待其他线程赶上来
- 调用此函数的线程在屏蔽计数为满足时,线程进入休眠状态,最后一个调用的线程会唤醒所有此类线程(注意返回值)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101
| #include "apue.h" #include <pthread.h> #include <limits.h> #include <sys/time.h>
#define NTHR 8 #define NUMNUM 8000000L #define TNUM (NUMNUM/NTHR)
long nums[NUMNUM]; long snums[NUMNUM];
pthread_barrier_t b;
#ifdef SOLARIS #define heapsort qsort #else extern int heapsort(void *,size_t,size_t,int (*)(const void *,const void *)); #endif
int complong(const void *arg1,const void *arg2) { long l1 = *(long *)arg1; long l2 = *(long *)arg2;
if (l1 == l2) return 0; else if (l1 < l2) return -1; else return 1; }
void * thr_fn(void *arg) { long index = (long)arg; heapsort(&nums[index],TNUM,sizeof(long),complong); pthread_barrier_wait(&b); return (void *)0; }
void merge() { long index[NTHR]; long i,minindex,sindex,num;
for (i = 0; i < NTHR; i++) index[i] = i * TNUM; for (sindex = 0; sindex < NUMNUM; sindex++) { num = LONG_MAX; for (i = 0; i < NTHR; i++) { if ((index[i] < (i+1)*TNUM) && (nums[index[i]] < num)) { num = nums[index[i]]; minindex = i; } } snums[sindex] = nums[index[minindex]]; index[minindex]++; } }
int main(int argc,char *argv[]) { unsigned long i; struct timeval start,end; long long startusec,endusec; double elapsed; int err; pthread_t tid; srandom(1); for (i = 0; i < NUMNUM; i++) nums[i] = random(); gettimeofday(&start,NULL); pthread_barrier_init(&b,NULL,NTHR+1); for (i = 0; i < NTHR; i++) { err = pthread_create(&tid,NULL,thr_fn,(void *)(i * TNUM)); if (err != 0) err_exit(err,"can't create thread"); } pthread_barrier_wait(&b); merge(); gettimeofday(&end,NULL);
startusec = start.tv_sec * 1000000 + start.tv_usec; endusec = end.tv_sec * 1000000 + end.tv_usec; elapsed = (double)(endusec - startusec) / 1000000.0; printf("sort took %.4f seconds\n",elapsed); exit(0); }
|