博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Unix pthread
阅读量:6257 次
发布时间:2019-06-22

本文共 13734 字,大约阅读时间需要 45 分钟。

线程概念

每个线程都包括线程ID、一组寄存器值、栈、调度优先级、策略、信号屏蔽字、errno变量、线程私有数据。使用_POSIX_THREADS来测试是否支持这个功能,使用_SC_THREADS运行时确定,都需要添加#include <pthread.h>,对于pthread库的函数成功返回0,错误返回错误编号。

线程标识

使用thread_t来标识一个线程,在不同系统中实现不一样,需要使用函数来比较。

int pthread_equal(pthread_t t1, pthread_t t2);// 相等返回非0值复制代码

获取自身线程pthread_t

pthread_t pthread_self(void);复制代码

通过线程pthread_t来分配任务如图所示:

创建线程

int pthread_create(pthread_t *restrict tidp,                   cosnt pthread_attr_t *restrict attr,                   void *(start_rtn)(void *), void *restrict arg);// 失败时返回错误码复制代码

attr用于定制线程的属性,为NULL时是默认属性。新创建的线程从start_rtn开始运行,将参数放到结构体中,通过void *restrict arg传递。

例子:获取线程id

#include "../include/apue.h"#include 
pthread_t ntid;void printids(const char *s){ pid_t pid; pthread_t tid; pid = getpid(); tid = pthread_self(); printf("%s pid:%lu, tid:%lu\n", s, pid, tid);}void *thread_func(void * arg){ printids("new~~~"); sleep(1); return (void*)0;}int main(void) { int err; err = pthread_create(&ntid, NULL, thread_func, NULL); // 第二个是pthread线程参数, 第四个是函数参数 if (err != 0) { err_exit(err, "can't create thread"); } printids("main~~~"); // sleep(1); exit(0);}复制代码

如果新线程睡眠1s,然后主线程退出就不会输出新线程了。如果主线程睡眠1s,则两个线程的进程号相同。在Linux里输出如下:

main~~~ pid:8081, tid:139790962181888new~~~ pid:8081, tid:139790953903872复制代码

线程终止

在任意线程中调用exit、_Exit、_exit都会将整个进程终止。线程终止的方法有三种:

  1. 从线程启动函数中返回
  2. 线程被同一进程中的其他线程取消
  3. 线程调用pthread_exit
void pthread_exit(void * rval_ptr);// rval_ptr指向返回值复制代码

也就是把需要返回的状态传进去,用于和等待的线程通信。可以使用pthread_join来等待指定线程完成

#include 
int pthread_join(pthread_t thread, void **rval_ptr);复制代码

如果被等待的线程返回,rval_ptr包含返回码,如果线程被取消rval_ptr指定内存单元设置为PTHREAD_CANCELED

例子:获取已终止线程退出码

#include "../include/apue.h"#include 
void printids(const char *s){ pid_t pid; pthread_t tid; pid = getpid(); tid = pthread_self(); printf("%s pid:%lu, tid:%lu\n", s, (long unsigned)pid, (long unsigned)tid);}void *return_thread(void * arg){ printids("thread returning~~~"); return (void*)0;}void *exit_thread(void * arg){ printids("thread exiting~~~"); pthread_exit((void*) 2); // 参数可以返回结构体,但是这个结构体必须返回后还能使用(不是在栈上分配)}int main(void) { int err; pthread_t tid1, tid2; void * rVal; err = pthread_create(&tid1, NULL, return_thread, NULL); // 第二个是pthread线程参数, 第四个是函数参数 if (err != 0) { err_exit(err, "can't create thread"); } err = pthread_create(&tid2, NULL, exit_thread, NULL); // 第二个是pthread线程参数, 第四个是函数参数 if (err != 0) { err_exit(err, "can't create thread"); } pthread_join(tid1, &rVal); printf("return_thread return:%ld\n", (long)rVal); pthread_join(tid2, &rVal); printf("exit_thread return:%ld\n", (long)rVal); printids("main~~~"); // sleep(1); exit(0);}复制代码

程序输出结果如下:

thread returning~~~ pid:5832, tid:139867556669184thread exiting~~~ pid:5832, tid:139867548276480return_thread return:0exit_thread return:2main~~~ pid:5832, tid:139867564947200复制代码

取消其他线程

int pthread_cancel(pthread_t tid);复制代码

pthrea_cancel不等待线程终止,而是提出请求。

线程清理函数

线程安排自己的退出函数,多个清理函数会注册到栈中,按找栈里顺序执行。

void pthread_cleanup_push(void (*rtn)(void *), void *arg);void pthread_cleanup_pop(int execute);复制代码

触发时机:

  • 调用pthread_exit
  • 响应取消请求时
  • 使用非零execute参数调用pthread_cleanup_pop,使用pthread_cleanup_pop(0)不会调用清理函数,只是删除清理函数。

例子:线程清理

输出:

thread 1 start upthread 2 start upthread1 return:1clean up in thread2 second handlerclean up in thread2 first handlerthread2 return:2main~~~ pid:10122, tid:140514088802048复制代码

只有第二个线程的清理函数被调用,这是因为系统正常终止是不会调用清理函数,即return结束

线程与进程对比

进程原语 线程原语 描述
fork pthread_create 创建新的控制流
exit pthread_exit 从现有控制流退出
waitpid pthread_join 从控制流中得到退出状态
atexit pthread_cleanup_push 注册在退出时调用的函数
getpid pthread_self 获取控制流的ID
abort pthread_cancel 请求控制流的非正常退出

分离线程

int pthread_detach(pthread_t tid);复制代码

线程同步

当线程B在线程A的读写间隔中读取数据就会出现不一致的值:

在存储操作需要多个总线周期时:

互斥变量

互斥变量本质是一把锁。对互斥量加锁后,任何试图再次对互斥量加锁的线程都会被阻塞。释放互斥量后,其他阻塞的线程变为可运行状态,第一个变为可运行状态的线程对互斥量加锁,其他变量依然变为阻塞。

互斥变量使用pthrea_mutex_t数据表示,使用前必须初始化,可以设置为pthread_mutex_t t = PTHREAD_MUTEX_INITIALIZER;用于静态初始化互斥量。

int pthread_mutex_init(pthread_mutex_t *restrict mutex,                       const pthread_mutexattr_t *restrict attr);// 使用函数初始化int pthread_mutex_destroy(pthread_mutex_t *mutex); // 使用malloc动态生成的,需要desotry函数销毁复制代码

加锁与解锁操作:

int pthread_mutex_lock(pthread_muex_t *mutex);int pthread_mutex_trylock(pthread_muex_t *mutex); // 线程不希望被阻塞,就使用trylock,成功返回0, 失败返回EBUSYint pthread_mutex_unlock(pthread_muex_t *mutex);复制代码

例子:互斥锁

#include "../include/apue.h"#include 
struct foo {
int f_count; pthread_mutex_t f_lock; int f_id;};struct foo * foo_alloc(int id) { struct foo *fp; if ((fp=malloc(sizeof(struct foo))) != NULL) { fp->f_count = 1; fp->f_id = id; if (pthread_mutex_init(&fp->f_lock, NULL) != 0) { free(fp); return (NULL); } } return fp;}void foo_hold(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); fp->f_count++; pthread_mutex_unlock(&fp->f_lock);}void foo_release(struct foo *fp) { pthread_mutex_lock(&fp->f_lock); if (--fp->f_count == 0) { pthread_mutex_unlock(&fp->f_lock); pthread_mutex_destroy(&fp->f_lock); free(fp); } else { printf("id:%d count:%d\n", fp->f_id, fp->f_count); pthread_mutex_unlock(&fp->f_lock); }}void * thread(void* arg) { struct foo* f = (struct foo*)arg; foo_hold(f);}int main(int argc, char const *argv[]){ pthread_t tid; struct foo* f = foo_alloc(12); pthread_create(&tid, NULL, thread, (void*)f); pthread_create(&tid, NULL, thread, (void*)f); pthread_join(tid, NULL); foo_release(f); foo_release(f); return 0;}复制代码

可以看到结果中第一次释放foo时count为2,每次运行都是。

id:12 count:2id:12 count:1复制代码

避免死锁

当线程对同一互斥量加锁两次时就会死锁。通过仔细控制互斥量加锁顺序来避免死锁发生。另一种方法: 如果已经占有某些锁,则使用pthread_mutex_trylock,如果成功则继续,如果失败则释放锁,做好清理工作,等待一段时间后再试试。

当程序师徒获取一个已加锁的互斥量时,pthread_mutex_timedlock互斥量原语绑定线程阻塞的时间。到达超时时间后pthread_mutex_timedlock不会对互斥量加锁而是返回错误码ETIMEDOUT

#include 
#include
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex, const struct timespec *restrict tsptr);复制代码

#include <time.h>中的timespec使用秒和纳秒描述时间。

struct timespec{
__time_t tv_sec; /* Seconds. */ __syscall_slong_t tv_nsec; /* Nanoseconds. */ };复制代码

#include <time.h>中的tm表示年月日星期等。

struct tm{
int tm_sec; /* Seconds. [0-60] (1 leap second) */ int tm_min; /* Minutes. [0-59] */ int tm_hour; /* Hours. [0-23] */ int tm_mday; /* Day. [1-31] */ int tm_mon; /* Month. [0-11] */ int tm_year; /* Year - 1900. */ int tm_wday; /* Day of week. [0-6] */ int tm_yday; /* Days in year.[0-365] */ int tm_isdst; /* DST. [-1/0/1]*/}复制代码

例子:pthread_mutex_timedlock阻塞时间

#include "../include/apue.h"#include 
void printTime() { char buf[64]; struct timespec tout; struct tm* tmp; clock_gettime(CLOCK_REALTIME, &tout); tmp = localtime(&tout.tv_sec); strftime(buf, sizeof(buf), "%r", tmp); printf("current time is %s\n", buf);}int main(int argc, char const *argv[]){ int err; struct timespec tout; struct tm *tmp; pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; printTime(); pthread_mutex_lock(&lock); printf("mutex is lock\n"); clock_gettime(CLOCK_REALTIME, &tout); tout.tv_sec += 10; pthread_mutex_timedlock(&lock, &tout); printTime(); return 0;}复制代码

因为已经获得了lock的锁,再次用pthread_mutex_lock锁住会导致死锁,使用pthread_mutex_timedlock只会阻塞指定时间。 输出结果如下,只阻塞了10秒

current time is 06:55:12 PMmutex is lockcurrent time is 06:55:22 PM复制代码

读写锁(共享互斥锁)

读写锁有三种状态:读模式下加锁状态、写模式下加锁状态、不加锁状态。一次只有一个线程占有写模式的读写锁,但是多个线程可以同时占有读模式的读写锁

  1. 当写锁已加锁时,试图对其加锁会使线程阻塞
  2. 在读锁已加锁时,试图对其加读锁的线程获得访问权,若对其加写锁会使线程阻塞,直到所以线程释放读锁
  3. 读写锁适用于对数据结构读取次数远大于写入的情况

读写锁必须在使用前初始化、使用后释放内存前销毁

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,                        const pthread_rwlockattr_t *restrict attr);// 初始化函数,如果读写锁默认属性则传入null给attrint pthread_rwlock_destroy(pthread_rwlock_t * rwlock);// 在free前调用pthread_rwlock_t lock = PTHREAD_RWLOCK_INITIALIZER;  // 静态初始化(Signle UNIX Specification)int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);    //读加锁int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);    // 写加锁int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);    // 解锁int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock); // 条件版本(Signle UNIX Specification)int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock); // 条件版本(Signle UNIX Specification)// 获得锁是返回0,否则返回错误EBUSY复制代码

例子:读写锁(共享互斥锁)

作业队列,使用单个读写锁保护队列,插入、删除会尝试给队列加写锁,查找队列时会给队列加读锁。

#include "../include/apue.h"#include 
struct job {
struct job *j_next; struct job *j_prev; pthread_t j_id; // 哪一个线程处理这个任务};struct queue {
struct job *q_head; struct job *q_tail; pthread_rwlock_t q_lock;};int queue_init(struct queue *qp) { int err; qp->q_head = NULL; qp->q_tail = NULL; err = pthread_rwlock_init(&qp->q_lock); if (err != 0) { return err; } return 0;}// 从队列后面插入jobvoid job_insert_tail(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); jp->j_next = NULL; jp->j_prev = qp->q_tail; if (qp->q_tail != NULL) { qp->q_tail->j_next = jp; } else { qp->q_head = jp; // 链表为空 } qp->q_tail = jp; pthread_rwlock_unlock(&qp->q_lock);}// 从队列前面插入jobvoid job_insert_front(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); jp->j_next = qp->q_head; jp->j_prev = NULL; if (qp->q_head != NULL) { qp->q_head->j_prev = jp; } else { qp->q_tail = jp; // 链表为空 } qp->q_head = jp; pthread_rwlock_unlock(&qp->q_lock);}// 从队列中删除jobvoid job_remove(struct queue *qp, struct job *jp) { pthread_rwlock_wrlock(&qp->q_lock); if (jp == qp->q_head) { qp->q_head = jp->j_next; if (jp == qp->q_tail) { qp->q_tail = NULL; } else { jp->j_next->j_prev = jp->j_prev; } } else if (jp == qp->q_tail) { jp->j_prev->j_next = jp->j_next; qp->q_tail = jp->j_prev; } else { jp->j_prev->j_next = jp->j_next; jp->j_next->j_prev = jp->j_prev; } pthread_rwlock_unlock(&qp->q_lock);}// 通过线程id查找某个任务struct job* job_find(struct queue *qp, pthread_t id) { struct job* jp; if (pthread_rwlock_rdlock(&qp->q_lock) != 0) { return NULL; } for (jp = qp->q_head; jp != NULL; jp = jp->j_next) { if (pthread_equal(jp->j_id, id)) { printf("Find you!\n"); break; } } pthread_rwlock_unlock(&qp->q_lock); return jp;}复制代码

每次只能有一个写锁,所以对于job结构体不需要对它加锁。

带有超时的读写锁(Single UNIX Specification)

int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,                               const struct timespec *restrict tsptr);int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,                               const struct timespec *restrict tsptr);复制代码

超时会返回ETIMEDOUT

条件变量

条件变量有互斥量保护,线程在改变条件状态之前首先锁住互斥量,锁住后计算条件。

// 1.动态初始化int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);`// 2.静态初始化pthread_cond_t cond = PTHREAD_COND_INITIALIZER;`// 销毁方式:int pthread_cond_destroy(pthread_cond_t *cond);// 等待条件变量变为真int pthread_cond_wait(pthread_cond_t *restrict cond,                       pthread_mutex_t *restrict mutex);int pthread_cond_timedwait(pthread_cond_t *restrict cond,                       pthread_mutex_t *restrict mutex,                      const struct timespec *restrict tsptr);复制代码

使用互斥量对条件进行保护,调用者把锁住的互斥量传给函数,函数然后自动把调用线程放到等待条件的线程列表中,然后对互斥量解锁。

等待时间使用的是绝对时间,不是之前的时间差而是将未来时间传入,使用clock_gettime获得timespec表示的当前时间,也可以通过gettimeofday获得timeval结构表示的当前时间,再转换为timespec,函数如下所示:

#include 
#include
// 以分钟作为时间间隔void maketimeout(struct timespec *tsp, long minutes) { struct timeval now; gettimeofday(&now, NULL); tsp->tv_sec = now.tv_sec; tsp->tv_nsec = now.tv_usec * 1000; tsp->tv_sec += minutes * 60;}复制代码

pthread_cond_waitpthread_cond_timedwait调用成功返回时,线程需要重新计算条件。pthread_cond_signal能唤醒至少一个睡眠线程,pthread_cond_broadcast能唤醒所以等待而睡眠的线程。

int pthread_cond_signal(pthread_cond_t *cnd);int pthread_cond_broadcast(pthread_cond_t *cond);复制代码

必须在改变条件状态之后再给线程发送信号

例子:使用条件变量和互斥量进行线程同步

#include "../include/apue.h"#include 
#define WORKS_NUMS 10struct msg{
struct msg *m_next; char message[64];};struct msg *workq;pthread_cond_t qready = PTHREAD_COND_INITIALIZER;pthread_mutex_t qlock = PTHREAD_MUTEX_INITIALIZER;void process_msg(char* name){ struct msg *mp; pthread_mutex_lock(&qlock); while (workq == NULL) pthread_cond_wait(&qready, &qlock); // 将锁住的互斥量传入,pthread_cond_wait会将线程放入等待队列中,然后解锁qlock, // 此时阻塞在pthread_cond_wait,当被pthread_cond_signal或pthread_cond_broadcast // 唤醒后此时qlock会锁住,如果workq==NULL,就会再次等待 mp = workq; workq = workq->m_next; pthread_mutex_unlock(&qlock); printf("[process-%s]:%s\n", name, mp->message);}void enqueue_msg(struct msg *mp){ pthread_mutex_lock(&qlock); // 条件workq是由互斥量mutex保护 mp->m_next = workq; // 修改条件这个操作需要保持一致 workq = mp; pthread_mutex_unlock(&qlock); pthread_cond_signal(&qready); // 修改条件后才发送信号}void *worker(void *arg){ printf("worker %s create!\n", (char *)arg); process_msg((char*)arg);}void *sender(void *arg){ printf("sender create!\n"); for (int i = 0; i < WORKS_NUMS; i++) { struct msg *m = (struct msg *)malloc(sizeof(struct msg)); sprintf(m->message, "msg-%d", i); enqueue_msg(m); sleep(1); // 发送后等待1s,让worker处于等待状态 } return 0;}int main(int argc, char const *argv[]){ pthread_t send; pthread_t works[WORKS_NUMS]; pthread_create(&send, NULL, sender, NULL); // sleep(1); for(int i= 0; i < WORKS_NUMS; i++) { char *name = (char*)malloc(10); sprintf(name, "%d", i); pthread_create(&works[i], NULL, worker, (void *)name); } pthread_join(send, NULL); for(int i= 0; i < WORKS_NUMS; i++) { pthread_join(works[i], NULL); } return 0;}复制代码

输出:

sender create!worker 0 create![worker-0]:msg-0worker 3 create!worker 2 create!worker 4 create!worker 1 create!worker 5 create!worker 6 create!worker 7 create!worker 8 create!worker 9 create![worker-3]:msg-1[worker-2]:msg-2[worker-4]:msg-3[worker-1]:msg-4[worker-5]:msg-5[worker-6]:msg-6[worker-7]:msg-7[worker-8]:msg-8[worker-9]:msg-9复制代码

因为pthread_cond_wait是在while循环中,如果不满足条件会继续进入循环。

自旋锁

转载地址:http://zxasa.baihongyu.com/

你可能感兴趣的文章
长按电源键弹出的菜单GlobalActionsDialog
查看>>
discuz日志错误
查看>>
springMVC统一异常json输出
查看>>
Flex错误:进程已终止,没有建立到调试器的连接。 initial content not ...
查看>>
mac中如何正确地安装搜狗输入法?
查看>>
perl引用和数组
查看>>
误删一个用户 引起数据不准确问题
查看>>
专题:优秀域名如何注册(1)
查看>>
Win10下开启Linux Bash
查看>>
RabbitMQ基础教程之基本使用篇
查看>>
Echart 地图 选择区域保持颜色不变
查看>>
FREEBASIC 编译可被python调用的dll函数示例-续(1)
查看>>
Delphi调用外部程序函数:WinExec() 和ShellExecute详解
查看>>
一场失败的拔河比赛
查看>>
IOS开发工程师欢迎你加入宏略信息
查看>>
php-fpm命令的参数
查看>>
删除SVN记住的密码
查看>>
bash: ./configure: /bin/sh^M: 坏的解释器: 没有那个文件或目录
查看>>
usdt的坑
查看>>
郑雨林-产业互联网
查看>>