import datetime
from datetime import timedelta
from io import BytesIO

from dateutil.parser import parse
from fastapi import HTTPException
from openpyxl import load_workbook
from tortoise import fields

from helper.logger import logger
from helper.tool import parse_datetime
from models import User
from models.base import BaseModel
from models.template import Template


class StorageForm(BaseModel):
    """Stock-in form created from an uploaded spreadsheet.

    Each record stores the rows parsed from one uploaded workbook together
    with the template that described the workbook's columns.
    """

    user = fields.ForeignKeyField('rms.User', to_field='id')
    template = fields.ForeignKeyField('rms.Template', to_field='id', description='药剂模板ID')
    name = fields.CharField(max_length=255, description='模板名称', null=True)
    fill_json_content = fields.JSONField(description='模版内容')
    drug_info = fields.TextField(default="", null=True, description='模板内药剂信息')

    class Meta:
        table = 'storage_forms'
        table_description = '入库单表'

    @classmethod
    async def upload(cls, user: User, template: Template, file_contents: bytes, filename: str):
        """Parse an uploaded workbook, validate every row and persist the form.

        :param user: owner of the new form.
        :param template: template whose ``xlsx`` list describes the columns;
            only entries flagged ``checked`` are read.
        :param file_contents: raw bytes of the uploaded ``.xlsx`` file.
        :param filename: stored as the form's ``name``.
        :return: ``{"xlsx_transfer": {key: column text}, "data": [row dicts]}``
            where every row dict carries ``fill_json_content``, ``validate``,
            ``validate_msg``, ``gross_weight`` and ``import_count``.
        :raises HTTPException: propagated from ``check_parse_datetime`` when a
            date cell cannot be parsed.
        """
        columns = [item for item in template.xlsx if item.get("checked")]
        xlsx_transfer = {f"{item['key']}": f"{item['text']}" for item in columns}
        # Sets give O(1) membership tests inside the per-row comprehensions.
        required_keys = {item["key"] for item in columns if item.get("required")}
        optional_keys = {item["key"] for item in columns if not item.get("required")}

        data, drug_info = [], []
        for row in cls.read_xlsx(BytesIO(file_contents)):
            # row, e.g. {'药剂名称': '硫酸', '药剂规格': '500ml', '药剂纯度': '80%', '生产厂家': '广西硫酸厂'}
            # ``row.get`` (instead of ``row[...]``) lets a sheet that lacks a
            # template column be reported through validation (missing required
            # value) rather than crashing the upload with a KeyError.
            # required_item, e.g. {'k1': '硫酸', 'k2': '500ml', 'k3': '80%'}
            required_item = {
                item["key"]: row.get(item["text"])
                for item in columns
                if item["key"] in required_keys
            }
            # un_required_item, e.g. {'manufacturer': '广西硫酸厂'}
            un_required_item = {
                item["key"]: row.get(item["text"])
                for item in columns
                if item["key"] in optional_keys
            }
            keys = cls.check_parse_datetime({**required_item, **un_required_item})
            validate, validate_msg = cls.row_check(required_item, keys, columns)
            data.append({
                "fill_json_content": keys,
                "validate": validate,
                "validate_msg": validate_msg,
                "gross_weight": keys.get("gross_weight") or "",
                # Defaults to 1 when the column is absent or empty.
                "import_count": int(keys.get("import_count")) if keys.get("import_count") else 1,
            })
            # "k1" appears to be the drug-name column (see the row example
            # above) -- TODO confirm against the template definition.
            drug_info.append(required_item.get("k1"))

        result = {
            "xlsx_transfer": xlsx_transfer,
            "data": data,
        }
        # Cells may be empty (None) or numeric, so stringify before joining;
        # ``str.join`` would otherwise raise TypeError.
        drug_names = ",".join(str(name) for name in drug_info if name is not None)
        await cls.create(
            user=user,
            template=template,
            name=filename,
            fill_json_content=data,
            drug_info=drug_names,
        )
        return result

    @classmethod
    def read_xlsx(cls, file):
        """Yield each non-empty data row of the active sheet as a dict.

        The first sheet row supplies the dict keys. ``datetime`` cell values
        are rendered as ``YYYY-MM-DD`` strings; other values pass through
        unchanged.

        :param file: path or binary file-like object accepted by
            ``openpyxl.load_workbook``.
        """
        workbook = load_workbook(file, read_only=True)
        try:
            sheet = workbook.active
            head = tuple(cell.value for cell in sheet[1])
            for row in sheet.iter_rows(min_row=2, values_only=True):
                # Skip rows in which every cell is empty/falsy.
                if not any(row):
                    continue
                values = [
                    item.strftime("%Y-%m-%d") if isinstance(item, datetime.datetime) else item
                    for item in row
                ]
                yield dict(zip(head, values))
        finally:
            # Read-only workbooks keep the source open until closed; the
            # ``finally`` also runs when the consumer stops iterating early.
            workbook.close()

    @staticmethod
    def _is_plain_number(text):
        """Return True when *text* is an unsigned integer or decimal literal."""
        return text.replace(".", "", 1).isdigit()

    @classmethod
    def row_check(cls, required_item, keys, xlsx):
        """Check whether one row of uploaded data is valid.

        :param required_item: ``{key: value}`` for the required columns.
        :param keys: ``{key: value}`` for all columns of the row.
        :param xlsx: column definitions, forwarded to ``check_row_type``.
        :return: ``(is_valid, message)``; *message* concatenates the reason of
            every failed check and is empty when the row is valid.
        """
        # Required fields must match their declared type.
        type_validate, msg = cls.check_row_type(required_item, xlsx)

        # Required fields must all be filled in (truthy).
        required_validate = all(required_item.values())
        if not required_validate:
            msg += f"必填字段未填写({required_item})"

        # Gross weight: optional; a bare number is allowed (unit defaults to
        # g), otherwise the value must end with g (which also covers mg).
        gross_weight_validate = True
        gross_weight = keys.get("gross_weight")
        if gross_weight:
            # Cells may arrive as int/float, hence the str() conversion.
            weight_text = str(gross_weight).strip()
            gross_weight_validate = weight_text.endswith("g") or cls._is_plain_number(weight_text)
            if not gross_weight_validate:
                msg += '重量最后单位不正确'

        # Net content: optional, but when present the unit is mandatory and
        # must be one of g, mg, ml.
        net_weight_validate = True
        net_weight = keys.get("net_weight")
        if net_weight:
            net_weight_validate = str(net_weight).strip().endswith(("g", "mg", "ml"))
            if not net_weight_validate:
                msg += "净含量单位必须带单位g,mg,ml"

        # Density: optional; either a bare number or a value ending in g/ml.
        density_validate = True
        density = keys.get("density")
        if density:
            density_text = str(density).strip()
            density_validate = density_text.endswith('g/ml') or cls._is_plain_number(density_text)
            if not density_validate:
                msg += '密度单位不正确'

        is_valid = (
            required_validate
            and type_validate
            and gross_weight_validate
            and density_validate
            and net_weight_validate
        )
        return is_valid, msg

    @classmethod
    def check_row_type(cls, keys, xlsx):
        """Validate the values in *keys* against their declared column type.

        Callers pass only the required fields.

        :param keys: e.g. ``{'k1': '硫酸', 'k2': '500ml', 'k3': '80%', 'density': "0.82"}``
        :param xlsx: e.g. ``[{"key": "k1", "text": "药剂名称", "required": true,
            "type": "string" | "number" | "date"}]``; a missing ``type`` is
            treated as ``"string"``.
        :return: ``(True, "")`` when every value passes, otherwise
            ``(False, message)`` for the first value that fails.
        """
        key_type = {item["key"]: item.get("type", "string") for item in xlsx}
        # A value is valid for its type when the converter accepts it.
        converters = {"number": float, "date": parse}
        for key, value in keys.items():
            expected = key_type.get(key, "string")
            converter = converters.get(expected)
            if converter is None:
                # Strings (and unknown types) need no conversion check; keep
                # going so the remaining fields are still validated.
                continue
            try:
                converter(value)
            except Exception as e:
                logger.warning("keys:{0}, 校验类型失败:{1}".format(keys, e))
                return False, f"校验类型失败({key}, {value}, {expected})"
        return True, ""

    @classmethod
    def check_parse_datetime(cls, keys):
        """Normalise the ``produce_date`` and ``expire_date`` entries of *keys*.

        A filled-in value is replaced by the result of ``parse_datetime``; an
        empty value is replaced by ``""``. Keys that are absent are left
        absent. *keys* is modified in place.

        :param keys: e.g. ``{'k1': '硫酸', 'k2': '500ml', 'k3': '80%', 'manufacturer': '广西硫酸厂'}``
        :return: the same *keys* dict.
        :raises HTTPException: status 400 when ``parse_datetime`` returns a
            falsy result for a filled-in date.
        """
        trans_str = '%Y-%m-%d'
        for date_key in ('produce_date', 'expire_date'):
            if date_key not in keys:
                continue
            if not keys[date_key]:
                keys[date_key] = ""
                continue
            parsed_date = parse_datetime(keys[date_key], trans_str)
            if not parsed_date:
                raise HTTPException(
                    status_code=400,
                    detail="日期格式错误",
                )
            keys[date_key] = parsed_date
        return keys

    async def calculate_expire_date(self, template_obj):
        """Return the expiry date recorded in, or derived from, the form.

        An explicit ``expire_date`` wins. Otherwise the date is computed as
        ``produce_date`` plus ``shelf_life`` days; ``""`` is returned when
        either of those is missing.

        :param template_obj: unused; kept for interface compatibility.
        :return: the stored ``expire_date`` value, a ``YYYY-MM-DD`` string, or ``""``.
        """
        content = self.fill_json_content
        expire_date = content.get("expire_date")
        if expire_date:
            return expire_date

        produce_date = content.get("produce_date")
        shelf_life = content.get("shelf_life")
        if not produce_date or not shelf_life:
            return ""

        production_date = datetime.datetime.strptime(produce_date, '%Y-%m-%d')
        expiration_date = production_date + timedelta(days=int(shelf_life))
        return expiration_date.strftime('%Y-%m-%d')

    # Look up a column key by its display text.
    async def parse_fill_json_content(self, pop_count: bool = True):
        """Return the template's column mapping inverted to ``{text: key}``.

        :param pop_count: unused; kept for interface compatibility.
        :return: e.g. ``{"药剂名称": "k1", "药剂规格": "k2"}``
        """
        template_obj = await self.template
        xlsx_transfer = template_obj.parse_xlsx_transfer().get("xlsx_transfer")
        return {text: key for key, text in xlsx_transfer.items()}