上海古都建筑设计集团,上海办公室装修设计公司,上海装修公司高质量的内容分享社区,上海装修公司我们不是内容生产者,我们只是上海办公室装修设计公司内容的搬运工平台

Python Tornado 实现SSE服务端主动推送方案

guduadmin281月前

一、SSE 服务端消息推送

SSE 是 Server-Sent Events 的简称, 是一种服务器端到客户端(浏览器)的单项消息推送。对应的浏览器端实现 Event Source 接口被制定为HTML5 的一部分。相比于 WebSocket,服务器端和客户端工作量都要小很多、简单很多,而 Tornado 又是Python中的一款优秀的高性能web框架,本文带领大家一起实践下 Tornado SSE 的实现。

本文主要探索两个方面的实践:一个是客户端发送请求,服务端的返回是分多次进行传输的,直到传输完成,这种情况下请求结束后,就可以考虑关闭 SSE了,所以这种连接可以认为是暂时的。另一种是由服务端在特定的时机下主动推送消息给到客户端,推送的时机具有不确定性,随时性,所以这种情况下需要客户端和服务端保持长久连接。

本次使用的 Tornado 版本:

tornado==6.3.2

二、短暂性场景下的 SSE 实现

短暂性场景下就是对应上面的第一点,客户端主动发送请求后,服务端分多次传输,直到完成,数据获取完成后连接就可以断开了,适用于一些接口复杂,操作步骤多的场景,可以提前告诉客户端现在进行到了哪一步了,并且这种方式也有利于服务端的横向扩展。

在 Tornado 中实现,需要注意的是要关闭 _auto_finish ,这样的话就不会被框架自己主动停止连接了,下面是一个实现的案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor
class SSE(RequestHandler):
    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")
    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")
    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool
    @tornado.gen.coroutine
    def get(self):
        result = yield self.doHandle()
        self.write(result)
        # 结束
        self.finish()
    @run_on_executor
    def doHandle(self):
        tornado.ioloop.IOLoop.current()
        # 分十次推送信息
        for i in range(10):
            time.sleep(1)
            self.flush()
            self.callback(f"current: {i}")
        return f"data: end\n\n"
    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()
class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)
def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)
    print(f"Start server success", f"The prot = {port}")
    tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
    startServer(8020)

运行后可以到浏览器访问:http://localhost:8020/sse,此时就可以看到服务端在不断地推送数据过来了:

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第1张

那如何在前端用 JS 获取数据呢,前面提到在 JS 层面,有封装好的 Event Source 组件可以直接拿来使用,例如:




    
    测试服务器推送技术


    

运行后可以看到服务端分阶段推送过来的数据:

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第2张

三、长连接场景下的 SSE 实现

上面实现了客户端请求后,分批次返回,但是有些情况下是客户端连接后没有东西返回,而是在某个特定的时机下返回给某几个客户端,所以这种情况,我们需要和客户端保持长久的连接,同时进行客户端连接的缓存,因为同时有可能有 100 个用户,但是推送时可能只需要给 10 个用户推送,这种方式相当于将一个客户端和一个服务端进行了绑定,一定程度上不利于服务端的横向扩展,但也可以通过一些消息订阅的方式解决类似问题。

下面是一个实现案例:

import time
from tornado.concurrent import run_on_executor
from tornado.web import RequestHandler
import tornado.gen
from concurrent.futures.thread import ThreadPoolExecutor
# 单例
def singleton(cls):
    instances = {}
    def wrapper(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return wrapper
# 订阅推送工具类
@singleton
class Pusher():
    def __init__(self):
        self.clients = {}
    def add_client(self, client_id, callback):
        if client_id not in self.clients:
            self.clients[client_id] = callback
            print(f"{client_id} 连接")
    def send_all(self, message):
        for client_id in self.clients:
            callback = self.clients[client_id]
            print("发送消息给:", client_id)
            callback(message)
    def send(self, client_id, message):
        callback = self.clients[client_id]
        print("发送消息给:", client_id)
        callback(message)
class SSE(RequestHandler):
    # 定义推送者
    pusher = Pusher()
    def initialize(self):
        # 关闭自动结束
        self._auto_finish = False
        print("initialize")
    def set_default_headers(self):
        # 设置为事件驱动模式
        self.set_header('Content-Type', "text/event-stream")
        # 不使用缓存
        self.set_header('Content-Control', "no-cache")
        # 保持长连接
        self.set_header('Connection', "keep-alive")
        # 允许跨域
        self.set_header('Access-Control-Allow-Origin', "*")
    @tornado.gen.coroutine
    def get(self):
        # 客户端唯一标识
        client_id = self.get_argument("client_id")
        self.pusher.add_client(client_id, self.callback)
    def callback(self, message):
        # 事件推送
        message = f"data: {message}\n\n"
        self.write(message)
        self.flush()
# 定义推送接口,模拟推送
class Push(RequestHandler):
    # 定义推送者
    pusher = Pusher()
    def prepare(self):
        # 准备线程池
        self.executor = self.application.pool
    @tornado.gen.coroutine
    def get(self):
        # 客户端标识
        client_id = self.get_argument("client_id")
        # 推送的消息
        message = self.get_argument("message")
        result = yield self.doHandle(client_id, message)
        self.write(result)
    @run_on_executor
    def doHandle(self, client_id, message):
        tornado.ioloop.IOLoop.current()
        self.pusher.send(client_id, message)
        return "success"
class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            ("/sse", SSE),
            ("/push", Push),
            ("/(.*)$", tornado.web.StaticFileHandler, {
                "path": "resources/static",
                "default_filename": "index.html"
            })
        ]
        super(Application, self).__init__(handlers)
        self.pool = ThreadPoolExecutor(200)
def startServer(port):
    app = Application()
    httpserver = tornado.httpserver.HTTPServer(app)
    httpserver.listen(port)
    print(f"Start server success", f"The prot = {port}")
    tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
    startServer(8020)

这里我定义了一个 Pusher 订阅推送工具类,用来存储客户端的连接,以及给指定客户端或全部客户端发送消息,然后我又定义 Push 接口,模拟不定时的指定客户端发送信息的场景。

同样前端也要修改,需要给自己定义 client_id ,例如:




    
    测试服务器推送技术


    

这里我用 uuid 模拟客户端的唯一ID,在真实使用时可不要这么做。

下面使用浏览器打开三个页面,可以看到三个不同的 client_id :

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第3张

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第4张

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第5张

在服务端的日志中也能看到这三个客户端的连接:

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第6张

下面调用 push 接口来给任意一个客户端发送消息,例如这里发给client_id = 2493045e-84dd-4118-8d96-0735c4ac186b 的用户 :

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第7张

下面看到 client_id 是 2493045e-84dd-4118-8d96-0735c4ac186b的页面:

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第8张

已经成功收到推送的消息,反之看另外两个:

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第9张

Python Tornado 实现SSE服务端主动推送方案,在这里插入图片描述,第10张

都没有消息,到这里就实现了长连接下不定时的服务端消息推送方案。

网友评论

搜索
最新文章
热门文章
热门标签
 
 梦到自己迷路  梦见自己主动给别人钱  十二生肖配对合婚