首页 > 编程 > Python > 正文

Python找出微信上删除你好友的人脚本写法

2020-01-04 14:10:00
字体:
来源:转载
供稿:网友

相信大家在微信上一定被上面的这段话刷过屏,群发消息应该算是微信上流传最广的找到删除好友的方法了。但群发消息不仅仅会把通讯录里面所有的好友骚扰一遍,而且你还得挨个删除好几百个聊天记录,回复大家的疑问和鄙视。作为一个互联网从业者,除了群发消息就不能有更高效、不打扰好友的方式么?

答案是当然有,本人的微:1613161916可以一起讨论python,还有许多Python资料可以发送。

微信在拉好友进群聊的时候,如果这个人删除了你好友的话,会提示你一下「请先发送朋友验证申请给某某,对方将你加为微信朋友后,你才能邀请其加入群聊。」有办法了,那我把微信好友拉一个大群里面,然后默默的删掉微信群不就好了么。

于是 Github 上就有一位叫 0x5e 的开发者写了这么一个 Python 脚本来代替你手动拉群和踢人。经过笔者测试,目前只能支持 Mac 的操作系统。在此之前,笔者在稀土掘金上先放出了Github的方法,这里再详细提下。

在 0x5e 的 Github 代码仓库描述里面,他具体的实现步骤是用 Python 脚本处理网页版微信的拉群、踢人操作。具体的步骤如下:

  • 下载代码文件 wdf.py
  • 打开 Terminal 输入:python 然后拖动刚才下载的 wdf.py 到 Terminal 窗后回车。命令格式类似: python wdf.py
  • 接下来按步骤扫码操作即可;

查询结果可能会引起一些心理上的不适,请小心使用。

笔者测试了两次,这个 Python 脚本还是有一些小问题,欢迎开发者们给原作者发 Pull Request 完善:

  • 两次结果稍有出入,可能是微信网页版返回数据的问题
  • 最终在微信会遗留一个只有自己的群组,需要手动删除
  • 此脚本暂时不支持查找被拉黑的情况

源码

#!/usr/bin/env python# coding=utf-8 from __future__ import print_function import osimport requestsimport reimport timeimport xml.dom.minidomimport jsonimport sysimport mathimport subprocessimport sslimport threading DEBUG = False MAX_GROUP_NUM = 2 # 每组人数INTERFACE_CALLING_INTERVAL = 5 # 接口调用时间间隔, 间隔太短容易出现"操作太频繁", 会被限制操作半小时左右MAX_PROGRESS_LEN = 50 QRImagePath = os.path.join(os.getcwd(), 'qrcode.jpg') tip = 0uuid = '' base_uri = ''redirect_uri = ''push_uri = '' skey = ''wxsid = ''wxuin = ''pass_ticket = ''deviceId = 'e000000000000000' BaseRequest = {} ContactList = []My = []SyncKey = [] try:  xrange  range = xrangeexcept:  # python 3  pass  def responseState(func, BaseResponse):  ErrMsg = BaseResponse['ErrMsg']  Ret = BaseResponse['Ret']  if DEBUG or Ret != 0:    print('func: %s, Ret: %d, ErrMsg: %s' % (func, Ret, ErrMsg))   if Ret != 0:    return False   return True   def getUUID():  global uuid   url = 'https://login.weixin.qq.com/jslogin'  params = {    'appid': 'wx782c26e4c19acffb',    'fun': 'new',    'lang': 'zh_CN',    '_': int(time.time()),  }   r= myRequests.get(url=url, params=params)  r.encoding = 'utf-8'  data = r.text   # print(data)   # window.QRLogin.code = 200; window.QRLogin.uuid = "oZwt_bFfRg==";  regx = r'window.QRLogin.code = (/d+); window.QRLogin.uuid = "(/S+?)"'  pm = re.search(regx, data)   code = pm.group(1)  uuid = pm.group(2)   if code == '200':    return True   return False  def showQRImage():  global tip   url = 'https://login.weixin.qq.com/qrcode/' + uuid  params = {    't': 'webwx',    '_': int(time.time()),  }   r = myRequests.get(url=url, params=params)   tip = 1   f = open(QRImagePath, 'wb')  f.write(r.content)  f.close()  time.sleep(1)   if sys.platform.find('darwin') >= 0:    subprocess.call(['open', QRImagePath])  elif sys.platform.find('linux') >= 0:    subprocess.call(['xdg-open', QRImagePath])  else:    os.startfile(QRImagePath)   print('请使用微信扫描二维码以登录')  def waitForLogin():  global tip, base_uri, redirect_uri, push_uri   url = 'https://login.weixin.qq.com/cgi-bin/mmwebwx-bin/login?tip=%s&uuid=%s&_=%s' % (    tip, uuid, int(time.time()))   r = myRequests.get(url=url)  r.encoding = 'utf-8'  data = r.text   # print(data)   # window.code=500;  regx = r'window.code=(/d+);'  pm = re.search(regx, data)   code = pm.group(1)   if code == '201': # 已扫描    print('成功扫描,请在手机上点击确认以登录')    tip = 0  elif code == '200': # 已登录    print('正在登录...')    regx = r'window.redirect_uri="(/S+?)";'    pm = re.search(regx, data)    redirect_uri = pm.group(1) + '&fun=new'    base_uri = redirect_uri[:redirect_uri.rfind('/')]     # push_uri与base_uri对应关系(排名分先后)(就是这么奇葩..)    services = [      ('wx2.qq.com', 'webpush2.weixin.qq.com'),      ('qq.com', 'webpush.weixin.qq.com'),      ('web1.wechat.com', 'webpush1.wechat.com'),      ('web2.wechat.com', 'webpush2.wechat.com'),      ('wechat.com', 'webpush.wechat.com'),      ('web1.wechatapp.com', 'webpush1.wechatapp.com'),    ]    push_uri = base_uri    for (searchUrl, pushUrl) in services:      if base_uri.find(searchUrl) >= 0:        push_uri = 'https://%s/cgi-bin/mmwebwx-bin' % pushUrl        break     # closeQRImage    if sys.platform.find('darwin') >= 0: # for OSX with Preview      os.system("osascript -e 'quit app /"Preview/"'")  elif code == '408': # 超时    pass  # elif code == '400' or code == '500':   return code  def login():  global skey, wxsid, wxuin, pass_ticket, BaseRequest   r = myRequests.get(url=redirect_uri)  r.encoding = 'utf-8'  data = r.text   # print(data)   doc = xml.dom.minidom.parseString(data)  root = doc.documentElement   for node in root.childNodes:    if node.nodeName == 'skey':      skey = node.childNodes[0].data    elif node.nodeName == 'wxsid':      wxsid = node.childNodes[0].data    elif node.nodeName == 'wxuin':      wxuin = node.childNodes[0].data    elif node.nodeName == 'pass_ticket':      pass_ticket = node.childNodes[0].data   # print('skey: %s, wxsid: %s, wxuin: %s, pass_ticket: %s' % (skey, wxsid,  # wxuin, pass_ticket))   if not all((skey, wxsid, wxuin, pass_ticket)):    return False   BaseRequest = {    'Uin': int(wxuin),    'Sid': wxsid,    'Skey': skey,    'DeviceID': deviceId,  }   return True  def webwxinit():   url = (base_uri +     '/webwxinit?pass_ticket=%s&skey=%s&r=%s' % (      pass_ticket, skey, int(time.time())) )  params = {'BaseRequest': BaseRequest }  headers = {'content-type': 'application/json; charset=UTF-8'}   r = myRequests.post(url=url, data=json.dumps(params),headers=headers)  r.encoding = 'utf-8'  data = r.json()   if DEBUG:    f = open(os.path.join(os.getcwd(), 'webwxinit.json'), 'wb')    f.write(r.content)    f.close()    # print(data)   global ContactList, My, SyncKey  dic = data  ContactList = dic['ContactList']  My = dic['User']  SyncKey = dic['SyncKey']   state = responseState('webwxinit', dic['BaseResponse'])  return state  def webwxgetcontact():   url = (base_uri +     '/webwxgetcontact?pass_ticket=%s&skey=%s&r=%s' % (      pass_ticket, skey, int(time.time())) )  headers = {'content-type': 'application/json; charset=UTF-8'}    r = myRequests.post(url=url,headers=headers)  r.encoding = 'utf-8'  data = r.json()   if DEBUG:    f = open(os.path.join(os.getcwd(), 'webwxgetcontact.json'), 'wb')    f.write(r.content)    f.close()   # print(data)   dic = data  MemberList = dic['MemberList']   # 倒序遍历,不然删除的时候出问题..  SpecialUsers = ["newsapp", "fmessage", "filehelper", "weibo", "qqmail", "tmessage", "qmessage", "qqsync", "floatbottle", "lbsapp", "shakeapp", "medianote", "qqfriend", "readerapp", "blogapp", "facebookapp", "masssendapp",          "meishiapp", "feedsapp", "voip", "blogappweixin", "weixin", "brandsessionholder", "weixinreminder", "wxid_novlwrv3lqwv11", "gh_22b87fa7cb3c", "officialaccounts", "notification_messages", "wxitil", "userexperience_alarm"]  for i in range(len(MemberList) - 1, -1, -1):    Member = MemberList[i]    if Member['VerifyFlag'] & 8 != 0: # 公众号/服务号      MemberList.remove(Member)    elif Member['UserName'] in SpecialUsers: # 特殊账号      MemberList.remove(Member)    elif Member['UserName'].find('@@') != -1: # 群聊      MemberList.remove(Member)    elif Member['UserName'] == My['UserName']: # 自己      MemberList.remove(Member)   return MemberList  def createChatroom(UserNames):  MemberList = [{'UserName': UserName} for UserName in UserNames]   url = (base_uri +     '/webwxcreatechatroom?pass_ticket=%s&r=%s' % (      pass_ticket, int(time.time())) )  params = {    'BaseRequest': BaseRequest,    'MemberCount': len(MemberList),    'MemberList': MemberList,    'Topic': '',  }  headers = {'content-type': 'application/json; charset=UTF-8'}   r = myRequests.post(url=url, data=json.dumps(params),headers=headers)  r.encoding = 'utf-8'  data = r.json()   # print(data)   dic = data  ChatRoomName = dic['ChatRoomName']  MemberList = dic['MemberList']  DeletedList = []  BlockedList = []  for Member in MemberList:    if Member['MemberStatus'] == 4: # 被对方删除了      DeletedList.append(Member['UserName'])    elif Member['MemberStatus'] == 3: # 被加入黑名单      BlockedList.append(Member['UserName'])   state = responseState('createChatroom', dic['BaseResponse'])   return ChatRoomName, DeletedList, BlockedList  def deleteMember(ChatRoomName, UserNames):  url = (base_uri +     '/webwxupdatechatroom?fun=delmember&pass_ticket=%s' % (pass_ticket) )  params = {    'BaseRequest': BaseRequest,    'ChatRoomName': ChatRoomName,    'DelMemberList': ','.join(UserNames),  }  headers = {'content-type': 'application/json; charset=UTF-8'}   r = myRequests.post(url=url, data=json.dumps(params),headers=headers)  r.encoding = 'utf-8'  data = r.json()   # print(data)   dic = data   state = responseState('deleteMember', dic['BaseResponse'])  return state  def addMember(ChatRoomName, UserNames):  url = (base_uri +     '/webwxupdatechatroom?fun=addmember&pass_ticket=%s' % (pass_ticket) )  params = {    'BaseRequest': BaseRequest,    'ChatRoomName': ChatRoomName,    'AddMemberList': ','.join(UserNames),  }  headers = {'content-type': 'application/json; charset=UTF-8'}   r = myRequests.post(url=url, data=json.dumps(params),headers=headers)  r.encoding = 'utf-8'  data = r.json()   # print(data)   dic = data  MemberList = dic['MemberList']  DeletedList = []  BlockedList = []  for Member in MemberList:    if Member['MemberStatus'] == 4: # 被对方删除了      DeletedList.append(Member['UserName'])    elif Member['MemberStatus'] == 3: # 被加入黑名单      BlockedList.append(Member['UserName'])   state = responseState('addMember', dic['BaseResponse'])   return DeletedList, BlockedList  def syncKey():  SyncKeyItems = ['%s_%s' % (item['Key'], item['Val'])          for item in SyncKey['List']]  SyncKeyStr = '|'.join(SyncKeyItems)  return SyncKeyStr  def syncCheck():  url = push_uri + '/synccheck?'  params = {    'skey': BaseRequest['Skey'],    'sid': BaseRequest['Sid'],    'uin': BaseRequest['Uin'],    'deviceId': BaseRequest['DeviceID'],    'synckey': syncKey(),    'r': int(time.time()),  }   r = myRequests.get(url=url,params=params)  r.encoding = 'utf-8'  data = r.text   # print(data)   # window.synccheck={retcode:"0",selector:"2"}  regx = r'window.synccheck={retcode:"(/d+)",selector:"(/d+)"}'  pm = re.search(regx, data)   retcode = pm.group(1)  selector = pm.group(2)   return selector  def webwxsync():  global SyncKey   url = base_uri + '/webwxsync?lang=zh_CN&skey=%s&sid=%s&pass_ticket=%s' % (    BaseRequest['Skey'], BaseRequest['Sid'], quote_plus(pass_ticket))  params = {    'BaseRequest': BaseRequest,    'SyncKey': SyncKey,    'rr': ~int(time.time()),  }  headers = {'content-type': 'application/json; charset=UTF-8'}   r = myRequests.post(url=url, data=json.dumps(params))  r.encoding = 'utf-8'  data = r.json()   # print(data)   dic = data  SyncKey = dic['SyncKey']   state = responseState('webwxsync', dic['BaseResponse'])  return state  def heartBeatLoop():  while True:    selector = syncCheck()    if selector != '0':      webwxsync()    time.sleep(1)  def main():  global myRequests   if hasattr(ssl, '_create_unverified_context'):    ssl._create_default_https_context = ssl._create_unverified_context   headers = {'User-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/44.0.2403.125 Safari/537.36'}  myRequests = requests.Session()  myRequests.headers.update(headers)    if not getUUID():    print('获取uuid失败')    return   print('正在获取二维码图片...')  showQRImage()   while waitForLogin() != '200':    pass   os.remove(QRImagePath)   if not login():    print('登录失败')    return   if not webwxinit():    print('初始化失败')    return   MemberList = webwxgetcontact()   print('开启心跳线程')  threading.Thread(target=heartBeatLoop)   MemberCount = len(MemberList)  print('通讯录共%s位好友' % MemberCount)   ChatRoomName = ''  result = []  d = {}  for Member in MemberList:    d[Member['UserName']] = (Member['NickName'], Member['RemarkName'])  print('开始查找...')  group_num = int(math.ceil(MemberCount / float(MAX_GROUP_NUM)))  for i in range(0, group_num):    UserNames = []    for j in range(0, MAX_GROUP_NUM):      if i * MAX_GROUP_NUM + j >= MemberCount:        break      Member = MemberList[i * MAX_GROUP_NUM + j]      UserNames.append(Member['UserName'])     # 新建群组/添加成员    if ChatRoomName == '':      (ChatRoomName, DeletedList, BlockedList) = createChatroom(        UserNames)    else:      (DeletedList, BlockedList) = addMember(ChatRoomName, UserNames)     # todo BlockedList 被拉黑列表     DeletedCount = len(DeletedList)    if DeletedCount > 0:      result += DeletedList     # 删除成员    deleteMember(ChatRoomName, UserNames)     # 进度条    progress = MAX_PROGRESS_LEN * (i + 1) / group_num    print('[', '#' * int(progress), '-' * int(MAX_PROGRESS_LEN - progress), ']', end=' ')    print('新发现你被%d人删除' % DeletedCount)    for i in range(DeletedCount):      if d[DeletedList[i]][1] != '':        print('%s(%s)' % (d[DeletedList[i]][0],d[DeletedList[i]][1]))      else:        print(d[DeletedList[i]][0])     if i != group_num - 1:      print('正在继续查找,请耐心等待...')      # 下一次进行接口调用需要等待的时间      time.sleep(INTERFACE_CALLING_INTERVAL)  # todo 删除群组   print('/n结果汇总完毕,20s后可重试...')  resultNames = []  for r in result:    if d[r][1] != '':      resultNames.append('%s(%s)' % (d[r][0],d[r][1]))    else:      resultNames.append(d[r][0])   print('---------- 被删除的好友列表(共%d人) ----------' % len(result))  # 过滤emoji  resultNames = list(map(lambda x: re.sub(r'<span.+/span>', '', x), resultNames))  if len(resultNames):    print('/n'.join(resultNames))  else:    print("无")  print('---------------------------------------------')  # windows下编码问题修复# http://blog.csdn.net/heyuxuanzee/article/details/8442718 class UnicodeStreamFilter:   def __init__(self, target):    self.target = target    self.encoding = 'utf-8'    self.errors = 'replace'    self.encode_to = self.target.encoding   def write(self, s):    if type(s) == str:      try:        s = s.decode('utf-8')      except:        pass    s = s.encode(self.encode_to, self.errors).decode(self.encode_to)    self.target.write(s) if sys.stdout.encoding == 'cp936':  sys.stdout = UnicodeStreamFilter(sys.stdout) if __name__ == '__main__':   print('本程序的查询结果可能会引起一些心理上的不适,请小心使用...')  print('1小时内只能使用一次,否则会因操作繁忙阻止建群')  main()  print('回车键退出...')  input()


注:相关教程知识阅读请移步到python教程频道。
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表