Add XCode skills for entity caching, ORM, and sharding ETL
|
---
name: mqtt-client-server
description: >
使用 NewLife.MQTT 构建 MQTT 客户端(MqttClient)和内嵌 Broker(MqttServer),
涵盖连接管ç†ã€å‘布/订阅ã€QoS 0/1/2ã€é—嘱消æ¯ã€ä¿ç•™æ¶ˆæ¯ã€æ–线自动é‡è¿žé‡è®¢é˜…,
æ”¯æŒ MQTT 3.1/3.1.1/5.0 å议,以åŠé›†ç¾¤/桥接/规则引擎/WebHook/ACL ä¼ä¸šåŠŸèƒ½ã€‚
适用于 IoT 设备接入ã€å®žæ—¶æ¶ˆæ¯æŽ¨é€ã€è®¾å¤‡å½±ååŒæ¥ç‰åœºæ™¯ã€‚
argument-hint: >
è¯´æ˜Žä½ çš„ MQTT 场景:客户端连接å‘布订阅还是æå»º Brokerï¼›
å议版本(3.1.1 还是 5.0);QoS ç‰çº§éœ€æ±‚;是å¦éœ€è¦ TLS/SSLï¼›
是å¦éœ€è¦é—嘱消æ¯ã€ä¿ç•™æ¶ˆæ¯ï¼›Broker 是å¦éœ€è¦è®¤è¯/集群/规则引擎。
---
# MQTT 客户端与æœåŠ¡ç«¯æŠ€èƒ½ï¼ˆNewLife.MQTT)
## 适用场景
- IoT è®¾å¤‡ï¼ˆä¼ æ„Ÿå™¨ã€ç½‘å…³ã€å·¥ä¸šè®¾å¤‡ï¼‰é€šè¿‡ MQTT 上报数æ®ã€æŽ¥æ”¶æŒ‡ä»¤ã€‚
- 需è¦å†…嵌轻é‡çº§ Broker,ä¸ä¾èµ–第三方 MQTT æœåŠ¡ï¼ˆå¦‚ Mosquitto/EMQ)。
- 设备异常æ–线需è¦è‡ªåЍé‡è¿žå¹¶æ¢å¤å…¨éƒ¨è®¢é˜…关系。
- 需è¦ä¼ä¸šçº§åŠŸèƒ½ï¼šACL æƒé™æŽ§åˆ¶ã€æ¶ˆæ¯æ¡¥æŽ¥ï¼ˆè·¨é›†ç¾¤è½¬å‘)ã€è§„则引擎(消æ¯è·¯ç”±/处ç†ï¼‰ã€‚
- 代ç 审查:确认 QoS 使用æ£ç¡®ã€é—嘱消æ¯é…ç½®è§„èŒƒã€æ–线é‡è¿žç–ç•¥åˆç†ã€‚
## æ ¸å¿ƒåŽŸåˆ™
1. **`MqttClient` 自动é‡è¿žé‡è®¢é˜…**:设置 `Reconnect = true`(默认)åŽï¼Œè¿žæŽ¥æ–开时框架自动按指数退é¿é‡è¿žï¼Œå¹¶æ¢å¤ `SubscribeAsync` æ³¨å†Œçš„å…¨éƒ¨è®¢é˜…å…³ç³»ï¼Œä¸šåŠ¡å±‚æ— éœ€æ‰‹åŠ¨å®žçŽ°æ–线æ¢å¤ã€‚
2. **QoS 级别按需选择**:`AtMostOnce(0)` 高åžå/坿ޥå—丢消æ¯ï¼ˆä¼ æ„Ÿå™¨é¥æµ‹ï¼‰ï¼›`AtLeastOnce(1)` å¯é æŠ•é€’é€‚åˆæŒ‡ä»¤ä¸‹å‘ï¼›`ExactlyOnce(2)` å››æ¬¡æ¡æ‰‹å¼€é”€å¤§ï¼Œä»…用于支付/告è¦ç‰å¹‚ç‰æ€§åœºæ™¯ã€‚
3. **é—嘱消æ¯åœ¨ `Connect` å‰é…ç½®**:`WillTopic`/`WillMessage`/`WillQoS` 属性必须在 `ConnectAsync()` 调用å‰è®¾ç½®ï¼Œè¿žæŽ¥åŽæ— 法更改。
4. **ä¿ç•™æ¶ˆæ¯ç”¨äºŽçжæ€åŒæ¥**:å‘布时设置 `Retain = true`,Broker ä¿ç•™è¯¥ä¸»é¢˜æœ€åŽä¸€æ¡æ¶ˆæ¯ï¼Œæ–°è®¢é˜…者连接åŽç«‹å³æ”¶åˆ°æœ€æ–°çжæ€ï¼Œæ— 需ç‰å¾…下一次å‘布。
5. **æœåŠ¡ç«¯å¿…é¡»æ³¨å…¥ `IMqttExchange`**:`MqttExchange` 是消æ¯è·¯ç”±ä¸æž¢ï¼Œè´Ÿè´£å‘布/订阅匹é…ã€ä¿ç•™æ¶ˆæ¯å˜å‚¨ã€QoS æ¶ˆæ¯æŒä¹…åŒ–ï¼›ä¸æ³¨å…¥åˆ™æœåŠ¡ç«¯åªèƒ½æŽ¥æ”¶æ¶ˆæ¯ä½†æ— 法路由分å‘。
6. **通é…符订阅 `+` å’Œ `#` 区别**:`+` 匹é…å•层(`sensor/+/temperature` åŒ¹é… `sensor/device1/temperature`),`#` 匹é…多层(`sensor/#` åŒ¹é… `sensor/device1/data/raw`);`#` åªèƒ½å‡ºçŽ°åœ¨æœ«å°¾ã€‚
7. **é—å˜±æ¶ˆæ¯ vs æ£å¸¸æ–å¼€**:æ£å¸¸ `DisconnectAsync()` ä¸è§¦å‘é—嘱;异常æ–线(网络超时ã€è¿›ç¨‹å´©æºƒï¼‰æ‰è§¦å‘é—嘱消æ¯å‘布。
## 执行æ¥éª¤
### 一ã€å®¢æˆ·ç«¯è¿žæŽ¥ä¸Žå‘布订阅
```csharp
using NewLife.MQTT;
using NewLife.MQTT.Messaging;
// 创建客户端
var client = new MqttClient
{
Server = "tcp://127.0.0.1:1883",
ClientId = Guid.NewGuid().ToString(),
UserName = "admin",
Password = "admin",
KeepAlive = 60, // 心跳间隔(秒)
Reconnect = true, // 自动é‡è¿žï¼ˆé»˜è®¤ true)
Version = MqttVersion.V311,
Log = XTrace.Log,
};
// 连接
await client.ConnectAsync();
// 订阅主题(通é…符 + 回调)
await client.SubscribeAsync("sensor/+/temperature", msg =>
{
var payload = msg.Payload.ToStr();
Console.WriteLine($"主题: {msg.Topic}, æ•°æ®: {payload}");
});
// å‘布消æ¯ï¼ˆQoS 1)
await client.PublishAsync("sensor/device1/temperature", "25.6",
QualityOfService.AtLeastOnce);
// å‘布ä¿ç•™æ¶ˆæ¯ï¼ˆæ–°è®¢é˜…者立å³å¯è§æœ€æ–°çжæ€ï¼‰
await client.PublishAsync(new PublishMessage
{
Topic = "device/online-status",
Payload = Encoding.UTF8.GetBytes("online"),
QoS = QualityOfService.AtLeastOnce,
Retain = true,
});
```
### 二ã€è¿žæŽ¥å—符串é…ç½®
```csharp
// ç‰ä»·äºŽä¸Šé¢çš„属性é…ç½®
client.Init("Server=tcp://127.0.0.1:1883;UserName=admin;Password=admin;ClientId=client01");
await client.ConnectAsync();
```
### 三ã€é—嘱消æ¯é…ç½®
```csharp
// é—嘱消æ¯ï¼šè®¾å¤‡å¼‚常掉线时 Broker 自动å‘布
var client = new MqttClient
{
Server = "tcp://127.0.0.1:1883",
ClientId = "device-001",
WillTopic = "device/device-001/status",
WillMessage = Encoding.UTF8.GetBytes("offline"),
WillQoS = QualityOfService.AtLeastOnce,
WillRetain = true, // é—嘱消æ¯ä¹Ÿä½œä¸ºä¿ç•™æ¶ˆæ¯å˜å‚¨
};
await client.ConnectAsync();
```
### å››ã€MQTT 5.0 特性
```csharp
var client = new MqttClient
{
Server = "tcp://127.0.0.1:1883",
Version = MqttVersion.V500, // å¯ç”¨ MQTT 5.0
};
await client.ConnectAsync();
// 共享订阅(负载å‡è¡¡ï¼Œå¤šä¸ªæ¶ˆè´¹è€…)
await client.SubscribeAsync("$share/group1/sensor/#", msg =>
{
Console.WriteLine($"[5.0 共享订阅] {msg.Topic}: {msg.Payload.ToStr()}");
});
```
### 五ã€TLS/SSL 安全连接
```csharp
var client = new MqttClient
{
Server = "ssl://broker.example.com:8883",
SslProtocol = System.Security.Authentication.SslProtocols.Tls12,
Certificate = new X509Certificate2("client.pfx", "password"),
};
await client.ConnectAsync();
```
### å…ã€æå»ºå†…åµŒ Broker(MqttServer)
```csharp
using NewLife.MQTT;
using NewLife.MQTT.Handlers;
using NewLife.Remoting;
// IoC 容器注册
var services = ObjectContainer.Current;
services.AddSingleton<ILog>(XTrace.Log);
services.AddTransient<IMqttHandler, MqttHandler>(); // å议处ç†å™¨
services.AddSingleton<IMqttExchange, MqttExchange>(); // 消æ¯è·¯ç”±ï¼ˆå¿…须)
// 创建并å¯åЍ Broker
var server = new MqttServer
{
Port = 1883,
ServiceProvider = services.BuildServiceProvider(),
Log = XTrace.Log,
};
server.Start();
```
### 七ã€è‡ªå®šä¹‰è®¤è¯ä¸Ž ACL
```csharp
public class MyAuthenticator : IMqttAuthenticator
{
public Boolean Authenticate(ConnectMessage message, out String? reason)
{
var valid = CheckCredentials(message.Username, message.Password);
reason = valid ? null : "ç”¨æˆ·åæˆ–密ç 错误";
return valid;
}
public Boolean CanPublish(String clientId, String topic)
=> topic.StartsWith($"device/{clientId}/"); // 设备åªèƒ½å‘布自己的主题
public Boolean CanSubscribe(String clientId, String topicFilter)
=> topicFilter.StartsWith("sensor/");
}
// 注册
services.AddSingleton<IMqttAuthenticator, MyAuthenticator>();
```
### å…«ã€è§„则引擎(消æ¯è·¯ç”±/处ç†ï¼‰
```csharp
// 规则引擎在 MqttExchange 内é…ç½®
var exchange = new MqttExchange();
// 规则:匹é…主题 sensor/# → 转å‘到å¦ä¸€ä¸»é¢˜
exchange.AddRule(new MqttRule
{
TopicFilter = "sensor/#",
Action = RuleAction.Republish,
TargetTopic = "log/sensor",
});
// 规则:匹é…告è¦ä¸»é¢˜ → è§¦å‘ WebHook
exchange.AddRule(new MqttRule
{
TopicFilter = "alarm/#",
Action = RuleAction.WebHook,
WebHookUrl = "https://api.example.com/alarm",
});
```
### ä¹ã€é›†ç¾¤éƒ¨ç½²
```csharp
var server = new MqttServer
{
Port = 1883,
ClusterPort = 2883,
ClusterNodes = new[] { "192.168.1.2:2883", "192.168.1.3:2883" },
};
server.Start();
```
## é…ç½®å‚æ•°é€ŸæŸ¥ï¼ˆMqttClient)
| 傿•° | 默认值 | 说明 |
|------|--------|------|
| `Server` | — | æœåŠ¡å™¨åœ°å€ï¼ˆ`tcp://host:1883`,`ssl://` 表示 TLS) |
| `ClientId` | éšæœº GUID | å®¢æˆ·ç«¯å”¯ä¸€æ ‡è¯†ï¼ˆæ–线é‡è¿žå»ºè®®å›ºå®šï¼‰ |
| `KeepAlive` | `600` | 心跳间隔(秒),0 = ç¦ç”¨ |
| `CleanSession` | `true` | false = æ–çº¿åŽæœåŠ¡ç«¯ä¿ç•™ä¼šè¯ï¼ˆç¦»çº¿æ¶ˆæ¯ï¼‰ |
| `Version` | `V311` | MQTT å议版本 |
| `Reconnect` | `true` | 自动é‡è¿ž |
| `MaxReconnectAttempts` | `0` | 0 = æ— é™é‡è¿ž |
| `InitialReconnectDelay` | `1000` | åˆå§‹é‡è¿žå»¶è¿Ÿï¼ˆms) |
| `MaxReconnectDelay` | `60000` | 最大é‡è¿žå»¶è¿Ÿï¼ˆms,指数退é¿ä¸Šé™ï¼‰ |
| `Timeout` | `15000` | æ“作超时(ms) |
## 常è§é”™è¯¯ä¸Žæ³¨æ„事项
- **`CleanSession=false` æ—¶ `ClientId` 必须固定**:æŒä¹…会è¯é `ClientId` è¯†åˆ«ï¼›æ¯æ¬¡ç”¨éšæœº ID ä¼šå¯¼è‡´ç¦»çº¿æ¶ˆæ¯æ°¸è¿œå †ç§¯ï¼ŒæœåŠ¡ç«¯ä¼šè¯è†¨èƒ€ã€‚
- **QoS 2 䏿”¯æŒ Retain + 消æ¯å¹‚ç‰åœºæ™¯**:QoS 2 ä¿è¯æ°å¥½ä¸€æ¬¡ï¼Œä½†ç½‘ç»œæŠ–åŠ¨æ—¶å››æ¬¡æ¡æ‰‹å¯èƒ½è¢«é‡å¤è§¦å‘;业务层ä»éœ€åšå¹‚ç‰å¤„ç†ã€‚
- **`IMqttExchange` 是 Broker 的必è¦ç»„ä»¶**ï¼šä¸æ³¨å…¥ `MqttExchange` åˆ™æ¶ˆæ¯æ— 法在客户端间路由,所有消æ¯åªè¿›ä¸å‡ºã€‚
- **通é…符 `#` 订阅所有主题有性能风险**:`#` 匹é…全部消æ¯ï¼Œé«˜åžå场景下回调处ç†ä¸åŠæ—¶ä¼šå¯¼è‡´å†…å˜ç§¯åŽ‹ã€‚
- **é—嘱消æ¯ä¸è¦è®¾ç½®è¿‡å¤§ Payload**:é—嘱消æ¯åœ¨ CONNECT 报文ä¸ä¸€æ¬¡æ€§ä¼ 输,过大(>256KB)会拒ç»è¿žæŽ¥ã€‚
|