1.老生常谈线程基础的几个问题
2.线程池中空闲的线程处于什么状态?
3.Lockçawait/singal å Objectçwait/notify çåºå«
4.LockSupportçparkçå¾
çåºå±å®ç°
5.从HotSpot源码,深度解读 park 和 unpark
老生常谈线程基础的几个问题
实现线程只有一种方式
我们知道启动线程至少可以通过以下四种方式:
实现Runnable接口
继承Thread类
线程池创建线程
带返回值的Callable创建线程
但是看它们的底层就一种方式,就是通过newThread()实现,其他的只不过在它的上面做了层封装。
实现Runnable接口要比继承Thread类的更好:
结构上分工更明确,线程本身属性和任务逻辑解耦。jsp添加商品源码
某些情况下性能更好,直接把任务交给线程池执行,无需再次newThread()。
可拓展性更好:实现接口可以多个,而继承只能单继承。
有的时候可能会问到启动线程为什么是start()方法,而不是run()方法,这个问题很简单,执行run()方法其实就是在执行一个类的普通方法,并没有启动一个线程,而start()方法点进去看是一个native方法。
当我们在执行java中的start()方法的时候,它的底层会调JVM由c++编写的代码Thread::start,然后c++代码再调操作系统的create_thread创建线程,创建完线程以后并不会马上运行,要等待CPU的调度。CPU的调度算法有很多,比如先来先服务调度算法(FIFO),最短优先(就是对短作业的优先调度)、时间片轮转调度等。如下图所示:
线程的状态在Java中线程的生命周期中一共有6种状态。
NEW:初始状态,线程被构建,但是libevent目录源码还没有调用start方法
RUNNABLE:运行状态,JAVA线程把操作系统中的就绪和运行两种状态统一称为运行中
BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了CPU使用权
WAITING:等待状态
TIMED_WAITING:超时等待状态,超时以后自动返回
TERMINATED:终止状态,表示当前线程执行完毕
当然这也不是我说的,源码中就是这么定义的:
publicenumState{ /***Threadstateforathreadwhichhasnotyetstarted.*/NEW,/***Threadstateforarunnablethread.Athreadintherunnable*stateisexecutingintheJavavirtualmachinebutitmay*bewaitingforotherresourcesfromtheoperatingsystem*suchasprocessor.*/RUNNABLE,/***Threadstateforathreadblockedwaitingforamonitorlock.*Athreadintheblockedstateiswaitingforamonitorlock*toenterasynchronizedblock/methodor*reenterasynchronizedblock/methodaftercalling*{ @linkObject#wait()Object.wait}.*/BLOCKED,/***Threadstateforawaitingthread.*Athreadisinthewaitingstateduetocallingoneofthe*followingmethods:*<ul>*<li>{ @linkObject#wait()Object.wait}withnotimeout</li>*<li>{ @link#join()Thread.join}withnotimeout</li>*<li>{ @linkLockSupport#park()LockSupport.park}</li>*</ul>**<p>Athreadinthewaitingstateiswaitingforanotherthreadto*performaparticularaction.**Forexample,athreadthathascalled<tt>Object.wait()</tt>*onanobjectiswaitingforanotherthreadtocall*<tt>Object.notify()</tt>or<tt>Object.notifyAll()</tt>on*thatobject.Athreadthathascalled<tt>Thread.join()</tt>*iswaitingforaspecifiedthreadtoterminate.*/WAITING,/***Threadstateforawaitingthreadwithaspecifiedwaitingtime.*Athreadisinthetimedwaitingstateduetocallingoneof*thefollowingmethodswithaspecifiedpositivewaitingtime:*<ul>*<li>{ @link#sleepThread.sleep}</li>*<li>{ @linkObject#wait(long)Object.wait}withtimeout</li>*<li>{ @link#join(long)Thread.join}withtimeout</li>*<li>{ @linkLockSupport#parkNanosLockSupport.parkNanos}</li>*<li>{ @linkLockSupport#parkUntilLockSupport.parkUntil}</li>*</ul>*/TIMED_WAITING,/***Threadstateforaterminatedthread.*Thethreadhascompletedexecution.*/TERMINATED;}下面是这六种状态的转换:
New新创建New表示线程被创建但尚未启动的状态:当我们用newThread()新建一个线程时,如果线程没有开始调用start()方法,那么此时它的状态就是New。而一旦线程调用了start(),它的状态就会从New变成Runnable。
Runnable运行状态Java中的Runable状态对应操作系统线程状态中的两种状态,分别是Running和Ready,也就是说,Java中处于Runnable状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配CPU资源。
如果一个正在运行的线程是Runnable状态,当它运行到任务的一半时,执行该线程的CPU被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是Runnable,因为它有可能随时被调度回来继续执行任务。
在Java中Blocked、Waiting、TimedWaiting,这三种状态统称为阻塞状态,下面分别来看下。
Blocked从上图可以看出,从Runnable状态进入Blocked状态只有一种可能,ash算法源码就是进入synchronized保护的代码时没有抢到monitor锁,jvm会把当前的线程放入到锁池中。当处于Blocked的线程抢到monitor锁,就会从Blocked状态回到Runnable状态。
Waiting状态我们看上图,线程进入Waiting状态有三种可能。
没有设置Timeout参数的Object.wait()方法,jvm会把当前线程放入到等待队列。
没有设置Timeout参数的Thread.join()方法。
LockSupport.park()方法。
Blocked与Waiting的区别是Blocked在等待其他线程释放monitor锁,而Waiting则是在等待某个条件,比如join的线程执行完毕,或者是notify()/notifyAll()。
当执行了LockSupport.unpark(),或者join的线程运行结束,或者被中断时可以进入Runnable状态。当调用notify()或notifyAll()来唤醒它,它会直接进入Blocked状态,因为唤醒Waiting状态的线程能够调用notify()或notifyAll(),肯定是已经持有了monitor锁,这时候处于Waiting状态的线程没有拿到monitor锁,就会进入Blocked状态,直到执行了notify()/notifyAll()唤醒它的线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从Blocked状态回到Runnable状态。
TimedWaiting状态在Waiting上面是TimedWaiting状态,这两个状态是365模板源码非常相似的,区别仅在于有没有时间限制,TimedWaiting会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。
以下情况会让线程进入TimedWaiting状态。
设置了时间参数的Thread.sleep(longmillis)方法。
设置了时间参数的Object.wait(longtimeout)方法。
设置了时间参数的Thread.join(longmillis)方法。
设置了时间参数的LockSupport.parkNanos(longnanos)。
LockSupport.parkUntil(longdeadline)方法。
在TimedWaiting中执行notify()和notifyAll()也是一样的道理,它们会先进入Blocked状态,然后抢夺锁成功后,再回到Runnable状态。当然,如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable状态,而无需经历Blocked状态。
Terminated终止Terminated终止状态,要想进入这个状态有两种可能。
run()方法执行完毕,线程正常退出。
出现一个没有捕获的异常,终止了run()方法,最终导致意外终止。
线程的停止interrupt我们知道Thread提供了线程的一些操作方法,比如stop(),suspend()和resume(),这些方法已经被Java直接标记为@Deprecated,rawsocket编程源码这就说明这些方法是不建议大家使用的。
因为stop()会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题。这种行为类似于在linux系统中执行kill-9类似,它是一种不安全的操作。
而对于suspend()和resume()而言,它们的问题在于如果线程调用suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被resume()之前,是不会被释放的。
interrupt最正确的停止线程的方式是使用interrupt,但interrupt仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。
下面我们来看下例子:
publicclassInterruptExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ System.out.println(Thread.currentThread().getName()+"--");}}}执行一下,运行了一会就停止了
主线程在调用t1的interrupt()之后,这个线程的中断标记位就会被设置成true。每个线程都有这样的标记位,当线程执行时,会定期检查这个标记位,如果标记位被设置成true,就说明有程序想终止该线程。在while循环体判断语句中,通过Thread.currentThread().isInterrupt()判断线程是否被中断,如果被置为true了,则跳出循环,线程就结束了,这个就是interrupt的简单用法。
阻塞状态下的线程中断下面来看第二个例子,在循环中加了Thread.sleep秒。
publicclassInterruptSleepExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptSleepExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();}System.out.println(Thread.currentThread().getName()+"--");}}}再来看下运行结果,卡主了,并没有停止。这是因为main线程调用了t1.interrupt(),此时t1正在sleep中,这时候是接收不到中断信号的,要sleep结束以后才能收到。这样的中断太不及时了,我让你中断了,你缺还在傻傻的sleep中。
Java开发的设计者已经考虑到了这一点,sleep、wait等方法可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个InterruptedException异常,同时清除中断信号,将中断标记位设置成false。
这时候有几种做法:
直接捕获异常,不做处理,e.printStackTrace();打印下信息
将异常往外抛出,即在方法上throwsInterruptedException
再次中断,代码如下,加上Thread.currentThread().interrupt();
@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();//把中断标记修改为trueThread.currentThread().interrupt();}System.out.println(Thread.currentThread().getName()+"--");}}这时候线程感受到了,我们人为的再把中断标记修改为true,线程就能停止了。一般情况下我们操作线程很少会用到interrupt,因为大多数情况下我们用的是线程池,线程池已经帮我封装好了,但是这方面的知识还是需要掌握的。感谢收看,多多点赞~
作者:小杰博士
线程池中空闲的线程处于什么状态?
一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;
源码追踪:
for (;;) {
...
workQueue.take();
...
}
public E take()...{
...
while (count.get() == 0) { / /这里就是任务队列中的消息数量
notEmpty.await();
}
...
}
public final void await()...{
...
LockSupport.park(this);
...
}
继续往下:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。
public native void park(boolean isAbsolute, long time);
上面就是java线程池中阻塞的源码追踪;
二.对比object的wait()方法:
@FastNative
public final native void wait(long timeout, int nanos) throws InterruptedException;
还有Thread的sleep() 方法:
@FastNative
private static native void sleep(Object lock, long millis, int nanos)throws...;
可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep() ;
这3个方法最终实现都是通过c&c++实现的native方法.
三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:
.4.3 状态转换
Java语言定义了5种线程状态,在任意一个时间点,一个线程只能有且只有其中的一种
状态,这5种状态分别如下。
1)新建(New):创建后尚未启动的线程处于这种状态。
2)运行(Runable):Runable包括了操作系统线程状态中的Running和Ready,也就是处于此
状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。
3)无限期等待(Waiting):处于这种状态的线程不会被分配CPU执行时间,它们要等待被
其他线程显式地唤醒。以下方法会让线程陷入无限期的等待状态:
●没有设置Timeout参数的Object.wait()方法。
●没有设置Timeout参数的Thread.join()方法。
●LockSupport.park()方法。
4)限期等待(Timed Waiting):处于这种状态的线程也不会被分配CPU执行时间,不过无
须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程
进入限期等待状态:
●Thread.sleep()方法。
●设置了Timeout参数的Object.wait()方法。
●设置了Timeout参数的Thread.join()方法。
●LockSupport.parkNanos()方法。
●LockSupport.parkUntil()方法。
5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等
待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状
态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将
进入这种状态。
结束(Terminated):已终止线程的线程状态,线程已经结束执行。
Lockçawait/singal å Objectçwait/notify çåºå«
Lockçawait/singal å Objectçwait/notify çåºå«
å¨ä½¿ç¨Lockä¹åï¼æ们é½ä½¿ç¨Object çwaitånotifyå®ç°åæ¥çã举ä¾æ¥è¯´ï¼ä¸ä¸ªproduceråconsumerï¼consumeråç°æ²¡æä¸è¥¿äºï¼çå¾ ï¼produerçæä¸è¥¿äºï¼å¤éã
线ç¨consumer 线ç¨producer
synchronize(obj){
obj.wait();//没ä¸è¥¿äºï¼çå¾
} synchronize(obj){
obj.notify();//æä¸è¥¿äºï¼å¤é
}
æäºlockåï¼ä¸éåäºï¼ç°å¨æ¯ï¼
lock.lock();
condition.await();
lock.unlock(); lock.lock();
condition.signal();
lock.unlock();
为äºçªåºåºå«ï¼çç¥äºè¥å¹²ç»èãåºå«æä¸ç¹ï¼
1. lockä¸åç¨synchronizeæåæ¥ä»£ç å è£ èµ·æ¥ï¼
2. é»å¡éè¦å¦å¤ä¸ä¸ªå¯¹è±¡conditionï¼
3. åæ¥åå¤éç对象æ¯conditionèä¸æ¯lockï¼å¯¹åºçæ¹æ³æ¯awaitåsignalï¼èä¸æ¯waitånotifyã
为
ä»ä¹éè¦ä½¿ç¨conditionå¢ï¼ç®åä¸å¥è¯ï¼lockæ´çµæ´»ã以åçæ¹å¼åªè½æä¸ä¸ªçå¾ éåï¼å¨å®é åºç¨æ¶å¯è½éè¦å¤ä¸ªï¼æ¯å¦è¯»ååã为äºè¿ä¸ªçµæ´»
æ§ï¼lockå°åæ¥äºæ¥æ§å¶åçå¾ éåå离å¼æ¥ï¼äºæ¥ä¿è¯å¨æ个æ¶å»åªæä¸ä¸ªçº¿ç¨è®¿é®ä¸´çåºï¼lockèªå·±å®æï¼ï¼çå¾ éåè´è´£ä¿å被é»å¡ç线ç¨
ï¼conditionå®æï¼ã
éè¿æ¥çReentrantLockçæºä»£ç åç°ï¼conditionå ¶å®æ¯çå¾ éåçä¸ä¸ªç®¡çè ï¼conditionç¡®ä¿é»å¡ç对象æ顺åºè¢«å¤éã
å¨Lockçå®ç°ä¸ï¼LockSupport被ç¨æ¥å®ç°çº¿ç¨ç¶æçæ¹åï¼åç»å°æ´è¿ä¸æ¥ç 究LockSupportçå®ç°æºå¶ã
LockSupportçparkçå¾ çåºå±å®ç°
ä»ä¸ä¸ç¯æç« ä¸çJDKç延è¿éåä¸,æç»æ¯éè¿LockSupport.parkå®ç°çº¿ç¨ççå¾ ï¼é£ä¹åºå±æ¯å¦ä½å®ç°çå¾ åè¶ æ¶çå¾ çï¼æ¬ææ们æ¥æ¢è®¨ä¸ä¸ãLockSupportçparkåunparkçæ¹æ³publicstaticvoidpark(){ UNSAFE.park(false,0L);}publicstaticvoidparkNanos(longnanos){ if(nanos>0)UNSAFE.park(false,nanos);}publicstaticvoidunpark(Threadthread){ if(thread!=null)UNSAFE.unpark(thread);}ä»ä¸é¢å¯ä»¥çå°å®é LockSupport.parkæ¯éè¿Unsafeççparkæ¹æ³å®ç°ï¼ä»ä¸é¢çæ¹æ³å¯ä»¥çåºè¿ä¸ªæ¯ä¸ä¸ªnativeæ¹æ³.
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);JVMçUnsafeçparkæ¹æ³ä»ä¸é¢JDKä¸ä»£ç ä¸å¯ä»¥threadçParkerç对象çparkæ¹æ³è¿è¡ä¸æ®µæ¶é´ççå¾ ã
UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime)){ HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);EventThreadParkevent;JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);if(event.should_commit()){ constoopobj=thread->current_park_blocker();if(time==0){ post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{ if(isAbsolute!=0){ post_thread_park_event(&event,obj,min_jlong,time);}else{ post_thread_park_event(&event,obj,time,min_jlong);}}}HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());}UNSAFE_ENDThread.hppçæ件ä¸å é¨å®ä¹çPark对象
private:Parker_parker;public:Parker*parker(){ return&_parker;}ä¸é¢æ¯Os_posix.cppä¸æ¯Linuxä¸å®ç°çParkçparkçå®ç°æ¹å¼
é¦å å°_counterçåééè¿CAS设置为0ï¼è¿åå°±æ§çå¼ï¼å¦æä¹åæ¯å¤§äº0ï¼å说ææ¯å 许访é®ï¼ä¸ç¨é»å¡ï¼ç´æ¥è¿åã
è·åå½å线ç¨ã
å¤æ线ç¨æ¯å¦æ¯ä¸æä¸ï¼å¦ææ¯ï¼åç´æ¥è¿åï¼(ä¹å°±æ¯è¯´çº¿ç¨å¤äºä¸æç¶æä¸ä¼å¿½ç¥parkï¼ä¸ä¼é»å¡çå¾ )
å¤æå¦æä¼ å ¥çtimeåæ°å°äº0 æè æ¯ç»å¯¹æ¶é´å¹¶ä¸timeæ¯0,åç´æ¥è¿å,(ä¸é¢çUnsafeè°ç¨parkä¼ å ¥çåæ°æ¯ falseã0ï¼æ以ä¸æ»¡è¶³è¿ç§æ åµ)
å¦ætime大äº0ï¼å转æ¢æç»å¯¹æ¶é´ã
å建ThreadBlockInVM对象ï¼å¹¶ä¸è°ç¨pthread_mutex_trylockè·å线ç¨äºæ¥éï¼å¦æ没æè·åå°éï¼åç´æ¥è¿åï¼
å¤æ_counteråéæ¯å¦å¤§äº0ï¼å¦ææ¯ï¼åéç½®_counter为0ï¼éæ¾çº¿ç¨éï¼ç´æ¥è¿åã
è°ç¨ OrderAccess::fence(); å å ¥å åå±éï¼ç¦æ¢æ令éæåºï¼ç¡®ä¿å éåéæ¾éçæ令ç顺åºã
å建OSThreadWaitState对象ï¼
å¤ætimeæ¯å¦å¤§äº0ï¼å¦ææ¯0ï¼åè°ç¨pthread_cond_waitè¿è¡çå¾ ï¼å¦æä¸æ¯0ï¼ç¶åè°ç¨pthread_cond_timedwaitè¿è¡æ¶é´åæ°ä¸ºabsTimeççå¾ ï¼
è°ç¨pthread_mutex_unlockè¿è¡éæ¾_mutexéï¼
å次è°ç¨OrderAccess::fence()ç¦æ¢æ令éæåºã
//Parker::parkdecrementscountif>0,elsedoesacondvarwait.Unpark//setscountto1andsignalscondvar.Onlyonethreadeverwaits//onthecondvar.Contentionseenwhentryingtoparkimpliesthatsomeone//isunparkingyou,sodon'twait.Andspuriousreturnsarefine,sothere//isnoneedtotracknotifications.voidParker::park(boolisAbsolute,jlongtime){ //Optionalfast-pathcheck://Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(&_counter,0)>0)return;JavaThread*jt=JavaThread::current();//Optionaloptimization--avoidstatetransitionsifthere's//aninterruptpending.if(jt->is_interrupted(false)){ return;}//Next,demultiplex/decodetimeargumentsstructtimespecabsTime;if(time<0||(isAbsolute&&time==0)){ //don'twaitatallreturn;}if(time>0){ to_abstime(&absTime,time,isAbsolute,false);}//Entersafepointregion//Bewareofdeadlockssuchas.//Theper-threadParker::mutexisaclassicleaf-lock.//InparticularathreadmustneverblockontheThreads_lockwhile//holdingtheParker::mutex.Ifsafepointsarependingboththe//theThreadBlockInVM()CTORandDTORmaygrabThreads_lock.ThreadBlockInVMtbivm(jt);//Can'taccessinterruptstatenowthatweare_thread_blocked.Ifwe've//beeninterruptedsincewecheckedabovethen_counterwillbe>0.//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unparking.if(pthread_mutex_trylock(_mutex)!=0){ return;}intstatus;if(_counter>0){ //nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();return;}OSThreadWaitStateosts(jt->osthread(),false/*notObject.wait()*/);assert(_cur_index==-1,"invariant");if(time==0){ _cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);assert_status(status==0MACOS_ONLY(||status==ETIMEDOUT),status,"cond_wait");}else{ _cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=pthread_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);assert_status(status==0||status==ETIMEDOUT,status,"cond_timedwait");}_cur_index=-1;_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();Linuxæä½ç³»ç»æ¯å¦ä½å®ç°pthread_cond_timedwaitè¿è¡æ¶é´çå¾ çpthread_cond_timedwaitå½æ°ä½äºglibcä¸pthread_cond_wait.c, å¯ä»¥çå°æ¯è°ç¨__pthread_cond_wait_commonå®ç°
/*See__pthread_cond_wait_common.*/int___pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststruct__timespec*abstime){ /*Checkparametervalidity.ThisshouldalsotellthecompilerthatitcanassumethatabstimeisnotNULL.*/if(!valid_nanoseconds(abstime->tv_nsec))returnEINVAL;/*RelaxedMOissufficebecauseclockIDbitisonlymodifiedinconditioncreation.*/unsignedintflags=atomic_load_relaxed(&cond->__data.__wrefs);clockid_tclockid=(flags&__PTHREAD_COND_CLOCK_MONOTONIC_MASK)?CLOCK_MONOTONIC:CLOCK_REALTIME;return__pthread_cond_wait_common(cond,mutex,clockid,abstime);}ä¸é¢__pthread_cond_wait_commonæ¯å®ç°éè¿__futex_abstimed_wait_cancelableå®ç°æ¶é´çå¾
static__always_inlineint__pthread_cond_wait_common(pthread_cond_t*cond,pthread_mutex_t*mutex,clockid_tclockid,conststruct__timespec*abstime){ ''çç¥''`err=__futex_abstimed_wait_cancelable(cond->__data.__g_signals+g,0,clockid,abstime,private);''çç¥''`}__futex_abstimed_wait_cancelableæ¯è°ç¨__futex_abstimed_wait_common
int__futex_abstimed_wait_cancelable(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate){ return__futex_abstimed_wait_common(futex_word,expected,clockid,abstime,private,true);}__futex_abstimed_wait_commonä¸é¢åæ¯éè¿å¤æå¹³å°æ¯ä½æè ä½,è°ç¨__futex_abstimed_wait_commonæè __futex_abstimed_wait_common
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate,boolcancel){ interr;unsignedintclockbit;/*Workaroundthefactthatthekernelrejectsnegativetimeoutvaluesdespitethembeingvalid.*/if(__glibc_unlikely((abstime!=NULL)&&(abstime->tv_sec<0)))returnETIMEDOUT;if(!lll_futex_supported_clockid(clockid))returnEINVAL;clockbit=(clockid==CLOCK_REALTIME)?FUTEX_CLOCK_REALTIME:0;intop=__lll_private_flag(FUTEX_WAIT_BITSET|clockbit,private);#ifdef__ASSUME_TIME_SYSCALLSerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#elseboolneed_time=abstime!=NULL&&!in_time_t_range(abstime->tv_sec);if(need_time){ err=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);if(err==-ENOSYS)err=-EOVERFLOW;}elseerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#endifswitch(err){ case0:case-EAGAIN:case-EINTR:case-ETIMEDOUT:case-EINVAL:case-EOVERFLOW:/*Passedabsolutetimeoutusesbittime_ttype,butunderlyingkerneldoesnotsupportbittime_tfutexsyscalls.*/return-err;case-EFAULT:/*Musthavebeencausedbyaglibcorapplicationbug.*/case-ENOSYS:/*Musthavebeencausedbyaglibcbug.*//*Noothererrorsaredocumentedatthistime.*/default:futex_fatal_error();}}__futex_abstimed_wait_commonæ¯è°ç¨INTERNAL_SYSCALL_CANCELå®å®ä¹å®ç°
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,intop,conststruct__timespec*abstime,intprivate,boolcancel){ if(cancel)returnINTERNAL_SYSCALL_CANCEL(futex_time,futex_word,op,expected,abstime,NULL/*Unused.*/,FUTEX_BITSET_MATCH_ANY);elsereturnINTERNAL_SYSCALL_CALL(futex_time,futex_word,op,expected,abstime,NULL/*Ununsed.*/,FUTEX_BITSET_MATCH_ANY);}ç³»ç»è°ç¨ççå®å®ä¹
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);0æ»ç»ä¸»è¦å¯¹LockSupportçparkçå¾ å®ç°çåºå±å®ç°çæµ æï¼é对äºLinuxçç³»ç»è°ç¨è¿æ²¡ææ¾å°æºç ï¼åç»ä¼ç»§ç»è·è¸ªï¼å¸ææ读è ç¥éç满å¸å¯ä»¥åç¥ä¸ï¼è°¢è°¢ã
é¾æ¥ï¼/post/从HotSpot源码,深度解读 park 和 unpark
我最近建立了一个在线自习室(App:番茄ToDO)用于相互监督学习,感兴趣的小伙伴可以加入。自习室加入码:D5A7A
Java并发包下的类大多基于AQS(AbstractQueuedSynchronizer)框架实现,而AQS线程安全的实现依赖于两个关键类:Unsafe和LockSupport。
其中,Unsafe主要提供CAS操作(关于CAS,在文章《读懂AtomicInteger源码(多线程专题)》中讲解过),LockSupport主要提供park/unpark操作。实际上,park/unpark操作的最终调用还是基于Unsafe类,因此Unsafe类才是核心。
Unsafe类的实现是由native关键字说明的,这意味着这个方法是原生函数,是用C/C++语言实现的,并被编译成了DLL,由Java去调用。
park函数的作用是将当前调用线程阻塞,而unpark函数则是唤醒指定线程。
park是等待一个许可,unpark是为某线程提供一个许可。如果线程A调用park,除非另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。每次调用一次park,需要有一个unpark来解锁。
并且,unpark可以先于park调用,但不管unpark先调用多少次,都只提供一个许可,不可叠加。只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。
在Linux系统下,park和unpark是通过Posix线程库pthread中的mutex(互斥量)和condition(条件变量)来实现的。
简单来说,mutex和condition保护了一个叫_counter的信号量。当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。当_counter=0时线程阻塞,当_counter>0时直接设为0并返回。
每个Java线程都有一个Parker实例,Parker类的部分源码如下:
由源码可知,Parker类继承于PlatformParker,实际上是用Posix的mutex和condition来实现的。Parker类里的_counter字段,就是用来记录park和unpark是否需要阻塞的标识。
具体的执行逻辑已经用注释标记在代码中,简要来说,就是检查_counter是不是大于0,如果是,则把_counter设置为0,返回。如果等于零,继续执行,阻塞等待。
unpark直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。源码如下:
(如果不会下载JVM源码可以后台回复“jdk”,获得下载压缩包)