import datetime import os import pathlib import logging import socket import time import urllib.request import pymysql from pymysql import OperationalError formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(lineno)d|%(message)s') logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) file_handler = logging.FileHandler("debug.log") file_handler.setLevel(logging.DEBUG) file_handler.setFormatter(formatter) logger.addHandler(file_handler) stream_handler = logging.StreamHandler() stream_handler.setLevel(logging.DEBUG) stream_handler.setFormatter(formatter) logger.addHandler(stream_handler) API_URL = "http://127.0.0.1:8000/api/rms/cab/sync/db_status" MYSQL_CONFIG = '/etc/mysql/mysql.conf.d/mysqld.cnf' SLOW_CONFIG = { "slow_query_log": 1, "slow_query_log_file": "/var/log/mysql/slow_query.log", "long_query_time": 0, "log_queries_not_using_indexes": 0, "max_binlog_size": "1024M", } class DatabaseConnectError(Exception): pass def get_db_conf(): return { 'host': "127.0.0.1", # 只能控制本地数据库 'port': 3344, 'user': "root", 'password': "Yanei!23", 'db': "rms_ge_prod", 'connect_timeout': 5 } def is_ip_reachable(ip, port=80, timeout=3): try: # 创建一个 socket 对象 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout) result = sock.connect_ex((ip, port)) if result == 0: return True else: return False except socket.error as e: logger.error(f"is_ip_reachable Error {ip}:{port} : {e}") return False finally: sock.close() def to_one_format(cursor_result): result = {} for row in cursor_result: result[row[0]] = row[1] return result def mysql_config_data(config=MYSQL_CONFIG): with open(config, 'r') as f: lines = f.readlines() result = {} for line in lines: if line.startswith('wsrep_cluster_address'): address_str = line.split("=")[1].replace('"', '').replace("gcomm://", "") if address_str: ip_list = address_str.split(',') result["addresses"] = [ip.replace('\n', '') for ip in ip_list] if line.startswith('server_id'): server_id = line.split("=")[1] result["server_id"] = server_id if line.startswith("port"): port = line.split("=")[1] result["port"] = int(port) return result class Server: def __init__(self): self.status_path = pathlib.Path("status.json") self.db_conf = get_db_conf() if self.db_conf is None: logger.error(f"db_config error: {self.db_conf}") raise ValueError("Database URL is None") self.conn = None def get_connection(self): for i in range(10): try: return pymysql.connect(**self.db_conf) except Exception as e: logger.exception(e) time.sleep(3) continue raise DatabaseConnectError("Connection error") def get_cluster_status(self): while True: try: with self.conn.cursor() as cursor: cursor.execute("show status like 'wsrep%'") r = cursor.fetchall() return to_one_format(r) except OperationalError as e: logger.error(e) self.conn.close() self.conn = self.get_connection() def start_standalone_mode(self): logger.info("Start standalone mode...") # 备份 os.system(f"cp {MYSQL_CONFIG} {MYSQL_CONFIG}.cluster_bak") new_lines = [] with open(MYSQL_CONFIG, 'r') as f: lines = f.readlines() for line in lines: if line.startswith('wsrep'): continue for k in SLOW_CONFIG.keys(): if line.startswith(k): continue new_lines.append(line) slow_config = [] for k, v in SLOW_CONFIG.items(): slow_config.append(f"{k}={v}") new_content = ''.join(new_lines) + "\n" + "\n".join(slow_config) with open(MYSQL_CONFIG, 'w') as f: f.write(new_content) os.system("systemctl restart mysql") logger.info("Finish standalone mode...") def start_cluster_mode(self): logger.info("Start start_cluster_mode...") os.system("systemctl stop mysql") os.system(f"cp {MYSQL_CONFIG}.cluster_bak {MYSQL_CONFIG}") os.system("rm -rf {MYSQL_CONFIG}.cluster_bak") os.system("rm -rf /var/lib/mysql/galera.cache && " "rm -rf /var/lib/mysql/grastate.dat && " "rm -rf /var/lib/mysql/gvwstate.dat") os.system("systemctl start mysql") time.sleep(10) # 判断是否连接集群 logger.info("check cluster join...") is_join = False for i in range(60): time.sleep(10) now_status = self.get_cluster_status() cluster_size = int(now_status.get("wsrep_cluster_size")) if cluster_size > 1: logger.info(f"start_cluster_mode wsrep_cluster_size: {cluster_size}") is_join = True break if not is_join: logger.error("not join cluster...") return False # 判断是否同步数据完成 logger.info("check cluster status...") is_primary = False for i in range(180): time.sleep(10) now_status = self.get_cluster_status() logger.info("cluster_status:") logger.info(now_status) cluster_status = now_status["wsrep_cluster_status"] if cluster_status == "Primary": logger.info(f"start_cluster_mode wsrep_cluster_status: {cluster_status}") is_primary = True break if not is_primary: logger.error("db primary timeout") return False logger.info("Finish start_cluster_mode...") return True def insert_record(self): logger.info("start update data...") now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") exec_sql_path = f"{now_str}.sql" fail_sql_path = f"{now_str}_fail.sql" os.system(f'grep -E "INSERT|UPDATE|DELETE" /var/log/mysql/slow_query.log > {exec_sql_path}') with open(exec_sql_path, 'r') as f: lines = f.readlines() logger.info(f"sql count: {len(lines)}") fail_line = [] conn = self.get_connection() cur = conn.cursor() for line in lines: if line.startswith('INSERT') or line.startswith('UPDATE') or line.startswith('DELETE'): sql = line.strip().replace("\n", "") try: cur.execute(sql) conn.commit() logger.info(f"success: {sql} ") except Exception as e: logger.error(f"fail: {sql}") logger.exception(e) fail_line.append(line) conn.rollback() conn.close() with open(fail_sql_path, 'w') as f: f.writelines("\n".join(fail_line)) os.system("rm -rf /var/log/mysql/slow_query.log") logger.info("finish update data...") def check_is_to_disconnect(self): logger.info("start check_is_to_disconnect...") logger.debug("check cluster status...") now_status = None for _ in range(10): now_status = self.get_cluster_status() if int(now_status["wsrep_cluster_size"]) > 1: logger.info(f"size > 1 : {now_status['wsrep_cluster_size']}") return time.sleep(3) logger.info(f"cluster status wsrep_cluster_size: {now_status['wsrep_cluster_size']}") logger.info("cluster size abnormal") logger.info("check connection....") mysql_config = mysql_config_data() node_ips = mysql_config.get("addresses") port = mysql_config.get("port") or 3344 logger.info(f"{node_ips}, {port}") for _ in range(3): time.sleep(3) count = 0 for ip in node_ips: if is_ip_reachable(ip, port): count += 1 logger.info(f"{ip}:{port} is normal ") # 有ip正常 if count > 0: logger.info(f"{count} ip normal") return False logger.info("connect all ip abnormal") return True def standalone_run(self): logger.info("start standalone_run....") self.check_standalone_mode() while True: time.sleep(5) bak_path = pathlib.Path(f"{MYSQL_CONFIG}.cluster_bak") if not bak_path.exists(): continue mysql_config = mysql_config_data(config=str(bak_path)) node_ips = mysql_config.get("addresses") port = mysql_config.get("port") or 3344 count = 0 for ip in node_ips: if is_ip_reachable(ip, port): count += 1 logger.info(f"{ip}:{port} is normal ") if count == 0: logger.debug(f"{node_ips} {port} disconnection") continue logger.info("ip normal count: {}".format(count)) if len(node_ips) / 2 < count: logger.info("cluster connect") self.start_cluster_mode() time.sleep(10) self.insert_record() logger.info("end standalone_run...") return def check_standalone_mode(self): """确认独立模式正常并通知前端""" logger.info("start check_standalone_mode...") while True: conn = None try: conn = self.get_connection() cur = conn.cursor() cur.execute(f"use {get_db_conf()['db']}") cur.execute(f"show tables") logger.info("database is normal") except DatabaseConnectError as e: raise e except Exception as e: logger.error(e) time.sleep(1) continue finally: if conn: conn.close() # 只通知一次 try: url = API_URL + "?status=synced" logger.info(f"check_standalone_mode url: {url}") urllib.request.urlopen(url) return True except Exception as e: logger.exception(e) return False def cluster_run(self): logger.info("start cluster_run...") now_status = self.get_cluster_status() logger.info(f"cluster_status: {now_status['wsrep_cluster_status']}") mysql_config = mysql_config_data() while True: time.sleep(5) now_status = self.get_cluster_status() node_ips = mysql_config.get("addresses") if len(node_ips) >= 1 and int(now_status.get("wsrep_cluster_size")) <= 1: is_disconnect = self.check_is_to_disconnect() if not is_disconnect: continue logger.info("cluster disconnect") self.start_standalone_mode() logger.info("end cluster_run...") return def to_standalone_run(self): """切换为单机模式""" # 当前就是单机模式,只能重启尝试 if os.path.exists(f"{MYSQL_CONFIG}.cluster_bak"): logger.info("is standalone and try restart") os.system("kill -9 $(ps aux | grep mysqld | awk '{print $2}')") logger.info("kill -9 mysql") time.sleep(3) # 当前是集群模式,切换为单机模式 else: logger.info("is cluster and cluster to standalone") self.start_standalone_mode() def run(self): while True: try: if not self.conn: self.conn = self.get_connection() now_status = self.get_cluster_status() if int(now_status.get("wsrep_cluster_size") or 0) != 0: self.cluster_run() else: self.standalone_run() time.sleep(5) except DatabaseConnectError as e: # 数据库异常,切换为单机模式 logger.error(e) logger.info("database connect error, to standalone") self.to_standalone_run() time.sleep(5) # 重置连接 if self.conn: self.conn.close() self.conn = None if __name__ == '__main__': s = Server() s.run()