v9.10.2019.0101 全面巩固批量Insert/Update/Upsert,支持数据备份、恢复和同步,支持实体列表保存到文件以及加载
|
# EventBus 使用手册
本文档é¢å‘ NewLife.Core çš„äº‹ä»¶æ€»çº¿èƒ½åŠ›ï¼Œæ¶µç›–æŽ¥å£æ¨¡åž‹ã€é»˜è®¤å®žçް `EventBus<TEvent>` 的进程内分å‘ã€åŸºäºŽä¸»é¢˜çš„ `EventHub<TEvent>` 路由,以åŠåŸºäºŽé˜Ÿåˆ—çš„ `QueueEventBus<TEvent>`。
> 代ç ä½ç½®ï¼š`NewLife.Core\Messaging\IEventBus.cs`ã€`NewLife.Core\Messaging\EventHub.cs`ã€`NewLife.Core\Caching\QueueEventBus.cs`
---
## 1. 设计概览
### 1.1 适用场景
- **进程内事件**:åŒä¸€è¿›ç¨‹å†…å‘布/è®¢é˜…ï¼Œä½Žå»¶è¿Ÿã€æ— æŒä¹…化(`EventBus<TEvent>`)。
- **多主题事件路由**:把带 `topic` 的网络消æ¯è·¯ç”±åˆ°å¯¹åº”事件总线或回调(`EventHub<TEvent>`)。
- **借助缓å˜é˜Ÿåˆ—çš„â€œè·¨è¿›ç¨‹â€æŠ•é€’**:å‘布进队列ã€ç”±åŽå°æ¶ˆè´¹å¾ªçŽ¯æ‹‰å–并分å‘到本地订阅者(`QueueEventBus<TEvent>`)。
### 1.2 æ ¸å¿ƒç‰¹ç‚¹
- **å‘布异æ¥ã€è®¢é˜…åŒæ¥**:å‘布/处ç†ä½¿ç”¨ `PublishAsync`/`HandleAsync`ï¼›è®¢é˜…ä¸Žå–æ¶ˆä½¿ç”¨ `Subscribe`/`Unsubscribe`。
- **幂ç‰è®¢é˜…**:åŒä¸€ `clientId` é‡å¤è®¢é˜…会覆盖上一次。
- **最佳努力分å‘**:默认å•个处ç†å™¨å¼‚常ä¸ä¼šå½±å“其它处ç†å™¨ï¼›å¯é€šè¿‡ `ThrowOnHandlerError` æ”¹ä¸ºä¸¥æ ¼æ¨¡å¼ã€‚
- **ä¸Šä¸‹æ–‡ä¼ é€’**:通过 `IEventContext`(默认实现 `EventContext`)在å‘布者ã€è®¢é˜…者åŠä¸é—´å±‚ä¹‹é—´ä¼ é€’æ•°æ®ã€‚
- **链路追踪**:若事件实现 `ITraceMessage` 且 `TraceId` 为空,`EventBus<TEvent>` å‘布时会自动写入当å‰åŸ‹ç‚¹çš„ TraceId。
---
## 2. æŽ¥å£æ¨¡åž‹
### 2.1 `IEventBus`ï¼ˆéžæ³›åž‹ï¼‰
ç”¨äºŽç»Ÿä¸€æŒæœ‰ä¸åŒäº‹ä»¶ç±»åž‹çš„æ€»çº¿å¼•用(例如放入 `IEventContext.EventBus`)。
- `Task<Int32> PublishAsync(Object event, IEventContext? context = null, CancellationToken cancellationToken = default)`
> 说明:默认实现通常会把 `Object` 强转为实际的 `TEvent`。
### 2.2 `IEventBus<TEvent>`(泛型总线)
- `Task<Int32> PublishAsync(TEvent event, IEventContext? context = null, CancellationToken cancellationToken = default)`
- `Boolean Subscribe(IEventHandler<TEvent> handler, String clientId = "")`
- `Boolean Unsubscribe(String clientId = "")`
#### `clientId` çš„è¯ä¹‰
- 用于识别订阅者。
- ç›¸åŒ `clientId` é‡å¤è®¢é˜…会覆盖旧订阅(幂ç‰ï¼‰ã€‚
- 在æŸäº›å®žçŽ°é‡Œå¯ç”¨äºŽâ€œæ¶ˆè´¹ç»„/分组â€è¯ä¹‰ã€‚
### 2.3 `IAsyncEventBus<TEvent>`(异æ¥è®¢é˜…/å–æ¶ˆï¼‰
适用于订阅需è¦ç½‘络往返或其它异æ¥åŠ¨ä½œçš„åœºæ™¯ã€‚
- `Task<Boolean> SubscribeAsync(IEventHandler<TEvent> handler, String clientId = "", CancellationToken cancellationToken = default)`
- `Task<Boolean> UnsubscribeAsync(String clientId = "", CancellationToken cancellationToken = default)`
### 2.4 `IEventHandler<TEvent>`(事件处ç†å™¨ï¼‰
- `Task HandleAsync(TEvent event, IEventContext? context, CancellationToken cancellationToken)`
建议:处ç†å™¨å°½é‡å¹‚ç‰ï¼Œå¹¶å°Šé‡ `cancellationToken`。
### 2.5 `IEventContext` / `EventContext`
- `IEventBus EventBus { get; }`
`EventContext` 还æä¾›ï¼š
- `String? Topic`:多层次事件架构(例如 EventHub)ä¸ä½¿ç”¨ã€‚
- `String? ClientId`:å‘逿–¹æ ‡è¯†ï¼Œç”¨äºŽåˆ†å‘时“ä¸è¦åˆ†å‘给自己â€ã€‚
- `IDictionary<String, Object?> Items` 与索引器:æºå¸¦æ‰©å±•æ•°æ®ã€‚
> `EventBus<TEvent>` å†…éƒ¨ä¼šæ± åŒ– `EventContext`:当å‘å¸ƒæ—¶æœªä¼ å…¥ `context`ï¼Œä¼šä»Žå¯¹è±¡æ± èŽ·å–并在分å‘åŽå½’还。
---
## 3. 默认事件总线 `EventBus<TEvent>`
### 3.1 行为è¯ä¹‰
- **峿—¶åˆ†å‘,ä¸å˜å‚¨**:ä¸åœ¨çº¿çš„订阅者收ä¸åˆ°åކ岿¶ˆæ¯ã€‚
- **顺åºè°ƒç”¨å¤„ç†å™¨**:对当å‰è®¢é˜…å¿«ç…§é€ä¸ªè°ƒç”¨ `HandleAsync`。
- **异常ç–ç•¥**:
- `ThrowOnHandlerError = false`(默认):记录错误日志,继ç»åˆ†å‘。
- `ThrowOnHandlerError = true`:é‡åˆ°ç¬¬ä¸€ä¸ªå¤„ç†å™¨å¼‚å¸¸ç«‹å³æŠ›å‡ºï¼Œä¸æ–分å‘。
- **排除å‘逿–¹**:如果 `context` 是 `EventContext` 且设置了 `ClientId`ï¼Œåˆ†å‘æ—¶ä¼šè·³è¿‡ `clientId` 相åŒçš„订阅者。
### 3.2 快速开始(进程内)
1) 定义事件类型:
- å»ºè®®ä½¿ç”¨è½»é‡ DTO(class/record å‡å¯ï¼‰ã€‚
2) 订阅:
- 通过实现 `IEventHandler<TEvent>`,或使用扩展方法直接订阅委托。
3) å‘布:
- 调用 `PublishAsync`,返回已æˆåŠŸå¤„ç†è¯¥äº‹ä»¶çš„处ç†å™¨æ•°é‡ã€‚
示例(委托订阅):
- `bus.Subscribe(e => Console.WriteLine(e));`
- `await bus.PublishAsync(myEvent);`
> 扩展方法ä½äºŽ `EventBusExtensions`,会把委托包装为 `DelegateEventHandler<TEvent>`。
### 3.3 ä½¿ç”¨ä¸Šä¸‹æ–‡ï¼ˆä¼ é€’é™„åŠ æ•°æ®ï¼‰
ä½ å¯ä»¥åœ¨å‘å¸ƒæ—¶ä¼ å…¥ `EventContext`,用于:
- 设置 `Topic`/`ClientId`(尤其在 `EventHub<TEvent>` 场景);
- 通过 `Items` ä¿å˜è‡ªå®šä¹‰æ•°æ®ï¼ˆä¾‹å¦‚ `ext["Raw"]` 带原始报文);
- 在处ç†å™¨ä¸è¯»å–上下文以实现å作逻辑。
注æ„事项:
- è‹¥ä¼ å…¥çš„ `context` 为 `null`,`EventBus<TEvent>` å¯èƒ½ä»Žå¯¹è±¡æ± 创建上下文,分å‘åŽä¼šè°ƒç”¨ `Reset()` 并归还;处ç†å™¨ä¸åº”ä¿å˜è¯¥ä¸Šä¸‹æ–‡å¼•用到异æ¥ç”Ÿå‘½å‘¨æœŸä¹‹å¤–。
### 3.4 订阅/å–æ¶ˆè®¢é˜…
- `Subscribe(handler, clientId)`ï¼šè¦†ç›–åŒ `clientId` 的旧处ç†å™¨ã€‚
- `Unsubscribe(clientId)`:移除对应订阅。
建议:
- 为长期订阅者指定稳定的 `clientId`,便于é‡è¿žè¦†ç›–ä¸Žå–æ¶ˆè®¢é˜…。
---
## 4. `EventHub<TEvent>`:按主题路由的事件枢纽
`EventHub<TEvent>` çš„èŒè´£æ˜¯å°†å¸¦ä¸»é¢˜çš„输入消æ¯åˆ†å‘到对应的事件总线或回调。
### 4.1 æ¶ˆæ¯æ ¼å¼
仅处ç†ä»¥ `event#` 开头的消æ¯ï¼š
- `event#topic#clientId#message`
å—æ®µè¯´æ˜Žï¼š
- `topic`:主题å称。
- `clientId`:å‘逿–¹æ ‡è¯†/订阅分组。
- `message`:
- 事件 JSON(通常为 `TEvent` 的 JSON);
- 或控制指令:`subscribe` / `unsubscribe`。
### 4.2 注册方å¼
- `Add(topic, IEventBus<TEvent> bus)`:把æŸä¸ªæ€»çº¿å›ºå®šç»‘定到主题。
- `Add(topic, IEventHandler<TEvent> dispatcher)`:把æŸä¸ªå¤„ç†å™¨/回调绑到主题(ä¸ç»è¿‡æ€»çº¿ï¼‰ã€‚
- `GetEventBus(topic, clientId)`:通过 `Factory` 延迟创建并缓å˜ä¸»é¢˜æ€»çº¿ï¼›å¦‚果未设置 `Factory`,默认创建 `EventBus<TEvent>`。
### 4.3 订阅/å–æ¶ˆè®¢é˜…(控制指令)
当收到:
- `event#topic#clientId#subscribe`
`EventHub<TEvent>` 会:
- è¦æ±‚ `context` 䏿供 `Handler`:`(context as IExtend)?["Handler"] is IEventHandler<TEvent>`。
- `GetEventBus(topic, clientId)` 获å–主题总线。
- `bus.Subscribe(handler, clientId)` 绑定订阅。
当收到:
- `event#topic#clientId#unsubscribe`
会:
- 找到主题总线并 `Unsubscribe(clientId)`。
- 若总线为 `EventBus<TEvent>` 且没有任何订阅者,则从枢纽ä¸ç§»é™¤è¯¥ä¸»é¢˜çš„æ€»çº¿ä¸Žåˆ†å‘器(é¿å…主题长期å 用内å˜ï¼‰ã€‚
### 4.4 分å‘路径与返回值
- 命ä¸ä¸»é¢˜æ€»çº¿ï¼š`bus.PublishAsync(event, context)`,返回该总线的处ç†å™¨è®¡æ•°ã€‚
- æœªå‘½ä¸æ€»çº¿ä½†å‘½ä¸åˆ†å‘器:调用 `dispatcher.HandleAsync`,返回 `1`。
- ä¸åŒ¹é…/è§£æžå¤±è´¥/未注册:返回 `0`。
### 4.5 上下文写入
在 `DispatchAsync(topic, clientId, ...)` ä¸ï¼š
- 如果 `context` 是 `EventContext`:写入 `Topic` / `ClientId`。
- å¦åˆ™è‹¥ `context` æ”¯æŒ `IExtend`:写入 `ext["Topic"]` / `ext["ClientId"]`。
在 `HandleAsync` æ”¶åˆ°ç½‘ç»œæ¶ˆæ¯æ—¶ï¼š
- 会把原始输入ä¿å˜åˆ° `context["Raw"]`(若 `context` æ”¯æŒ `IExtend`),便于订阅者零拷è´è½¬å‘/诊æ–。
---
## 5. `QueueEventBus<TEvent>`:基于队列的事件总线
`QueueEventBus<TEvent>` 继承自 `EventBus<TEvent>`,但改å˜äº†â€œå‘布â€çš„è¯ä¹‰ï¼š
- `PublishAsync` ä¸å†è¿›ç¨‹å†…直接分å‘,而是 **写入队列**。
- 订阅时å¯åŠ¨ä¸€ä¸ªåŽå°æ¶ˆè´¹å¾ªçŽ¯ï¼Œä»Žé˜Ÿåˆ—æ‹‰å–æ¶ˆæ¯å¹¶è°ƒç”¨åŸºç±»çš„ `DispatchAsync` 分å‘到本地订阅者。
### 5.1 使用方å¼
- 创建:`new QueueEventBus<TEvent>(cache, topic)`
- 订阅:首次订阅会å¯åЍåŽå° LongRunning 消费任务。
- å‘布:写入队列;返回值为队列 `Add` 的结果(通常为 1)。
- 释放:调用 `Dispose()` ä¼šå–æ¶ˆæ¶ˆè´¹å¾ªçޝ并ç‰å¾…åŽå°ä»»åŠ¡é€€å‡ºã€‚
### 5.2 å–æ¶ˆä¸Žå…³é—
- `Dispose()` 会:
- å–æ¶ˆå†…部 `CancellationTokenSource`ï¼›
- ç‰å¾…åŽå°ä»»åŠ¡æœ€å¤šçº¦ 3 ç§’ï¼›
- ç„¶åŽé‡Šæ”¾ CTS。
注æ„:
- 释放åŽå†å‘布消æ¯ï¼Œä¼šç»§ç»å†™å…¥é˜Ÿåˆ—(队列属于外部 `ICache`),但本实例ä¸å†æ¶ˆè´¹ã€‚
---
## 6. 委托订阅:`EventBusExtensions` 与 `DelegateEventHandler<TEvent>`
### 6.1 常用订阅形å¼
`EventBusExtensions` æä¾›å¤šç§ `Subscribe`/`SubscribeAsync` ä¾¿æ·æ‰©å±•:
- `Action<TEvent>`
- `Action<TEvent, IEventContext>`
- `Func<TEvent, Task>`
- `Func<TEvent, IEventContext, CancellationToken, Task>`
注æ„:
- 内部通过 `DelegateEventHandler<TEvent>` 适é…到 `IEventHandler<TEvent>`。
### 6.2 å–æ¶ˆä»¤ç‰Œ
åªæœ‰æœ€åŽä¸€ç§å§”托ç¾åå¯ä»¥ç›´æŽ¥æ‹¿åˆ° `CancellationToken`。
---
## 7. 线程安全与并å‘è¯ä¹‰
- `EventBus<TEvent>`:
- 订阅集åˆä½¿ç”¨ `ConcurrentDictionary<String, IEventHandler<TEvent>>`。
- åˆ†å‘æ—¶æžšä¸¾å—典是快照è¯ä¹‰ï¼šåˆ†å‘过程ä¸è®¢é˜…å˜åŒ–ä¸ä¿è¯å®žæ—¶å¯è§ã€‚
- `EventHub<TEvent>`:
- `_eventBuses` / `_dispatchers` å‡ä¸º `ConcurrentDictionary`。
- `GetEventBus` å¹¶å‘下å¯èƒ½å¤šæ¬¡åˆ›å»ºï¼Œä½†æœ€ç»ˆä»…缓å˜ä¸€ä»½å®žä¾‹ã€‚
---
## 8. 错误处ç†ä¸Žæœ€ä½³å®žè·µ
### 8.1 处ç†å™¨å¼‚常
- 默认:记录日志并继ç»ã€‚
- 需è¦å¼ºä¸€è‡´/ä¸¥æ ¼å¤±è´¥ï¼šè®¾ç½® `EventBus<TEvent>.ThrowOnHandlerError = true`。
### 8.2 å¹‚ç‰æ€§
- 事件处ç†å™¨å»ºè®®å¹‚ç‰ï¼Œé¿å…é‡å¤æŠ•递带æ¥çš„副作用。
### 8.3 ä¸è¦æŒæœ‰æ± 化上下文
- 当 `context` ç”± `EventBus<TEvent>` 自动创建时,它æ¥è‡ªå¯¹è±¡æ± ,分å‘结æŸä¼šè¢«é‡ç½®å¹¶å¤ç”¨ã€‚
- 处ç†å™¨å†…如需长期ä¿å˜ä¿¡æ¯ï¼Œåº”å¤åˆ¶æ‰€éœ€å—段/æ•°æ®ï¼Œè€Œä¸æ˜¯ä¿å˜ `context` 引用。
### 8.4 `clientId` 的使用建议
- 客户端订阅:使用稳定的 `clientId`,便于覆盖旧订阅。
- å‘布者:在 `EventHub<TEvent>` 场景里,`clientId` 会被用于é¿å…“分å‘给自己â€ã€‚
---
## 9. 常è§ç”¨æ³•组åˆ
### 9.1 进程内:一个å‘布者 + 多个订阅者
- 使用 `EventBus<TEvent>`。
- ä¸éœ€è¦ `EventHub<TEvent>`。
### 9.2 网络场景:按 topic 订阅/å‘布
- 使用 `EventHub<TEvent>` 作为统一入å£ï¼š
- 输入:收到网络å—符串或 `IPacket`。
- 输出:分å‘到 topic 对应的 `IEventBus<TEvent>` 或回调。
- è‹¥éœ€è¦æŒ‰ topic 创建总线:æä¾› `IEventBusFactory`,让枢纽按需创建。
### 9.3 ç±» MQ 场景:使用缓å˜é˜Ÿåˆ—
- 使用 `QueueEventBus<TEvent>`:
- å‘布写入队列;
- 本地订阅者由åŽå°æ¶ˆè´¹å¾ªçŽ¯æ‹‰å–队列å†åˆ†å‘。
---
## 10. 相关测试用例(å¯å‚考)
- `XUnitTest.Core\Messaging\EventBusTests.cs`
- `XUnitTest.Core\Messaging\EventHubTests.cs`
- `XUnitTest.Core\Caching\QueueEventBusTests.cs`
---
## 11. FAQ
### Q1:`PublishAsync` 返回值代表什么?
默认实现(`EventBus<TEvent>`):返回æˆåŠŸæ‰§è¡Œ `HandleAsync` 的处ç†å™¨æ•°é‡ï¼ˆå¤„ç†å™¨æŠ›å¼‚常且 `ThrowOnHandlerError=false` 则ä¸è®¡å…¥æˆåŠŸï¼‰ã€‚
### Q2:为什么 `EventBus<TEvent>` æœ‰éžæ³›åž‹ `IEventBus`?
用于在ä¸å…³å¿ƒäº‹ä»¶å…·ä½“类型时(例如统一上下文或ä¸é—´ä»¶ç®¡é“ï¼‰æŒæœ‰ä¸€ä¸ªæ€»çº¿å¼•用。
### Q3:如何在 `EventHub<TEvent>` çš„ subscribe æŒ‡ä»¤ä¸æä¾›å¤„ç†å™¨ï¼Ÿ
æž„é€ `EventContext` 并写入 `context["Handler"] = myHandler`,然åŽè°ƒç”¨ `HandleAsync("event#...#subscribe", context)`。
---
## 12. 版本与兼容性
- 本模å—é¢å‘å¤šç›®æ ‡æ¡†æž¶ï¼ˆ`net45` 至 `net10`ï¼‰å¹¶ä½¿ç”¨å¼‚æ¥ API。
- åœ¨è¾ƒè€æ¡†æž¶ä¸‹ï¼Œéƒ¨åˆ† `Task` 相关 API 会使用兼容实现(例如 `TaskEx`)。
|