From e66b55fd468eead91245cdbe1db923b2cef5b91b Mon Sep 17 00:00:00 2001 From: ghb <3779166520@qq.com> Date: Tue, 23 Jul 2024 08:53:40 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=8F=98=E6=9B=B4=E8=A7=A6?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ClusterMonitor.py | 367 ++++++++++++++++++++++++++++++++++++++++++++++ notify.py | 26 ++++ 2 files changed, 393 insertions(+) create mode 100644 ClusterMonitor.py create mode 100644 notify.py diff --git a/ClusterMonitor.py b/ClusterMonitor.py new file mode 100644 index 0000000..39ca278 --- /dev/null +++ b/ClusterMonitor.py @@ -0,0 +1,367 @@ +import datetime +import os +import pathlib +import logging +import socket +import time +import urllib.request + +import pymysql +from pymysql import OperationalError + +formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(message)s') +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +file_handler = logging.FileHandler("debug.log") +file_handler.setLevel(logging.DEBUG) +file_handler.setFormatter(formatter) +logger.addHandler(file_handler) +stream_handler = logging.StreamHandler() +stream_handler.setLevel(logging.DEBUG) +stream_handler.setFormatter(formatter) +logger.addHandler(stream_handler) + +API_URL = "http://127.0.0.1:8000/api/rms/cab/sync/db_status" +MYSQL_CONFIG = '/etc/mysql/mysql.conf.d/mysqld.cnf' +SLOW_CONFIG = { + "slow_query_log": 1, + "slow_query_log_file": "/var/log/mysql/slow_query.log", + "long_query_time": 0, + "log_queries_not_using_indexes": 0, + "max_binlog_size": "1024M", +} + + +def get_db_conf(): + return { + 'host': "127.0.0.1", # 只能控制本地数据库 + 'port': 3344, + 'user': "root", + 'password': "Yanei!23", + 'db': "rms_ge_prod", + } + + +def is_ip_reachable(ip, port=80, timeout=3): + try: + # 创建一个 socket 对象 + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.settimeout(timeout) + result = sock.connect_ex((ip, port)) + if result == 0: + return True + else: + return False + except socket.error as e: + logger.error(f"is_ip_reachable Error {ip}:{port} : {e}") + return False + finally: + sock.close() + + +def to_one_format(cursor_result): + result = {} + for row in cursor_result: + result[row[0]] = row[1] + return result + + +def mysql_config_data(config=MYSQL_CONFIG): + with open(config, 'r') as f: + lines = f.readlines() + result = {} + for line in lines: + if line.startswith('wsrep_cluster_address'): + address_str = line.split("=")[1].replace('"', '').replace("gcomm://", "") + if address_str: + ip_list = address_str.split(',') + result["addresses"] = [ip.replace('\n', '') for ip in ip_list] + + if line.startswith('server_id'): + server_id = line.split("=")[1] + result["server_id"] = server_id + + if line.startswith("port"): + port = line.split("=")[1] + result["port"] = int(port) + + return result + + +class Server: + + def __init__(self): + self.status_path = pathlib.Path("status.json") + self.db_conf = get_db_conf() + if self.db_conf is None: + logger.error(f"db_config error: {self.db_conf}") + raise ValueError("Database URL is None") + self.conn = self.get_connection() + + def get_connection(self): + while True: + try: + return pymysql.connect(**self.db_conf) + except Exception as e: + logger.exception(e) + time.sleep(5) + continue + + def get_cluster_status(self): + while True: + try: + with self.conn.cursor() as cursor: + cursor.execute("show status like 'wsrep%'") + r = cursor.fetchall() + return to_one_format(r) + except OperationalError as e: + logger.error(e) + + try: + self.conn.close() + self.conn = self.get_connection() + except Exception as e: + logger.exception(e) + time.sleep(10) + + def start_standalone_mode(self): + logger.info("Start standalone mode...") + # 备份 + os.system(f"cp {MYSQL_CONFIG} {MYSQL_CONFIG}.cluster_bak") + + new_lines = [] + with open(MYSQL_CONFIG, 'r') as f: + lines = f.readlines() + + for line in lines: + if line.startswith('wsrep'): + continue + + for k in SLOW_CONFIG.keys(): + if line.startswith(k): + continue + + new_lines.append(line) + + slow_config = [] + for k, v in SLOW_CONFIG.items(): + slow_config.append(f"{k}={v}") + + new_content = ''.join(new_lines) + "\n" + "\n".join(slow_config) + with open(MYSQL_CONFIG, 'w') as f: + f.write(new_content) + + os.system("systemctl restart mysql") + logger.info("Finish standalone mode...") + + def start_cluster_mode(self): + logger.info("Start start_cluster_mode...") + os.system("systemctl stop mysql") + os.system(f"cp {MYSQL_CONFIG}.cluster_bak {MYSQL_CONFIG}") + os.system("rm -rf {MYSQL_CONFIG}.cluster_bak") + os.system("rm -rf /var/lib/mysql/galera.cache && " + "rm -rf /var/lib/mysql/grastate.dat && " + "rm -rf /var/lib/mysql/gvwstate.dat") + os.system("systemctl start mysql") + time.sleep(10) + + # 判断是否连接集群 + logger.info("check cluster join...") + is_join = False + for i in range(60): + time.sleep(10) + now_status = self.get_cluster_status() + cluster_size = int(now_status.get("wsrep_cluster_size")) + if cluster_size > 1: + logger.info(f"start_cluster_mode wsrep_cluster_size: {cluster_size}") + is_join = True + break + + if not is_join: + logger.error("not join cluster...") + return False + + # 判断是否同步数据完成 + logger.info("check cluster status...") + is_primary = False + for i in range(180): + time.sleep(10) + + now_status = self.get_cluster_status() + logger.info("cluster_status:") + logger.info(now_status) + cluster_status = now_status["wsrep_cluster_status"] + if cluster_status == "Primary": + logger.info(f"start_cluster_mode wsrep_cluster_status: {cluster_status}") + is_primary = True + break + + if not is_primary: + logger.error("db primary timeout") + return False + + logger.info("Finish start_cluster_mode...") + return True + + def insert_record(self): + logger.info("start update data...") + now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S") + exec_sql_path = f"{now_str}.sql" + fail_sql_path = f"{now_str}_fail.sql" + os.system(f'grep -E "INSERT|UPDATE|DELETE" /var/log/mysql/slow_query.log > {exec_sql_path}') + + with open(exec_sql_path, 'r') as f: + lines = f.readlines() + + logger.info(f"sql count: {len(lines)}") + fail_line = [] + conn = self.get_connection() + cur = conn.cursor() + for line in lines: + if line.startswith('INSERT') or line.startswith('UPDATE') or line.startswith('DELETE'): + sql = line.strip().replace("\n", "") + try: + cur.execute(sql) + conn.commit() + logger.info(f"success: {sql} ") + except Exception as e: + logger.error(f"fail: {sql}") + logger.exception(e) + fail_line.append(line) + conn.rollback() + conn.close() + + with open(fail_sql_path, 'w') as f: + f.writelines("\n".join(fail_line)) + + os.system("rm -rf /var/log/mysql/slow_query.log") + logger.info("finish update data...") + + def check_is_to_disconnect(self): + logger.info("start check_is_to_disconnect...") + logger.debug("check cluster status...") + now_status = None + for _ in range(10): + now_status = self.get_cluster_status() + if int(now_status["wsrep_cluster_size"]) > 1: + logger.info(f"size > 1 : {now_status['wsrep_cluster_size']}") + return + time.sleep(3) + + logger.info(f"cluster status wsrep_cluster_size: {now_status['wsrep_cluster_size']}") + logger.info("cluster size abnormal") + + logger.info("check connection....") + mysql_config = mysql_config_data() + node_ips = mysql_config.get("addresses") + port = mysql_config.get("port") or 3344 + logger.info(f"{node_ips}, {port}") + + for _ in range(3): + time.sleep(3) + count = 0 + for ip in node_ips: + if is_ip_reachable(ip, port): + count += 1 + logger.info(f"{ip}:{port} is normal ") + # 有ip正常 + if count > 0: + logger.info(f"{count} ip normal") + return False + + logger.info("connect all ip abnormal") + return True + + def standalone_run(self): + logger.info("start standalone_run....") + self.check_standalone_mode() + while True: + time.sleep(5) + bak_path = pathlib.Path(f"{MYSQL_CONFIG}.cluster_bak") + if not bak_path.exists(): + continue + + mysql_config = mysql_config_data(config=str(bak_path)) + node_ips = mysql_config.get("addresses") + port = mysql_config.get("port") or 3344 + count = 0 + for ip in node_ips: + if is_ip_reachable(ip, port): + count += 1 + logger.info(f"{ip}:{port} is normal ") + + if count == 0: + logger.debug(f"{node_ips} {port} disconnection") + continue + + logger.info("ip normal count: {}".format(count)) + if len(node_ips) / 2 < count: + logger.info("cluster connect") + self.start_cluster_mode() + time.sleep(10) + self.insert_record() + logger.info("end standalone_run...") + return + + def check_standalone_mode(self): + """确认独立模式正常并通知前端""" + logger.info("start check_standalone_mode...") + while True: + conn = None + try: + conn = self.get_connection() + cur = conn.cursor() + cur.execute(f"use {get_db_conf()['db']}") + cur.execute(f"show tables") + logger.info("database is normal") + except Exception as e: + logger.error(e) + time.sleep(1) + continue + finally: + if conn: + conn.close() + + # 只通知一次 + try: + url = API_URL + "?status=synced" + logger.info(f"check_standalone_mode url: {url}") + urllib.request.urlopen(url) + return True + except Exception as e: + logger.exception(e) + return False + + def cluster_run(self): + logger.info("start cluster_run...") + now_status = self.get_cluster_status() + logger.info(f"cluster_status: {now_status['wsrep_cluster_status']}") + mysql_config = mysql_config_data() + while True: + time.sleep(5) + now_status = self.get_cluster_status() + node_ips = mysql_config.get("addresses") + if len(node_ips) >= 1 and int(now_status.get("wsrep_cluster_size")) <= 1: + is_disconnect = self.check_is_to_disconnect() + if not is_disconnect: + continue + + logger.info("cluster disconnect") + self.start_standalone_mode() + logger.info("end cluster_run...") + return + + def run(self): + while True: + now_status = self.get_cluster_status() + if int(now_status.get("wsrep_cluster_size") or 0) != 0: + self.cluster_run() + else: + self.standalone_run() + + time.sleep(5) + + +if __name__ == '__main__': + s = Server() + s.run() diff --git a/notify.py b/notify.py new file mode 100644 index 0000000..e1c5392 --- /dev/null +++ b/notify.py @@ -0,0 +1,26 @@ +import sys +from urllib.parse import urlencode +import urllib.request + +API_URL = "http://127.0.0.1:8000/api/rms/sync/db_status" + + +def main(): + args = sys.argv[1:] + + params = {} + for index in range(len(args)): + arg = args[index] + if not arg.startswith('--'): + continue + key = arg[2:] + value = args[index+1] + params[key] = value + + encoded_params = urlencode(params) + url = API_URL + '?' + encoded_params + urllib.request.urlopen(url) + + +if __name__ == '__main__': + main()