using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading;
using NewLife.Log;
using NewLife.Serialization;
namespace NewLife.Messaging
{
/// <summary>消息提供者接口</summary>
/// <remarks>
/// 同步结构使用<see cref="SendAndReceive"/>;
/// 异步结构使用<see cref="Send"/>和<see cref="OnReceived"/>;
/// 异步结构中也可以使用<see cref="SendAndReceive"/>,但是因为通过事件量完成,会极为不稳定。
///
/// 如果只需要操作某个通道的消息,可通过<see cref="M:IMessageConsumer Register(Byte channel)"/>实现。
///
/// <see cref="SendAndReceive"/>适合客户端的大多数情况,比如同步Http、同步Tcp。
/// 如果内部实现是异步模型,则等待指定时间获取异步返回的第一条消息,该消息不再触发消息到达事件<see cref="OnReceived"/>。
/// </remarks>
public interface IMessageProvider
{
/// <summary>最大消息大小,超过该大小将分包发送。0表示不限制。</summary>
Int32 MaxMessageSize { get; set; }
/// <summary>是否自动组合<see cref="GroupMessage"/>消息。</summary>
Boolean AutoJoinGroup { get; set; }
/// <summary>组合组消息超时时间,毫秒。对于<see cref="GroupMessage"/>,如果后续包不能在当前时间之内到达,则认为超时,放弃该组。</summary>
Int32 JoinGroupTimeout { get; set; }
/// <summary>发送并接收消息。主要用于应答式的请求和响应。该方法的实现不是线程安全的,使用时一定要注意。</summary>
/// <remarks>如果内部实现是异步模型,则等待指定时间获取异步返回的第一条消息,该消息不再触发消息到达事件<see cref="OnReceived"/>。</remarks>
/// <param name="message"></param>
/// <param name="millisecondsTimeout">等待的毫秒数,或为 <see cref="F:System.Threading.Timeout.Infinite" /> (-1),表示无限期等待。默认0表示不等待</param>
/// <returns></returns>
Message SendAndReceive(Message message, Int32 millisecondsTimeout = 0);
/// <summary>发送消息。如果有响应,可在消息到达事件<see cref="OnReceived"/>中获得。</summary>
/// <param name="message"></param>
void Send(Message message);
/// <summary>消息到达时触发</summary>
event EventHandler<MessageEventArgs> OnReceived;
/// <summary>注册消息消费者,仅消费指定范围的消息</summary>
/// <param name="start">消息范围的起始</param>
/// <param name="end">消息范围的结束</param>
/// <returns>消息消费者</returns>
[Obsolete("请采用消息消费者接口IMessageConsumer Register(Byte channel)!")]
IMessageProvider Register(MessageKind start, MessageKind end);
/// <summary>注册消息消费者,仅消费指定范围的消息</summary>
/// <param name="kinds">消息类型的集合</param>
/// <returns>消息消费者</returns>
[Obsolete("请采用消息消费者接口IMessageConsumer Register(Byte channel)!")]
IMessageProvider Register(MessageKind[] kinds);
/// <summary>注册消息消费者,仅消费指定通道的消息</summary>
/// <param name="channel">通道</param>
/// <returns>消息消费者</returns>
IMessageConsumer Register(Byte channel);
}
/// <summary>消息消费接口</summary>
public interface IMessageConsumer
{
/// <summary>发送并接收消息。主要用于应答式的请求和响应。该方法的实现不是线程安全的,使用时一定要注意。</summary>
/// <remarks>如果内部实现是异步模型,则等待指定时间获取异步返回的第一条消息,该消息不再触发消息到达事件<see cref="OnReceived"/>。</remarks>
/// <param name="message"></param>
/// <param name="millisecondsTimeout">等待的毫秒数,或为 <see cref="F:System.Threading.Timeout.Infinite" /> (-1),表示无限期等待。默认0表示不等待</param>
/// <returns></returns>
Message SendAndReceive(Message message, Int32 millisecondsTimeout = 0);
/// <summary>发送消息。如果有响应,可在消息到达事件<see cref="OnReceived"/>中获得。</summary>
/// <param name="message"></param>
void Send(Message message);
/// <summary>消息到达时触发</summary>
event EventHandler<MessageEventArgs> OnReceived;
/// <summary>通道</summary>
Byte Channel { get; }
/// <summary>消息提供者</summary>
IMessageProvider Provider { get; }
}
/// <summary>消息事件参数</summary>
public class MessageEventArgs : EventArgs
{
private Message _Message;
/// <summary>消息</summary>
public Message Message { get { return _Message; } set { _Message = value; } }
/// <summary>实例化</summary>
/// <param name="message"></param>
public MessageEventArgs(Message message) { Message = message; }
}
interface IMessageProvider2 : IMessageProvider
{
/// <summary>收到消息时调用该方法</summary>
/// <param name="message">消息</param>
/// <param name="remoteIdentity">远程标识</param>
void Process(Message message, Object remoteIdentity = null);
}
/// <summary>消息提供者基类</summary>
public abstract class MessageProvider : DisposeBase, IMessageProvider2
{
#region 属性
private Int32 _MaxMessageSize;
/// <summary>最大消息大小,超过该大小将分包发送。0表示不限制。</summary>
public Int32 MaxMessageSize { get { return _MaxMessageSize; } set { _MaxMessageSize = value; } }
private Boolean _AutoJoinGroup;
/// <summary>是否自动组合<see cref="GroupMessage"/>消息。</summary>
public Boolean AutoJoinGroup { get { return _AutoJoinGroup; } set { _AutoJoinGroup = value; } }
private Int32 _JoinGroupTimeout;
/// <summary>组合组消息超时时间,毫秒。对于<see cref="GroupMessage"/>,如果后续包不能在当前时间之内到达,则认为超时,放弃该组。</summary>
public Int32 JoinGroupTimeout { get { return _JoinGroupTimeout; } set { _JoinGroupTimeout = value; } }
private IMessageProvider _Parent;
/// <summary>消息提供者</summary>
public IMessageProvider Parent { get { return _Parent; } set { _Parent = value; } }
private MessageKind[] _Kinds;
/// <summary>响应的消息类型集合</summary>
public MessageKind[] Kinds { get { return _Kinds; } set { _Kinds = value; } }
#endregion
#region 基本收发
/// <summary>发送消息。如果有响应,可在消息到达事件<see cref="OnReceived"/>中获得。这里会实现大消息分包。</summary>
/// <param name="message"></param>
public virtual void Send(Message message)
{
var ms = message.GetStream();
WriteLog("发送消息 [{0}] {1}", ms.Length, message);
if (MaxMessageSize <= 0 || ms.Length < MaxMessageSize)
OnSend(ms);
else
{
var mg = new MessageGroup();
//mg.Split(ms, MaxMessageSize, message.Header);
var count = 0;
foreach (var item in mg)
{
if (item.Index == 1) count = item.Count;
ms = item.GetStream();
WriteLog("发送分组 Identity={0} {1}/{2} [{3}] [{4}]", item.Identity, item.Index, count, item.Data == null ? 0 : item.Data.Length, ms.Length);
Debug.Assert(item.Index == count || ms.Length == MaxMessageSize, "分拆的组消息大小不合适!");
OnSend(ms);
}
}
}
/// <summary>发送数据流。</summary>
/// <param name="stream"></param>
protected abstract void OnSend(Stream stream);
/// <summary>收到消息时调用该方法</summary>
/// <param name="stream">数据流</param>
/// <param name="state">用户状态</param>
/// <param name="remoteIdentity">远程标识</param>
public virtual void Process(Stream stream, Object state, Object remoteIdentity = null)
{
var len = 0L;
// 如果大小大于一个数据包大小,就认为这有一个完整的数据包
while ((len = stream.Length - stream.Position) > 0)
{
// 只有数据流大小小于包大小时,才忽略异常
var msg = Message.Read(stream, RWKinds.Binary, MaxMessageSize > 0 && len < MaxMessageSize);
// 如果返回空,表示不是完整的消息
if (msg == null)
{
//XTrace.WriteLine("数据流中无法读取消息 {0},{1}=>{2}", stream.Position, stream.Length, stream.Length - stream.Position);
break;
}
if (msg is CompressionMessage) msg = (msg as CompressionMessage).Message;
msg.UserState = state;
Process(msg, remoteIdentity);
}
}
/// <summary>收到消息时调用该方法</summary>
/// <param name="message">消息</param>
/// <param name="remoteIdentity">远程标识</param>
public virtual void Process(Message message, Object remoteIdentity = null)
{
if (message == null) return;
// 检查消息范围
if (Kinds != null && Array.IndexOf<MessageKind>(Kinds, message.Kind) < 0) return;
if (message.Kind == MessageKind.Group && AutoJoinGroup)
{
message = JoinGroup(message as GroupMessage);
// 如果为空,表明还没完成组合,直接返回
if (message == null) return;
}
WriteLog("接收消息 {0}", message);
// 为Receive准备的事件,只用一次
EventHandler<MessageEventArgs> handler;
do
{
handler = innerOnReceived;
}
while (handler != null && Interlocked.CompareExchange<EventHandler<MessageEventArgs>>(ref innerOnReceived, null, handler) != handler);
if (handler != null) handler(this, new MessageEventArgs(message));
if (OnReceived != null) OnReceived(this, new MessageEventArgs(message));
// 记录已过期的,要删除
var list = new List<WeakReference<IMessageProvider2>>();
var cs = Consumers;
foreach (var item in cs)
{
IMessageProvider2 mp;
if (item.TryGetTarget(out mp) && mp != null)
mp.Process(message);
else
list.Add(item);
}
if (list.Count > 0)
{
lock (cs)
{
foreach (var item in list)
{
if (cs.Contains(item)) cs.Remove(item);
}
}
}
// 记录已过期的,要删除
var list2 = new List<WeakReference<MessageConsumer2>>();
var cs2 = Consumers2;
foreach (var item in cs2)
{
MessageConsumer2 mp;
if (item.TryGetTarget(out mp) && mp != null)
mp.Process(message);
else
list2.Add(item);
}
if (list2.Count > 0)
{
lock (cs2)
{
foreach (var item in list2)
{
if (cs2.Contains(item)) cs2.Remove(item);
}
}
}
}
private Dictionary<String, MessageGroup> groups = new Dictionary<String, MessageGroup>();
/// <summary>组合组消息</summary>
/// <param name="message">消息</param>
/// <param name="remoteIdentity">远程标识</param>
/// <returns></returns>
protected virtual Message JoinGroup(GroupMessage message, Object remoteIdentity = null)
{
var key = String.Format("{0}#{1}", remoteIdentity, message.Identity);
MessageGroup mg = null;
if (!groups.TryGetValue(key, out mg))
{
mg = new MessageGroup();
mg.Identity = message.Identity;
groups.Add(key, mg);
}
// 加入到组,如果返回false,表示未收到所有消息
if (!mg.Add(message))
{
WriteLog("接收分组 Identity={0} {1}/{2} [{3}] 已完成:{4}/{5}", message.Identity, message.Index, message.Count, message.Data == null ? 0 : message.Data.Length, mg.Count, mg.Total);
return null;
}
WriteLog("接收分组 Identity={0} {1}/{2} [{3}] 已完成:{4}/{5}", message.Identity, message.Index, message.Count, message.Data == null ? 0 : message.Data.Length, mg.Count, mg.Total);
// 否则,表示收到所有消息
groups.Remove(key);
// 读取真正的消息
return mg.GetMessage();
}
/// <summary>消息到达时触发。这里将得到所有消息</summary>
public event EventHandler<MessageEventArgs> OnReceived;
event EventHandler<MessageEventArgs> innerOnReceived;
/// <summary>发送并接收消息。主要用于应答式的请求和响应。</summary>
/// <remarks>如果内部实现是异步模型,则等待指定时间获取异步返回的第一条消息,该消息不再触发消息到达事件<see cref="OnReceived"/>。</remarks>
/// <param name="message"></param>
/// <param name="millisecondsTimeout">等待的毫秒数,或为 <see cref="F:System.Threading.Timeout.Infinite" /> (-1),表示无限期等待。默认0表示不等待</param>
/// <returns></returns>
public virtual Message SendAndReceive(Message message, Int32 millisecondsTimeout = 0)
{
Send(message);
var _wait = new AutoResetEvent(true);
_wait.Reset();
Message msg = null;
innerOnReceived += (s, e) => { msg = e.Message; _wait.Set(); };
//if (!_wait.WaitOne(millisecondsTimeout, true)) return null;
_wait.WaitOne(millisecondsTimeout, false);
// 这里不能Close,否则上面Set的时候会抛出句柄被关闭的异常
//_wait.Close();
return msg;
}
#endregion
#region 新的消费者模型
private List<WeakReference<MessageConsumer2>> _Consumers2 = new List<WeakReference<MessageConsumer2>>();
/// <summary>消费者集合</summary>
private List<WeakReference<MessageConsumer2>> Consumers2 { get { return _Consumers2; } }
/// <summary>注册消息消费者,仅消费指定通道的消息</summary>
/// <param name="channel">通道</param>
/// <returns>消息消费者</returns>
public virtual IMessageConsumer Register(Byte channel)
{
if (channel <= 0 || channel >= 0x80) throw new ArgumentOutOfRangeException("channel", "通道必须在0到128之间!");
var mc = new MessageConsumer2();
mc.Channel = channel;
mc.Provider = this;
lock (Consumers2)
{
Consumers2.Add(mc);
}
mc.OnDisposed += (s, e) => { lock (Consumers2) { Consumers2.Remove(s as MessageConsumer2); } };
return mc;
}
class MessageConsumer2 : DisposeBase, IMessageConsumer
{
#region 属性
private Byte _Channel;
/// <summary>通道</summary>
public Byte Channel { get { return _Channel; } set { _Channel = value; } }
private IMessageProvider _Provider;
/// <summary>消息提供者</summary>
public IMessageProvider Provider { get { return _Provider; } set { _Provider = value; } }
#endregion
#region 基本收发
/// <summary>发送消息</summary>
/// <param name="message"></param>
public void Send(Message message)
{
//message.Header.Channel = Channel;
Provider.Send(message);
}
/// <summary>发送并接收消息。主要用于应答式的请求和响应。</summary>
/// <remarks>如果内部实现是异步模型,则等待指定时间获取异步返回的第一条消息,该消息不再触发消息到达事件<see cref="OnReceived"/>。</remarks>
/// <param name="message"></param>
/// <param name="millisecondsTimeout">等待的毫秒数,或为 <see cref="F:System.Threading.Timeout.Infinite" /> (-1),表示无限期等待。默认0表示不等待</param>
/// <returns></returns>
public virtual Message SendAndReceive(Message message, Int32 millisecondsTimeout = 0)
{
//message.Header.Channel = Channel;
return Provider.SendAndReceive(message);
}
/// <summary>收到消息时调用该方法</summary>
/// <param name="message"></param>
public void Process(Message message)
{
if (message == null) return;
//// 检查消息范围
//if (!message.Header.UseHeader || message.Header.Channel != Channel) return;
// 为Receive准备的事件,只用一次
EventHandler<MessageEventArgs> handler;
do
{
handler = innerOnReceived;
}
while (handler != null && Interlocked.CompareExchange<EventHandler<MessageEventArgs>>(ref innerOnReceived, null, handler) != handler);
if (handler != null) handler(this, new MessageEventArgs(message));
if (OnReceived != null) OnReceived(this, new MessageEventArgs(message));
}
/// <summary>消息到达时触发。这里将得到所有消息</summary>
public event EventHandler<MessageEventArgs> OnReceived;
event EventHandler<MessageEventArgs> innerOnReceived;
#endregion
}
#endregion
#region 注册消费者
/// <summary>注册消息消费者,仅消费指定范围的消息</summary>
/// <param name="start">消息范围的起始</param>
/// <param name="end">消息范围的结束</param>
/// <returns>消息消费者</returns>
[Obsolete("请采用消息消费者接口!")]
public virtual IMessageProvider Register(MessageKind start, MessageKind end)
{
if (start > end) throw new ArgumentOutOfRangeException("start", "起始不能大于结束!");
//return Register(Enumerable.Range(start, end - start + 1).Select(e => (MessageKind)e).ToArray());
var list = new List<MessageKind>();
for (MessageKind i = start; i <= end; i++)
{
list.Add(i);
}
return Register(list.ToArray());
}
/// <summary>注册消息消费者,仅消费指定范围的消息</summary>
/// <param name="kinds">消息类型的集合</param>
/// <returns>消息消费者</returns>
[Obsolete("请采用消息消费者接口!")]
public virtual IMessageProvider Register(MessageKind[] kinds)
{
if (kinds == null || kinds.Length < 1) throw new ArgumentNullException("kinds");
kinds = kinds.Distinct().OrderBy(e => e).ToArray();
if (kinds == null || kinds.Length < 1) throw new ArgumentNullException("kinds");
// 检查注册范围是否有效
var ks = Kinds;
if (ks != null)
{
foreach (var item in kinds)
{
if (Array.IndexOf<MessageKind>(ks, item) < 0) throw new ArgumentOutOfRangeException("kinds", "当前消息提供者不支持Kind=" + item + "的消息!");
}
}
var mc = new MessageConsumer() { Parent = this, Kinds = kinds };
lock (Consumers)
{
Consumers.Add(mc);
}
mc.OnDisposed += (s, e) => Consumers.Remove(s as MessageConsumer);
return mc;
}
private List<WeakReference<IMessageProvider2>> _Consumers = new List<WeakReference<IMessageProvider2>>();
/// <summary>消费者集合</summary>
private List<WeakReference<IMessageProvider2>> Consumers { get { return _Consumers; } }
#endregion
#region 消息消费者
class MessageConsumer : MessageProvider
{
/// <summary>发送消息</summary>
/// <param name="message"></param>
public override void Send(Message message) { Parent.Send(message); }
/// <summary>发送数据流。</summary>
/// <param name="stream"></param>
protected override void OnSend(Stream stream) { }
}
#endregion
#region 日志
[Conditional("DEBUG")]
static void WriteLog(String format, params Object[] args)
{
if (XTrace.Debug) XTrace.WriteLine(format, args);
}
#endregion
}
}
|