Server-Sent-Events在Go中的应用
- December 5, 2023
Server Sent Events (SSE) 是一种允许服务器向客户端推送事件的技术。与 WebSocket 不同,SSE 是单向的,只允许服务器向客户端发送数据。在 Go 中,我们可以使用 gin 框架来实现 SSE。
什么是 Server Sent Events?
Server Sent Events 是一种基于 HTTP 的服务器推送技术。在一个持久的 HTTP 连接上,服务器可以向客户端发送事件,而无需客户端进行请求。这使得服务器可以实时地将新数据推送到客户端。
与 WebSocket 等其他实时通信技术不同,SSE 利用从服务器到客户端的单向连接,而不是服务端和客户端都保持连接的双向通信,所以也只能限制使用SSE一旦建立连接之后,只能由服务端主动推送消息,。
如何在 Go 中使用 SSE?
服务端
在 Go 中,我们可以使用 gin 框架来实现 SSE。下面是一个简单的示例:
package main
import (
"fmt"
"io"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
// usage:https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
func main() {
// create a new gin engine
r := gin.New()
// set route
r.GET("/sse", func(ctx *gin.Context) {
var count = 0
ctx.Stream(func(w io.Writer) bool {
count++
if count <= 10 {
time.Sleep(time.Millisecond * 300)
// send server sent event message
ctx.SSEvent("", gin.H{
"status": http.StatusOK,
"resMsg": fmt.Sprintf("server sent event message: %d", count),
})
return true
}
return false
})
})
// run gin engine
err := r.Run(":9090")
if err != nil {
panic(err)
}
}
在这个示例中,我们定义了一个 SSE 路由 /sse。当客户端连接到这个路由时,服务器开始发送事件。我们使用 ctx.Stream 方法来发送事件,这个方法接受一个回调函数,该函数在每次需要发送事件时被调用。
在回调函数中,我们首先增加 count 的值,然后判断 count 是否小于或等于 10。如果是,我们就暂停 300 毫秒,然后发送一个事件。事件的数据是一个 res.Response 对象,包含了 HTTP 状态码和一个消息。最后,我们返回 true 来表示我们还有更多的事件要发送。如果 count 大于 10,我们就返回 false 来关闭连接。
优化: 然而,正常的用户场景不仅仅是用一个for循环来进行消息的通信,往往需要额外启动一个线程用于处理业务场景,而这个线程需要与gin的消息处理线程进行通信,所以修改后的代码如下:
package main
import (
"fmt"
"io"
"net/http"
"time"
"github.com/gin-gonic/gin"
)
func main() {
r := gin.Default()
r.GET("/sse", func(c *gin.Context) {
// Initialize client channel
msgsChan := make(chan string)
// anotherHandler will send message to client channel
go anotherHandler(msgsChan)
c.Stream(func(w io.Writer) bool {
// Stream message to client from message channel
if msg, ok1 := <-msgsChan; ok1 {
c.SSEvent("", gin.H{
"status": http.StatusOK,
"resMsg": msg,
})
return true
}
return false
})
})
// run gin engine
err := r.Run(":9090")
if err != nil {
panic(err)
}
}
func anotherHandler(msgsChan chan string) {
defer close(msgsChan)
count := 1
for {
time.Sleep(time.Millisecond * 300)
// Send current time to clients message channel
msgsChan <- fmt.Sprintf("server sent event message: %d", count)
if count >= 10 {
return
}
count++
}
}
与原来不同的是创建了一个字符串通道 msgsChan
,用于在主函数和另一个 goroutine 之间传递消息。其中的anotherHandler
函数是在另一个 goroutine 中运行的,它用来模拟正常的业务处理,它定期发送消息到 msgsChan
通道。这些消息被 Stream
处理函数捕获,并以 SSE 事件的形式传递给客户端。最终,这段代码实现了一个简单的 SSE 示例,服务器每隔300毫秒向客户端推送一条消息,重复10次。
注意: 需要注意的是c.Stream需要在请求的整个生命周期中运行,它不能放在 goroutine 中异步执行。c.Stream是用来将 ch 中的数据通过 SSE 的方式发送给客户端。这个操作需要在请求的整个生命周期中进行,直到所有数据都发送给客户端。因此,它不能在后台异步执行,否则请求可能会在数据发送完成之前结束,导致客户端无法接收到所有数据,更严重的可以会导致服务端异常,程序panic。
客户端
在Go中,我们可以使用bufio包的NewScanner函数来处理服务器发送的事件。以下是一个简单的示例,它创建一个HTTP客户端,连接到我们之前创建的服务器发送事件(SSE)端点,并打印出接收到的所有事件
package main
import (
"bufio"
"log"
"net/http"
)
func main() {
// create a new client
client := &http.Client{}
// create a new request
req, err := http.NewRequest("GET", "http://localhost:9090/sse", nil)
if err != nil {
log.Fatal(err)
}
// set request
resp, err := client.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
// read response body and print
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
log.Println(string(line))
}
}
这个客户端会连接到我们之前在http://localhost:9090/sse
上创建的SSE端点,并打印出从服务器接收到的所有事件。
Got a connection, launched process (pid = 73312).
2023/12/05 10:09:45 data:{"resMsg":"server sent event message: 1","status":200}
2023/12/05 10:09:45 data:{"resMsg":"server sent event message: 2","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 3","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 4","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 5","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 6","status":200}
2023/12/05 10:09:47 data:{"resMsg":"server sent event message: 7","status":200}
2023/12/05 10:09:47 data:{"resMsg":"server sent event message: 8","status":200}
2023/12/05 10:09:47 data:{"resMsg":"server sent event message: 9","status":200}
2023/12/05 10:09:48 data:{"resMsg":"server sent event message: 10","status":200}
Exiting.
优化: 同样客户端也可以采用第三方组件库进行优化:
package main
import (
"bufio"
"log"
"github.com/go-resty/resty/v2"
)
func main() {
// create a new client
client := resty.New().SetBaseURL("http://localhost:9090")
// create a new request
resp, err := client.R().SetDoNotParseResponse(true).Get("/sse")
if err != nil {
log.Fatal(err)
}
defer resp.RawResponse.Body.Close()
// read response body and print
scanner := bufio.NewScanner(resp.RawResponse.Body)
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
log.Println(string(line))
}
}
其中设置 SetDoNotParseResponse(true)
,客户端告诉 Resty 不要解析响应的主体。随后,通过 bufio.Scanner
逐行读取响应主体,并将每一行的内容打印到日志中。这种操作通常在处理 SSE(Server-Sent Events)等情况下很有用,因为 SSE 会将实时事件以文本行的形式推送给客户端,而无需解析为结构体。
工作原理
Server-Sent Events(SSE)是一种通过单向通信从服务器向客户端推送实时事件的 Web 技术。它建立在 HTTP 和 DOM 事件模型之上,提供了一种轻量级的实时通信机制。以下是 SSE 的基本原理:
-
HTTP 协议: SSE 利用了 HTTP/1.1 协议的特性。与传统的 HTTP 请求不同,SSE 请求保持打开的连接,允许服务器在有事件发生时持续向客户端推送数据,而无需客户端再次发起请求。
-
Content-Type 设置: 服务器在响应头中设置
Content-Type
为text/event-stream
,以指示该响应是 SSE 事件流。这告知客户端该响应包含一系列事件。Content-Type: text/event-stream
-
事件格式: 服务器通过每条消息使用一定格式发送事件。每个事件由一条或多条字段组成,使用类似键值对的结构。常见的字段包括
event
(事件类型)、data
(事件数据)、id
(事件标识符)等。例如:event: message data:{"resMsg":"server sent event message: 1","status":200}
这表示一个类型为 “message” 的事件,携带了相应的消息数据,如果event设置为空字符串,那么
event: message
将不显示。 -
消息分隔: 每个事件之间使用两个连续的换行符(
\n\n
)来分隔,告知客户端事件的结束,从而可以开始处理下一个事件。event: message data:{"resMsg":"server sent event message: 1","status":200} event: message data:{"resMsg":"server sent event message: 2","status":200}
-
客户端处理: 客户端使用 JavaScript 中的
EventSource
API 来监听服务器发送的事件。通过创建EventSource
对象,客户端可以指定服务器端点,并通过onmessage
事件处理程序来处理收到的事件。以下是一个简单的 JavaScript 示例:const eventSource = new EventSource('/sse'); eventSource.onmessage = function (event) { const data = JSON.parse(event.data); console.log('Received event:', data); // 在此处处理接收到的数据 };
-
自动重连机制: SSE 内置了自动重连机制。如果连接中断,客户端会自动尝试重新连接服务器。这确保了即使在网络故障后,也能够保持实时通信的能力。
再回头看示例,当客户端请求了http://localhost:9090/sse
接口后,发生了以下事情:
- 客户端发起请求,接口为
http://localhost:9090/sse
- 服务端接收请求,建立连接
- 服务端每隔0.3s向客户端推送消息,消息为
{"resMsg":"server sent event message: 1","status":200}
,因为消息类行为空,所以不显示消息类型。 - 再服务端主动推送10次后,全部消息推送完毕,服务端主动关闭连接。
总体而言,SSE 是一种简单而有效的实时通信机制,适用于服务器向客户端推送事件的场景。它遵循标准的 HTTP/1.1 协议,易于实现和使用,使得开发者能够构建实时更新的 Web 应用程序,而无需引入复杂的双向通信机制。