设计模式之观察者模式


课程笔记:极客时间—设计模式之美

观察者模式

观察者模式也称为发布订阅模式。在 GoF 的《设计模式》中,定义为:

Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

“在对象之间定义一个一对多的依赖,当一个对象状态改变的时候,所有依赖的对象都会自动收到通知。”

通常,被依赖的对象叫做被观察者(Observable),依赖的对象叫做观察者(Observer)。其它叫法有:Subject-Observer、Publisher-Subscriber、Producer-Consumer、EventEmitter-EventListener、Dispatcher-Listener等。

观察者模式的应用场景非常广泛,小到代码层面的解耦,大到架构层面的系统解耦,再或者一些产品的设计思路,都有这种模式的影子。比如,邮件订阅、RSS Feeds,本质上都是观察者模式。

观察者模式是一个比较抽象的模式,根据不同的应用场景和需求,有完全不同的实现方式。比如,有同步阻塞的实现方式,也有异步非阻塞的实现方式;有进程内的实现方式,也有进程间的实现方式。最经典的一种实现方式的代码如下:

public interface Subject {
    void registerObserver(Observer observer);
    void removeObserver(Observer observer);
    void notifyObservers(Message msg);
}

public interface Observer {
    void update(Message msg);
}

public ConcreteSubject implements Subject {
    private List<Observer> observers = new ArrayList<Observer>();
    
    @override
    public void registerObserver(Observer observer) {
        observers.add(observer);
    }
    
    @override
    public void removeObserver(Observer observer) {
        observers.remove(observer);
    }
    
    @override
    public void notifyObservers(Message msg) {
        for (Observer observer: observers) {
            observer.update(msg);
        }
    }
}

public ConcreteObserverOne implements Observer {
    @override
    public update(Message msg) {
        System.out.Println("ConcreteObserverOne is notified.");
    }
}

public ConcreteObserverTwo implements Observer {
    @override
    public update(Message msg) {
        System.out.Println("ConcreteObserverTwo is notified.");
    }
}

public class Demo {
    public static void main(String[] args) {
        ConcreteSubject subject = new ConcreteSubject();
        subject.registerObserver(new ConcreteObserverOne());
        subject.registerObserver(new ConcreteObserverTwo());
        subject.notifyObservers(new Message());
    }
}

从刚刚的分类方式来看,它是一种同步阻塞的实现方式。观察者和被观察者代码在同一个线程内执行,被观察者一直阻塞,直到所有的观察者代码都执行完成之后,才执行后续的代码。

基于不同应用场景的不同实现方式

  • 同步阻塞:主要是为了代码解耦
  • 异步非阻塞:除了能实现代码解耦之外,还能提高代码的执行效率
    • 创建一个新的线程的实现方式
    • 基于 EventBus 的实现方式
  • 进程内
  • 进程间:解耦更彻底,可以实现不同进程间的被观察者和观察者之间的交互
    • 基于 RPC 接口的实现方式
    • 基于消息队列的实现方式
同步阻塞实现

例1:一个P2P投资理财系统实现

原始需求:用户注册成功之后,我们会给用户发放投资体验金。

第1版代码(v1)实现如下:

public class UserController {
    private UserService userService;
    private PromotionService promotionService;
    
    public Long register(String telphone, String password) {
        Long userId = userService.register(telphone, password);
        promotionService.issueNewUserExperienceCash(userId);
        return userId;
    }
}

考虑未来的可扩展性,如:

  1. 不在发放体验金,而是改为发放优惠券;
  2. 另外,用户注册成功之后,还需要给用户发送一封“欢迎注册成功”的站内信。

此时,v1版代码的问题:

  • 需要修改 register() 函数中的代码,违反开闭原则;
  • 注册成功后需要执行的后续操作越来越多,那 register() 函数的逻辑会变得越来越复杂,影响到代码的可读性和可维护性;

我们可以使用观察者模式对上述代码进行重构,它的同步阻塞方式实现代码(v2)如下:

public class UserController {
    private UserService userService;
    private List<RegObserver> regObservers = new ArrayList<>();
    
    public void setRegObservers(List<RegObserver> observers) {
        regObservers.addAll(observers);
    }
    
    public Long register(String telphone, String password) {
        Long userId = userService.register(telphone, password);
        for (RegObserver observer: regObservers) {
            observer.handleRegSuccess(userId);
        }
        return userId;
    }
}

// 观察者类实现
public interface RegObserver {
    void handleRegSuccess(Long userId);
}

public class RegPromotionObserver implements RegObserver {
    private PromotionService promotionService;
    
    @override
    public void handleRegSuccess(Long userId) {
        promotionService.issueNewUserExperienceCash(userId);
    }
}

public class RegNotificationObserver implements RegObserver {
    private NotificationService notificationService;
    
    @override
    public void handleRegSuccess(Long userId) {
        notificationService.sendInboxMessage(userId);
    }
}

v2版代码的优点:

  • 当需要添加新的观察者时,UserController 类的 register() 函数完全不需要修改,只需要再添加一个实现了 RegObserver 接口的类,并通过 setRegObservers() 函数将它注册到 UserController 类中即可。(可扩展性好)
异步非阻塞实现

再进一步,考虑如下性能场景。如果注册接口是一个调用比较频繁的接口,对性能非常敏感,希望接口的响应时间尽可能短,那么我们可以将同步阻塞的实现方式改为异步非阻塞的实现方式,以此来减少响应时间。

如果不考虑通用性、复用性,异步非阻塞观察者模式有 2 种简易的实现方式:

  1. 在每个 handleRegSuccess() 函数种创建一个新的线程执行代码逻辑;
  2. UserController 类的 register() 函数中使用线程池来执行每个观察者的 handleRegSuccess() 函数;

它们的部分代码实现(v3)如下:

// 第 1 种实现方式
public class RegPromotionObserver implements RegObserver {
    private PromotionService promotionService;
    
    @override
    public void handleRegSuccess(Long userId) {
        Thread thread = new Thread(new Runnable() {
            @override
            public void run() {
                promotionService.issueNewUserExperienceCash(userId);
            }
        });
        thread.start();
    }
}

// 第 2 种实现方式
public class UserController {
    private UserService userService;
    private List<RegObserver> regObservers = new ArrayList<>();
    private Executor executor;
    
    public void setRegObservers(List<RegObserver> observers) {
        regObservers.addAll(observers);
    }
    
    public Long register(String telphone, String password) {
        Long userId = userService.register(telphone, password);
        for (RegObserver observer: regObservers) {
            executor.execute(new Runnable() {
                @override
                public void run() {
                     observer.handleRegSuccess(userId);
                }
            });
        }
        return userId;
    }
}

对于第 1 种实现方式,缺点是:频繁的创建和销毁线程比较耗时,并且并发线程数无法控制,创建过多的线程会导致堆栈溢出。

对于第 2 种实现方式,尽管利用了线程池解决了第 1 种实现方式的问题,但缺点是:线程池、异步执行逻辑都耦合在了 register() 函数中,增加了这部分业务代码的维护成本(可维护性差)。如果需要在同步阻塞和异步非阻塞之间进行灵活切换,那就需要不停地修改 UserController 的代码。除此之外,如果在项目中,不止一个业务模块需要用到异步非阻塞观察者模式,那这种的代码实现也无法做到复用(复用性差)。

一个更优的方案是使用 EventBus 框架,它对异步非阻塞观察者模式的实现进行了抽象,这将在下一节介绍。

框架的作用有:

  • 隐藏实现细节;
  • 降低开发难度;
  • 做到代码复用;
  • 解耦业务于非业务代码,让程序员聚焦业务开发。

案例剖析:EventBus 框架

EventBus 翻译为“事件总线”,它提供了实现观察者模式的框架代码。其中,Google 的 Guava EventBus 就是一个比较著名的 EventBus 框架。

使用 Guava EventBus 进行重构

针对上节中的用户注册的例子,我们使用 Guava EventBus 进行重构,代码(v4)如下:

public class UserController {
    private UserService userService;
    
    private EventBus eventBus;
    private static final int DEFAULT_EVENTBUS_THREAD_POOL_SIZE = 20;
    
    public UserController() {
        // eventBus = new EventBus(); // 同步阻塞模式
        eventBus = new AsyncEventBus(Executors.newFixedThreadPool(DEFAULT_EVENTBUS_THREAD_POOL_SIZE));
    }
    
    public void setRegObservers(List<Object> observers) {
        for (Object observer: observers) {
            eventBus.register(observer);
        }
    }
    
    public Long register(String telphone, String password) {
        Long userId = userService.register(telphone, password);
        eventBus.post(userId);
        return userId;
    }
}

public class RegPromotionObserver {
    private PromotionService promotionService;
    
    @Subscribe
    public void handleRegSuccess(Long userId) {
        promotionService.issueNewUserExperienceCash(userId);
    }
}

public class RegNotificationObserver {
    private NotificationService notificationService;
    
    @Subscribe
    public void handleRegSuccess(Long userId) {
        notificationService.sendInboxMessage(userId);
    }
}

基于 EventBus 框架与之前编写的观察者模式的实现的异同点有,

相同点:

  • 整体流程大致相同,都需要定义 Observer ,并通过 register() 函数注册 Observer ,且都需要通过调用某个函数(如,EventBus 中的 post() 函数)来给 Observer 发消息(在 EventBus 中消息被称作事件 event)。

差异点:

  • 基于 EventBus ,不需要定义 Observer 接口,任意类型的对象都可以注册到 EventBus 中,通过 @Subscribe 注解来标明类中哪个函数可以接收被观察者发送的消息。
Guava EventBus 框架分析

主要的类:

  • EventBus 类:Guava EventBus 对外暴露的所有可调用的接口,都封装在 EventBus 类中,它实现了同步阻塞的观察者模式。
  • AsyncEventBus 类:继承自 EventBus 类,提供了异步非阻塞的观察者模式。

主要的函数:

  • register() 函数:EventBus 类提供,用来注册观察者,它可以接受任何类型(Object)的观察者。

    public void register(Object object);
  • unregister() 函数:EventBus 类提供,用来删除观察者。

    public void unregister(Object object);
  • post() 函数:EventBus 类提供,用来给观察者发送消息。

    public void post(Object event);

    当调用 post() 函数发送消息的时候,并非把消息发送给所有的观察者,而是发送给可匹配的观察者。所谓可匹配指的是,能接收的消息类型是发送消息类型(post() 函数定义的 event)的父类型。

    示例:假设 AObserverBObserverCObserver 能接收到的消息类型分别是 XMsgYMsgZMsg 。其中,XMsgYMsg 的父类。则每个 Observer 可接收到的消息类型如下所示,

    XMsg xMsg = new XMsg();
    YMsg yMsg = new YMsg();
    ZMsg zMsg = new ZMsg();
    post(xMsg); // => AObserver 接收到消息
    post(yMsg); // => AObserver、BObserver 接收到消息
    post(zMsg); // => CObserver 接收到消息

    每个 Observer 能接收的消息类型是通过 @Subscribe 注解机制来实现。EventBus 通过 @Subscribe 注解来标明,某个函数能接收哪种类型的消息。代码示例如下:

    public DObserver {
        @Subscribe
        public void f1(PMsg event) {}
        
        @Subscribe
        public void f2(QMsg event) {}
    }

    当通过 register() 函数将 DObserver 类对象注册到 EventBus 的时候,EventBus 会根据 @Subscribe 注解找到 f1()f2() ,并将两个函数能接收的消息类型记录下来(PMsg->f1QMsg->f2)。当我们通过 post() 函数发送消息(比如 QMsg 消息)的时候,EventBus 会通过之前的记录(QMsg->f2),调用相应的函数(f2)。


文章作者: algorithmofdish
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 algorithmofdish !
  目录