You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
12 KiB

import datetime
import os
import pathlib
import logging
import socket
import time
import urllib.request
import pymysql
from pymysql import OperationalError
formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler("debug.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
API_URL = "http://127.0.0.1:8000/api/rms/cab/sync/db_status"
MYSQL_CONFIG = '/etc/mysql/mysql.conf.d/mysqld.cnf'
SLOW_CONFIG = {
"slow_query_log": 1,
"slow_query_log_file": "/var/log/mysql/slow_query.log",
"long_query_time": 0,
"log_queries_not_using_indexes": 0,
"max_binlog_size": "1024M",
}
def get_db_conf():
return {
'host': "127.0.0.1", # 只能控制本地数据库
'port': 3344,
'user': "root",
'password': "Yanei!23",
'db': "rms_ge_prod",
}
def is_ip_reachable(ip, port=80, timeout=3):
try:
# 创建一个 socket 对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, port))
if result == 0:
return True
else:
return False
except socket.error as e:
logger.error(f"is_ip_reachable Error {ip}:{port} : {e}")
return False
finally:
sock.close()
def to_one_format(cursor_result):
result = {}
for row in cursor_result:
result[row[0]] = row[1]
return result
def mysql_config_data(config=MYSQL_CONFIG):
with open(config, 'r') as f:
lines = f.readlines()
result = {}
for line in lines:
if line.startswith('wsrep_cluster_address'):
address_str = line.split("=")[1].replace('"', '').replace("gcomm://", "")
if address_str:
ip_list = address_str.split(',')
result["addresses"] = [ip.replace('\n', '') for ip in ip_list]
if line.startswith('server_id'):
server_id = line.split("=")[1]
result["server_id"] = server_id
if line.startswith("port"):
port = line.split("=")[1]
result["port"] = int(port)
return result
class Server:
def __init__(self):
self.status_path = pathlib.Path("status.json")
self.db_conf = get_db_conf()
if self.db_conf is None:
logger.error(f"db_config error: {self.db_conf}")
raise ValueError("Database URL is None")
self.conn = self.get_connection()
def get_connection(self):
while True:
try:
return pymysql.connect(**self.db_conf)
except Exception as e:
logger.exception(e)
time.sleep(5)
continue
def get_cluster_status(self):
while True:
try:
with self.conn.cursor() as cursor:
cursor.execute("show status like 'wsrep%'")
r = cursor.fetchall()
return to_one_format(r)
except OperationalError as e:
logger.error(e)
try:
self.conn.close()
self.conn = self.get_connection()
except Exception as e:
logger.exception(e)
time.sleep(10)
def start_standalone_mode(self):
logger.info("Start standalone mode...")
# 备份
os.system(f"cp {MYSQL_CONFIG} {MYSQL_CONFIG}.cluster_bak")
new_lines = []
with open(MYSQL_CONFIG, 'r') as f:
lines = f.readlines()
for line in lines:
if line.startswith('wsrep'):
continue
for k in SLOW_CONFIG.keys():
if line.startswith(k):
continue
new_lines.append(line)
slow_config = []
for k, v in SLOW_CONFIG.items():
slow_config.append(f"{k}={v}")
new_content = ''.join(new_lines) + "\n" + "\n".join(slow_config)
with open(MYSQL_CONFIG, 'w') as f:
f.write(new_content)
os.system("systemctl restart mysql")
logger.info("Finish standalone mode...")
def start_cluster_mode(self):
logger.info("Start start_cluster_mode...")
os.system("systemctl stop mysql")
os.system(f"cp {MYSQL_CONFIG}.cluster_bak {MYSQL_CONFIG}")
os.system("rm -rf {MYSQL_CONFIG}.cluster_bak")
os.system("rm -rf /var/lib/mysql/galera.cache && "
"rm -rf /var/lib/mysql/grastate.dat && "
"rm -rf /var/lib/mysql/gvwstate.dat")
os.system("systemctl start mysql")
time.sleep(10)
# 判断是否连接集群
logger.info("check cluster join...")
is_join = False
for i in range(60):
time.sleep(10)
now_status = self.get_cluster_status()
cluster_size = int(now_status.get("wsrep_cluster_size"))
if cluster_size > 1:
logger.info(f"start_cluster_mode wsrep_cluster_size: {cluster_size}")
is_join = True
break
if not is_join:
logger.error("not join cluster...")
return False
# 判断是否同步数据完成
logger.info("check cluster status...")
is_primary = False
for i in range(180):
time.sleep(10)
now_status = self.get_cluster_status()
logger.info("cluster_status:")
logger.info(now_status)
cluster_status = now_status["wsrep_cluster_status"]
if cluster_status == "Primary":
logger.info(f"start_cluster_mode wsrep_cluster_status: {cluster_status}")
is_primary = True
break
if not is_primary:
logger.error("db primary timeout")
return False
logger.info("Finish start_cluster_mode...")
return True
def insert_record(self):
logger.info("start update data...")
now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
exec_sql_path = f"{now_str}.sql"
fail_sql_path = f"{now_str}_fail.sql"
os.system(f'grep -E "INSERT|UPDATE|DELETE" /var/log/mysql/slow_query.log > {exec_sql_path}')
with open(exec_sql_path, 'r') as f:
lines = f.readlines()
logger.info(f"sql count: {len(lines)}")
fail_line = []
conn = self.get_connection()
cur = conn.cursor()
for line in lines:
if line.startswith('INSERT') or line.startswith('UPDATE') or line.startswith('DELETE'):
sql = line.strip().replace("\n", "")
try:
cur.execute(sql)
conn.commit()
logger.info(f"success: {sql} ")
except Exception as e:
logger.error(f"fail: {sql}")
logger.exception(e)
fail_line.append(line)
conn.rollback()
conn.close()
with open(fail_sql_path, 'w') as f:
f.writelines("\n".join(fail_line))
os.system("rm -rf /var/log/mysql/slow_query.log")
logger.info("finish update data...")
def check_is_to_disconnect(self):
logger.info("start check_is_to_disconnect...")
logger.debug("check cluster status...")
now_status = None
for _ in range(10):
now_status = self.get_cluster_status()
if int(now_status["wsrep_cluster_size"]) > 1:
logger.info(f"size > 1 : {now_status['wsrep_cluster_size']}")
return
time.sleep(3)
logger.info(f"cluster status wsrep_cluster_size: {now_status['wsrep_cluster_size']}")
logger.info("cluster size abnormal")
logger.info("check connection....")
mysql_config = mysql_config_data()
node_ips = mysql_config.get("addresses")
port = mysql_config.get("port") or 3344
logger.info(f"{node_ips}, {port}")
for _ in range(3):
time.sleep(3)
count = 0
for ip in node_ips:
if is_ip_reachable(ip, port):
count += 1
logger.info(f"{ip}:{port} is normal ")
# 有ip正常
if count > 0:
logger.info(f"{count} ip normal")
return False
logger.info("connect all ip abnormal")
return True
def standalone_run(self):
logger.info("start standalone_run....")
self.check_standalone_mode()
while True:
time.sleep(5)
bak_path = pathlib.Path(f"{MYSQL_CONFIG}.cluster_bak")
if not bak_path.exists():
continue
mysql_config = mysql_config_data(config=str(bak_path))
node_ips = mysql_config.get("addresses")
port = mysql_config.get("port") or 3344
count = 0
for ip in node_ips:
if is_ip_reachable(ip, port):
count += 1
logger.info(f"{ip}:{port} is normal ")
if count == 0:
logger.debug(f"{node_ips} {port} disconnection")
continue
logger.info("ip normal count: {}".format(count))
if len(node_ips) / 2 < count:
logger.info("cluster connect")
self.start_cluster_mode()
time.sleep(10)
self.insert_record()
logger.info("end standalone_run...")
return
def check_standalone_mode(self):
"""确认独立模式正常并通知前端"""
logger.info("start check_standalone_mode...")
while True:
conn = None
try:
conn = self.get_connection()
cur = conn.cursor()
cur.execute(f"use {get_db_conf()['db']}")
cur.execute(f"show tables")
logger.info("database is normal")
except Exception as e:
logger.error(e)
time.sleep(1)
continue
finally:
if conn:
conn.close()
# 只通知一次
try:
url = API_URL + "?status=synced"
logger.info(f"check_standalone_mode url: {url}")
urllib.request.urlopen(url)
return True
except Exception as e:
logger.exception(e)
return False
def cluster_run(self):
logger.info("start cluster_run...")
now_status = self.get_cluster_status()
logger.info(f"cluster_status: {now_status['wsrep_cluster_status']}")
mysql_config = mysql_config_data()
while True:
time.sleep(5)
now_status = self.get_cluster_status()
node_ips = mysql_config.get("addresses")
if len(node_ips) >= 1 and int(now_status.get("wsrep_cluster_size")) <= 1:
is_disconnect = self.check_is_to_disconnect()
if not is_disconnect:
continue
logger.info("cluster disconnect")
self.start_standalone_mode()
logger.info("end cluster_run...")
return
def run(self):
while True:
now_status = self.get_cluster_status()
if int(now_status.get("wsrep_cluster_size") or 0) != 0:
self.cluster_run()
else:
self.standalone_run()
time.sleep(5)
if __name__ == '__main__':
s = Server()
s.run()