首页 > 编程 > Python > 正文

Sanic框架流式传输操作示例

2020-02-15 22:26:57
字体:
来源:转载
供稿:网友

本文实例讲述了Sanic框架流式传输操作。分享给大家供大家参考,具体如下:

简介

Sanic是一个类似Flask的Python 3.5+ Web服务器,它的写入速度非常快。除了Flask之外,Sanic还支持异步请求处理程序。这意味着你可以使用Python 3.5中新的闪亮的异步/等待语法,使你的代码非阻塞和快速。

在前面一篇《Sanic框架Cookies操作》中已经讲到,如何在Sanic中使用Cookie,接下来将介绍一下Sanic的流的使用:

请求流式传输

Sanic允许通过流获取请求数据,如下所示,当请求结束时,request.stream.get()返回为None,只有postputpatch decorator拥有流参数:

from sanic.response import stream@app.post("/post_stream",stream=True)async def post_stream(request):  async def streaming(response):    while True:      body = await request.stream.get()      if body is None:        break      body = body.decode("utf-8")      reponse.write(body)  return stream(streaming)@app.put("/put_stream",stream=True)async def put_stream(request):  async def streaming(response):    while True:      body = await request.stream.get()      if body is None:        break      body = body.decode("utf-8")      response.write("utf-8")  return stream(streaming)

除了上述例子的方法之外,我们之前还讲过用add_route方法动态添加路由:

from sanic.response import textfrom sanic.views import HTTPMethodViewfrom sanic.views import stream as stream_decoratorclass StreamView(HTTPMethodView)  @stream_decorator  async def post(self,request)    result = ''    while True:      body = await request.stream.get()      if body is None:        break      body = body.decode('utf-8')      result += body    return text(result)app.add_route(StreamView.as_view(),"/method_view")

值得注意的是,stream_decorator装饰器中处理函数的函数名称,若为post则为post请求,若为put则为put请求。在之前讲述路由的文章《Sanic框架路由用法》中讲到一个CompositionView类来自定义一个路由,CompositionView在流式请求中同样适用:

from sanic.views import CompositionViewasync def post_stream_view(request):  result = ''  while True:    body = await request.stream.get()    if body is None:      break    body = body.decode('utf-8')    result += body  return text(result)view = CompositionView()view.add(['POST'],post_stream_view,stream=True)app.add_route(view,"/post_stream_view")

响应流式传输

Sanic允许你使用stream方法将内容传输到客户端,该方法接受一个通过StreamingHTTPResponse传入的对象的协程回调,举个栗子:

from sanic.response import stream@app.route("/post_stream_info",methods=["POST"])async def post_stream_info(request):  async def streaming(response):    response.write("no")    response.write("bug")  return stream(streaming)            
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表