0%

AI对话接口 (后端实现Go)

接口简介

详见手册

后端实现 Go

  1. 云端启用websocket server接受本地PC的链接请求

  2. 监听AI相关的http请求,然后将请求通过ws转发给本地PC,从本地PC获取结果后再回复给http请求

以上步骤类似于通过一个公网ip来进行内网穿透,将内网服务暴露至公网。比较简单的办法就是就是直接使用开源的内网穿透工具,将本地PC上的AI服务暴露出去。

这里选择修改Go服务器,自行开发内网穿透相关逻辑,为了继续熟悉Go语言和便于后续定制

websocket 服务器框架

wsserver实现

1. ClientManager

通过go程与chan,实现对所有连入的wsclient的管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// ClientManager is a websocket manager
type ClientManager struct {
Clients map[*Client]bool
Broadcast chan []byte
Register chan *Client
Unregister chan *Client
}

// Client is a websocket client
type Client struct {
ID string
Socket *websocket.Conn
Send chan []byte
Count int // 心跳计数
Type int // 服务类型 1ai 2web
UserId string
}

// Start is to start a ws server
func (manager *ClientManager) Start() {
for {
select {
case conn := <-manager.Register:
manager.Clients[conn] = true
log.Println("ws连接:" + conn.Socket.RemoteAddr().String())
// jsonMessage, _ := json.Marshal(&Message{Content: "/A new socket has connected."})
// manager.Send(jsonMessage, conn)
case conn := <-manager.Unregister:
if _, ok := manager.Clients[conn]; ok {
log.Println("ws断开:" + conn.Socket.RemoteAddr().String())
if conn == GAIClient.Client {
GAIClient = nil
}
close(conn.Send)
delete(manager.Clients, conn)
// jsonMessage, _ := json.Marshal(&Message{Content: "/A socket has disconnected."})
// manager.Send(jsonMessage, conn)
}
case message := <-manager.Broadcast:
for conn := range manager.Clients {
select {
case conn.Send <- message:
default:
close(conn.Send)
delete(manager.Clients, conn)
}
}
}
}
}

2. Client

通过go程与chan,异步处理每个Client的读写逻辑

每个client的写逻辑是通用的,只需要将从web收到的数据,写给client即可

读逻辑则需要特殊处理,收到来自web的消息后,临时开辟出一个chan,然后阻塞接收chan的消息。当从ai收到回复后,将来自ai的回复写入chan,再通过http返回给web

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50

// 写逻辑,直接通过chan将数据写进wswrite即可
// 对于aiclient,server收到http请求后,会直接将请求转发给aiclient
func (c *Client) Write() {
defer func() {
c.Socket.Close()
}()

for {
select {
case message, ok := <-c.Send:
if !ok {
c.Socket.WriteMessage(websocket.CloseMessage, []byte{})
return
}
c.Socket.WriteMessage(websocket.TextMessage, message)
}
}
}

// 针对本地aiclient的读逻辑
func (c *AIClient) Read() {
defer func() {
Manager.Unregister <- c.Client
c.Socket.Close()
}()

for {
_, message, err := c.Socket.ReadMessage()
if err != nil {
Manager.Unregister <- c.Client
c.Socket.Close()
break
}

// 重置心跳计时
c.Client.Count = 0
var msg2ai Msg2AI

err = json.Unmarshal(message, &msg2ai)
if err != nil {
log.Println("ai解析失败:" + c.Socket.RemoteAddr().String())
Manager.Unregister <- c.Client
c.Socket.Close()
} else {
c.runAILogic(msg2ai)
}
}
}

通过路由ws请求,以接受wsclient的连入

引入github.com/gorilla/websocket库,以实现websocket功能

ws请求基于gin框架,收到http请求后将其升级为ws长链接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (u *WS) Bind() {
u.Ge.GET("/ai", u.execAI) // 接收pc发来的ws请求
// ...
}

func (u *WS) execAI(c *gin.Context) {
log.Println("ai上线")

// change the reqest to websocket model
conn, error := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool { return true }}).Upgrade(c.Writer, c.Request, nil)
if error != nil {
// 实现内部重定向
c.Redirect(http.StatusTemporaryRedirect, config.ServerCfg.Url+"/404.html")
return
}
// websocket connect
}

接收来自web的http请求

匹配全部/api/.*请求,然后直接转发给ai的ws通道。同时开辟一个chan给ai,阻塞等待ai将回复写入chan

借助chan的阻塞特性,以达到同步获取ai回复的效果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (u *WS) Bind() {
// 改为路由参数配置
u.Ge.POST("/api/:name", u.execAPI)
}

func (u *WS) execAPI(c *gin.Context) {
api := c.Param("name")
log.Println("收到请求:" + api)

if GAIClient == nil {
c.AbortWithStatus(http.StatusNotFound)
return
}

data, err := c.GetRawData()
if err != nil {
c.AbortWithError(http.StatusNotFound, err)
return
}

ctx := c.Request.Context()
strChan := make(chan string)
defer close(strChan)

GAIClient.runAIReq(c, api, string(data), strChan)

select {
case <-ctx.Done():
return
case str := <-strChan:
c.String(http.StatusOK, str)
}
}

心跳包超时检测

直接go启动一个定时器,定时去轮询每个ws链接。每个ws链接一定时间内没有消息发来,则主动断开

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 心跳检测
func (manager *ClientManager) Timer() {
ticker := time.NewTicker(1 * time.Second)
for range ticker.C {
for conn := range manager.Clients {
conn.Count++
// if conn.Count >= 5 && conn != GAIClient.Client {
if conn.Count >= 600 {
log.Println("心跳超时:" + conn.Socket.RemoteAddr().String())
close(conn.Send)
delete(manager.Clients, conn)
}
}
}
}