From eb114a74f48f5f9c18c76c15c0282f68b362b808 Mon Sep 17 00:00:00 2001 From: badefei Date: Fri, 11 Jul 2025 10:53:23 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=20gongyinglia?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gongyinglia | 374 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 374 insertions(+) create mode 100644 gongyinglia diff --git a/gongyinglia b/gongyinglia new file mode 100644 index 0000000..bc5e76a --- /dev/null +++ b/gongyinglia @@ -0,0 +1,374 @@ +import tkinter as tk +from tkinter import ttk, scrolledtext, messagebox +from py2neo import Graph +import difflib +import random +import warnings + +# 过滤PNG图像警告 +warnings.filterwarnings("ignore", category=UserWarning, module="PIL") + +# Neo4j配置 +graph = Graph("bolt://localhost:7687", auth=("neo4j", "Bdf123456789")) + + +# 调试打印函数(增强缓存展示) +def debug_print(message, cache=None): + print(f"[DEBUG] {message}") + if cache: + print(f"[CACHE] {cache}") + + +# 模糊匹配函数 +def fuzzy_match(query, choices): + debug_print(f"模糊匹配: {query} in {choices}") + if not choices or all(v is None for v in choices): + return None + matches = difflib.get_close_matches(query, choices, n=1, cutoff=0.3) + return matches[0] if matches else None + + +# 策略推理规则 +def infer_strategy(risk_level, impact_level): + strategy_map = { + ('高', '大'): ['紧急性策略'], + ('高', '中'): ['紧急性策略'], + ('高', '小'): ['转移性策略'], + ('中', '大'): ['紧急性策略'], + ('中', '中'): ['预测性策略'], + ('中', '小'): ['转移性策略'], + ('低', '大'): ['预测性策略'], + ('低', '中'): ['预测性策略'], + ('低', '小'): ['预测性策略'], + } + return strategy_map.get((risk_level, impact_level), []) + + +# 检查供应商是否存在 +def check_supplier_exists(supplier_name): + try: + query = "MATCH (s:供应商 {名称: $name}) RETURN s LIMIT 1" + result = graph.run(query, name=supplier_name).data() + exists = bool(result) + debug_print(f"供应商 {supplier_name} 存在性检查", {"存在": exists}) + return exists + except Exception as e: + debug_print(f"检查供应商存在性失败", {"错误": str(e)}) + return False + + +# 查询下级供应商(被影响企业作为起点) +def find_downstream_suppliers(supplier_name): + try: + query = "MATCH (s:供应商 {名称: $name})-[:供应零件给]->(downstream:供应商) RETURN downstream.名称 AS 下级供应商" + result = graph.run(query, name=supplier_name).data() + suppliers = [item["下级供应商"] for item in result] + debug_print(f"查询下级供应商", {"供应商": suppliers}) + return suppliers + except Exception as e: + debug_print(f"查询下级供应商失败", {"错误": str(e)}) + return [] + + +# 查询上级供应商(被影响企业作为终点) +def find_upstream_suppliers(supplier_name): + try: + query = "MATCH (upstream:供应商)-[:供应零件给]->(s:供应商 {名称: $name}) RETURN upstream.名称 AS 上级供应商" + result = graph.run(query, name=supplier_name).data() + suppliers = [item["上级供应商"] for item in result] + debug_print(f"查询上级供应商", {"供应商": suppliers}) + return suppliers + except Exception as e: + debug_print(f"查询上级供应商失败", {"错误": str(e)}) + return [] + + +# 查询生产物料 +def find_produced_materials(supplier_name): + try: + query = "MATCH (s:供应商 {名称: $name})-[:生产零件]->(m:物料) RETURN m.编号 AS 物料编号" + result = graph.run(query, name=supplier_name).data() + materials = [item["物料编号"] for item in result] + debug_print(f"查询生产物料", {"物料": materials}) + return materials + except Exception as e: + debug_print(f"查询生产物料失败", {"错误": str(e)}) + return [] + + +# 处理供应商关系和策略替换(整合所有步骤) +def process_strategy_replacement(inputs, strategies): + try: + risk_subject = inputs["风险主体企业"] + affected_suppliers = inputs["被影响企业"].split() + + # 企业存在性检查 + if not check_supplier_exists(risk_subject): + messagebox.showerror("错误", f"风险主体供应商 {risk_subject} 不存在于图谱中!") + return None + for sup in affected_suppliers: + if not check_supplier_exists(sup): + messagebox.showerror("错误", f"被影响供应商 {sup} 不存在于图谱中!") + return None + + # 步骤1:搜索被影响企业影响的企业(下游) + affected_influence = None + if affected_suppliers: + downstream = find_downstream_suppliers(affected_suppliers[0]) + affected_influence = downstream[0] if downstream else None + debug_print("被影响企业影响的企业", {"企业": affected_influence}) + + # 步骤2:查询供应物料集合 + supply_materials = [] + for aff in affected_suppliers: + query = """ + MATCH (sub:供应商 {名称: $sub})-[rel:供应零件给]->(aff:供应商 {名称: $aff}) + RETURN rel.物料编号 AS 物料编号 + """ + result = graph.run(query, sub=risk_subject, aff=aff).data() + if result and "物料编号" in result[0]: + supply_materials.append(result[0]["物料编号"]) + debug_print("供应物料集合", {"物料": supply_materials}) + + # 步骤3:查询可替换企业集合 + replaceable_suppliers = [] + if supply_materials: + for mat in supply_materials: + query = """ + MATCH (s:供应商)-[:生产零件]->(m:物料 {编号: $mat}) + WHERE s.名称 <> $risk_subject + RETURN s.名称 AS 供应商名称 + """ + replaceable_suppliers.extend([ + item["供应商名称"] + for item in graph.run(query, mat=mat, risk_subject=risk_subject).data() + ]) + replaceable_suppliers = list(set(replaceable_suppliers)) + debug_print("可替换企业集合", {"企业": replaceable_suppliers}) + + # 步骤4:查询被影响企业的下级企业集合(上游) + lower_level_suppliers = [] + if affected_suppliers: + for aff in affected_suppliers: + lower_level_suppliers.extend(find_upstream_suppliers(aff)) + lower_level_suppliers = list(set(lower_level_suppliers)) + debug_print("下级企业集合", {"企业": lower_level_suppliers}) + + # 步骤5:查询下级企业生产物料集合 + lower_level_materials = {} + if lower_level_suppliers: + for sup in lower_level_suppliers: + materials = find_produced_materials(sup) + if materials: + lower_level_materials[sup] = materials + debug_print("下级企业生产物料集合", {"物料": lower_level_materials}) + + # 步骤6:匹配替换企业 + replace_supplier = None + if lower_level_materials and supply_materials: + for sup, mats in lower_level_materials.items(): + if any(mat in mats for mat in supply_materials): + replace_supplier = sup + debug_print("匹配到替换企业", {"企业": replace_supplier}) + break + if not replace_supplier and replaceable_suppliers: + replace_supplier = random.choice(replaceable_suppliers) + debug_print("从可替换企业中选择", {"企业": replace_supplier}) + + # 替换策略中的符号 + processed = [] + for s in strategies: + s = s.replace("#", risk_subject) + s = s.replace("*", affected_suppliers[0] if affected_suppliers else "") + s = s.replace("%", affected_influence or "") + s = s.replace("@_1", replace_supplier or "") + processed.append(s) + debug_print("替换后的策略", {"策略": processed}) + + return processed + except Exception as e: + debug_print(f"策略替换异常", {"错误": str(e)}) + messagebox.showerror("错误", f"策略处理失败: {str(e)}") + return None + + +# 查询风险项策略 +def query_risk_strategies(inputs): + try: + if not inputs["具体风险项"] or not inputs["风险主体企业"]: + messagebox.showwarning("输入缺失", "具体风险项和风险主体企业为必填项!") + return None + + risk_item = inputs["具体风险项"] + associated_risk = inputs["关联风险项"] + risk_level = inputs["风险等级"] + impact_level = inputs["风险影响程度"] + + # 模糊匹配风险项 + all_risk_names = [node["名称"] for node in graph.nodes.match("风险项") if "名称" in node] + matched_risk = fuzzy_match(risk_item, all_risk_names) + if not matched_risk: + messagebox.showerror("匹配失败", f"风险项 {risk_item} 未找到!") + return None + + matched_associated = None + if associated_risk: + matched_associated = fuzzy_match(associated_risk, all_risk_names) + if not matched_associated: + messagebox.showerror("匹配失败", f"关联风险项 {associated_risk} 未找到!") + return None + + # 推理策略性质 + strategy_type = infer_strategy(risk_level, impact_level) + if not strategy_type: + return "策略性质推理失败" + + # 查询策略 + strategies = [] + for item in [matched_risk, matched_associated] if matched_associated else [matched_risk]: + query = """ + MATCH (risk:风险项 {名称: $item})-[:有策略类]->(strategy:策略类) + MATCH (strategy)-[:策略性质为]->(st:策略性质 {名称: $strategy_type}) + RETURN strategy.描述 AS 策略名称 + """ + result = graph.run(query, item=item, strategy_type=strategy_type[0]).data() + strategies.extend([r["策略名称"] for r in result]) + debug_print("查询到的策略", {"策略": strategies}) + + return strategies, strategy_type, matched_risk, matched_associated + except Exception as e: + debug_print(f"查询风险策略异常", {"错误": str(e)}) + messagebox.showerror("错误", f"查询风险策略失败: {str(e)}") + return None + + +# 查询风险事件 +def query_risk_events(matched_risk, risk_level, impact_level): + query = """ + MATCH (risk:风险项 {名称: $risk})-[:有风险事件]->(event:风险事件) + MATCH (event)-[:风险等级为]->(rl {名称: $risk_level}) + MATCH (event)-[:风险影响程度为]->(ril {名称: $impact_level}) + RETURN event.描述 AS 事件名称 + """ + result = graph.run(query, risk=matched_risk, risk_level=risk_level, impact_level=impact_level).data() + events = [r["事件名称"] for r in result] + debug_print("查询到的风险事件", {"事件": events}) + return events + + +# 查询风险事件的策略 +def query_event_strategies(event_name): + query = """ + MATCH (event:风险事件 {描述: $event_name})-[:策略为]->(strategy:策略) + RETURN strategy.描述 AS 策略描述 + """ + result = graph.run(query, event_name=event_name).data() + strategies = [r["策略描述"] for r in result] + debug_print(f"风险事件 {event_name} 的策略", {"策略": strategies}) + return strategies + + +# GUI界面 +root = tk.Tk() +root.title("供应链风险查询系统") +root.geometry("800x900") + +# 输入字段 +fields = [ + ("风险事件", tk.StringVar(), None), + ("具体风险项", tk.StringVar(), None), + ("关联风险项", tk.StringVar(), None), + ("风险等级", tk.StringVar(value="高"), ["高", "中", "低"]), + ("风险影响程度", tk.StringVar(value="大"), ["大", "中", "小"]), + ("风险主体企业", tk.StringVar(), None), + ("被影响企业", tk.StringVar(), None), +] + +# 创建输入界面 +for idx, (label, var, options) in enumerate(fields): + frame = ttk.Frame(root) + frame.pack(fill="x", padx=10, pady=5) + ttk.Label(frame, text=label, width=25).pack(side="left") + if options: + ttk.OptionMenu(frame, var, var.get(), *options).pack(side="left", fill="x", expand=True) + else: + ttk.Entry(frame, textvariable=var, width=40).pack(side="left", fill="x", expand=True) + + +def submit(): + try: + inputs = {label: var.get() for label, var, _ in fields} + debug_print("提交查询", {"输入": inputs}) + + # 查询风险策略 + strategy_data = query_risk_strategies(inputs) + if not strategy_data: + return + + strategies, strategy_type, matched_risk, matched_associated = strategy_data + + # 查询风险事件 + risk_events = query_risk_events(matched_risk, inputs["风险等级"], inputs["风险影响程度"]) + + # 处理供应商关系和策略替换 + processed_strategies = process_strategy_replacement(inputs, strategies) + if not processed_strategies: + return + + # 构建结果(只显示特定信息) + result = "### 供应链风险分析与策略推荐 ###\n\n" + + result += f"• 具体风险项: {matched_risk}\n" + if matched_associated: + result += f"• 关联风险项: {matched_associated}\n" + result += f"• 策略性质: {strategy_type[0]}\n" + + if processed_strategies: + result += f"• 最优策略: {processed_strategies[0]}\n" + else: + result += "• 最优策略: 未找到匹配策略\n" + + if risk_events: + result += "\n• 风险事件及对应策略:\n" + for i, event in enumerate(risk_events, 1): + result += f" {i}. {event}\n" + event_strategies = query_event_strategies(event) + if event_strategies: + for j, strategy in enumerate(event_strategies, 1): + result += f" - 策略: {strategy}\n" + else: + result += " - 暂无对应策略\n" + else: + result += "\n• 风险事件: 未找到匹配风险事件\n" + + # 显示结果 + result_window = tk.Toplevel(root) + result_window.title("查询结果") + text = scrolledtext.ScrolledText(result_window, wrap=tk.WORD, font=("Arial", 12)) + text.pack(expand=True, fill="both", padx=10, pady=10) + text.insert(tk.END, result) + text.config(state="disabled") + + except Exception as e: + debug_print(f"提交查询异常", {"错误": str(e)}) + messagebox.showerror("错误", f"提交查询时发生错误: {str(e)}") + + +# 测试数据按钮 +def test_data(): + test_inputs = { + "具体风险项": "地震后节点企业的产能下降", + "风险等级": "高", + "风险影响程度": "小", + "风险主体企业": "企业A", + "被影响企业": "企业B" + } + for label, var, _ in fields: + if label in test_inputs: + var.set(test_inputs[label]) + + +ttk.Button(root, text="提交查询", command=submit).pack(pady=20) +ttk.Button(root, text="填充测试数据", command=test_data).pack(pady=10) + +root.mainloop() \ No newline at end of file