gongyinglian/gongyinglia

374 lines
15 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, scrolledtext, messagebox
from py2neo import Graph
import difflib
import random
import warnings
# 过滤PNG图像警告
warnings.filterwarnings("ignore", category=UserWarning, module="PIL")
# Neo4j配置
graph = Graph("bolt://localhost:7687", auth=("neo4j", "Bdf123456789"))
# 调试打印函数(增强缓存展示)
def debug_print(message, cache=None):
print(f"[DEBUG] {message}")
if cache:
print(f"[CACHE] {cache}")
# 模糊匹配函数
def fuzzy_match(query, choices):
debug_print(f"模糊匹配: {query} in {choices}")
if not choices or all(v is None for v in choices):
return None
matches = difflib.get_close_matches(query, choices, n=1, cutoff=0.3)
return matches[0] if matches else None
# 策略推理规则
def infer_strategy(risk_level, impact_level):
strategy_map = {
('高', '大'): ['紧急性策略'],
('高', '中'): ['紧急性策略'],
('高', '小'): ['转移性策略'],
('中', '大'): ['紧急性策略'],
('中', '中'): ['预测性策略'],
('中', '小'): ['转移性策略'],
('低', '大'): ['预测性策略'],
('低', '中'): ['预测性策略'],
('低', '小'): ['预测性策略'],
}
return strategy_map.get((risk_level, impact_level), [])
# 检查供应商是否存在
def check_supplier_exists(supplier_name):
try:
query = "MATCH (s:供应商 {名称: $name}) RETURN s LIMIT 1"
result = graph.run(query, name=supplier_name).data()
exists = bool(result)
debug_print(f"供应商 {supplier_name} 存在性检查", {"存在": exists})
return exists
except Exception as e:
debug_print(f"检查供应商存在性失败", {"错误": str(e)})
return False
# 查询下级供应商(被影响企业作为起点)
def find_downstream_suppliers(supplier_name):
try:
query = "MATCH (s:供应商 {名称: $name})-[:供应零件给]->(downstream:供应商) RETURN downstream.名称 AS 下级供应商"
result = graph.run(query, name=supplier_name).data()
suppliers = [item["下级供应商"] for item in result]
debug_print(f"查询下级供应商", {"供应商": suppliers})
return suppliers
except Exception as e:
debug_print(f"查询下级供应商失败", {"错误": str(e)})
return []
# 查询上级供应商(被影响企业作为终点)
def find_upstream_suppliers(supplier_name):
try:
query = "MATCH (upstream:供应商)-[:供应零件给]->(s:供应商 {名称: $name}) RETURN upstream.名称 AS 上级供应商"
result = graph.run(query, name=supplier_name).data()
suppliers = [item["上级供应商"] for item in result]
debug_print(f"查询上级供应商", {"供应商": suppliers})
return suppliers
except Exception as e:
debug_print(f"查询上级供应商失败", {"错误": str(e)})
return []
# 查询生产物料
def find_produced_materials(supplier_name):
try:
query = "MATCH (s:供应商 {名称: $name})-[:生产零件]->(m:物料) RETURN m.编号 AS 物料编号"
result = graph.run(query, name=supplier_name).data()
materials = [item["物料编号"] for item in result]
debug_print(f"查询生产物料", {"物料": materials})
return materials
except Exception as e:
debug_print(f"查询生产物料失败", {"错误": str(e)})
return []
# 处理供应商关系和策略替换(整合所有步骤)
def process_strategy_replacement(inputs, strategies):
try:
risk_subject = inputs["风险主体企业"]
affected_suppliers = inputs["被影响企业"].split()
# 企业存在性检查
if not check_supplier_exists(risk_subject):
messagebox.showerror("错误", f"风险主体供应商 {risk_subject} 不存在于图谱中!")
return None
for sup in affected_suppliers:
if not check_supplier_exists(sup):
messagebox.showerror("错误", f"被影响供应商 {sup} 不存在于图谱中!")
return None
# 步骤1搜索被影响企业影响的企业下游
affected_influence = None
if affected_suppliers:
downstream = find_downstream_suppliers(affected_suppliers[0])
affected_influence = downstream[0] if downstream else None
debug_print("被影响企业影响的企业", {"企业": affected_influence})
# 步骤2查询供应物料集合
supply_materials = []
for aff in affected_suppliers:
query = """
MATCH (sub:供应商 {名称: $sub})-[rel:供应零件给]->(aff:供应商 {名称: $aff})
RETURN rel.物料编号 AS 物料编号
"""
result = graph.run(query, sub=risk_subject, aff=aff).data()
if result and "物料编号" in result[0]:
supply_materials.append(result[0]["物料编号"])
debug_print("供应物料集合", {"物料": supply_materials})
# 步骤3查询可替换企业集合
replaceable_suppliers = []
if supply_materials:
for mat in supply_materials:
query = """
MATCH (s:供应商)-[:生产零件]->(m:物料 {编号: $mat})
WHERE s.名称 <> $risk_subject
RETURN s.名称 AS 供应商名称
"""
replaceable_suppliers.extend([
item["供应商名称"]
for item in graph.run(query, mat=mat, risk_subject=risk_subject).data()
])
replaceable_suppliers = list(set(replaceable_suppliers))
debug_print("可替换企业集合", {"企业": replaceable_suppliers})
# 步骤4查询被影响企业的下级企业集合上游
lower_level_suppliers = []
if affected_suppliers:
for aff in affected_suppliers:
lower_level_suppliers.extend(find_upstream_suppliers(aff))
lower_level_suppliers = list(set(lower_level_suppliers))
debug_print("下级企业集合", {"企业": lower_level_suppliers})
# 步骤5查询下级企业生产物料集合
lower_level_materials = {}
if lower_level_suppliers:
for sup in lower_level_suppliers:
materials = find_produced_materials(sup)
if materials:
lower_level_materials[sup] = materials
debug_print("下级企业生产物料集合", {"物料": lower_level_materials})
# 步骤6匹配替换企业
replace_supplier = None
if lower_level_materials and supply_materials:
for sup, mats in lower_level_materials.items():
if any(mat in mats for mat in supply_materials):
replace_supplier = sup
debug_print("匹配到替换企业", {"企业": replace_supplier})
break
if not replace_supplier and replaceable_suppliers:
replace_supplier = random.choice(replaceable_suppliers)
debug_print("从可替换企业中选择", {"企业": replace_supplier})
# 替换策略中的符号
processed = []
for s in strategies:
s = s.replace("#", risk_subject)
s = s.replace("*", affected_suppliers[0] if affected_suppliers else "")
s = s.replace("%", affected_influence or "")
s = s.replace("@_1", replace_supplier or "")
processed.append(s)
debug_print("替换后的策略", {"策略": processed})
return processed
except Exception as e:
debug_print(f"策略替换异常", {"错误": str(e)})
messagebox.showerror("错误", f"策略处理失败: {str(e)}")
return None
# 查询风险项策略
def query_risk_strategies(inputs):
try:
if not inputs["具体风险项"] or not inputs["风险主体企业"]:
messagebox.showwarning("输入缺失", "具体风险项和风险主体企业为必填项!")
return None
risk_item = inputs["具体风险项"]
associated_risk = inputs["关联风险项"]
risk_level = inputs["风险等级"]
impact_level = inputs["风险影响程度"]
# 模糊匹配风险项
all_risk_names = [node["名称"] for node in graph.nodes.match("风险项") if "名称" in node]
matched_risk = fuzzy_match(risk_item, all_risk_names)
if not matched_risk:
messagebox.showerror("匹配失败", f"风险项 {risk_item} 未找到!")
return None
matched_associated = None
if associated_risk:
matched_associated = fuzzy_match(associated_risk, all_risk_names)
if not matched_associated:
messagebox.showerror("匹配失败", f"关联风险项 {associated_risk} 未找到!")
return None
# 推理策略性质
strategy_type = infer_strategy(risk_level, impact_level)
if not strategy_type:
return "策略性质推理失败"
# 查询策略
strategies = []
for item in [matched_risk, matched_associated] if matched_associated else [matched_risk]:
query = """
MATCH (risk:风险项 {名称: $item})-[:有策略类]->(strategy:策略类)
MATCH (strategy)-[:策略性质为]->(st:策略性质 {名称: $strategy_type})
RETURN strategy.描述 AS 策略名称
"""
result = graph.run(query, item=item, strategy_type=strategy_type[0]).data()
strategies.extend([r["策略名称"] for r in result])
debug_print("查询到的策略", {"策略": strategies})
return strategies, strategy_type, matched_risk, matched_associated
except Exception as e:
debug_print(f"查询风险策略异常", {"错误": str(e)})
messagebox.showerror("错误", f"查询风险策略失败: {str(e)}")
return None
# 查询风险事件
def query_risk_events(matched_risk, risk_level, impact_level):
query = """
MATCH (risk:风险项 {名称: $risk})-[:有风险事件]->(event:风险事件)
MATCH (event)-[:风险等级为]->(rl {名称: $risk_level})
MATCH (event)-[:风险影响程度为]->(ril {名称: $impact_level})
RETURN event.描述 AS 事件名称
"""
result = graph.run(query, risk=matched_risk, risk_level=risk_level, impact_level=impact_level).data()
events = [r["事件名称"] for r in result]
debug_print("查询到的风险事件", {"事件": events})
return events
# 查询风险事件的策略
def query_event_strategies(event_name):
query = """
MATCH (event:风险事件 {描述: $event_name})-[:策略为]->(strategy:策略)
RETURN strategy.描述 AS 策略描述
"""
result = graph.run(query, event_name=event_name).data()
strategies = [r["策略描述"] for r in result]
debug_print(f"风险事件 {event_name} 的策略", {"策略": strategies})
return strategies
# GUI界面
root = tk.Tk()
root.title("供应链风险查询系统")
root.geometry("800x900")
# 输入字段
fields = [
("风险事件", tk.StringVar(), None),
("具体风险项", tk.StringVar(), None),
("关联风险项", tk.StringVar(), None),
("风险等级", tk.StringVar(value="高"), ["高", "中", "低"]),
("风险影响程度", tk.StringVar(value="大"), ["大", "中", "小"]),
("风险主体企业", tk.StringVar(), None),
("被影响企业", tk.StringVar(), None),
]
# 创建输入界面
for idx, (label, var, options) in enumerate(fields):
frame = ttk.Frame(root)
frame.pack(fill="x", padx=10, pady=5)
ttk.Label(frame, text=label, width=25).pack(side="left")
if options:
ttk.OptionMenu(frame, var, var.get(), *options).pack(side="left", fill="x", expand=True)
else:
ttk.Entry(frame, textvariable=var, width=40).pack(side="left", fill="x", expand=True)
def submit():
try:
inputs = {label: var.get() for label, var, _ in fields}
debug_print("提交查询", {"输入": inputs})
# 查询风险策略
strategy_data = query_risk_strategies(inputs)
if not strategy_data:
return
strategies, strategy_type, matched_risk, matched_associated = strategy_data
# 查询风险事件
risk_events = query_risk_events(matched_risk, inputs["风险等级"], inputs["风险影响程度"])
# 处理供应商关系和策略替换
processed_strategies = process_strategy_replacement(inputs, strategies)
if not processed_strategies:
return
# 构建结果(只显示特定信息)
result = "### 供应链风险分析与策略推荐 ###\n\n"
result += f"• 具体风险项: {matched_risk}\n"
if matched_associated:
result += f"• 关联风险项: {matched_associated}\n"
result += f"• 策略性质: {strategy_type[0]}\n"
if processed_strategies:
result += f"• 最优策略: {processed_strategies[0]}\n"
else:
result += "• 最优策略: 未找到匹配策略\n"
if risk_events:
result += "\n• 风险事件及对应策略:\n"
for i, event in enumerate(risk_events, 1):
result += f" {i}. {event}\n"
event_strategies = query_event_strategies(event)
if event_strategies:
for j, strategy in enumerate(event_strategies, 1):
result += f" - 策略: {strategy}\n"
else:
result += " - 暂无对应策略\n"
else:
result += "\n• 风险事件: 未找到匹配风险事件\n"
# 显示结果
result_window = tk.Toplevel(root)
result_window.title("查询结果")
text = scrolledtext.ScrolledText(result_window, wrap=tk.WORD, font=("Arial", 12))
text.pack(expand=True, fill="both", padx=10, pady=10)
text.insert(tk.END, result)
text.config(state="disabled")
except Exception as e:
debug_print(f"提交查询异常", {"错误": str(e)})
messagebox.showerror("错误", f"提交查询时发生错误: {str(e)}")
# 测试数据按钮
def test_data():
test_inputs = {
"具体风险项": "地震后节点企业的产能下降",
"风险等级": "高",
"风险影响程度": "小",
"风险主体企业": "企业A",
"被影响企业": "企业B"
}
for label, var, _ in fields:
if label in test_inputs:
var.set(test_inputs[label])
ttk.Button(root, text="提交查询", command=submit).pack(pady=20)
ttk.Button(root, text="填充测试数据", command=test_data).pack(pady=10)
root.mainloop()