import csv import json import os import pathlib import shutil from app.conf.Setting import settings # from app.lib.StaticData import ServerStatus class RecordStatus: """记录当然实验工作状态""" def __init__(self): self.file = pathlib.Path(settings.BASE_PATH, settings.TEMP_PATH, "last_experiment.json") def load(self): if not self.file.exists(): return {} return json.loads(self.file.read_text()) def update(self, work_time, status): data = { "work_time": work_time, "status": status } self.file.write_text(json.dumps(data)) def clear(self): if self.file.exists(): self.file.unlink() class ExperimentData: """实验数据""" current_obj = None id = None def __init__(self, _id): self._id = _id self.dir_path = pathlib.Path(os.path.join(settings.BASE_PATH, settings.TEMP_PATH, self._id)) self.file_path = self.dir_path.joinpath("experiment_data.csv") self.status_file = self.dir_path.joinpath("last_status.json") def init(self): if not self.dir_path.exists(): print(f"create dir {self.dir_path}") self.dir_path.mkdir(parents=True, exist_ok=True) self.file_path.touch(exist_ok=True) self.status_file.touch(exist_ok=True) ExperimentData.current_obj = self ExperimentData.id = self._id def update_status(self, status_data): format_value = (f'{status_data["status"]},{status_data["status_text"]},' f'{status_data["s_pressure"]},{status_data["temperature"]},' f'{status_data["test_time"]}\n') with open(self.file_path, "a+") as f: f.write(format_value) data = { "status": status_data["status"], "status_text": status_data["status_text"], "s_pressure": status_data["s_pressure"], "temperature": status_data["temperature"], "test_time": status_data["test_time"] } self.status_file.write_text(json.dumps(data)) def get_last_status(self): if not self.status_file.exists(): return {'status': 0} # 设置的初始值 raw = self.status_file.read_text() if not raw: return {'status': 0} # 设置的初始值 return json.loads(self.status_file.read_text()) def finish(self): ExperimentData.current_obj = None def remove(self): if self.dir_path.exists(): print(f"remove dir {self.dir_path}") shutil.rmtree(self.dir_path) def judging_status(self, status_obj, work_time): """根据时间判断状态""" cur_status = status_obj[0][0] for status, time in status_obj: if int(work_time) <= int(time): return cur_status else: cur_status = status return cur_status def get_raw_data(self): result = [] if not self.file_path.exists(): return result with open(self.file_path, "r") as f: reader = csv.reader(f) for row in reader: result.append(row) return result