本文通过实例讲述了使用 Flask 框架操作 MySQL 数据库的方法,分享给大家供大家参考,具体如下:
一、创建数据库表格
"""Created on 19-10-8@requirement:Anaconda 4.3.0 (64-bit) Python3.6@description:创建表格"""import pymysqlserver = '127.0.0.1'user = 'root'password = 'password'# 连接数据库conn = pymysql.connect(server, user, password, database='information_collection') # 获取连接cursor = conn.cursor() # 获取游标# "**ENGINE=InnoDB DEFAULT CHARSET=utf8**"-创建表的过程中增加这条,中文就不是乱码# 创建表cursor.execute("""CREATE TABLE if not exists user( user_id INT NOT NULL auto_increment primary key, user_name VARCHAR(100), user_password VARCHAR(100), user_nickname VARCHAR(100), user_email VARCHAR(100) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 """)# 查询数据库表user内容cursor.execute('SELECT * FROM user')# 查看一行 多行:cursor.fetchall()row = cursor.fetchone()print(row)# if row[0] is None:# row0 = list(row)# row0[0] = 0# row = tuple(row0)# # 插入数据,注:与sqlserver有些区别cursor.execute("INSERT INTO user VALUES('%s','%s','%s','%s')" % ('xiaoming','qwe','ming','@163.com'))# 提交数据,才会写入表格conn.commit()# 关闭游标关闭数据库cursor.close()conn.close()
二、flask操作mysql
"""Created on 19-10-8@requirement:Anaconda 4.3.0 (64-bit) Python3.6@description:"""from flask_sqlalchemy import SQLAlchemyfrom flask import Flask, jsonify, requestimport configparserimport osapp = Flask(__name__)# 使用ConfigParser 首选需要初始化实例,并读取配置文件:my_config = configparser.ConfigParser()my_config.read('db.conf')# 连接数据库information_collectionapp.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DEV_DATABASE_URL') or / "mysql+pymysql://root:password@127.0.0.1:3306/information_collection"app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = Truemydb = SQLAlchemy()mydb.init_app(app)# 用户模型class User(mydb.Model): user_id = mydb.Column(mydb.Integer, primary_key=True) user_name = mydb.Column(mydb.String(60), nullable=False) user_password = mydb.Column(mydb.String(30), nullable=False) user_nickname = mydb.Column(mydb.String(50)) user_email = mydb.Column(mydb.String(30), nullable=False) def __repr__(self): return '<User %r>' % self.user_name# 获取用户列表,所有数据@app.route('/users', methods=['GET'])def getUsers(): data = User.query.all() datas = [] for user in data: datas.append({'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email}) return jsonify(data=datas)# 添加用户数据,一条一条添加@app.route('/user', methods=['POST'])def addUser(): user_name = request.form.get('user_name') user_password = request.form.get('user_password') user_nickname = request.form.get('user_nickname') user_email = request.form.get('user_email') user = User(user_name=user_name, user_password=user_password, user_nickname=user_nickname, user_email=user_email) try: mydb.session.add(user) mydb.session.commit() except: mydb.session.rollback() mydb.session.flush() userId = user.user_id if (user.user_id is None): result = {'msg': '添加失败'} return jsonify(data=result) data = User.query.filter_by(user_id=userId).first() result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email} return 
jsonify(data=result)# 获取单条数据@app.route('/user/<int:userId>', methods=['GET'])def getUser(userId): user = User.query.filter_by(user_id=userId).first() if (user is None): result = {'msg': '找不到数据'} else: result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_nickname': user.user_nickname, 'user_email': user.user_email} return jsonify(data=result)# 修改用户数据@app.route('/user/<int:userId>', methods=['PATCH'])def updateUser(userId): user_name = request.form.get('user_name') user_password = request.form.get('user_password') user_nickname = request.form.get('user_nickname') user_email = request.form.get('user_email') try: user = User.query.filter_by(user_id=userId).first() if (user is None): result = {'msg': '找不到要修改的记录'} return jsonify(data=result) else: user.user_name = user_name user.user_password = user_password user.user_nickname = user_nickname user.user_email = user_email mydb.session.commit() except: mydb.session.rollback() # 回滚 mydb.session.flush() # 重置 userId = user.user_id data = User.query.filter_by(user_id=userId).first() result = {'user_id': user.user_id, 'user_name': user.user_name, 'user_password': user.user_password, 'user_nickname': user.user_nickname, 'user_email': user.user_email} return jsonify(data=result)# 删除用户数据@app.route('/user/<int:userId>', methods=['DELETE'])def deleteUser(userId): User.query.filter_by(user_id=userId).delete() mydb.session.commit() return getUsers()if __name__ == '__main__': app.run()
新闻热点
疑难解答