添加数据同步

duizhaopin
apan_youxiang@163.com 2 years ago
parent d0e78de4a9
commit 231f0142aa

@ -0,0 +1,15 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/27 14:51:13
'''
import sys
sys.path.append('.')
from db_logic.db_base import Repository
from models.user_models import EntityUserFace
class BllUserFace(Repository):
def __init__(self, entityType=EntityUserFace):
return super().__init__(entityType)

@ -0,0 +1,16 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/26 14:39:54
'''
import sys
sys.path.append('.')
from timing.runtiming import TimingBase
timing_base = TimingBase()
timing_base.start()
# timing_base.timing_status= False
# timing_base.stop()

@ -0,0 +1,255 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/26 08:34:30
'''
# import sys
# sys.path.append('.')
# import pymysql
# import pymysql
# conn = pymysql.connect(host='127.0.0.1', port=3306,
# user='root', passwd='123456', db='local_rms_db')
# cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
# sql = "select * from rms_user"
# cur.execute(sql)
# data = cur.fetchall()
# print(data)
# import os
# from flask import Flask, jsonify
# app = Flask(__name__)
# def get_file_base():
# import base64
# with open(os.getcwd() + "/config/tem_00001.xlsx", "rb") as f:
# file_str = f.read()
# file_str = base64.b64encode(file_str).decode()
# return file_str
# @app.route("/get_file", methods=["GET"])
# def main_file():
# return jsonify(get_file_base())
# if __name__ == '__main__':
# app.run("127.0.0.1", 8001)
db_name = "user"
data_list = [
{"variety_id": "f8442a6e-9950-11ec-a502-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u6613\u5236\u6bd2", "remark4": "", "name": "\u7532\u82ef", "english_name": "", "purity": "AR\uff08\u6caa\u8bd5\uff09\uff0c\u2265 99.5%", "cas_number": "108-88-3", "remark5": "",
"speci": "500", "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "683", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u70f7\u57fa\u6c5e", "remark10": ""},
{"variety_id": "f8442a6f-9950-11ec-9253-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u82ef\u915a", "english_name": "", "purity": "AR\uff08\u6caa\u8bd5\uff09\uff0c\u2265 99.0%", "cas_number": "108-95-2", "remark5": "",
"speci": "500", "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "848", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u6325\u53d1\u915a", "remark10": ""},
{"variety_id": "f8442a70-9950-11ec-a157-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u82ef", "english_name": "", "purity": "AR\uff08\u6caa\u8bd5\uff09\uff0c\u2265 99.5%", "cas_number": "71-43-2", "remark5": "",
"speci": "500", "speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "640", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u6cb9", "remark10": ""},
{"variety_id": "f8442a71-9950-11ec-9894-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u4e59\u9178\uff0836%\uff09", "english_name": "", "purity": "AR", "cas_number": "64-19-7", "remark5": "", "speci": "500",
"speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "734", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u70f7\u57fa\u6c5e", "remark10": ""},
{"variety_id": "f84451a6-9950-11ec-8925-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u4e59\u9178\u9150", "english_name": "", "purity": "AR", "cas_number": "106-24-7", "remark5": "", "speci": "500",
"speci_unit": "ml", "export_count": "1", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "749", "manufacturer": "\u56fd\u836f", "remark8": "", "remark9": "\u70f7\u57fa\u6c5e", "remark10": ""},
{"variety_id": "f84451a7-9950-11ec-9b4e-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u56db\u6c2f\u4e59\u70ef", "english_name": "", "purity": "\u73af\u4fdd\u7ea7", "cas_number": "127-18-4", "remark5": "", "speci": "500",
"speci_unit": "ml", "export_count": "28", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "1207", "manufacturer": "\u5929\u6d25\u50b2\u7136", "remark8": "", "remark9": "\u6cb9", "remark10": ""},
{"variety_id": "f84451a8-9950-11ec-96ca-e2052a1045e4", "remark1": "", "remark2": "", "remark3": "\u5371\u5316\u54c1", "remark4": "", "name": "\u56db\u6c2f\u5316\u78b3", "english_name": "", "purity": "\u7ea2\u5916\u6d4b\u6cb9\u4eea\u4e13\u7528", "cas_number": "56-23-5", "remark5": "", "speci": "500", "speci_unit": "ml", "export_count": "31", "remark6": "\u74f6", "production_date": "2022-03-01", "shelf_life": 10529, "price": "0", "is_supervise": 0, "remain": "1126", "manufacturer": "\u5b89\u8c31", "remark8": "", "remark9": "\u77ff\u7269\u6cb9", "remark10": ""}]
# sql_header = f"insert into {}"
key_tup = []
value_tup = []
if data_list:
base_da = data_list[0]
for k,v in base_da.items():
key_tup.append(k)
value_tup.append("%s")
insert_sql = f"insert into {db_name}({','.join(key_tup)}) values({','.join(value_tup)})"
print(insert_sql)
"".join
import pymysql
from dbutils.pooled_db import PooledDB
# from common.db.mysql_config import MysqlConfig
"""
pymysql封装总结
https://blog.csdn.net/zhj_1121/article/details/121070412
python操作mysql之只看这篇就够了
https://www.jianshu.com/p/4e72faebd27f
关于PooledDB使用autocommit的方法
https://blog.51cto.com/abyss/1736844
"""
class MysqlPool:
"""
MySQL 数据库连接池类 配置变量
"""
'''
:param
reset: how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param
setsession: optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
'''
'''
https://blog.51cto.com/abyss/1736844
其中的
setsession=['SET AUTOCOMMIT = 1']
就是用来设置线程池是否打开自动更新的配置0为False1为True
'''
# 初始化数据库连接池变量
__pool = None
# 创建连接池的最大数量
__MAX_CONNECTIONS = 20
# 连接池中空闲连接的初始数量
__MIN_CACHED = 5
# 连接池中空闲连接的最大数量
__MAX_CACHED = 5
# 共享连接的最大数量
__MAX_SHARED = 0
# 超过最大连接数量时候的表现为True等待连接数量下降为false直接报错处理
__BLOCK = True
# 单个连接的最大重复使用次数
__MAX_USAGE = 1
# 当返回到池时,连接应该如何重置
# (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚)
__RESET = True
# 设置自动提交
__SET_SESSION = []
# 不能是 UTF-8
__CHARSET = 'UTF8'
def __init__(self, host, port, user, password, database):
"""
:param host: 数据库主机地址
:param port: 端口号
:param user: 用户名
:param password: 密码
:param database: 数据库名
"""
if not self.__pool:
# self代表当前类的实例即为 MysqlPool 带小括号,执行后的数据。
# __class__魔法函数代表从当前类的实例中获取当前类即为 MysqlPool 不带小括号的类。
# __pool这个代表的事类的变量即为在类下面创建的初始化连接池__pool
self.__class__.__pool = PooledDB(
creator=pymysql,
host=host,
port=port,
user=user,
password=password,
database=database,
maxconnections=self.__MAX_CONNECTIONS,
mincached=self.__MIN_CACHED,
maxcached=self.__MAX_CACHED,
maxshared=self.__MAX_SHARED,
blocking=self.__BLOCK,
maxusage=self.__MAX_USAGE,
setsession=self.__SET_SESSION,
charset=self.__CHARSET
)
def get_connect(self):
return self.__pool.connection()
class MysqlCursor:
"""
从数据库配置环境取出数据库配置参数
这里的参数可以不从外部导入直接手动写入也可以
"""
def __init__(self, host=host, port=port, user=user, password=password, database=database) -> None:
"""
:param host: 数据库主机地址
:param port: 端口号
:param user: 用户名
:param password: 密码
:param database: 数据库名
"""
self.__host = host
self.__port = port
self.__user = user
self.__password = password
self.__database = database
# 初始化数据库连接池
self.connects_pool = MysqlPool(
host=self.__host,
port=self.__port,
user=self.__user,
password=self.__password,
database=self.__database
)
def __enter__(self):
"""
# with 上下文管理魔法函数进入with时调用
:return: 当前类
"""
# 从数据库链接池,获取一个数据库链接
connect = self.connects_pool.get_connect()
# 从获取的数据库链接,获取一个光标
cursor = connect.cursor(pymysql.cursors.DictCursor)
'''
# https://blog.51cto.com/abyss/1736844
# 如果使用连接池 则不能在取出后设置 而应该在创建线程池时设置
# connect.autocommit = False
'''
# 将数据库链接赋值给当前类方便__exit__函数调用
self._connect = connect
# 将数据库光标赋值给当前类方便__exit__函数调用
self._cursor = cursor
# __enter__函数必须返回当前类
return self
def __exit__(self, *exc_info):
"""
# with 上下文管理魔法函数退出with时调用
:param exc_info: 异常信息元祖
:return: None
"""
# 退出with上下文时使用当前类链接提交数据库语句
self._connect.commit()
# 关闭光标
self._cursor.close()
# 关闭链接
self._connect.close()
@property
def cursor(self):
"""
数据库连接池取出链接取出光标转换为光标属性
:return: 数据库连接池的光标
"""
return self._cursor
if __name__ == "__main__":
with MysqlCursor() as db:
# 获取数据库的方法
sql = 'select count(id) as total from people'
db.cursor.execute("select count(id) as total from people")
data = db.cursor.fetchone()
print('--------统计数据条数', data)

@ -0,0 +1,78 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/26 14:00:05
'''
import sys
from Common.Utils import Utils
sys.path.append('.')
import threading
import time
from timing.utils import DbManager
from models.user_models import EntityUser, EntityUserFace
from models.client_models import EntityClientCellUser, EntityClientUser
from db_logic.client_cell_user import BllClientCellUser
from db_logic.user import BllUser
from db_logic.client_user import BllClientUser
from db_logic.user_face import BllUserFace
class TimingBase(object):
time_sleep = 60
timing_status = True
db_list = ["rms_client_cell_user",
"rms_client_user",
"rms_user",
"rms_user_face"
]
def get_db_obj(self, table_name):
dic = {
"rms_client_cell_user": BllClientCellUser(),
"rms_client_user": BllClientUser(),
"rms_user": BllUser(),
"rms_user_face": EntityUserFace()
}
return dic.get(table_name)
def get_db_table(self, table_name):
dic = {
"rms_client_cell_user": EntityClientCellUser(),
"rms_client_user": EntityClientUser(),
"rms_user": EntityUser(),
"rms_user_face": BllUserFace()
}
return dic.get(table_name)
def sync_db(self):
while self.timing_status:
try:
db_manager = DbManager()
for i in self.db_list:
local_data = self.get_db_obj(i)
if not local_data:
continue
data_list = local_data.execute(
f"select * from {i}").fetchall()
if not data_list:
continue
data_list = Utils.msyql_table_model(data_list)
# db_manager.insert_ignore(
# **{"table": i, "data": data_list, "update_obj": local_data})
db_manager.insert_ignore_update(
**{"table": i, "data": data_list})
finally:
db_manager.close()
time.sleep(self.time_sleep)
def start(self):
p = threading.Thread(target=self.sync_db, daemon=True)
p.start()
def stop(self):
self.timing_status = False

@ -0,0 +1,258 @@
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
'''
@Date:2022/08/26 09:48:12
'''
import sys
from tkinter import N
sys.path.append('.')
import pymysql
from dbutils.pooled_db import PooledDB
from common.logger_utils import logger
def list_base_str(data_list):
if data_list:
# data = []
resp_data = []
for i in data_list:
tup_data = []
key, value = "", ""
for k,v in i.items():
if k == "is_add":
v = 0
if v is None:
continue
key += f"{k},"
value += "%s,"
tup_data.append(v)
# data.append(tuple(tup_data))
if key.count(",") > 1:
key = key[:-1]
value = value[:-1]
resp_data.append(
{
"key":key,
"value":value,
"data":tuple(tup_data)
}
)
return resp_data
return None
class MysqlPool:
"""
MySQL 数据库连接池类 配置变量
"""
'''
:param
reset: how connections should be reset when returned to the pool
(False or None to rollback transcations started with begin(),
True to always issue a rollback for safety's sake)
:param
setsession: optional list of SQL commands that may serve to prepare
the session, e.g. ["set datestyle to ...", "set time zone ..."]
'''
'''
https://blog.51cto.com/abyss/1736844
其中的
setsession=['SET AUTOCOMMIT = 1']
就是用来设置线程池是否打开自动更新的配置0为False1为True
'''
# 初始化数据库连接池变量
__pool = None
# 创建连接池的最大数量
__MAX_CONNECTIONS = 20
# 连接池中空闲连接的初始数量
__MIN_CACHED = 5
# 连接池中空闲连接的最大数量
__MAX_CACHED = 5
# 共享连接的最大数量
__MAX_SHARED = 0
# 超过最大连接数量时候的表现为True等待连接数量下降为false直接报错处理
__BLOCK = True
# 单个连接的最大重复使用次数
__MAX_USAGE = 1
# 当返回到池时,连接应该如何重置
# (False或None回滚以begin()开始的事务,为了安全起见,总是发出回滚)
# 设置自动提交
__SET_SESSION = []
# 不能是 UTF-8
__CHARSET = 'UTF8'
def __init__(self, host, port, user, password, database):
"""
:param host: 数据库主机地址
:param port: 端口号
:param user: 用户名
:param password: 密码
:param database: 数据库名
"""
if not self.__pool:
# self代表当前类的实例即为 MysqlPool 带小括号,执行后的数据。
# __class__魔法函数代表从当前类的实例中获取当前类即为 MysqlPool 不带小括号的类。
# __pool这个代表的事类的变量即为在类下面创建的初始化连接池__pool
self.__class__.__pool = PooledDB(
creator=pymysql,
host=host,
port=port,
user=user,
password=password,
database=database,
maxconnections=self.__MAX_CONNECTIONS,
mincached=self.__MIN_CACHED,
maxcached=self.__MAX_CACHED,
maxshared=self.__MAX_SHARED,
blocking=self.__BLOCK,
maxusage=self.__MAX_USAGE,
setsession=self.__SET_SESSION,
charset=self.__CHARSET
)
def get_connect(self):
return self.__pool.connection()
class DbManager(object):
conn = None
cur = None
POOL = MysqlPool(host='127.0.0.1',port=3306,user='root',password='123456',database='test111')
def connectDatabase(self):
try:
self.conn = self.POOL.get_connect()
self.cur = self.conn.cursor(cursor=pymysql.cursors.DictCursor)
return True
except:
logger.error("connectDatabase failed")
return False
# 关闭数据库
def close(self):
if self.conn and self.cur:
self.cur.close()
self.conn.close()
return True
# 基本的执行SQL方法下面几乎都调用这个
def execute(self, sql, params=None, exe_many=False):
res = self.connectDatabase()
if not res:
return False
cnt = 0
try:
if self.conn and self.cur:
if exe_many:
cnt = self.cur.executemany(sql, params)
else:
cnt = self.cur.execute(sql)
self.conn.commit()
except Exception as e:
logger.error("execute failed: " + sql)
logger.error(str(e) + "\n\n")
return False
finally:
self.close()
return cnt
###############################################################
### 增删改查
###############################################################
# 存在不做操作,不存在则新增
def insert_ignore(self, **kwargs):
"""
table必填表名table="test_table"
data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
"key":key,
"value":value,
"data":data
"""
table = kwargs["table"]
data = kwargs["data"]
local_data = kwargs.get("update_obj")
# table_obj = kwargs.get("table_obj")
data_dict = list_base_str(data)
if not data_dict:
return None
for i in data_dict:
sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};"
logger.info("sql\n{}\n".format(sql))
try:
self.execute(sql)
if local_data:
local_data.executeNoParam(
f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'")
except:
self.conn.rollback()
# 存在不做操作,不存在则新增
def insert_ignore_update(self, **kwargs):
"""
table必填表名table="test_table"
data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
"key":key,
"value":value,
"data":data
"""
table = kwargs["table"]
data = kwargs["data"]
# local_data = kwargs.get("update_obj")
# table_obj = kwargs.get("table_obj")
data_dict = list_base_str(data)
if not data_dict:
return None
for i in data_dict:
"""
replace into 'subject' (subjectId,subjectName)
values (1,'离散');
"""
sql = f"replace into {table}({i.get('key')}) values{i.get('data')};"
# sql = f"insert ignore {table}({i.get('key')}) values{i.get('data')};"
logger.info("sql\n{}\n".format(sql))
try:
self.execute(sql)
# if local_data:
# local_data.executeNoParam(
# f"update {table} set is_add=1 where {i.get('key').split(',')[0]}='{i.get('data')[0]}'")
except:
self.conn.rollback()
# def insert_ignore
# if __name__ == '__main__':
# from common.utlis import get_uuid
# db_obj = DbManager()
# # table必填表名table = "test_table"
# # data 必填更新数据字典类型data=[{"aaa": "666'6", "bbb": "888"}]
# data_dic = {
# "table":"rms_user",
# "data":[
# {
# 'user_id': "f87cd129-2501-11ed-af7b-f47b094925e2",
# 'user_code': 10991,
# 'account': '测试11',
# 'real_name':" 测试123",
# 'sex': 1,
# 'mobile': "10111111111",
# 'is_enabled': 1,
# 'last_visit_date': '2022-07-15 08:30:47',
# 'is_add': 1
# }
# ]
# }
# db_obj.insert_ignore(**data_dic)
Loading…
Cancel
Save