首页 > 编程 > Python > 正文

python利用微信公众号实现报警功能

2020-02-15 21:44:22
字体:
来源:转载
供稿:网友

微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。

其中订阅号比较坑,它的AccessToken需要定时刷新,重复获取将导致上次获取的AccessToken失效。

而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。

为兼容这两种方式,因此按照订阅号的方式处理。

 处理办法与接口文档中的要求相同:

为了保密appsecret,第三方需要一个access_token获取和刷新的中控服务器。

而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。

 下面的代码以企业号为例,将access_token储存在sqlite3数据库中。相比储存在文本中,放在数据库里更灵活,也可以为后期存放其他数据提供向后兼容。

设计思路和使用方法:

自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据

尽可能的保证Class中每一个可执行的函数单独调用都能成功。

Class中只将真正能被用到的方法和变量设置为public的。

使用时只需要将此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使用WeiXinTokenClass().get()方法即可得到access_token。

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""Fetch a WeiXin enterprise-account access_token and cache it in SQLite.

Usage: set ``weixin_qy_CorpID`` and ``weixin_qy_Secret`` below, import this
module and call ``WeiXinTokenClass().get()``.

The database file, its tables and the credential row are created on demand,
so every public helper can be called on its own against an empty or missing
database. The module runs unchanged on Python 2 and Python 3.
"""
import contextlib
import datetime
import json
import os
import sqlite3
import sys
import urllib

try:  # Python 2
    import urllib2
    _urlencode = urllib.urlencode
    _urlopen = urllib2.urlopen
except ImportError:  # Python 3
    import urllib.parse
    import urllib.request
    _urlencode = urllib.parse.urlencode
    _urlopen = urllib.request.urlopen

enable_debug = True


def debug(msg, code=None):
    """Print a diagnostic message (and optional code) when debugging is on."""
    if not enable_debug:
        return
    if code is None:
        print("message: %s" % msg)
    else:
        print("message: %s, code: %s " % (msg, code))


AUTHOR_MAIL = "uberurey_ups@163.com"

weixin_qy_CorpID = "your_corpid"
weixin_qy_Secret = "your_secret"

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
DATABASES = {
    'default': {
        'ENGINE': 'db.backends.sqlite3',
        'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'),
    }
}

sqlite3_db_file = str(DATABASES['default']['NAME'])

# Fixed storage format for "expires_on". str(datetime) drops the fractional
# part when microsecond == 0, which a '%f' parse would then reject.
_EXPIRES_ON_FORMAT = '%Y-%m-%d %H:%M:%S.%f'

_CONNECT_ERROR_TEMPLATE = """
  There was a problem connecting to Database:

    %s

  The error leading to this problem was:

    %s

  It's possible that this database is broken or permission denied.

  If you cannot solve this problem yourself, please mail to:

    %s

"""

# Plain CREATE TABLE (no IF NOT EXISTS) on purpose: the insert helpers retry
# after creating the table, and the "already exists" error raised here is
# what stops that retry from looping when the insert failed for another
# reason.
_CREATE_TOKEN_TABLE_SQL = '''CREATE TABLE "main"."weixin_token" (
        "id" INTEGER ,
        "access_token" TEXT,
        "expires_in" TEXT,
        "expires_on" TEXT,
        "is_expired" INTEGER
        )
        ;
'''

_CREATE_ACCOUNT_TABLE_SQL = '''CREATE TABLE "main"."weixin_account" (
        "id" INTEGER,
        "name" TEXT,
        "corpid" TEXT,
        "secret" TEXT,
        "current" INTEGER
        )
        ;
'''


def sqlite3_conn(database):
    """Open and return a connection to ``database``.

    On failure an explanatory message is written to stderr and the process
    exits with status 1.
    """
    try:
        return sqlite3.connect(database)
    except sqlite3.Error as e:
        sys.stderr.write(_CONNECT_ERROR_TEMPLATE % (database, e, AUTHOR_MAIL))
        sys.exit(1)


def sqlite3_commit(conn):
    """Commit the current transaction on ``conn``."""
    return conn.commit()


def sqlite3_close(conn):
    """Close ``conn``."""
    return conn.close()


def _run_statements(statements):
    """Execute ``(sql, params)`` pairs in one connection, commit, then close.

    The connection is closed even when a statement raises; ``sqlite3.Error``
    propagates to the caller.
    """
    with contextlib.closing(sqlite3_conn(sqlite3_db_file)) as conn:
        cursor = conn.cursor()
        for sql, params in statements:
            cursor.execute(sql, params)
        sqlite3_commit(conn)


def _fetch_all(sql):
    """Run a query against the module database and return all rows."""
    with contextlib.closing(sqlite3_conn(sqlite3_db_file)) as conn:
        return conn.cursor().execute(sql).fetchall()


def _parse_expires_on(text):
    """Parse a stored "expires_on" value, with or without microseconds."""
    try:
        return datetime.datetime.strptime(text, _EXPIRES_ON_FORMAT)
    except ValueError:
        # Rows written by older versions used str(datetime), which omits the
        # fractional part when it is zero.
        return datetime.datetime.strptime(text, '%Y-%m-%d %H:%M:%S')


def sqlite3_execute(database, sql):
    """Execute one SQL statement against ``database`` and commit it.

    Prints the error and exits with status 1 on any ``sqlite3.Error``.
    """
    try:
        with contextlib.closing(sqlite3_conn(database)) as sql_conn:
            sql_conn.cursor().execute(sql)
            sql_conn.commit()
    except sqlite3.Error as e:
        print(e)
        sys.exit(1)


def sqlite3_create_table_token():
    """Create the ``weixin_token`` table; raises if it already exists."""
    _run_statements([(_CREATE_TOKEN_TABLE_SQL, ())])


def sqlite3_create_table_account():
    """Create the ``weixin_account`` table; raises if it already exists."""
    _run_statements([(_CREATE_ACCOUNT_TABLE_SQL, ())])


def sqlite3_create_tables():
    """Create both tables using a single connection."""
    print("sqlite3_create_tables")
    _run_statements([
        (_CREATE_TOKEN_TABLE_SQL, ()),
        (_CREATE_ACCOUNT_TABLE_SQL, ()),
    ])


def sqlite3_set_credential(corpid, secret):
    """Insert the credential row, creating the account table if needed."""
    insert = ('''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES
                (1,
                'odbp',
                ?,
                ?,
                1)''', (corpid, secret))
    try:
        _run_statements([insert])
    except sqlite3.Error:
        # Most likely the table is missing. If it does exist, the CREATE
        # below raises and the error surfaces instead of retrying forever.
        sqlite3_create_table_account()
        sqlite3_set_credential(corpid, secret)


def sqlite3_set_token(access_token, expires_in, expires_on, is_expired):
    """Insert the token row, creating the token table if needed."""
    insert = ('''INSERT INTO "weixin_token"
               ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES
               (
               1,
               ?,
               ?,
               ?,
               ?
               )''', (access_token, expires_in, expires_on, is_expired))
    try:
        _run_statements([insert])
    except sqlite3.Error:
        # Same recovery strategy as sqlite3_set_credential.
        sqlite3_create_table_token()
        sqlite3_set_token(access_token, expires_in, expires_on, is_expired)


def sqlite3_get_credential():
    """Return ``[(corpid, secret), ...]`` for the account marked current.

    When the table is missing or holds no current row, the module-level
    defaults are inserted once and the query is repeated. If there is still
    nothing to return, a message is printed and the process exits with
    status 1.
    """
    query = '''SELECT "corpid", "secret" FROM weixin_account WHERE current == 1;'''
    try:
        result = _fetch_all(query)
    except sqlite3.Error:
        result = []
    if not result:
        sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret)
        try:
            result = _fetch_all(query)
        except sqlite3.Error:
            result = []
    if result:
        return result
    print("unrecoverable problem, please alter to %s" % AUTHOR_MAIL)
    sys.exit(1)


def sqlite3_get_token():
    """Return ``[(access_token, expires_on), ...]`` or ``None``.

    ``None`` is returned both when no row matches and when the query fails
    (for example because the table does not exist yet); in the latter case
    the error is printed.
    """
    # NOTE(review): rows are written with is_expired = 1, so the column seems
    # to act as an "active row" marker despite its name -- confirm before
    # renaming or changing the filter.
    try:
        result = _fetch_all(
            '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''')
    except sqlite3.Error as e:
        print("%s : %s" % (type(e), e))
        return None
    return result if result else None


def sqlite3_update_token(access_token, expires_on):
    """Overwrite the token and expiry stored in the first token row."""
    _run_statements([('''UPDATE "weixin_token" SET
             access_token=?,
             expires_on=?
             WHERE _ROWID_ = 1;''', (access_token, expires_on))])


class WeiXinTokenClass(object):
    """Hand out a valid access_token, refreshing the cached one on demand."""

    def __init__(self):
        self.__use_persistence = True
        self.__access_token = None
        self.__expires_in = None
        self.__expires_on = None
        self.__is_expired = None

        if self.__use_persistence:
            # One query is enough to read both columns.
            self.__corpid, self.__corpsecret = sqlite3_get_credential()[0]
        else:
            self.__corpid = weixin_qy_CorpID
            self.__corpsecret = weixin_qy_Secret

    def __get_token_from_weixin_qy_api(self):
        """Request a token from the WeiXin API and persist it.

        On an error response the error code and message are printed and
        nothing is stored.
        """
        url_parameters = _urlencode({
            "corpid": self.__corpid,
            "corpsecret": self.__corpsecret
        })
        url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?" + url_parameters
        with contextlib.closing(_urlopen(url)) as response:
            result = response.read().decode("utf-8")
        token_json = json.loads(result)

        # Use .get(): the fields of a failed response may be absent entirely.
        if token_json.get('access_token') is None:
            if token_json.get('errcode') is not None:
                print("errcode is: %s" % token_json.get('errcode'))
                print("errmsg is: %s" % token_json.get('errmsg'))
            else:
                print(result)
            return

        # TODO(Guodong Ding) token will expired ahead of time or not expired after the time
        expire_time = datetime.datetime.now() + datetime.timedelta(
            seconds=token_json['expires_in'])
        token_json['expires_on'] = expire_time.strftime(_EXPIRES_ON_FORMAT)
        self.__access_token = token_json['access_token']
        self.__expires_in = token_json['expires_in']
        self.__expires_on = token_json['expires_on']
        self.__is_expired = 1

        if not sqlite3_get_token():
            # Nothing cached yet (or the table is missing): insert a row.
            sqlite3_set_token(self.__access_token, self.__expires_in,
                              self.__expires_on, self.__is_expired)
        elif self.__is_token_expired():
            sqlite3_update_token(self.__access_token, self.__expires_on)
        else:
            debug("pass")

    def __get_token_from_persistence_storage(self):
        """Return the cached token, refreshing it once if missing or expired.

        Raises:
            RuntimeError: no valid token is available even after a refresh.
        """
        for attempt in range(2):
            token_result_set = sqlite3_get_token()
            if token_result_set:
                access_token, expire_time = token_result_set[0]
                if datetime.datetime.now() < _parse_expires_on(expire_time):
                    return access_token
            if attempt == 0:
                # A single refresh only: retrying in a loop would hammer the
                # API while it keeps returning errors.
                self.__get_token_from_weixin_qy_api()
        raise RuntimeError(
            "unable to obtain a valid access_token from the WeiXin API")

    @staticmethod
    def __is_token_expired():
        """Return True when no token is cached or the cached one has expired."""
        token_result_set = sqlite3_get_token()
        if not token_result_set:
            return True
        return datetime.datetime.now() >= _parse_expires_on(token_result_set[0][1])

    def get(self):
        """Return a currently valid access_token."""
        return self.__get_token_from_persistence_storage()
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表