parent
973e591cc6
commit
e66b55fd46
@ -0,0 +1,367 @@
|
||||
import datetime
|
||||
import os
|
||||
import pathlib
|
||||
import logging
|
||||
import socket
|
||||
import time
|
||||
import urllib.request
|
||||
|
||||
import pymysql
|
||||
from pymysql import OperationalError
|
||||
|
||||
formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(message)s')
|
||||
logger = logging.getLogger(__name__)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
file_handler = logging.FileHandler("debug.log")
|
||||
file_handler.setLevel(logging.DEBUG)
|
||||
file_handler.setFormatter(formatter)
|
||||
logger.addHandler(file_handler)
|
||||
stream_handler = logging.StreamHandler()
|
||||
stream_handler.setLevel(logging.DEBUG)
|
||||
stream_handler.setFormatter(formatter)
|
||||
logger.addHandler(stream_handler)
|
||||
|
||||
API_URL = "http://127.0.0.1:8000/api/rms/cab/sync/db_status"
|
||||
MYSQL_CONFIG = '/etc/mysql/mysql.conf.d/mysqld.cnf'
|
||||
SLOW_CONFIG = {
|
||||
"slow_query_log": 1,
|
||||
"slow_query_log_file": "/var/log/mysql/slow_query.log",
|
||||
"long_query_time": 0,
|
||||
"log_queries_not_using_indexes": 0,
|
||||
"max_binlog_size": "1024M",
|
||||
}
|
||||
|
||||
|
||||
def get_db_conf():
|
||||
return {
|
||||
'host': "127.0.0.1", # 只能控制本地数据库
|
||||
'port': 3344,
|
||||
'user': "root",
|
||||
'password': "Yanei!23",
|
||||
'db': "rms_ge_prod",
|
||||
}
|
||||
|
||||
|
||||
def is_ip_reachable(ip, port=80, timeout=3):
|
||||
try:
|
||||
# 创建一个 socket 对象
|
||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.settimeout(timeout)
|
||||
result = sock.connect_ex((ip, port))
|
||||
if result == 0:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
except socket.error as e:
|
||||
logger.error(f"is_ip_reachable Error {ip}:{port} : {e}")
|
||||
return False
|
||||
finally:
|
||||
sock.close()
|
||||
|
||||
|
||||
def to_one_format(cursor_result):
|
||||
result = {}
|
||||
for row in cursor_result:
|
||||
result[row[0]] = row[1]
|
||||
return result
|
||||
|
||||
|
||||
def mysql_config_data(config=MYSQL_CONFIG):
|
||||
with open(config, 'r') as f:
|
||||
lines = f.readlines()
|
||||
result = {}
|
||||
for line in lines:
|
||||
if line.startswith('wsrep_cluster_address'):
|
||||
address_str = line.split("=")[1].replace('"', '').replace("gcomm://", "")
|
||||
if address_str:
|
||||
ip_list = address_str.split(',')
|
||||
result["addresses"] = [ip.replace('\n', '') for ip in ip_list]
|
||||
|
||||
if line.startswith('server_id'):
|
||||
server_id = line.split("=")[1]
|
||||
result["server_id"] = server_id
|
||||
|
||||
if line.startswith("port"):
|
||||
port = line.split("=")[1]
|
||||
result["port"] = int(port)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
class Server:
|
||||
|
||||
def __init__(self):
|
||||
self.status_path = pathlib.Path("status.json")
|
||||
self.db_conf = get_db_conf()
|
||||
if self.db_conf is None:
|
||||
logger.error(f"db_config error: {self.db_conf}")
|
||||
raise ValueError("Database URL is None")
|
||||
self.conn = self.get_connection()
|
||||
|
||||
def get_connection(self):
|
||||
while True:
|
||||
try:
|
||||
return pymysql.connect(**self.db_conf)
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
time.sleep(5)
|
||||
continue
|
||||
|
||||
def get_cluster_status(self):
|
||||
while True:
|
||||
try:
|
||||
with self.conn.cursor() as cursor:
|
||||
cursor.execute("show status like 'wsrep%'")
|
||||
r = cursor.fetchall()
|
||||
return to_one_format(r)
|
||||
except OperationalError as e:
|
||||
logger.error(e)
|
||||
|
||||
try:
|
||||
self.conn.close()
|
||||
self.conn = self.get_connection()
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
time.sleep(10)
|
||||
|
||||
def start_standalone_mode(self):
|
||||
logger.info("Start standalone mode...")
|
||||
# 备份
|
||||
os.system(f"cp {MYSQL_CONFIG} {MYSQL_CONFIG}.cluster_bak")
|
||||
|
||||
new_lines = []
|
||||
with open(MYSQL_CONFIG, 'r') as f:
|
||||
lines = f.readlines()
|
||||
|
||||
for line in lines:
|
||||
if line.startswith('wsrep'):
|
||||
continue
|
||||
|
||||
for k in SLOW_CONFIG.keys():
|
||||
if line.startswith(k):
|
||||
continue
|
||||
|
||||
new_lines.append(line)
|
||||
|
||||
slow_config = []
|
||||
for k, v in SLOW_CONFIG.items():
|
||||
slow_config.append(f"{k}={v}")
|
||||
|
||||
new_content = ''.join(new_lines) + "\n" + "\n".join(slow_config)
|
||||
with open(MYSQL_CONFIG, 'w') as f:
|
||||
f.write(new_content)
|
||||
|
||||
os.system("systemctl restart mysql")
|
||||
logger.info("Finish standalone mode...")
|
||||
|
||||
def start_cluster_mode(self):
|
||||
logger.info("Start start_cluster_mode...")
|
||||
os.system("systemctl stop mysql")
|
||||
os.system(f"cp {MYSQL_CONFIG}.cluster_bak {MYSQL_CONFIG}")
|
||||
os.system("rm -rf {MYSQL_CONFIG}.cluster_bak")
|
||||
os.system("rm -rf /var/lib/mysql/galera.cache && "
|
||||
"rm -rf /var/lib/mysql/grastate.dat && "
|
||||
"rm -rf /var/lib/mysql/gvwstate.dat")
|
||||
os.system("systemctl start mysql")
|
||||
time.sleep(10)
|
||||
|
||||
# 判断是否连接集群
|
||||
logger.info("check cluster join...")
|
||||
is_join = False
|
||||
for i in range(60):
|
||||
time.sleep(10)
|
||||
now_status = self.get_cluster_status()
|
||||
cluster_size = int(now_status.get("wsrep_cluster_size"))
|
||||
if cluster_size > 1:
|
||||
logger.info(f"start_cluster_mode wsrep_cluster_size: {cluster_size}")
|
||||
is_join = True
|
||||
break
|
||||
|
||||
if not is_join:
|
||||
logger.error("not join cluster...")
|
||||
return False
|
||||
|
||||
# 判断是否同步数据完成
|
||||
logger.info("check cluster status...")
|
||||
is_primary = False
|
||||
for i in range(180):
|
||||
time.sleep(10)
|
||||
|
||||
now_status = self.get_cluster_status()
|
||||
logger.info("cluster_status:")
|
||||
logger.info(now_status)
|
||||
cluster_status = now_status["wsrep_cluster_status"]
|
||||
if cluster_status == "Primary":
|
||||
logger.info(f"start_cluster_mode wsrep_cluster_status: {cluster_status}")
|
||||
is_primary = True
|
||||
break
|
||||
|
||||
if not is_primary:
|
||||
logger.error("db primary timeout")
|
||||
return False
|
||||
|
||||
logger.info("Finish start_cluster_mode...")
|
||||
return True
|
||||
|
||||
def insert_record(self):
|
||||
logger.info("start update data...")
|
||||
now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
|
||||
exec_sql_path = f"{now_str}.sql"
|
||||
fail_sql_path = f"{now_str}_fail.sql"
|
||||
os.system(f'grep -E "INSERT|UPDATE|DELETE" /var/log/mysql/slow_query.log > {exec_sql_path}')
|
||||
|
||||
with open(exec_sql_path, 'r') as f:
|
||||
lines = f.readlines()
|
||||
|
||||
logger.info(f"sql count: {len(lines)}")
|
||||
fail_line = []
|
||||
conn = self.get_connection()
|
||||
cur = conn.cursor()
|
||||
for line in lines:
|
||||
if line.startswith('INSERT') or line.startswith('UPDATE') or line.startswith('DELETE'):
|
||||
sql = line.strip().replace("\n", "")
|
||||
try:
|
||||
cur.execute(sql)
|
||||
conn.commit()
|
||||
logger.info(f"success: {sql} ")
|
||||
except Exception as e:
|
||||
logger.error(f"fail: {sql}")
|
||||
logger.exception(e)
|
||||
fail_line.append(line)
|
||||
conn.rollback()
|
||||
conn.close()
|
||||
|
||||
with open(fail_sql_path, 'w') as f:
|
||||
f.writelines("\n".join(fail_line))
|
||||
|
||||
os.system("rm -rf /var/log/mysql/slow_query.log")
|
||||
logger.info("finish update data...")
|
||||
|
||||
def check_is_to_disconnect(self):
|
||||
logger.info("start check_is_to_disconnect...")
|
||||
logger.debug("check cluster status...")
|
||||
now_status = None
|
||||
for _ in range(10):
|
||||
now_status = self.get_cluster_status()
|
||||
if int(now_status["wsrep_cluster_size"]) > 1:
|
||||
logger.info(f"size > 1 : {now_status['wsrep_cluster_size']}")
|
||||
return
|
||||
time.sleep(3)
|
||||
|
||||
logger.info(f"cluster status wsrep_cluster_size: {now_status['wsrep_cluster_size']}")
|
||||
logger.info("cluster size abnormal")
|
||||
|
||||
logger.info("check connection....")
|
||||
mysql_config = mysql_config_data()
|
||||
node_ips = mysql_config.get("addresses")
|
||||
port = mysql_config.get("port") or 3344
|
||||
logger.info(f"{node_ips}, {port}")
|
||||
|
||||
for _ in range(3):
|
||||
time.sleep(3)
|
||||
count = 0
|
||||
for ip in node_ips:
|
||||
if is_ip_reachable(ip, port):
|
||||
count += 1
|
||||
logger.info(f"{ip}:{port} is normal ")
|
||||
# 有ip正常
|
||||
if count > 0:
|
||||
logger.info(f"{count} ip normal")
|
||||
return False
|
||||
|
||||
logger.info("connect all ip abnormal")
|
||||
return True
|
||||
|
||||
def standalone_run(self):
|
||||
logger.info("start standalone_run....")
|
||||
self.check_standalone_mode()
|
||||
while True:
|
||||
time.sleep(5)
|
||||
bak_path = pathlib.Path(f"{MYSQL_CONFIG}.cluster_bak")
|
||||
if not bak_path.exists():
|
||||
continue
|
||||
|
||||
mysql_config = mysql_config_data(config=str(bak_path))
|
||||
node_ips = mysql_config.get("addresses")
|
||||
port = mysql_config.get("port") or 3344
|
||||
count = 0
|
||||
for ip in node_ips:
|
||||
if is_ip_reachable(ip, port):
|
||||
count += 1
|
||||
logger.info(f"{ip}:{port} is normal ")
|
||||
|
||||
if count == 0:
|
||||
logger.debug(f"{node_ips} {port} disconnection")
|
||||
continue
|
||||
|
||||
logger.info("ip normal count: {}".format(count))
|
||||
if len(node_ips) / 2 < count:
|
||||
logger.info("cluster connect")
|
||||
self.start_cluster_mode()
|
||||
time.sleep(10)
|
||||
self.insert_record()
|
||||
logger.info("end standalone_run...")
|
||||
return
|
||||
|
||||
def check_standalone_mode(self):
|
||||
"""确认独立模式正常并通知前端"""
|
||||
logger.info("start check_standalone_mode...")
|
||||
while True:
|
||||
conn = None
|
||||
try:
|
||||
conn = self.get_connection()
|
||||
cur = conn.cursor()
|
||||
cur.execute(f"use {get_db_conf()['db']}")
|
||||
cur.execute(f"show tables")
|
||||
logger.info("database is normal")
|
||||
except Exception as e:
|
||||
logger.error(e)
|
||||
time.sleep(1)
|
||||
continue
|
||||
finally:
|
||||
if conn:
|
||||
conn.close()
|
||||
|
||||
# 只通知一次
|
||||
try:
|
||||
url = API_URL + "?status=synced"
|
||||
logger.info(f"check_standalone_mode url: {url}")
|
||||
urllib.request.urlopen(url)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
return False
|
||||
|
||||
def cluster_run(self):
|
||||
logger.info("start cluster_run...")
|
||||
now_status = self.get_cluster_status()
|
||||
logger.info(f"cluster_status: {now_status['wsrep_cluster_status']}")
|
||||
mysql_config = mysql_config_data()
|
||||
while True:
|
||||
time.sleep(5)
|
||||
now_status = self.get_cluster_status()
|
||||
node_ips = mysql_config.get("addresses")
|
||||
if len(node_ips) >= 1 and int(now_status.get("wsrep_cluster_size")) <= 1:
|
||||
is_disconnect = self.check_is_to_disconnect()
|
||||
if not is_disconnect:
|
||||
continue
|
||||
|
||||
logger.info("cluster disconnect")
|
||||
self.start_standalone_mode()
|
||||
logger.info("end cluster_run...")
|
||||
return
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
now_status = self.get_cluster_status()
|
||||
if int(now_status.get("wsrep_cluster_size") or 0) != 0:
|
||||
self.cluster_run()
|
||||
else:
|
||||
self.standalone_run()
|
||||
|
||||
time.sleep(5)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
s = Server()
|
||||
s.run()
|
@ -0,0 +1,26 @@
|
||||
import sys
|
||||
from urllib.parse import urlencode
|
||||
import urllib.request
|
||||
|
||||
API_URL = "http://127.0.0.1:8000/api/rms/sync/db_status"
|
||||
|
||||
|
||||
def main():
|
||||
args = sys.argv[1:]
|
||||
|
||||
params = {}
|
||||
for index in range(len(args)):
|
||||
arg = args[index]
|
||||
if not arg.startswith('--'):
|
||||
continue
|
||||
key = arg[2:]
|
||||
value = args[index+1]
|
||||
params[key] = value
|
||||
|
||||
encoded_params = urlencode(params)
|
||||
url = API_URL + '?' + encoded_params
|
||||
urllib.request.urlopen(url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
Loading…
Reference in new issue