本文实例讲述了Python使用sqlalchemy模块连接数据库操作。分享给大家供大家参考,具体如下:
安装:
pip install sqlalchemy# 安装数据库驱动:pip install pymysqlpip install cx_oracle
举例:(在url后面加入?charset=utf8可以防止乱码)
from sqlalchemy import create_engineengine=create_engine('mysql+pymysql://username:password@hostname:port/dbname', echo=True) #echo=True 打印sql语句信息
create_engine
接受一个url,格式为:
# '数据库类型+数据库驱动名称://用户名:口令@机器地址:端口号/数据库名'# 常用的engine = create_engine('sqlite:///:memory:', echo=True) # sqlite内存engine = create_engine('sqlite:///./cnblogblog.db',echo=True) # sqlite文件engine = create_engine("mysql+pymysql://username:password@hostname:port/dbname",echo=True) # mysql+pymysqlengine = create_engine('mssql+pymssql://username:password@hostname:port/dbname',echo=True) # mssql+pymssqlengine = create_engine('postgresql://scott:tiger@hostname:5432/dbname') # postgresql示例engine = create_engine('oracle://scott:tiger@hostname:1521/sidname') # oracleengine = create_engine('oracle+cx_oracle://scott:tiger@tnsname') #pdb就可以用tns连接
简单demo:
from sqlalchemy import create_engine, Column, Integer, Stringfrom sqlalchemy.orm import sessionmakerfrom sqlalchemy.ext.declarative import declarative_base engine = create_engine('oracle://spark:a@orclpdb',echo=True) #echo要求打印sql语句等调试信息session_maker = sessionmaker(bind=engine)session = session_maker()Base = declarative_base()#对应一张表class Student(Base): __tablename__ = 'STUDENT' id = Column('STUID', Integer, primary_key=True) name = Column('STUNAME', String(32), nullable=False) age = Column('STUAGE', Integer) def __repr__(self): return '<Student(id:%s, name:%s, age:%s)>' % (self.id, self.name, self.age)Base.metadata.create_all(engine) #若存在STUDENT表则不做,不存在则创建。queryObject = session.query(Student).order_by(Student.id.desc())for ins in queryObject: print(ins.id, ins.name, ins.age)'''4 hey 243 lwtxxs 272 gyb 891 ns 23'''
将查询结果映射为DataFrame:
import pandas as pddf = pd.read_sql(session.query(Student).filter(Student.id > 1).statement, engine) print(df)''' STUID STUNAME STUAGE0 4 hey 241 2 gyb 892 3 lwtxxs 27'''
查询:
session的query方法除了可以接受Base子类对象作为参数外,还可以是字段,如:
query = session.query(Student.name, Student.age) # query为一个sqlalchemy.orm.query.Query对象for stu_name, stu_age in query: print(stu_name, stu_age)
查询条件filter:
# = / likequery.filter(Student.name == 'wendy')query.filter(Student.name.like('%ed%'))# inquery.filter(Student.name.in_(['wendy', 'jack']))query.filter(Student.name.in_( session.query(User.name).filter(User.name.like('%ed%'))))# not inquery.filter(~Student.name.in_(['ed', 'wendy', 'jack']))# is null / is not nullquery.filter(Student.name == None)query.filter(Student.name.is_(None))query.filter(Student.name != None)query.filter(Student.name.isnot(None))# andfrom sqlalchemy import and_, or_query.filter(and_(Student.name == 'ed', Student.age != 23))query.filter(Student.name == 'ed', Student.age != 23)query.filter(Student.name == 'ed').filter(Student.age != 23)# orquery.filter(or_(Student.name == 'ed', Student.name == 'wendy'))# matchquery.filter(Student.name.match('wendy'))
Query的方法:
all()
方法以列表形式返回结果集:
from sqlalchemy import or_, and_queryObject = session.query(Student).filter(or_(Student.id == 1, Student.id == 2))print(queryObject.all()) # [<Student(id:1, name:ns, age:23)>, <Student(id:2, name:gyb, age:89)>]queryObject = session.query(Student.name).filter(or_(Student.id == 1, Student.id == 2))print(queryObject.all()) # [('ns',), ('gyb',)]
first()
方法返回单个结果。(若结果集为空则返回None)
print(queryObject.first()) # ('ns',)
one()
方法返回单个结果,与first()
方法不同的是:当结果集中没有元素或有多于一个元素会抛出异常。
one_or_none()
方法同one()
一样,不同是结果集为空则返回None,为多个抛出异常。
查询数量:
from sqlalchemy import funcsession.query(func.count(Student.id)).scalar() # SELECT count("STUDENT"."STUID") AS count_1 FROM "STUDENT"
分组:
session.query(func.count(Student.id), Student.name).group_by(Student.name).all()
嵌套SQL语句:
from sqlalchemy import textquery = session.query(Student.id, Student.name).filter(text('stuid>2'))query = session.query('stuid', 'stuname', 'stuage').from_statement(/text("select * from student where stuname=:stuname")).params(stuname='hey').all() #[(4, 'hey', 24)]
希望本文所述对大家Python程序设计有所帮助。
注:相关教程知识阅读请移步到python教程频道。