Java自1995年面世以來得到了廣泛得一個運用,但是對多線程編程的支持Java很長時間一直停留在初級階段。在Java 5.0之前Java里的多線程編程主要是通過Thread類,Runnable接口,Object對象中的wait()、 notify()、 notifyAll()等方法和synchronized關(guān)鍵詞來實現(xiàn)的。這些工具雖然能在大多數(shù)情況下解決對共享資源的管理和線程間的調(diào)度,但存在以下幾個問題
1. 過于原始,拿來就能用的功能有限,即使是要實現(xiàn)簡單的多線程功能也需要編寫大量的代碼。這些工具就像匯編語言一樣難以學習和使用,比這更糟糕的是稍有不慎它們還可能被錯誤地使用,而且這樣的錯誤很難被發(fā)現(xiàn)。
2. 如果使用不當,會使程序的運行效率大大降低。
3. 為了提高開發(fā)效率,簡化編程,開發(fā)人員在做項目的時候往往需要寫一些共享的工具來實現(xiàn)一些普遍適用的功能。但因為沒有規(guī)范,相同的工具會被重復地開發(fā),造成資源浪費。
4. 因為鎖定的功能是通過Synchronized來實現(xiàn)的,這是一種塊結(jié)構(gòu),只能對代碼中的一段代碼進行鎖定,而且鎖定是單一的。如以下代碼所示:
synchronized(lock){
//執(zhí)行對共享資源的操作
……
}
|
一些復雜的功能就很難被實現(xiàn)。比如說如果程序需要取得lock A和lock B來進行操作1,然后需要取得lock C并且釋放lock A來進行操作2,Java 5.0之前的多線程框架就顯得無能為力了。
因為這些問題,程序員對舊的框架一直頗有微詞。這種情況一直到Java 5.0才有較大的改觀,一系列的多線程工具包被納入了標準庫文件。這些工具包括了一個新的多線程程序的執(zhí)行框架,使編程人員可方便地協(xié)調(diào)和調(diào)度線程的運行,并且新加入了一些高性能的常用的工具,使程序更容易編寫,運行效率更高。本文將分類并結(jié)合例子來介紹這些新加的多線程工具。
在我們開始介紹Java 5.0里的新Concurrent工具前讓我們先來看一下一個用舊的多線程工具編寫的程序,這個程序里有一個Server線程,它需要啟動兩個 Component,Server線程需等到Component線程完畢后再繼續(xù)。相同的功能在Synchronizer一章里用新加的工具 CountDownLatch有相同的實現(xiàn)。兩個程序,孰優(yōu)孰劣,哪個程序更容易編寫,哪個程序更容易理解,相信大家看過之后不難得出結(jié)論。
public class ServerThread {
Object concLock = new Object();
int count = 2;
public void runTwoThreads() {
//啟動兩個線程去初始化組件
new Thread(new ComponentThread1(this)).start();
new Thread(new ComponentThread1(this)).start();
// Wait for other thread
while(count != 0) {
synchronized(concLock) {
try {
concLock.wait();
System.out.println("Wake up.");
} catch (InterruptedException ie) { //處理異常}
}
}
System.out.println("Server is up.");
}
public void callBack() {
synchronized(concLock) {
count--;
concLock.notifyAll();
}
}
public static void main(String[] args){
ServerThread server = new ServerThread();
server.runTwoThreads();
}
}
public class ComponentThread1 implements Runnable {
private ServerThread server;
public ComponentThread1(ServerThread server) {
this.server = server;
}
public void run() {
//做組件初始化的工作
System.out.println("Do component initialization.");
server.callBack();
}
}
|
1:三個新加的多線程包
Java 5.0里新加入了三個多線程包:java.util.concurrent, java.util.concurrent.atomic, java.util.concurrent.locks.
- java.util.concurrent包含了常用的多線程工具,是新的多線程工具的主體。
- java.util.concurrent.atomic 包含了不用加鎖情況下就能改變值的原子變量,比如說AtomicInteger提供了addAndGet()方法。Add和Get是兩個不同的操作,為了保證別的線程不干擾,以往的做法是先鎖定共享的變量,然后在鎖定的范圍內(nèi)進行兩步操作。但用AtomicInteger.addAndGet()就不用擔心鎖定的事了,其內(nèi)部實現(xiàn)保證了這兩步操作是在原子量級發(fā)生的,不會被別的線程干擾。
- java.util.concurrent.locks包包含鎖定的工具。
2:Callable 和 Future接口
Callable是類似于Runnable的接口,實現(xiàn)Callable接口的類和實現(xiàn)Runnable的類都是可被其它線程執(zhí)行的任務。Callable和Runnable有幾點不同:
- Callable規(guī)定的方法是call(),而Runnable規(guī)定的方法是run().
- Callable的任務執(zhí)行后可返回值,而Runnable的任務是不能返回值的。
- call()方法可拋出異常,而run()方法是不能拋出異常的。
- 運行Callable任務可拿到一個Future對象,通過Future對象可了解任務執(zhí)行情況,可取消任務的執(zhí)行,還可獲取任務執(zhí)行的結(jié)果。
以下是Callable的一個例子:
public class DoCallStuff implements Callable{ // *1
private int aInt;
public DoCallStuff(int aInt) {
this.aInt = aInt;
}
public String call() throws Exception { //*2
boolean resultOk = false;
if(aInt == 0){
resultOk = true;
} else if(aInt == 1){
while(true){ //infinite loop
System.out.println("looping....");
Thread.sleep(3000);
}
} else {
throw new Exception("Callable terminated with Exception!"); //*3
}
if(resultOk){
return "Task done.";
} else {
return "Task failed";
}
}
}
|
*1: 名為DoCallStuff類實現(xiàn)了Callable,String將是call方法的返回值類型。例子中用了String,但可以是任何Java類。
*2: call方法的返回值類型為String,這是和類的定義相對應的。并且可以拋出異常。
*3: call方法可以拋出異常,如加重的斜體字所示。
以下是調(diào)用DoCallStuff的主程序。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Executor {
public static void main(String[] args){
//*1
DoCallStuff call1 = new DoCallStuff(0);
DoCallStuff call2 = new DoCallStuff(1);
DoCallStuff call3 = new DoCallStuff(2);
//*2
ExecutorService es = Executors.newFixedThreadPool(3);
//*3
Future future1 = es.submit(call1);
Future future2 = es.submit(call2);
Future future3 = es.submit(call3);
try {
//*4
System.out.println(future1.get());
//*5
Thread.sleep(3000);
System.out.println("Thread 2 terminated? :" + future2.cancel(true));
//*6
System.out.println(future3.get());
} catch (ExecutionException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
|
*1: 定義了幾個任務
*2: 初始了任務執(zhí)行工具。任務的執(zhí)行框架將會在后面解釋。
*3: 執(zhí)行任務,任務啟動時返回了一個Future對象,如果想得到任務執(zhí)行的結(jié)果或者是異常可對這個Future對象進行操作。Future所含的值必須跟Callable所含的值對映,比如說例子中Future對印Callable
*4: 任務1正常執(zhí)行完畢,future1.get()會返回線程的值
*5: 任務2在進行一個死循環(huán),調(diào)用future2.cancel(true)來中止此線程。傳入的參數(shù)標明是否可打斷線程,true表明可以打斷。
*6: 任務3拋出異常,調(diào)用future3.get()時會引起異常的拋出。
運行Executor會有以下運行結(jié)果:
looping....
Task done. //*1
looping....
looping....//*2
looping....
looping....
looping....
looping....
Thread 2 terminated? :true //*3
//*4
java.util.concurrent.ExecutionException: java.lang.Exception: Callable terminated with Exception!
at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:205)
at java.util.concurrent.FutureTask.get(FutureTask.java:80)
at concurrent.Executor.main(Executor.java:43)
…….
|
*1: 任務1正常結(jié)束
*2: 任務2是個死循環(huán),這是它的打印結(jié)果
*3: 指示任務2被取消
*4: 在執(zhí)行future3.get()時得到任務3拋出的異常
3:新的任務執(zhí)行架構(gòu)
在Java 5.0之前啟動一個任務是通過調(diào)用Thread類的start()方法來實現(xiàn)的,任務的提于交和執(zhí)行是同時進行的,如果你想對任務的執(zhí)行進行調(diào)度或是控制同時執(zhí)行的線程數(shù)量就需要額外編寫代碼來完成。5.0里提供了一個新的任務執(zhí)行架構(gòu)使你可以輕松地調(diào)度和控制任務的執(zhí)行,并且可以建立一個類似數(shù)據(jù)庫連接池的線程池來執(zhí)行任務。這個架構(gòu)主要有三個接口和其相應的具體類組成。這三個接口是Executor, ExecutorService和ScheduledExecutorService,讓我們先用一個圖來顯示它們的關(guān)系:
圖的左側(cè)是接口,圖的右側(cè)是這些接口的具體類。注意Executor是沒有直接具體實現(xiàn)的。
Executor接口:
是用來執(zhí)行Runnable任務的,它只定義一個方法:
- execute(Runnable command):執(zhí)行Ruannable類型的任務
ExecutorService接口:
ExecutorService繼承了Executor的方法,并提供了執(zhí)行Callable任務和中止任務執(zhí)行的服務,其定義的方法主要有:
- submit(task):可用來提交Callable或Runnable任務,并返回代表此任務的Future對象
- invokeAll(collection of tasks):批處理任務集合,并返回一個代表這些任務的Future對象集合
- shutdown():在完成已提交的任務后關(guān)閉服務,不再接受新任務
- shutdownNow():停止所有正在執(zhí)行的任務并關(guān)閉服務。
- isTerminated():測試是否所有任務都執(zhí)行完畢了。
- isShutdown():測試是否該ExecutorService已被關(guān)閉
ScheduledExecutorService接口
在ExecutorService的基礎上,ScheduledExecutorService提供了按時間安排執(zhí)行任務的功能,它提供的方法主要有:
- schedule(task, initDelay): 安排所提交的Callable或Runnable任務在initDelay指定的時間后執(zhí)行。
- scheduleAtFixedRate():安排所提交的Runnable任務按指定的間隔重復執(zhí)行
- scheduleWithFixedDelay():安排所提交的Runnable任務在每次執(zhí)行完后,等待delay所指定的時間后重復執(zhí)行。
代碼:ScheduleExecutorService的例子
public class ScheduledExecutorServiceTest {
public static void main(String[] args)
throws InterruptedException, ExecutionException{
//*1
ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
//*2
Runnable task1 = new Runnable() {
public void run() {
System.out.println("Task repeating.");
}
};
//*3
final ScheduledFuture future1 =
service.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
//*4
ScheduledFuture future2 = service.schedule(new Callable(){
public String call(){
future1.cancel(true);
return "task cancelled!";
}
}, 5, TimeUnit.SECONDS);
System.out.println(future2.get());
//*5
service.shutdown();
}
}
|
這個例子有兩個任務,第一個任務每隔一秒打印一句“Task repeating”,第二個任務在5秒鐘后取消第一個任務。
*1: 初始化一個ScheduledExecutorService對象,這個對象的線程池大小為2。
*2: 用內(nèi)函數(shù)的方式定義了一個Runnable任務。
*3: 調(diào)用所定義的ScheduledExecutorService對象來執(zhí)行任務,任務每秒執(zhí)行一次。能重復執(zhí)行的任務一定是Runnable類型。注意我們可以用TimeUnit來制定時間單位,這也是Java 5.0里新的特征,5.0以前的記時單位是微秒,現(xiàn)在可精確到奈秒。
*4: 調(diào)用ScheduledExecutorService對象來執(zhí)行第二個任務,第二個任務所作的就是在5秒鐘后取消第一個任務。
*5: 關(guān)閉服務。
Executors類
雖然以上提到的接口有其實現(xiàn)的具體類,但為了方便Java 5.0建議使用Executors的工具類來得到Executor接口的具體對象,需要注意的是Executors是一個類,不是Executor的復數(shù)形式。Executors提供了以下一些static的方法:
- callable(Runnable task): 將Runnable的任務轉(zhuǎn)化成Callable的任務
- newSingleThreadExecutor: 產(chǎn)生一個ExecutorService對象,這個對象只有一個線程可用來執(zhí)行任務,若任務多于一個,任務將按先后順序執(zhí)行。
- newCachedThreadPool(): 產(chǎn)生一個ExecutorService對象,這個對象帶有一個線程池,線程池的大小會根據(jù)需要調(diào)整,線程執(zhí)行完任務后返回線程池,供執(zhí)行下一次任務使用。
- newFixedThreadPool(int poolSize):產(chǎn)生一個ExecutorService對象,這個對象帶有一個大小為poolSize的線程池,若任務數(shù)量大于poolSize,任務會被放在一個queue里順序執(zhí)行。
- newSingleThreadScheduledExecutor:產(chǎn)生一個ScheduledExecutorService對象,這個對象的線程池大小為1,若任務多于一個,任務將按先后順序執(zhí)行。
- newScheduledThreadPool(int poolSize): 產(chǎn)生一個ScheduledExecutorService對象,這個對象的線程池大小為poolSize,若任務數(shù)量大于poolSize,任務會在一個queue里等待執(zhí)行
以下是得到和使用ExecutorService的例子:
代碼:如何調(diào)用Executors來獲得各種服務對象
//Single Threaded ExecutorService
ExecutorService singleThreadeService = Executors.newSingleThreadExecutor();
//Cached ExecutorService
ExecutorService cachedService = Executors.newCachedThreadPool();
//Fixed number of ExecutorService
ExecutorService fixedService = Executors.newFixedThreadPool(3);
//Single ScheduledExecutorService
ScheduledExecutorService singleScheduledService =
Executors.newSingleThreadScheduledExecutor();
//Fixed number of ScheduledExecutorService
ScheduledExecutorService fixedScheduledService =
Executors.newScheduledThreadPool(3);
|
4:Lockers和Condition接口
在多線程編程里面一個重要的概念是鎖定,如果一個資源是多個線程共享的,為了保證數(shù)據(jù)的完整性,在進行事務性操作時需要將共享資源鎖定,這樣可以保證在做事務性操作時只有一個線程能對資源進行操作,從而保證數(shù)據(jù)的完整性。在5.0以前,鎖定的功能是由Synchronized關(guān)鍵字來實現(xiàn)的,這樣做存在幾個問題:
- 每次只能對一個對象進行鎖定。若需要鎖定多個對象,編程就比較麻煩,一不小心就會出現(xiàn)死鎖現(xiàn)象。
- 如果線程因拿不到鎖定而進入等待狀況,是沒有辦法將其打斷的
在Java 5.0里出現(xiàn)兩種鎖的工具可供使用,下圖是這兩個工具的接口及其實現(xiàn):

Lock接口
ReentrantLock是Lock的具體類,Lock提供了以下一些方法:
- lock(): 請求鎖定,如果鎖已被別的線程鎖定,調(diào)用此方法的線程被阻斷進入等待狀態(tài)。
- tryLock():如果鎖沒被別的線程鎖定,進入鎖定狀態(tài),并返回true。若鎖已被鎖定,返回false,不進入等待狀態(tài)。此方法還可帶時間參數(shù),如果鎖在方法執(zhí)行時已被鎖定,線程將繼續(xù)等待規(guī)定的時間,若還不行才返回false。
- unlock():取消鎖定,需要注意的是Lock不會自動取消,編程時必須手動解鎖。
代碼:
//生成一個鎖
Lock lock = new ReentrantLock();
public void accessProtectedResource() {
lock.lock(); //取得鎖定
try {
//對共享資源進行操作
} finally {
//一定記著把鎖取消掉,鎖本身是不會自動解鎖的
lock.unlock();
}
}
|
ReadWriteLock接口
為了提高效率有些共享資源允許同時進行多個讀的操作,但只允許一個寫的操作,比如一個文件,只要其內(nèi)容不變可以讓多個線程同時讀,不必做排他的鎖定,排他的鎖定只有在寫的時候需要,以保證別的線程不會看到數(shù)據(jù)不完整的文件。ReadWriteLock可滿足這種需要。ReadWriteLock內(nèi)置兩個 Lock,一個是讀的Lock,一個是寫的Lock。多個線程可同時得到讀的Lock,但只有一個線程能得到寫的Lock,而且寫的Lock被鎖定后,任何線程都不能得到Lock。ReadWriteLock提供的方法有:
- readLock(): 返回一個讀的lock
- writeLock(): 返回一個寫的lock, 此lock是排他的。
ReadWriteLock的例子:
public class FileOperator{
//初始化一個ReadWriteLock
ReadWriteLock lock = new ReentrantReadWriteLock();
public String read() {
//得到readLock并鎖定
Lock readLock = lock.readLock();
readLock.lock();
try {
//做讀的工作
return "Read something";
} finally {
readLock.unlock();
}
}
public void write(String content) {
//得到writeLock并鎖定
Lock writeLock = lock.writeLock();
writeLock.lock();
try {
//做讀的工作
} finally {
writeLock.unlock();
}
}
}
|
需要注意的是ReadWriteLock提供了一個高效的鎖定機理,但最終程序的運行效率是和程序的設計息息相關(guān)的,比如說如果讀的線程和寫的線程同時在等待,要考慮是先發(fā)放讀的lock還是先發(fā)放寫的lock。如果寫發(fā)生的頻率不高,而且快,可以考慮先給寫的lock。還要考慮的問題是如果一個寫正在等待讀完成,此時一個新的讀進來,是否要給這個新的讀發(fā)鎖,如果發(fā)了,可能導致寫的線程等很久。等等此類問題在編程時都要給予充分的考慮。
Condition接口:
有時候線程取得lock后需要在一定條件下才能做某些工作,比如說經(jīng)典的Producer和Consumer問題,Consumer必須在籃子里有蘋果的時候才能吃蘋果,否則它必須暫時放棄對籃子的鎖定,等到Producer往籃子里放了蘋果后再去拿來吃。而Producer必須等到籃子空了才能往里放蘋果,否則它也需要暫時解鎖等Consumer把蘋果吃了才能往籃子里放蘋果。在Java 5.0以前,這種功能是由Object類的wait(), notify()和notifyAll()等方法實現(xiàn)的,在5.0里面,這些功能集中到了Condition這個接口來實現(xiàn),Condition提供以下方法:
- await():使調(diào)用此方法的線程放棄鎖定,進入睡眠直到被打斷或被喚醒。
- signal(): 喚醒一個等待的線程
- signalAll():喚醒所有等待的線程
Condition的例子:
public class Basket {
Lock lock = new ReentrantLock();
//產(chǎn)生Condition對象
Condition produced = lock.newCondition();
Condition consumed = lock.newCondition();
boolean available = false;
public void produce() throws InterruptedException {
lock.lock();
try {
if(available){
consumed.await(); //放棄lock進入睡眠
}
/*生產(chǎn)蘋果*/
System.out.println("Apple produced.");
available = true;
produced.signal(); //發(fā)信號喚醒等待這個Condition的線程
} finally {
lock.unlock();
}
}
public void consume() throws InterruptedException {
lock.lock();
try {
if(!available){
produced.await();//放棄lock進入睡眠
}
/*吃蘋果*/
System.out.println("Apple consumed.");
available = false;
consumed.signal();//發(fā)信號喚醒等待這個Condition的線程
} finally {
lock.unlock();
}
}
}
|
ConditionTester:
public class ConditionTester {
public static void main(String[] args) throws InterruptedException{
final Basket basket = new Basket();
//定義一個producer
Runnable producer = new Runnable() {
public void run() {
try {
basket.produce();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
};
//定義一個consumer
Runnable consumer = new Runnable() {
public void run() {
try {
basket.consume();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
};
//各產(chǎn)生10個consumer和producer
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i < 10; i++)
service.submit(consumer);
Thread.sleep(2000);
for(int i=0; i<10; i++)
service.submit(producer);
service.shutdown();
}
}
|
5: Synchronizer:同步裝置
Java 5.0里新加了4個協(xié)調(diào)線程間進程的同步裝置,它們分別是Semaphore, CountDownLatch, CyclicBarrier和Exchanger.
Semaphore:
用來管理一個資源池的工具,Semaphore可以看成是個通行證,線程要想從資源池拿到資源必須先拿到通行證,Semaphore提供的通行證數(shù)量和資源池的大小一致。如果線程暫時拿不到通行證,線程就會被阻斷進入等待狀態(tài)。以下是一個例子:
public class Pool {
ArrayList pool = null;
Semaphore pass = null;
public Pool(int size){
//初始化資源池
pool = new ArrayList();
for(int i=0; i
pool.add("Resource "+i);
}
//Semaphore的大小和資源池的大小一致
pass = new Semaphore(size);
}
public String get() throws InterruptedException{
//獲取通行證,只有得到通行證后才能得到資源
pass.acquire();
return getResource();
}
public void put(String resource){
//歸還通行證,并歸還資源
pass.release();
releaseResource(resource);
}
private synchronized String getResource() {
String result = pool.get(0);
pool.remove(0);
System.out.println("Give out "+result);
return result;
}
private synchronized void releaseResource(String resource) {
System.out.println("return "+resource);
pool.add(resource);
}
}
|
SemaphoreTest:
public class SemaphoreTest {
public static void main(String[] args){
final Pool aPool = new Pool(2);
Runnable worker = new Runnable() {
public void run() {
String resource = null;
try {
//取得resource
resource = aPool.get();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
//用resource做工作
System.out.println("I worked on "+resource);
//歸還resource
aPool.put(resource);
}
};
ExecutorService service = Executors.newCachedThreadPool();
for(int i=0; i<20; i++){
service.submit(worker);
}
service.shutdown();
}
}
|
CountDownLatch:
CountDownLatch是個計數(shù)器,它有一個初始數(shù),等待這個計數(shù)器的線程必須等到計數(shù)器倒數(shù)到零時才可繼續(xù)。比如說一個Server啟動時需要初始化4個部件,Server可以同時啟動4個線程去初始化這4個部件,然后調(diào)用CountDownLatch(4).await()阻斷進入等待,每個線程完成任務后會調(diào)用一次CountDownLatch.countDown()來倒計數(shù), 當4個線程都結(jié)束時CountDownLatch的計數(shù)就會降低為0,此時Server就會被喚醒繼續(xù)下一步操作。CountDownLatch的方法主要有:
- await():使調(diào)用此方法的線程阻斷進入等待
- countDown(): 倒計數(shù),將計數(shù)值減1
- getCount(): 得到當前的計數(shù)值
CountDownLatch的例子:一個server調(diào)了三個ComponentThread分別去啟動三個組件,然后server等到組件都啟動了再繼續(xù)。
public class Server {
public static void main(String[] args) throws InterruptedException{
System.out.println("Server is starting.");
//初始化一個初始值為3的CountDownLatch
CountDownLatch latch = new CountDownLatch(3);
//起3個線程分別去啟動3個組件
ExecutorService service = Executors.newCachedThreadPool();
service.submit(new ComponentThread(latch, 1));
service.submit(new ComponentThread(latch, 2));
service.submit(new ComponentThread(latch, 3));
service.shutdown();
//進入等待狀態(tài)
latch.await();
//當所需的三個組件都完成時,Server就可繼續(xù)了
System.out.println("Server is up!");
}
}
public class ComponentThread implements Runnable{
CountDownLatch latch;
int ID;
/** Creates a new instance of ComponentThread */
public ComponentThread(CountDownLatch latch, int ID) {
this.latch = latch;
this.ID = ID;
}
public void run() {
System.out.println("Component "+ID + " initialized!");
//將計數(shù)減一
latch.countDown();
}
}
|
運行結(jié)果:
Server is starting.
Component 1 initialized!
Component 3 initialized!
Component 2 initialized!
Server is up!
|
CyclicBarrier:
CyclicBarrier類似于CountDownLatch也是個計數(shù)器,不同的是CyclicBarrier數(shù)的是調(diào)用了 CyclicBarrier.await()進入等待的線程數(shù),當線程數(shù)達到了CyclicBarrier初始時規(guī)定的數(shù)目時,所有進入等待狀態(tài)的線程被喚醒并繼續(xù)。CyclicBarrier就象它名字的意思一樣,可看成是個障礙,所有的線程必須到齊后才能一起通過這個障礙。CyclicBarrier 初始時還可帶一個Runnable的參數(shù),此Runnable任務在CyclicBarrier的數(shù)目達到后,所有其它線程被喚醒前被執(zhí)行。
CyclicBarrier提供以下幾個方法:
- await():進入等待
- getParties():返回此barrier需要的線程數(shù)
- reset():將此barrier重置
以下是使用CyclicBarrier的一個例子:兩個線程分別在一個數(shù)組里放一個數(shù),當這兩個線程都結(jié)束后,主線程算出數(shù)組里的數(shù)的和(這個例子比較無聊,我沒有想到更合適的例子)
public class MainThread {
public static void main(String[] args)
throws InterruptedException, BrokenBarrierException, TimeoutException{
final int[] array = new int[2];
CyclicBarrier barrier = new CyclicBarrier(2,
new Runnable() {//在所有線程都到達Barrier時執(zhí)行
public void run() {
System.out.println("Total is:"+(array[0]+array[1]));
}
});
//啟動線程
new Thread(new ComponentThread(barrier, array, 0)).start();
new Thread(new ComponentThread(barrier, array, 1)).start();
}
}
public class ComponentThread implements Runnable{
CyclicBarrier barrier;
int ID;
int[] array;
public ComponentThread(CyclicBarrier barrier, int[] array, int ID) {
this.barrier = barrier;
this.ID = ID;
this.array = array;
}
public void run() {
try {
array[ID] = new Random().nextInt();
System.out.println(ID+ " generates:"+array[ID]);
//該線程完成了任務等在Barrier處
barrier.await();
} catch (BrokenBarrierException ex) {
ex.printStackTrace();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
|
Exchanger:
顧名思義Exchanger讓兩個線程可以互換信息。用一個例子來解釋比較容易。例子中服務生線程往空的杯子里倒水,顧客線程從裝滿水的杯子里喝水,然后通過Exchanger雙方互換杯子,服務生接著往空杯子里倒水,顧客接著喝水,然后交換,如此周而復始。
class FillAndEmpty {
//初始化一個Exchanger,并規(guī)定可交換的信息類型是DataCup
Exchanger exchanger = new Exchanger();
Cup initialEmptyCup = ...; //初始化一個空的杯子
Cup initialFullCup = ...; //初始化一個裝滿水的杯子
//服務生線程
class Waiter implements Runnable {
public void run() {
Cup currentCup = initialEmptyCup;
try {
//往空的杯子里加水
currentCup.addWater();
//杯子滿后和顧客的空杯子交換
currentCup = exchanger.exchange(currentCup);
} catch (InterruptedException ex) { ... handle ... }
}
}
//顧客線程
class Customer implements Runnable {
public void run() {
DataCup currentCup = initialFullCup;
try {
//把杯子里的水喝掉
currentCup.drinkFromCup();
//將空杯子和服務生的滿杯子交換
currentCup = exchanger.exchange(currentCup);
} catch (InterruptedException ex) { ... handle ...}
}
}
void start() {
new Thread(new Waiter()).start();
new Thread(new Customer()).start();
}
}
|
6: BlockingQueue接口
BlockingQueue是一種特殊的Queue,若BlockingQueue是空的,從BlockingQueue取東西的操作將會被阻斷進入等待狀態(tài)直到BlocingkQueue進了新貨才會被喚醒。同樣,如果BlockingQueue是滿的任何試圖往里存東西的操作也會被阻斷進入等待狀態(tài),直到BlockingQueue里有新的空間才會被喚醒繼續(xù)操作。BlockingQueue提供的方法主要有:
- add(anObject): 把anObject加到BlockingQueue里,如果BlockingQueue可以容納返回true,否則拋出IllegalStateException異常。
- offer(anObject):把anObject加到BlockingQueue里,如果BlockingQueue可以容納返回true,否則返回false。
- put(anObject):把anObject加到BlockingQueue里,如果BlockingQueue沒有空間,調(diào)用此方法的線程被阻斷直到BlockingQueue里有新的空間再繼續(xù)。
- poll(time):取出BlockingQueue里排在首位的對象,若不能立即取出可等time參數(shù)規(guī)定的時間。取不到時返回null。
- take():取出BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態(tài)直到BlockingQueue有新的對象被加入為止。
根據(jù)不同的需要BlockingQueue有4種具體實現(xiàn):
- ArrayBlockingQueue:規(guī)定大小的BlockingQueue,其構(gòu)造函數(shù)必須帶一個int參數(shù)來指明其大小。其所含的對象是以FIFO(先入先出)順序排序的。
- LinkedBlockingQueue:大小不定的BlockingQueue,若其構(gòu)造函數(shù)帶一個規(guī)定大小的參數(shù),生成的BlockingQueue有大小限制,若不帶大小參數(shù),所生成的 BlockingQueue的大小由Integer.MAX_VALUE來決定。其所含的對象是以FIFO(先入先出)順序排序的。 LinkedBlockingQueue和ArrayBlockingQueue比較起來,它們背后所用的數(shù)據(jù)結(jié)構(gòu)不一樣,導致 LinkedBlockingQueue的數(shù)據(jù)吞吐量要大于ArrayBlockingQueue,但在線程數(shù)量很大時其性能的可預見性低于 ArrayBlockingQueue。
- PriorityBlockingQueue:類似于LinkedBlockingQueue,但其所含對象的排序不是FIFO,而是依據(jù)對象的自然排序順序或者是構(gòu)造函數(shù)所帶的Comparator決定的順序。
- SynchronousQueue:特殊的BlockingQueue,對其的操作必須是放和取交替完成的。
下面是用BlockingQueue來實現(xiàn)Producer和Consumer的例子:
public class BlockingQueueTest {
static BlockingQueue basket;
public BlockingQueueTest() {
//定義了一個大小為2的BlockingQueue,也可根據(jù)需要用其他的具體類
basket = new ArrayBlockingQueue(2);
}
class Producor implements Runnable {
public void run() {
while(true){
try {
//放入一個對象,若basket滿了,等到basket有位置
basket.put("An apple");
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
class Consumer implements Runnable {
public void run() {
while(true){
try {
//取出一個對象,若basket為空,等到basket有東西為止
String result = basket.take();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
public void execute(){
for(int i=0; i<10; i++){
new Thread(new Producor()).start();
new Thread(new Consumer()).start();
}
}
public static void main(String[] args){
BlockingQueueTest test = new BlockingQueueTest();
test.execute();
}
}
|
7:Atomics 原子級變量
原子量級的變量,主要的類有AtomicBoolean, AtomicInteger, AotmicIntegerArray, AtomicLong, AtomicLongArray, AtomicReference ……。這些原子量級的變量主要提供兩個方法:
- compareAndSet(expectedValue, newValue): 比較當前的值是否等于expectedValue,若等于把當前值改成newValue,并返回true。若不等,返回false。
- getAndSet(newValue): 把當前值改為newValue,并返回改變前的值。
這些原子級變量利用了現(xiàn)代處理器(CPU)的硬件支持可把兩步操作合為一步的功能,避免了不必要的鎖定,提高了程序的運行效率。
8:Concurrent Collections 共點聚集
在Java的聚集框架里可以調(diào)用Collections.synchronizeCollection(aCollection)將普通聚集改變成同步聚集,使之可用于多線程的環(huán)境下。 但同步聚集在一個時刻只允許一個線程訪問它,其它想同時訪問它的線程會被阻斷,導致程序運行效率不高。Java 5.0里提供了幾個共點聚集類,它們把以前需要幾步才能完成的操作合成一個原子量級的操作,這樣就可讓多個線程同時對聚集進行操作,避免了鎖定,從而提高了程序的運行效率。Java 5.0目前提供的共點聚集類有:ConcurrentHashMap, ConcurrentLinkedQueue, CopyOnWriteArrayList和CopyOnWriteArraySet.