当前位置:首页>资讯
全球讯息:Guava的EventBus事件机制实现
2023-04-25 06:34:05
来源:程序猿阿嘴

EventBus说明

EventBus是Guava封装的事件处理机制,属于设计模式中的观察者模式(生产|消费者编程模型)。使用简单、优雅只关注业务本身,仅限JVM内部使用,缺点:不适用分布式,看实际业务可选用MQ。

事件总线(EventBus)处理个服务、或业务间公共线路,而EventBus则是存储事件、处理时间中间服务,通过post()往EventBus发送事件,由EventBus去调度对应订阅方(subscriber)消费处理,解耦观察者模式中订阅方与事件源之间强依赖关系。

如何使用

引入Maven依赖
com.google.guavaguava20.0复制代码
场景举例(伪代码)
public class EventBusCenter {    private static EventBus eventBus = new EventBus();    private EventBusCenter() {    }    public static EventBus getInstance(){        return eventBus;    }    /**注册监听*/    public static void register(Object listener){        eventBus.register(listener);    }    /**取消注册*/    public static void unregister(Object listener){        eventBus.unregister(listener);    }    /**发布事件*/    public static void post(Object event){        eventBus.post(event);    }}复制代码

定义业务对象


(资料图片)

public class OrderInfo {    private long orderId;    private String orderName; }  public class OrderFailInfo{    private long orderId;    private String orderName;    private String msg;}复制代码

支付成功监听处理事件

public class OrderSuccessListener {    @Subscribe    public void orderSubscribe(OrderInfo event) {        // 订单业务监听        System.out.println("订单支付成功处理事件: " + event);    }    @Subscribe    public void ortherSubscribe(OrderInfo event) {        // 其它业务监听        System.out.println("订单支付成功其它处理事件: " + event);    }}复制代码

支付失败监听处理事件

public class OrderFailListener {    @Subscribe    public void orderSubscribe(OrderFailInfo event) {        // 订单业务监听        System.err.println("订单支付失败处理事件: " + event);    }    @Subscribe    public void ortherSubscribe(String event) {        // 其它业务监听        System.err.println("订单支付失败其它处理事件: " + event);    }}复制代码

测试

public class EventBusTest {    public static void main(String[] args) {        OrderSuccessListener successListener = new OrderSuccessListener();        OrderFailListener failListener = new OrderFailListener();            EventBusCenter.register(successListener);            EventBusCenter.register(failListener);            OrderInfo order = OrderInfo.builder().orderId(111L).orderName("订单名称xxx").build();            log.info("=====支付成功=======");            EventBusCenter.post(order);            log.info("=====支付失败=======");            OrderFailInfo orderFail = OrderFailInfo.builder().orderId(111L).orderName("订单名称xxx").msg("支付失败").build();            EventBusCenter.post(orderFail);            log.info("=====记录支付失败原因=======");            EventBusCenter.post("用户取消订单");    }}输出结果:信息: =====支付成功=======订单支付成功其它处理事件: OrderInfo(orderId=111, orderName=订单名称xxx)订单支付成功处理事件: OrderInfo(orderId=111, orderName=订单名称xxx)信息: =====支付失败=======订单支付失败处理事件: OrderFailInfo(orderId=111, orderName=订单名称xxx, msg=支付失败)信息: =====记录支付失败原因=======订单支付失败其它处理事件: 用户取消订单复制代码

使用说明:

只有通过***@Subscribe***注解才会被注册进EventBus 监听方法只能有1个参数,当有多个监听方法时,参数类型一样并行消费;不同参类型对应不对消费事件

源码分析

初始化EvenBus

EventBus(String identifier, Executor executor, Dispatcher dispatcher, SubscriberExceptionHandler exceptionHandler) {        this.subscribers = new SubscriberRegistry(this);//所有观察者列表维护对象        this.identifier = (String) Preconditions.checkNotNull(identifier);// EventBus的处理标识符(方法名称)        //executor是事件分发过程中使用到的线程池,可以自己实现        this.executor = (Executor)Preconditions.checkNotNull(executor);        //Dispatcher类型的子类,对监听者分发策略,主要有3种方式        this.dispatcher = (Dispatcher)Preconditions.checkNotNull(dispatcher);        //异常处理策咯        this.exceptionHandler = (SubscriberExceptionHandler)Preconditions.checkNotNull(exceptionHandler);    }复制代码

注册监听–>register()

作用:初始化监听者类型与监听方法的集合 subscribers ,在 发布事件 场景由传入参数类型,匹配、并执行对应监听者方法

①获取指定监听者对应的全部观察者集合(一对多)

②获取对应**@Subscribe**观察者事件类型(即:方法参数类型)集合

③添加对应类型所有观察者到集合事件集合中(即:SubscriberRegistry对象维护的 subscribers 集合)

subscribers说明:

private final ConcurrentMap, CopyOnWriteArraySet>subscribers = Maps.newConcurrentMap();复制代码

防止并发问题维护一个ConcurrentMap集合,Subscriber集合对应velue使用的是Java中的CopyOnWriteArraySet集合,

主要作用:1、避免监听重复事件 2、适用于读多写少的场景:只要register()才写入,基本都是查询

findAllSubscribers()说明

①获取注册监听对象所有包含**@Subscribe**的方法(如下图所示⤵️)

②通过反射获取对应方法、及监听参数类型

③放入当前初始化的EventBus对象里面,维护监听对象与监听方法的对应关系

首先从 subscriberMethodsCache 缓存中获取监听对象映射关系,如果缓存中不存在通过反射遍历所有包含**@Subscribe**的方法

发布事件–>post()

①getSubscribers()方法获取该事件对应的全部观察者

②Dispatcher-分发事件策略

ImmediateDispatcher:直接在当前线程中遍历所有的观察者并进行事件分发

LegacyAsyncDispatcher:使用 全局队列 ,先将观察者依次放入队列,再顺序从队列中取出观察者对象进行事件分发

PerThreadQueuedDispatcher(默认):使用 线程相关队列 ,会先获取当前线程的观察者队列,并将传入的观察者列表传入到该队列中;判断当前线程是否正在进行分发操作,如果没有在进行分发操作,就通过遍历上述队列进行事件分发

最后无论使用哪个分发器,都会执行dispatchEvent()方法,通过反射(target、method、executor)多线程触发监听方法

原文链接:https://juejin.cn/post/7224310314807083065

关键词:

相关文章