微信公众号共有三种,服务号、订阅号、企业号。它们在获取AccessToken上各有不同。
其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效。
而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果。
为兼容这两种方式,因此按照订阅号的方式处理。
处理办法与接口文档中的要求相同:
为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器。
而其他业务逻辑服务器所使用的access_token均来自于该中控服务器,不应该各自去刷新,否则会造成access_token覆盖而影响业务。
下面的代码以企业号为例,将access_token储存在sqlite3数据库中,相比储存在文本中,放在数
据库里,可以为后期存放其他数据提供向后兼容。如果放在文本中,则不如放在数据库中灵活。
设计思路和使用方法:
自动创建sqlite3数据库,包括表结构和数据,并能在数据库表结构不存在或者数据不存在或遭删除的情况下,创建新的可用的数据
尽可能的保证Class中每一个可执行的函数单独调用都能成功。
Class中只将真正能被用到的方法和变量设置为public的。
使用时只需要修改此文件中的weixin_qy_CorpID和weixin_qy_Secret改成自己的,并import此文件,使
用WeiXinTokenClass().get()方法即可得到access_token。
#!/usr/bin/python# encoding: utf-8# -*- coding: utf8 -*-#Python学习群125240963每天更新资料,包括2018最新企业级项目案例,同千人一起交流。import osimport sqlite3import sysimport urllibimport urllib2import jsonimport datetime# import timeenable_debug = Truedef debug(msg, code=None): if enable_debug: if code is None: print "message: %s" % msg else: print "message: %s, code: %s " % (msg, code)AUTHOR_MAIL = "uberurey_ups@163.com"weixin_qy_CorpID = "your_corpid"weixin_qy_Secret = "your_secret"# Build paths inside the project like this: os.path.join(BASE_DIR, ...)BASE_DIR = os.path.dirname(os.path.abspath(__file__))# Database# https://docs.djangoproject.com/en/1.9/ref/settings/#databasesDATABASES = { 'default': { 'ENGINE': 'db.backends.sqlite3', 'NAME': os.path.join(BASE_DIR, '.odbp_db.sqlite3'), }}sqlite3_db_file = str(DATABASES['default']['NAME'])def sqlite3_conn(database): try: conn = sqlite3.connect(database) except sqlite3.Error: print >> sys.stderr, """/ There was a problem connecting to Database: %s The error leading to this problem was: %s It's possible that this database is broken or permission denied. If you cannot solve this problem yourself, please mail to: %s """ % (database, sys.exc_value, AUTHOR_MAIL) sys.exit(1) else: return conndef sqlite3_commit(conn): return conn.commit()def sqlite3_close(conn): return conn.close()def sqlite3_execute(database, sql): try: sql_conn = sqlite3_conn(database) sql_cursor = sql_conn.cursor() sql_cursor.execute(sql) sql_conn.commit() sql_conn.close() except sqlite3.Error as e: print e sys.exit(1)def sqlite3_create_table_token(): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''CREATE TABLE "main"."weixin_token" ( "id" INTEGER , "access_token" TEXT, "expires_in" TEXT, "expires_on" TEXT, "is_expired" INTEGER ) ; ''') sqlite3_commit(sql_conn) sqlite3_close(sql_conn)def sqlite3_create_table_account(): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''CREATE TABLE "main"."weixin_account" ( "id" INTEGER, "name" TEXT, "corpid" TEXT, "secret" TEXT, "current" INTEGER ) ; ''') sqlite3_commit(sql_conn) sqlite3_close(sql_conn)def sqlite3_create_tables(): print "sqlite3_create_tables" sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''CREATE TABLE "main"."weixin_token" ( "id" INTEGER , "access_token" TEXT, "expires_in" TEXT, "expires_on" TEXT, "is_expired" INTEGER ) ; ''') sql_cursor.execute('''CREATE TABLE "main"."weixin_account" ( "id" INTEGER, "name" TEXT, "corpid" TEXT, "secret" TEXT, "current" INTEGER ) ; ''') sqlite3_commit(sql_conn) sqlite3_close(sql_conn)def sqlite3_set_credential(corpid, secret): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''INSERT INTO "weixin_account" ("id", "name", "corpid", "secret", "current") VALUES (1, 'odbp', ?, ?, 1)''', (corpid, secret)) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) except sqlite3.Error: sqlite3_create_table_account() sqlite3_set_credential(corpid, secret)def sqlite3_set_token(access_token, expires_in, expires_on, is_expired): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''INSERT INTO "weixin_token" ("id", "access_token", "expires_in", "expires_on", "is_expired") VALUES ( 1, ?, ?, ?, ? )''', (access_token, expires_in, expires_on, is_expired)) sqlite3_commit(sql_conn) sqlite3_close(sql_conn) except sqlite3.Error: sqlite3_create_table_token() sqlite3_set_token(access_token, expires_in, expires_on, is_expired)def sqlite3_get_credential(): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() credential = sql_cursor.execute('''SELECT "corpid", "secret" FROM weixin_account WHERE current == 1;''') result = credential.fetchall() sqlite3_close(sql_conn) except sqlite3.Error: sqlite3_set_credential(weixin_qy_CorpID, weixin_qy_Secret) return sqlite3_get_credential() else: if result is not None and len(result) != 0: return result else: print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL sys.exit(1)def sqlite3_get_token(): try: sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() credential = sql_cursor.execute( '''SELECT "access_token", "expires_on" FROM weixin_token WHERE "is_expired" == 1 ;''') result = credential.fetchall() sqlite3_close(sql_conn) except sqlite3.Error: info = sys.exc_info() print info[0], ":", info[1] else: if result is not None and len(result) != 0: return result else: # print "unrecoverable problem, please alter to %s" % AUTHOR_MAIL # sys.exit(1) return Nonedef sqlite3_update_token(access_token, expires_on): sql_conn = sqlite3_conn(sqlite3_db_file) sql_cursor = sql_conn.cursor() sql_cursor.execute('''UPDATE "weixin_token" SET access_token=?, expires_on=? WHERE _ROWID_ = 1;''', (access_token, expires_on) ) sqlite3_commit(sql_conn) sqlite3_close(sql_conn)class WeiXinTokenClass(object): def __init__(self): self.__corpid = None self.__corpsecret = None self.__use_persistence = True self.__access_token = None self.__expires_in = None self.__expires_on = None self.__is_expired = None if self.__use_persistence: self.__corpid = sqlite3_get_credential()[0][0] self.__corpsecret = sqlite3_get_credential()[0][1] else: self.__corpid = weixin_qy_CorpID self.__corpsecret = weixin_qy_Secret def __get_token_from_weixin_qy_api(self): parameters = { "corpid": self.__corpid, "corpsecret": self.__corpsecret } url_parameters = urllib.urlencode(parameters) token_url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken?" url = token_url + url_parameters response = urllib2.urlopen(url) result = response.read() token_json = json.loads(result) if token_json['access_token'] is not None: get_time_now = datetime.datetime.now() # TODO(Guodong Ding) token will expired ahead of time or not expired after the time expire_time = get_time_now + datetime.timedelta(seconds=token_json['expires_in']) token_json['expires_on'] = str(expire_time) self.__access_token = token_json['access_token'] self.__expires_in = token_json['expires_in'] self.__expires_on = token_json['expires_on'] self.__is_expired = 1 try: token_result_set = sqlite3_get_token() except sqlite3.Error: token_result_set = None if token_result_set is None and len(token_result_set) == 0: sqlite3_set_token(self.__access_token, self.__expires_in, self.__expires_on, self.__is_expired) else: if self.__is_token_expired() is True: sqlite3_update_token(self.__access_token, self.__expires_on) else: debug("pass") return else: if token_json['errcode'] is not None: print "errcode is: %s" % token_json['errcode'] print "errmsg is: %s" % token_json['errmsg'] else: print result def __get_token_from_persistence_storage(self): try: token_result_set = sqlite3_get_token() except sqlite3.Error: self.__get_token_from_weixin_qy_api() finally: if token_result_set is None: self.__get_token_from_weixin_qy_api() token_result_set = sqlite3_get_token() access_token = token_result_set[0][0] expire_time = token_result_set[0][1] else: access_token = token_result_set[0][0] expire_time = token_result_set[0][1] expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f') now_time = datetime.datetime.now() if now_time < expire_time: # print "The token is %s" % access_token # print "The token will expire on %s" % expire_time return access_token else: self.__get_token_from_weixin_qy_api() return self.__get_token_from_persistence_storage() @staticmethod def __is_token_expired(): try: token_result_set = sqlite3_get_token() except sqlite3.Error as e: print e sys.exit(1) expire_time = token_result_set[0][1] expire_time = datetime.datetime.strptime(expire_time, '%Y-%m-%d %H:%M:%S.%f') now_time = datetime.datetime.now() if now_time < expire_time: return False else: return True def get(self): return self.__get_token_from_persistence_storage()
新闻热点
疑难解答