Server-Sent-Events在Go中的应用

Server-Sent-Events在Go中的应用
机器人
摘要
lomtom

Server Sent Events (SSE) 是一种允许服务器向客户端推送事件的技术。与 WebSocket 不同,SSE 是单向的,只允许服务器向客户端发送数据。在 Go 中,我们可以使用 gin 框架来实现 SSE。

什么是 Server Sent Events?

Server Sent Events 是一种基于 HTTP 的服务器推送技术。在一个持久的 HTTP 连接上,服务器可以向客户端发送事件,而无需客户端进行请求。这使得服务器可以实时地将新数据推送到客户端。

与 WebSocket 等其他实时通信技术不同,SSE 利用从服务器到客户端的单向连接,而不是服务端和客户端都保持连接的双向通信,所以也只能限制使用SSE一旦建立连接之后,只能由服务端主动推送消息,。

如何在 Go 中使用 SSE?

服务端

在 Go 中,我们可以使用 gin 框架来实现 SSE。下面是一个简单的示例:

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
)

// usage:https://github.com/gin-gonic/examples/blob/master/server-sent-event/main.go
func main() {
	// create a new gin engine
	r := gin.New()
	// set route
	r.GET("/sse", func(ctx *gin.Context) {
		var count = 0
		ctx.Stream(func(w io.Writer) bool {
			count++
			if count <= 10 {
				time.Sleep(time.Millisecond * 300)
				// send server sent event message
				ctx.SSEvent("", gin.H{
					"status": http.StatusOK,
					"resMsg": fmt.Sprintf("server sent event message: %d", count),
				})
				return true
			}
			return false
		})
	})
	// run gin engine
	err := r.Run(":9090")
	if err != nil {
		panic(err)
	}
}

在这个示例中,我们定义了一个 SSE 路由 /sse。当客户端连接到这个路由时,服务器开始发送事件。我们使用 ctx.Stream 方法来发送事件,这个方法接受一个回调函数,该函数在每次需要发送事件时被调用。

在回调函数中,我们首先增加 count 的值,然后判断 count 是否小于或等于 10。如果是,我们就暂停 300 毫秒,然后发送一个事件。事件的数据是一个 res.Response 对象,包含了 HTTP 状态码和一个消息。最后,我们返回 true 来表示我们还有更多的事件要发送。如果 count 大于 10,我们就返回 false 来关闭连接。

优化: 然而,正常的用户场景不仅仅是用一个for循环来进行消息的通信,往往需要额外启动一个线程用于处理业务场景,而这个线程需要与gin的消息处理线程进行通信,所以修改后的代码如下:

package main

import (
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
)

func main() {
	r := gin.Default()

	r.GET("/sse", func(c *gin.Context) {

		// Initialize client channel
		msgsChan := make(chan string)
		// anotherHandler will send message to client channel
		go anotherHandler(msgsChan)

		c.Stream(func(w io.Writer) bool {
			// Stream message to client from message channel
			if msg, ok1 := <-msgsChan; ok1 {
				c.SSEvent("", gin.H{
					"status": http.StatusOK,
					"resMsg": msg,
				})
				return true
			}
			return false
		})
	})

	// run gin engine
	err := r.Run(":9090")
	if err != nil {
		panic(err)
	}
}

func anotherHandler(msgsChan chan string) {
	defer close(msgsChan)
	count := 1
	for {
		time.Sleep(time.Millisecond * 300)

		// Send current time to clients message channel
		msgsChan <- fmt.Sprintf("server sent event message: %d", count)
		if count >= 10 {
			return
		}
		count++
	}
}

与原来不同的是创建了一个字符串通道 msgsChan,用于在主函数和另一个 goroutine 之间传递消息。其中的anotherHandler 函数是在另一个 goroutine 中运行的,它用来模拟正常的业务处理,它定期发送消息到 msgsChan 通道。这些消息被 Stream 处理函数捕获,并以 SSE 事件的形式传递给客户端。最终,这段代码实现了一个简单的 SSE 示例,服务器每隔300毫秒向客户端推送一条消息,重复10次。

注意: 需要注意的是c.Stream需要在请求的整个生命周期中运行,它不能放在 goroutine 中异步执行。c.Stream是用来将 ch 中的数据通过 SSE 的方式发送给客户端。这个操作需要在请求的整个生命周期中进行,直到所有数据都发送给客户端。因此,它不能在后台异步执行,否则请求可能会在数据发送完成之前结束,导致客户端无法接收到所有数据,更严重的可以会导致服务端异常,程序panic。

客户端

在Go中,我们可以使用bufio包的NewScanner函数来处理服务器发送的事件。以下是一个简单的示例,它创建一个HTTP客户端,连接到我们之前创建的服务器发送事件(SSE)端点,并打印出接收到的所有事件

package main

import (
	"bufio"
	"log"
	"net/http"
)

func main() {
	// create a new client
	client := &http.Client{}

	// create a new request
	req, err := http.NewRequest("GET", "http://localhost:9090/sse", nil)
	if err != nil {
		log.Fatal(err)
	}

	// set request
	resp, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// read response body and print
	scanner := bufio.NewScanner(resp.Body)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		log.Println(string(line))
	}
}

这个客户端会连接到我们之前在http://localhost:9090/sse上创建的SSE端点,并打印出从服务器接收到的所有事件。

Got a connection, launched process (pid = 73312).
2023/12/05 10:09:45 data:{"resMsg":"server sent event message: 1","status":200}
2023/12/05 10:09:45 data:{"resMsg":"server sent event message: 2","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 3","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 4","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 5","status":200}
2023/12/05 10:09:46 data:{"resMsg":"server sent event message: 6","status":200}
2023/12/05 10:09:47 data:{"resMsg":"server sent event message: 7","status":200}
2023/12/05 10:09:47 data:{"resMsg":"server sent event message: 8","status":200}
2023/12/05 10:09:47 data:{"resMsg":"server sent event message: 9","status":200}
2023/12/05 10:09:48 data:{"resMsg":"server sent event message: 10","status":200}
Exiting.

优化: 同样客户端也可以采用第三方组件库进行优化:

package main

import (
	"bufio"
	"log"

	"github.com/go-resty/resty/v2"
)

func main() {
	// create a new client
	client := resty.New().SetBaseURL("http://localhost:9090")

	// create a new request
	resp, err := client.R().SetDoNotParseResponse(true).Get("/sse")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.RawResponse.Body.Close()

	// read response body and print
	scanner := bufio.NewScanner(resp.RawResponse.Body)
	for scanner.Scan() {
		line := scanner.Bytes()
		if len(line) == 0 {
			continue
		}
		log.Println(string(line))
	}
}

其中设置 SetDoNotParseResponse(true),客户端告诉 Resty 不要解析响应的主体。随后,通过 bufio.Scanner 逐行读取响应主体,并将每一行的内容打印到日志中。这种操作通常在处理 SSE(Server-Sent Events)等情况下很有用,因为 SSE 会将实时事件以文本行的形式推送给客户端,而无需解析为结构体。

工作原理

Server-Sent Events(SSE)是一种通过单向通信从服务器向客户端推送实时事件的 Web 技术。它建立在 HTTP 和 DOM 事件模型之上,提供了一种轻量级的实时通信机制。以下是 SSE 的基本原理:

  1. HTTP 协议: SSE 利用了 HTTP/1.1 协议的特性。与传统的 HTTP 请求不同,SSE 请求保持打开的连接,允许服务器在有事件发生时持续向客户端推送数据,而无需客户端再次发起请求。

  2. Content-Type 设置: 服务器在响应头中设置 Content-Typetext/event-stream,以指示该响应是 SSE 事件流。这告知客户端该响应包含一系列事件。

    Content-Type: text/event-stream
    
  3. 事件格式: 服务器通过每条消息使用一定格式发送事件。每个事件由一条或多条字段组成,使用类似键值对的结构。常见的字段包括 event(事件类型)、data(事件数据)、id(事件标识符)等。例如:

    event: message
    data:{"resMsg":"server sent event message: 1","status":200}
    

    这表示一个类型为 “message” 的事件,携带了相应的消息数据,如果event设置为空字符串,那么event: message将不显示。

  4. 消息分隔: 每个事件之间使用两个连续的换行符(\n\n)来分隔,告知客户端事件的结束,从而可以开始处理下一个事件。

    event: message
    data:{"resMsg":"server sent event message: 1","status":200}
    
    event: message
    data:{"resMsg":"server sent event message: 2","status":200}
    
  5. 客户端处理: 客户端使用 JavaScript 中的 EventSource API 来监听服务器发送的事件。通过创建 EventSource 对象,客户端可以指定服务器端点,并通过 onmessage 事件处理程序来处理收到的事件。以下是一个简单的 JavaScript 示例:

    const eventSource = new EventSource('/sse');
    
    eventSource.onmessage = function (event) {
      const data = JSON.parse(event.data);
      console.log('Received event:', data);
      // 在此处处理接收到的数据
    };
    
  6. 自动重连机制: SSE 内置了自动重连机制。如果连接中断,客户端会自动尝试重新连接服务器。这确保了即使在网络故障后,也能够保持实时通信的能力。

再回头看示例,当客户端请求了http://localhost:9090/sse接口后,发生了以下事情:

  1. 客户端发起请求,接口为http://localhost:9090/sse
  2. 服务端接收请求,建立连接
  3. 服务端每隔0.3s向客户端推送消息,消息为{"resMsg":"server sent event message: 1","status":200},因为消息类行为空,所以不显示消息类型。
  4. 再服务端主动推送10次后,全部消息推送完毕,服务端主动关闭连接。

image-20231205150209961

总体而言,SSE 是一种简单而有效的实时通信机制,适用于服务器向客户端推送事件的场景。它遵循标准的 HTTP/1.1 协议,易于实现和使用,使得开发者能够构建实时更新的 Web 应用程序,而无需引入复杂的双向通信机制。

lomtom

标题:Server-Sent-Events在Go中的应用

作者:lomtom

链接:https://lomtom.cn/b2hfnj6kwjdkr