""" 将「数据」目录下三份 Excel(与 #数据与表结构.md 一致)转换为 analytics-demo-web/public/mock/*.json。 派生字段(源表无列时): - AE occurrenceDate / registrationDate:由审核日回溯生成,保证 occurrence < registration < review(演示用)。 - AE province:由入院量表按医院名称众数映射。 - AE SAE 四布尔:由「伤害表现」关键词启发式判定。 - 入院 productName:将 MaterialDesc 映射到 AE∪投诉 产品名称集合(最长前缀包含匹配),便于与投诉率联表。 """ from __future__ import annotations import hashlib import json from pathlib import Path import pandas as pd ROOT = Path(__file__).resolve().parents[1] DATA_DIR = ROOT / "数据" OUT_DIR = ROOT / "analytics-demo-web" / "public" / "mock" def _d(v) -> pd.Timestamp | pd.NaTType: if pd.isna(v): return pd.NaT return pd.to_datetime(v) def _fmt(d: pd.Timestamp | pd.NaTType) -> str: if pd.isna(d): return "" return d.strftime("%Y-%m-%d") def _stable_int(s: str) -> int: return int(hashlib.md5(s.encode("utf-8")).hexdigest()[:8], 16) def infer_sae_flags(injury_text: str) -> dict[str, bool]: t = injury_text or "" return { "saeDeath": ("死亡" in t) and ("非死亡" not in t), "saeLifeThreatening": "危及生命" in t or "生命威胁" in t or "心跳骤停" in t, "saeDisability": "残疾" in t or "功能障碍" in t, "saeHospitalization": "住院" in t or "延长住院" in t or "住院时间延长" in t, } def map_material_to_product(material_desc: str, canonical_products: list[str]) -> str: if not isinstance(material_desc, str) or not material_desc.strip(): return "" s = material_desc.strip() for p in canonical_products: if p and p in s: return p return s.split()[0] if " " in s else s def main() -> None: ae_path = DATA_DIR / "不良事件数据-模拟1000条-20260414.xlsx" adm_path = DATA_DIR / "入院量数据-模拟1000条-20260414.xlsx" cmp_path = DATA_DIR / "质量投诉数据-模拟1000条-20260414.xlsx" ae_df = pd.read_excel(ae_path, sheet_name=0) adm_df = pd.read_excel(adm_path, sheet_name=0) cmp_df = pd.read_excel(cmp_path, sheet_name=0) cmp_df.columns = [str(c).replace("\n", "").strip() for c in cmp_df.columns] hosp_prov = ( adm_df.groupby("HospitalName")["Province"] .agg(lambda s: s.mode().iloc[0] if len(s.mode()) else (s.iloc[0] if len(s) else "")) .to_dict() ) hospital_keys = sorted(hosp_prov.keys(), key=len, reverse=True) # 入院量表仅覆盖部分医院;其余 AE 单位按常识补省(仅用于演示地图/省聚合) province_override: dict[str, str] = { "中国医科大学附属第一医院": "辽宁省", "华中科技大学同济医学院附属协和医院": "湖北省", "昆明医科大学第一附属医院": "云南省", "天津市肿瘤医院": "天津市", "重庆医科大学附属第一医院": "重庆市", } def resolve_province(unit: str) -> str: if unit in province_override: return province_override[unit] if unit in hosp_prov: return str(hosp_prov[unit]).strip() for h in hospital_keys: if not h: continue if h in unit or unit in h: return str(hosp_prov[h]).strip() return "(未映射)" ae_products = set(ae_df["产品名称"].dropna().astype(str).unique()) cmp_products = set(cmp_df["产品名称"].dropna().astype(str).unique()) canonical_products = sorted(ae_products | cmp_products, key=len, reverse=True) ae_rows: list[dict] = [] for _, row in ae_df.iterrows(): code = str(row.get("报告编码", "")).strip() review = _d(row.get("审核日期")) if pd.isna(review): continue h = _stable_int(code) % 10000 reg = review - pd.Timedelta(days=3 + (h % 6)) occ = reg - pd.Timedelta(days=1 + (h % 12)) unit = str(row.get("单位名称", "")).strip() injury = str(row.get("伤害表现", "") or "").strip() flags = infer_sae_flags(injury) ae_rows.append( { "reportCode": code, "occurrenceDate": _fmt(occ), "registrationDate": _fmt(reg), "unitName": unit, "businessUnit": str(row.get("事业线", "") or "").strip(), "productName": str(row.get("产品名称", "") or "").strip(), "registrationNo": str(row.get("注册证编号/曾用注册证编号", "") or "").strip(), "model": str(row.get("型号", "") or "").strip(), "batchNo": str(row.get("产品批号", "") or "").strip(), "injuryExpression": injury, "deviceFailure": str(row.get("器械故障表现", "") or "").strip(), "reviewDate": _fmt(review), "province": resolve_province(unit) or "(未映射)", **flags, } ) adm_rows: list[dict] = [] for _, row in adm_df.iterrows(): md = row.get("MaterialDesc") pname = map_material_to_product(str(md) if md == md else "", canonical_products) adm_rows.append( { "year": int(row["Year"]), "month": int(row["Month"]), "hospitalName": str(row.get("HospitalName", "") or "").strip(), "dealerName": str(row.get("DealerName", "") or "").strip(), "province": str(row.get("Province", "") or "").strip(), "bu": str(row.get("BU", "") or "").strip(), "productName": pname, "cyQty": float(row.get("CY Qty", 0) or 0), "lyQty": float(row.get("LY Qty", 0) or 0), "growthQtyPct": round(float(row.get("Growth% Qty", 0) or 0) * 100, 4), "cyAmt": float(row.get("CY Amt", 0) or 0), "growthAmtPct": round(float(row.get("Growth% Amt", 0) or 0) * 100, 4), } ) def yn_to_bool(v) -> bool: s = str(v).strip() return s == "是" or s.lower() == "true" or s == "1" cmp_rows: list[dict] = [] for _, row in cmp_df.iterrows(): reg = _d(row.get("C3登记日期")) if pd.isna(reg): continue close = _d(row.get("关闭日期")) survey = _d(row.get("调查报告完成日期")) if pd.isna(close): close = survey if not pd.isna(survey) else reg cmp_rows.append( { "c3Code": str(row.get("C3编号", "")).strip(), "model": str(row.get("型号", "") or "").strip(), "batchNo": str(row.get("批号", "") or "").strip(), "registrationNo": str(row.get("注册证号", "") or "").strip(), "productName": str(row.get("产品名称", "") or "").strip(), "hospitalName": str(row.get("医院名称", "") or "").strip(), "faultType": str(row.get("故障类型", "") or "").strip(), "registerDate": _fmt(reg), "isAe": yn_to_bool(row.get("是否不良事件")), "conclusion": str(row.get("调查结论(处理结果)", "") or "").strip(), "compensation": str(row.get("赔付结论", "") or "").strip(), "closeDate": _fmt(close), } ) OUT_DIR.mkdir(parents=True, exist_ok=True) (OUT_DIR / "ae.json").write_text( json.dumps(ae_rows, ensure_ascii=False, indent=2), encoding="utf-8", ) (OUT_DIR / "admission.json").write_text( json.dumps(adm_rows, ensure_ascii=False, indent=2), encoding="utf-8", ) (OUT_DIR / "complaint.json").write_text( json.dumps(cmp_rows, ensure_ascii=False, indent=2), encoding="utf-8", ) print("Wrote", len(ae_rows), "ae,", len(adm_rows), "admission,", len(cmp_rows), "complaint ->", OUT_DIR) if __name__ == "__main__": main()