Data_Analysis/数据准备过程/模拟数据生成-提示词/generate_simulated_admissio...

315 lines
7.8 KiB
Python

# -*- coding: utf-8 -*-
"""按“生成入院量模拟数据.md”生成入院量模拟数据。"""
from __future__ import annotations
import random
from dataclasses import dataclass
from datetime import date
from pathlib import Path
from openpyxl import Workbook
from openpyxl.utils import get_column_letter
ROOT = Path(__file__).resolve().parent
HEADERS = [
"Year",
"Month",
"HospitalName",
"HospitalCode",
"DealerName",
"DealerCode",
"Province",
"City",
"GlobalDivision",
"LocalDivision",
"BU",
"ProductLine",
"ProductLineType",
"Material",
"MaterialDesc",
"CY Amt",
"LY Amt",
"Growth Amt",
"Growth% Amt",
"CY Qty",
"LY Qty",
"Growth Qty",
"Growth% Qty",
"CY LE AMT",
]
@dataclass(frozen=True)
class Hospital:
name: str
code: str
province: str
city: str
@dataclass(frozen=True)
class Dealer:
name: str
code: str
@dataclass(frozen=True)
class MaterialProfile:
global_div: str
local_div: str
bu: str
product_line: str
product_line_type: str
material: str
material_desc: str
unit_price_low: float
unit_price_high: float
weight: int
HOSPITALS = [
Hospital("上海市第一人民医院", "H310001", "上海市", "上海市"),
Hospital("浙江大学医学院附属第二医院", "H330001", "浙江省", "杭州市"),
Hospital("四川大学华西医院", "H510001", "四川省", "成都市"),
Hospital("广东省人民医院", "H440001", "广东省", "广州市"),
Hospital("中南大学湘雅医院", "H430001", "湖南省", "长沙市"),
Hospital("山东大学齐鲁医院", "H370001", "山东省", "济南市"),
Hospital("西安交通大学第一附属医院", "H610001", "陕西省", "西安市"),
Hospital("南京鼓楼医院", "H320001", "江苏省", "南京市"),
Hospital("福建省立医院", "H350001", "福建省", "福州市"),
Hospital("郑州大学第一附属医院", "H410001", "河南省", "郑州市"),
]
DEALERS = [
Dealer("上海睿康医疗器械有限公司", "D10001"),
Dealer("浙江和信医药科技有限公司", "D10002"),
Dealer("广州安泰医疗供应链有限公司", "D10003"),
Dealer("成都优诺医疗器械有限公司", "D10004"),
Dealer("南京博瑞医疗科技有限公司", "D10005"),
Dealer("济南康惠医疗设备有限公司", "D10006"),
Dealer("西安同泽医药贸易有限公司", "D10007"),
Dealer("福州海润医疗器械有限公司", "D10008"),
]
MATERIALS = [
MaterialProfile(
"Hospital Care",
"输液治疗",
"IV Therapy BU",
"静脉输注",
"留置针",
"MAT203145",
"一次性使用静脉留置针 20G",
36,
58,
160,
),
MaterialProfile(
"Hospital Care",
"输液治疗",
"IV Therapy BU",
"静脉输注",
"输液器",
"MAT203148",
"一次性使用精密过滤输液器",
22,
40,
150,
),
MaterialProfile(
"Hospital Care",
"输液治疗",
"IV Therapy BU",
"静脉输注",
"注射器",
"MAT203166",
"一次性使用无菌注射器 20ml",
4.5,
8.5,
110,
),
MaterialProfile(
"Avitum",
"透析",
"Renal Care BU",
"透析耗材",
"透析器",
"MAT301201",
"血液透析器 1.4m²",
145,
230,
130,
),
MaterialProfile(
"Avitum",
"透析",
"Renal Care BU",
"透析耗材",
"透析管路",
"MAT301216",
"血液透析管路 AV-SET",
85,
130,
90,
),
MaterialProfile(
"Aesculap",
"外科",
"Surgical BU",
"缝线与外科耗材",
"可吸收缝线",
"MAT402018",
"可吸收性外科缝线 3-0",
120,
180,
95,
),
MaterialProfile(
"Aesculap",
"外科",
"Surgical BU",
"缝线与外科耗材",
"非吸收缝线",
"MAT402026",
"非吸收性外科缝线 4-0",
88,
150,
85,
),
MaterialProfile(
"Hospital Care",
"输液治疗",
"IV Therapy BU",
"营养输注",
"肠内营养器械",
"MAT203199",
"一次性使用肠内营养输液器",
26,
44,
70,
),
]
def weighted_materials() -> list[MaterialProfile]:
out: list[MaterialProfile] = []
for m in MATERIALS:
out.extend([m] * m.weight)
return out
def quarter_factor(month: int) -> float:
if month in (10, 11, 12):
return 1.08
if month in (7, 8, 9):
return 1.03
return 1.0
def generate_rows(n: int, seed: int) -> list[list]:
rng = random.Random(seed)
mat_pool = weighted_materials()
years = [2024, 2025, 2026]
rows: list[list] = []
for _ in range(n):
year = rng.choice(years)
month = rng.randint(1, 12)
hospital = rng.choice(HOSPITALS)
dealer = rng.choice(DEALERS)
mat = rng.choice(mat_pool)
# 数量:近似反映季度小幅波动
base_cy_qty = rng.randint(120, 2200)
cy_qty = int(round(base_cy_qty * quarter_factor(month)))
ly_is_zero = rng.random() < 0.05
if ly_is_zero:
ly_qty = 0
else:
change_ratio_qty = rng.uniform(-0.28, 0.38)
ly_qty = max(1, int(round(cy_qty / (1 + change_ratio_qty))))
growth_qty = cy_qty - ly_qty
growth_pct_qty = 0 if ly_qty == 0 else round(growth_qty / ly_qty, 4)
# 金额:与数量和物料单价相关,保证可解释性
unit_price_cy = rng.uniform(mat.unit_price_low, mat.unit_price_high)
cy_amt = round(cy_qty * unit_price_cy, 2)
if ly_qty == 0:
ly_amt = 0.0
else:
# 同一物料年度单价一般平稳波动
unit_price_ly = unit_price_cy * rng.uniform(0.92, 1.08)
ly_amt = round(ly_qty * unit_price_ly, 2)
growth_amt = round(cy_amt - ly_amt, 2)
growth_pct_amt = 0 if ly_amt == 0 else round(growth_amt / ly_amt, 4)
cy_le_amt = round(cy_amt * rng.uniform(0.90, 1.15), 2)
rows.append(
[
year,
month,
hospital.name,
hospital.code,
dealer.name,
dealer.code,
hospital.province,
hospital.city,
mat.global_div,
mat.local_div,
mat.bu,
mat.product_line,
mat.product_line_type,
mat.material,
mat.material_desc,
cy_amt,
ly_amt,
growth_amt,
growth_pct_amt,
cy_qty,
ly_qty,
growth_qty,
growth_pct_qty,
cy_le_amt,
]
)
return rows
def main() -> None:
today = date.today()
out_name = f"入院量数据-模拟1000条-{today:%Y%m%d}.xlsx"
out_path = ROOT / out_name
rows = generate_rows(1000, int(today.strftime("%Y%m%d")))
wb = Workbook()
ws = wb.active
ws.title = "Sheet1"
ws.append(HEADERS)
for row in rows:
ws.append(row)
# 格式设置:金额/增长率显示友好
amt_cols = ["P", "Q", "R", "X"]
pct_cols = ["S", "W"]
for r in range(2, 1002):
for col in amt_cols:
ws[f"{col}{r}"].number_format = "#,##0.00"
for col in pct_cols:
ws[f"{col}{r}"].number_format = "0.00%"
for idx, _ in enumerate(HEADERS, start=1):
ws.column_dimensions[get_column_letter(idx)].width = 16
wb.save(out_path)
print(f"Written: {out_path}")
if __name__ == "__main__":
main()