You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

257 lines
8.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/26 09:48:12
'''
import sys
sys.path.append('.')
import pymysql
from dbutils.pooled_db import PooledDB
from Common.log_utlis import logger
def list_base_str(data_list):
    """
    Turn a list of row dicts into the pieces needed to build INSERT statements.

    For every row, columns whose value is None are left out, and the value
    of an "is_add" column is always replaced by 0.

    :param data_list: list of dicts mapping column name -> value
    :return: None when data_list is empty or None; otherwise a list with one
        dict per row holding
        "key":   the comma-separated column names,
        "value": one "%s" placeholder per column, comma-separated,
        "data":  the tuple of values in the same order as the columns
    """
    if not data_list:
        return None

    resp_data = []
    for row in data_list:
        columns = []
        values = []
        for column, value in row.items():
            if column == "is_add":
                value = 0
            if value is None:
                continue
            columns.append(str(column))
            values.append(value)
        # Joining (instead of appending "," and trimming afterwards) keeps the
        # result free of a trailing comma for any number of columns, including
        # a row with exactly one column.
        resp_data.append(
            {
                "key": ",".join(columns),
                "value": ",".join(["%s"] * len(values)),
                "data": tuple(values),
            }
        )
    return resp_data
class MysqlPool:
    """
    MySQL connection pool wrapper together with its configuration values.

    The pool object is kept in a class attribute: it is built the first time
    an instance is created and later instances find it already set.
    """
    # Notes on two PooledDB options:
    #   reset: how connections should be reset when returned to the pool
    #       (False or None to rollback transactions started with begin(),
    #       True to always issue a rollback for safety's sake)
    #   setsession: optional list of SQL commands that may serve to prepare
    #       the session, e.g. ["set datestyle to ...", "set time zone ..."]
    #
    # From https://blog.51cto.com/abyss/1736844:
    #   setsession=['SET AUTOCOMMIT = 1']
    # is the setting that controls whether pooled connections autocommit
    # (0 means False, 1 means True).

    # Pool object; None until the first instance builds it.
    __pool = None
    # Maximum number of connections the pool may create.
    __MAX_CONNECTIONS = 20
    # Initial number of idle connections in the pool.
    __MIN_CACHED = 5
    # Maximum number of idle connections in the pool.
    __MAX_CACHED = 5
    # Maximum number of shared connections.
    __MAX_SHARED = 0
    # What happens once the connection limit is exceeded: True waits until
    # the number of connections drops, False reports an error right away.
    __BLOCK = True
    # Maximum number of times a single connection is reused.
    __MAX_USAGE = 1
    # How a connection should be reset when it is returned to the pool
    # (False or None rolls back transactions started with begin(); to be
    # safe, always issue a rollback).
    # Session setup commands, e.g. for enabling autocommit.
    __SET_SESSION = []
    # Must not be spelled 'UTF-8'.
    __CHARSET = 'UTF8'

    def __init__(self, host, port, user, password, database):
        """
        :param host: database host address
        :param port: port number
        :param user: user name
        :param password: password
        :param database: database name
        """
        if self.__pool:
            # A pool has already been built; nothing to do.
            return

        server = {
            "host": host,
            "port": port,
            "user": user,
            "password": password,
            "database": database,
        }
        tuning = {
            "maxconnections": self.__MAX_CONNECTIONS,
            "mincached": self.__MIN_CACHED,
            "maxcached": self.__MAX_CACHED,
            "maxshared": self.__MAX_SHARED,
            "blocking": self.__BLOCK,
            "maxusage": self.__MAX_USAGE,
            "setsession": self.__SET_SESSION,
            "charset": self.__CHARSET,
        }
        # self is the instance; self.__class__ is the class that instance was
        # created from. Assigning through the class stores the pool in the
        # class-level __pool attribute rather than on this one instance.
        owner = self.__class__
        owner.__pool = PooledDB(creator=pymysql, **server, **tuning)

    def get_connect(self):
        """Return the result of calling connection() on the pool."""
        pool = self.__pool
        return pool.connection()
class DbManager(object):
    """
    Runs SQL statements on connections borrowed from a shared MysqlPool.

    Each execute() call takes a connection from POOL, runs one statement,
    commits it and gives the connection back.
    """
    # Connection and cursor of the statement currently being run; both are
    # reset to None by close().
    conn = None
    cur = None
    # NOTE(review): the pool is created while this class body is evaluated and
    # the credentials are hard-coded -- consider loading them from config.
    POOL = MysqlPool(host='127.0.0.1',port=3306,user='root',password='123456',database='test111')

    def connectDatabase(self):
        """
        Borrow a connection from the pool and open a DictCursor on it.

        :return: True on success, False if either step failed
        """
        try:
            self.conn = self.POOL.get_connect()
            self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
            return True
        except Exception as e:
            logger.error("connectDatabase failed")
            logger.error(str(e))
            # The connection may have been obtained before the cursor failed;
            # release it instead of leaving it checked out.
            self.close()
            return False

    # Close the cursor and the connection
    def close(self):
        """
        Close the current cursor and connection, if any, and forget them.

        :return: always True
        """
        try:
            if self.cur:
                self.cur.close()
        finally:
            # Runs even if closing the cursor raised, so the connection is
            # always released.
            if self.conn:
                self.conn.close()
            self.conn = None
            self.cur = None
        return True

    # Basic SQL execution method; almost everything below goes through it
    def execute(self, sql, params=None, exe_many=False):
        """
        Run one statement and commit it.

        :param sql: SQL text; may contain %s placeholders when params is given
        :param params: arguments for the placeholders (a sequence of argument
            sequences when exe_many is True)
        :param exe_many: use cursor.executemany instead of cursor.execute
        :return: the value returned by the cursor call, or False if connecting
            or executing failed
        """
        if not self.connectDatabase():
            return False
        try:
            if exe_many:
                cnt = self.cur.executemany(sql, params)
            else:
                # params used to be dropped on this path, which made
                # parameterized single statements impossible.
                cnt = self.cur.execute(sql, params)
            self.conn.commit()
            return cnt
        except Exception as e:
            logger.error("execute failed: " + sql)
            logger.error(str(e) + "\n\n")
            # Roll back here, while the connection is still open; callers
            # cannot do it because the connection is closed on return.
            try:
                self.conn.rollback()
            except Exception as rollback_error:
                logger.error("rollback failed: " + str(rollback_error))
            return False
        finally:
            self.close()

    ###############################################################
    ### Insert / delete / update / query
    ###############################################################
    def _write_rows(self, statement, table, data, local_data=None):
        """
        Write every row of data to table with a parameterized statement.

        :param statement: leading SQL keywords, e.g. "insert ignore"
        :param table: table name; interpolated into the SQL text, so it must
            come from trusted code (identifiers cannot be bound as parameters)
        :param data: list of row dicts, as accepted by list_base_str
        :param local_data: optional object with an executeNoParam(sql) method;
            after a row is written successfully it is asked to set is_add=1
            on the row whose first column equals the first value
        :return: None
        """
        rows = list_base_str(data)
        if not rows:
            return None
        for row in rows:
            # rstrip guards against a trailing comma in the prepared strings.
            columns = row["key"].rstrip(",")
            placeholders = row["value"].rstrip(",")
            params = row["data"]
            sql = f"{statement} {table}({columns}) values({placeholders});"
            logger.info("sql\n{}\n{}\n".format(sql, params))
            result = self.execute(sql, params)
            # execute() returns False on failure; 0 is a valid row count, so
            # compare by identity rather than truthiness.
            if result is False or not local_data or not params:
                continue
            first_column = columns.split(",")[0]
            try:
                # NOTE(review): this SQL is built by string interpolation
                # because executeNoParam takes no parameters; the value must
                # be trusted.
                local_data.executeNoParam(
                    f"update {table} set is_add=1 where {first_column}='{params[0]}'")
            except Exception as e:
                # Best effort: a failed local update must not stop the loop.
                logger.error("local is_add update failed: " + str(e))
        return None

    # Existing rows are left alone; missing rows are inserted
    def insert_ignore(self, **kwargs):
        """
        INSERT IGNORE every row of data into table.

        table: required, table name, e.g. table="test_table"
        data: required, list of row dicts,
            e.g. data=[{"aaa": "666'6", "bbb": "888"}]
        update_obj: optional object with an executeNoParam(sql) method, used
            to mark each successfully written row with is_add=1
        """
        table = kwargs["table"]
        data = kwargs["data"]
        local_data = kwargs.get("update_obj")
        return self._write_rows("insert ignore", table, data, local_data)

    # Existing rows are replaced; missing rows are inserted
    def insert_ignore_update(self, **kwargs):
        """
        REPLACE INTO table for every row of data, e.g.
            replace into subject (subjectId, subjectName) values (1, '...');

        table: required, table name, e.g. table="test_table"
        data: required, list of row dicts,
            e.g. data=[{"aaa": "666'6", "bbb": "888"}]
        """
        table = kwargs["table"]
        data = kwargs["data"]
        return self._write_rows("replace into", table, data)
# def insert_ignore
# if __name__ == '__main__':
# from common.utlis import get_uuid
# db_obj = DbManager()
# # table必填表名table = "test_table"
# # data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
# data_dic = {
# "table":"rms_user",
# "data":[
# {
# 'user_id': "f87cd129-2501-11ed-af7b-f47b094925e2",
# 'user_code': 10991,
# 'account': '测试11',
# 'real_name':" 测试123",
# 'sex': 1,
# 'mobile': "10111111111",
# 'is_enabled': 1,
# 'last_visit_date': '2022-07-15 08:30:47',
# 'is_add': 1
# }
# ]
# }
# db_obj.insert_ignore(**data_dic)