test/ws/ws.go
package ws import ( "bytes" "encoding/json" "errors" "fmt" "log" "time" "github.com/gofiber/websocket/v2" "github.com/xurwxj/viper" "go.etcd.io/bbolt" ) const ( // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 ) var ( newline = []byte{'\n'} space = []byte{' '} ) // Client is a middleman between the websocket connection and the hub. type Client struct { Hub *Hub // The websocket connection. Conn *websocket.Conn // Buffered channel of outbound messages. Send chan []byte } // readPump pumps messages from the websocket connection to the hub. // // The application runs readPump in a per-connection goroutine. The application // ensures that there is at most one reader on a connection by executing all // reads from this goroutine. func (c *Client) ReadPump() { defer func() { c.Hub.Unregister <- c c.Conn.Close() }() maxMessageSize := viper.GetInt64("ws.maxMessageSize") if maxMessageSize == 0 { maxMessageSize = 5120000000 } c.Conn.SetReadLimit(maxMessageSize) c.Conn.SetReadDeadline(time.Now().Add(pongWait)) c.Conn.SetPongHandler(func(string) error { c.Conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) for { _, message, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { log.Printf("ReadPump error: %v from %s", err, c.Hub.Name) } break } message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) c.Hub.Broadcast <- message } } // writePump pumps messages from the hub to the websocket connection. // // A goroutine running writePump is started for each connection. The // application ensures that there is at most one writer to a connection by // executing all writes from this goroutine. func (c *Client) WritePump() { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() c.Conn.Close() }() for { select { case message, ok := <-c.Send: writeWait := viper.GetInt64("ws.writeWait") if writeWait == 0 { writeWait = 60 } c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(writeWait) * time.Second)) if !ok { // The hub closed the channel. c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) return } w, err := c.Conn.NextWriter(websocket.TextMessage) if err != nil { return } w.Write(message) // Add queued chat messages to the current websocket message. n := len(c.Send) for i := 0; i < n; i++ { w.Write(newline) w.Write(<-c.Send) } if err := w.Close(); err != nil { return } case <-ticker.C: writeWait := viper.GetInt64("ws.writeWait") if writeWait == 0 { writeWait = 60 } c.Conn.SetWriteDeadline(time.Now().Add(time.Duration(writeWait) * time.Second)) if err := c.Conn.WriteMessage(websocket.PingMessage, nil); err != nil { return } } } } // Hub maintains the set of active clients and broadcasts messages to the // clients. type Hub struct { // Hub name Name string // Registered clients. Clients map[*Client]bool // Inbound messages from the clients. Broadcast chan []byte BroadcastOut chan []byte // Register requests from the clients. Register chan *Client // Unregister requests from clients. Unregister chan *Client } func (h *Hub) Run() { for { select { case client := <-h.Register: h.Clients[client] = true case client := <-h.Unregister: if _, ok := h.Clients[client]; ok { delete(h.Clients, client) close(client.Send) } case message := <-h.BroadcastOut: for client := range h.Clients { select { case client.Send <- message: default: close(client.Send) delete(h.Clients, client) } } } } } // serveWs handles websocket requests from the peer. func ServeWs(hub *Hub, conn *websocket.Conn) { client := &Client{Hub: hub, Conn: conn, Send: make(chan []byte, 256)} client.Hub.Register <- client // Allow collection of memory referenced by the caller by doing all work in // new goroutines. go client.WritePump() go client.ReadPump() } func NewHub() *Hub { return &Hub{ Broadcast: make(chan []byte), BroadcastOut: make(chan []byte), Register: make(chan *Client), Unregister: make(chan *Client), Clients: make(map[*Client]bool), } } func PubWs(hub *Hub, msg []byte) error { if hub != nil { hub.BroadcastOut <- msg return nil } else { return errors.New("ws service not exist!") } } func PubMsg(db *bbolt.DB, Servers map[string]interface{}, socks string, msg []byte, persist bool) error { // fmt.Println("PubMsg: ", string(msg)) h, ok := Servers[socks] if ok { hub := h.(*Hub) if len(hub.Clients) != 0 { hub.BroadcastOut <- msg // fmt.Println("send msg from sock: ", hub.Name, " for content: ", string(msg)) } else { if persist { // fmt.Println("no connection from sock: ", hub.Name, " for content: ", string(msg)) err := db.Update(func(tx *bbolt.Tx) error { msgOffline := viper.GetString("bbolt.bucket.msgOffline") if msgOffline == "" { msgOffline = "offline_msg" } if bt, err := tx.CreateBucketIfNotExists([]byte(msgOffline)); err != nil { return err } else { m := Msg{Channel: hub.Name, Content: string(msg)} if tm, err := json.Marshal(m); err != nil { return err } else { bt.Put([]byte(fmt.Sprintf("%d", time.Now().Unix())), tm) } } return nil }) if err != nil { // fmt.Println("no connection from sock: ", hub.Name, " for content: ", string(msg), " with err: ", err) } } return errors.New("localsock: offline") } } else { // fmt.Println("socks not exist: ", socks) return errors.New("unreachable") } return nil } type Msg struct { Channel string Content string }test/main.go
/* * @description: */ /* * @description: */ package main import ( "errors" "fmt" "log" "test/ws" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/compress" "github.com/gofiber/fiber/v2/middleware/cors" "github.com/gofiber/fiber/v2/middleware/csrf" "github.com/gofiber/fiber/v2/middleware/recover" "github.com/gofiber/websocket/v2" "github.com/xurwxj/viper" ) var Servers = make(map[string]interface{}) var r *fiber.App func main() { ListenHTTP() } func ListenHTTP() { routerPrefix := viper.GetString("http.routerPrefix") if routerPrefix == "" { routerPrefix = "/" } app := setupHTTP(routerPrefix) serverAddr := viper.GetString("http.port") if serverAddr == "" { serverAddr = ":8088" } fmt.Println("addr", serverAddr, "starting http...") app.Listen(serverAddr) } // fiber 错误处理函数 var fiberErrorHandler = func(c *fiber.Ctx, err error) error { fmt.Println(err, "http server handler error") c.JSON(map[string]string{"status": "fail", "result": "serverErr"}) return errors.New("err") } func setupHTTP(prefix string) *fiber.App { r = fiber.New(fiber.Config{ ServerHeader: "websocket server", BodyLimit: 1 * 1024 * 1024 * 1024, Concurrency: 1 * 1024 * 1024 * 1024, DisableStartupMessage: true, ErrorHandler: fiberErrorHandler, }) r.Use(recover.New()) r.Use(cors.New()) r.Use(csrf.New()) r.Use(compress.New()) rootGroup := r.Group(prefix) routerHttp(rootGroup) routerWebsocket(rootGroup) return r } func routerHttp(router fiber.Router) { staticFile := router.Group("/static") ///< 路由组静态文件 staticFile.Static("/log/", "img") // http://localhost:8088/static/log/hh.txt } func routerWebsocket(router fiber.Router) { socket := router.Group("/socket") ///< 路由组 socket.Use("/ws1", func(c *fiber.Ctx) error { // IsWebSocketUpgrade returns true if the client // requested upgrade to the WebSocket protocol. if websocket.IsWebSocketUpgrade(c) { c.Locals("allowed", true) return c.Next() } return fiber.ErrUpgradeRequired }) socket.Get("/ws1", websocket.New(func(c *websocket.Conn) { // ws://localhost:8088/socket/ws1 var ( mt int msg []byte err error ) for { if mt, msg, err = c.ReadMessage(); err != nil { log.Println("read:", err) break } log.Printf("recv: %s", msg) if err = c.WriteMessage(mt, msg); err != nil { log.Println("write:", err) break } } })) slices := []string{"ws22", "ws33"} for _, v := range slices { sHub := ws.NewHub() sHub.Name = v Servers[v] = sHub go sHub.Run() go Broadcast(sHub) socket.Get("/"+v, websocket.New(func(c *websocket.Conn) { // ws://localhost:8088/socket/ws1 ws.ServeWs(sHub, c) select {} })) } } // Broadcast 处理客户端发来的websocket消息 func Broadcast(hub *ws.Hub) { for { msg := <-hub.Broadcast go WsMsgRoute(hub.Name, msg) hub.BroadcastOut <- msg } } // WsMsgRoute 处理客户端发来的websocket消息 func WsMsgRoute(from string, m []byte) { fmt.Println(from, string(m)) }