如何使用 EventEmitter 实现自定义异步事件总线解耦模块间通信

文章导读
先说结论:Node.js 内置 events 模块足够大多数单进程场景,核心在于管理监听器生命周期与异常安全。
📋 目录
  1. 核心封装:单例模式
  2. 项目结构建议
  3. 业务场景:订单创建解耦
  4. 异步监听与异常安全
  5. 监听器生命周期管理
  6. 验证与排查
  7. 常见坑
  8. 参考来源
A A

先说结论:Node.js 内置 events 模块足够大多数单进程场景,核心在于管理监听器生命周期与异常安全。

  • 适用:单进程内部模块间松耦合通信
  • 注意:emit 默认同步执行,异步监听器需处理 Promise rejection
  • 建议:业务逻辑结束后手动移除监听,避免内存泄漏
  • 架构:建议封装为单例模式全局使用

核心封装:单例模式

为避免多处实例化导致事件隔离,建议封装为单例。创建 src/utils/event-bus.js

const { EventEmitter } = require('events');

class EventBus extends EventEmitter {
  constructor() {
    super();
    // 调整最大监听器数量,根据业务规模设定,避免默认 10 个限制触发警告
    this.setMaxListeners(20);
  }
}

// 导出单例实例
module.exports = new EventBus();

其他模块直接引入即可:

const bus = require('../utils/event-bus');

项目结构建议

在中型项目中,建议将事件定义与业务逻辑分离,便于维护:

project-root/
├── src/
│   ├── events/
│   │   ├── event-bus.js       # 事件总线单例
│   │   └── event-types.js     # 事件名称常量管理
│   ├── modules/
│   │   ├── order/
│   │   │   └── service.js     # 触发事件
│   │   ├── email/
│   │   │   └── service.js     # 监听事件
│   │   └── stock/
│   │       └── service.js     # 监听事件
│   └── app.js                 # 入口文件

业务场景:订单创建解耦

假设订单创建后需要发送邮件、扣减库存、记录日志。使用事件总线可避免模块间直接依赖。

1. 定义事件常量(event-types.js):

module.exports = {
  ORDER_CREATED: 'order:created'
};

2. 触发事件(order/service.js):

const bus = require('../../events/event-bus');
const { ORDER_CREATED } = require('../../events/event-types');

async function createOrder(data) {
  // 1. 创建订单逻辑
  const orderId = await saveOrder(data);
  
  // 2. 触发事件,通知其他模块
  bus.emit(ORDER_CREATED, { orderId, userId: data.userId });
  
  return orderId;
}

3. 监听事件(email/service.js):

const bus = require('../../events/event-bus');
const { ORDER_CREATED } = require('../../events/event-types');

// 注册监听
bus.on(ORDER_CREATED, async (payload) => {
  try {
    await sendEmail(payload.userId, 'Order Created');
  } catch (err) {
    console.error('Send email failed:', err);
    // 注意:此处异常不会阻断其他监听器,但需记录日志
  }
});

异步监听与异常安全

EventEmitter 的 emit 是同步的,但监听器可以是异步函数。若异步监听器内部抛出未捕获的 Promise rejection,可能导致进程退出。

方案一:监听器内部 try-catch(推荐)

如上例所示,在 async 监听器内部包裹 try-catch。

如何使用 EventEmitter 实现自定义异步事件总线解耦模块间通信

方案二:全局监听 unhandledRejection

process.on('unhandledRejection', (reason, promise) => {
  console.error('Unhandled Rejection at:', promise, 'reason:', reason);
  // 可选:记录日志或告警,通常不建议直接退出进程
});

方案三:监听 error 事件

bus.on('error', (err) => {
  console.error('EventBus error:', err);
});

监听器生命周期管理

长期运行的服务中,反复注册监听器而不移除会导致内存泄漏。必须在模块卸载或请求结束时移除监听。

1. 移除特定监听器:

function handler(payload) { console.log(payload); }

// 注册
bus.on('user:login', handler);

// 移除
bus.off('user:login', handler); 
// 或 bus.removeListener('user:login', handler);

2. 一次性监听:

若只需触发一次,使用 once 方法,自动移除监听:

bus.once('data', (msg) => console.log(msg));

3. 请求级监听清理示例:

app.post('/subscribe', (req, res) => {
  const handler = (data) => res.send(data);
  bus.on('realtime:update', handler);
  
  // 响应结束后移除,防止累积
  res.on('finish', () => {
    bus.off('realtime:update', handler);
  });
});

验证与排查

1. 验证监听器数量:

console.log(bus.listenerCount('order:created')); // 输出当前监听该事件的数量

若数字符合预期且日志正常打印,说明通信链路打通。

2. 查看当前所有监听器:

console.log(bus.listeners('order:created')); // 输出所有回调函数数组

常见坑

  • 内存泄漏:监听器累积会占用内存,具体阈值取决于系统资源,建议避免累积。务必在模块卸载或请求结束时调用 off 移除监听。
  • 默认警告:单个事件默认最多 10 个监听器,超过会打印 Warning。可以通过 setMaxListeners 调整,但不要盲目调大,应先检查是否未移除监听。
  • 同步阻塞:emit 会同步执行所有监听器。若监听器内有耗时操作,会阻塞主线程。耗时任务建议放入独立队列或微任务。
  • 错误传播:监听器抛出未捕获异常可能导致进程退出,建议包裹 try-catch 或监听 error 事件。

参考来源