You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

257 lines
8.2 KiB

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/26 09:48:12
'''
import sys
sys.path.append('.')
import pymysql
from dbutils.pooled_db import PooledDB
from Common.log_utlis import logger
def list_base_str(data_list):
if data_list:
# data = []
resp_data = []
for i in data_list:
tup_data = []
key, value = "", ""
for k,v in i.items():
if k == "is_add":
v = 0
if v is None:
continue
key += f"{k},"
value += "%s,"
tup_data.append(v)
# data.append(tuple(tup_data))
if key.count(",") > 1:
key = key[:-1]
value = value[:-1]
resp_data.append(
{
"key":key,
"value":value,
"data":tuple(tup_data)
}
)
return resp_data
return None
class MysqlPool:
"""
MySQL 数据库连接池类 配置变量
"""
'''
:param
reset: how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param
setsession: optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
'''
'''
https://blog.51cto.com/abyss/1736844
其中的
setsession=['SET AUTOCOMMIT = 1']
就是用来设置线程池是否打开自动更新的配置0为False1为True
'''
# 初始化数据库连接池变量
__pool = None
# 创建连接池的最大数量
__MAX_CONNECTIONS = 20
# 连接池中空闲连接的初始数量
__MIN_CACHED = 5
# 连接池中空闲连接的最大数量
__MAX_CACHED = 5
# 共享连接的最大数量
__MAX_SHARED = 0
# 超过最大连接数量时候的表现为True等待连接数量下降为false直接报错处理
__BLOCK = True
# 单个连接的最大重复使用次数
__MAX_USAGE = 1
# 当返回到池时,连接应该如何重置
# (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚)
# 设置自动提交
__SET_SESSION = []
# 不能是 UTF-8
__CHARSET = 'UTF8'
def __init__(self, host, port, user, password, database):
"""
:param host: 数据库主机地址
:param port: 端口号
:param user: 用户名
:param password: 密码
:param database: 数据库名
"""
if not self.__pool:
# self代表当前类的实例即为 MysqlPool 带小括号,执行后的数据。
# __class__魔法函数代表从当前类的实例中获取当前类即为 MysqlPool 不带小括号的类。
# __pool这个代表的事类的变量即为在类下面创建的初始化连接池__pool
self.__class__.__pool = PooledDB(
creator=pymysql,
host=host,
port=port,
user=user,
password=password,
database=database,
maxconnections=self.__MAX_CONNECTIONS,
mincached=self.__MIN_CACHED,
maxcached=self.__MAX_CACHED,
maxshared=self.__MAX_SHARED,
blocking=self.__BLOCK,
maxusage=self.__MAX_USAGE,
setsession=self.__SET_SESSION,
charset=self.__CHARSET
)
def get_connect(self):
return self.__pool.connection()
class DbManager(object):
conn = None
cur = None
POOL = MysqlPool(host='127.0.0.1',port=3306,user='root',password='123456',database='test111')
def connectDatabase(self):
try:
self.conn = self.POOL.get_connect()
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
return True
except:
logger.error("connectDatabase failed")
return False
# 关闭数据库
def close(self):
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
# 基本的执行SQL方法下面几乎都调用这个
def execute(self, sql, params=None, exe_many=False):
res = self.connectDatabase()
if not res:
return False
cnt = 0
try:
if self.conn and self.cur:
if exe_many:
cnt = self.cur.executemany(sql, params)
else:
cnt = self.cur.execute(sql)
self.conn.commit()
except Exception as e:
logger.error("execute failed: " + sql)
logger.error(str(e) + "\n\n")
return False
finally:
self.close()
return cnt
###############################################################
### 增删改查
###############################################################
# 存在不做操作,不存在则新增
def insert_ignore(self, **kwargs):
"""
table必填表名table="test_table"
data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
"key":key,
"value":value,
"data":data
"""
table = kwargs["table"]
data = kwargs["data"]
local_data = kwargs.get("update_obj")
# table_obj = kwargs.get("table_obj")
data_dict = list_base_str(data)
if not data_dict:
return None
for i in data_dict:
sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};"
logger.info("sql\n{}\n".format(sql))
try:
self.execute(sql)
if local_data:
local_data.executeNoParam(
f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'")
except:
self.conn.rollback()
# 存在不做操作,不存在则新增
def insert_ignore_update(self, **kwargs):
"""
table必填表名table="test_table"
data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
"key":key,
"value":value,
"data":data
"""
table = kwargs["table"]
data = kwargs["data"]
# local_data = kwargs.get("update_obj")
# table_obj = kwargs.get("table_obj")
data_dict = list_base_str(data)
if not data_dict:
return None
for i in data_dict:
"""
replace into 'subject' (subjectId,subjectName)
values (1,'离散');
"""
sql = f"replace into {table}({i.get('key')}) values{i.get('data')};"
# sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};"
logger.info("sql\n{}\n".format(sql))
try:
self.execute(sql)
# if local_data:
# local_data.executeNoParam(
# f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'")
except:
self.conn.rollback()
# def insert_ignore
# if __name__ == '__main__':
# from common.utlis import get_uuid
# db_obj = DbManager()
# # table必填表名table = "test_table"
# # data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
# data_dic = {
# "table":"rms_user",
# "data":[
# {
# 'user_id': "f87cd129-2501-11ed-af7b-f47b094925e2",
# 'user_code': 10991,
# 'account': '测试11',
# 'real_name':" 测试123",
# 'sex': 1,
# 'mobile': "10111111111",
# 'is_enabled': 1,
# 'last_visit_date': '2022-07-15 08:30:47',
# 'is_add': 1
# }
# ]
# }
# db_obj.insert_ignore(**data_dic)