python 全栈开发,Day121(DButils,websocket)

DBUtils提供两种外部接口: 

  • PersistentDB :提供线程专用的数据库连接,并自动管理连接。 
  • PooledDB :提供线程间可共享的数据库连接,并自动管理连接。 

 

安装DButils

安装DButils,它依赖于pymysql

pip install pymysql
pip install DBUtils 

 

创建数据库连接池

首先保证你有一个MySQL服务器,并且已经启动了!已经有一个数据库以及表数据

之前我们要操作MySQL,使用pymsql

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
import pymysql

conn = pymysql.connect(host="localhost",port=3306,
                user="root",password="",db="student",
                charset="utf8")

cur = conn.cursor(pymysql.cursors.DictCursor)

# 插入一条数据
sql = "insert into stu(name,age) VALUE ('%s','%s')" %("小甜甜",22)

cur.execute(sql)

conn.commit()

cur.close()

conn.close()
View Code

可以发现,每次操作数据库,都需要连接数据库。需要消耗链接时间!效率非常低!

 

新建文件utils.py,内容如下:

确保主机ip 127.0.0.1,数据库book,用户名root,密码为空,能够连接MySQL

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
import pymysql
from DBUtils.PooledDB import PooledDB

POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=1,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    # 如:0 = None = never,
    # 1 = default = whenever it is requested,
    # 2 = when a cursor is created,
    # 4 = when a query is executed,
    # 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='',
    database='book',
    charset='utf8'
)
View Code

maxconnections 最大连接数,不建议写0。可能会拖死服务器!

ping = 0 表示关闭服务检测。它会耗费服务器性能

 

注意:这些参数是必须要有的

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
creator=pymysql,
host='127.0.0.1',
port=3306,
user='root',
password='',
database='book',
charset='utf8'
View Code

 

使用数据库连接池

新建文件poolconn.py

确保book数据库已经创建了表student,并录入了数据

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from utils import POOL
import pymysql


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()  # 从连接池POOL中拿出一个已经创建好的连接,一次只能拿一个
    cursor = conn.cursor(pymysql.cursors.DictCursor)
    cursor.execute('select * from student')
    result = list(cursor.fetchall())  # 使用list效率是很低的,这里仅做测试
    print(result)
    cursor.close()
    conn.close()

func()
View Code

 

执行输出:

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
[{'id': 1, 'name': '韩雪', 'age': 24, 'gender': ''}, {'id': 2, 'name': '舒畅', 'age': 23, 'gender': ''}, {'id': 3, 'name': '唐嫣', 'age': 25, 'gender': ''}]
View Code

 

封装mysqlhelp

为了方便操作MySQL,需要将增删改查操作。封装成一个类,方便程序调用!

注意:Python版本为3.6.5。只要是3.x版本都可以运行!

 

MyDbUtils.py

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
# -*- coding: UTF-8 -*-
import pymysql
from DBUtils.PooledDB import PooledDB
import DB_config as Config

# import MySQLdb

'''
@功能:PT数据库连接池
'''


class PTConnectionPool(object):
    __pool = None

    # def __init__(self):
    #     self.conn = self.__getConn()
    #     self.cursor = self.conn.cursor()
    def __enter__(self):
        self.conn = self.__getConn()
        self.cursor = self.conn.cursor()
        print("PT数据库创建con和cursor")
        return self

    def __getConn(self):
        if self.__pool is None:
            self.__pool = PooledDB(creator=pymysql, mincached=Config.DB_MIN_CACHED, maxcached=Config.DB_MAX_CACHED,
                                   maxshared=Config.DB_MAX_SHARED, maxconnections=Config.DB_MAX_CONNECYIONS,
                                   blocking=Config.DB_BLOCKING, maxusage=Config.DB_MAX_USAGE,
                                   setsession=Config.DB_SET_SESSION,
                                   host=Config.DB_TEST_HOST, port=Config.DB_TEST_PORT,
                                   user=Config.DB_TEST_USER, passwd=Config.DB_TEST_PASSWORD,
                                   db=Config.DB_TEST_DBNAME, use_unicode=Config.DB_USE_UNICODE, charset=Config.DB_CHARSET)

        return self.__pool.connection()


    """
    @summary: 释放连接池资源
    """


    def __exit__(self, type, value, trace):
        self.cursor.close()
        self.conn.close()
        # print("PT连接池释放con和cursor")

    # 重连接池中取出一个连接
    def getconn(self):
        conn = self.__getConn()
        # 设置返回数据为字典
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return cursor, conn


    # 关闭连接归还给连接池
    # def close(self):
    #     self.cursor.close()
    #     self.conn.close()
    #     print u"PT连接池释放con和cursor"


def getPTConnection():
    return PTConnectionPool()
View Code

 

配置文件:DB_config.py

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
# -*- coding: UTF-8 -*-

# TEST数据库信息
DB_TEST_HOST = "localhost"
DB_TEST_PORT = 3306
DB_TEST_DBNAME = "book"
DB_TEST_USER = "root"
DB_TEST_PASSWORD = ""

# 数据库连接编码
DB_CHARSET = "utf8"

# mincached : 启动时开启的闲置连接数量(缺省值 0 开始时不创建连接)
DB_MIN_CACHED = 10

# maxcached : 连接池中允许的闲置的最多连接数量(缺省值 0 代表不闲置连接池大小)
DB_MAX_CACHED = 10

# maxshared : 共享连接数允许的最大数量(缺省值 0 代表所有连接都是专用的)如果达到了最大数量,被请求为共享的连接将会被共享使用
DB_MAX_SHARED = 20

# maxconnecyions : 创建连接池的最大数量(缺省值 0 代表不限制)
DB_MAX_CONNECYIONS = 100

# blocking : 设置在连接池达到最大数量时的行为(缺省值 0 或 False 代表返回一个错误<toMany......> 其他代表阻塞直到连接数减少,连接被分配)
DB_BLOCKING = True

# maxusage : 单个连接的最大允许复用次数(缺省值 0 或 False 代表不限制的复用).当达到最大数时,连接会自动重新连接(关闭和重新打开)
DB_MAX_USAGE = 0

# setsession : 一个可选的SQL命令列表用于准备每个会话,如["set datestyle to german", ...]
DB_SET_SESSION = None

# 是否使用unicode编码
DB_USE_UNICODE = True
View Code

 

封装的mysqlhelp.py

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
# import MySQLdb
import pymysql
from MyDbUtils import getPTConnection
import DB_config as Config


class MysqlHelp(object):


    # mysql = None


    def __init__(self):
        # self.connect()
        self.db = getPTConnection()


    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, 'inst'):
            cls.inst = super(MysqlHelp, cls).__new__(cls, *args, **kwargs)
        return cls.inst


    # 查询所有
    def selectall(self, sql='', param=()):
        # 判断是否连接,并设置重连机制
        # self.connected()
        try:
            cursor, conn = self.execute(sql, param)
            res = cursor.fetchall()
            self.close(cursor, conn)
            return res
        except Exception as e:
            print('selectall except   ', e.args)
            self.close(cursor, conn)
            return None


    # 查询一条
    def selectone(self, sql='', param=()):
        # self.connected()
        try:
            # cur = self.db.cursor()
            cursor, conn = self.execute(sql, param)
            res = cursor.fetchone()
            self.close(cursor, conn)
            return res
        except Exception as e:
            print('selectone except   ', e.args)
            self.close(cursor, conn)
            return None


    # 增加
    def insert(self, sql='', param=()):
        # self.connected()
        try:
            # self.db.getconn().execute(sql, param)
            cursor, conn = self.execute(sql, param)
            # print('============')
            # _id=self.db.conn.insert_id()
            _id = cursor.lastrowid
            # print('_id   ', _id)

            conn.commit()
            self.close(cursor, conn)
            # 防止表中没有id返回0
            if _id == 0:
                return True
            return _id
        except Exception as e:
            print('insert except   ', e.args)
            conn.rollback()
            self.close(cursor, conn)
            # self.conn.rollback()
            return 0


    # 增加多行
    def insertmany(self, sql='', param=()):
        # self.connected()
        cursor, conn = self.db.getconn()
        try:
            cursor.executemany(sql, param)
            # self.execute(sql,param)
            conn.commit()
            self.close(cursor, conn)
            return True
        except Exception as e:
            print('insert many except   ', e.args)
            conn.rollback()
            self.close(cursor, conn)
            # self.conn.rollback()
            return False


    # 删除
    def delete(self, sql='', param=()):
        # self.connected()
        try:
            cursor, conn = self.execute(sql, param)
            conn.commit()
            self.close(cursor, conn)
            return True
        except Exception as e:
            print('delete except   ', e.args)

            conn.rollback()
            self.close(cursor, conn)
            # self.conn.rollback()
            return False


    # 更新
    def update(self, sql='', param=()):
        # self.connected()
        try:
            cursor, conn = self.execute(sql, param)
            conn.commit()
            self.close(cursor, conn)
            return True
        except Exception as e:
            print('update except   ', e.args)
            conn.rollback()
            self.close(cursor, conn)
            # self.conn.rollback()
            return False


    @classmethod
    def getInstance(self):
        if MysqlHelp.mysql == None:
            MysqlHelp.mysql = MysqlHelp()
        return MysqlHelp.mysql


    # 执行命令
    # def execute(self, sql='', param=(), autoclose=False):
    #     cursor, conn = self.db.getconn()
    #     try:
    #         if param:
    #             cursor.execute(sql, param)
    #         else:
    #             cursor.execute(sql)
    #         conn.commit()
    #         if autoclose:
    #             self.close(cursor, conn)
    #     except Exception as e:
    #         pass
    #     return cursor, conn


    # 执行多条命令
    '[{"sql":"xxx","param":"xx"}....]'


    # def executemany(self, list=[]):
    #     cursor, conn = self.db.getconn()
    #     try:
    #         for order in list:
    #             sql = order['sql']
    #             param = order['param']
    #             if param:
    #                 cursor.execute(sql, param)
    #             else:
    #                 cursor.execute(sql)
    #         conn.commit()
    #         self.close(cursor, conn)
    #         return True
    #     except Exception as e:
    #         print('execute failed========', e.args)
    #         conn.rollback()
    #         self.close(cursor, conn)
    #         return False


    # def connect(self):
    #     self.conn = pymysql.connect(user=Config.DB_TEST_USER, db=Config.DB_TEST_DBNAME, passwd=Config.DB_TEST_PASSWORD, host=Config.DB_TEST_HOST)


    def close(self, cursor, conn):
        cursor.close()
        conn.close()
        # print("PT连接池释放con和cursor")
View Code

 

这些代码,参考链接:

https://blog.****.net/jacke121/article/details/79852146

他是基于Python 2.x写的。我改成了Python 3.x,并修复了一些bug

 

查询所有记录

新建一个test.py

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from mysqlhelp import MysqlHelp  # 导入MysqlHelp类

conn = MysqlHelp()  # 实例化
sql = "select * from student"  # SQL语句
res = conn.selectall(sql)  # 执行sql,这里可以接收参数,用来拼接sql语句
print(res)
View Code

执行输出:

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
[{'id': 1, 'name': '韩雪', 'age': 24, 'gender': ''}, {'id': 2, 'name': '舒畅', 'age': 23, 'gender': ''}, {'id': 3, 'name': '唐嫣', 'age': 25, 'gender': ''}]
View Code

 

查询一条记录

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from mysqlhelp import MysqlHelp  # 导入MysqlHelp类

conn = MysqlHelp()  # 实例化
sql = "select * from student"  # SQL语句
res = conn.selectone(sql)  # 执行sql,这里可以接收参数,用来拼接sql语句
print(res)
View Code

执行输出:

{'id': 1, 'name': '韩雪', 'age': 24, 'gender': ''}

如果有多条记录,默认返回第一条记录

 

插入一条记录

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from mysqlhelp import MysqlHelp  # 导入MysqlHelp类

conn = MysqlHelp()  # 实例化
sql = "insert into student set name='{}',age='{}',gender='{}'".format('小甜甜','20','')
res = conn.insert(sql)  # 执行sql
print(res)
View Code

执行输出:4

 

注意:这个4是插入数据之后,生成的主键id

使用Navicat连接MySQL,查看表记录

python 全栈开发,Day121(DButils,websocket)

 

插入多条记录

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from mysqlhelp import MysqlHelp  # 导入MysqlHelp类

conn = MysqlHelp()  # 实例化
sql = "insert into student(name,age,gender) values (%s,%s,%s)"
res = conn.insertmany(sql,[('赵丽颖','21',''),('Angelababy','22','')])  # 执行sql,注意:多行是列表
print(res)
View Code

执行输出:True

 

使用Navicat连接MySQL,查看表记录

python 全栈开发,Day121(DButils,websocket)

 

更新一条数据

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from mysqlhelp import MysqlHelp  # 导入MysqlHelp类

conn = MysqlHelp()  # 实例化
sql = "update student set name='%s' where id=%s"
res = conn.update(sql,('杨颖','7'))  # 执行sql,这里可以接收参数,用来拼接sql语句
print(res)
View Code

执行输出:True

 

使用Navicat连接MySQL,查看表记录

python 全栈开发,Day121(DButils,websocket)

 

 

删除一条记录

python 全栈开发,Day121(DButils,websocket)python 全栈开发,Day121(DButils,websocket)
from mysqlhelp import MysqlHelp  # 导入MysqlHelp类

conn = MysqlHelp()  # 实例化
sql = "delete from student where id=%s"
res = conn.delete(sql,('7'))  # 执行sql,这里可以接收参数,用来拼接sql语句
print(res)
View Code

执行输出:True

 

使用Navicat连接MySQL,查看表记录

python 全栈开发,Day121(DButils,websocket)