You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
9.9 KiB

import os
import base64
import socket
from functools import lru_cache, wraps
from typing import Union
import json
import time
import math
import datetime
import platform
import subprocess
import ntpath
def compute_last_time(time1, ctime):
    """Return the elapsed time from ``time1`` to ``ctime`` as a short string.

    Both arguments are converted with ``str()`` and parsed as
    ``'%Y-%m-%d %H:%M:%S'``.  The result shows at most two adjacent units,
    starting from the first non-zero one (for example ``'5分 30秒'``).

    :param time1: start time, ``'YYYY-mm-dd HH:MM:SS'``
    :param ctime: end time, ``'YYYY-mm-dd HH:MM:SS'``
    :return: ``'0秒'`` when both times are equal, otherwise up to two
        ``<number><unit>`` parts joined by a single space
    :raises ValueError: if either value does not match the expected format
    """
    fmt = '%Y-%m-%d %H:%M:%S'
    delta = (datetime.datetime.strptime(str(ctime), fmt)
             - datetime.datetime.strptime(str(time1), fmt))
    if not delta:
        return '0秒'

    # timedelta normalises to (days, seconds); using the fields directly
    # avoids parsing str(timedelta).
    hours, rest = divmod(delta.seconds, 3600)
    mins, secs = divmod(rest, 60)
    amounts = (delta.days, hours, mins, secs)

    # NOTE(review): unit suffixes follow the '0秒' zero result returned above;
    # confirm against callers that parse this string.
    units = ('天', '时', '分', '秒')
    parts = [f'{amount}{unit}' for amount, unit in zip(amounts, units)]

    # Keep only two units, starting at the first non-zero one.
    start = next((i for i, amount in enumerate(amounts) if amount), 0)
    return ' '.join(parts[start:start + 2])
def is_today_time(time1):
    """Tell whether ``time1`` falls on today's local calendar date.

    :param time1: timestamp in ``'YYYY-mm-dd HH:MM:SS'`` form; it is converted
        with ``str()`` before parsing, so objects whose ``str()`` has that
        shape are accepted as well
    :return: ``(True, 'HH:MM:SS')`` when the timestamp is from today,
        otherwise ``(False, time1)`` with the argument returned unchanged
    :raises ValueError: if the value does not match the expected format
    """
    stamp = str(time1)
    moment = datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')
    if moment.date() != datetime.date.today():
        return False, time1
    # Split the coerced string so that non-str input does not crash here.
    return True, stamp.split(' ')[1]
def time_convert(s: int) -> str:
    """Convert a number of seconds into a short text with units.

    Only the two most significant units are shown: seconds alone,
    minutes + seconds, hours + minutes, or days + hours.

    :param s: duration in seconds
    :return: the value with its unit(s), e.g. ``'1分30秒'``
    """
    # NOTE(review): unit suffixes (秒/分/时/天) follow the other duration
    # helpers in this file; confirm against callers.
    m, s = divmod(s, 60)
    if not m:
        return f"{s}秒"
    h, m = divmod(m, 60)
    if not h:
        return f"{m}分{s}秒"
    d, h = divmod(h, 24)
    if not d:
        return f"{h}时{m}分"
    return f"{d}天{h}时"
def time_convert_2_list(s: int) -> list:
    """Convert seconds into ``[value, unit]`` using the largest fitting unit.

    The value is truncated to the chosen unit (smaller units are dropped).
    Durations of ten days or more are capped at ``[10, "天+"]``.

    :param s: duration in seconds
    :return: ``list[0]`` is the converted number, ``list[1]`` is its unit
    """
    # (size of the unit expressed in the previous unit, unit label)
    # NOTE(review): labels follow the surviving "天+" literal and the other
    # duration helpers in this file; confirm against callers.
    steps = ((60, "秒"), (60, "分"), (24, "时"), (10, "天"))
    remaining = s
    for base, unit in steps:
        higher, value = divmod(remaining, base)
        if not higher:
            return [value, unit]
        remaining = higher
    return [10, "天+"]
def byte_convert_2_kb(s):
    """Convert a byte count to kibibytes.

    :param s: number of bytes (anything accepted by ``float()``)
    :return: float, the size in KB rounded to three decimal places
    """
    kilobytes = float(s) / 1024
    return round(kilobytes, 3)
def ms_2_s(s):
    """Convert milliseconds to whole seconds.

    :param s: milliseconds (anything accepted by ``float()``)
    :return: int, the value in seconds after ``round(..., 0)``
    """
    seconds = float(s) / 1000
    rounded = round(seconds, 0)
    return int(rounded)
def bytes_to_tb(bytes: int) -> Union[int, float]:
    """Convert a byte count to tebibytes (1024**4 bytes).

    :param bytes: number of bytes
    :return: the size rounded to two decimals; returned as ``int`` when the
        rounded value has no fractional part, otherwise as ``float``
    """
    terabytes = round(bytes / (1024 ** 4), 2)
    return int(terabytes) if terabytes.is_integer() else terabytes
class lazyproperty:
    """Descriptor that computes an attribute on first access.

    The wrapped function is called with the instance, and its result is then
    stored on the instance under the function's own name via ``setattr``.
    """

    def __init__(self, fun):
        # The function whose result becomes the attribute value.
        self.fun = fun

    def __get__(self, instance, owner):
        # Accessed on the class itself: expose the descriptor object.
        if instance is not None:
            computed = self.fun(instance)
            setattr(instance, self.fun.__name__, computed)
            return computed
        return self
def ping_connect(ip, retry=1):
    """Check reachability of ``ip`` with the system ``ping`` command.

    The command is executed as an argument list without a shell, so the
    ``ip`` value cannot inject additional shell commands.

    :param ip: host name or address to ping
    :param retry: maximum number of attempts; one second is slept between
        consecutive attempts
    :return: ``True`` as soon as one attempt exits with status 0,
        otherwise ``False``
    """
    if platform.system() == 'Windows':
        cmd = ['ping', '-n', '1', '-w', '1', str(ip)]
    else:
        cmd = ['ping', '-c', '3', str(ip)]

    for attempt in range(retry):
        if attempt:
            # Pause between attempts only; no sleep after the final failure.
            time.sleep(1)
        try:
            # stderr is discarded (the original captured and ignored it).
            finished = subprocess.run(cmd, stderr=subprocess.DEVNULL)
        except OSError:
            # ping binary missing or not executable: count as a failed attempt.
            continue
        if finished.returncode == 0:
            return True
    return False
def Singleton(cls):
    """Class decorator implementing the singleton pattern.

    The decorated name becomes a zero-argument factory that creates the
    instance with ``cls()`` on first call and returns that same instance on
    every later call.
    """
    _instance = {}

    def inner():
        if cls in _instance:
            return _instance[cls]
        _instance[cls] = cls()
        return _instance[cls]

    return inner
class SingletonArguments(type):
    """Metaclass that makes each class using it a singleton.

    The first call constructs the instance with the given arguments; later
    calls return that instance and ignore their arguments.
    """

    # Maps class -> its single instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls in cls._instances:
            return cls._instances[cls]
        cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]
def str2int(v):
    """Return ``int(v)``, or ``v`` unchanged when the conversion fails."""
    try:
        converted = int(v)
    except Exception:
        return v
    return converted
def bytes2str(v):
    """Decode ``bytes`` as UTF-8; return any other value unchanged.

    Like the other converters in this file, the input is returned as-is
    when it cannot be converted (non-bytes input or invalid UTF-8).

    :param v: value to convert
    :return: decoded ``str`` for valid UTF-8 bytes, otherwise ``v``
    """
    if not isinstance(v, bytes):
        # Previously this path fell through and returned None.
        return v
    try:
        return v.decode('utf8')
    except UnicodeDecodeError:
        return v
def int2timestr(v):
    """Format a millisecond timestamp as local ``'YYYY-mm-dd HH:MM:SS'``.

    :param v: timestamp in milliseconds
    :return: the formatted string, or ``v`` unchanged if conversion fails
    """
    try:
        local_struct = time.localtime(v / 1000)
        return time.strftime("%Y-%m-%d %H:%M:%S", local_struct)
    except Exception:
        return v
def int2datetime(v):
    """Convert a millisecond timestamp to a local ``datetime``.

    :param v: timestamp in milliseconds
    :return: ``datetime.datetime``, or ``v`` unchanged if conversion fails
    """
    try:
        seconds = v / 1000
        return datetime.datetime.fromtimestamp(seconds)
    except Exception:
        return v
def str2datetime(v):
    """Parse ``'YYYY-mm-dd HH:MM:SS'`` into a ``datetime``.

    :param v: timestamp string
    :return: the parsed ``datetime.datetime``; the current local time when
        ``v`` cannot be parsed
    """
    try:
        # The day directive is %d; the previous '%s' is not a valid strptime
        # directive, so every call fell through to the fallback below.
        return datetime.datetime.strptime(v, '%Y-%m-%d %H:%M:%S')
    except Exception:
        return datetime.datetime.now()
def str2float(v):
    """Return ``float(v)``, or ``v`` unchanged when the conversion fails."""
    try:
        number = float(v)
    except Exception:
        return v
    return number
def upper(v):
    """Return ``v.upper()``, or ``v`` unchanged when that call fails."""
    try:
        uppercased = v.upper()
    except Exception:
        return v
    return uppercased
def lower(v):
    """Return ``v.lower()``, or ``v`` unchanged when that call fails."""
    try:
        lowercased = v.lower()
    except Exception:
        return v
    return lowercased
def basename(path):
    """Return the file-name part of ``path``.

    Uses ``ntpath``, so both ``/`` and ``\\`` are treated as separators.
    """
    _head, tail = ntpath.split(path)
    return tail
def yanei_decimal(value: float, num: int):
    """Truncate a float to ``num`` digits after the decimal point.

    The value is cut off (not rounded).  Non-float input, and floats that
    cannot be truncated (``inf``/``nan``, or more digits requested than the
    decimal context can hold), are returned unchanged.

    :param value: float to process
    :param num: number of decimal places to keep
    :return: the truncated float, or ``value`` unchanged
    """
    if not isinstance(value, float):
        return value
    from decimal import Decimal, ROUND_DOWN
    try:
        # repr() gives the shortest exact text of the float; Decimal also
        # understands exponent notation such as '1.5e-07', which plain
        # splitting on '.' mishandled.
        quantum = Decimal(1).scaleb(-num)
        truncated = Decimal(repr(value)).quantize(quantum, rounding=ROUND_DOWN)
        return float(truncated)
    except (TypeError, ValueError, ArithmeticError):
        return value
def seconds_format(time_cost: int):
    """Format a duration in seconds as text with 天/时/分/秒 units.

    The largest non-zero unit comes first and every smaller unit follows,
    including zero ones (e.g. ``'1时0分5秒'``).  Fractions of a second are
    dropped.

    :param time_cost: duration in seconds
    :return: formatted text; ``'0秒'`` for empty, zero or negative input
    """
    if not time_cost or time_cost < 0:
        return '0秒'

    minutes, seconds = divmod(int(time_cost), 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)

    if days:
        return f'{days}天{hours}时{minutes}分{seconds}秒'
    if hours:
        return f'{hours}时{minutes}分{seconds}秒'
    if minutes:
        return f'{minutes}分{seconds}秒'
    return f'{seconds}秒'
def calculate_rto(end, start, ceil=True):
    """Format the span between two millisecond timestamps.

    :param end: end timestamp in milliseconds
    :param start: start timestamp in milliseconds
    :param ceil: when true the span is rounded up to whole seconds; when
        false it is truncated, except that spans under one second are still
        rounded up
    :return: the span formatted by ``seconds_format``
    """
    elapsed_ms = float(end - start)
    elapsed_s = elapsed_ms / 1000
    if ceil or elapsed_ms < 1000:
        whole_seconds = math.ceil(elapsed_s)
    else:
        whole_seconds = int(elapsed_s)
    return seconds_format(whole_seconds)
def transform_duration_str_to_seconds(duration_str: str):
    """Convert a duration string to a number of seconds.

    :param duration_str: duration text, e.g. ``'1天2时3分4秒'``; every unit is
        optional, but the units present must appear in that order
    :return: total seconds as ``int``; ``None`` for empty input or when a
        number in the string cannot be parsed
    """
    if not duration_str:
        return None
    # (unit marker, seconds per unit), from the largest unit to the smallest.
    units = (("天", 24 * 60 * 60), ("时", 60 * 60), ("分", 60), ("秒", 1))
    try:
        total = 0
        rest = duration_str
        for marker, factor in units:
            if marker in rest:
                # The text before the marker is the number for this unit;
                # whatever follows is left for the smaller units.
                number, rest = rest.split(marker, 1)
                total += int(number) * factor
        return total
    except (TypeError, ValueError, AttributeError):
        return None
def extra_script_data_after_call(resp):
    """Flatten a response mapping into one display string.

    Reads ``resp['overall']`` and ``resp['data']``.  The result starts with a
    header containing ``overall``; when ``data`` is a list, each item follows
    on its own line.  If ``resp`` cannot be read that way, its string form is
    returned instead.

    :param resp: response object, expected to provide ``.get``
    :return: the assembled string (possibly empty)
    """
    resp_data = ''
    try:
        overall = resp.get('overall')
        entries = resp.get('data', [])
        resp_data = f'状态码:{overall} 其他信息:'
        if isinstance(entries, list):
            resp_data += ''.join(f'{entry}\n' for entry in entries)
    except Exception:
        try:
            resp_data = f'{resp}'
        except Exception:
            # Best effort: keep whatever was assembled so far.  Unlike a bare
            # ``except``, this lets KeyboardInterrupt/SystemExit propagate.
            pass
    return resp_data
def time_diff_int(str_time1, str_time2):
    """Return ``str_time1 - str_time2`` in whole seconds.

    Both values are converted with ``str()`` and parsed as local
    ``'YYYY-mm-dd HH:MM:SS'`` timestamps.

    :param str_time1: minuend timestamp
    :param str_time2: subtrahend timestamp
    :return: int, the difference in seconds (negative when ``str_time1`` is
        the earlier one)
    :raises ValueError: if ``str_time2`` is ``'2038-01-01 00:00:00'`` or
        either value does not match the expected format
    """
    # TODO: change the argument types (carried over from the original code).
    fmt = "%Y-%m-%d %H:%M:%S"
    if str(str_time2) == '2038-01-01 00:00:00':
        # ValueError is still an Exception, so existing handlers keep working.
        raise ValueError(
            "time_diff_int does not accept '2038-01-01 00:00:00' as str_time2"
        )
    seconds1 = int(time.mktime(time.strptime(str(str_time1), fmt)))
    seconds2 = int(time.mktime(time.strptime(str(str_time2), fmt)))
    return seconds1 - seconds2
def timed_lru_cache(seconds: int, maxsize: int = 128):
    """Decorator factory: an ``lru_cache`` that is emptied periodically.

    :param seconds: lifetime of the cache; once it has elapsed, the next call
        clears the whole cache and starts a new lifetime
    :param maxsize: forwarded to ``functools.lru_cache``
    :return: a decorator for the function to cache
    """

    def wrapper_cache(func):
        cached = lru_cache(maxsize=maxsize)(func)
        # Expiry bookkeeping is stored on the cached function object.
        cached.lifetime = datetime.timedelta(seconds=seconds)
        cached.expiration = datetime.datetime.now() + cached.lifetime

        def _reset_when_expired():
            # Drop every cached entry and schedule the next expiry.
            if datetime.datetime.now() >= cached.expiration:
                cached.cache_clear()
                cached.expiration = datetime.datetime.now() + cached.lifetime

        @wraps(cached)
        def wrapped_func(*args, **kwargs):
            _reset_when_expired()
            return cached(*args, **kwargs)

        return wrapped_func

    return wrapper_cache
def is_illegal_name(name):
    """Return ``True`` if ``name`` contains any of ``+``, ``-``, ``=``, ``@``."""
    forbidden = ('+', "-", "=", "@")
    return any(symbol in name for symbol in forbidden)
def get_local_ip():
    """Return the IP address that the local host name resolves to.

    :return: the address string, or ``"Unable to get local IP"`` when the
        lookup raises ``socket.error``
    """
    try:
        # Resolve this machine's own host name to an address.
        return socket.gethostbyname(socket.gethostname())
    except socket.error:
        return "Unable to get local IP"