|
|
#!/usr/bin/env python
|
|
|
# -*- encoding: utf-8 -*-
|
|
|
'''
|
|
|
@Date:2022/08/26 09:48:12
|
|
|
'''
|
|
|
import sys
|
|
|
sys.path.append('.')
|
|
|
import pymysql
|
|
|
from dbutils.pooled_db import PooledDB
|
|
|
from Common.log_utlis import logger
|
|
|
|
|
|
def list_base_str(data_list):
|
|
|
if data_list:
|
|
|
# data = []
|
|
|
resp_data = []
|
|
|
for i in data_list:
|
|
|
tup_data = []
|
|
|
key, value = "", ""
|
|
|
for k,v in i.items():
|
|
|
if k == "is_add":
|
|
|
v = 0
|
|
|
if v is None:
|
|
|
continue
|
|
|
key += f"{k},"
|
|
|
value += "%s,"
|
|
|
tup_data.append(v)
|
|
|
# data.append(tuple(tup_data))
|
|
|
if key.count(",") > 1:
|
|
|
key = key[:-1]
|
|
|
value = value[:-1]
|
|
|
resp_data.append(
|
|
|
{
|
|
|
"key":key,
|
|
|
"value":value,
|
|
|
"data":tuple(tup_data)
|
|
|
}
|
|
|
)
|
|
|
|
|
|
return resp_data
|
|
|
return None
|
|
|
|
|
|
|
|
|
class MysqlPool:
|
|
|
"""
|
|
|
MySQL 数据库连接池类 配置变量
|
|
|
"""
|
|
|
|
|
|
'''
|
|
|
:param
|
|
|
reset: how connections should be reset when returned to the pool
|
|
|
(False or None to rollback transcations started with begin(),
|
|
|
True to always issue a rollback for safety's sake)
|
|
|
|
|
|
:param
|
|
|
setsession: optional list of SQL commands that may serve to prepare
|
|
|
the session, e.g. ["set datestyle to ...", "set time zone ..."]
|
|
|
'''
|
|
|
|
|
|
'''
|
|
|
https://blog.51cto.com/abyss/1736844
|
|
|
其中的
|
|
|
setsession=['SET AUTOCOMMIT = 1']
|
|
|
就是用来设置线程池是否打开自动更新的配置,0为False,1为True
|
|
|
'''
|
|
|
|
|
|
# 初始化数据库连接池变量
|
|
|
__pool = None
|
|
|
|
|
|
# 创建连接池的最大数量
|
|
|
__MAX_CONNECTIONS = 20
|
|
|
# 连接池中空闲连接的初始数量
|
|
|
__MIN_CACHED = 5
|
|
|
# 连接池中空闲连接的最大数量
|
|
|
__MAX_CACHED = 5
|
|
|
# 共享连接的最大数量
|
|
|
__MAX_SHARED = 0
|
|
|
|
|
|
# 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理
|
|
|
__BLOCK = True
|
|
|
# 单个连接的最大重复使用次数
|
|
|
__MAX_USAGE = 1
|
|
|
|
|
|
# 当返回到池时,连接应该如何重置
|
|
|
# (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚)
|
|
|
# 设置自动提交
|
|
|
__SET_SESSION = []
|
|
|
|
|
|
# 不能是 UTF-8
|
|
|
__CHARSET = 'UTF8'
|
|
|
|
|
|
def __init__(self, host, port, user, password, database):
|
|
|
"""
|
|
|
:param host: 数据库主机地址
|
|
|
:param port: 端口号
|
|
|
:param user: 用户名
|
|
|
:param password: 密码
|
|
|
:param database: 数据库名
|
|
|
"""
|
|
|
if not self.__pool:
|
|
|
# self代表当前类的实例,即为 MysqlPool() 带小括号,执行后的数据。
|
|
|
# __class__,魔法函数,代表从当前类的实例中,获取当前类,即为 MysqlPool 不带小括号的类。
|
|
|
# __pool,这个代表的事类的变量,即为在类下面创建的初始化连接池,__pool
|
|
|
self.__class__.__pool = PooledDB(
|
|
|
creator=pymysql,
|
|
|
host=host,
|
|
|
port=port,
|
|
|
user=user,
|
|
|
password=password,
|
|
|
database=database,
|
|
|
maxconnections=self.__MAX_CONNECTIONS,
|
|
|
mincached=self.__MIN_CACHED,
|
|
|
maxcached=self.__MAX_CACHED,
|
|
|
maxshared=self.__MAX_SHARED,
|
|
|
blocking=self.__BLOCK,
|
|
|
maxusage=self.__MAX_USAGE,
|
|
|
setsession=self.__SET_SESSION,
|
|
|
charset=self.__CHARSET
|
|
|
)
|
|
|
|
|
|
def get_connect(self):
|
|
|
return self.__pool.connection()
|
|
|
|
|
|
|
|
|
class DbManager(object):
|
|
|
|
|
|
conn = None
|
|
|
cur = None
|
|
|
POOL = MysqlPool(host='127.0.0.1',port=3306,user='root',password='123456',database='test111')
|
|
|
|
|
|
|
|
|
def connectDatabase(self):
|
|
|
try:
|
|
|
self.conn = self.POOL.get_connect()
|
|
|
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
|
|
|
return True
|
|
|
except:
|
|
|
logger.error("connectDatabase failed")
|
|
|
return False
|
|
|
|
|
|
# 关闭数据库
|
|
|
def close(self):
|
|
|
if self.conn and self.cur:
|
|
|
self.cur.close()
|
|
|
self.conn.close()
|
|
|
return True
|
|
|
|
|
|
# 基本的执行SQL方法,下面几乎都调用这个
|
|
|
def execute(self, sql, params=None, exe_many=False):
|
|
|
res = self.connectDatabase()
|
|
|
if not res:
|
|
|
return False
|
|
|
cnt = 0
|
|
|
try:
|
|
|
if self.conn and self.cur:
|
|
|
if exe_many:
|
|
|
cnt = self.cur.executemany(sql, params)
|
|
|
else:
|
|
|
cnt = self.cur.execute(sql)
|
|
|
self.conn.commit()
|
|
|
except Exception as e:
|
|
|
logger.error("execute failed: " + sql)
|
|
|
logger.error(str(e) + "\n\n")
|
|
|
return False
|
|
|
finally:
|
|
|
self.close()
|
|
|
return cnt
|
|
|
|
|
|
###############################################################
|
|
|
### 增删改查
|
|
|
###############################################################
|
|
|
|
|
|
# 存在不做操作,不存在则新增
|
|
|
def insert_ignore(self, **kwargs):
|
|
|
"""
|
|
|
table:必填,表名,如:table="test_table"
|
|
|
data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}]
|
|
|
|
|
|
"key":key,
|
|
|
"value":value,
|
|
|
"data":data
|
|
|
"""
|
|
|
table = kwargs["table"]
|
|
|
data = kwargs["data"]
|
|
|
local_data = kwargs.get("update_obj")
|
|
|
# table_obj = kwargs.get("table_obj")
|
|
|
data_dict = list_base_str(data)
|
|
|
if not data_dict:
|
|
|
return None
|
|
|
for i in data_dict:
|
|
|
sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};"
|
|
|
logger.info("sql:\n{}\n".format(sql))
|
|
|
try:
|
|
|
self.execute(sql)
|
|
|
if local_data:
|
|
|
local_data.executeNoParam(
|
|
|
f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'")
|
|
|
except:
|
|
|
self.conn.rollback()
|
|
|
|
|
|
# 存在不做操作,不存在则新增
|
|
|
def insert_ignore_update(self, **kwargs):
|
|
|
"""
|
|
|
table:必填,表名,如:table="test_table"
|
|
|
data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}]
|
|
|
|
|
|
"key":key,
|
|
|
"value":value,
|
|
|
"data":data
|
|
|
"""
|
|
|
table = kwargs["table"]
|
|
|
data = kwargs["data"]
|
|
|
# local_data = kwargs.get("update_obj")
|
|
|
# table_obj = kwargs.get("table_obj")
|
|
|
data_dict = list_base_str(data)
|
|
|
if not data_dict:
|
|
|
return None
|
|
|
for i in data_dict:
|
|
|
"""
|
|
|
replace into 'subject' (subjectId,subjectName)
|
|
|
values (1,'离散');
|
|
|
"""
|
|
|
sql = f"replace into {table}({i.get('key')}) values{i.get('data')};"
|
|
|
# sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};"
|
|
|
logger.info("sql:\n{}\n".format(sql))
|
|
|
try:
|
|
|
self.execute(sql)
|
|
|
# if local_data:
|
|
|
# local_data.executeNoParam(
|
|
|
# f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'")
|
|
|
except:
|
|
|
self.conn.rollback()
|
|
|
# def insert_ignore
|
|
|
|
|
|
|
|
|
|
|
|
# if __name__ == '__main__':
|
|
|
# from common.utlis import get_uuid
|
|
|
# db_obj = DbManager()
|
|
|
# # table:必填,表名,如:table = "test_table"
|
|
|
# # data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}]
|
|
|
# data_dic = {
|
|
|
# "table":"rms_user",
|
|
|
# "data":[
|
|
|
# {
|
|
|
# 'user_id': "f87cd129-2501-11ed-af7b-f47b094925e2",
|
|
|
# 'user_code': 10991,
|
|
|
# 'account': '测试11',
|
|
|
# 'real_name':" 测试123",
|
|
|
# 'sex': 1,
|
|
|
# 'mobile': "10111111111",
|
|
|
# 'is_enabled': 1,
|
|
|
# 'last_visit_date': '2022-07-15 08:30:47',
|
|
|
# 'is_add': 1
|
|
|
# }
|
|
|
# ]
|
|
|
# }
|
|
|
# db_obj.insert_ignore(**data_dic) |