添加URL
相关文章推荐
孤独求败  ·  Logback ...·  2 年前    · 
crhgxdqw  ·  3.[Python]多线程程序Ctrl+C的 ...·  1 年前    · 

转载出处: http://www.dbafree.net/?p=1128

N个线程,做同样的事,跑的一直好好的,突然某个线程就挂住了。于是使用 ps -eLf|grep name查看了线程相关的PID,并对其进行了strace.如下:

 $ strace -p 13251
 Process 13251 attached - interrupt to quit
 futex(0x1fcc500, FUTEX_WAIT_PRIVATE, 0, NULL

发现其一直hang在futex-FUTEX_WAIT_PRIVATE这里,可以看到futex一直在wait状态,长时间被挂起,就好象睡觉睡着了,没有人叫他起床。

我们的程序代码如下:

while 1:
     #操作线程共享变量:
     #1.操作一个线程安全的队列
     if(queue.size>0):
         queue.get()
     #2.操作线程不安全的hash结构,不过N个线程操作的key都是各不同的
     #操作方式为hash{线程名}=xxx)。
         hash{thread_name}=xxx

开始有点怀疑这个hash字典,因为非线程安全的,不过strace看到的是一个futex,那么应该和它关系不大,而跟线程安全的queue有更大的关系。因为futex可以对它进行保护。

于是先了解futex相关的情况,下面这段解释很清楚:
Futex是一种用户态和内核态混合的同步机制。首先,同步的进程间通过mmap共享一段内存,futex变量就位于这段共享 的内存中且操作是原子的,当进程尝试进入互斥区或者退出互斥区的时候,先去查看共享内存中的futex变量,如果没有竞争发生,则只修改futex,而不 用再执行系统调用了。当通过访问futex变量告诉进程有竞争发生,则还是得执行系统调用去完成相应的处理(wait 或者 wake up)。简单的说,futex就是通过在用户态的检查,(motivation)如果了解到没有竞争就不用陷入内核了,大大提高了low-contention时候的效率。

FUTEX_WAIT: 原子性的检查uaddr中计数器的值是否为val,如果是则让进程休眠,直到FUTEX_WAKE或者超时(time-out)。也就是把进程挂到uaddr相对应的等待队列上去。
futex这个方法,有timeout等参数,如果不加timeout参数,它会一直被阻塞,直到FUTEX_WAKE
int futex (int *uaddr, int op, int val, const struct timespec *timeout,int *uaddr2, int val3);

所以基本确认,挂起是由于futex没有被唤醒的原因导致的。

if(queue.size()>0
    #在多线程并发情况,执行下面这步get操作的时候,可能队列是空的,此时get()方法会阻塞
    queue.get()

而我在另一个线程里面,如果执行了queue.put(),那么应该是可以唤醒get()的,难道程序会这么笨?
于是检查了下我的put方法,发现我其实不是一个真正的put.代码如下:

for 1..1000
    tmpqueue.put(xx)
    #因为queue并没有真正的put,而是直接指向了一个新对象,所以不能唤醒get.
    queue=tmpqueue

找到了原因,问题就好解决了。当然因为程序设计上面的问题,我不能修改put的情况,于是,我修改get()方法,增加timeout参数。问题解决。

if(queue.size()):
    queue.get(timeout=0.1)

类似这种多线程的问题排查很困难,麻烦。不过碰到好几次,在同事们的帮助下,都比较幸运的很快就找到原因了。

这次也学到了不少。这篇文章挺不错的http://www.cnblogs.com/yysblog/archive/2012/11/03/2752728.html,转一下:

Linux中的线程同步机制(一) — Futex

引子
在编译2.6内核的时候,你会在编译选项中看到[*] Enable futex support这一项,上网查,有的资料会告诉你”不选这个内核不一定能正确的运行使用glibc的程序”,那futex是什么?和glibc又有什么关系呢?

1. 什么是Futex
Futex 是Fast Userspace muTexes的缩写,由Hubertus Franke, Matthew Kirkwood, Ingo Molnar and Rusty Russell共同设计完成。几位都是linux领域的专家,其中可能Ingo Molnar大家更熟悉一些,毕竟是O(1)调度器和CFS的实现者。

Futex按英文翻译过来就是快速用户空间互斥体。其设计思想其实 不难理解,在传统的Unix系统中,System V IPC(inter process communication),如 semaphores, msgqueues, sockets还有文件锁机制(flock())等进程间同步机制都是对一个内核对象操作来完成的,这个内核对象对要同步的进程都是可见的,其提供了共享 的状态信息和原子操作。当进程间要同步的时候必须要通过系统调用(如semop())在内核中完成。可是经研究发现,很多同步是无竞争的,即某个进程进入 互斥区,到再从某个互斥区出来这段时间,常常是没有进程也要进这个互斥区或者请求同一同步变量的。但是在这种情况下,这个进程也要陷入内核去看看有没有人 和它竞争,退出的时侯还要陷入内核去看看有没有进程等待在同一同步变量上。这些不必要的系统调用(或者说内核陷入)造成了大量的性能开销。为了解决这个问 题,Futex就应运而生,Futex是一种用户态和内核态混合的同步机制。首先,同步的进程间通过mmap共享一段内存,futex变量就位于这段共享 的内存中且操作是原子的,当进程尝试进入互斥区或者退出互斥区的时候,先去查看共享内存中的futex变量,如果没有竞争发生,则只修改futex,而不 用再执行系统调用了。当通过访问futex变量告诉进程有竞争发生,则还是得执行系统调用去完成相应的处理(wait 或者 wake up)。简单的说,futex就是通过在用户态的检查,(motivation)如果了解到没有竞争就不用陷入内核了,大大提高了low-contention时候的效率。 Linux从2.5.7开始支持Futex。

2. Futex系统调用
Futex是一种用户态和内核态混合机制,所以需要两个部分合作完成,linux上提供了sys_futex系统调用,对进程竞争情况下的同步处理提供支持。
其原型和系统调用号为
#include <linux/futex.h>
#include <sys/time.h>
int futex (int *uaddr, int op, int val, const struct timespec *timeout,int *uaddr2, int val3);
#define __NR_futex              240

虽然参数有点长,其实常用的就是前面三个,后面的timeout大家都能理解,其他的也常被ignore。
uaddr就是用户态下共享内存的地址,里面存放的是一个对齐的整型计数器。
op存放着操作类型。定义的有5中,这里我简单的介绍一下两种,剩下的感兴趣的自己去man futex
FUTEX_WAIT: 原子性的检查uaddr中计数器的值是否为val,如果是则让进程休眠,直到FUTEX_WAKE或者超时(time-out)。也就是把进程挂到uaddr相对应的等待队列上去。
FUTEX_WAKE: 最多唤醒val个等待在uaddr上进程。

可见FUTEX_WAIT和FUTEX_WAKE只是用来挂起或者唤醒进程,当然这部分工作也只能在内核态下完成。有些人尝试着直接使用futex系统调 用来实现进程同步,并寄希望获得futex的性能优势,这是有问题的。应该区分futex同步机制和futex系统调用。futex同步机制还包括用户态 下的操作,我们将在下节提到。

3. Futex同步机制
所有的futex同步操作都应该从用户空间开始,首先创建一个futex同步变量,也就是位于共享内存的一个整型计数器。
当 进程尝试持有锁或者要进入互斥区的时候,对futex执行”down”操作,即原子性的给futex同步变量减1。如果同步变量变为0,则没有竞争发生, 进程照常执行。如果同步变量是个负数,则意味着有竞争发生,需要调用futex系统调用的futex_wait操作休眠当前进程。
当进程释放锁或 者要离开互斥区的时候,对futex进行”up”操作,即原子性的给futex同步变量加1。如果同步变量由0变成1,则没有竞争发生,进程照常执行。如 果加之前同步变量是负数,则意味着有竞争发生,需要调用futex系统调用的futex_wake操作唤醒一个或者多个等待进程。

这里的原子性加减通常是用CAS(Compare and Swap)完成的,与平台相关。CAS的基本形式是:CAS(addr,old,new),当addr中存放的值等于old时,用new对其替换。在x86平台上有专门的一条指令来完成它: cmpxchg。

可见: futex是从用户态开始,由用户态和核心态协调完成的。

4. 进/线程利用futex同步
进程或者线程都可以利用futex来进行同步。
对于线程,情况比较简单,因为线程共享虚拟内存空间,虚拟地址就可以唯一的标识出futex变量,即线程用同样的虚拟地址来访问futex变量。
对 于进程,情况相对复杂,因为进程有独立的虚拟内存空间,只有通过mmap()让它们共享一段地址空间来使用futex变量。每个进程用来访问futex的 虚拟地址可以是不一样的,只要系统知道所有的这些虚拟地址都映射到同一个物理内存地址,并用物理内存地址来唯一标识futex变量。

小结:
1. Futex变量的特征:1)位于共享的用户空间中 2)是一个32位的整型 3)对它的操作是原子的
2. Futex在程序low-contention的时候能获得比传统同步机制更好的性能。
3. 不要直接使用Futex系统调用。
4. Futex同步机制可以用于进程间同步,也可以用于线程间同步。

Linux中的线程同步机制(二)–In Glibc

在linux中进行多线程开发,同步是不可回避的一个问题。在POSIX标准中定义了三种线程同步机制: Mutexes(互斥量), Condition Variables(条件变量)和POSIX Semaphores(信号量)。NPTL基本上实现了POSIX,而glibc又使用NPTL作为自己的线程库。因此glibc中包含了这三种同步机制 的实现(当然还包括其他的同步机制,如APUE里提到的读写锁)。

Glibc中常用的线程同步方式举例:

Semaphore
变量定义:    sem_t sem;
初始化:      sem_init(&sem,0,1);
进入加锁:     sem_wait(&sem);
退出解锁:     sem_post(&sem);

Mutex
变量定义:    pthread_mutex_t mut;
初始化:      pthread_mutex_init(&mut,NULL);
进入加锁:     pthread_mutex_lock(&mut);
退出解锁:     pthread_mutex_unlock(&mut);

这些用于同步的函数和futex有什么关系?下面让我们来看一看:
以Semaphores为例,
进入互斥区的时候,会执行sem_wait(sem_t *sem),sem_wait的实现如下:
int sem_wait (sem_t *sem)
{
int *futex = (int *) sem;
if (atomic_decrement_if_positive (futex) > 0)
return 0;
int   err = lll_futex_wait (futex, 0);
return -1;
)
atomic_decrement_if_positive()的语义就是如果传入参数是正数就将其原子性的减一并立即返回。如果信号量为正,在Semaphores的语义中意味着没有竞争发生,如果没有竞争,就给信号量减一后直接返回了。

如果传入参数不是正数,即意味着有竞争,调用lll_futex_wait(futex,0),lll_futex_wait是个宏,展开后为:
#define lll_futex_wait(futex, val) \
({                                          \

__asm __volatile (LLL_EBX_LOAD                          \
LLL_ENTER_KERNEL                          \
LLL_EBX_LOAD                          \
: “=a” (__status)                          \
: “0″ (SYS_futex), LLL_EBX_REG (futex), “S” (0),          \
“c” (FUTEX_WAIT), “d” (_val),                  \
“i” (offsetof (tcbhead_t, sysinfo))              \
: “memory”);                          \
…                                      \
})
可以看到当发生竞争的时候,sem_wait会调用SYS_futex系统调用,并在val=0的时候执行FUTEX_WAIT,让当前线程休眠。

从 这个例子我们可以看出,在Semaphores的实现过程中使用了futex,不仅仅是说其使用了futex系统调用(再重申一遍只使用futex系统调 用是不够的),而是整个建立在futex机制上,包括用户态下的操作和核心态下的操作。其实对于其他glibc的同步机制来说也是一样,都采纳了 futex作为其基础。所以才会在futex的manual中说:对于大多数程序员不需要直接使用futexes,取而代之的是依靠建立在futex之上 的系统库,如NPTL线程库(most programmers will in fact not be using futexes directly but instead rely on system libraries built on them, such as the NPTL pthreads implementation)。所以才会有如果在编译内核的时候不 Enable futex support,就”不一定能正确的运行使用Glibc的程序”。

小结:
1. Glibc中的所提供的线程同步方式,如大家所熟知的Mutex,Semaphore等,大多都构造于futex之上了,除了特殊情况,大家没必要再去实现自己的futex同步原语。
2. 大家要做的事情,似乎就是按futex的manual中所说得那样: 正确的使用Glibc所提供的同步方式,并在使用它们的过程中,意识到它们是利用futex机制和linux配合完成同步操作就可以了。

Linux中的线程同步机制(三)–Practice

上回说到Glibc中(NPTL)的线程同步方式如Mutex,Semaphore等都使用了futex作为其基础。那么实际使用是什么样子,又会碰到什么问题呢?
先来看一个使用semaphore同步的例子。

sem_t sem_a;
void *task1();

int main(void){
int ret=0;
pthread_t thrd1;
sem_init(&sem_a,0,1);
ret=pthread_create(&thrd1,NULL,task1,NULL); //创建子线程
pthread_join(thrd1,NULL); //等待子线程结束
}

void *task1()
{
int sval = 0;
sem_wait(&sem_a); //持有信号量
sleep(5); //do_nothing
sem_getvalue(&sem_a,&sval);
printf(“sem value = %d\n”,sval);
sem_post(&sem_a); //释放信号量
}

程序很简单,我们在主线程(执行main的线程)中创建了一个线程,并用join等待其结束。在子线程中,先持有信号量,然后休息一会儿,再释放信号量,结束。
因为这段代码中只有一个线程使用信号量,也就是没有线程间竞争发生,按照futex的理论,因为没有竞争,所以所有的锁操作都将在用户态中完成,而不会执行系统调用而陷入内核。我们用strace来跟踪一下这段程序的执行过程中所发生的系统调用:

20533 futex(0xb7db1be8, FUTEX_WAIT, 20534, NULL <unfinished …>
20534 futex(0×8049870, FUTEX_WAKE, 1)   = 0
20533 <… futex resumed> )             = 0

20533是main线程的id,20534是其子线程的id。出乎我们意料之外的是这段程序还是发生了两次futex系统调用,我们来分析一下这分别是什么原因造成的。

1. 出人意料的”sem_post()”
20534 futex(0×8049870, FUTEX_WAKE, 1)   = 0
子 线程还是执行了FUTEX_WAKE的系统调用,就是在sem_post(&sem_a);的时候,请求内核唤醒一个等待在sem_a上的线程, 其返回值是0,表示现在并没有线程等待在sem_a(这是当然的,因为就这么一个线程在使用sem_a),这次futex系统调用白做了。这似乎和 futex的理论有些出入,我们再来看一下sem_post的实现。
int sem_post (sem_t *sem)
{
int *futex = (int *) sem;
int nr = atomic_increment_val (futex);
int err = lll_futex_wake (futex, nr);
return 0;
}
我们看到,Glibc在实现sem_post的时候给futex原子性的加上1后,不管futex的值是什么,都执行了lll_futex_wake(),即futex(FUTEX_WAKE)系统调用。
在 第二部分中(见前文),我们分析了sem_wait的实现,当没有竞争的时候是不会有futex调用的,现在看来真的是这样,但是在sem_post的时 候,无论有无竞争,都会调用sys_futex(),为什么会这样呢?我觉得应该结合semaphore的语义来理解。在semaphore的语义 中,sem_wait()的意思是:”挂起当前进程,直到semaphore的值为非0,它会原子性的减少semaphore计数值。” 我们可以看到,semaphore中是通过0或者非0来判断阻塞或者非阻塞线程。即无论有多少线程在竞争这把锁,只要使用了 semaphore,semaphore的值都会是0。这样,当线程推出互斥区,执行sem_post(),释放semaphore的时候,将其值由0改 1,并不知道是否有线程阻塞在这个semaphore上,所以只好不管怎么样都执行futex(uaddr, FUTEX_WAKE, 1)尝试着唤醒一个进程。而相反的,当sem_wait(),如果semaphore由1变0,则意味着没有竞争发生,所以不必去执行futex系统调 用。我们假设一下,如果抛开这个语义,如果允许semaphore值为负,则也可以在sem_post()的时候,实现futex机制。

2. 半路杀出的”pthread_join()”
那另一个futex系统调用是怎么造成的呢? 是因为pthread_join();在Glibc中,pthread_join也是用futex系统调用实现的。程序中的pthread_join(thrd1,NULL); 就对应着20533 futex(0xb7db1be8, FUTEX_WAIT, 20534, NULL <unfinished …>很 好解释,主线程要等待子线程(id号20534上)结束的时候,调用futex(FUTEX_WAIT),并把var参数设置为要等待的子线程号 (20534),然后等待在一个地址为0xb7db1be8的futex变量上。当子线程结束后,系统会负责把主线程唤醒。于是主线程就20533 <… futex resumed> ) = 0恢复运行了。要注意的是,如果在执行pthread_join()的时候,要join的线程已经结束了,就不会再调用futex()阻塞当前进程了。

3. 更多的竞争。
我们把上面的程序稍微改改:
在main函数中:
int main(void){

sem_init(&sem_a,0,1);
ret=pthread_create(&thrd1,NULL,task1,NULL);
ret=pthread_create(&thrd2,NULL,task1,NULL);
ret=pthread_create(&thrd3,NULL,task1,NULL);
ret=pthread_create(&thrd4,NULL,task1,NULL);
pthread_join(thrd1,NULL);
pthread_join(thrd2,NULL);
pthread_join(thrd3,NULL);
pthread_join(thrd4,NULL);

}

这样就有更的线程参与sem_a的争夺了。我们来分析一下,这样的程序会发生多少次futex系统调用。
1) sem_wait()
第一个进入的线程不会调用futex,而其他的线程因为要阻塞而调用,因此sem_wait会造成3次futex(FUTEX_WAIT)调用。
2) sem_post()
所有线程都会在sem_post的时候调用futex, 因此会造成4次futex(FUTEX_WAKE)调用。
3) pthread_join()
别忘了还有pthread_join(),我们是按thread1, thread2, thread3, thread4这样来join的,但是线程的调度存在着随机性。如果thread1最后被调度,则只有thread1这一次futex调用,所以 pthread_join()造成的futex调用在1-4次之间。(虽然不是必然的,但是4次更常见一些)
所以这段程序至多会造成3+4+4=11次futex系统调用,用strace跟踪,验证了我们的想法。
19710 futex(0xb7df1be8, FUTEX_WAIT, 19711, NULL <unfinished …>
19712 futex(0×8049910, FUTEX_WAIT, 0, NULL <unfinished …>
19713 futex(0×8049910, FUTEX_WAIT, 0, NULL <unfinished …>
19714 futex(0×8049910, FUTEX_WAIT, 0, NULL <unfinished …>
19711 futex(0×8049910, FUTEX_WAKE, 1 <unfinished …>
19710 futex(0xb75f0be8, FUTEX_WAIT, 19712, NULL <unfinished …>
19712 futex(0×8049910, FUTEX_WAKE, 1 <unfinished …>
19710 futex(0xb6defbe8, FUTEX_WAIT, 19713, NULL <unfinished …>
19713 futex(0×8049910, FUTEX_WAKE, 1 <unfinished …>
19710 futex(0xb65eebe8, FUTEX_WAIT, 19714, NULL <unfinished …>
19714 futex(0×8049910, FUTEX_WAKE, 1)   = 0
(19710是主线程,19711,19712,19713,19714是4个子线程)

4. 更多的问题
事 情到这里就结束了吗? 如果我们把semaphore换成Mutex试试。你会发现当自始自终没有竞争的时候,mutex会完全符合futex机制,不管是lock还是 unlock都不会调用futex系统调用。有竞争的时候,第一次pthread_mutex_lock的时候不会调用futex调用,看起来还正常。但 是最后一次pthread_mutex_unlock的时候,虽然已经没有线程在等待mutex了,可还是会调用futex(FUTEX_WAKE)。原因是什么?欢迎讨论!!!

小结:
1. 虽然semaphore,mutex等同步方式构建在futex同步机制之上。然而受其语义等的限制,并没有完全按futex最初的设计实现。
2. pthread_join()等函数也是调用futex来实现的。
3. 不同的同步方式都有其不同的语义,不同的性能特征,适合于不同的场景。我们在使用过程中要知道他们的共性,也得了解它们之间的差异。这样才能更好的理解多线程场景,写出更高质量的多线程程序。

1.设置最大并发数量:通过设置maxConcurrentOperationCount属性来设置,默认值是-1,不限制最大并发数量. 2.队列的挂起:设置属性suspended为YES,即可将队列挂起,也可以称为暂停 1).当前任务处于执行状态,设置队列挂起不会影响其执行,受影响的是那些还没有执行的任务 2).当队列设置为挂起状态后,可以通过修改其状态再次恢复任务 3.队...
本文讨论JAVA多线程中,使用 thread.suspend()方法暂停线程,使用 thread.resume()恢复暂停的线程 的特点。 先介绍二个关于线程的基本知识: ①线程的执行体是run()方法里面的每一条语句,main线程执行的则是main()方法里面的语句。 ②Thread.sleep()方法 使当前正在执行的线程睡眠。 二,suspend()方法 ①当某个... N个线程,做同样的事,跑的一直好好的,突然某个线程就住了。于是使用 ps -eLf|grep name查看了线程相关的PID,并对其进行了strace.如下: $ strace -p 13251 Process 13251 attached - interrupt to quit futex(0x1fcc
国内几乎没啥说的,公司很多网站很多还不让上,用手机谷歌还可能会被领导批评,寻思回家查,还强制加班下班还很晚,这是又要马儿跑又要马儿不吃草啊。 源码中这块是这么写的 +#define FUTEX_PRIVATE_FLAG 128 +#define FUTEX_CMD_MASK ~FUTEX_PRIVATE_FLAG +#define FUTEX_WAIT_PRIVATE (FUTEX_WAIT...
1226 16:49:07.930478 futex(0x7a27d710, FUTEX_WAIT_PRIVATE, 2, NULL) = -1 EAGAIN (Resource temporarily unavailable) 1225 16:49:07.930878 futex(0x7a27d710, FUTEX_WAKE_PRIVATE, 1 <unfinished ...> 1226 16:49:07.931055 futex(0x7a27d710, F
这两天在研究多线程的使用方法,我们的需求是这样的:程序启动时,先让子线程处于运行状态,运行过程中会人为的更改该线程的运行状态为挂起,等待随机的时长之后,再把该线程重新唤醒,让其继续运行;人为挂起线程和恢复线程的次数也是随机的。经过不懈努力,最终找到了如下壹個实现了 Runnable 的线程类,完美解决问题,记录于此。首先是 MyThread 线程类的定义: public class MyThr...
nodejs 内存泄漏,cpu100%问题追踪一次线上事故新的改变功能快捷键合理的创建标题,有助于目录的生成如何改变文本的样式插入链接与图片如何插入一段漂亮的代码片生成一个适合你的列表创建一个表格设定内容居中、居左、居右SmartyPants创建一个自定义列表如何创建一个注脚注释也是必不可少的KaTeX数学公式新的甘特图功能,丰富你的文章UML 图表FLowchart流程图导出与导入导出导入 一次...
1)概述:线程的挂起操作实质上就是使线程进入“非可执行”状态下,在这个状态下CPU不会分给线程时间片,进入这个状态可以用来暂停一个线程的运行。在线程挂起后,可以通过重新唤醒线程来使之恢复运行。 run() 和start() 是大家都很熟悉的两个方法。把希望并行处理的代码都放在run() 中;stat() 用于自动调用run(),这是JAVA的内在机制规定的。当一个线程进入“非可...