关于Monitor
A synchronization abstraction supporting waiting on arbitrary boolean conditions
Monitor类是作为ReentrantLock的一个替代,代码中使用 Monitor比使用ReentrantLock更不易出错,可读性也更强,并且也没有显著的性能损失,使用Monitor甚至有潜在的性能得到优化。下面我们整体上对Monitor的源码结构做一下梳理,总的来说也就在从jdk最原生的wait、notify.再做了一层warp。提供更加丰富的API。比如,当我们要实现一个blockingQueue的时候,原生的代码大概是这样写的1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22public class SafeBox<V> {
private V value;
public synchronized V get() throws InterruptedException {
while (value == null) {
wait();//vaule 为空 等待
}
//获得cpu,取出value
V result = value;
value = null;
notifyAll();//唤醒其他wait方法
return result;
}
public synchronized void set(V newValue) throws InterruptedException {
while (value != null) {
wait();//等待
}
//被唤醒后,给value赋值
value = newValue;
//唤醒
notifyAll();
}
}
上面的代码可能还不足以说明原生jdk中纯在的问题,但是原生的wait、notify无法做到更加精细的唤醒操作,而Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,就是多个监视器的意思。在不同的情况下使用不同的Condition。
例如,假如多线程读/写同一个缓冲区:当向缓冲区中写入数据之后,唤醒”读线程”;当从缓冲区读出数据之后,唤醒”写线程”
如果采用Object类中的wait(), notify(), notifyAll()实现该缓冲区,当向缓冲区写入数据之后需要唤醒”读线程”时,不可能通过notify()或notifyAll()明确的指定唤醒”读线程”,而只能通过notifyAll唤醒所有线程(但是notifyAll无法区分唤醒的线程是读线程,还是写线程)。当所有的线程都被唤醒,这里会再次产生一个锁的竞争. 但是,通过Condition,就能明确的指定唤醒读线程。我们在编程的时候可以指定唤醒任何一个线程,如下1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37public class SafeBox<V> {
private final ReentrantLock lock = new ReentrantLock();
private final Condition valuePresent = lock.newCondition();//read condition
private final Condition valueAbsent = lock.newCondition();//write condition
private V value;
public V get() throws InterruptedException {
lock.lock();
try {
while (value == null) {
//读线程等待
valuePresent.await();
}
V result = value;
value = null;
//value置为null的时候,指定唤醒write condition.
valueAbsent.signal();
return result;
} finally {
lock.unlock();
}
}
public void set(V newValue) throws InterruptedException {
lock.lock();
try {
while (value != null) {
//value还存在,不可以写,写线程等待
valueAbsent.await();
}
value = newValue;
//指定唤醒read线程,表示可读
valuePresent.signal();
} finally {
lock.unlock();
}
}
}
看吧有些事情是原生的wait、nofity而不能做到的,现在向大家展示Google guava库中的monitor。提供更多的API,更丰富的功能1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41public class MonitorSample {
private Queue<Integer> queue = new LinkedList<Integer>();
private Monitor monitor = new Monitor();
//put 的Guard,重写方法,可以设置什么情况下返回ture,true就表示放行
//这里表示当queue的大小在3个以下的时候可是进入
private Monitor.Guard put = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return queue.size() < 3;
}
};
//只要queue里面有值,我们就可以取
private Monitor.Guard get = new Monitor.Guard(monitor) {
@Override
public boolean isSatisfied() {
return queue.size() > 0;
}
};
public void set(int value) throws InterruptedException {
//这种方式和try lock的方式相近.当时可以看出来,
//比condition更加直观,每一个guard都可以设置一个 门槛,来放行,
//当任何一个guard达到了条件,就会被唤醒.比如在size等于2的时候,做一些操作,
//添加一个guard然后触发.这比condition更加好用
//而且提供了更多的API
monitor.enterWhen(put);
try {
queue.add(value);
} finally {
monitor.leave();
}
}
public int get() throws InterruptedException {
monitor.enterWhen(get);
try {
return queue.poll();
} finally {
monitor.leave();
}
}
}
提供更多的API
- enter():进入到当前Monitor,无限期阻塞。
- enterInterruptibly():进入到当前Monitor,无限期阻塞,但可能会被打断。
- enter(long time, TimeUnit unit):进入到当前Monitor,最多阻塞给定的时间,返回是否进入Monitor。
- enterInterruptibly(long time, TimeUnit unit):进入到当前Monitor,最多阻塞给定的时间,但可能会被打断,返回是否进入Monitor。
- tryEnter():如果可以的话立即进入Monitor,不阻塞,返回是否进入Monitor。
- enterWhen(Guard guard):当Guard的isSatisfied()为true时,进入当前Monitor,无限期阻塞,但可能会被打断。
- enterWhenUninterruptibly(Guard guard):当Guard的isSatisfied()为true时,进入当前Monitor,无限期阻塞。
- enterWhen(Guard guard, long time, TimeUnit unit):当Guard的isSatisfied()为true时,进入当前Monitor,最多阻塞给定的时间,这个时间包括获取锁的时间和等待Guard satisfied的时间,但可能会被打断。
- enterWhenUninterruptibly(Guard guard, long time, TimeUnit unit):当Guard的isSatisfied()为true时,进入当前Monitor,最多阻塞给定的时间,这个时间包括获取锁的时间和等待Guard satisfied的时间。
- enterIf(Guard guard):如果Guard的isSatisfied()为true,进入当前Monitor,无限期的获得锁,不需要等待Guard satisfied。
- enterIfInterruptibly(Guard guard):如果Guard的isSatisfied()为true,进入当前Monitor,无限期的获得锁,不需要等待Guard satisfied,但可能会被打断。
- enterIf(Guard guard, long time, TimeUnit unit):如果Guard的isSatisfied()为true,进入当前Monitor,在给定的时间内持有锁,不需要等待Guard satisfied。
- enterIfInterruptibly(Guard guard, long time, TimeUnit unit):如果Guard的isSatisfied()为true,进入当前Monitor,在给定的时间内持有锁,不需要等待Guard satisfied,但可能会被打断。
- tryEnterIf(Guard guard):如果Guard的isSatisfied()为true并且可以的话立即进入Monitor,不等待获取锁,也不等待Guard satisfied。
- waitFor(Guard guard):等待Guard satisfied,无限期等待,但可能会被打断,当一个线程当前占有Monitor时,该方法才可能被调用。
- waitForUninterruptibly(Guard guard):等待Guard satisfied,无限期等待,当一个线程当前占有Monitor时,该方法才可能被调用。
- waitFor(Guard guard, long time, TimeUnit unit):等待Guard satisfied,在给定的时间内等待,但可能会被打断,当一个线程当前占有Monitor时,该方法才可能被调用。
- waitForUninterruptibly(Guard guard, long time, TimeUnit unit):等待Guard satisfied,在给定的时间内等待,当一个线程当前占有Monitor时,该方法才可能被调用。
- leave():离开当前Monitor,当一个线程当前占有Monitor时,该方法才可能被调用。
- isFair():判断当前Monitor是否使用一个公平的排序策略。
- isOccupied():返回当前Monitor是否被任何线程占有,此方法适用于检测系统状态,不适用于同步控制。
- isOccupiedByCurrentThread():返回当前线程是否占有当前Monitor。
- getOccupiedDepth():返回当前线程进入Monitor的次数,如果房前线程不占有Monitor,返回0。
- getQueueLength():返回一个估计的等待进入Monitor的线程数量,只是一个估算值,因为线程的数量在这个方法访问那不数据结构的时候可能会动态改变。此方法适用于检测系统状态,不适用于同步控制。
- getWaitQueueLength(Guard guard):返回一个等待给定Guard satisfied的线程估计数量, 注意,因为超时和中断可能发生在任何时候,所以估计只作为一个等待线程的实际数目的上限。此方法适用于检测系统状态,不适用于同步控制。
- hasQueuedThreads():返回是否有任何线程正在等待进入这个Monitor,注意,因为取消随时可能发生,所以返回true并不保证任何其他线程会进入这个Monitor。此方法设计用来检测系统状态。
- hasQueuedThread(Thread thread):返回给定线程是否正在等待进入这个Monitor,注意,因为取消随时可能发生,所以返回true并不保证给定线程会进入这个Monitor。此方法设计用来检测系统状态。
- hasWaiters(Guard guard):返回是否有任何线程正在等待给定Guard satisfied,注意,因为取消随时可能发生,所以返回true并不保证未来Guard变成satisfied时唤醒任意线程。此方法设计用来检测系统状态。
Future编程
我们都知道jdk给了我们异步接口,叫做Future<V>
,我们一般在写异步操作的时候一般都是这样1
2
3
4
5
6
7Future<String> future1 = executorService.submit(new Callable<String>() {
public String call() throws Exception {
//模拟方法调用耗时
Thread.currentThread().sleep(3000);
return "first";
}
});
这里我们会立即得到一个Future对象,但是我们的方法调用,并不能马上得到值,只有当我们调用Future#get()方法的时候,会导致一直阻塞到方法的值被得到,假如我们有3个RPC方法需要调用,RPC-1耗时3秒。RPC-2耗时2秒。RPC-1耗时1秒,如果我们通过传统的方法调用,就会耗时6s,必须等RPC-1调用完成之后,才能进行RPC-2,之后才能进行RPC-1.
如果按照我们异步的思想,其实我们的耗时应该是max(RPC-1,RPC-2,RPC-3)=3s.所以我们利用传统的future接口可以利用future#get的阻塞,拿到3个调用的最长耗时。
但是如果我们希望当future,刚好返回的时候,我们就能调用呢。就是我们常说的异步回调。如果我们用传统的future,只能去轮训future的计算状态来判断future是否计算完成,有了Google guava的包装,让一切都变得非常的简单。ListeningExecutorService是guava实现的可添加监听事件的executor,ListenableFuture则继承了Future接口,添加了addListener接口1
2
3public interface ListenableFuture<V> extends Future<V> {
void addListener(Runnable listener, Executor executor);
}
我们只需要像下面这样做,就可以实现回调了1
2
3
4
5
6
7
8
9
10
11
12
13
14//ListeningExecutorService是guava实现的可添加监听事件的executor
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(3));
ListenableFuture<String> future1 = executorService.submit(new Callable<String>() {
public String call() throws Exception {
Thread.currentThread().sleep(3000);//模拟延迟
return "first";
}
});
future1.addListener(new Runnable() {
public void run() {
//回调函数
System.out.println("do something");
}
},executorService);
Futures类还提供了callback方法,可以得到future的返回值的回调方法1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 Futures.addCallback(ListenableFuture, new FutureCallback<String>(){
public void onSuccess(String result) {
//调用成功
}
public void onFailure(Throwable t) {
//调用失败
}
});
```
Futures类还提供了一个allasList方法,把所有的ListenableFuture组合成一个list来处理,这样方便我们有多个回调操作的时候对返回值做一些聚合处理
```java
//构成一个返回值数组
ListenableFuture<List<Object>> listListenableFuture = Futures.allAsList((Iterable<? extends ListenableFuture<?>>) Arrays.asList(future1, future2, future3));
Futures.addCallback(listListenableFuture, new FutureCallback<List<Object>>() {
public void onSuccess(List<Object> result) {
//list 和数组里的下标是相互对应的,可以做一些聚合操作
}
public void onFailure(Throwable t) {
System.out.println("fair");
}
});
然们来看看callback的源码实现1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static <V> void addCallback(final ListenableFuture<V> future,
final FutureCallback<? super V> callback, Executor executor) {
Preconditions.checkNotNull(callback);
Runnable callbackListener = new Runnable() {
@Override
public void run() {
final V value;
try {
//FutureCallback接口下的方法就是onSuccess和onFailure,然后重新开启了一个线程c
//allbackListener,里面调用了getUninterruptibly方法,
//那么这个方法是干嘛的呢?见下文
value = getUninterruptibly(future);
} catch (ExecutionException e) {
callback.onFailure(e.getCause());
return;
} catch (RuntimeException e) {
callback.onFailure(e);
return;
} catch (Error e) {
callback.onFailure(e);
return;
}
callback.onSuccess(value);
}
};
future.addListener(callbackListener, executor);
}
先笔者说过了,如果我们要实现回调,还有一种方式就是,不断的去轮训future的计算状态是否是已完成。那么我们在getUninterruptibly方法里面看到了这个,虽然get会阻塞,但是getUninterruptibly是在callbackListener这个新的线程当中的
1 | public static <V> V getUninterruptibly(Future<V> future) |
最后调用了future.addListener(callbackListener, executor);这个方法,最简单的回调,只是现在的callbackListener线程里面我们可以得到success和failure状态做一些事情。详情参加guava的源码吧