Compare commits

..

No commits in common. 'main' and 'master' have entirely different histories.
main ... master

397
.gitignore vendored

@ -1,162 +1,237 @@
# ---> Python
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# pdm
# Similar to Pipfile.lock, it is generally recommended to include pdm.lock in version control.
#pdm.lock
# pdm stores project-wide configurations in .pdm.toml, but it is recommended to not include it
# in version control.
# https://pdm.fming.dev/#use-with-ide
.pdm.toml
# PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintained in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
### PyCharm+iml template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser
### PyCharm+all template
# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
# User-specific stuff
.idea/**/workspace.xml
.idea/**/tasks.xml
.idea/**/usage.statistics.xml
.idea/**/dictionaries
.idea/**/shelf
# AWS User-specific
.idea/**/aws.xml
# Generated files
.idea/**/contentModel.xml
# Sensitive or high-churn files
.idea/**/dataSources/
.idea/**/dataSources.ids
.idea/**/dataSources.local.xml
.idea/**/sqlDataSources.xml
.idea/**/dynamic.xml
.idea/**/uiDesigner.xml
.idea/**/dbnavigator.xml
# Gradle
.idea/**/gradle.xml
.idea/**/libraries
# Gradle and Maven with auto-import
# When using Gradle or Maven with auto-import, you should exclude module files,
# since they will be recreated, and may cause churn. Uncomment if using
# auto-import.
# .idea/artifacts
# .idea/compiler.xml
# .idea/jarRepositories.xml
# .idea/modules.xml
# .idea/*.iml
# .idea/modules
# *.iml
# *.ipr
# CMake
cmake-build-*/
# Mongo Explorer plugin
.idea/**/mongoSettings.xml
# File-based project format
*.iws
# IntelliJ
out/
# mpeltonen/sbt-idea plugin
.idea_modules/
# JIRA plugin
atlassian-ide-plugin.xml
# Cursive Clojure plugin
.idea/replstate.xml
# SonarLint plugin
.idea/sonarlint/
# Crashlytics plugin (for Android Studio and IntelliJ)
com_crashlytics_export_strings.xml
crashlytics.properties
crashlytics-build.properties
fabric.properties
# Editor-based Rest Client
.idea/httpRequests
# Android studio 3.1+ serialized cache file
.idea/caches/build_file_checksums.ser

@ -0,0 +1,400 @@
import datetime
import os
import pathlib
import logging
import socket
import time
import urllib.request
import pymysql
from pymysql import OperationalError
formatter = logging.Formatter('%(asctime)s|%(levelname)s|%(lineno)d|%(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
file_handler = logging.FileHandler("debug.log")
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
API_URL = "http://127.0.0.1:8000/api/rms/cab/sync/db_status"
MYSQL_CONFIG = '/etc/mysql/mysql.conf.d/mysqld.cnf'
SLOW_CONFIG = {
"slow_query_log": 1,
"slow_query_log_file": "/var/log/mysql/slow_query.log",
"long_query_time": 0,
"log_queries_not_using_indexes": 0,
"max_binlog_size": "1024M",
}
class DatabaseConnectError(Exception):
pass
def get_db_conf():
return {
'host': "127.0.0.1", # 只能控制本地数据库
'port': 3344,
'user': "root",
'password': "Yanei!23",
'db': "rms_ge_prod",
'connect_timeout': 5
}
def is_ip_reachable(ip, port=80, timeout=3):
try:
# 创建一个 socket 对象
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(timeout)
result = sock.connect_ex((ip, port))
if result == 0:
return True
else:
return False
except socket.error as e:
logger.error(f"is_ip_reachable Error {ip}:{port} : {e}")
return False
finally:
sock.close()
def to_one_format(cursor_result):
result = {}
for row in cursor_result:
result[row[0]] = row[1]
return result
def mysql_config_data(config=MYSQL_CONFIG):
with open(config, 'r') as f:
lines = f.readlines()
result = {}
for line in lines:
if line.startswith('wsrep_cluster_address'):
address_str = line.split("=")[1].replace('"', '').replace("gcomm://", "")
if address_str:
ip_list = address_str.split(',')
result["addresses"] = [ip.replace('\n', '') for ip in ip_list]
if line.startswith('server_id'):
server_id = line.split("=")[1]
result["server_id"] = server_id
if line.startswith("port"):
port = line.split("=")[1]
result["port"] = int(port)
return result
class Server:
def __init__(self):
self.status_path = pathlib.Path("status.json")
self.db_conf = get_db_conf()
if self.db_conf is None:
logger.error(f"db_config error: {self.db_conf}")
raise ValueError("Database URL is None")
self.conn = None
def get_connection(self):
for i in range(10):
try:
return pymysql.connect(**self.db_conf)
except Exception as e:
logger.exception(e)
time.sleep(3)
continue
raise DatabaseConnectError("Connection error")
def get_cluster_status(self):
while True:
try:
with self.conn.cursor() as cursor:
cursor.execute("show status like 'wsrep%'")
r = cursor.fetchall()
return to_one_format(r)
except OperationalError as e:
logger.error(e)
self.conn.close()
self.conn = self.get_connection()
def start_standalone_mode(self):
logger.info("Start standalone mode...")
# 备份
os.system(f"cp {MYSQL_CONFIG} {MYSQL_CONFIG}.cluster_bak")
new_lines = []
with open(MYSQL_CONFIG, 'r') as f:
lines = f.readlines()
for line in lines:
if line.startswith('wsrep'):
continue
for k in SLOW_CONFIG.keys():
if line.startswith(k):
continue
new_lines.append(line)
slow_config = []
for k, v in SLOW_CONFIG.items():
slow_config.append(f"{k}={v}")
new_content = ''.join(new_lines) + "\n" + "\n".join(slow_config)
with open(MYSQL_CONFIG, 'w') as f:
f.write(new_content)
os.system("systemctl restart mysql")
logger.info("Finish standalone mode...")
def start_cluster_mode(self):
logger.info("Start start_cluster_mode...")
os.system("systemctl stop mysql")
os.system(f"cp {MYSQL_CONFIG}.cluster_bak {MYSQL_CONFIG}")
os.system("rm -rf {MYSQL_CONFIG}.cluster_bak")
os.system("rm -rf /var/lib/mysql/galera.cache && "
"rm -rf /var/lib/mysql/grastate.dat && "
"rm -rf /var/lib/mysql/gvwstate.dat")
os.system("systemctl start mysql")
time.sleep(10)
# 判断是否连接集群
logger.info("check cluster join...")
is_join = False
for i in range(60):
time.sleep(10)
now_status = self.get_cluster_status()
cluster_size = int(now_status.get("wsrep_cluster_size"))
if cluster_size > 1:
logger.info(f"start_cluster_mode wsrep_cluster_size: {cluster_size}")
is_join = True
break
if not is_join:
logger.error("not join cluster...")
return False
# 判断是否同步数据完成
logger.info("check cluster status...")
is_primary = False
for i in range(180):
time.sleep(10)
now_status = self.get_cluster_status()
logger.info("cluster_status:")
logger.info(now_status)
cluster_status = now_status["wsrep_cluster_status"]
if cluster_status == "Primary":
logger.info(f"start_cluster_mode wsrep_cluster_status: {cluster_status}")
is_primary = True
break
if not is_primary:
logger.error("db primary timeout")
return False
logger.info("Finish start_cluster_mode...")
return True
def insert_record(self):
logger.info("start update data...")
now_str = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
exec_sql_path = f"{now_str}.sql"
fail_sql_path = f"{now_str}_fail.sql"
os.system(f'grep -E "INSERT|UPDATE|DELETE" /var/log/mysql/slow_query.log > {exec_sql_path}')
with open(exec_sql_path, 'r') as f:
lines = f.readlines()
logger.info(f"sql count: {len(lines)}")
fail_line = []
conn = self.get_connection()
cur = conn.cursor()
for line in lines:
if line.startswith('INSERT') or line.startswith('UPDATE') or line.startswith('DELETE'):
sql = line.strip().replace("\n", "")
try:
cur.execute(sql)
conn.commit()
logger.info(f"success: {sql} ")
except Exception as e:
logger.error(f"fail: {sql}")
logger.exception(e)
fail_line.append(line)
conn.rollback()
conn.close()
with open(fail_sql_path, 'w') as f:
f.writelines("\n".join(fail_line))
os.system("rm -rf /var/log/mysql/slow_query.log")
logger.info("finish update data...")
def check_is_to_disconnect(self):
logger.info("start check_is_to_disconnect...")
logger.debug("check cluster status...")
now_status = None
for _ in range(10):
now_status = self.get_cluster_status()
if int(now_status["wsrep_cluster_size"]) > 1:
logger.info(f"size > 1 : {now_status['wsrep_cluster_size']}")
return
time.sleep(3)
logger.info(f"cluster status wsrep_cluster_size: {now_status['wsrep_cluster_size']}")
logger.info("cluster size abnormal")
logger.info("check connection....")
mysql_config = mysql_config_data()
node_ips = mysql_config.get("addresses")
port = mysql_config.get("port") or 3344
logger.info(f"{node_ips}, {port}")
for _ in range(3):
time.sleep(3)
count = 0
for ip in node_ips:
if is_ip_reachable(ip, port):
count += 1
logger.info(f"{ip}:{port} is normal ")
# 有ip正常
if count > 0:
logger.info(f"{count} ip normal")
return False
logger.info("connect all ip abnormal")
return True
def standalone_run(self):
logger.info("start standalone_run....")
self.check_standalone_mode()
while True:
time.sleep(5)
bak_path = pathlib.Path(f"{MYSQL_CONFIG}.cluster_bak")
if not bak_path.exists():
continue
mysql_config = mysql_config_data(config=str(bak_path))
node_ips = mysql_config.get("addresses")
port = mysql_config.get("port") or 3344
count = 0
for ip in node_ips:
if is_ip_reachable(ip, port):
count += 1
logger.info(f"{ip}:{port} is normal ")
if count == 0:
logger.debug(f"{node_ips} {port} disconnection")
continue
logger.info("ip normal count: {}".format(count))
if len(node_ips) / 2 < count:
logger.info("cluster connect")
self.start_cluster_mode()
time.sleep(10)
self.insert_record()
logger.info("end standalone_run...")
return
def check_standalone_mode(self):
"""确认独立模式正常并通知前端"""
logger.info("start check_standalone_mode...")
while True:
conn = None
try:
conn = self.get_connection()
cur = conn.cursor()
cur.execute(f"use {get_db_conf()['db']}")
cur.execute(f"show tables")
logger.info("database is normal")
except DatabaseConnectError as e:
raise e
except Exception as e:
logger.error(e)
time.sleep(1)
continue
finally:
if conn:
conn.close()
# 只通知一次
try:
url = API_URL + "?status=synced"
logger.info(f"check_standalone_mode url: {url}")
urllib.request.urlopen(url)
return True
except Exception as e:
logger.exception(e)
return False
def cluster_run(self):
logger.info("start cluster_run...")
now_status = self.get_cluster_status()
logger.info(f"cluster_status: {now_status['wsrep_cluster_status']}")
mysql_config = mysql_config_data()
while True:
time.sleep(5)
now_status = self.get_cluster_status()
node_ips = mysql_config.get("addresses")
if len(node_ips) >= 1 and int(now_status.get("wsrep_cluster_size")) <= 1:
is_disconnect = self.check_is_to_disconnect()
if not is_disconnect:
continue
logger.info("cluster disconnect")
self.start_standalone_mode()
logger.info("end cluster_run...")
return
def to_standalone_run(self):
"""切换为单机模式"""
# 当前就是单机模式,只能重启尝试
if os.path.exists(f"{MYSQL_CONFIG}.cluster_bak"):
logger.info("is standalone and try restart")
os.system("kill -9 $(ps aux | grep mysqld | awk '{print $2}')")
logger.info("kill -9 mysql")
time.sleep(3)
# 当前是集群模式,切换为单机模式
else:
logger.info("is cluster and cluster to standalone")
self.start_standalone_mode()
def run(self):
while True:
try:
if not self.conn:
self.conn = self.get_connection()
now_status = self.get_cluster_status()
if int(now_status.get("wsrep_cluster_size") or 0) != 0:
self.cluster_run()
else:
self.standalone_run()
time.sleep(5)
except DatabaseConnectError as e:
# 数据库异常,切换为单机模式
logger.error(e)
logger.info("database connect error, to standalone")
self.to_standalone_run()
time.sleep(5)
# 重置连接
if self.conn:
self.conn.close()
self.conn = None
if __name__ == '__main__':
s = Server()
s.run()

@ -1,2 +0,0 @@
# galera_cluster_tool

@ -0,0 +1,16 @@
## Galera Cluster 数据库集群监控服务
#### ClusterMonitory.py
该脚本主要功能是监控本地数据库集群状态
在集群模式下监控集群状态,当本机离线后切换数据库为单机模式
切换为单机模式后尝试连接集群其它设备,网络恢复后重新切换为集群模式
恢复为集群模式后过滤slow_query.log重新执行增删改操作恢复单机模式下的数据
模式切换成功会发送请求到rms后台接口提示前端已完成切换
该脚本需要部署成服务保证自启动cluster.service
数据库连接信息修改需同步修改脚本内的信息
#### notify.py
该脚本用于接受集群状态变更的信息转发给rms后台接口
建议部署路径:/var/lib/mysql/notify.py
需修改该脚本权限

@ -0,0 +1,14 @@
[Unit]
Description=Mysql Cluster Montior Service
After=network.target
[Service]
Type=simple
WorkingDirectory=/home/yanei/cluster
ExecStart=/home/yanei/miniconda3/bin/python ClusterMonitor.py
User=root
Group=root
PermissionsStartOnly=true
[Install]
WantedBy=multi-user.target

@ -0,0 +1,198 @@
import ipaddress
import os
import pathlib
import time
import argparse
config_format = """
server_id={server_id}
binlog_format=row
default_storage_engine=InnoDB
innodb_file_per_table=1
innodb_autoinc_lock_mode=2
wsrep_on=ON
wsrep_provider=/usr/lib/galera/libgalera_smm.so
wsrep_cluster_name="{cluster_name}"
wsrep_cluster_address="gcomm://{cluster_address}"
wsrep_node_name="{node_name}"
wsrep_node_address="{node_address}"
wsrep_sst_method=rsync
wsrep_sst_auth={auth}
wsrep_provider_options="pc.ignore_quorum=true"
"""
parser = argparse.ArgumentParser(description="Galera Cluster Install Tool集群配置工具")
parser.add_argument("-t", "--type", choices=("master", "node"),
help="Node Type当前设备是主节点(master)、还是普通节点(node)",
required=True)
parser.add_argument("-id", "--server_id", help="Cluster Server ID (集群内设备id不可重复)", type=int, required=True)
parser.add_argument("-ip", "--node_address", help="Current Node Address (当前设备IP能被其它节点访问到)", required=True)
parser.add_argument("-n", "--node_name", help="Current Node Name (集群内设备名称,不可重复)")
parser.add_argument("-c", "--cluster_name", default="YaneiGaleraCluster",
help="Galera Cluster Name集群名称集群内节点需配置一致")
parser.add_argument("-f", "--mysql_file", default="/etc/mysql/mysql.conf.d/mysqld.cnf",
help="mysqld.cnf MySQL配置文件路径相关集群配置写入到该文件")
parser.add_argument("-u", "--username", help="MySQL Username", default="root")
parser.add_argument("-p", "--password", help="MySQL Password", default="yanei!23")
parser.add_argument("-g", "--cluster_address", required=True,
help="Galera Cluster Nodes IP Address (集群其它设备的ip如192.168.1.100,192.168.1.101)")
def clear_cache():
cache_file = pathlib.Path("/var/lib/mysql/galera.cache")
bat_file = pathlib.Path("/var/lib/mysql/grastate.dat")
if cache_file.exists():
cache_file.unlink()
print(f"Cache file {cache_file} has been deleted")
if bat_file.exists():
bat_file.unlink()
print(f"Bat file {bat_file} has been deleted")
def parse_validate_args(args):
node_type = args.type
server_id = args.server_id
# cluster_address 校验
cluster_address_str_value = args.cluster_address
if cluster_address_str_value:
if "," in cluster_address_str_value:
cluster_address_str_list = cluster_address_str_value.split(",")
elif "" in cluster_address_str_value:
cluster_address_str_list = cluster_address_str_value.split(",")
else:
cluster_address_str_list = [cluster_address_str_value]
cluster_address_list = []
for address_str in cluster_address_str_list:
try:
ipaddress.ip_address(address_str)
except ValueError:
raise ValueError(f"cluster_address {address_str} 参数错误, 必须ip格式")
cluster_address_list.append(address_str)
else:
cluster_address_list = []
# node_address 校验
node_address = args.node_address
try:
ipaddress.ip_address(node_address)
except ValueError:
raise ValueError(f"node_address {node_address} 参数错误, 必须ip格式")
if cluster_address_list:
if node_address in cluster_address_list:
raise ValueError("当前设备IP不能配置在集群ip中")
# node_name
node_name = args.node_name
if not node_name:
node_name = f"node_{node_address}".replace(".", "_")
# cluster_name
cluster_name = args.cluster_name
# mysql_file
mysql_file = args.mysql_file
file_path = pathlib.Path(mysql_file)
if not file_path.exists():
raise ValueError(f"{mysql_file} 配置文件未找到")
# auth
mysql_auth = f"{args.username}:{args.password}"
return dict(
server_id=server_id,
cluster_name=cluster_name,
cluster_address=",".join(cluster_address_list) if cluster_address_list else '',
node_name=node_name,
node_address=node_address,
auth=mysql_auth,
mysql_file=mysql_file,
)
def write_file(file_path, new_content):
config_kv = set()
for line in config_format.strip().split("\n"):
if line:
config_kv.add(line.split("=")[0])
def in_config_kv(line):
if not line:
return False
if line.startswith("#"):
return False
if line.startswith("\n"):
return False
for k in config_kv:
if line.startswith(k):
return True
return False
with open(file_path, "r") as f:
content_line = f.readlines()
c = []
for line in content_line:
if in_config_kv(line):
continue
c.append(line)
raw = "".join(c) + new_content
with open(file_path, "w") as f:
f.write(raw)
def node_config(config_data):
mysql_file = config_data.pop("mysql_file")
config_str = config_format.format(**config_data)
print(config_str)
write_file(mysql_file, config_str)
clear_cache()
print("重启数据库...")
os.system("systemctl restart mysql")
def master_config(config_data):
mysql_file = config_data.pop("mysql_file")
cluster_address = config_data.pop("cluster_address")
config_data["cluster_address"] = ""
config_str = config_format.format(**config_data)
print(config_str)
write_file(mysql_file, config_str)
clear_cache()
print("关闭旧服务")
os.system("systemctl stop mysql")
print("启动主节点服务...")
os.system("/usr/bin/mysqld_bootstrap")
input("请启动其它节点,启动后再继续。。。。")
input("确认继续?")
config_data["cluster_address"] = cluster_address
config_str = config_format.format(**config_data)
print(config_str)
write_file(mysql_file, config_str)
print("重启数据库...")
os.system("systemctl restart mysql")
if __name__ == '__main__':
args = parser.parse_args()
config_data = parse_validate_args(args)
if args.type == "master":
master_config(config_data)
elif args.type == "node":
node_config(config_data)
else:
raise ValueError("type error")
time.sleep(3)
os.system("ps -aux | grep mysql")
print("如果启动失败请重新执行")

@ -0,0 +1,54 @@
# Copyright (c) 2014, 2024, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is designed to work with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation. The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have either included with
# the program or referenced in the documentation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
#
# The MySQL Server configuration file.
#
# For explanations see
# http://dev.mysql.com/doc/mysql/en/server-system-variables.html
[mysqld]
pid-file = /var/run/mysqld/mysqld.pid
socket = /var/run/mysqld/mysqld.sock
datadir = /var/lib/mysql
log-error = /var/log/mysql/error.log
log_bin = on
bind-address = 0.0.0.0
port = 3344
server_id=130
binlog_format=row
default_storage_engine=InnoDB
innodb_file_per_table=1
innodb_autoinc_lock_mode=2
wsrep_on=ON
wsrep_provider=/usr/lib/galera/libgalera_smm.so
wsrep_cluster_name="cluster"
wsrep_cluster_address="gcomm://192.168.100.129,192.168.100.128"
wsrep_node_name="node_192_168_100_130"
wsrep_node_address="192.168.100.130"
wsrep_sst_method=rsync
wsrep_sst_auth=root:yanei!23
wsrep_notify_cmd="/usr/bin/python3 /var/lib/mysql/notify.py"

@ -0,0 +1,85 @@
import os
import sys
import time
import psutil
# 定义状态
NORMAL = 'NORMAL'
ABNORMAL = 'ABNORMAL'
# 定义状态持续时间阈值(秒)
NORMAL_THRESHOLD = 10 * 60
ABNORMAL_THRESHOLD = 30
def get_process(name="mysqld"):
for proc in psutil.process_iter():
if proc.name() == name:
return proc
return None
# 定义状态机类
class ProcessMonitorStateMachine:
def __init__(self, process_name):
self.process_name = process_name
self.state = NORMAL
self.last_state_change_time = time.time()
def check_process(self):
"""检查进程是否存在,并返回其状态"""
process = get_process(name=self.process_name)
if process:
return NORMAL
else:
return ABNORMAL
def process_event(self):
"""处理事件,根据事件类型切换状态"""
current_time = time.time()
process_state = self.check_process()
print(f"当前状态:{process_state} {current_time}")
if process_state != self.state:
self.state = process_state
self.last_state_change_time = current_time
print(f"状态变更为: {self.state}")
if self.state == ABNORMAL:
self.handle_abnormal_timeout()
else:
if self.state == NORMAL and (current_time - self.last_state_change_time) > NORMAL_THRESHOLD:
self.handle_normal_timeout()
self.last_state_change_time = current_time
elif self.state == ABNORMAL and (current_time - self.last_state_change_time) > ABNORMAL_THRESHOLD:
self.handle_abnormal_timeout()
self.last_state_change_time = current_time
def handle_normal_timeout(self):
"""处理正常状态超时的指令"""
print("程序已正常,退出监控")
sys.exit(0)
def handle_abnormal_timeout(self):
"""处理异常状态超时的指令"""
print("异常状态持续时间过长,执行特定指令。")
os.system("/usr/bin/mysqld_bootstrap")
# 使用状态机监控进程
def main():
process_name = "mysqld" # 指定要监控的进程名称
monitor = ProcessMonitorStateMachine(process_name)
print("start...")
# 模拟定时检查进程状态
while True:
monitor.process_event()
time.sleep(10) # 每10秒检查一次
if __name__ == '__main__':
main()

@ -0,0 +1,27 @@
#!/usr/bin/python3
import sys
from urllib.parse import urlencode
import urllib.request
API_URL = "http://127.0.0.1:8000/api/rms/cab/sync/db_status"
def main():
args = sys.argv[1:]
params = {}
for index in range(len(args)):
arg = args[index]
if not arg.startswith('--'):
continue
key = arg[2:]
value = args[index+1]
params[key] = value
encoded_params = urlencode(params)
url = API_URL + '?' + encoded_params
urllib.request.urlopen(url)
if __name__ == '__main__':
main()
Loading…
Cancel
Save