python 全栈开发,Day121(DButils,websocket)
DBUtils提供两种外部接口:
- PersistentDB :提供线程专用的数据库连接,并自动管理连接。
- PooledDB :提供线程间可共享的数据库连接,并自动管理连接。
安装DButils
安装DButils,它依赖于pymysql
pip install pymysql
pip install DBUtils
创建数据库连接池
首先保证你有一个MySQL服务器,并且已经启动了!已经有一个数据库以及表数据
之前我们要操作MySQL,使用pymsql
import pymysql conn = pymysql.connect(host="localhost",port=3306, user="root",password="",db="student", charset="utf8") cur = conn.cursor(pymysql.cursors.DictCursor) # 插入一条数据 sql = "insert into stu(name,age) VALUE ('%s','%s')" %("小甜甜",22) cur.execute(sql) conn.commit() cur.close() conn.close()
可以发现,每次操作数据库,都需要连接数据库。需要消耗链接时间!效率非常低!
新建文件utils.py,内容如下:
确保主机ip 127.0.0.1,数据库book,用户名root,密码为空,能够连接MySQL
import pymysql from DBUtils.PooledDB import PooledDB POOL = PooledDB( creator=pymysql, # 使用链接数据库的模块 maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数 mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建 maxcached=5, # 链接池中最多闲置的链接,0和None不限制 maxshared=1, # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。 blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错 maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制 setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."] ping=0, # ping MySQL服务端,检查是否服务可用。 # 如:0 = None = never, # 1 = default = whenever it is requested, # 2 = when a cursor is created, # 4 = when a query is executed, # 7 = always host='127.0.0.1', port=3306, user='root', password='', database='book', charset='utf8' )
maxconnections 最大连接数,不建议写0。可能会拖死服务器!
ping = 0 表示关闭服务检测。它会耗费服务器性能
注意:这些参数是必须要有的
creator=pymysql, host='127.0.0.1', port=3306, user='root', password='', database='book', charset='utf8'
使用数据库连接池
新建文件poolconn.py
确保book数据库已经创建了表student,并录入了数据
from utils import POOL import pymysql def func(): # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常 # 否则 # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。 # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。 # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。 # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。 conn = POOL.connection() # 从连接池POOL中拿出一个已经创建好的连接,一次只能拿一个 cursor = conn.cursor(pymysql.cursors.DictCursor) cursor.execute('select * from student') result = list(cursor.fetchall()) # 使用list效率是很低的,这里仅做测试 print(result) cursor.close() conn.close() func()
执行输出:
[{'id': 1, 'name': '韩雪', 'age': 24, 'gender': '女'}, {'id': 2, 'name': '舒畅', 'age': 23, 'gender': '女'}, {'id': 3, 'name': '唐嫣', 'age': 25, 'gender': '女'}]
封装mysqlhelp
为了方便操作MySQL,需要将增删改查操作。封装成一个类,方便程序调用!
注意:Python版本为3.6.5。只要是3.x版本都可以运行!
MyDbUtils.py
# -*- coding: UTF-8 -*- import pymysql from DBUtils.PooledDB import PooledDB import DB_config as Config # import MySQLdb ''' @功能:PT数据库连接池 ''' class PTConnectionPool(object): __pool = None # def __init__(self): # self.conn = self.__getConn() # self.cursor = self.conn.cursor() def __enter__(self): self.conn = self.__getConn() self.cursor = self.conn.cursor() print("PT数据库创建con和cursor") return self def __getConn(self): if self.__pool is None: self.__pool = PooledDB(creator=pymysql, mincached=Config.DB_MIN_CACHED, maxcached=Config.DB_MAX_CACHED, maxshared=Config.DB_MAX_SHARED, maxconnections=Config.DB_MAX_CONNECYIONS, blocking=Config.DB_BLOCKING, maxusage=Config.DB_MAX_USAGE, setsession=Config.DB_SET_SESSION, host=Config.DB_TEST_HOST, port=Config.DB_TEST_PORT, user=Config.DB_TEST_USER, passwd=Config.DB_TEST_PASSWORD, db=Config.DB_TEST_DBNAME, use_unicode=Config.DB_USE_UNICODE, charset=Config.DB_CHARSET) return self.__pool.connection() """ @summary: 释放连接池资源 """ def __exit__(self, type, value, trace): self.cursor.close() self.conn.close() # print("PT连接池释放con和cursor") # 重连接池中取出一个连接 def getconn(self): conn = self.__getConn() # 设置返回数据为字典 cursor = conn.cursor(pymysql.cursors.DictCursor) return cursor, conn # 关闭连接归还给连接池 # def close(self): # self.cursor.close() # self.conn.close() # print u"PT连接池释放con和cursor" def getPTConnection(): return PTConnectionPool()
配置文件:DB_config.py
# -*- coding: UTF-8 -*- # TEST数据库信息 DB_TEST_HOST = "localhost" DB_TEST_PORT = 3306 DB_TEST_DBNAME = "book" DB_TEST_USER = "root" DB_TEST_PASSWORD = "" # 数据库连接编码 DB_CHARSET = "utf8" # mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接) DB_MIN_CACHED = 10 # maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小) DB_MAX_CACHED = 10 # maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用 DB_MAX_SHARED = 20 # maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制) DB_MAX_CONNECYIONS = 100 # blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配) DB_BLOCKING = True # maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开) DB_MAX_USAGE = 0 # setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...] DB_SET_SESSION = None # 是否使用unicode编码 DB_USE_UNICODE = True
封装的mysqlhelp.py
# import MySQLdb import pymysql from MyDbUtils import getPTConnection import DB_config as Config class MysqlHelp(object): # mysql = None def __init__(self): # self.connect() self.db = getPTConnection() def __new__(cls, *args, **kwargs): if not hasattr(cls, 'inst'): cls.inst = super(MysqlHelp, cls).__new__(cls, *args, **kwargs) return cls.inst # 查询所有 def selectall(self, sql='', param=()): # 判断是否连接,并设置重连机制 # self.connected() try: cursor, conn = self.execute(sql, param) res = cursor.fetchall() self.close(cursor, conn) return res except Exception as e: print('selectall except ', e.args) self.close(cursor, conn) return None # 查询一条 def selectone(self, sql='', param=()): # self.connected() try: # cur = self.db.cursor() cursor, conn = self.execute(sql, param) res = cursor.fetchone() self.close(cursor, conn) return res except Exception as e: print('selectone except ', e.args) self.close(cursor, conn) return None # 增加 def insert(self, sql='', param=()): # self.connected() try: # self.db.getconn().execute(sql, param) cursor, conn = self.execute(sql, param) # print('============') # _id=self.db.conn.insert_id() _id = cursor.lastrowid # print('_id ', _id) conn.commit() self.close(cursor, conn) # 防止表中没有id返回0 if _id == 0: return True return _id except Exception as e: print('insert except ', e.args) conn.rollback() self.close(cursor, conn) # self.conn.rollback() return 0 # 增加多行 def insertmany(self, sql='', param=()): # self.connected() cursor, conn = self.db.getconn() try: cursor.executemany(sql, param) # self.execute(sql,param) conn.commit() self.close(cursor, conn) return True except Exception as e: print('insert many except ', e.args) conn.rollback() self.close(cursor, conn) # self.conn.rollback() return False # 删除 def delete(self, sql='', param=()): # self.connected() try: cursor, conn = self.execute(sql, param) conn.commit() self.close(cursor, conn) return True except Exception as e: print('delete except ', e.args) conn.rollback() self.close(cursor, conn) # self.conn.rollback() return False # 更新 def update(self, sql='', param=()): # self.connected() try: cursor, conn = self.execute(sql, param) conn.commit() self.close(cursor, conn) return True except Exception as e: print('update except ', e.args) conn.rollback() self.close(cursor, conn) # self.conn.rollback() return False @classmethod def getInstance(self): if MysqlHelp.mysql == None: MysqlHelp.mysql = MysqlHelp() return MysqlHelp.mysql # 执行命令 # def execute(self, sql='', param=(), autoclose=False): # cursor, conn = self.db.getconn() # try: # if param: # cursor.execute(sql, param) # else: # cursor.execute(sql) # conn.commit() # if autoclose: # self.close(cursor, conn) # except Exception as e: # pass # return cursor, conn # 执行多条命令 '[{"sql":"xxx","param":"xx"}....]' # def executemany(self, list=[]): # cursor, conn = self.db.getconn() # try: # for order in list: # sql = order['sql'] # param = order['param'] # if param: # cursor.execute(sql, param) # else: # cursor.execute(sql) # conn.commit() # self.close(cursor, conn) # return True # except Exception as e: # print('execute failed========', e.args) # conn.rollback() # self.close(cursor, conn) # return False # def connect(self): # self.conn = pymysql.connect(user=Config.DB_TEST_USER, db=Config.DB_TEST_DBNAME, passwd=Config.DB_TEST_PASSWORD, host=Config.DB_TEST_HOST) def close(self, cursor, conn): cursor.close() conn.close() # print("PT连接池释放con和cursor")
这些代码,参考链接:
https://blog.****.net/jacke121/article/details/79852146
他是基于Python 2.x写的。我改成了Python 3.x,并修复了一些bug
查询所有记录
新建一个test.py
from mysqlhelp import MysqlHelp # 导入MysqlHelp类 conn = MysqlHelp() # 实例化 sql = "select * from student" # SQL语句 res = conn.selectall(sql) # 执行sql,这里可以接收参数,用来拼接sql语句 print(res)
执行输出:
[{'id': 1, 'name': '韩雪', 'age': 24, 'gender': '女'}, {'id': 2, 'name': '舒畅', 'age': 23, 'gender': '女'}, {'id': 3, 'name': '唐嫣', 'age': 25, 'gender': '女'}]
查询一条记录
from mysqlhelp import MysqlHelp # 导入MysqlHelp类 conn = MysqlHelp() # 实例化 sql = "select * from student" # SQL语句 res = conn.selectone(sql) # 执行sql,这里可以接收参数,用来拼接sql语句 print(res)
执行输出:
{'id': 1, 'name': '韩雪', 'age': 24, 'gender': '女'}
如果有多条记录,默认返回第一条记录
插入一条记录
from mysqlhelp import MysqlHelp # 导入MysqlHelp类 conn = MysqlHelp() # 实例化 sql = "insert into student set name='{}',age='{}',gender='{}'".format('小甜甜','20','女') res = conn.insert(sql) # 执行sql print(res)
执行输出:4
注意:这个4是插入数据之后,生成的主键id
使用Navicat连接MySQL,查看表记录
插入多条记录
from mysqlhelp import MysqlHelp # 导入MysqlHelp类 conn = MysqlHelp() # 实例化 sql = "insert into student(name,age,gender) values (%s,%s,%s)" res = conn.insertmany(sql,[('赵丽颖','21','女'),('Angelababy','22','女')]) # 执行sql,注意:多行是列表 print(res)
执行输出:True
使用Navicat连接MySQL,查看表记录
更新一条数据
from mysqlhelp import MysqlHelp # 导入MysqlHelp类 conn = MysqlHelp() # 实例化 sql = "update student set name='%s' where id=%s" res = conn.update(sql,('杨颖','7')) # 执行sql,这里可以接收参数,用来拼接sql语句 print(res)
执行输出:True
使用Navicat连接MySQL,查看表记录
删除一条记录
from mysqlhelp import MysqlHelp # 导入MysqlHelp类 conn = MysqlHelp() # 实例化 sql = "delete from student where id=%s" res = conn.delete(sql,('7')) # 执行sql,这里可以接收参数,用来拼接sql语句 print(res)
执行输出:True
使用Navicat连接MySQL,查看表记录