#!/usr/bin/env python # -*- encoding: utf-8 -*- ''' @Date:2022/08/26 09:48:12 ''' import sys sys.path.append('.') import pymysql from dbutils.pooled_db import PooledDB from Common.log_utlis import logger def list_base_str(data_list): if data_list: # data = [] resp_data = [] for i in data_list: tup_data = [] key, value = "", "" for k,v in i.items(): if k == "is_add": v = 0 if v is None: continue key += f"{k}," value += "%s," tup_data.append(v) # data.append(tuple(tup_data)) if key.count(",") > 1: key = key[:-1] value = value[:-1] resp_data.append( { "key":key, "value":value, "data":tuple(tup_data) } ) return resp_data return None class MysqlPool: """ MySQL 数据库连接池类 配置变量 """ ''' :param reset: how connections should be reset when returned to the pool (False or None to rollback transcations started with begin(), True to always issue a rollback for safety's sake) :param setsession: optional list of SQL commands that may serve to prepare the session, e.g. ["set datestyle to ...", "set time zone ..."] ''' ''' https://blog.51cto.com/abyss/1736844 其中的 setsession=['SET AUTOCOMMIT = 1'] 就是用来设置线程池是否打开自动更新的配置,0为False,1为True ''' # 初始化数据库连接池变量 __pool = None # 创建连接池的最大数量 __MAX_CONNECTIONS = 20 # 连接池中空闲连接的初始数量 __MIN_CACHED = 5 # 连接池中空闲连接的最大数量 __MAX_CACHED = 5 # 共享连接的最大数量 __MAX_SHARED = 0 # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理 __BLOCK = True # 单个连接的最大重复使用次数 __MAX_USAGE = 1 # 当返回到池时,连接应该如何重置 # (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚) # 设置自动提交 __SET_SESSION = [] # 不能是 UTF-8 __CHARSET = 'UTF8' def __init__(self, host, port, user, password, database): """ :param host: 数据库主机地址 :param port: 端口号 :param user: 用户名 :param password: 密码 :param database: 数据库名 """ if not self.__pool: # self代表当前类的实例,即为 MysqlPool() 带小括号,执行后的数据。 # __class__,魔法函数,代表从当前类的实例中,获取当前类,即为 MysqlPool 不带小括号的类。 # __pool,这个代表的事类的变量,即为在类下面创建的初始化连接池,__pool self.__class__.__pool = PooledDB( creator=pymysql, host=host, port=port, user=user, password=password, database=database, maxconnections=self.__MAX_CONNECTIONS, mincached=self.__MIN_CACHED, maxcached=self.__MAX_CACHED, maxshared=self.__MAX_SHARED, blocking=self.__BLOCK, maxusage=self.__MAX_USAGE, setsession=self.__SET_SESSION, charset=self.__CHARSET ) def get_connect(self): return self.__pool.connection() class DbManager(object): conn = None cur = None POOL = MysqlPool(host='127.0.0.1',port=3306,user='root',password='123456',database='test111') def connectDatabase(self): try: self.conn = self.POOL.get_connect() self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) return True except: logger.error("connectDatabase failed") return False # 关闭数据库 def close(self): if self.conn and self.cur: self.cur.close() self.conn.close() return True # 基本的执行SQL方法,下面几乎都调用这个 def execute(self, sql, params=None, exe_many=False): res = self.connectDatabase() if not res: return False cnt = 0 try: if self.conn and self.cur: if exe_many: cnt = self.cur.executemany(sql, params) else: cnt = self.cur.execute(sql) self.conn.commit() except Exception as e: logger.error("execute failed: " + sql) logger.error(str(e) + "\n\n") return False finally: self.close() return cnt ############################################################### ### 增删改查 ############################################################### # 存在不做操作,不存在则新增 def insert_ignore(self, **kwargs): """ table:必填,表名,如:table="test_table" data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}] "key":key, "value":value, "data":data """ table = kwargs["table"] data = kwargs["data"] local_data = kwargs.get("update_obj") # table_obj = kwargs.get("table_obj") data_dict = list_base_str(data) if not data_dict: return None for i in data_dict: sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};" logger.info("sql:\n{}\n".format(sql)) try: self.execute(sql) if local_data: local_data.executeNoParam( f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'") except: self.conn.rollback() # 存在不做操作,不存在则新增 def insert_ignore_update(self, **kwargs): """ table:必填,表名,如:table="test_table" data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}] "key":key, "value":value, "data":data """ table = kwargs["table"] data = kwargs["data"] # local_data = kwargs.get("update_obj") # table_obj = kwargs.get("table_obj") data_dict = list_base_str(data) if not data_dict: return None for i in data_dict: """ replace into 'subject' (subjectId,subjectName) values (1,'离散'); """ sql = f"replace into {table}({i.get('key')}) values{i.get('data')};" # sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};" logger.info("sql:\n{}\n".format(sql)) try: self.execute(sql) # if local_data: # local_data.executeNoParam( # f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'") except: self.conn.rollback() # def insert_ignore # if __name__ == '__main__': # from common.utlis import get_uuid # db_obj = DbManager() # # table:必填,表名,如:table = "test_table" # # data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}] # data_dic = { # "table":"rms_user", # "data":[ # { # 'user_id': "f87cd129-2501-11ed-af7b-f47b094925e2", # 'user_code': 10991, # 'account': '测试11', # 'real_name':" 测试123", # 'sex': 1, # 'mobile': "10111111111", # 'is_enabled': 1, # 'last_visit_date': '2022-07-15 08:30:47', # 'is_add': 1 # } # ] # } # db_obj.insert_ignore(**data_dic)