解决MySql布尔型新旧版本兼容问题,采用枚举来表示布尔型的数据表。由正向工程赋值
大石头 authored at 2018-05-15 21:21:05
12.45 KiB
X
# 设备数据从 MQ 分发到 WebSocket 物联网场景中,设备采集数据写入 Redis 消息队列(`RedisStream`),前端通过 WebSocket 实时订阅。本文介绍如何用 **单消费者 + 内存事件分发** 模式,以最少的 Redis 连接支撑大量 WebSocket 推送。 --- ## 1. 问题与方案 ### 1.1 朴素方案的缺陷 每个 WebSocket 连接各自创建一个 `RedisStream` 消费者: ```text WebSocket A → RedisStream 消费者 → Redis 长连接 WebSocket B → RedisStream 消费者 → Redis 长连接 WebSocket C → RedisStream 消费者 → Redis 长连接 …(成千上万) ``` `RedisStream` 消费使用长连接,WebSocket 数量膨胀后迅速耗尽 Redis 连接池,系统崩溃。 ### 1.2 单消费者 + EventHub 内存分发 每个进程只启动 **一个 MQ 消费者**(一条 Redis 长连接),消费到数据后由 `EventHub` 在内存中按设备路由,分发给同进程内的 WebSocket 处理器: ```text Redis MQ(1 条长连接 / 进程) ↓ DataConsumer(单例 IHostedService) ↓ EventHub<DeviceDataDTO>(按 topic=DeviceId 路由) ↓ EventBus<DeviceDataDTO>(同设备共享总线) ↙ ↓ ↘ WebSocket A B C …(IEventHandler) ``` Redis 连接数 = 进程数,与 WebSocket 连接数完全解耦。 --- ## 2. 整体架构 ```text ┌──────────────────────────────────────────────────────┐ │ ASP.NET Core 进程 │ │ │ │ DataConsumer(Singleton + IHostedService) │ │ ├─ ICacheProvider → RedisStream<DeviceDataDTO> │ │ ├─ ConsumeAsync(OnConsume) 后台消费循环 │ │ └─ EventHub<DeviceDataDTO> 内存路由 │ │ │ │ │ │ DispatchAsync(topic=DeviceId) │ │ ↓ │ │ EventBus(每设备一个共享实例) │ │ ├─ Controller A → ws.SendAsync → 客户端 A │ │ ├─ Controller B → ws.SendAsync → 客户端 B │ │ └─ Controller C → ws.SendAsync → 客户端 C │ │ │ └──────────────────────────────────────────────────────┘ ↑ Redis MQ(队列:DeviceData,消费组=机器名) ``` **组件职责**: | 组件 | 职责 | |------|------| | `DataConsumer` | 单例托管服务,持有唯一 Redis 长连接,消费 `DeviceData` 队列 | | `EventHub<DeviceDataDTO>` | 按 `topic`(DeviceId)路由,管理每设备的事件总线 | | `EventBus<DeviceDataDTO>` | 同设备共享的内存总线,广播给所有订阅该设备的处理器 | | `DeviceDataController` | WebSocket 端点,实现 `IEventHandler<DeviceDataDTO>`,收到事件后推送 JSON | --- ## 3. 数据模型 设备数据报文使用 `IoT.Data.DeviceDataDTO`,队列名称固定为 `DeviceData`: ```csharp namespace IoT.Data; /// <summary>设备数据。设备采集原始数据,按天分表存储</summary> public partial class DeviceDataDTO : IDeviceData { /// <summary>编号</summary> public Int64 Id { get; set; } /// <summary>设备</summary> public Int32 DeviceId { get; set; } /// <summary>名称。MQTT的Topic,或者属性名</summary> public String Name { get; set; } /// <summary>数值</summary> public String Value { get; set; } /// <summary>时间戳。设备生成数据时的UTC毫秒</summary> public Int64 Timestamp { get; set; } } ``` --- ## 4. MQ 消费者 `DataConsumer` 作为 `IHostedService` 单例注册,职责:以本机计算机名为消费组获取 `RedisStream<DeviceDataDTO>`,调用 `ConsumeAsync` 注册回调,回调中按 `DeviceId` 路由——无订阅直接丢弃,有订阅才分发。 ```csharp /// <summary>设备数据 MQ 消费者(每进程单例)</summary> public class DataConsumer(ICacheProvider cacheProvider) : IHostedService { #region 属性 /// <summary>内存事件枢纽</summary> public EventHub<DeviceDataDTO> Hub { get; } = new(); private RedisStream<DeviceDataDTO>? _redisStream; private CancellationTokenSource _source = new(); #endregion #region IHostedService public Task StartAsync(CancellationToken cancellationToken) { Hub.Log = XTrace.Log; // 以本机计算机名为消费组,每台服务器独立全量消费 var group = Environment.MachineName; // RedisStream 内部处理反序列化,回调直接拿到强类型对象 _redisStream = cacheProvider.GetQueue<DeviceDataDTO>("DeviceData", group) as RedisStream<DeviceDataDTO>; // ConsumeAsync 启动后台循环,每条消息回调 OnConsume _redisStream?.ConsumeAsync(OnConsume, _source.Token); return Task.CompletedTask; } public Task StopAsync(CancellationToken cancellationToken) { _source.Cancel(); return Task.CompletedTask; } #endregion #region 消费回调 private async void OnConsume(DeviceDataDTO data) { var topic = data.DeviceId.ToString(); // 无订阅直接丢弃,避免无意义分发 if (!Hub.TryGetBus<DeviceDataDTO>(topic, out _)) return; var clientId = Environment.MachineName; try { await Hub.DispatchAsync(topic, clientId, data).ConfigureAwait(false); } catch (Exception ex) { Hub.WriteLog("分发异常:{0}", ex.Message); } } #endregion } ``` > `ConsumeAsync` 内部已处理重试与异常,无需手动写消费循环。`async void` 是回调签名要求,异常需在方法内捕获。 --- ## 5. WebSocket 控制器 控制器直接实现 `IEventHandler<DeviceDataDTO>`,WebSocket 连接时订阅设备事件总线,断开时取消订阅: ```csharp [ApiController] [Route("ws")] public class DeviceDataController(DataConsumer consumer) : ControllerBase, IEventHandler<DeviceDataDTO> { private WebSocket? _ws; private readonly TaskCompletionSource<Boolean> _closed = new(); /// <summary>订阅设备实时数据推送</summary> /// <param name="deviceId">设备编号,用作事件 topic</param> [HttpGet("data/{deviceId:int}")] public async Task GetData(Int32 deviceId) { if (!HttpContext.WebSockets.IsWebSocketRequest) { HttpContext.Response.StatusCode = 400; return; } using var ws = await HttpContext.WebSockets.AcceptWebSocketAsync(); _ws = ws; var clientId = Guid.NewGuid().ToString("N")[..8]; var topic = deviceId.ToString(); var bus = consumer.Hub.GetEventBus(topic, clientId); bus.Subscribe(this, clientId); try { await WaitForCloseAsync(HttpContext.RequestAborted).ConfigureAwait(false); } finally { bus.Unsubscribe(clientId); _ws = null; } } private Task WaitForCloseAsync(CancellationToken cancellationToken) { cancellationToken.Register(() => _closed.TrySetResult(false)); return _closed.Task; } /// <summary>收到设备数据时推送 JSON 给 WebSocket 客户端</summary> public async Task HandleAsync(DeviceDataDTO @event, IEventContext? context, CancellationToken cancellationToken) { var ws = _ws; if (ws == null || ws.State != WebSocketState.Open) { _closed.TrySetResult(true); return; } try { var json = @event.ToJson(); var buffer = Encoding.UTF8.GetBytes(json); await ws.SendAsync(new ArraySegment<Byte>(buffer), WebSocketMessageType.Text, true, cancellationToken).ConfigureAwait(false); } catch (WebSocketException) { _closed.TrySetResult(true); } } } ``` --- ## 6. 服务注册(Program.cs) ```csharp var builder = WebApplication.CreateBuilder(args); // 注册 Redis(NewLife.Redis 扩展包) builder.Services.AddRedis("127.0.0.1:6379"); // 单例 + 托管服务,整个进程只占一条 Redis 长连接 builder.Services.AddSingleton<DataConsumer>(); builder.Services.AddHostedService(sp => sp.GetRequiredService<DataConsumer>()); builder.Services.AddControllers(); var app = builder.Build(); app.UseWebSockets(new WebSocketOptions { KeepAliveInterval = TimeSpan.FromSeconds(30) }); app.MapControllers(); app.Run(); ``` --- ## 7. 关键机制 ### 7.1 消费组命名与多服务器部署 RedisStream 的消费组决定了消息分发策略。不同服务器必须使用不同消费组名,否则消息被 Redis 轮流派发,客户端只收到部分数据。 ```csharp // 推荐用机器名(短且唯一) var group = Environment.MachineName; // 或用局域网 IP var group = NetHelper.MyIP()?.ToString() ?? Environment.MachineName; ``` | 部署场景 | 消费组命名 | 效果 | |---------|-----------|------| | 单台服务器 | `MACHINE-A` | 全量消费 | | 多台负载均衡 | 各用机器名 | 每台**独立全量消费**,各自分发给本机 WebSocket | | 多台共用组名 | `myapp`(❌) | 消息**轮流派发**,客户端只收到部分数据 | > **原则**:广播场景(每台服务器都需要完整消息)用不同消费组名;竞争消费场景(消息只需处理一次)才用相同组名。 ### 7.2 同设备多连接共享事件总线 `EventHub` 按 `topic`(DeviceId)管理事件总线,同一设备的所有 WebSocket 连接共享同一个 `IEventBus<DeviceDataDTO>` 实例: ```text 1 个设备 (topic) → 1 个 EventBus → N 个 WebSocket 处理器 ``` - `DataConsumer.OnConsume` 调用 `Hub.DispatchAsync(topic, ...)` 时,`EventHub` 按 `topic` 定位总线 - 各 WebSocket 通过 `Hub.GetEventBus(topic, clientId)` 拿到同一个总线实例 - 消息由该总线广播给该设备下所有订阅连接 ### 7.3 无订阅即丢弃 消费回调中通过 `TryGetBus` 检查是否有订阅者,无则直接丢弃。大量未被关注的设备数据在入口即被过滤,不产生任何分发开销。 不使用 `QueueEventBus` 缓冲:本场景中未订阅的数据本就应丢弃,缓冲只会占用额外内存。`EventHub<DeviceDataDTO>` 直接分发最合适。 ### 7.4 订阅生命周期 | 时机 | 操作 | |------|------| | WebSocket 建立 | `bus.Subscribe(this, clientId)` | | 正常断开 | `finally` 块 `bus.Unsubscribe(clientId)` | | 异常断开 | `HandleAsync` 捕获 `WebSocketException` → `_closed.TrySetResult` → 触发 `finally` | | topic 无任何订阅者 | `EventBus` 自动清空;`EventHub` 移除总线缓存 | --- ## 8. 数据流时序 ```text Redis MQ(DeviceData) │ ConsumeAsync(消费组=机器名) ▼ DataConsumer.OnConsume │ TryGetBus → 无订阅?丢弃 │ → 有订阅?DispatchAsync(deviceId, machineName, data) ▼ EventHub<DeviceDataDTO> │ 定位 topic 对应的 EventBus ▼ EventBus<DeviceDataDTO>.PublishAsync │ 广播给所有 handler ├── Controller(a1b2) → ws.SendAsync → 客户端 A ├── Controller(d4e5) → ws.SendAsync → 客户端 B └── Controller(g7h8) → ws.SendAsync → 客户端 C ``` --- ## 9. 进阶:预序列化减少开销 多个 WebSocket 推送同一条数据时,可预生成 JSON 放入 `EventContext`,避免重复序列化。`EventContext` 实现了 `IExtend`,支持通过索引器存取任意扩展数据: ```csharp // DataConsumer.OnConsume 中预序列化 private async void OnConsume(DeviceDataDTO data) { var topic = data.DeviceId.ToString(); if (!Hub.TryGetBus<DeviceDataDTO>(topic, out _)) return; var ctx = new EventContext(); ctx["Raw"] = data.ToJson(); await Hub.DispatchAsync(topic, Environment.MachineName, data, ctx).ConfigureAwait(false); } // DeviceDataController.HandleAsync 中优先取预序列化结果 public async Task HandleAsync(DeviceDataDTO @event, IEventContext? context, CancellationToken cancellationToken) { var raw = (context as IExtend)?["Raw"] as String ?? @event.ToJson(); var buffer = Encoding.UTF8.GetBytes(raw); await _ws.SendAsync(new ArraySegment<Byte>(buffer), WebSocketMessageType.Text, true, cancellationToken) .ConfigureAwait(false); } ```