You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
9.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import base64
import socket
from functools import lru_cache, wraps
from typing import Union
import json
import time
import math
import datetime
import platform
import subprocess
import ntpath
def compute_last_time(time1, ctime):
"""计算time1与当前时间的差值转换为D H M S格式"""
day, hour, mins, sec = 0, 0, 0, 0
dt1 = datetime.datetime.strptime(str(time1), '%Y-%m-%d %H:%M:%S')
dt2 = datetime.datetime.strptime(str(ctime), '%Y-%m-%d %H:%M:%S')
diff_str = str(dt2 - dt1)
if diff_str == '0:00:00':
return '0秒'
# 时、分、秒用小写字母h、m、s天用大写字母D
if 'day' not in diff_str:
sec, mins, hour = list(map(int, diff_str.split(":")))[::-1]
else:
day = diff_str.split(',')[0].split(' ')[0]
sec_min_hour = diff_str.split(",")[1].strip().split(":")
sec, mins, hour = list(map(int, sec_min_hour))[::-1]
day = int(day)
all_time = (F'{day}', F'{hour}', F'{mins}', F'{sec}')
# 处理为仅仅两个单位找到第一个非0的单位取两个值即可
start = 0
for i, e in enumerate(all_time):
if int(e[:-1]) != 0:
start = i
break
real_time_str = ' '.join(all_time[start : start + 2])
return real_time_str
def is_today_time(time1):
"""判断是否今天内发生的"""
today = str(datetime.datetime.today()).split(' ')[0] + ' 0:0:0'
dt1 = datetime.datetime.strptime(str(time1), '%Y-%m-%d %H:%M:%S')
dt2 = datetime.datetime.strptime(today, '%Y-%m-%d %H:%M:%S')
diff_str = str(dt1 - dt2)
if 'day' in diff_str:
return False, time1
else:
return True, time1.split(' ')[1]
def time_convert(s: int) -> str:
"""
秒转化为
:param s: int 秒
:return: str 转化后的时间, 数据加单位
"""
m, s = divmod(s, 60)
if not m:
return F"{s}"
h, m = divmod(m, 60)
if not h:
return F"{m}{s}"
d, h = divmod(h, 24)
if not d:
return F"{h}{m}"
return F"{d}{h}"
def time_convert_2_list(s: int) -> list:
"""
秒转化为
:param s: int 秒
:return: list list[0]转化后的时间list[1]数据单位
"""
m, s = divmod(s, 60)
if not m:
return [s, ""]
h, m = divmod(m, 60)
if not h:
return [m, ""]
d, h = divmod(h, 24)
if not d:
return [h, ""]
ten, d = divmod(d, 10)
if not ten:
return [d, ""]
return [10, "天+"]
def byte_convert_2_kb(s):
"""
秒转化为
:param s: int byte
:return: float kb
"""
return round(float(s) / 1024, 3)
def ms_2_s(s):
"""
毫秒转为秒
"""
return int(round(float(s) / 1000, 0))
def bytes_to_tb(bytes: int) -> Union[int, float]:
tb = bytes / (1024**4)
tb_rounded = round(tb, 2)
if tb_rounded.is_integer():
return int(tb_rounded)
else:
return tb_rounded
class lazyproperty:
"""惰性属性"""
def __init__(self, fun):
self.fun = fun
def __get__(self, instance, owner):
if instance is None:
return self
value = self.fun(instance)
setattr(instance, self.fun.__name__, value)
return value
def ping_connect(ip, retry=1):
"""ping测试"""
system = platform.system()
cmd = f'ping -n 1 -w 1 {ip}' if system == 'Windows' else f'ping -c 3 {ip}'
return_code = -1
for i in range(retry):
with subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE) as p:
p.wait()
if p.returncode == 0:
return_code = p.returncode
break
time.sleep(1)
return return_code == 0
def Singleton(cls):
"""单例模式"""
_instance = {}
def inner():
if cls not in _instance:
_instance[cls] = cls()
return _instance[cls]
return inner
class SingletonArguments(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls._instances[cls] = super(SingletonArguments, cls).__call__(*args, **kwargs)
return cls._instances[cls]
def str2int(v):
try:
return int(v)
except Exception:
return v
def bytes2str(v):
try:
if isinstance(v, bytes):
return v.decode('utf8')
except Exception:
return v
def int2timestr(v):
try:
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(v / 1000))
except Exception as msg:
return v
def int2datetime(v):
try:
return datetime.datetime.fromtimestamp(v / 1000)
except Exception as msg:
return v
def str2datetime(v):
try:
return datetime.datetime.strptime(v, '%Y-%m-%s %H:%M:%S')
except Exception as msg:
return datetime.datetime.now()
def str2float(v):
try:
return float(v)
except Exception:
return v
def upper(v):
try:
return v.upper()
except Exception:
return v
def lower(v):
try:
return v.lower()
except Exception:
return v
def basename(path):
"""获取路径的文件名"""
return ntpath.basename(path)
def yanei_decimal(value: float, num: int):
"""保留num位有效数字
:param value:float: 待处理浮点数
:param num:int: 有效位数
"""
try:
if not isinstance(value, float):
return value
s = str(value)
pre, post = s.split('.')
return float(pre + '.' + post[:num])
except Exception:
return value
def seconds_format(time_cost: int):
"""
耗费时间格式转换
:param time_cost:
:return:
"""
min_value = 60
hour = 60 * 60
day = 60 * 60 * 24
if not time_cost or time_cost < 0:
return '0秒'
elif time_cost < min_value:
return '%s' % int(time_cost)
elif time_cost < hour:
# return '%s分%s秒' % (divmod(time_cost, min_value))
return '%s%s' % (divmod(int(time_cost), int(min_value)))
elif time_cost < day:
cost_hour, cost_min = divmod(int(time_cost), int(hour))
if cost_min > min_value:
return '%s%s' % (int(cost_hour), seconds_format(cost_min))
else:
return '%s时0分%s' % (int(cost_hour), seconds_format(cost_min))
else:
cost_day, cost_hour = divmod(int(time_cost), int(day))
if cost_hour >= hour:
return '%s%s' % (int(cost_day), seconds_format(cost_hour))
elif cost_hour >= min_value:
return '%s天0时%s' % (int(cost_day), seconds_format(cost_hour))
else:
return '%s天0时0分%s' % (int(cost_day), seconds_format(cost_hour))
def calculate_rto(end, start, ceil=True):
"""其中end, start都是ms级别的时间戳, ceil表示是否向上取整"""
total_miscro = float(end - start)
if ceil:
total = math.ceil(total_miscro / 1000)
else:
if total_miscro < 1000:
total = math.ceil(total_miscro / 1000)
else:
total = int(total_miscro / 1000)
return seconds_format(total)
def transform_duration_str_to_seconds(duration_str: str):
"""
将时长字符串转换为秒
:param duration_str: 时长字符串 eg: 1天2时3分4秒
:return:
"""
if not duration_str:
return None
try:
day, hour, minute, second = 0, 0, 0, 0
if "" in duration_str:
_values = duration_str.split("", 1)
day = int(_values[0])
duration_str = _values[-1]
if "" in duration_str:
_values = duration_str.split("", 1)
hour = int(_values[0])
duration_str = _values[-1]
if "" in duration_str:
_values = duration_str.split("", 1)
minute = int(_values[0])
duration_str = _values[-1]
if "" in duration_str:
_values = duration_str.split("", 1)
second = int(duration_str.split("")[0])
return day * 24 * 60 * 60 + hour * 60 * 60 + minute * 60 + second
except Exception:
return None
def extra_script_data_after_call(resp):
"""解析内容部门返回的响应数据并拼接成字符串"""
resp_data = ''
try:
overall = resp.get('overall')
_ts = resp.get('data', [])
resp_data = f'状态码:{overall} 其他信息:'
if isinstance(_ts, list):
for _t in _ts:
resp_data += f'{_t}\n'
except Exception:
try:
resp_data = f'{resp}'
except:
pass
return resp_data
def time_diff_int(str_time1, str_time2):
"""计算差值,返回秒数"""
# TODO 修改类型
if str(str_time2) == '2038-01-01 00:00:00':
raise Exception()
timearray = time.strptime(str_time1, "%Y-%m-%d %H:%M:%S")
int_time1 = int(time.mktime(timearray))
timearray = time.strptime(str_time2, "%Y-%m-%d %H:%M:%S")
int_time2 = int(time.mktime(timearray))
return int_time1 - int_time2
def timed_lru_cache(seconds: int, maxsize: int = 128):
def wrapper_cache(func):
func = lru_cache(maxsize=maxsize)(func)
func.lifetime = datetime.timedelta(seconds=seconds)
func.expiration = datetime.datetime.now() + func.lifetime
@wraps(func)
def wrapped_func(*args, **kwargs):
if datetime.datetime.now() >= func.expiration:
func.cache_clear()
func.expiration = datetime.datetime.now() + func.lifetime
return func(*args, **kwargs)
return wrapped_func
return wrapper_cache
def is_illegal_name(name):
illegal_list = ['+', "-", "=", "@"]
for i in illegal_list:
if i in name:
return True
return False
def get_local_ip():
try:
# 获取本机主机名
hostname = socket.gethostname()
# 获取本机IP地址
ip_address = socket.gethostbyname(hostname)
return ip_address
except socket.error:
return "Unable to get local IP"