kms-backend/helper/drug.py

346 lines
12 KiB

#!/usr/bin/env python
# encoding: utf-8
"""
@author: tx
@file: drug.py
@time: 2023/6/5 16:53
@desc:
"""
from datetime import datetime, timedelta
from fastapi import HTTPException
from conf import setting
from models.drug import Drug, DrugStateEnum
# from models.drug_use_log import DrugUseLog
from models.template import Template
from models.cabinet import Cabinet
async def drugs_except_info(drug_obj, terminal_obj):
    """
    Build the exception/alert info for a drug identified by its RFID tag.

    The checks run in a fixed order and every check that fires overwrites
    ``state``, so the last one wins: not weighed (3), near expiry / expired
    (2), wrong cabinet category (4). Messages of the expiry and cabinet
    checks are appended to whatever message is already present.

    :param drug_obj: drug record; a falsy value means the tag matched nothing
    :param terminal_obj: terminal; not used here, the cabinet is looked up
        through ``setting.TERMINAL_ID``
    :return: dict with ``sound``, ``message`` and ``state``, plus ``hole`` and
        the ``is_*`` flags of the checks that fired
    """
    info_dict = {
        "sound": False,
        "message": "",
        "state": 0,
    }
    # Unknown tag: return before any attribute of drug_obj is touched.
    if not drug_obj:
        info_dict["is_rfid_except"] = True
        info_dict["sound"] = True
        info_dict["message"] = "非法标签"
        info_dict["state"] = 0
        return info_dict

    # Near-expiry threshold in days; 0 (no dictionary / no params / no key)
    # disables the near-expiry check.
    dictionary = drug_obj.dictionary
    params = dictionary.params if dictionary else None
    alert_close_to = params.get("alert_close_to", 0) if params else 0

    info_dict["hole"] = drug_obj.hole

    # Empty bottle: nothing else is checked.
    if drug_obj.state == DrugStateEnum.EMPTY:
        info_dict["is_rfid_except"] = True
        info_dict["sound"] = False
        info_dict["message"] = "空瓶药剂"
        info_dict["state"] = 1
        return info_dict

    # Weighing on receive is switched off here, so only the return path
    # (state OUT) can currently raise the "not weighed" alert.
    receive_require_weigh = False
    template_obj = await Template.get(id=drug_obj.template.id).prefetch_related('archive')
    return_require_weigh = template_obj.archive.params.get("return_require_weigh", '')
    weigh_required = (
        (return_require_weigh and drug_obj.state == DrugStateEnum.OUT)
        or (receive_require_weigh and drug_obj.state == DrugStateEnum.IN)
    )
    if weigh_required and not drug_obj.last_use_weight:
        info_dict["is_weight_except"] = True
        info_dict["sound"] = True
        info_dict["message"] = "药剂未称重"
        info_dict["state"] = 3

    # The expiry and cabinet checks below leave ``sound`` as it is.
    if check_drug_near_expired(drug_obj, alert_close_to):
        info_dict["is_near_expired"] = True
        info_dict["message"] += "药剂临期 "
        info_dict["state"] = 2

    if check_drug_already_expired(drug_obj):
        info_dict["is_already_expired"] = True
        info_dict["message"] += f"{drug_obj.attribute('k1')}药剂过期 "
        info_dict["state"] = 2

    # A drug must not be put into a cabinet of another category: compare the
    # cabinet's archive id with the archive id of the drug's template.
    # NOTE(review): if no cabinet matches setting.TERMINAL_ID, cabinet_obj is
    # presumably None and the next line raises AttributeError - confirm.
    cabinet_obj = await Cabinet.filter(terminal_id=setting.TERMINAL_ID).prefetch_related("archive").first()
    archive_id = cabinet_obj.archive.id
    drug_archive_obj = await drug_obj.template.archive
    if archive_id != drug_archive_obj.id:
        info_dict["is_put_expired"] = True
        info_dict["message"] += "请勿将此类试剂放入该柜体 "
        info_dict["state"] = 4

    return info_dict
def check_drug_near_expired(drug, alert_close_to):
    """
    Tell whether a drug is inside its near-expiry window.

    The window runs from ``alert_close_to`` days before the expiry date up to
    and including the expiry date itself, compared against today's local date.

    :param drug: object exposing ``expired_at`` (a datetime, or a falsy value)
    :param alert_close_to: window length in days; a number or a numeric
        string. Falsy or non-numeric values disable the check.
    :return: True when today falls inside the window, otherwise False
    """
    if not drug.expired_at or not alert_close_to:
        return False
    # The threshold comes from configuration and may be a string such as "7",
    # which timedelta() would reject, so coerce it first.
    try:
        alert_window = timedelta(days=float(alert_close_to))
    except (TypeError, ValueError, OverflowError):
        return False
    today = datetime.today().date()
    expiry_date = drug.expired_at.date()
    return expiry_date - alert_window <= today <= expiry_date
def check_drug_already_expired(drug, *args):
    """
    Tell whether a drug has reached its expiry date.

    :param drug: object exposing ``expired_at`` (a datetime, or a falsy value)
    :param args: ignored; accepted so the function can be called with the
        same arguments as ``check_drug_near_expired``
    :return: True when the expiry date is today or earlier, otherwise False
    """
    expires_at = drug.expired_at
    if not expires_at:
        return False
    return expires_at.date() <= datetime.today().date()
async def drugs_list_info(drugs: list[Drug]):
    """
    Turn drugs into the row dicts used for list views.

    :param drugs: drugs to serialise
    :return: one dict per drug, in input order, holding its id, RFID,
        position, state and raw ``fill_json_content``
    """
    return [
        {
            "drug_id": item.id,
            "rfid": item.rfid,
            "position": item.position,
            "state": item.state,
            "fill_json_content": item.fill_json_content,
        }
        for item in drugs
    ]
async def drug_near_already_expired(drugs: list[Drug], code="already", stime='', etime=''):
    """
    Collect alert rows for drugs that are expired or close to expiring.

    :param drugs: drugs to inspect
    :param code: ``"already"`` selects expired drugs, ``"near"`` selects drugs
        inside their near-expiry window; any other value raises ``KeyError``
    :param stime: optional exclusive lower bound for the warning date; a
        falsy value disables it
    :param etime: optional exclusive upper bound for the warning date; a
        falsy value disables it
    :return: list of dicts, one per matching drug

    ``stime`` / ``etime`` are compared directly with a ``datetime.date``, so
    callers presumably pass dates - TODO confirm against the callers.
    """
    check_func = {
        "near": check_drug_near_expired,
        "already": check_drug_already_expired
    }
    is_alerting = check_func[code]
    drug_list = []
    for drug_obj in drugs:
        # Near-expiry threshold in days; 10 when the drug has no dictionary
        # or the dictionary has no params.
        dictionary = drug_obj.dictionary
        params = dictionary.params if dictionary else None
        expiration_alert = params.get("expiration_alert", "") if params else 10
        # A configured string such as "7" would break timedelta(); coerce it,
        # and treat an unusable value like a missing one (check disabled).
        if isinstance(expiration_alert, str):
            try:
                expiration_alert = float(expiration_alert)
            except ValueError:
                expiration_alert = 0

        if not is_alerting(drug_obj, expiration_alert):
            continue

        expiry_date = drug_obj.expired_at.date()
        label = (
            f"{drug_obj.position or ''}"
            f"{drug_obj.fill_json_content.get('k1') or ''}"
            f"({drug_obj.rfid}){expiry_date}"
        )
        if code == "already":
            types = "过期预警"
            warning_date = expiry_date
            info = label + '已过期'
        else:
            types = "临期预警"
            warning_date = expiry_date - timedelta(days=expiration_alert)
            info = label + '即将过期'

        # Apply the optional date range before the awaited lookups below so
        # that filtered-out drugs cost nothing.
        if stime and not warning_date > stime:
            continue
        if etime and not etime > warning_date:
            continue

        # The dictionary may be missing (see the threshold lookup above).
        attribute_key = (
            await dictionary.attribute_dictionary_info(drug_obj.template)
            if dictionary else {}
        )
        base_info = await drug_obj.attribute_drug_info()
        drug_list.append({
            "rfid": drug_obj.rfid,
            "drug_id": drug_obj.id,
            "warning_date": warning_date,
            "drawer_id": drug_obj.drawer_id,
            "drug_info": base_info,
            "position": drug_obj.position,
            "storage_at": drug_obj.storage_at,
            "expired_at": drug_obj.expired_at,
            "types": types,
            "info": info,
            "unit_code": drug_obj.attribute("unit_code"),
            "manufacturer": drug_obj.attribute("cs"),  # manufacturer
            # Spread last so these keys override the ones above, as before.
            **base_info,
            **attribute_key,
        })
    return drug_list
def gram_to_milligram(gram):
    """
    Convert grams to milligrams.

    :param gram: amount in grams (number or numeric string)
    :return: milligrams as a float, or the int 0 for a falsy input
    """
    if not gram:
        return 0
    return float(gram) * 1000
def milligram_to_gram(milligram):
    """
    Convert milligrams to grams.

    :param milligram: amount in milligrams (number or numeric string)
    :return: grams as a float, or the int 0 for a falsy input
    """
    if not milligram:
        return 0
    return float(milligram) / 1000
def gram_to_milliliter(gram, density):
    """
    Convert a mass in grams to a volume in millilitres (v = m / d).

    :param gram: mass in grams (number or numeric string)
    :param density: density in g/ml, either a number or a string with an
        optional ``g/ml`` suffix (case-insensitive), e.g. ``"0.79"``,
        ``"1.2g/ml"``
    :return: volume rounded to one decimal place, or ``False`` when the
        density is missing, not numeric, or not a positive finite number
    :raises TypeError, ValueError: when ``gram`` cannot be converted to float
    """
    if not density:
        return False

    value = density
    if isinstance(value, str):
        value = value.strip()
        if value[-4:].lower() == "g/ml":
            value = value[:-4]
    try:
        density_value = float(value)
    except (TypeError, ValueError):
        return False
    # Rejects zero, negative, NaN and infinite densities in one comparison.
    if not 0 < density_value < float("inf"):
        return False

    return round(float(gram) / density_value, 1)
def open_update_expired_at(drug_obj, template_obj, open_date):
    """
    Work out the new expiry date when a bottle is opened for the first time.

    The first receive counts as opening the bottle. The candidate expiry is
    ``open_date`` plus the "shelf life after opening" day count stored in the
    drug's ``fill_json_content``; it is returned only when it is earlier than
    the current expiry (or when there is no current expiry).

    :param drug_obj: drug being received
    :param template_obj: template whose ``xlsx`` column definitions map the
        column text to its ``fill_json_content`` key
    :param open_date: datetime at which the bottle is opened
    :return: the new expiry datetime, or ``""`` when nothing should change
    """
    print("首次开瓶更新过期日期")
    # A recorded receiver or receive time means the bottle was opened before.
    if drug_obj.last_receive_id or drug_obj.last_receive_at:
        return ""

    label = "开封保质期(天)"
    columns = list(template_obj.xlsx)
    # Prefer the column whose text is exactly the label. A plain substring
    # test would also accept e.g. "保质期(天)" (the regular shelf life) and
    # could pick the wrong column when it is listed first.
    exact = [col.get("key") for col in columns if col.get("text") == label]
    # Fall back to abbreviated headers (text contained in the label) so
    # templates that relied on the looser match keep working.
    loose = [
        col.get("key")
        for col in columns
        if isinstance(col.get("text"), str) and col.get("text") and col.get("text") in label
    ]
    candidates = exact or loose
    shelf_life_key = candidates[0] if candidates else ""
    if not shelf_life_key:
        return ""

    shelf_life_day = drug_obj.fill_json_content.get(shelf_life_key)
    if not shelf_life_day:
        return ""
    # The filled-in value may be a string such as "500"; treat values that
    # are not a usable day count as "no shelf life configured".
    try:
        shelf_life = timedelta(days=float(shelf_life_day))
    except (TypeError, ValueError, OverflowError):
        return ""

    # The earlier of the two dates wins.
    new_expired_at = open_date + shelf_life
    if not drug_obj.expired_at or new_expired_at < drug_obj.expired_at:
        return new_expired_at
    return ""
async def simple_drugs_except_info(drug_obj):
    """
    Build the exception/alert info for a drug identified by its RFID tag.

    Unlike ``drugs_except_info`` this variant reloads the drug with its
    ``dictionary`` and ``template`` relations and does not check the cabinet
    category. Every check that fires overwrites ``state``, so the last one
    wins: not weighed (3), near expiry / expired (2).

    :param drug_obj: drug record; a falsy value means the tag matched nothing
    :return: dict with ``sound``, ``message`` and ``state``, plus ``hole`` and
        the ``is_*`` flags of the checks that fired
    """
    illegal_tag_info = {
        "sound": True,
        "message": "非法标签",
        "state": 0,
        "is_rfid_except": True,
    }
    # Unknown tag: return before drug_obj.id is read for the reload.
    if not drug_obj:
        return illegal_tag_info

    drug_obj = await Drug.get(id=drug_obj.id).prefetch_related('dictionary','template')
    # Kept in case the lookup yields nothing instead of raising.
    if not drug_obj:
        return illegal_tag_info

    # Near-expiry threshold in days; 10 when the drug has no dictionary or
    # the dictionary has no params, 0 (check disabled) when the key is absent.
    dictionary = drug_obj.dictionary
    params = dictionary.params if dictionary else None
    alert_close_to = params.get("alert_close_to", 0) if params else 10

    info_dict = {
        "sound": False,
        "message": "",
        "state": 0,
    }
    info_dict["hole"] = drug_obj.hole

    # Empty bottle: nothing else is checked.
    if drug_obj.state == DrugStateEnum.EMPTY:
        info_dict["is_rfid_except"] = True
        info_dict["sound"] = False
        info_dict["message"] = "空瓶药剂"
        info_dict["state"] = 1
        return info_dict

    # Weighing on receive is switched off here, so only the return path
    # (state OUT) can currently raise the "not weighed" alert.
    receive_require_weigh = False
    template_obj = await Template.get(id=drug_obj.template.id).prefetch_related('archive')
    return_require_weigh = template_obj.archive.params.get("return_require_weigh", '')
    weigh_required = (
        (drug_obj.state == DrugStateEnum.OUT and return_require_weigh)
        or (drug_obj.state == DrugStateEnum.IN and receive_require_weigh)
    )
    if weigh_required and not drug_obj.last_use_weight:
        info_dict["is_weight_except"] = True
        info_dict["sound"] = True
        info_dict["message"] = "药剂未称重"
        info_dict["state"] = 3

    # The expiry checks below leave ``sound`` as it is.
    if check_drug_near_expired(drug_obj, alert_close_to):
        info_dict["is_near_expired"] = True
        info_dict["message"] += "药剂临期 "
        info_dict["state"] = 2

    if check_drug_already_expired(drug_obj):
        info_dict["is_already_expired"] = True
        info_dict["message"] += f"{drug_obj.attribute('k1')}药剂过期 "
        info_dict["state"] = 2

    return info_dict