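# Parallel Model: Actor
## Overview
`Actor` is the lock-free parallel programming model in NewLife.Core. It provides thread-safe asynchronous processing on top of a message queue. Each Actor owns its own message mailbox and processing thread and communicates through message passing, which avoids the complexity and performance problems of traditional locking.
**Namespace**: `NewLife.Model`
**Documentation**: https://newlifex.com/core/actor
## Core Features
- **Lock-free design**: state is isolated behind the message queue, so no explicit locking is needed
- **Dedicated thread**: each Actor processes messages on its own thread and does not affect the thread pool
- **Batch processing**: messages can be consumed in batches to increase throughput
- **Capacity limit**: the message queue can be given a maximum capacity to prevent running out of memory
- **Auto start**: the Actor starts automatically when a message is sent
- **Performance tracing**: integrates `ITracer` for distributed tracing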
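To make the mailbox idea behind these features concrete, here is a minimal, library-independent sketch. It is not the NewLife.Core implementation: `MiniMailbox`, its members, and the choice of `BlockingCollection<T>` are assumptions made purely for illustration. It shows how a bounded queue plus one dedicated consumer thread lets state be mutated without any lock.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration only; this is NOT the NewLife.Core implementation.
public sealed class MiniMailbox
{
    // Bounded queue: Add blocks the caller once `capacity` messages are pending.
    private readonly BlockingCollection<Object> _queue;
    private readonly Thread _worker;

    // Written only by the worker thread, so no lock is needed.
    private Int32 _processed;

    // Read this only after Stop has returned true.
    public Int32 Processed => _processed;

    public MiniMailbox(Int32 capacity = 1000)
    {
        _queue = new BlockingCollection<Object>(capacity);
        _worker = new Thread(Loop) { IsBackground = true, Name = nameof(MiniMailbox) };
        _worker.Start();
    }

    // Enqueue a message and return the approximate number of pending messages.
    // Throws InvalidOperationException if called after Stop.
    public Int32 Tell(Object message)
    {
        _queue.Add(message);
        return _queue.Count;
    }

    // Stop accepting messages, then wait up to msTimeout (-1 = forever) for the queue to drain.
    public Boolean Stop(Int32 msTimeout)
    {
        _queue.CompleteAdding();
        return _worker.Join(msTimeout);
    }

    private void Loop()
    {
        // Blocks while the queue is empty; ends once CompleteAdding was called and the queue is drained.
        foreach (var message in _queue.GetConsumingEnumerable())
        {
            _processed++;
            Console.WriteLine($"#{_processed}: {message}");
        }
    }
}

public static class MiniMailboxDemo
{
    public static void Main()
    {
        var box = new MiniMailbox(capacity: 100);
        box.Tell("Hello");
        box.Tell("World");
        Console.WriteLine(box.Stop(5000) ? "drained" : "timed out");
    }
}
```

Because only the worker thread touches `_processed`, callers on any thread can call `Tell` concurrently without coordinating with each other beyond the queue itself.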
## Quick Start
```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using NewLife.Model;
// Define an Actor
public class MyActor : Actor
{
    protected override Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken)
    {
        var message = context.Message;
        Console.WriteLine($"Received message: {message}");
        return Task.CompletedTask;
    }
}
// Use the Actor
var actor = new MyActor();
// Send messages
actor.Tell("Hello");
actor.Tell("World");
// Wait up to 5 seconds for processing to finish, then stop
actor.Stop(5000);
```
## API Reference
### IActor Interface
```csharp
public interface IActor
{
    /// <summary>Adds a message and drives internal processing</summary>
    /// <param name="message">Message object</param>
    /// <param name="sender">Sender Actor</param>
    /// <returns>Number of messages pending processing</returns>
    Int32 Tell(Object message, IActor? sender = null);
}
```
### ActorContext Class
```csharp
public class ActorContext
{
    /// <summary>Sender</summary>
    public IActor? Sender { get; set; }
    /// <summary>Message</summary>
    public Object? Message { get; set; }
}
```
### Actor Base Class
#### Properties
```csharp
/// <summary>Name</summary>
public String Name { get; set; }
/// <summary>Whether the Actor is active</summary>
public Boolean Active { get; }
/// <summary>Bounded capacity. Maximum number of messages that may accumulate; defaults to Int32.MaxValue</summary>
public Int32 BoundedCapacity { get; set; }
/// <summary>Batch size. Number of messages handled per pass; defaults to 1</summary>
public Int32 BatchSize { get; set; }
/// <summary>Whether the Actor is long-running. Defaults to true, which uses a dedicated thread</summary>
public Boolean LongRunning { get; set; }
/// <summary>Current queue length</summary>
public Int32 QueueLength { get; }
/// <summary>Performance tracer</summary>
public ITracer? Tracer { get; set; }
```
#### Tell - Send a Message
```csharp
public virtual Int32 Tell(Object message, IActor? sender = null)
```
Sends a message to the Actor. If the Actor has not been started, it starts automatically.
**Parameters**:
- `message`: the message object; it can be of any type
- `sender`: the sending Actor, used for replying to the message
**Returns**: the number of messages currently pending processing
**Example**:
```csharp
var actor = new MyActor();
// Send a simple message
actor.Tell("Hello");
// Send a complex object
actor.Tell(new { Id = 1, Name = "Test" });
// Send with a sender (senderActor is any existing IActor)
actor.Tell("Ping", senderActor);
```
#### Start - Start the Actor
```csharp
public virtual Task? Start()
public virtual Task? Start(CancellationToken cancellationToken)
```
Starts the Actor manually. This is usually unnecessary because `Tell` starts it automatically.
**Example**:
```csharp
var actor = new MyActor();
// Start manually
actor.Start();
// Or start with a cancellation token
using var cts = new CancellationTokenSource();
actor.Start(cts.Token);
```
#### Stop - Stop the Actor
```csharp
public virtual Boolean Stop(Int32 msTimeout = 0)
```
Stops the Actor so that it no longer accepts new messages.
**Parameters**:
- `msTimeout`: number of milliseconds to wait. 0 = do not wait, -1 = wait indefinitely
**Returns**: whether all messages were processed before the timeout elapsed
**Example**:
```csharp
// Stop immediately without waiting
actor.Stop(0);
// Or wait up to 5 seconds
var completed = actor.Stop(5000);
if (!completed)
    Console.WriteLine("Some messages were not processed");
// Or wait indefinitely
actor.Stop(-1);
```
#### ReceiveAsync - Handle Messages
```csharp
// Single-message handling (BatchSize=1)
protected virtual Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken)
// Batch handling (BatchSize>1)
protected virtual Task ReceiveAsync(ActorContext[] contexts, CancellationToken cancellationToken)
```
Subclasses override this method to implement the message-handling logic.
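As general background on what batch consumption means, the following library-independent sketch drains up to a batch size of messages from a queue in one pass. It does not claim to show how NewLife.Core builds the `ActorContext[]` array; `BatchDrain`, `TakeBatch`, and the use of `BlockingCollection<T>` are assumptions for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper; not part of NewLife.Core.
public static class BatchDrain
{
    // Blocks for the first message, then takes whatever else is already queued,
    // up to batchSize items in total. It never waits for a batch to fill up.
    public static List<T> TakeBatch<T>(BlockingCollection<T> queue, Int32 batchSize)
    {
        var batch = new List<T>(batchSize);
        batch.Add(queue.Take());
        while (batch.Count < batchSize && queue.TryTake(out var item))
        {
            batch.Add(item);
        }
        return batch;
    }

    public static void Main()
    {
        var queue = new BlockingCollection<String>();
        foreach (var s in new[] { "a", "b", "c", "d", "e" }) queue.Add(s);

        Console.WriteLine(TakeBatch(queue, 3).Count); // prints 3
        Console.WriteLine(TakeBatch(queue, 3).Count); // prints 2
    }
}
```

Not waiting for a full batch keeps latency low when traffic is light, while still amortizing per-call overhead when messages pile up.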
## Use Cases
### 1. Log Collector
```csharp
public class LogActor : Actor
{
    private readonly StreamWriter _writer;
    public LogActor(String filePath)
    {
        Name = "LogActor";
        BatchSize = 100; // Write in batches
        BoundedCapacity = 10000; // Limit the queue
        _writer = new StreamWriter(filePath, true) { AutoFlush = false };
    }
    protected override async Task ReceiveAsync(ActorContext[] contexts, CancellationToken cancellationToken)
    {
        foreach (var ctx in contexts)
        {
            if (ctx.Message is String log)
            {
                await _writer.WriteLineAsync(log);
            }
        }
        // Flush once per batch rather than once per line
        await _writer.FlushAsync();
    }
    protected override void Dispose(Boolean disposing)
    {
        base.Dispose(disposing);
        _writer?.Dispose();
    }
}
// Usage
var logger = new LogActor("app.log");
logger.Tell($"[{DateTime.Now:HH:mm:ss}] Application started");
logger.Tell($"[{DateTime.Now:HH:mm:ss}] Handling request");
```
### 2. Message Processor
```csharp
// IMessageHandler, Message, Ack and Error are application-defined types not shown here
public class MessageProcessor : Actor
{
    private readonly IMessageHandler _handler;
    public MessageProcessor(IMessageHandler handler)
    {
        Name = "MessageProcessor";
        _handler = handler;
    }
    protected override async Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken)
    {
        if (context.Message is Message msg)
        {
            try
            {
                await _handler.HandleAsync(msg, cancellationToken);
                // Reply to the sender
                context.Sender?.Tell(new Ack { MessageId = msg.Id });
            }
            catch (Exception ex)
            {
                // Report the failure to the sender instead of throwing
                context.Sender?.Tell(new Error { MessageId = msg.Id, Exception = ex });
            }
        }
    }
}
```
### 3. Data Aggregator
```csharp
public class DataAggregator : Actor
{
    private readonly Dictionary<String, Int32> _counts = new();
    private DateTime _lastFlush = DateTime.Now;
    public DataAggregator()
    {
        Name = "DataAggregator";
        BatchSize = 50;
    }
    protected override Task ReceiveAsync(ActorContext[] contexts, CancellationToken cancellationToken)
    {
        foreach (var ctx in contexts)
        {
            if (ctx.Message is String key)
            {
                _counts.TryGetValue(key, out var count);
                _counts[key] = count + 1;
            }
        }
        // Print the statistics once per minute (checked only when a batch arrives)
        if ((DateTime.Now - _lastFlush).TotalMinutes >= 1)
        {
            foreach (var kv in _counts)
            {
                Console.WriteLine($"{kv.Key}: {kv.Value}");
            }
            _counts.Clear();
            _lastFlush = DateTime.Now;
        }
        return Task.CompletedTask;
    }
}
```
### 4. Communication Between Actors
```csharp
public class PingActor : Actor
{
    protected override Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken)
    {
        if (context.Message is String msg && msg == "Ping")
        {
            Console.WriteLine("PingActor received Ping, sending Pong");
            context.Sender?.Tell("Pong", this);
        }
        return Task.CompletedTask;
    }
}
public class PongActor : Actor
{
    protected override Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken)
    {
        if (context.Message is String msg && msg == "Pong")
        {
            Console.WriteLine("PongActor received Pong");
        }
        return Task.CompletedTask;
    }
}
// Usage
var ping = new PingActor();
var pong = new PongActor();
// Send Ping to ping with pong as the sender; ping replies to pong with Pong
ping.Tell("Ping", pong);
```
### 5. Rate-Limited Processor
```csharp
public class RateLimitedActor : Actor
{
    private readonly SemaphoreSlim _semaphore;
    private readonly Int32 _maxConcurrency;
    public RateLimitedActor(Int32 maxConcurrency = 10)
    {
        _maxConcurrency = maxConcurrency;
        _semaphore = new SemaphoreSlim(maxConcurrency);
        BatchSize = maxConcurrency;
    }
    protected override async Task ReceiveAsync(ActorContext[] contexts, CancellationToken cancellationToken)
    {
        var tasks = new List<Task>();
        foreach (var ctx in contexts)
        {
            // Wait for a free slot before starting the next task
            await _semaphore.WaitAsync(cancellationToken);
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await ProcessAsync(ctx.Message, cancellationToken);
                }
                finally
                {
                    _semaphore.Release();
                }
            }, cancellationToken));
        }
        await Task.WhenAll(tasks);
    }
    private async Task ProcessAsync(Object? message, CancellationToken cancellationToken)
    {
        // Processing logic goes here
        await Task.Delay(100, cancellationToken);
    }
}
```
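## Best Practices
### 1. Choose a Sensible Batch Size
```csharp
// IoActor, CpuActor and RealtimeActor stand for your own Actor subclasses
// IO-bound: larger batches
var ioActor = new IoActor { BatchSize = 100 };
// CPU-bound: smaller batches
var cpuActor = new CpuActor { BatchSize = 10 };
// Latency-sensitive: one message at a time
var realtimeActor = new RealtimeActor { BatchSize = 1 };
```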
### 2. Set the Queue Capacity
```csharp
// Prevent running out of memory
var actor = new MyActor
{
    BoundedCapacity = 10000 // At most 10,000 messages may accumulate
};
// Check the queue length
if (actor.QueueLength > 5000)
{
    Console.WriteLine("Warning: message backlog");
}
```
### 3. Graceful Shutdown
```csharp
// Stop accepting new messages and wait for queued messages to finish
var completed = actor.Stop(30_000); // Wait up to 30 seconds
if (!completed)
{
    Console.WriteLine($"{actor.QueueLength} messages were not processed");
}
```
### 4. Exception Handling
```csharp
// ProcessAsync, DeadLetterActor and DeadLetter are application-defined members and types not shown here
public class SafeActor : Actor
{
    protected override async Task ReceiveAsync(ActorContext context, CancellationToken cancellationToken)
    {
        try
        {
            await ProcessAsync(context.Message);
        }
        catch (Exception ex)
        {
            // Log the error and do not rethrow
            XTrace.WriteException(ex);
            // Optional: forward to a dead-letter queue
            DeadLetterActor?.Tell(new DeadLetter
            {
                Message = context.Message,
                Exception = ex
            });
        }
    }
}
```
### 5. Performance Tracing
```csharp
var actor = new MyActor
{
    Tracer = new DefaultTracer() // Or use Stardust tracing
};
// Trace information is recorded automatically:
// - actor:Start
// - actor:Loop
// - actor:Stop
```
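## Comparison with Other Concurrency Models
| Feature | Actor | Task/async | Lock |
|------|-------|------------|-----|
| Thread safety | Safe by design | Requires care | Requires explicit locking |
| Programming complexity | Medium | Low | High |
| Typical use | IO-bound | General purpose | Shared state |
| Backpressure | Supported | Not supported | Not supported |
| Message ordering | Guaranteed | Not guaranteed | Not applicable |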
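
The backpressure and ordering rows can be illustrated without NewLife.Core. The sketch below uses a bounded `System.Threading.Channels` channel with a single reader; `BackpressureDemo` and `RunAsync` are hypothetical names. This document does not say what `Tell` does once `BoundedCapacity` is reached, so the waiting behavior shown here is an assumption about bounded queues in general, not a statement about the Actor class.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical demo; not part of NewLife.Core.
public static class BackpressureDemo
{
    // Pushes `count` integers through a queue of size `capacity` and
    // returns them in the order the single consumer observed them.
    public static async Task<List<Int32>> RunAsync(Int32 capacity, Int32 count)
    {
        var channel = Channel.CreateBounded<Int32>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait, // producer waits while the queue is full
            SingleReader = true
        });
        var seen = new List<Int32>();

        // Single consumer: items are handled one at a time, in arrival order.
        var consumer = Task.Run(async () =>
        {
            await foreach (var item in channel.Reader.ReadAllAsync())
            {
                seen.Add(item);
            }
        });

        for (var i = 0; i < count; i++)
        {
            // Completes immediately while there is room, otherwise waits for the consumer.
            await channel.Writer.WriteAsync(i);
        }
        channel.Writer.Complete();
        await consumer;
        return seen;
    }

    public static async Task Main()
    {
        var seen = await RunAsync(capacity: 4, count: 20);
        Console.WriteLine(String.Join(",", seen));
    }
}
```

With one producer and one consumer, the producer can never get more than `capacity` items ahead, and the consumer sees the items in exactly the order they were written.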
## Related Links
- [Advanced Timer TimerX](/NewLife/X/Blob/master/Doc/timerx-高级定时器TimerX.md)
- [Lightweight Application Host](/NewLife/X/Blob/master/Doc/host-轻量级应用主机Host.md)
- [Distributed Tracing ITracer](/NewLife/X/Blob/master/Doc/tracer-链路追踪ITracer.md)