diff --git a/db_logic/user_face.py b/db_logic/user_face.py new file mode 100644 index 0000000..0004a81 --- /dev/null +++ b/db_logic/user_face.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +''' +@Date:2022/08/27 14:51:13 +''' +import sys +sys.path.append('.') +from db_logic.db_base import Repository +from models.user_models import EntityUserFace + + +class BllUserFace(Repository): + + def __init__(self, entityType=EntityUserFace): + return super().__init__(entityType) diff --git a/setting.py b/setting.py new file mode 100644 index 0000000..d80f292 --- /dev/null +++ b/setting.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +''' +@Date:2022/08/26 14:39:54 +''' +import sys +sys.path.append('.') + + +from timing.runtiming import TimingBase + + +timing_base = TimingBase() +timing_base.start() +# timing_base.timing_status= False +# timing_base.stop() \ No newline at end of file diff --git a/timing/demo.py b/timing/demo.py new file mode 100644 index 0000000..d1c2b42 --- /dev/null +++ b/timing/demo.py @@ -0,0 +1,255 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +''' +@Date:2022/08/26 08:34:30 +''' +# import sys +# sys.path.append('.') + +# import pymysql + + +# import pymysql +# conn = pymysql.connect(host='127.0.0.1', port=3306, +# user='root', passwd='123456', db='local_rms_db') +# cur = conn.cursor(cursor=pymysql.cursors.DictCursor) + + +# sql = "select * from rms_user" +# cur.execute(sql) +# data = cur.fetchall() +# print(data) + + +# import os +# from flask import Flask, jsonify + +# app = Flask(__name__) + + +# def get_file_base(): +# import base64 + +# with open(os.getcwd() + "/config/tem_00001.xlsx", "rb") as f: +# file_str = f.read() +# file_str = base64.b64encode(file_str).decode() +# return file_str + +# @app.route("/get_file", methods=["GET"]) +# def main_file(): +# return jsonify(get_file_base()) + + +# if __name__ == '__main__': +# app.run("127.0.0.1", 8001) + +db_name = "user" +data_list = [ + {"variety_id": "f8442a6e-9950-11ec-a502-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u6613\u5236\u6bd2", "remark4": "", "name": "\u7532\u82ef", "english_name": "", "purity": "AR\uff08\u6caa\u8bd5\uff09\uff0c\u2265 99.5%", "cas_number": "108-88-3", "remark5": "", + "speci": "500", "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "683", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u70f7\u57fa\u6c5e", "remark10": ""}, + {"variety_id": "f8442a6f-9950-11ec-9253-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u82ef\u915a", "english_name": "", "purity": "AR\uff08\u6caa\u8bd5\uff09\uff0c\u2265 99.0%", "cas_number": "108-95-2", "remark5": "", + "speci": "500", "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "848", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u6325\u53d1\u915a", "remark10": ""}, + {"variety_id": "f8442a70-9950-11ec-a157-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u82ef", "english_name": "", "purity": "AR\uff08\u6caa\u8bd5\uff09\uff0c\u2265 99.5%", "cas_number": "71-43-2", "remark5": "", + "speci": "500", "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "640", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u6cb9", "remark10": ""}, + {"variety_id": "f8442a71-9950-11ec-9894-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u4e59\u9178\uff0836%\uff09", "english_name": "", "purity": "AR", "cas_number": "64-19-7", "remark5": "", "speci": "500", + "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "734", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u70f7\u57fa\u6c5e", "remark10": ""}, + {"variety_id": "f84451a6-9950-11ec-8925-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u4e59\u9178\u9150", "english_name": "", "purity": "AR", "cas_number": "106-24-7", "remark5": "", "speci": "500", + "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "749", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u70f7\u57fa\u6c5e", "remark10": ""}, + {"variety_id": "f84451a7-9950-11ec-9b4e-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u56db\u6c2f\u4e59\u70ef", "english_name": "", "purity": "\u73af\u4fdd\u7ea7", "cas_number": "127-18-4", "remark5": "", "speci": "500", + "speci_unit": "ml", "export_count": "28", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "1207", "manufacturer": "\u5929\u6d25\u50b2\u7136", "remark8": "", "remark9": "\u6cb9", "remark10": ""}, + {"variety_id": "f84451a8-9950-11ec-96ca-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u56db\u6c2f\u5316\u78b3", "english_name": "", "purity": "\u7ea2\u5916\u6d4b\u6cb9\u4eea\u4e13\u7528", "cas_number": "56-23-5", "remark5": "", "speci": "500", "speci_unit": "ml", "export_count": "31", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "1126", "manufacturer": "\u5b89\u8c31", "remark8": "", "remark9": "\u77ff\u7269\u6cb9", "remark10": ""}] + +# sql_header = f"insert into {}" +key_tup = [] +value_tup = [] +if data_list: + base_da = data_list[0] + for k,v in base_da.items(): + key_tup.append(k) + value_tup.append("%s") + +insert_sql = f"insert into {db_name}({','.join(key_tup)}) values({','.join(value_tup)})" +print(insert_sql) +"".join + + +import pymysql +from dbutils.pooled_db import PooledDB +# from common.db.mysql_config import MysqlConfig + +""" + pymysql封装总结 + https://blog.csdn.net/zhj_1121/article/details/121070412 + + python操作mysql之只看这篇就够了 + https://www.jianshu.com/p/4e72faebd27f + + 关于PooledDB使用autocommit的方法 + https://blog.51cto.com/abyss/1736844 +""" + + +class MysqlPool: + """ + MySQL 数据库连接池类 配置变量 + """ + + ''' + :param + reset: how connections should be reset when returned to the pool + (False or None to rollback transcations started with begin(), + True to always issue a rollback for safety's sake) + + :param + setsession: optional list of SQL commands that may serve to prepare + the session, e.g. ["set datestyle to ...", "set time zone ..."] + ''' + + ''' + https://blog.51cto.com/abyss/1736844 + 其中的 + setsession=['SET AUTOCOMMIT = 1'] + 就是用来设置线程池是否打开自动更新的配置,0为False,1为True + ''' + + # 初始化数据库连接池变量 + __pool = None + + # 创建连接池的最大数量 + __MAX_CONNECTIONS = 20 + # 连接池中空闲连接的初始数量 + __MIN_CACHED = 5 + # 连接池中空闲连接的最大数量 + __MAX_CACHED = 5 + # 共享连接的最大数量 + __MAX_SHARED = 0 + + # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理 + __BLOCK = True + # 单个连接的最大重复使用次数 + __MAX_USAGE = 1 + + # 当返回到池时,连接应该如何重置 + # (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚) + __RESET = True + # 设置自动提交 + __SET_SESSION = [] + + # 不能是 UTF-8 + __CHARSET = 'UTF8' + + def __init__(self, host, port, user, password, database): + """ + :param host: 数据库主机地址 + :param port: 端口号 + :param user: 用户名 + :param password: 密码 + :param database: 数据库名 + """ + if not self.__pool: + # self代表当前类的实例,即为 MysqlPool() 带小括号,执行后的数据。 + # __class__,魔法函数,代表从当前类的实例中,获取当前类,即为 MysqlPool 不带小括号的类。 + # __pool,这个代表的事类的变量,即为在类下面创建的初始化连接池,__pool + self.__class__.__pool = PooledDB( + creator=pymysql, + host=host, + port=port, + user=user, + password=password, + database=database, + maxconnections=self.__MAX_CONNECTIONS, + mincached=self.__MIN_CACHED, + maxcached=self.__MAX_CACHED, + maxshared=self.__MAX_SHARED, + blocking=self.__BLOCK, + maxusage=self.__MAX_USAGE, + setsession=self.__SET_SESSION, + charset=self.__CHARSET + ) + + def get_connect(self): + return self.__pool.connection() + + +class MysqlCursor: + """ + 从数据库配置环境,取出数据库配置参数 + 这里的参数,可以不从外部导入,直接手动写入也可以。 + """ + + def __init__(self, host=host, port=port, user=user, password=password, database=database) -> None: + """ + :param host: 数据库主机地址 + :param port: 端口号 + :param user: 用户名 + :param password: 密码 + :param database: 数据库名 + """ + self.__host = host + self.__port = port + self.__user = user + self.__password = password + self.__database = database + + # 初始化数据库连接池 + self.connects_pool = MysqlPool( + host=self.__host, + port=self.__port, + user=self.__user, + password=self.__password, + database=self.__database + ) + + def __enter__(self): + """ + # with 上下文管理,魔法函数,进入with时调用 + :return: 当前类 + """ + # 从数据库链接池,获取一个数据库链接 + connect = self.connects_pool.get_connect() + # 从获取的数据库链接,获取一个光标 + cursor = connect.cursor(pymysql.cursors.DictCursor) + + ''' + # https://blog.51cto.com/abyss/1736844 + # 如果使用连接池 则不能在取出后设置 而应该在创建线程池时设置 + # connect.autocommit = False + ''' + # 将数据库链接,赋值给当前类,方便__exit__函数调用 + self._connect = connect + # 将数据库光标,赋值给当前类,方便__exit__函数调用 + self._cursor = cursor + + # __enter__函数,必须返回当前类 + return self + + def __exit__(self, *exc_info): + """ + # with 上下文管理,魔法函数,退出with时调用 + :param exc_info: 异常信息,元祖 + :return: None + """ + # 退出with上下文时,使用当前类链接,提交数据库语句 + self._connect.commit() + # 关闭光标 + self._cursor.close() + # 关闭链接 + self._connect.close() + + @property + def cursor(self): + """ + 数据库连接池,取出链接,取出光标,转换为光标属性 + :return: 数据库连接池的光标 + """ + return self._cursor + + +if __name__ == "__main__": + + with MysqlCursor() as db: + # 获取数据库的方法 + sql = 'select count(id) as total from people' + db.cursor.execute("select count(id) as total from people") + data = db.cursor.fetchone() + print('--------统计数据条数', data) diff --git a/timing/runtiming.py b/timing/runtiming.py new file mode 100644 index 0000000..6764d57 --- /dev/null +++ b/timing/runtiming.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +''' +@Date:2022/08/26 14:00:05 +''' +import sys + +from Common.Utils import Utils +sys.path.append('.') +import threading +import time +from timing.utils import DbManager + +from models.user_models import EntityUser, EntityUserFace +from models.client_models import EntityClientCellUser, EntityClientUser +from db_logic.client_cell_user import BllClientCellUser +from db_logic.user import BllUser +from db_logic.client_user import BllClientUser +from db_logic.user_face import BllUserFace + +class TimingBase(object): + + time_sleep = 60 + timing_status = True + db_list = ["rms_client_cell_user", + "rms_client_user", + "rms_user", + "rms_user_face" + ] + + def get_db_obj(self, table_name): + dic = { + "rms_client_cell_user": BllClientCellUser(), + "rms_client_user": BllClientUser(), + "rms_user": BllUser(), + "rms_user_face": EntityUserFace() + } + return dic.get(table_name) + + def get_db_table(self, table_name): + dic = { + "rms_client_cell_user": EntityClientCellUser(), + "rms_client_user": EntityClientUser(), + "rms_user": EntityUser(), + "rms_user_face": BllUserFace() + } + return dic.get(table_name) + + + def sync_db(self): + while self.timing_status: + try: + db_manager = DbManager() + for i in self.db_list: + local_data = self.get_db_obj(i) + if not local_data: + continue + data_list = local_data.execute( + f"select * from {i}").fetchall() + if not data_list: + continue + data_list = Utils.msyql_table_model(data_list) + # db_manager.insert_ignore( + # **{"table": i, "data": data_list, "update_obj": local_data}) + db_manager.insert_ignore_update( + **{"table": i, "data": data_list}) + finally: + db_manager.close() + time.sleep(self.time_sleep) + + def start(self): + p = threading.Thread(target=self.sync_db, daemon=True) + p.start() + + + def stop(self): + self.timing_status = False + diff --git a/timing/utils.py b/timing/utils.py new file mode 100644 index 0000000..bd7a15b --- /dev/null +++ b/timing/utils.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python +# -*- encoding: utf-8 -*- +''' +@Date:2022/08/26 09:48:12 +''' +import sys +from tkinter import N +sys.path.append('.') +import pymysql +from dbutils.pooled_db import PooledDB +from common.logger_utils import logger + +def list_base_str(data_list): + if data_list: + # data = [] + resp_data = [] + for i in data_list: + tup_data = [] + key, value = "", "" + for k,v in i.items(): + if k == "is_add": + v = 0 + if v is None: + continue + key += f"{k}," + value += "%s," + tup_data.append(v) + # data.append(tuple(tup_data)) + if key.count(",") > 1: + key = key[:-1] + value = value[:-1] + resp_data.append( + { + "key":key, + "value":value, + "data":tuple(tup_data) + } + ) + + return resp_data + return None + + +class MysqlPool: + """ + MySQL 数据库连接池类 配置变量 + """ + + ''' + :param + reset: how connections should be reset when returned to the pool + (False or None to rollback transcations started with begin(), + True to always issue a rollback for safety's sake) + + :param + setsession: optional list of SQL commands that may serve to prepare + the session, e.g. ["set datestyle to ...", "set time zone ..."] + ''' + + ''' + https://blog.51cto.com/abyss/1736844 + 其中的 + setsession=['SET AUTOCOMMIT = 1'] + 就是用来设置线程池是否打开自动更新的配置,0为False,1为True + ''' + + # 初始化数据库连接池变量 + __pool = None + + # 创建连接池的最大数量 + __MAX_CONNECTIONS = 20 + # 连接池中空闲连接的初始数量 + __MIN_CACHED = 5 + # 连接池中空闲连接的最大数量 + __MAX_CACHED = 5 + # 共享连接的最大数量 + __MAX_SHARED = 0 + + # 超过最大连接数量时候的表现,为True等待连接数量下降,为false直接报错处理 + __BLOCK = True + # 单个连接的最大重复使用次数 + __MAX_USAGE = 1 + + # 当返回到池时,连接应该如何重置 + # (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚) + # 设置自动提交 + __SET_SESSION = [] + + # 不能是 UTF-8 + __CHARSET = 'UTF8' + + def __init__(self, host, port, user, password, database): + """ + :param host: 数据库主机地址 + :param port: 端口号 + :param user: 用户名 + :param password: 密码 + :param database: 数据库名 + """ + if not self.__pool: + # self代表当前类的实例,即为 MysqlPool() 带小括号,执行后的数据。 + # __class__,魔法函数,代表从当前类的实例中,获取当前类,即为 MysqlPool 不带小括号的类。 + # __pool,这个代表的事类的变量,即为在类下面创建的初始化连接池,__pool + self.__class__.__pool = PooledDB( + creator=pymysql, + host=host, + port=port, + user=user, + password=password, + database=database, + maxconnections=self.__MAX_CONNECTIONS, + mincached=self.__MIN_CACHED, + maxcached=self.__MAX_CACHED, + maxshared=self.__MAX_SHARED, + blocking=self.__BLOCK, + maxusage=self.__MAX_USAGE, + setsession=self.__SET_SESSION, + charset=self.__CHARSET + ) + + def get_connect(self): + return self.__pool.connection() + + +class DbManager(object): + + conn = None + cur = None + POOL = MysqlPool(host='127.0.0.1',port=3306,user='root',password='123456',database='test111') + + + def connectDatabase(self): + try: + self.conn = self.POOL.get_connect() + self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor) + return True + except: + logger.error("connectDatabase failed") + return False + + # 关闭数据库 + def close(self): + if self.conn and self.cur: + self.cur.close() + self.conn.close() + return True + + # 基本的执行SQL方法,下面几乎都调用这个 + def execute(self, sql, params=None, exe_many=False): + res = self.connectDatabase() + if not res: + return False + cnt = 0 + try: + if self.conn and self.cur: + if exe_many: + cnt = self.cur.executemany(sql, params) + else: + cnt = self.cur.execute(sql) + self.conn.commit() + except Exception as e: + logger.error("execute failed: " + sql) + logger.error(str(e) + "\n\n") + return False + finally: + self.close() + return cnt + + ############################################################### + ### 增删改查 + ############################################################### + + # 存在不做操作,不存在则新增 + def insert_ignore(self, **kwargs): + """ + table:必填,表名,如:table="test_table" + data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}] + + "key":key, + "value":value, + "data":data + """ + table = kwargs["table"] + data = kwargs["data"] + local_data = kwargs.get("update_obj") + # table_obj = kwargs.get("table_obj") + data_dict = list_base_str(data) + if not data_dict: + return None + for i in data_dict: + sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};" + logger.info("sql:\n{}\n".format(sql)) + try: + self.execute(sql) + if local_data: + local_data.executeNoParam( + f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'") + except: + self.conn.rollback() + + # 存在不做操作,不存在则新增 + def insert_ignore_update(self, **kwargs): + """ + table:必填,表名,如:table="test_table" + data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}] + + "key":key, + "value":value, + "data":data + """ + table = kwargs["table"] + data = kwargs["data"] + # local_data = kwargs.get("update_obj") + # table_obj = kwargs.get("table_obj") + data_dict = list_base_str(data) + if not data_dict: + return None + for i in data_dict: + """ + replace into 'subject' (subjectId,subjectName) + values (1,'离散'); + """ + sql = f"replace into {table}({i.get('key')}) values{i.get('data')};" + # sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};" + logger.info("sql:\n{}\n".format(sql)) + try: + self.execute(sql) + # if local_data: + # local_data.executeNoParam( + # f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'") + except: + self.conn.rollback() + # def insert_ignore + + + +# if __name__ == '__main__': +# from common.utlis import get_uuid +# db_obj = DbManager() +# # table:必填,表名,如:table = "test_table" +# # data :必填,更新数据,字典类型,如:data=[{"aaa": "666'6", "bbb": "888"}] +# data_dic = { +# "table":"rms_user", +# "data":[ +# { +# 'user_id': "f87cd129-2501-11ed-af7b-f47b094925e2", +# 'user_code': 10991, +# 'account': '测试11', +# 'real_name':" 测试123", +# 'sex': 1, +# 'mobile': "10111111111", +# 'is_enabled': 1, +# 'last_visit_date': '2022-07-15 08:30:47', +# 'is_add': 1 +# } +# ] +# } +# db_obj.insert_ignore(**data_dic) \ No newline at end of file