Add XCode skills for entity caching, ORM, and sharding ETL
|
---
name: rocketmq-messaging
description: >
使用 NewLife.RocketMQ 接入 Apache RocketMQ(4.x/5.x)进行消æ¯ç”Ÿäº§ä¸Žæ¶ˆè´¹ï¼Œ
涵盖 Producer/Consumer 生命周期ã€äº”ç§æ¶ˆæ¯ç±»åž‹ï¼ˆæ™®é€š/顺åº/事务/延迟/批é‡ï¼‰ã€
Pop 消费模å¼ï¼ˆ5.x)ã€å››å¤§äº‘厂商适é…(阿里/腾讯/åŽä¸º/Apache ACL)ã€
gRPC 代ç†ã€æ¶ˆæ¯è½¨è¿¹è¿½è¸ªï¼Œä»¥åŠæ¶ˆè´¹é‡è¯•与æ»ä¿¡é˜Ÿåˆ—。
适用于电商订å•ã€æ”¯ä»˜é€šçŸ¥ã€IoT æ•°æ®æŽ¥å…¥ç‰é«˜å¯é 消æ¯åœºæ™¯ã€‚
argument-hint: >
è¯´æ˜Žä½ çš„æ¶ˆæ¯åœºæ™¯ï¼šå‘é€è¿˜æ˜¯æ¶ˆè´¹ï¼›æ¶ˆæ¯ç±»åž‹ï¼ˆæ™®é€š/顺åº/事务/延迟/批é‡ï¼‰ï¼›
RocketMQ 版本(4.x 还是 5.x);是å¦ä½¿ç”¨äº‘æœåŠ¡ï¼ˆé˜¿é‡Œ/腾讯/åŽä¸ºï¼‰è¿˜æ˜¯è‡ªå»ºï¼›
是å¦éœ€è¦ gRPC 代ç†ï¼ˆ5.x 专用);消费是集群模å¼è¿˜æ˜¯å¹¿æ’模å¼ã€‚
---
# RocketMQ æ¶ˆæ¯æ”¶å‘技能(NewLife.RocketMQ)
## 适用场景
- ç”µå•†ã€æ”¯ä»˜åœºæ™¯ä¸ï¼Œéœ€è¦é€šè¿‡æ¶ˆæ¯é˜Ÿåˆ—解耦æœåŠ¡ï¼Œä¿è¯æ¶ˆæ¯å¯é 投递。
- 需è¦äº‹åŠ¡æ¶ˆæ¯ï¼ˆäºŒé˜¶æ®µæäº¤ï¼‰ä¿è¯æ•°æ®åº“æ“作与消æ¯å‘é€çš„åŽŸåæ€§ã€‚
- IoT è®¾å¤‡æ•°æ®æŽ¥å…¥ï¼Œéœ€è¦é«˜åžåã€é¡ºåºæ¶ˆè´¹ã€‚
- è¿ç§»æˆ–部署在阿里云/腾讯云/åŽä¸ºäº‘çš„ RocketMQ 实例,需è¦ç‰¹å®šè®¤è¯é€‚é…。
- å‡çº§åˆ° RocketMQ 5.x,需è¦ä½¿ç”¨ Pop æ¶ˆè´¹æ¨¡å¼æˆ– gRPC 代ç†ã€‚
## æ ¸å¿ƒåŽŸåˆ™
1. **Producer å’Œ Consumer 独立生命周期**:`Start()` åŽæ‰èƒ½æ”¶å‘;程åºé€€å‡ºå‰å¿…é¡» `Stop()` 优雅关é—,å¦åˆ™æ¶ˆè´¹ä½ç‚¹ä¸ä¼šåŠæ—¶æäº¤ï¼Œé‡å¯åŽå¯èƒ½é‡å¤æ¶ˆè´¹ã€‚
2. **消费回调返回 `false` 触å‘é‡è¯•**:`OnConsume` 委托返回 `true` 表示消费æˆåŠŸï¼›è¿”å›ž `false` 或抛出异常时,消æ¯ä¼šæŒ‰ `MaxReconsumeTimes`(默认 16 次)é‡è¯•,超次进æ»ä¿¡é˜Ÿåˆ—(`%DLQ%{Group}`)。
3. **é¡ºåºæ¶ˆè´¹å¿…é¡»é”定队列**:设置 `OrderConsume = true` åŽï¼Œæ¡†æž¶ä¼šè°ƒç”¨ `LockBatchMQAsync` 在 Broker ç«¯åŠ é”,åŒä¸€é˜Ÿåˆ—内消æ¯ä¸²è¡Œæ¶ˆè´¹ï¼›é”定失败的队列消æ¯å»¶åŽå¤„ç†ã€‚
4. **事务消æ¯ä¸¤æ¥å‘é€**:先 `PublishTransaction`ï¼ˆåŠæ¶ˆæ¯ï¼‰æ‰§è¡Œæœ¬åœ°äº‹åŠ¡ï¼Œå†é€šè¿‡ `OnCheckTransaction` 回查委托告知 Broker æäº¤è¿˜æ˜¯å›žæ»šï¼›Broker 1 分钟内å‘起回查,回查超 15 次视为回滚。
5. **云厂商适é…通过 `CloudProvider` æ’入认è¯é€»è¾‘**:ä¸åŒäº‘厂商ç¾å算法ä¸åŒï¼ˆHMAC-SHA1/MD5/AK-SK),通过 `ICloudProvider` æŽ¥å£æ³¨å…¥ï¼Œä¸æ”¹å˜ä¸šåС代ç 。
6. **RocketMQ 5.x 使用 gRPC 需设置 `GrpcProxyAddress`**:5.x æ”¯æŒ Remoting å’Œ gRPC åŒå议,`GrpcProxyAddress` 优先,ä¸è®¾åˆ™ä½¿ç”¨ Remoting(兼容 4.x)。
## 执行æ¥éª¤
### 一ã€å‘逿™®é€šæ¶ˆæ¯
```csharp
using NewLife.RocketMQ;
var producer = new Producer
{
Topic = "order-topic",
NameServerAddress = "127.0.0.1:9876",
Group = "order-producer-group",
};
producer.Start();
// åŒæ¥å‘é€ï¼ˆå¸¦æ ‡ç¾å’Œä¸šåŠ¡é”®ï¼‰
var result = producer.Publish(
body: new { OrderId = 42, Amount = 100.0m },
tags: "create",
keys: "order-42"
);
Console.WriteLine($"MsgId: {result.MsgId}, Status: {result.SendStatus}");
// 异æ¥å‘é€
var asyncResult = await producer.PublishAsync(new Message
{
Topic = "order-topic",
Tags = "pay",
Body = Encoding.UTF8.GetBytes(order.ToJson()),
});
// å•å‘å‘é€ï¼ˆä¸ç‰ç»“æžœï¼Œé€‚åˆæ—¥å¿—/监控)
producer.PublishOneway(message, queue: null);
producer.Stop();
```
### äºŒã€æ‰¹é‡å‘é€
```csharp
var messages = orders.Select(o => new Message
{
Topic = "order-topic",
Tags = "batch",
Body = Encoding.UTF8.GetBytes(o.ToJson()),
}).ToList();
var result = producer.PublishBatch(messages);
```
### 三ã€å»¶è¿Ÿæ¶ˆæ¯
```csharp
// RocketMQ 4.x:18 级预设延迟(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)
var msg = new Message
{
Topic = "order-topic",
Body = Encoding.UTF8.GetBytes(order.ToJson()),
DelayTimeLevel = 3, // 第 3 级 = 10 ç§’åŽæŠ•é€’
};
producer.Publish(msg);
// RocketMQ 5.x:精确延迟时间(毫秒)
var msg5 = new Message
{
Topic = "order-topic",
Body = Encoding.UTF8.GetBytes(order.ToJson()),
DeliveryTimestamp = DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeMilliseconds(),
};
```
### å››ã€äº‹åŠ¡æ¶ˆæ¯
```csharp
producer.OnCheckTransaction = (msg, transId) =>
{
// 回查本地事务状æ€ï¼ˆæŸ¥è¯¢æ•°æ®åº“ç‰ï¼‰
var state = CheckOrderInDb(transId);
return state == OrderState.Success
? TransactionState.Commit
: TransactionState.Rollback;
};
producer.Start();
// å‘é€äº‹åŠ¡æ¶ˆæ¯
var result = producer.PublishTransaction(new Message
{
Topic = "finance-topic",
Body = Encoding.UTF8.GetBytes(transfer.ToJson()),
});
// 执行本地事务(数æ®åº“è½åº“)
using var tx = db.BeginTransaction();
db.InsertTransfer(transfer);
tx.Commit();
// æäº¤äº‹åŠ¡æ¶ˆæ¯ï¼ˆå‘ŠçŸ¥ Broker æäº¤ï¼‰
producer.ConfirmTransaction(result.TransactionId, TransactionState.Commit);
```
### äº”ã€æ¶ˆè´¹æ¶ˆæ¯
```csharp
var consumer = new Consumer
{
Topic = "order-topic",
Group = "order-consumer-group",
NameServerAddress = "127.0.0.1:9876",
BatchSize = 32, // æ¯æ‰¹æ‹‰å–æ•°é‡
FromLastOffset = false, // false = 从头消费(首次)
MaxReconsumeTimes = 16, // 失败最大é‡è¯•次数
};
// åŒæ¥æ¶ˆè´¹å›žè°ƒï¼ˆè¿”回 false = 消费失败,触å‘é‡è¯•)
consumer.OnConsume = (queue, messages) =>
{
foreach (var msg in messages)
{
try
{
ProcessOrder(Encoding.UTF8.GetString(msg.Body));
}
catch
{
return false; // 消费失败,é‡è¯•
}
}
return true;
};
consumer.Start();
// å¼‚æ¥æ¶ˆè´¹å›žè°ƒ
consumer.OnConsumeAsync = async (queue, messages, ct) =>
{
await ProcessBatchAsync(messages, ct);
return true;
};
```
### å…ã€é¡ºåºæ¶ˆè´¹
```csharp
var consumer = new Consumer
{
Topic = "order-topic",
Group = "order-seq-group",
NameServerAddress = "127.0.0.1:9876",
OrderConsume = true, // å¯ç”¨é¡ºåºæ¶ˆè´¹
};
consumer.OnConsume = (queue, messages) => { /* ä¸²è¡Œå¤„ç† */ return true; };
consumer.Start();
```
### 七ã€Pop 消费(RocketMQ 5.x)
```csharp
var consumer = new Consumer
{
Topic = "order-topic",
Group = "order-consumer-group",
NameServerAddress = "127.0.0.1:9876",
Version = MQVersion.V5_2_0, // 指定 5.x
};
// Pop 模å¼ï¼šæ¶ˆæ¯ä¸å†å½’属æŸä¸ª Broker é˜Ÿåˆ—ï¼Œæ¶ˆè´¹åŽ Ack
consumer.OnConsumeAsync = async (queue, messages, ct) =>
{
foreach (var msg in messages)
{
await ProcessAsync(msg);
// PopConsume 模å¼ä¸‹ Ack 由框架自动完æˆï¼ˆConsumerState.Success)
}
return true;
};
consumer.Start();
```
### å…«ã€äº‘厂商适é…
```csharp
// 阿里云公共云 RocketMQ
var producer = new Producer
{
Topic = "topic-xxx",
NameServerAddress = "http://xxx.mq.aliyuncs.com:80",
CloudProvider = new AliyunProvider
{
AccessKey = "LTAI5t...",
SecretKey = "xxx",
OnsChannel = "ALIYUN",
},
};
// Apache ACL(自建集群)
var producer2 = new Producer
{
Topic = "topic-xxx",
NameServerAddress = "127.0.0.1:9876",
CloudProvider = new AclProvider
{
AccessKey = "rocketmq_ak",
SecretKey = "rocketmq_sk",
},
};
// gRPC 代ç†ï¼ˆRocketMQ 5.x Proxy)
var producer3 = new Producer
{
Topic = "topic-xxx",
GrpcProxyAddress = "http://localhost:8081",
Group = "test-group",
};
```
## é…ç½®å‚æ•°é€ŸæŸ¥
| 傿•° | 默认值 | 说明 |
|------|--------|------|
| `NameServerAddress` | — | NameServer 地å€ï¼ˆå¿…填) |
| `Topic` | — | 主题å(必填) |
| `Group` | — | 生产/消费组 |
| `Version` | `V4_9_7` | å议版本(V5_2_0 = 5.x) |
| `RequestTimeout` | `3000` | 请求超时(ms) |
| `RetryTimesWhenSendFailed` | `3` | å‘é€å¤±è´¥é‡è¯•次数 |
| `MaxMessageSize` | `4MB` | 消æ¯ä½“最大é™åˆ¶ |
| `CompressOverBytes` | `4096` | 超过æ¤å¤§å°è‡ªåЍ ZLIB 压缩 |
| `BatchSize` | `32` | Consumer æ¯æ‰¹æ‹‰å–æ•°é‡ |
| `MaxReconsumeTimes` | `16` | 消费失败最大é‡è¯•次数 |
| `FromLastOffset` | `false` | 首次消费起始ä½ç½® |
| `EnableMessageTrace` | `false` | 是å¦å¼€å¯æ¶ˆæ¯è½¨è¿¹ |
## 常è§é”™è¯¯ä¸Žæ³¨æ„事项
- **`Stop()` 必须调用**:消费ä½ç‚¹é€šè¿‡å¿ƒè·³ç»´æŠ¤ï¼Œä¸è°ƒç”¨ `Stop()` 会导致é‡å¤æ¶ˆè´¹ã€‚
- **äº‹åŠ¡æ¶ˆæ¯ Broker 回查最多 15 次**:超é™è§†ä¸ºå›žæ»šï¼›å›žæŸ¥å§”托内ä¸è¦æœ‰è€—æ—¶ I/O,应直接查库状æ€ã€‚
- **延迟级别从 1 å¼€å§‹ï¼ˆä¸æ˜¯ 0)**:`DelayTimeLevel=0` 表示ä¸å»¶è¿Ÿï¼Œä¸Ž `1`(1 秒)ä¸åŒã€‚
- **é¡ºåºæ¶ˆè´¹ä¸Žå¤šçº¿ç¨‹å†²çª**:`OrderConsume=true` 时框架串行处ç†åŒä¸€é˜Ÿåˆ—,ä¸è¦åœ¨ `OnConsume` 内å†èµ·å¹¶å‘任务。
- **æ‰¹é‡æ¶ˆæ¯è¶… 4MB 会被拒ç»**ï¼šå‘æ‰¹é‡æ¶ˆæ¯å‰æ£€æŸ¥æ€»å¤§å°ï¼Œå»ºè®®æ¯æ‰¹ < 1MB。
- **5.x gRPC 代ç†éœ€å•独部署 Proxy 组件**:`GrpcProxyAddress` æŒ‡å‘ RocketMQ Proxyï¼Œè€Œéž Broker 直连地å€ã€‚
|