首页 > 编程 > JavaScript > 正文

Django+Vue实现WebSocket连接的示例代码

2019-11-19 11:26:42
字体:
来源:转载
供稿:网友

近期有一需求:前端页面点击执行任务,实时显示后端执行情况,思考一波;发现 WebSocket 最适合做这件事。

效果

测试 ping www.baidu.com 效果

点击连接建立ws连接

后端实现

所需软件包

后端主要借助Django Channels 实现socket连接,官网文档链接

这里想实现每个连接进来加入组进行广播,所以还需要引入 channels-redis

pip

channels==2.2.0channels-redis==2.4.0

引入

settings.py

INSTALLED_APPS = [  'django.contrib.admin',  'django.contrib.auth',  'django.contrib.contenttypes',  'django.contrib.sessions',  'django.contrib.messages',  'django.contrib.staticfiles',  'rest_framework.authtoken',  'rest_framework',        ...  'channels',]# Redis配置REDIS_HOST = ENV_DICT.get('REDIS_HOST', '127.0.0.1')REDIS_PORT = ENV_DICT.get('REDIS_PORT', 6379)CHANNEL_LAYERS = {  "default": {    "BACKEND": "channels_redis.core.RedisChannelLayer",    "CONFIG": {      "hosts": [(REDIS_HOST, REDIS_PORT)],    },  },}

代码

apps/consumers.py

新建一个消费处理

实现: 默认连接加入组,发送信息时的处理。

from channels.generic.websocket import WebsocketConsumerclass MyConsumer(WebsocketConsumer):  def connect(self):    """    每个任务作为一个频道    默认进入对应任务执行频道    """    self.job_name = self.scope['url_route']['kwargs']['job_name']    self.job_group_name = 'job_%s' % self.job_name    async_to_sync(self.channel_layer.group_add)(      self.job_group_name,      self.channel_name    )    self.accept()  def disconnect(self, close_code):    async_to_sync(self.channel_layer.group_discard)(      self.job_group_name,      self.channel_name    )  # job.message类型处理  def job_message(self, event):    # 默认发送收到信息    self.send(text_data=event["text"])

apps/routing.py

ws类型路由

实现:ws/job/<job_name>由 MyConsumer 去处理。

from . import consumersfrom django.urls import pathfrom channels.routing import ProtocolTypeRouter, URLRouterfrom channels.sessions import SessionMiddlewareStackapplication = ProtocolTypeRouter({  'websocket': SessionMiddlewareStack(    URLRouter(     [       path('ws/job/<str:job_name>', consumers.MyConsumer)     ]    )  ),})

apps/views.py

在执行命令中获取 webSocket 消费通道,进行异步推送

  • 使用异步推送async_to_sync是因为在连接的时候采用的异步连接,所以推送必须采用异步推送。
  • 因为执行任务时间过长,启动触发运行时加入多线程,直接先返回ok,后端运行任务。
from subprocess import Popen,PIPEimport threadingdef runPopen(job):  """  执行命令,返回popen  """  path = os.path  Path = path.abspath(path.join(BASE_DIR, path.pardir))  script_path = path.abspath(path.join(Path,'run.sh'))  cmd = "sh %s %s" % (script_path, job)  return Popen(cmd, shell=True, stdout=PIPE, stderr=PIPE)def runScript(job):  channel_layer = get_channel_layer()  group_name = "job_%s" % job  popen = runPopen(job)  while True:    output = popen.stdout.readline()    if output == '' and popen.poll() is not None:      break    if output:      output_text = str(output.strip())      async_to_sync(        channel_layer.group_send        )(          group_name,           {"type": "job.message", "text": output_text}        )    else:      err = popen.stderr.readline()      err_text = str(err.strip())      async_to_sync(        channel_layer.group_send        )(          group_name,          {"type": "job.message", "text": err_text}        )      breakclass StartJob(APIView):   def get(self, request, job=None):    run = threading.Thread(target=runScript, args=(job,))    run.start()    return HttpResponse('ok')

apps/urls.py

get请求就能启动任务

urlpatterns = [        ...  path('start_job/<str:job>', StartJob.as_view())]

前端实现

所需软件包

vue-native-websocket 

代码实现

plugins/vueNativeWebsocket.js

import Vue from 'vue'import VueNativeSock from '../utils/socket/Main.js'export default function ({ store }) { Vue.use(VueNativeSock, 'http://localhost:8000/ws/job', {connectManually: true,});}

nuxt.config.js

配置文件引入, 这里我使用的是 nuxt 框架

 plugins: [    {     src: '@/plugins/vueNativeWebsocket.js',     ***: false    },  ],

封装 socket

export default (connection_url, option) => {  // 事件  let event = ['message', 'close', 'error', 'open'];  // 拷贝选项字典  let opts = Object.assign({}, option);  // 定义实例字典  let instance = {   // socket实例   socket: '',   // 是否连接状态   is_conncet: false,   // 具体连接方法   connect: function() {    if(connection_url) {     let scheme = window.location.protocol === 'https:' ? 'wss' : 'ws'     connection_url = scheme + '://' + connection_url.split('://')[1];     this.socket = new WebSocket(connection_url);     this.initEvent();    }else{     console.log('wsurl空');    }   },   // 初始化事件   initEvent: function() {    for(let i = 0; i < event.length; i++){     this.addListener(event[i]);    }   },   // 判断事件   addListener: function(event) {    this.socket.addEventListener(event, (e) => {     switch(event){      case 'open':       this.is_conncet = true;       break;      case 'close':       this.is_conncet = false;       break;     }     typeof opts[event] == 'function' && opts[event](e);    });   },   // 发送方法,失败则回调   send: function(data, closeCallback) {    console.log('socket ---> ' + data)    if(this.socket.readyState >= 2) {     console.log('ws已经关闭');     closeCallback && closeCallback();    }else{     this.socket.send(data);    }   }  };  // 调用连接方法  instance.connect();  return instance; }

index.vue

具体代码

x2Str 方法,因为后端返回的是bytes,格式 b'xxx' ,编写了方法对其进行转换。

<template>    <div>        <el-button type="primary" @click="runFunction" >执行</el-button>        <el-button type="primary" @click="connectWebSock" >显示</el-button>  <div class="socketView">   <span v-for="i in socketMessage" :key="i">{{i}}</span>  </div> </div></template><script> import R from '@/plugins/axios'; import ws from '@/plugins/socket' export default {  data() {   return {    webSocket: '',    socketMessage: [],   }  },    methods: {     // 打开连接的处理   openSocket(e) {    if (e.isTrusted) {     const h = this.$createElement;     this.$notify({      title: '提示',      message: h('i', { style: 'color: teal'}, '已建立Socket连接')     });    }   },  // 连接时的处理  listenSocket(e) {   if (e.data){    this.socketMessage.push(this.x2Str(e.data))   }  },  // 连接webSocket        connectWebSock() {   let wsuri = process.env.BACKEND_URL + '/ws/job/' + this.selectFunctions   this.webSocket = ws(wsuri, {    open: e => this.openSocket(e),    message: e => this.listenSocket(e),    close: e => this.closeSocket(e)   })  },     // 转码  x2Str(str) {   if (str) {    let reg = new RegExp("(?<=^b').*(?='$)")    let result = str.replace(/(?://x[/da-fA-F]{2})+/g, m =>     decodeURIComponent(m.replace(///x/g, '%'))    )    return reg.exec(result)[0]   }  },  // 执行方法  runFunction() {   R.myRequest('GET','api/start_job/' + this.selectFunctions, {}, {}).then((response) => {    if (response.hasOwnProperty('response')){      this.$message({      type: 'error',      message: '服务端返回错误,返回码:' + response.response.status       });    };     if (response.data == 'ok') {      this.$message({       type: 'success',       message: '开始执行[' + this.selectFunctions + ']'      });    }   });  }     }}</script>

至此,实现前后端 websocket 通讯。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持武林网。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表