课程笔记:极客时间—设计模式之美
观察者模式
观察者模式也称为发布订阅模式。在 GoF 的《设计模式》中,定义为:
Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.
“在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。”
通常,被依赖的对象叫做被观察者(Observable),依赖的对象叫做观察者(Observer)。其它叫法有:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener等。
观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子。比如,邮件订阅、RSS Feeds,本质上都是观察者模式。
观察者模式是一个比较抽象的模式,根据不同的应用场景和需求,有完全不同的实现方式。比如,有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有进程间的实现方式。最经典的一种实现方式的代码如下:
public interface Subject {
void registerObserver(Observer observer);
void removeObserver(Observer observer);
void notifyObservers(Message msg);
}
public interface Observer {
void update(Message msg);
}
public ConcreteSubject implements Subject {
private List<Observer> observers = new ArrayList<Observer>();
@override
public void registerObserver(Observer observer) {
observers.add(observer);
}
@override
public void removeObserver(Observer observer) {
observers.remove(observer);
}
@override
public void notifyObservers(Message msg) {
for (Observer observer: observers) {
observer.update(msg);
}
}
}
public ConcreteObserverOne implements Observer {
@override
public update(Message msg) {
System.out.Println("ConcreteObserverOne is notified.");
}
}
public ConcreteObserverTwo implements Observer {
@override
public update(Message msg) {
System.out.Println("ConcreteObserverTwo is notified.");
}
}
public class Demo {
public static void main(String[] args) {
ConcreteSubject subject = new ConcreteSubject();
subject.registerObserver(new ConcreteObserverOne());
subject.registerObserver(new ConcreteObserverTwo());
subject.notifyObservers(new Message());
}
}
从刚刚的分类方式来看,它是一种同步阻塞的实现方式。观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。
基于不同应用场景的不同实现方式
- 同步阻塞:主要是为了代码解耦
- 异步非阻塞:除了能实现代码解耦之外,还能提高代码的执行效率
- 创建一个新的线程的实现方式
- 基于
EventBus
的实现方式
- 进程内
- 进程间:解耦更彻底,可以实现不同进程间的被观察者和观察者之间的交互
- 基于 RPC 接口的实现方式
- 基于消息队列的实现方式
同步阻塞实现
例1:一个P2P投资理财系统实现
原始需求:用户注册成功之后,我们会给用户发放投资体验金。
第1版代码(v1)实现如下:
public class UserController {
private UserService userService;
private PromotionService promotionService;
public Long register(String telphone, String password) {
Long userId = userService.register(telphone, password);
promotionService.issueNewUserExperienceCash(userId);
return userId;
}
}
考虑未来的可扩展性,如:
- 不在发放体验金,而是改为发放优惠券;
- 另外,用户注册成功之后,还需要给用户发送一封“欢迎注册成功”的站内信。
此时,v1版代码的问题:
- 需要修改
register()
函数中的代码,违反开闭原则; - 注册成功后需要执行的后续操作越来越多,那
register()
函数的逻辑会变得越来越复杂,影响到代码的可读性和可维护性;
我们可以使用观察者模式对上述代码进行重构,它的同步阻塞方式实现代码(v2)如下:
public class UserController {
private UserService userService;
private List<RegObserver> regObservers = new ArrayList<>();
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telphone, String password) {
Long userId = userService.register(telphone, password);
for (RegObserver observer: regObservers) {
observer.handleRegSuccess(userId);
}
return userId;
}
}
// 观察者类实现
public interface RegObserver {
void handleRegSuccess(Long userId);
}
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService;
@override
public void handleRegSuccess(Long userId) {
promotionService.issueNewUserExperienceCash(userId);
}
}
public class RegNotificationObserver implements RegObserver {
private NotificationService notificationService;
@override
public void handleRegSuccess(Long userId) {
notificationService.sendInboxMessage(userId);
}
}
v2版代码的优点:
- 当需要添加新的观察者时,
UserController
类的register()
函数完全不需要修改,只需要再添加一个实现了RegObserver
接口的类,并通过setRegObservers()
函数将它注册到UserController
类中即可。(可扩展性好)
异步非阻塞实现
再进一步,考虑如下性能场景。如果注册接口是一个调用比较频繁的接口,对性能非常敏感,希望接口的响应时间尽可能短,那么我们可以将同步阻塞的实现方式改为异步非阻塞的实现方式,以此来减少响应时间。
如果不考虑通用性、复用性,异步非阻塞观察者模式有 2 种简易的实现方式:
- 在每个
handleRegSuccess()
函数种创建一个新的线程执行代码逻辑; - 在
UserController
类的register()
函数中使用线程池来执行每个观察者的handleRegSuccess()
函数;
它们的部分代码实现(v3)如下:
// 第 1 种实现方式
public class RegPromotionObserver implements RegObserver {
private PromotionService promotionService;
@override
public void handleRegSuccess(Long userId) {
Thread thread = new Thread(new Runnable() {
@override
public void run() {
promotionService.issueNewUserExperienceCash(userId);
}
});
thread.start();
}
}
// 第 2 种实现方式
public class UserController {
private UserService userService;
private List<RegObserver> regObservers = new ArrayList<>();
private Executor executor;
public void setRegObservers(List<RegObserver> observers) {
regObservers.addAll(observers);
}
public Long register(String telphone, String password) {
Long userId = userService.register(telphone, password);
for (RegObserver observer: regObservers) {
executor.execute(new Runnable() {
@override
public void run() {
observer.handleRegSuccess(userId);
}
});
}
return userId;
}
}
对于第 1 种实现方式,缺点是:频繁的创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。
对于第 2 种实现方式,尽管利用了线程池解决了第 1 种实现方式的问题,但缺点是:线程池、异步执行逻辑都耦合在了 register()
函数中,增加了这部分业务代码的维护成本(可维护性差)。如果需要在同步阻塞和异步非阻塞之间进行灵活切换,那就需要不停地修改 UserController
的代码。除此之外,如果在项目中,不止一个业务模块需要用到异步非阻塞观察者模式,那这种的代码实现也无法做到复用(复用性差)。
一个更优的方案是使用 EventBus
框架,它对异步非阻塞观察者模式的实现进行了抽象,这将在下一节介绍。
框架的作用有:
- 隐藏实现细节;
- 降低开发难度;
- 做到代码复用;
- 解耦业务于非业务代码,让程序员聚焦业务开发。
案例剖析:EventBus
框架
EventBus
翻译为“事件总线”,它提供了实现观察者模式的框架代码。其中,Google 的 Guava EventBus
就是一个比较著名的 EventBus
框架。
使用 Guava EventBus
进行重构
针对上节中的用户注册的例子,我们使用 Guava EventBus
进行重构,代码(v4)如下:
public class UserController {
private UserService userService;
private EventBus eventBus;
private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
public UserController() {
// eventBus = new EventBus(); // 同步阻塞模式
eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE));
}
public void setRegObservers(List<Object> observers) {
for (Object observer: observers) {
eventBus.register(observer);
}
}
public Long register(String telphone, String password) {
Long userId = userService.register(telphone, password);
eventBus.post(userId);
return userId;
}
}
public class RegPromotionObserver {
private PromotionService promotionService;
@Subscribe
public void handleRegSuccess(Long userId) {
promotionService.issueNewUserExperienceCash(userId);
}
}
public class RegNotificationObserver {
private NotificationService notificationService;
@Subscribe
public void handleRegSuccess(Long userId) {
notificationService.sendInboxMessage(userId);
}
}
基于 EventBus
框架与之前编写的观察者模式的实现的异同点有,
相同点:
- 整体流程大致相同,都需要定义
Observer
,并通过 register() 函数注册Observer
,且都需要通过调用某个函数(如,EventBus
中的post()
函数)来给Observer
发消息(在EventBus
中消息被称作事件event
)。
差异点:
- 基于
EventBus
,不需要定义Observer
接口,任意类型的对象都可以注册到EventBus
中,通过@Subscribe
注解来标明类中哪个函数可以接收被观察者发送的消息。
Guava EventBus
框架分析
主要的类:
EventBus
类:Guava EventBus
对外暴露的所有可调用的接口,都封装在EventBus
类中,它实现了同步阻塞的观察者模式。AsyncEventBus
类:继承自EventBus
类,提供了异步非阻塞的观察者模式。
主要的函数:
register()
函数:EventBus
类提供,用来注册观察者,它可以接受任何类型(Object
)的观察者。public void register(Object object);
unregister()
函数:EventBus
类提供,用来删除观察者。public void unregister(Object object);
post()
函数:EventBus
类提供,用来给观察者发送消息。public void post(Object event);
当调用
post()
函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息类型(post()
函数定义的event
)的父类型。示例:假设
AObserver
、BObserver
、CObserver
能接收到的消息类型分别是XMsg
、YMsg
、ZMsg
。其中,XMsg
是YMsg
的父类。则每个Observer
可接收到的消息类型如下所示,XMsg xMsg = new XMsg(); YMsg yMsg = new YMsg(); ZMsg zMsg = new ZMsg(); post(xMsg); // => AObserver 接收到消息 post(yMsg); // => AObserver、BObserver 接收到消息 post(zMsg); // => CObserver 接收到消息
每个
Observer
能接收的消息类型是通过@Subscribe
注解机制来实现。EventBus
通过@Subscribe
注解来标明,某个函数能接收哪种类型的消息。代码示例如下:public DObserver { @Subscribe public void f1(PMsg event) {} @Subscribe public void f2(QMsg event) {} }
当通过
register()
函数将DObserver
类对象注册到EventBus
的时候,EventBus
会根据@Subscribe
注解找到f1()
和f2()
,并将两个函数能接收的消息类型记录下来(PMsg->f1
,QMsg->f2
)。当我们通过post()
函数发送消息(比如QMsg
消息)的时候,EventBus
会通过之前的记录(QMsg->f2
),调用相应的函数(f2
)。