import tkinter as tk from tkinter import ttk, scrolledtext, messagebox from py2neo import Graph import difflib import random import warnings # 过滤PNG图像警告 warnings.filterwarnings("ignore", category=UserWarning, module="PIL") # Neo4j配置 graph = Graph("bolt://localhost:7687", auth=("neo4j", "Bdf123456789")) # 调试打印函数(增强缓存展示) def debug_print(message, cache=None): print(f"[DEBUG] {message}") if cache: print(f"[CACHE] {cache}") # 模糊匹配函数 def fuzzy_match(query, choices): debug_print(f"模糊匹配: {query} in {choices}") if not choices or all(v is None for v in choices): return None matches = difflib.get_close_matches(query, choices, n=1, cutoff=0.3) return matches[0] if matches else None # 策略推理规则 def infer_strategy(risk_level, impact_level): strategy_map = { ('高', '大'): ['紧急性策略'], ('高', '中'): ['紧急性策略'], ('高', '小'): ['转移性策略'], ('中', '大'): ['紧急性策略'], ('中', '中'): ['预测性策略'], ('中', '小'): ['转移性策略'], ('低', '大'): ['预测性策略'], ('低', '中'): ['预测性策略'], ('低', '小'): ['预测性策略'], } return strategy_map.get((risk_level, impact_level), []) # 检查供应商是否存在 def check_supplier_exists(supplier_name): try: query = "MATCH (s:供应商 {名称: $name}) RETURN s LIMIT 1" result = graph.run(query, name=supplier_name).data() exists = bool(result) debug_print(f"供应商 {supplier_name} 存在性检查", {"存在": exists}) return exists except Exception as e: debug_print(f"检查供应商存在性失败", {"错误": str(e)}) return False # 查询下级供应商(被影响企业作为起点) def find_downstream_suppliers(supplier_name): try: query = "MATCH (s:供应商 {名称: $name})-[:供应零件给]->(downstream:供应商) RETURN downstream.名称 AS 下级供应商" result = graph.run(query, name=supplier_name).data() suppliers = [item["下级供应商"] for item in result] debug_print(f"查询下级供应商", {"供应商": suppliers}) return suppliers except Exception as e: debug_print(f"查询下级供应商失败", {"错误": str(e)}) return [] # 查询上级供应商(被影响企业作为终点) def find_upstream_suppliers(supplier_name): try: query = "MATCH (upstream:供应商)-[:供应零件给]->(s:供应商 {名称: $name}) RETURN upstream.名称 AS 上级供应商" result = graph.run(query, name=supplier_name).data() suppliers = [item["上级供应商"] for item in result] debug_print(f"查询上级供应商", {"供应商": suppliers}) return suppliers except Exception as e: debug_print(f"查询上级供应商失败", {"错误": str(e)}) return [] # 查询生产物料 def find_produced_materials(supplier_name): try: query = "MATCH (s:供应商 {名称: $name})-[:生产零件]->(m:物料) RETURN m.编号 AS 物料编号" result = graph.run(query, name=supplier_name).data() materials = [item["物料编号"] for item in result] debug_print(f"查询生产物料", {"物料": materials}) return materials except Exception as e: debug_print(f"查询生产物料失败", {"错误": str(e)}) return [] # 处理供应商关系和策略替换(整合所有步骤) def process_strategy_replacement(inputs, strategies): try: risk_subject = inputs["风险主体企业"] affected_suppliers = inputs["被影响企业"].split() # 企业存在性检查 if not check_supplier_exists(risk_subject): messagebox.showerror("错误", f"风险主体供应商 {risk_subject} 不存在于图谱中!") return None for sup in affected_suppliers: if not check_supplier_exists(sup): messagebox.showerror("错误", f"被影响供应商 {sup} 不存在于图谱中!") return None # 步骤1:搜索被影响企业影响的企业(下游) affected_influence = None if affected_suppliers: downstream = find_downstream_suppliers(affected_suppliers[0]) affected_influence = downstream[0] if downstream else None debug_print("被影响企业影响的企业", {"企业": affected_influence}) # 步骤2:查询供应物料集合 supply_materials = [] for aff in affected_suppliers: query = """ MATCH (sub:供应商 {名称: $sub})-[rel:供应零件给]->(aff:供应商 {名称: $aff}) RETURN rel.物料编号 AS 物料编号 """ result = graph.run(query, sub=risk_subject, aff=aff).data() if result and "物料编号" in result[0]: supply_materials.append(result[0]["物料编号"]) debug_print("供应物料集合", {"物料": supply_materials}) # 步骤3:查询可替换企业集合 replaceable_suppliers = [] if supply_materials: for mat in supply_materials: query = """ MATCH (s:供应商)-[:生产零件]->(m:物料 {编号: $mat}) WHERE s.名称 <> $risk_subject RETURN s.名称 AS 供应商名称 """ replaceable_suppliers.extend([ item["供应商名称"] for item in graph.run(query, mat=mat, risk_subject=risk_subject).data() ]) replaceable_suppliers = list(set(replaceable_suppliers)) debug_print("可替换企业集合", {"企业": replaceable_suppliers}) # 步骤4:查询被影响企业的下级企业集合(上游) lower_level_suppliers = [] if affected_suppliers: for aff in affected_suppliers: lower_level_suppliers.extend(find_upstream_suppliers(aff)) lower_level_suppliers = list(set(lower_level_suppliers)) debug_print("下级企业集合", {"企业": lower_level_suppliers}) # 步骤5:查询下级企业生产物料集合 lower_level_materials = {} if lower_level_suppliers: for sup in lower_level_suppliers: materials = find_produced_materials(sup) if materials: lower_level_materials[sup] = materials debug_print("下级企业生产物料集合", {"物料": lower_level_materials}) # 步骤6:匹配替换企业 replace_supplier = None if lower_level_materials and supply_materials: for sup, mats in lower_level_materials.items(): if any(mat in mats for mat in supply_materials): replace_supplier = sup debug_print("匹配到替换企业", {"企业": replace_supplier}) break if not replace_supplier and replaceable_suppliers: replace_supplier = random.choice(replaceable_suppliers) debug_print("从可替换企业中选择", {"企业": replace_supplier}) # 替换策略中的符号 processed = [] for s in strategies: s = s.replace("#", risk_subject) s = s.replace("*", affected_suppliers[0] if affected_suppliers else "") s = s.replace("%", affected_influence or "") s = s.replace("@_1", replace_supplier or "") processed.append(s) debug_print("替换后的策略", {"策略": processed}) return processed except Exception as e: debug_print(f"策略替换异常", {"错误": str(e)}) messagebox.showerror("错误", f"策略处理失败: {str(e)}") return None # 查询风险项策略 def query_risk_strategies(inputs): try: if not inputs["具体风险项"] or not inputs["风险主体企业"]: messagebox.showwarning("输入缺失", "具体风险项和风险主体企业为必填项!") return None risk_item = inputs["具体风险项"] associated_risk = inputs["关联风险项"] risk_level = inputs["风险等级"] impact_level = inputs["风险影响程度"] # 模糊匹配风险项 all_risk_names = [node["名称"] for node in graph.nodes.match("风险项") if "名称" in node] matched_risk = fuzzy_match(risk_item, all_risk_names) if not matched_risk: messagebox.showerror("匹配失败", f"风险项 {risk_item} 未找到!") return None matched_associated = None if associated_risk: matched_associated = fuzzy_match(associated_risk, all_risk_names) if not matched_associated: messagebox.showerror("匹配失败", f"关联风险项 {associated_risk} 未找到!") return None # 推理策略性质 strategy_type = infer_strategy(risk_level, impact_level) if not strategy_type: return "策略性质推理失败" # 查询策略 strategies = [] for item in [matched_risk, matched_associated] if matched_associated else [matched_risk]: query = """ MATCH (risk:风险项 {名称: $item})-[:有策略类]->(strategy:策略类) MATCH (strategy)-[:策略性质为]->(st:策略性质 {名称: $strategy_type}) RETURN strategy.描述 AS 策略名称 """ result = graph.run(query, item=item, strategy_type=strategy_type[0]).data() strategies.extend([r["策略名称"] for r in result]) debug_print("查询到的策略", {"策略": strategies}) return strategies, strategy_type, matched_risk, matched_associated except Exception as e: debug_print(f"查询风险策略异常", {"错误": str(e)}) messagebox.showerror("错误", f"查询风险策略失败: {str(e)}") return None # 查询风险事件 def query_risk_events(matched_risk, risk_level, impact_level): query = """ MATCH (risk:风险项 {名称: $risk})-[:有风险事件]->(event:风险事件) MATCH (event)-[:风险等级为]->(rl {名称: $risk_level}) MATCH (event)-[:风险影响程度为]->(ril {名称: $impact_level}) RETURN event.描述 AS 事件名称 """ result = graph.run(query, risk=matched_risk, risk_level=risk_level, impact_level=impact_level).data() events = [r["事件名称"] for r in result] debug_print("查询到的风险事件", {"事件": events}) return events # 查询风险事件的策略 def query_event_strategies(event_name): query = """ MATCH (event:风险事件 {描述: $event_name})-[:策略为]->(strategy:策略) RETURN strategy.描述 AS 策略描述 """ result = graph.run(query, event_name=event_name).data() strategies = [r["策略描述"] for r in result] debug_print(f"风险事件 {event_name} 的策略", {"策略": strategies}) return strategies # GUI界面 root = tk.Tk() root.title("供应链风险查询系统") root.geometry("800x900") # 输入字段 fields = [ ("风险事件", tk.StringVar(), None), ("具体风险项", tk.StringVar(), None), ("关联风险项", tk.StringVar(), None), ("风险等级", tk.StringVar(value="高"), ["高", "中", "低"]), ("风险影响程度", tk.StringVar(value="大"), ["大", "中", "小"]), ("风险主体企业", tk.StringVar(), None), ("被影响企业", tk.StringVar(), None), ] # 创建输入界面 for idx, (label, var, options) in enumerate(fields): frame = ttk.Frame(root) frame.pack(fill="x", padx=10, pady=5) ttk.Label(frame, text=label, width=25).pack(side="left") if options: ttk.OptionMenu(frame, var, var.get(), *options).pack(side="left", fill="x", expand=True) else: ttk.Entry(frame, textvariable=var, width=40).pack(side="left", fill="x", expand=True) def submit(): try: inputs = {label: var.get() for label, var, _ in fields} debug_print("提交查询", {"输入": inputs}) # 查询风险策略 strategy_data = query_risk_strategies(inputs) if not strategy_data: return strategies, strategy_type, matched_risk, matched_associated = strategy_data # 查询风险事件 risk_events = query_risk_events(matched_risk, inputs["风险等级"], inputs["风险影响程度"]) # 处理供应商关系和策略替换 processed_strategies = process_strategy_replacement(inputs, strategies) if not processed_strategies: return # 构建结果(只显示特定信息) result = "### 供应链风险分析与策略推荐 ###\n\n" result += f"• 具体风险项: {matched_risk}\n" if matched_associated: result += f"• 关联风险项: {matched_associated}\n" result += f"• 策略性质: {strategy_type[0]}\n" if processed_strategies: result += f"• 最优策略: {processed_strategies[0]}\n" else: result += "• 最优策略: 未找到匹配策略\n" if risk_events: result += "\n• 风险事件及对应策略:\n" for i, event in enumerate(risk_events, 1): result += f" {i}. {event}\n" event_strategies = query_event_strategies(event) if event_strategies: for j, strategy in enumerate(event_strategies, 1): result += f" - 策略: {strategy}\n" else: result += " - 暂无对应策略\n" else: result += "\n• 风险事件: 未找到匹配风险事件\n" # 显示结果 result_window = tk.Toplevel(root) result_window.title("查询结果") text = scrolledtext.ScrolledText(result_window, wrap=tk.WORD, font=("Arial", 12)) text.pack(expand=True, fill="both", padx=10, pady=10) text.insert(tk.END, result) text.config(state="disabled") except Exception as e: debug_print(f"提交查询异常", {"错误": str(e)}) messagebox.showerror("错误", f"提交查询时发生错误: {str(e)}") # 测试数据按钮 def test_data(): test_inputs = { "具体风险项": "地震后节点企业的产能下降", "风险等级": "高", "风险影响程度": "小", "风险主体企业": "企业A", "被影响企业": "企业B" } for label, var, _ in fields: if label in test_inputs: var.set(test_inputs[label]) ttk.Button(root, text="提交查询", command=submit).pack(pady=20) ttk.Button(root, text="填充测试数据", command=test_data).pack(pady=10) root.mainloop()