首页 > 编程 > Python > 正文

flask 框架操作MySQL数据库简单示例

2020-02-15 21:27:48
字体:
来源:转载
供稿:网友

本文通过实例讲述了 Flask 框架操作 MySQL 数据库的方法,分享给大家供大家参考,具体如下:

一、创建数据库表格

"""
Created on 19-10-8
@requirement: Anaconda 4.3.0 (64-bit) Python3.6
@description: Create the ``user`` table, print its first row, and insert one sample row.
"""
import pymysql

server = '127.0.0.1'
user = 'root'
password = 'password'

# "ENGINE=InnoDB DEFAULT CHARSET=utf8" is added when creating the table so
# that Chinese text is stored without being garbled.
CREATE_USER_TABLE_SQL = """
CREATE TABLE if not exists user(
   user_id INT NOT NULL auto_increment primary key,
   user_name VARCHAR(100),
   user_password VARCHAR(100),
   user_nickname VARCHAR(100),
   user_email VARCHAR(100)
   )
   ENGINE=InnoDB DEFAULT CHARSET=utf8
"""

# The column list is required: the table has five columns (user_id is
# auto-incremented), so a bare "INSERT INTO user VALUES (...)" carrying four
# values is rejected with a column-count mismatch.
# The %s markers are driver placeholders, not Python string formatting; the
# values are passed separately so the driver escapes them.
INSERT_USER_SQL = (
    "INSERT INTO user (user_name, user_password, user_nickname, user_email) "
    "VALUES (%s, %s, %s, %s)"
)


def main():
    """Create the table if needed, show the first row, then insert a sample user.

    The connection is always closed, even if one of the statements fails.
    """
    # Keyword arguments: recent PyMySQL releases no longer accept positional
    # host/user/password.
    conn = pymysql.connect(
        host=server,
        user=user,
        password=password,
        database='information_collection',
    )
    try:
        with conn.cursor() as cursor:
            # Create the table.
            cursor.execute(CREATE_USER_TABLE_SQL)

            # Query the contents of the user table.
            cursor.execute('SELECT * FROM user')
            # Fetch a single row (use cursor.fetchall() for all rows);
            # this is None when the table is still empty.
            row = cursor.fetchone()
            print(row)

            # Insert a sample row.
            cursor.execute(INSERT_USER_SQL, ('xiaoming', 'qwe', 'ming', '@163.com'))

        # Nothing is written to the table until the transaction is committed.
        conn.commit()
    finally:
        conn.close()


if __name__ == '__main__':
    main()

二、Flask 操作 MySQL

"""
Created on 19-10-8
@requirement: Anaconda 4.3.0 (64-bit) Python3.6
@description: Small Flask + Flask-SQLAlchemy app exposing CRUD routes for the ``user`` table.
"""
import configparser
import os

from flask import Flask, jsonify, request
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# A ConfigParser instance has to be created first, then the config file is read.
# NOTE(review): nothing below reads values from my_config - confirm whether
# db.conf is still needed.
my_config = configparser.ConfigParser()
my_config.read('db.conf')

# Connect to the information_collection database; the DEV_DATABASE_URL
# environment variable, when set, overrides the hard-coded default URL.
app.config['SQLALCHEMY_DATABASE_URI'] = (
    os.environ.get('DEV_DATABASE_URL')
    or "mysql+pymysql://root:password@127.0.0.1:3306/information_collection"
)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

mydb = SQLAlchemy()
mydb.init_app(app)

# Form fields accepted by the POST and PATCH routes; each matches a column
# of the User model.
_USER_FIELDS = ('user_name', 'user_password', 'user_nickname', 'user_email')


# User model
class User(mydb.Model):
    user_id = mydb.Column(mydb.Integer, primary_key=True)
    user_name = mydb.Column(mydb.String(60), nullable=False)
    user_password = mydb.Column(mydb.String(30), nullable=False)
    user_nickname = mydb.Column(mydb.String(50))
    user_email = mydb.Column(mydb.String(30), nullable=False)

    def __repr__(self):
        return '<User %r>' % self.user_name


def _user_to_dict(user, include_password=False):
    """Build the JSON-serializable dict the routes return for one user."""
    result = {'user_id': user.user_id, 'user_name': user.user_name}
    if include_password:
        result['user_password'] = user.user_password
    result['user_nickname'] = user.user_nickname
    result['user_email'] = user.user_email
    return result


# Get the user list (all rows)
@app.route('/users', methods=['GET'])
def getUsers():
    """Return every user as ``{"data": [...]}``."""
    return jsonify(data=[_user_to_dict(user) for user in User.query.all()])


# Add user data, one record per request
@app.route('/user', methods=['POST'])
def addUser():
    """Insert one user built from the submitted form fields.

    Returns the stored user, or ``{'msg': '添加失败'}`` when the insert fails
    (for example when a non-nullable field is missing).
    """
    user = User(**{field: request.form.get(field) for field in _USER_FIELDS})
    try:
        mydb.session.add(user)
        mydb.session.commit()
    except Exception:
        # Request boundary: log the failure instead of swallowing it
        # silently, and roll back so the session stays usable.
        app.logger.exception('failed to insert user')
        mydb.session.rollback()
        return jsonify(data={'msg': '添加失败'})
    return jsonify(data=_user_to_dict(user))


# Get a single record
@app.route('/user/<int:userId>', methods=['GET'])
def getUser(userId):
    """Return the user with the given id, or a not-found message."""
    user = User.query.filter_by(user_id=userId).first()
    if user is None:
        result = {'msg': '找不到数据'}
    else:
        result = _user_to_dict(user)
    return jsonify(data=result)


# Modify user data
@app.route('/user/<int:userId>', methods=['PATCH'])
def updateUser(userId):
    """Update the submitted form fields of an existing user.

    Only fields present in the form are changed. Overwriting omitted fields
    with None would violate the non-nullable columns and make the commit
    fail. Returns the updated user, a not-found message, or
    ``{'msg': '修改失败'}`` when the commit fails.
    """
    user = User.query.filter_by(user_id=userId).first()
    if user is None:
        return jsonify(data={'msg': '找不到要修改的记录'})

    for field in _USER_FIELDS:
        value = request.form.get(field)
        if value is not None:
            setattr(user, field, value)

    try:
        mydb.session.commit()
    except Exception:
        app.logger.exception('failed to update user %s', userId)
        mydb.session.rollback()  # roll back
        # Report the failure instead of returning the unchanged row as if
        # the update had succeeded.
        return jsonify(data={'msg': '修改失败'})

    # NOTE(review): this response includes user_password, unlike the other
    # routes; kept for backward compatibility - consider removing it.
    return jsonify(data=_user_to_dict(user, include_password=True))


# Delete user data
@app.route('/user/<int:userId>', methods=['DELETE'])
def deleteUser(userId):
    """Delete the user with the given id and return the remaining users."""
    try:
        User.query.filter_by(user_id=userId).delete()
        mydb.session.commit()
    except Exception:
        # Leave the session clean, then let Flask report the error.
        mydb.session.rollback()
        raise
    return getUsers()


if __name__ == '__main__':
    app.run()
发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表