接口简介
详见手册
后端实现 Go
云端启用websocket server接受本地PC的链接请求
监听AI相关的http请求,然后将请求通过ws转发给本地PC,从本地PC获取结果后再回复给http请求
以上步骤类似于通过一个公网ip来进行内网穿透,将内网服务暴露至公网。比较简单的办法就是就是直接使用开源的内网穿透工具,将本地PC上的AI服务暴露出去。
这里选择修改Go服务器,自行开发内网穿透相关逻辑,为了继续熟悉Go语言和便于后续定制
websocket 服务器框架
wsserver实现
1. ClientManager
通过go程与chan,实现对所有连入的wsclient的管理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| type ClientManager struct { Clients map[*Client]bool Broadcast chan []byte Register chan *Client Unregister chan *Client }
type Client struct { ID string Socket *websocket.Conn Send chan []byte Count int Type int UserId string }
func (manager *ClientManager) Start() { for { select { case conn := <-manager.Register: manager.Clients[conn] = true log.Println("ws连接:" + conn.Socket.RemoteAddr().String()) case conn := <-manager.Unregister: if _, ok := manager.Clients[conn]; ok { log.Println("ws断开:" + conn.Socket.RemoteAddr().String()) if conn == GAIClient.Client { GAIClient = nil } close(conn.Send) delete(manager.Clients, conn) } case message := <-manager.Broadcast: for conn := range manager.Clients { select { case conn.Send <- message: default: close(conn.Send) delete(manager.Clients, conn) } } } } }
|
2. Client
通过go程与chan,异步处理每个Client的读写逻辑
每个client的写逻辑是通用的,只需要将从web收到的数据,写给client即可
读逻辑则需要特殊处理,收到来自web的消息后,临时开辟出一个chan,然后阻塞接收chan的消息。当从ai收到回复后,将来自ai的回复写入chan,再通过http返回给web
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
|
func (c *Client) Write() { defer func() { c.Socket.Close() }()
for { select { case message, ok := <-c.Send: if !ok { c.Socket.WriteMessage(websocket.CloseMessage, []byte{}) return } c.Socket.WriteMessage(websocket.TextMessage, message) } } }
func (c *AIClient) Read() { defer func() { Manager.Unregister <- c.Client c.Socket.Close() }()
for { _, message, err := c.Socket.ReadMessage() if err != nil { Manager.Unregister <- c.Client c.Socket.Close() break }
c.Client.Count = 0 var msg2ai Msg2AI
err = json.Unmarshal(message, &msg2ai) if err != nil { log.Println("ai解析失败:" + c.Socket.RemoteAddr().String()) Manager.Unregister <- c.Client c.Socket.Close() } else { c.runAILogic(msg2ai) } } }
|
通过路由ws请求,以接受wsclient的连入
引入github.com/gorilla/websocket库,以实现websocket功能
ws请求基于gin框架,收到http请求后将其升级为ws长链接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func (u *WS) Bind() { u.Ge.GET("/ai", u.execAI) }
func (u *WS) execAI(c *gin.Context) { log.Println("ai上线")
conn, error := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil) if error != nil { c.Redirect(http.StatusTemporaryRedirect, config.ServerCfg.Url+"/404.html") return } }
|
接收来自web的http请求
匹配全部/api/.*请求,然后直接转发给ai的ws通道。同时开辟一个chan给ai,阻塞等待ai将回复写入chan
借助chan的阻塞特性,以达到同步获取ai回复的效果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| func (u *WS) Bind() { u.Ge.POST("/api/:name", u.execAPI) }
func (u *WS) execAPI(c *gin.Context) { api := c.Param("name") log.Println("收到请求:" + api)
if GAIClient == nil { c.AbortWithStatus(http.StatusNotFound) return }
data, err := c.GetRawData() if err != nil { c.AbortWithError(http.StatusNotFound, err) return }
ctx := c.Request.Context() strChan := make(chan string) defer close(strChan)
GAIClient.runAIReq(c, api, string(data), strChan)
select { case <-ctx.Done(): return case str := <-strChan: c.String(http.StatusOK, str) } }
|
心跳包超时检测
直接go启动一个定时器,定时去轮询每个ws链接。每个ws链接一定时间内没有消息发来,则主动断开
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func (manager *ClientManager) Timer() { ticker := time.NewTicker(1 * time.Second) for range ticker.C { for conn := range manager.Clients { conn.Count++ if conn.Count >= 600 { log.Println("心跳超时:" + conn.Socket.RemoteAddr().String()) close(conn.Send) delete(manager.Clients, conn) } } } }
|