SuperSocket服务端源代码逻辑解析

it2025-01-17  2

SuperSocket服务端默认是以 "\r\n" (回车换行0x0D0A)作为终止符(TerminatorReceiveFilter),接收默认最多处理1024个字节【DefaultMaxRequestLength=1024】。如果一次发送超过1024的字节【发送数据中无 "\r\n"】,将抛出ProtocolError异常,并将非法的客户端连接关闭。

接收的结果StringRequestInfo对象由Key和Body通过空格字符拼接而成。

如果Body为空,则接收的实际结果就是 Key。

如果Body不为空,则接收的实际结果是 Key+" "+Body。

新建窗体应用程序TestServer,重命名窗体名为FormServer。窗体设计如下:

添加对类库。SuperSocket.Common,SuperSocket.SocketBase,SuperSocket.SocketEngine,Log4Net 四个类库的引用。

测试源程序:

using SuperSocket.SocketBase; using SuperSocket.SocketBase.Config; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Windows.Forms;

namespace TestServer {     public partial class FormServer : Form     {         AppServer appServer = new AppServer();         public FormServer()         {             InitializeComponent();         }

        private void FormServer_Load(object sender, EventArgs e)         {             ServerConfig serverConfig = new ServerConfig()             {                 Ip = "127.0.0.1",                 Port = int.Parse(txtPort.Text),                 TextEncoding = "GBK",                 MaxConnectionNumber = ServerConfig.DefaultMaxConnectionNumber,                 Mode = SocketMode.Tcp,                 MaxRequestLength = ServerConfig.DefaultMaxRequestLength,                 KeepAliveTime = ServerConfig.DefaultKeepAliveTime,                 KeepAliveInterval = ServerConfig.DefaultKeepAliveInterval,                 ListenBacklog = ServerConfig.DefaultListenBacklog,                 ReceiveBufferSize = ServerConfig.DefaultReceiveBufferSize,                 SendTimeOut = ServerConfig.DefaultSendTimeout             };

            if (!appServer.Setup(serverConfig))             {                 MessageBox.Show("开启监听端口失败");                 return;             }

            if (!appServer.Start())             {                 MessageBox.Show("服务器开启监听失败");                 return;             }

            DisplayContent(string.Format("服务器启动监听成功,服务器IP:{0},端口:{1}...", appServer.Config.Ip, appServer.Config.Port));

            //客户端连接事件             appServer.NewSessionConnected += AppServer_NewSessionConnected;             //接收事件             appServer.NewRequestReceived += AppServer_NewRequestReceived;              //客户端已关闭事件             appServer.SessionClosed += AppServer_SessionClosed;          }

        private void AppServer_SessionClosed(AppSession session, SuperSocket.SocketBase.CloseReason value)         {             string sessionIp = string.Format("{0}:{1}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port);             DisplayContent(string.Format("客户端已关闭:{0},端口:{1},原因:{2}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port, value));         }

        private void AppServer_NewRequestReceived(AppSession session, SuperSocket.SocketBase.Protocol.StringRequestInfo requestInfo)         {             string body = requestInfo.Body;             string charSet = session.Charset.BodyName;             DateTime dt = session.LastActiveTime;             string key = requestInfo.Key;//session.CurrentCommand;             string sessionIp = string.Format("{0}:{1}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port);             StringBuilder sb = new StringBuilder(sessionIp + ":\n接收内容:");             string content = key;                         if (body.Length > 0)             {                 content = key + (" " + body);             }             sb.Append(content);             sb.Append("    \n发送时间:" + dt.ToString("yyyy-MM-dd HH:mm:ss"));             sb.Append("    \n字符编码:" + charSet);             DisplayContent(sb.ToString());         }

        private void AppServer_NewSessionConnected(AppSession session)         {             string sessionIp = string.Format("{0}:{1}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port);             DisplayContent(string.Format("客户端已连接:{0},虚拟端口:{1}", session.RemoteEndPoint.Address, session.RemoteEndPoint.Port));         }

        /// <summary>         /// 异步显示内容         /// </summary>         /// <param name="addContent"></param>         private void DisplayContent(string addContent)         {             if (this.InvokeRequired)             {                 Action<string> actionUpd = DisplayContentEx;                 actionUpd.BeginInvoke(addContent, null, null);             }             else             {                 DisplayContentEx(addContent);             }         }

        public void DisplayContentEx(string addContent)         {             this.Invoke(new MethodInvoker(() =>             {                 if (rtxtDisplay.TextLength >= 10240)                 {                     rtxtDisplay.Clear();                 }                 rtxtDisplay.AppendText(addContent + "\n");                 rtxtDisplay.ScrollToCaret();             }));         }     } }

程序运行如图

【新开一个客户端,建立连接,并发送】

具体介绍下面两个方法AppServer.Setup() 与 AppServer.Start()

一、方法appServer.Setup(serverConfig),基本配置设置。

最终的本质就是 实例化类 new AsyncSocketServer(appServer, listeners)

1.实例化类 SuperSocket.SocketEngine.SocketServerFactory.

如果不指定编码格式,将默认为ASCII

         if (socketServerFactory == null)             {                 var socketServerFactoryType =                     Type.GetType("SuperSocket.SocketEngine.SocketServerFactory, SuperSocket.SocketEngine", true);

                socketServerFactory = (ISocketServerFactory)Activator.CreateInstance(socketServerFactoryType);             }

            m_SocketServerFactory = socketServerFactory;

            //Read text encoding from the configuration             if (!string.IsNullOrEmpty(config.TextEncoding))                 TextEncoding = Encoding.GetEncoding(config.TextEncoding);             else                 TextEncoding = new ASCIIEncoding();

2.设置监听配置:设置服务端的IP和端口

                if (config.Port > 0)                 {                     listeners.Add(new ListenerInfo                     {                         EndPoint = new IPEndPoint(ParseIPAddress(config.Ip), config.Port),                         BackLog = config.ListenBacklog,                         Security = BasicSecurity                     });                 }

3.设置接收过滤器:

return new CommandLineReceiveFilterFactory(TextEncoding);

=>new CommandLineReceiveFilterFactory(encoding, new BasicRequestInfoParser());

=>new TerminatorReceiveFilterFactory("\r\n",encoding, new BasicRequestInfoParser());

//注意:这里指定 终止符为 回车换行。

4.创建socket服务,最终目的就是实例化类:AsyncSocketServer

CreateSocketServer<TRequestInfo>(IAppServer appServer, ListenerInfo[] listeners, IServerConfig config)

=> case(SocketMode.Tcp):                     return new AsyncSocketServer(appServer, listeners);

 

二、方法:appServer.Start()。启动一个服务端实例并监听接受Accept客户端连接

1.开始实例化【最大连接数】个异步socket代理对象

=>AsyncSocketServer 对象m_SocketServer.Start()

实例化最大连接数【MaxConnectionNumber】个异步socket代理对象:SocketAsyncEventArgsProxy

new SocketAsyncEventArgsProxy(SocketAsyncEventArgs socketEventArgs,true);

并订阅 【完成异步操作】的事件 Completed

public SocketAsyncEventArgsProxy(SocketAsyncEventArgs socketEventArgs, bool isRecyclable)         {             SocketEventArgs = socketEventArgs;             OrigOffset = socketEventArgs.Offset;             SocketEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketEventArgs_Completed);             IsRecyclable = isRecyclable;         }

        static void SocketEventArgs_Completed(object sender, SocketAsyncEventArgs e)         {             var socketSession = e.UserToken as IAsyncSocketSession;

            if (socketSession == null)                 return;

            if (e.LastOperation == SocketAsyncOperation.Receive)             {                 socketSession.AsyncRun(() => socketSession.ProcessReceive(e));             }             else             {                 throw new ArgumentException("The last operation completed on the socket was not a receive");             }         } 

 

2.执行父类的启动监听:抽象类SocketServerBase的Start()方法

内部类 TcpAsyncSocketListener的Start()方法

var listener => return new TcpAsyncSocketListener(listenerInfo);

                var listener = CreateListener(ListenerInfos[i]);                 listener.Error += new ErrorHandler(OnListenerError);                 listener.Stopped += new EventHandler(OnListenerStopped);

                //当接受Accept一个客户端时触发                 listener.NewClientAccepted += new NewClientAcceptHandler(OnNewClientAccepted);

3.执行异步监听类TcpAsyncSocketListener的Start(Config)方法,这才是真正的启用监听。

public override bool Start(IServerConfig config)         {             m_ListenSocket = new Socket(this.Info.EndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);

            try             {                 m_ListenSocket.Bind(this.Info.EndPoint);                 m_ListenSocket.Listen(m_ListenBackLog);

                m_ListenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);                 m_ListenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.DontLinger, true);

                SocketAsyncEventArgs acceptEventArg = new SocketAsyncEventArgs();                 m_AcceptSAE = acceptEventArg;                 acceptEventArg.Completed += new EventHandler<SocketAsyncEventArgs>(acceptEventArg_Completed);

                if (!m_ListenSocket.AcceptAsync(acceptEventArg))                     ProcessAccept(acceptEventArg);

                return true;

            }             catch (Exception e)             {                 OnError(e);                 return false;             }         }

4.等待一个新的客户端连接:

开始一个异步操作以接受传入的连接尝试。

m_ListenSocket.AcceptAsync(acceptEventArg)

三、当一个客户端连接成功时,会自动触发完成事件:

TcpAsyncSocketListener类的 void ProcessAccept(SocketAsyncEventArgs e)

生成一个用于发送和接收的Socket对象

socket = e.AcceptSocket;

           //本次客户端已连接,同时开启:等待下一个新的客户端连接

           e.AcceptSocket = null;

            bool willRaiseEvent = false;

            try             {                 willRaiseEvent = m_ListenSocket.AcceptAsync(e);//引发接受下一个新的客户端连接尝试             }

if (socket != null)                 OnNewClientAccepted(socket, null);

自动触发事件:AsyncSocketServer类的

protected override void OnNewClientAccepted(ISocketListener listener, Socket client, object state)

即执行private IAppSession ProcessNewClient(Socket client, SslProtocols security)

相关代码:

            if (security == SslProtocols.None)                 socketSession = new AsyncSocketSession(client, socketEventArgsProxy);             else                 socketSession = new AsyncStreamSocketSession(client, security, socketEventArgsProxy);

创建Session用于发送和接收

protected IAppSession CreateSession(Socket client, ISocketSession session)

实例化并初始化 new AppSession()并Initialize()

public virtual void Initialize(IAppServer<TAppSession, TRequestInfo> appServer, ISocketSession socketSession)         {             var castedAppServer = (AppServerBase<TAppSession, TRequestInfo>)appServer;             AppServer = castedAppServer;             Charset = castedAppServer.TextEncoding;             SocketSession = socketSession;             SessionID = socketSession.SessionID;             m_Connected = true;             m_ReceiveFilter = castedAppServer.ReceiveFilterFactory.CreateFilter(appServer, this, socketSession.RemoteEndPoint);

            var filterInitializer = m_ReceiveFilter as IReceiveFilterInitializer;             if (filterInitializer != null)                 filterInitializer.Initialize(castedAppServer, this);

            socketSession.Initialize(this);

            OnInit();         }

为 异步会话.AsyncSocketSession绑定关闭事件 socketSession.Closed += SessionClosed;

然后异步AsyncSocketServer的注册会话方法。RegisterSession(IAppSession appSession)

然后AppServerBase的RegisterSession

触发新的会话连接事件AppServerBase的 OnNewSessionConnected(appSession);

NewSessionConnected.BeginInvoke(session, OnNewSessionConnectedCallback, handler);

然后 开始执行任务:

AppServer.AsyncRun(() => socketSession.Start());

即 Task.Factory.StartNew(task, taskOption).ContinueWith(...)

启动的任务 即  AsyncSocketSession的Start()方法

public override void Start()         {             StartReceive(SocketAsyncProxy.SocketEventArgs);

            if (!m_IsReset)                 StartSession();         }

AsyncSocketSession的StartReceive方法

private void StartReceive(SocketAsyncEventArgs e, int offsetDelta)

有个关键的异步接收 :

willRaiseEvent = Client.ReceiveAsync(e);

四、当客户端向服务端发送消息时,触发完成事件

本质是为事件NewRequestReceived的参数StringRequestInfo对象赋值后,触发事件NewRequestReceived。

如果超过了最大请求字节数,服务端将这个的客户端连接关闭,并提示协议错误【ProtocolError】,此时并不会触发事件NewRequestReceived

1.当有新的发送文本到达服务端时,会触发 SocketAsyncEventArgsProxy的完成事件

SocketEventArgs.Completed += new EventHandler<SocketAsyncEventArgs>(SocketEventArgs_Completed);

也就是任务:处理接收 ProcessReceive:

           if (e.LastOperation == SocketAsyncOperation.Receive)             {                 socketSession.AsyncRun(() => socketSession.ProcessReceive(e));             }

AsyncSocketSession的ProcessReceive()f方法

public void ProcessReceive(SocketAsyncEventArgs e)

2,执行方法 offsetDelta = this.AppSession.ProcessRequest(e.Buffer, e.Offset, e.BytesTransferred, true);

            其中AppSession的处理请求方法ProcessRequest

3.执行过滤请求方法:FilterRequest

             SuperSocket.SocketBase.Protocol.TerminatorReceiveFilter类的Filter方法

            BasicRequestInfoParser类的ParseRequestInfo方法返回StringRequestInfo对象

           AppSession的FilterRequest 里面判断 如果请求的字节数 超过 最大请求字节数,将关闭客户端连接,返回ProtocolError

           if (currentRequestLength >= maxRequestLength)             {                 if (Logger.IsErrorEnabled)                     Logger.Error(this, string.Format("Max request length: {0}, current processed length: {1}", maxRequestLength, currentRequestLength));

                Close(CloseReason.ProtocolError);                 return null;             }

如果请求字节数 小于 最大请求字节数,将执行 AppServer.ExecuteCommand(this, requestInfo);

也就是触发事件 NewRequestReceived。

 

最后执行异步等待读取下一次数据块,新的异步接收开始了

            //read the next block of data sent from the client             StartReceive(e, offsetDelta);

 

 

 

最新回复(0)