SuperSocket服务端默认是以 "\r\n" (回车换行0x0D0A)作为终止符(TerminatorReceiveFilter),接收默认最多处理1024个字节【DefaultMaxRequestLength=1024】。如果一次发送的数据达到或超过1024个字节【发送数据中无 "\r\n"】,服务端不会触发接收事件,而是以ProtocolError(协议错误)作为关闭原因,将该非法的客户端连接关闭。
接收的结果StringRequestInfo对象由Key和Body通过空格字符拼接而成。
如果Body为空,则接收的实际结果就是 Key。
如果Body不为空,则接收的实际结果是 Key+" "+Body。
新建窗体应用程序TestServer,重命名窗体名为FormServer。窗体设计如下:
添加对 SuperSocket.Common,SuperSocket.SocketBase,SuperSocket.SocketEngine,Log4Net 四个类库的引用。
using SuperSocket.SocketBase; using SuperSocket.SocketBase.Config; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;
namespace TestServer
{
    /// <summary>
    /// Demo server window: hosts a SuperSocket <c>AppServer</c> on the loopback address and
    /// logs connect / request / close events into the <c>rtxtDisplay</c> rich text box.
    /// </summary>
    public partial class FormServer : Form
    {
        /// <summary>Lowest TCP port number accepted from the port text box.</summary>
        private const int MinPort = 1;

        /// <summary>Highest TCP port number accepted from the port text box.</summary>
        private const int MaxPort = 65535;

        /// <summary>Display is cleared once it holds at least this many characters.</summary>
        private const int MaxDisplayLength = 10240;

        readonly AppServer appServer = new AppServer();

        public FormServer()
        {
            InitializeComponent();
        }

        /// <summary>
        /// Builds the server configuration from the form, then sets up and starts the server.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="e">Event data (unused).</param>
        private void FormServer_Load(object sender, EventArgs e)
        {
            // Validate the port instead of letting int.Parse throw on empty / non-numeric input.
            int port;
            if (!int.TryParse(txtPort.Text, out port) || port < MinPort || port > MaxPort)
            {
                MessageBox.Show(string.Format("端口号无效,请输入{0}-{1}之间的整数", MinPort, MaxPort));
                return;
            }

            ServerConfig serverConfig = new ServerConfig()
            {
                Ip = "127.0.0.1",
                Port = port,
                TextEncoding = "GBK",
                MaxConnectionNumber = ServerConfig.DefaultMaxConnectionNumber,
                Mode = SocketMode.Tcp,
                MaxRequestLength = ServerConfig.DefaultMaxRequestLength,
                KeepAliveTime = ServerConfig.DefaultKeepAliveTime,
                KeepAliveInterval = ServerConfig.DefaultKeepAliveInterval,
                ListenBacklog = ServerConfig.DefaultListenBacklog,
                ReceiveBufferSize = ServerConfig.DefaultReceiveBufferSize,
                SendTimeOut = ServerConfig.DefaultSendTimeout
            };

            if (!appServer.Setup(serverConfig))
            {
                MessageBox.Show("开启监听端口失败");
                return;
            }

            // Attach the handlers BEFORE Start(): once the server is listening a client may
            // connect immediately, and events raised before subscription would be lost.
            appServer.NewSessionConnected += AppServer_NewSessionConnected; // client connected
            appServer.NewRequestReceived += AppServer_NewRequestReceived;   // request received
            appServer.SessionClosed += AppServer_SessionClosed;             // client closed

            if (!appServer.Start())
            {
                // Start failed: detach again so the form is left in its initial state.
                appServer.NewSessionConnected -= AppServer_NewSessionConnected;
                appServer.NewRequestReceived -= AppServer_NewRequestReceived;
                appServer.SessionClosed -= AppServer_SessionClosed;

                MessageBox.Show("服务器开启监听失败");
                return;
            }

            DisplayContent(string.Format("服务器启动监听成功,服务器IP:{0},端口:{1}...", appServer.Config.Ip, appServer.Config.Port));
        }

        /// <summary>
        /// Logs the remote address, port and close reason of a session that has been closed.
        /// </summary>
        /// <param name="session">The session that was closed.</param>
        /// <param name="value">The reason reported for the close.</param>
        private void AppServer_SessionClosed(AppSession session, SuperSocket.SocketBase.CloseReason value)
        {
            DisplayContent(string.Format("客户端已关闭:{0},端口:{1},原因:{2}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port, value));
        }

        /// <summary>
        /// Logs a received request: sender endpoint, reassembled text (Key, plus " " + Body
        /// when Body is not empty), the session's last-active time and its charset name.
        /// </summary>
        /// <param name="session">The session the request arrived on.</param>
        /// <param name="requestInfo">The parsed request (Key / Body).</param>
        private void AppServer_NewRequestReceived(AppSession session, SuperSocket.SocketBase.Protocol.StringRequestInfo requestInfo)
        {
            string key = requestInfo.Key;
            string body = requestInfo.Body;

            // Key and Body are rejoined with the single space separator; a missing or empty
            // Body means the received text is just the Key.
            string content = string.IsNullOrEmpty(body) ? key : key + " " + body;

            string sessionIp = string.Format("{0}:{1}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port);

            StringBuilder sb = new StringBuilder(sessionIp + ":\n接收内容:");
            sb.Append(content);
            sb.Append(" \n发送时间:" + session.LastActiveTime.ToString("yyyy-MM-dd HH:mm:ss"));
            sb.Append(" \n字符编码:" + session.Charset.BodyName);

            DisplayContent(sb.ToString());
        }

        /// <summary>
        /// Logs the remote address and port of a newly connected session.
        /// </summary>
        /// <param name="session">The session that has just connected.</param>
        private void AppServer_NewSessionConnected(AppSession session)
        {
            DisplayContent(string.Format("客户端已连接:{0},虚拟端口:{1}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port));
        }

        /// <summary>
        /// Appends a line to the display without blocking the caller when invoked from a
        /// non-UI thread.
        /// </summary>
        /// <param name="addContent">Text to append (a trailing newline is added).</param>
        private void DisplayContent(string addContent)
        {
            if (!this.InvokeRequired)
            {
                DisplayContentEx(addContent);
                return;
            }

            // Post straight to the UI message queue. Control.BeginInvoke keeps the messages
            // in posting order and avoids Delegate.BeginInvoke, which needs an extra
            // thread-pool hop and is not supported on .NET Core / .NET 5+.
            if (this.IsDisposed || !this.IsHandleCreated)
            {
                return;
            }

            try
            {
                this.BeginInvoke(new Action<string>(AppendToDisplay), addContent);
            }
            catch (ObjectDisposedException)
            {
                // Deliberate best-effort: the form was closed while a server event was in
                // flight; there is nothing left to display on.
            }
            catch (InvalidOperationException)
            {
                // Deliberate best-effort: the window handle was destroyed between the check
                // above and the call.
            }
        }

        /// <summary>
        /// Appends a line to the display, marshalling synchronously onto the UI thread.
        /// </summary>
        /// <param name="addContent">Text to append (a trailing newline is added).</param>
        public void DisplayContentEx(string addContent)
        {
            this.Invoke(new MethodInvoker(() => AppendToDisplay(addContent)));
        }

        /// <summary>
        /// Writes one line into <c>rtxtDisplay</c>; must run on the UI thread. The box is
        /// cleared first once its text length reaches <see cref="MaxDisplayLength"/>.
        /// </summary>
        /// <param name="addContent">Text to append (a trailing newline is added).</param>
        private void AppendToDisplay(string addContent)
        {
            if (rtxtDisplay.TextLength >= MaxDisplayLength)
            {
                rtxtDisplay.Clear();
            }
            rtxtDisplay.AppendText(addContent + "\n");
            rtxtDisplay.ScrollToCaret();
        }
    }
}
最终的本质就是 实例化类 new AsyncSocketServer(appServer, listeners)
1.实例化类 SuperSocket.SocketEngine.SocketServerFactory.
如果不指定编码格式,将默认为ASCII
if (socketServerFactory == null) { var socketServerFactoryType = Type.GetType("SuperSocket.SocketEngine.SocketServerFactory, SuperSocket.SocketEngine", true);
socketServerFactory = (ISocketServerFactory)Activator.CreateInstance(socketServerFactoryType); }
m_SocketServerFactory = socketServerFactory;
//Read text encoding from the configuration if (!string.IsNullOrEmpty(config.TextEncoding)) TextEncoding = Encoding.GetEncoding(config.TextEncoding); else TextEncoding = new ASCIIEncoding();
2.设置监听配置:设置服务端的IP和端口
if (config.Port > 0) { listeners.Add(new ListenerInfo { EndPoint = new IPEndPoint(ParseIPAddress(config.Ip), config.Port), BackLog = config.ListenBacklog, Security = BasicSecurity }); }
3.设置接收过滤器:
return new CommandLineReceiveFilterFactory(TextEncoding);
=>new CommandLineReceiveFilterFactory(encoding, new BasicRequestInfoParser());
=>new TerminatorReceiveFilterFactory("\r\n",encoding, new BasicRequestInfoParser());
//注意:这里指定 终止符为 回车换行。
4.创建socket服务,最终目的就是实例化类:AsyncSocketServer
CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)
=> case(SocketMode.Tcp): return new AsyncSocketServer(appServer, listeners);
1.开始实例化【最大连接数】个异步socket代理对象
=>AsyncSocketServer 对象m_SocketServer.Start()
实例化最大连接数【MaxConnectionNumber】个异步socket代理对象:SocketAsyncEventArgsProxy
new SocketAsyncEventArgsProxy(SocketAsyncEventArgs socketEventArgs,true);
并订阅 【完成异步操作】的事件 Completed
/// <summary>
/// Creates a proxy around the given <see cref="SocketAsyncEventArgs"/>, records its current
/// buffer offset and hooks its <c>Completed</c> event.
/// </summary>
/// <param name="socketEventArgs">The event-args instance to wrap.</param>
/// <param name="isRecyclable">Value stored in <c>IsRecyclable</c>.</param>
public SocketAsyncEventArgsProxy(SocketAsyncEventArgs socketEventArgs, bool isRecyclable)
{
    SocketEventArgs = socketEventArgs;

    // Remember the offset the args had at construction time.
    OrigOffset = socketEventArgs.Offset;

    // Completed operations on the wrapped args are routed to the static handler.
    SocketEventArgs.Completed += SocketEventArgs_Completed;

    IsRecyclable = isRecyclable;
}
/// <summary>
/// Completion callback for the wrapped <see cref="SocketAsyncEventArgs"/>: hands a finished
/// receive to the session stored in <c>UserToken</c>.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">The completed operation's event args.</param>
/// <exception cref="ArgumentException">The completed operation was not a receive.</exception>
static void SocketEventArgs_Completed(object sender, SocketAsyncEventArgs e)
{
    IAsyncSocketSession owner = e.UserToken as IAsyncSocketSession;

    // No session attached to these args: nothing to dispatch to.
    if (owner == null)
    {
        return;
    }

    if (e.LastOperation != SocketAsyncOperation.Receive)
    {
        throw new ArgumentException("The last operation completed on the socket was not a receive");
    }

    owner.AsyncRun(() => owner.ProcessReceive(e));
}
2.执行父类的启动监听:抽象类SocketServerBase的Start()方法
内部类 TcpAsyncSocketListener的Start()方法
var listener => return new TcpAsyncSocketListener(listenerInfo);
var listener = CreateListener(ListenerInfos[i]); listener.Error += new ErrorHandler(OnListenerError); listener.Stopped += new EventHandler(OnListenerStopped);
//当接受Accept一个客户端时触发 listener.NewClientAccepted += new NewClientAcceptHandler(OnNewClientAccepted);
3.执行异步监听类TcpAsyncSocketListener的Start(Config)方法,这才是真正的启用监听。
/// <summary>
/// Creates the listening socket, binds it to the configured end point, starts listening and
/// issues the first asynchronous accept.
/// </summary>
/// <param name="config">Server configuration (not read in this method body).</param>
/// <returns><c>true</c> when listening started; <c>false</c> when an exception was caught
/// (the exception is passed to <c>OnError</c>).</returns>
public override bool Start(IServerConfig config)
{
    m_ListenSocket = new Socket(this.Info.EndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

    try
    {
        m_ListenSocket.Bind(this.Info.EndPoint);
        m_ListenSocket.Listen(m_ListenBackLog);

        m_ListenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
        m_ListenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);

        var acceptArgs = new SocketAsyncEventArgs();
        m_AcceptSAE = acceptArgs;
        acceptArgs.Completed += acceptEventArg_Completed;

        // AcceptAsync returns false when the accept completed synchronously; in that case
        // the Completed event is not raised, so the result is processed inline.
        bool pending = m_ListenSocket.AcceptAsync(acceptArgs);
        if (!pending)
        {
            ProcessAccept(acceptArgs);
        }

        return true;
    }
    catch (Exception ex)
    {
        OnError(ex);
        return false;
    }
}
4.等待一个新的客户端连接:
开始一个异步操作以接受传入的连接尝试。
m_ListenSocket.AcceptAsync(acceptEventArg)
TcpAsyncSocketListener类的 void ProcessAccept(SocketAsyncEventArgs e)
生成一个用于发送和接收的Socket对象
socket = e.AcceptSocket;
//本次客户端已连接,同时开启:等待下一个新的客户端连接
e.AcceptSocket = null;
bool willRaiseEvent = false;
try { willRaiseEvent = m_ListenSocket.AcceptAsync(e);//引发接受下一个新的客户端连接尝试 }
if (socket != null) OnNewClientAccepted(socket, null);
自动触发事件:AsyncSocketServer类的
protected override void OnNewClientAccepted(ISocketListener listener, Socket client, object state)
即执行private IAppSession ProcessNewClient(Socket client, SslProtocols security)
相关代码:
if (security == SslProtocols.None) socketSession = new AsyncSocketSession(client, socketEventArgsProxy); else socketSession = new AsyncStreamSocketSession(client, security, socketEventArgsProxy);
创建Session用于发送和接收
protected IAppSession CreateSession(Socket client, ISocketSession session)
实例化并初始化 new AppSession()并Initialize()
/// <summary>
/// Binds this session to its server and socket session, creates its receive filter and
/// then runs <c>OnInit</c>.
/// </summary>
/// <param name="appServer">The owning server; cast to <c>AppServerBase</c>.</param>
/// <param name="socketSession">The underlying socket session.</param>
public virtual void Initialize(IAppServer<TAppSession, TRequestInfo> appServer, ISocketSession socketSession)
{
    var typedServer = (AppServerBase<TAppSession, TRequestInfo>)appServer;

    AppServer = typedServer;
    // The session's charset is taken from the server's text encoding.
    Charset = typedServer.TextEncoding;
    SocketSession = socketSession;
    SessionID = socketSession.SessionID;
    m_Connected = true;
    m_ReceiveFilter = typedServer.ReceiveFilterFactory.CreateFilter(appServer, this, socketSession.RemoteEndPoint);

    // Filters that implement IReceiveFilterInitializer get a chance to initialize themselves.
    var initializer = m_ReceiveFilter as IReceiveFilterInitializer;
    if (initializer != null)
    {
        initializer.Initialize(typedServer, this);
    }

    socketSession.Initialize(this);

    OnInit();
}
为 异步会话.AsyncSocketSession绑定关闭事件 socketSession.Closed += SessionClosed;
然后调用AsyncSocketServer的注册会话方法 RegisterSession(IAppSession appSession)
然后AppServerBase的RegisterSession
触发新的会话连接事件AppServerBase的 OnNewSessionConnected(appSession);
NewSessionConnected.BeginInvoke(session, OnNewSessionConnectedCallback, handler);
然后 开始执行任务:
AppServer.AsyncRun(() => socketSession.Start());
即 Task.Factory.StartNew(task, taskOption).ContinueWith(...)
启动的任务 即 AsyncSocketSession的Start()方法
/// <summary>
/// Begins receiving on the session's socket event args, then starts the session unless
/// <c>m_IsReset</c> is set.
/// </summary>
public override void Start()
{
    StartReceive(SocketAsyncProxy.SocketEventArgs);

    if (m_IsReset)
    {
        return;
    }

    StartSession();
}
AsyncSocketSession的StartReceive方法
private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)
有个关键的异步接收 :
willRaiseEvent = Client.ReceiveAsync(e);
本质是为事件NewRequestReceived的参数StringRequestInfo对象赋值后,触发事件NewRequestReceived。
如果达到或超过了最大请求字节数,服务端将把该客户端连接关闭,关闭原因为协议错误【ProtocolError】,此时并不会触发事件NewRequestReceived
1.当有新的发送文本到达服务端时,会触发 SocketAsyncEventArgsProxy的完成事件
SocketEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketEventArgs_Completed);
也就是任务:处理接收 ProcessReceive:
if (e.LastOperation == SocketAsyncOperation.Receive) { socketSession.AsyncRun(() => socketSession.ProcessReceive(e)); }
AsyncSocketSession的ProcessReceive()方法
public void ProcessReceive(SocketAsyncEventArgs e)
2.执行方法 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);
其中AppSession的处理请求方法ProcessRequest
3.执行过滤请求方法:FilterRequest
SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter类的Filter方法
BasicRequestInfoParser类的ParseRequestInfo方法返回StringRequestInfo对象
AppSession的FilterRequest 里面判断 如果请求的字节数 大于或等于(>=) 最大请求字节数,将以ProtocolError为关闭原因关闭客户端连接,并返回null
if (currentRequestLength >= maxRequestLength) { if (Logger.IsErrorEnabled) Logger.Error(this, string.Format("Max request length: {0}, current processed length: {1}", maxRequestLength, currentRequestLength));
Close(CloseReason.ProtocolError); return null; }
如果请求字节数 小于 最大请求字节数,将执行 AppServer.ExecuteCommand(this, requestInfo);
也就是触发事件 NewRequestReceived。
最后执行异步等待读取下一次数据块,新的异步接收开始了
//read the next block of data sent from the client StartReceive(e, offsetDelta);