1、Condition的简介
线程通信中的互斥除了用synchronized、Object类的wait()和notify()/notifyAll()方式实现外,方法JDK1.5中提供的Condition配套Lock可以实现相同的功能。Condition中的await()和signal()/signalAll()就相当于Object的wait()和notify()/notifyAll()。传统线程的通信方式,Condition都可以实现。不过要注意的是,Condition是被绑定到Lock上的,所以要创建一个Lock的Condition必须使用newCondition()方法。在等待Condition时,允许发生“虚假唤醒”, 这通常作为对基础平台语义的让步。
Condition的强大之处在于它可以为多个线程间建立不同的Condition。
2、Condition源码中的例子
看JDK文档中的一个例子:假定有一个绑定的缓冲区,它支持 put 和 take 方法。如果试图在空的缓冲区上执行take 操作,则在某一个项变得可用之前,线程将一直阻塞;如果试图在满的缓冲区上执行 put 操作,则在有空间变得可用之前,线程将一直阻塞。我们喜欢在单独的等待 set 中保存put 线程和take 线程,这样就可以在缓冲区中的项或空间变得可用时利用最佳规划,一次只通知一个线程。可以使用两个Condition实例来做到这一点。
——其实就是java.util.concurrent.ArrayBlockingQueue的功能。
class BoundedBuffer { final Lock lock = new ReentrantLock(); //锁对象 final Condition notFull = lock.newCondition(); //写线程锁 final Condition notEmpty = lock.newCondition(); //读线程锁 final Object[] items = new Object[100];//缓存队列 int putptr; //写索引 int takeptr; //读索引 int count; //队列中数据数目 //写 public void put(Object x) throws InterruptedException { lock.lock(); //锁定 try { // 如果队列满,则阻塞 <写线程> while (count == items.length) { notFull.await(); } // 写入队列,并更新写索引 items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; // 唤醒 <读线程> notEmpty.signal(); } finally { lock.unlock();//解除锁定 } } //读 public Object take() throws InterruptedException { lock.lock(); //锁定 try { // 如果队列空,则阻塞 <读线程> while (count == 0) { notEmpty.await(); } //读取队列,并更新读索引 Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; // 唤醒 <写线程> notFull.signal(); return x; } finally { lock.unlock();//解除锁定 } } 写线程> 读线程> 读线程> 写线程>
3、Condition demo
1)demo 1
接着上次()的一道面试题,现在改用Condition和Lock来实现:
/** * 面试题目:子线程循环10次,接着主线程循环100次,接着又回到子线程循环10次,接着又回到主线程又循环100次,如此循环50次。写出程序 * * 经验:要用到共同数据(包括同步锁)或共同算法的若干个方法应该归在同一个类身上,这种设计体现了高类聚和程序的健壮性。 * * 采用condition通信方式:Condition的功能类似在传统线程技术中的Object.wait()和Object.notify的功能。 * 在等待Condition时,允许发生“虚假唤醒”。 */public class ConditionCommunication { public static void main(String[] args) { final Business business = new Business(); // 子线程循环 new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { try { business.sub(i); } catch (InterruptedException e) { } } } }).start(); // 主线程循环 for (int i = 1; i <= 50; i++) { try { business.mian(i); } catch (InterruptedException e) { } } } /** * 业务类型(包含各色的同步锁) */ static class Business { Lock lock = new ReentrantLock(); Condition condition = lock.newCondition(); // sub()方法是否该运行标识 private boolean bShouldSub = true; /** * 循环100次打印的方法sub() * * @param i * @throws InterruptedException */ public void sub(int i) throws InterruptedException { lock.lock(); try { while (!bShouldSub) { // 当 bShouldSub 为 false 时,则等待 condition.await(); // 不能写成condition.wait();,这里的wait()是Object的方法 } for (int j = 1; j <= 10; j++) { System.out.println("sub thread : 第" + i + "行, 第" + j + "列"); } bShouldSub = false; // 执行for循环后,标志sub()方法不可再执行 condition.signal(); // 发信号 } finally { lock.unlock(); } } /** * 循环100次打印的方法mian() * * @param i * @throws InterruptedException */ public void mian(int i) throws InterruptedException { lock.lock(); try { while (bShouldSub) { condition.await(); // 不能写成condition.wait();,这里的wait()是Object的方法 } for (int j = 1; j <= 100; j++) { System.out.println("main thread : 第" + i + "行, 第" + j + "列"); } bShouldSub = true; // 执行for循环后,标志sub()方法可再执行了 condition.signal(); // 发信号 } finally { lock.unlock(); } } }}
2)demo 2
用上面的demo1题目改装为:线程1循环10次,接着线程2循环20次,接着线程3循环30次,又回到线程1循环10次,接着又回到线程2循环20次……,如此循环50次。
分析:这种实现顺序唤醒的,可以采用创建多个Condition来实现。
写出程序:
import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;/** * 面试题目:线程1循环10次,接着线程2循环20次,接着线程3循环30次,又回到线程1循环10次,接着又回到线程2循环20次……,如此循环50次。写出程序 */public class ConditionCommunication2 { public static void main(String[] args) { final Business business = new Business(); // 线程2循环sub2() new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { try { business.sub2(i); } catch (InterruptedException e) { } } } }).start(); // 线程3循环sub3() new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 50; i++) { try { business.sub3(i); } catch (InterruptedException e) { } } } }).start(); // (主线程)线程1循环sub1() for (int i = 1; i <= 50; i++) { try { business.sub1(i); } catch (InterruptedException e) { } } } /** * 业务类型(包含各色的同步锁) */ static class Business { Lock lock = new ReentrantLock(); Condition condition1 = lock.newCondition(); Condition condition2 = lock.newCondition(); Condition condition3 = lock.newCondition(); // sub()方法是否该运行标识 private int shouldSub = 1; /** * 循环10次打印的方法sub1() * * @param i * @throws InterruptedException */ public void sub1(int i) throws InterruptedException { lock.lock(); try { while (shouldSub != 1) { // 当 shouldSub 不为 1 时,则等待 condition1.await(); // 不能写成condition.wait();,这里的wait()是Object的方法 } for (int j = 1; j <= 10; j++) { System.out.println("sub1 thread : 第" + i + "行, 第" + j + "列"); } shouldSub = 2; // 执行for循环后,标志下一个可以循环的方法时sub2() condition2.signal(); // condition2发信号 } finally { lock.unlock(); } } /** * 循环20次打印的方法sub2() * * @param i * @throws InterruptedException */ public void sub2(int i) throws InterruptedException { lock.lock(); try { while (shouldSub != 2) { // 当 shouldSub 不为 2 时,则等待 condition2.await(); // 不能写成condition.wait();,这里的wait()是Object的方法 } for (int j = 1; j <= 20; j++) { System.out.println("sub2 thread : 第" + i + "行, 第" + j + "列"); } shouldSub = 3; // 执行for循环后,标志下一个可以循环的方法时sub3() condition3.signal(); // condition3发信号 } finally { lock.unlock(); } } /** * 循环30次打印的方法sub3() * * @param i * @throws InterruptedException */ public void sub3(int i) throws InterruptedException { lock.lock(); try { while (shouldSub != 3) { // 当 shouldSub 不为 3时,则等待 condition3.await(); // 不能写成condition.wait();,这里的wait()是Object的方法 } for (int j = 1; j <= 30; j++) { System.out.println("sub3 thread : 第" + i + "行, 第" + j + "列"); } shouldSub = 1; // 执行for循环后,标志下一个可以循环的方法时sub1() condition1.signal(); // condition1发信号 } finally { lock.unlock(); } } }}