Oracle Client 安装
日期:2019年8月1日
公司已经安装好Oracle服务端
Windows版本:Windows10专业版
系统类型:64位操作系统,基于x64的处理器
Python版本:Python 3.6.4 :: Anaconda, Inc.
https://www.oracle.com/database/technologies/instant-client/downloads.html
解压后(这里放D盘)
控制面板/系统和安全/系统 -> 高级系统设置 -> 环境变量
新建ORACLE_HOME,值为包解压的路径
编辑PATH,添加%ORACLE_HOME%
Navicat连接测试
安装命令
conda install cx_Oracle
基础代码
import cx_Oracledef execute(query): db = cx_Oracle.connect('用户名/密码@IP/ServiceName') cursor = db.cursor() cursor.execute(query) result = cursor.fetchall() cursor.close() db.close() return resultdef commit(sql): db = cx_Oracle.connect('用户名/密码@IP/ServiceName') cursor = db.cursor() cursor.execute(sql) db.commit() cursor.close() db.close()
封装成类
from cx_Oracle import Connection # conda install cx_Oraclefrom conf import CONN, Colorclass Oracle(Color): def __init__(self, conn=CONN): self.db = Connection(*conn, encoding='utf8') # 用户名 密码 IP/ServiceName self.cursor = self.db.cursor() def __del__(self): self.cursor.close() self.db.close() def commit(self, sql): try: self.cursor.execute(sql) self.db.commit() except Exception as e: self.red(e) def fetchall(self, query): self.cursor.execute(query) return self.cursor.fetchall() def fetchone(self, query, n=9999999): self.cursor.execute(query) for _ in range(n): one = self.cursor.fetchone() if one: yield one def fetchone_dt(self, query, n=9999999): self.cursor.execute(query) columns = [i[0] for i in self.cursor.description] length = len(columns) for _ in range(n): one = self.cursor.fetchone() # tuple yield {columns[i]: one[i] for i in range(length)} def read_clob(self, query): self.cursor.execute(query) one = self.cursor.fetchone() while one: try: yield one[0].read() except Exception as e: self.red(e) one = self.cursor.fetchone() def db2sheet(self, query, prefix): df = pd.read_sql_query(query, self.db) if 'url' in df.columns: df['url'] = "'" + df['url'] df.to_excel(prefix.replace('.xlsx', '')+'.xlsx', index=False) def db2sheets(self, queries, prefix): writer = pd.ExcelWriter(prefix.replace('.xlsx', '')+'.xlsx') for sheet_name, query in queries.items(): df = pd.read_sql_query(query, self.db) if 'url' in df.columns: df['url'] = "'" + df['url'] df.to_excel(writer, sheet_name=sheet_name, index=False) writer.save() def tb2sheet(self, table): sql = "SELECT * FROM " + table self.db2sheet(sql, table) def insert(self, dt, tb): for k, v in dt.items(): if isinstance(v, str): dt[k] = v.replace("'", '').strip() ls = [(k, v) for k, v in dt.items() if v is not None] sql = 'INSERT INTO %s (' % tb + ','.join(i[0] for i in ls) + / ') VALUES (' + ','.join('%r' % i[1] for i in ls) + ')' self.commit(sql) def insert_clob(self, dt, tb, clob): for k, v in dt.items(): if isinstance(v, str): dt[k] = v.replace("'", '').strip() # 把超长文本保存在一个变量中 # declare = "DECLARE variate CLOB := '%s';/n" % dt[clob] join = lambda x: '||'.join("'%s'" % x[10922*i: 10922*(i+1)] for i in range(len(x)//10922+1)) # 32768//3 declare = "DECLARE variate CLOB := %s;/n" % join(dt[clob]) dt[clob] = 'variate' ls = [(k, v) for k, v in dt.items() if v is not None] sql = 'INSERT INTO %s (' % tb + ','.join(i[0] for i in ls) + ') VALUES (' +/ ','.join('%r' % i[1] for i in ls) + ');' sql = declare + 'BEGIN/n%s/nEND;' % sql.replace("'variate'", 'variate') self.commit(sql) def update(self, dt_update, dt_condition, table): sql = 'UPDATE %s SET ' % table + ','.join('%s=%r' % (k, v) for k, v in dt_update.items()) / + ' WHERE ' + ' AND '.join('%s=%r' % (k, v) for k, v in dt_condition.items()) self.commit(sql) def truncate(self, tb): self.commit('truncate table ' + tb)db_read = Oracle()fetchall = db_read.fetchallfetchone = db_read.fetchoneread_clob = db_read.read_clobif __name__ == '__main__': query = ''' '''.strip() for i in fetchone(query, 99): print(i)
新闻热点
疑难解答