From 8a37f3601f6d066ec41f58f7050c8a79afd07066 Mon Sep 17 00:00:00 2001
From: Hgq <2757430053@qq.com>
Date: Wed, 3 Dec 2025 14:59:14 +0800
Subject: [PATCH] =?UTF-8?q?=E6=97=A0=E6=8A=A5=E9=94=99=E4=BB=85=E8=AD=A6?=
=?UTF-8?q?=E5=91=8A=E7=89=88=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.idea/.gitignore | 3 +
.../inspectionProfiles/profiles_settings.xml | 6 +
.idea/misc.xml | 7 +
.idea/modules.xml | 8 +
.idea/test.iml | 8 +
.idea/vcs.xml | 6 +
main.py | 876 ++++++++++++++++++
7 files changed, 914 insertions(+)
create mode 100644 .idea/.gitignore
create mode 100644 .idea/inspectionProfiles/profiles_settings.xml
create mode 100644 .idea/misc.xml
create mode 100644 .idea/modules.xml
create mode 100644 .idea/test.iml
create mode 100644 .idea/vcs.xml
create mode 100644 main.py
diff --git a/.idea/.gitignore b/.idea/.gitignore
new file mode 100644
index 0000000..359bb53
--- /dev/null
+++ b/.idea/.gitignore
@@ -0,0 +1,3 @@
+# 默认忽略的文件
+/shelf/
+/workspace.xml
diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml
new file mode 100644
index 0000000..105ce2d
--- /dev/null
+++ b/.idea/inspectionProfiles/profiles_settings.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
new file mode 100644
index 0000000..a6218fe
--- /dev/null
+++ b/.idea/misc.xml
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/modules.xml b/.idea/modules.xml
new file mode 100644
index 0000000..51ab974
--- /dev/null
+++ b/.idea/modules.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/test.iml b/.idea/test.iml
new file mode 100644
index 0000000..909438d
--- /dev/null
+++ b/.idea/test.iml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/.idea/vcs.xml b/.idea/vcs.xml
new file mode 100644
index 0000000..94a25f7
--- /dev/null
+++ b/.idea/vcs.xml
@@ -0,0 +1,6 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/main.py b/main.py
new file mode 100644
index 0000000..3747c9e
--- /dev/null
+++ b/main.py
@@ -0,0 +1,876 @@
+import numpy as np
+import random
+from typing import List, Dict, Tuple, Optional
+import matplotlib.pyplot as plt
+
+
+# ============================= 数据输入模块(用户需填写)=============================
+class OrderData:
+ """订单基础数据类(对应文档表四7符号)"""
+
+ def __init__(self):
+ # 1. 订单核心参数
+ self.I = 3 # 物料种类数(用户修改)
+ self.Q = [100, 150, 200] # 物料i待生产量Qi(索引0对应物料1,用户修改)
+ self.Dd = 10 # 订单需求交货期(时长,用户修改)
+
+ # 2. 物料基础成本(风险企业供应)
+ self.P = [50.0, 80.0, 60.0] # 物料i单位采购价格P_i(用户修改)
+ self.T0 = [5.0, 8.0, 6.0] # 物料i风险企业运输成本T_i^0(用户修改)
+
+ # 3. 惩罚系数(用户可按文档建议调整)
+ self.alpha = 2.0 # C1增长系数
+ self.beta = 1.0 # 时间尺度系数
+ self.delta = 1.3 # 惩罚倍数(1.2-1.5)
+ self.gamma = 0.5 # 提前交付惩罚系数
+
+
+class RiskEnterpriseData:
+ """风险企业数据类(对应文档表四7符号)"""
+
+ def __init__(self):
+ self.C0_max = 30.0 # 风险企业单位时间最大生产能力C0^max(用户修改)
+ self.C0_std = 20.0 # 风险企业单位时间标准生产能力(用户修改)
+ self.L1 = 3.0 # 扰动持续时长系数下限(关键订单3,一般订单2,用户修改)
+ self.L2 = 10.0 # 扰动持续时长系数上限(关键订单10,一般订单5,用户修改)
+ self.distance = 10.0 # 风险企业位置距离(用户修改)
+
+
+class SupplierData:
+ """供应商数据类(对应文档表四7符号)"""
+
+ def __init__(self, order: OrderData):
+ self.I = order.I
+ # Ji:物料i的供应商个数(索引0对应物料1,用户修改)
+ self.J = [2, 3, 2] # 物料1有2个供应商,物料2有3个,物料3有2个
+ # 供应商j的基础参数(按物料i分组,用户修改)
+ # 结构:self.suppliers[i][j] = {参数},i=物料索引,j=供应商索引(0开始)
+ self.suppliers = [
+ # 物料1的2个供应商
+ [
+ {"Cj_max": 40.0, "Cj_std": 30.0, "P_ij": 55.0, "T_ij": 7.0, "MinOrder": 20, "MaxOrder": 100,
+ "distance": 15.0},
+ {"Cj_max": 35.0, "Cj_std": 25.0, "P_ij": 52.0, "T_ij": 9.0, "MinOrder": 15, "MaxOrder": 90,
+ "distance": 20.0}
+ ],
+ # 物料2的3个供应商
+ [
+ {"Cj_max": 50.0, "Cj_std": 40.0, "P_ij": 85.0, "T_ij": 6.0, "MinOrder": 25, "MaxOrder": 150,
+ "distance": 12.0},
+ {"Cj_max": 45.0, "Cj_std": 35.0, "P_ij": 82.0, "T_ij": 8.0, "MinOrder": 20, "MaxOrder": 120,
+ "distance": 18.0},
+ {"Cj_max": 55.0, "Cj_std": 45.0, "P_ij": 88.0, "T_ij": 5.0, "MinOrder": 30, "MaxOrder": 160,
+ "distance": 10.0}
+ ],
+ # 物料3的2个供应商
+ [
+ {"Cj_max": 42.0, "Cj_std": 32.0, "P_ij": 65.0, "T_ij": 8.0, "MinOrder": 22, "MaxOrder": 200,
+ "distance": 16.0},
+ {"Cj_max": 38.0, "Cj_std": 28.0, "P_ij": 63.0, "T_ij": 10.0, "MinOrder": 18, "MaxOrder": 180,
+ "distance": 22.0}
+ ]
+ ]
+
+
+class AlgorithmParams:
+ """算法参数类(默认值可修改)"""
+
+ def __init__(self):
+ self.N = 100 # 种群规模
+ self.Pm = 0.05 # 变异概率
+ self.Pc = 0.8 # 交叉概率
+ self.MaxIter = 200 # 最大迭代次数
+ self.tournament_size = 3 # 锦标赛选择规模
+ # 种群比例 N1:N2:N3:N4 = 3:3:2:2
+ self.N1_ratio = 0.3
+ self.N2_ratio = 0.3
+ self.N3_ratio = 0.2
+ self.N4_ratio = 0.2
+
+
+# ============================= 辅助计算函数(文档对应函数)=============================
+def calculate_RNum(P_Num: float, A_Num: float) -> float:
+ """计算剩余待生产数量R_Num = A_Num - P_Num(文档1.2.2节)"""
+ return max(0.0, A_Num - P_Num)
+
+
+def calculate_ResTime(DueTime: float, riskTime: float) -> float:
+ """计算剩余交货期限ResTime = DueTime - riskTime(文档1.2.2节)"""
+ return max(0.0, DueTime - riskTime)
+
+
+def calculate_TimeLevel(Td: float, ResTime: float, L1: float, L2: float) -> str:
+ """计算扰动持续程度TimeLevel(文档1.2.2节)"""
+ if ResTime == 0:
+ return "长"
+ ratio = Td / ResTime
+ if ratio <= L1:
+ return "短"
+ elif ratio >= L2:
+ return "长"
+ else:
+ return "中"
+
+
+def calculate_CapLevel(R_Num: float, ResTime: float, C: float) -> str:
+ """计算订单生产能力等级CapLevel(文档1.2.2节)"""
+ if ResTime == 0:
+ return "不足"
+ capacity = C * ResTime # 总生产能力
+ if capacity >= R_Num * 1.2: # 预留20%缓冲
+ return "充足"
+ elif capacity <= R_Num * 0.8: # 缺口20%以上
+ return "不足"
+ else:
+ return "一般"
+
+
+# ============================= 染色体与种群类=============================
+class Chromosome:
+ """染色体类(三层编码结构)"""
+
+ def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData):
+ self.order = order
+ self.risk_ent = risk_ent
+ self.supplier = supplier
+ self.I = order.I
+ self.J_list = supplier.J # 各物料的供应商个数
+
+ # 三层编码初始化(随机生成基础结构)
+ self.enterprise_layer: List[List[int]] = [] # 企业编码层:I×(Ji+1),0/1
+ self.capacity_layer: List[List[float]] = [] # 能力编码层:I×(Ji+1),生产能力
+ self.quantity_layer: List[List[float]] = [] # 数量编码层:I×(Ji+1),分配数量
+
+ for i in range(self.I):
+ Ji = self.J_list[i]
+ total_ents = Ji + 1 # 1个风险企业 + Ji个供应商
+
+ # 企业编码层:随机生成0/1,确保至少有1个1
+ ent_codes = [random.randint(0, 1) for _ in range(total_ents)]
+ if sum(ent_codes) == 0:
+ ent_codes[random.randint(0, total_ents - 1)] = 1 # 强制激活1个
+ self.enterprise_layer.append(ent_codes)
+
+ # 能力编码层:基于企业标准生产能力随机生成
+ cap_codes = []
+ for j in range(total_ents):
+ if j == 0: # 风险企业
+ max_cap = risk_ent.C0_max
+ std_cap = risk_ent.C0_std
+ else: # 供应商j-1(索引转换)
+ max_cap = supplier.suppliers[i][j - 1]["Cj_max"]
+ std_cap = supplier.suppliers[i][j - 1]["Cj_std"]
+ # 在[0.5×std_cap, max_cap]范围内随机
+ cap = random.uniform(0.5 * std_cap, max_cap)
+ cap_codes.append(cap)
+ self.capacity_layer.append(cap_codes)
+
+ # 数量编码层:基于物料待生产量随机分配
+ qty_codes = [0.0] * total_ents
+ Qi = order.Q[i]
+ selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1]
+ if len(selected_ents) > 0:
+ # 随机分配数量(确保总和=Qi)
+ qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi
+ for idx, ent_idx in enumerate(selected_ents):
+ qty_codes[ent_idx] = qtys[idx]
+ self.quantity_layer.append(qty_codes)
+
+ # 修复初始染色体(确保满足约束)
+ self.repair()
+
+ def repair(self):
+ """染色体修复机制(文档1.4.1节)"""
+ # 步骤1:检查每类物料是否有供应方(物料生产约束)
+ for i in range(self.I):
+ ent_codes = self.enterprise_layer[i]
+ if sum(ent_codes) == 0:
+ # 随机激活1个企业
+ active_idx = random.randint(0, len(ent_codes) - 1)
+ self.enterprise_layer[i][active_idx] = 1
+
+ # 步骤2:检查企业最大生产能力约束
+ self._repair_capacity_constraint()
+
+ # 步骤3:检查物料数量约束(总和=Qi,供应商起订量/最大供应量)
+ self._repair_quantity_constraint()
+
+ def _repair_capacity_constraint(self):
+ """修复生产能力约束(修复属性名不匹配问题)"""
+ # 步骤1:修复风险企业生产能力(原逻辑不变)
+ risk_total_cap = 0.0
+ risk_assignments = [] # 存储(i, cap):物料i分配给风险企业的能力
+ for i in range(self.I):
+ if self.enterprise_layer[i][0] == 1: # 风险企业被选中
+ cap = self.capacity_layer[i][0]
+ risk_total_cap += cap
+ risk_assignments.append((i, cap))
+
+ # 若超出最大产能,剔除最小能力的物料
+ while risk_total_cap > self.risk_ent.C0_max and len(risk_assignments) > 0:
+ risk_assignments.sort(key=lambda x: x[1])
+ min_i, min_cap = risk_assignments.pop(0)
+ self.enterprise_layer[min_i][0] = 0 # 取消选择风险企业
+ self.capacity_layer[min_i][0] = 0.0
+ self.quantity_layer[min_i][0] = 0.0
+ risk_total_cap -= min_cap
+
+ # 步骤2:修复各供应商生产能力(核心修复:J_list → J)
+ for i in range(self.I): # 遍历每个物料i,找到该物料的所有供应商编码索引
+ Ji = self.J_list[i] # 物料i的供应商个数(注:此处self.J_list是Chromosome类自身属性,无需修改)
+ for j in range(1, Ji + 1): # j=1~Ji:物料i的供应商编码索引(对应供应商j-1)
+ supplier_idx = j - 1 # 供应商在suppliers列表中的索引(0开始)
+ supp_total_cap = 0.0
+ supp_assignments = []
+
+ # 遍历所有物料k,计算该供应商(j索引)在各物料上的总产能
+ for k in range(self.I):
+ # 核心修复:self.supplier.J_list[k] → self.supplier.J[k]
+ if j >= len(self.enterprise_layer[k]) or supplier_idx >= self.supplier.J[k]:
+ continue # 跳过无该供应商的物料,避免索引越界
+
+ # 若物料k选择了该供应商(j索引),累加产能
+ if self.enterprise_layer[k][j] == 1:
+ cap = self.capacity_layer[k][j]
+ supp_total_cap += cap
+ supp_assignments.append((k, cap))
+
+ # 若该供应商总产能超出最大产能,剔除最小能力的物料
+ if len(supp_assignments) == 0:
+ continue # 无物料分配给该供应商,跳过
+
+ # 获取该供应商在当前物料k上的最大产能(修正:用第一个分配的物料k)
+ first_k = supp_assignments[0][0]
+ supp_max_cap = self.supplier.suppliers[first_k][supplier_idx]["Cj_max"]
+
+ # 剔除超出产能的物料分配
+ while supp_total_cap > supp_max_cap and len(supp_assignments) > 0:
+ supp_assignments.sort(key=lambda x: x[1])
+ min_k, min_cap = supp_assignments.pop(0)
+ self.enterprise_layer[min_k][j] = 0 # 取消该物料对该供应商的选择
+ self.capacity_layer[min_k][j] = 0.0
+ self.quantity_layer[min_k][j] = 0.0
+ supp_total_cap -= min_cap
+
+ def _repair_quantity_constraint(self):
+ """修复物料数量约束(修复除以零错误)"""
+ for i in range(self.I):
+ Qi = self.order.Q[i]
+ ent_codes = self.enterprise_layer[i]
+ qty_codes = self.quantity_layer[i]
+ selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1]
+ current_total = sum(qty_codes)
+
+ # 调整总量至Qi(允许微小误差)
+ if abs(current_total - Qi) > 1e-6:
+ ratio = Qi / (current_total + 1e-10) # 避免初始总量为0时除以零
+ for idx in selected_ents:
+ qty_codes[idx] *= ratio
+
+ # 检查供应商起订量和最大供应量
+ Ji = self.J_list[i]
+ for j in range(1, Ji + 1): # 供应商j-1
+ if ent_codes[j] == 1:
+ min_order = self.supplier.suppliers[i][j - 1]["MinOrder"]
+ max_order = self.supplier.suppliers[i][j - 1]["MaxOrder"]
+ qty = qty_codes[j]
+
+ if qty < min_order:
+ # 低于起订量:要么增加到起订量,要么取消选择
+ if sum(qty_codes) + (min_order - qty) <= Qi * 1.1: # 允许10%缓冲
+ qty_codes[j] = min_order
+ # 从其他选中企业中减少对应数量
+ surplus_reduce = min_order - qty
+ other_ents = [idx for idx in selected_ents if idx != j]
+ if other_ents:
+ sum_other_qty = sum(qty_codes[idx] for idx in other_ents)
+ if sum_other_qty > 1e-10: # 避免除以零
+ reduce_ratio = surplus_reduce / sum_other_qty
+ for idx in other_ents:
+ qty_codes[idx] *= (1 - reduce_ratio)
+ else:
+ # 其他企业无数量可减,直接取消当前供应商选择
+ ent_codes[j] = 0
+ qty_codes[j] = 0.0
+ selected_ents.remove(j)
+ else:
+ ent_codes[j] = 0
+ qty_codes[j] = 0.0
+ selected_ents.remove(j)
+
+ elif qty > max_order:
+ # 超过最大供应量:调整到最大值,剩余差额分配给其他企业
+ qty_codes[j] = max_order
+ surplus_add = qty - max_order # 此处原逻辑笔误:应为当前qty - max_order,即超出部分转为需补充的差额
+ other_ents = [idx for idx in selected_ents if idx != j]
+ if other_ents:
+ # 核心修复:检查other_ents的数量总和是否为零
+ sum_other_qty = sum(qty_codes[idx] for idx in other_ents)
+ if sum_other_qty > 1e-10: # 总和非零,按比例分配
+ add_ratio = surplus_add / sum_other_qty
+ for idx in other_ents:
+ qty_codes[idx] *= (1 + add_ratio)
+ else:
+ # 总和为零,直接将差额分配给第一个其他企业
+ target_ent = other_ents[0]
+ # 确保不超过目标企业的最大供应量(若为供应商)
+ if target_ent == 0: # 目标是风险企业(无最大供应量限制,仅需满足产能)
+ qty_codes[target_ent] += surplus_add
+ else: # 目标是供应商,需检查最大供应量
+ supp_max_order = self.supplier.suppliers[i][target_ent - 1]["MaxOrder"]
+ add_qty = min(surplus_add, supp_max_order - qty_codes[target_ent])
+ qty_codes[target_ent] += add_qty
+ # 若仍有剩余差额(目标企业已达最大供应量),递归处理
+ remaining_surplus = surplus_add - add_qty
+ if remaining_surplus > 1e-6:
+ other_ents.remove(target_ent)
+ if other_ents:
+ # 重新计算剩余差额分配
+ sum_other_qty_new = sum(qty_codes[idx] for idx in other_ents)
+ if sum_other_qty_new > 1e-10:
+ add_ratio_new = remaining_surplus / sum_other_qty_new
+ for idx in other_ents:
+ qty_codes[idx] *= (1 + add_ratio_new)
+ else:
+ # 继续分配给下一个企业
+ for next_ent in other_ents:
+ if remaining_surplus <= 1e-6:
+ break
+ if next_ent == 0:
+ qty_codes[next_ent] += remaining_surplus
+ remaining_surplus = 0
+ else:
+ supp_max = self.supplier.suppliers[i][next_ent - 1]["MaxOrder"]
+ add = min(remaining_surplus, supp_max - qty_codes[next_ent])
+ qty_codes[next_ent] += add
+ remaining_surplus -= add
+
+ # 最终确保总和=Qi(避免累积误差)
+ current_total = sum(qty_codes)
+ if abs(current_total - Qi) > 1e-6 and selected_ents:
+ ratio = Qi / (current_total + 1e-10) # 加微小值避免除以零
+ for idx in selected_ents:
+ qty_codes[idx] *= ratio
+
+ def calculate_fitness(self) -> Tuple[float, float]:
+ """计算适应度(变更成本C,交付延期T)"""
+ C1 = self._calculate_C1()
+ C2 = self._calculate_C2()
+ C3 = self._calculate_C3()
+ C4 = self._calculate_C4()
+ C = C1 + C2 + C3 + C4
+
+ T = self._calculate_T()
+ return C, T
+
+ def _calculate_C1(self) -> float:
+ """计算变更惩罚成本C1"""
+ # 简化:假设ResTime=Dd(风险识别时刻为0),R_Num=Qi
+ ResTime = self.order.Dd
+ total_C1 = 0.0
+ for i in range(self.I):
+ Qi = self.order.Q[i]
+ # 最大惩罚成本=δ*(采购成本+运输成本)*Qi
+ max_penalty = self.order.delta * (self.order.P[i] + self.order.T0[i]) * Qi
+ # 指数惩罚项
+ exp_penalty = self.order.delta * Qi * np.exp(self.order.alpha / (ResTime + self.order.beta))
+ total_C1 += min(max_penalty, exp_penalty)
+ return total_C1
+
+ def _calculate_C2(self) -> float:
+ """计算采购变更成本C2"""
+ total_purchase_risk = 0.0 # 原风险企业采购成本
+ total_purchase_supp = 0.0 # 变更后供应商采购成本
+ for i in range(self.I):
+ # 风险企业采购成本
+ if self.enterprise_layer[i][0] == 1:
+ total_purchase_risk += self.quantity_layer[i][0] * self.order.P[i]
+ # 供应商采购成本
+ Ji = self.J_list[i]
+ for j in range(1, Ji + 1):
+ if self.enterprise_layer[i][j] == 1:
+ supp_p = self.supplier.suppliers[i][j - 1]["P_ij"]
+ total_purchase_supp += self.quantity_layer[i][j] * supp_p
+ return total_purchase_supp - total_purchase_risk
+
+ def _calculate_C3(self) -> float:
+ """计算运输变更成本C3"""
+ total_trans_risk = 0.0 # 原风险企业运输成本
+ total_trans_supp = 0.0 # 变更后供应商运输成本
+ for i in range(self.I):
+ # 风险企业运输成本
+ if self.enterprise_layer[i][0] == 1:
+ total_trans_risk += self.quantity_layer[i][0] * self.order.T0[i]
+ # 供应商运输成本
+ Ji = self.J_list[i]
+ for j in range(1, Ji + 1):
+ if self.enterprise_layer[i][j] == 1:
+ supp_t = self.supplier.suppliers[i][j - 1]["T_ij"]
+ total_trans_supp += self.quantity_layer[i][j] * supp_t
+ return total_trans_supp - total_trans_risk
+
+ def _calculate_C4(self) -> float:
+ """计算提前交付惩罚成本C4"""
+ delivery_dates = self._get_delivery_dates()
+ actual_delivery = max(delivery_dates)
+ advance = self.order.Dd - actual_delivery
+ return self.order.gamma * max(0.0, advance)
+
+ def _calculate_T(self) -> float:
+ """计算交付延期T"""
+ delivery_dates = self._get_delivery_dates()
+ actual_delivery = max(delivery_dates)
+ delay = actual_delivery - self.order.Dd
+ return max(0.0, delay)
+
+ def _get_delivery_dates(self) -> List[float]:
+ """计算各企业的交付日期(生产耗时+运输耗时)"""
+ delivery_dates = []
+ for i in range(self.I):
+ # 风险企业交付日期
+ if self.enterprise_layer[i][0] == 1:
+ qty = self.quantity_layer[i][0]
+ cap = self.capacity_layer[i][0]
+ if cap == 0:
+ prod_time = 0.0
+ else:
+ prod_time = qty / cap # 生产耗时
+ trans_time = self.risk_ent.distance / 10.0 # 简化运输耗时(距离/10)
+ delivery_dates.append(prod_time + trans_time)
+ # 供应商交付日期
+ Ji = self.J_list[i]
+ for j in range(1, Ji + 1):
+ if self.enterprise_layer[i][j] == 1:
+ qty = self.quantity_layer[i][j]
+ cap = self.capacity_layer[i][j]
+ if cap == 0:
+ prod_time = 0.0
+ else:
+ prod_time = qty / cap
+ trans_time = self.supplier.suppliers[i][j - 1]["distance"] / 10.0
+ delivery_dates.append(prod_time + trans_time)
+
+ # 新增:如果没有任何交付日期(空列表),返回一个默认值
+ if not delivery_dates:
+ # 返回一个很大的值表示无法交付,触发最大延期惩罚
+ return [float('inf')]
+ return delivery_dates
+
+
+class Population:
+ """种群类"""
+
+ def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData, params: AlgorithmParams):
+ self.order = order
+ self.risk_ent = risk_ent
+ self.supplier = supplier
+ self.params = params
+ self.N = params.N
+ self.population: List[Chromosome] = []
+
+ # 初始化种群(混合策略)
+ self._initialize_population()
+
+ def _initialize_population(self):
+ """混合种群初始化(文档1.4.1节)"""
+ N1 = int(self.N * self.params.N1_ratio) # 变更成本最小导向
+ N2 = int(self.N * self.params.N2_ratio) # 交付延期最短导向
+ N3 = int(self.N * self.params.N3_ratio) # 风险企业导向
+ N4 = self.N - N1 - N2 - N3 # 随机种群
+
+ # 1. 生成N1:变更成本最小导向
+ self.population.extend(self._generate_target_oriented_pop(N1, target="cost"))
+
+ # 2. 生成N2:交付延期最短导向
+ self.population.extend(self._generate_target_oriented_pop(N2, target="delay"))
+
+ # 3. 生成N3:风险企业导向
+ self.population.extend(self._generate_risk_ent_oriented_pop(N3))
+
+ # 4. 生成N4:随机种群
+ self.population.extend([Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(N4)])
+
+ def _generate_target_oriented_pop(self, size: int, target: str) -> List[Chromosome]:
+ """生成目标函数导向种群(文档1.4.1节)"""
+ if size <= 0:
+ return []
+
+ # 随机生成2*size个候选解
+ candidates = [Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(2 * size)]
+
+ # 计算适应度并排序
+ if target == "cost":
+ # 按变更成本升序排序
+ candidates.sort(key=lambda x: x.calculate_fitness()[0])
+ else: # target == "delay"
+ # 按交付延期升序排序
+ candidates.sort(key=lambda x: x.calculate_fitness()[1])
+
+ # 选择前size个
+ return candidates[:size]
+
+ def _generate_risk_ent_oriented_pop(self, size: int) -> List[Chromosome]:
+ """生成风险企业导向种群(文档1.4.1节)"""
+ if size <= 0:
+ return []
+
+ candidates = []
+ for _ in range(2 * size):
+ chrom = Chromosome(self.order, self.risk_ent, self.supplier)
+ # 强制风险企业选中所有物料
+ for i in range(self.order.I):
+ chrom.enterprise_layer[i][0] = 1 # yi0=1
+ # 修复染色体
+ chrom.repair()
+ candidates.append(chrom)
+
+ # 计算Pareto Rank,选择Rank最低的size个
+ ranked_candidates = self._fast_non_dominated_sort(candidates)[0] # Rank=0的解
+ if len(ranked_candidates) >= size:
+ return random.sample(ranked_candidates, size)
+ else:
+ # 不足时补充其他Rank的解
+ all_ranked = self._fast_non_dominated_sort(candidates)
+ selected = []
+ for rank in all_ranked:
+ selected.extend(rank)
+ if len(selected) >= size:
+ return selected[:size]
+ return selected
+
+ def _fast_non_dominated_sort(self, population: List[Chromosome]) -> List[List[Chromosome]]:
+ """快速非支配排序(标准NSGA-II算法)"""
+ dominated_sets = [[] for _ in range(len(population))]
+ domination_counts = [0] * len(population)
+ ranks = [0] * len(population)
+ fronts = [[]]
+
+ # 计算每个个体的支配关系
+ for p_idx in range(len(population)):
+ p_cost, p_delay = population[p_idx].calculate_fitness()
+ for q_idx in range(len(population)):
+ if p_idx == q_idx:
+ continue
+ q_cost, q_delay = population[q_idx].calculate_fitness()
+
+ # p支配q:p的两个目标都不劣于q,且至少一个目标更优
+ if (p_cost <= q_cost and p_delay <= q_delay) and (p_cost < q_cost or p_delay < q_delay):
+ dominated_sets[p_idx].append(q_idx)
+ # q支配p
+ elif (q_cost <= p_cost and q_delay <= p_delay) and (q_cost < p_cost or q_delay < q_delay):
+ domination_counts[p_idx] += 1
+
+ # 初始前沿(Rank=0)
+ if domination_counts[p_idx] == 0:
+ ranks[p_idx] = 0
+ fronts[0].append(population[p_idx])
+
+ # 生成后续前沿
+ rank = 1
+ while True:
+ current_front = []
+ for p_idx in range(len(population)):
+ if ranks[p_idx] == rank - 1:
+ for q_idx in dominated_sets[p_idx]:
+ domination_counts[q_idx] -= 1
+ if domination_counts[q_idx] == 0:
+ ranks[q_idx] = rank
+ current_front.append(population[q_idx])
+ if not current_front:
+ break
+ fronts.append(current_front)
+ rank += 1
+
+ return fronts
+
+ def _calculate_crowding_distance(self, front: List[Chromosome]) -> List[float]:
+ """计算拥挤度(标准NSGA-II算法)"""
+ n = len(front)
+ if n == 0:
+ return []
+ distances = [0.0] * n
+
+ # 按变更成本排序
+ cost_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[0])
+ # 边界个体拥挤度设为无穷大
+ distances[cost_sorted[0][0]] = float('inf')
+ distances[cost_sorted[-1][0]] = float('inf')
+ # 计算中间个体拥挤度
+ max_cost = cost_sorted[-1][1].calculate_fitness()[0]
+ min_cost = cost_sorted[0][1].calculate_fitness()[0]
+ if max_cost - min_cost > 1e-6:
+ for i in range(1, n - 1):
+ prev_cost = cost_sorted[i - 1][1].calculate_fitness()[0]
+ next_cost = cost_sorted[i + 1][1].calculate_fitness()[0]
+ distances[cost_sorted[i][0]] += (next_cost - prev_cost) / (max_cost - min_cost)
+
+ # 按交付延期排序
+ delay_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[1])
+ # 边界个体拥挤度设为无穷大
+ distances[delay_sorted[0][0]] = float('inf')
+ distances[delay_sorted[-1][0]] = float('inf')
+ # 计算中间个体拥挤度
+ max_delay = delay_sorted[-1][1].calculate_fitness()[1]
+ min_delay = delay_sorted[0][1].calculate_fitness()[1]
+ if max_delay - min_delay > 1e-6:
+ for i in range(1, n - 1):
+ prev_delay = delay_sorted[i - 1][1].calculate_fitness()[1]
+ next_delay = delay_sorted[i + 1][1].calculate_fitness()[1]
+ distances[delay_sorted[i][0]] += (next_delay - prev_delay) / (max_delay - min_delay)
+
+ return distances
+
+
+# ============================= 遗传操作函数=============================
+def crossover(parent1: Chromosome, parent2: Chromosome, Pc: float) -> Tuple[Chromosome, Chromosome]:
+ """两点交叉(文档1.4.1节)"""
+ if random.random() > Pc:
+ return parent1, parent2
+
+ # 复制父代染色体
+ child1 = Chromosome(parent1.order, parent1.risk_ent, parent1.supplier)
+ child2 = Chromosome(parent2.order, parent2.risk_ent, parent2.supplier)
+
+ # 确定染色体总基因长度(每层基因数相同)
+ total_genes_per_layer = sum([len(layer) for layer in parent1.enterprise_layer])
+ if total_genes_per_layer < 2:
+ return child1, child2
+
+ # 随机选择两个交叉位置
+ pos1 = random.randint(0, total_genes_per_layer - 2)
+ pos2 = random.randint(pos1 + 1, total_genes_per_layer - 1)
+
+ # 对三层编码分别执行交叉
+ for layer_idx, (layer1, layer2) in enumerate([
+ (child1.enterprise_layer, child2.enterprise_layer),
+ (child1.capacity_layer, child2.capacity_layer),
+ (child1.quantity_layer, child2.quantity_layer)
+ ]):
+ # 展平层编码
+ flat1 = []
+ flat2 = []
+ for segment in layer1:
+ flat1.extend(segment)
+ for segment in layer2:
+ flat2.extend(segment)
+
+ # 交换pos1-pos2区间的基因
+ flat1[pos1:pos2 + 1], flat2[pos1:pos2 + 1] = flat2[pos1:pos2 + 1], flat1[pos1:pos2 + 1]
+
+ # 重构层编码
+ ptr = 0
+ for i in range(len(layer1)):
+ segment_len = len(layer1[i])
+ layer1[i] = flat1[ptr:ptr + segment_len]
+ layer2[i] = flat2[ptr:ptr + segment_len]
+ ptr += segment_len
+
+ # 修复子代染色体
+ child1.repair()
+ child2.repair()
+
+ return child1, child2
+
+
+def mutate(chrom: Chromosome, Pm: float) -> Chromosome:
+ """均匀变异(文档1.4.1节)"""
+ # 对三层编码分别执行变异
+ # 1. 企业编码层(0/1翻转)
+ for i in range(chrom.I):
+ for j in range(len(chrom.enterprise_layer[i])):
+ if random.random() < Pm:
+ chrom.enterprise_layer[i][j] = 1 - chrom.enterprise_layer[i][j]
+
+ # 2. 能力编码层(随机重置)
+ for i in range(chrom.I):
+ Ji = chrom.J_list[i]
+ for j in range(len(chrom.capacity_layer[i])):
+ if random.random() < Pm:
+ if j == 0: # 风险企业
+ max_cap = chrom.risk_ent.C0_max
+ std_cap = chrom.risk_ent.C0_std
+ else: # 供应商j-1
+ max_cap = chrom.supplier.suppliers[i][j - 1]["Cj_max"]
+ std_cap = chrom.supplier.suppliers[i][j - 1]["Cj_std"]
+ chrom.capacity_layer[i][j] = random.uniform(0.5 * std_cap, max_cap)
+
+ # 3. 数量编码层(随机重置)
+ for i in range(chrom.I):
+ Qi = chrom.order.Q[i]
+ selected_ents = [idx for idx, val in enumerate(chrom.enterprise_layer[i]) if val == 1]
+ if len(selected_ents) > 0 and random.random() < Pm:
+ # 重新分配数量
+ qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi
+ for idx, ent_idx in enumerate(selected_ents):
+ chrom.quantity_layer[i][ent_idx] = qtys[idx]
+
+ # 修复变异后的染色体
+ chrom.repair()
+ return chrom
+
+
+def tournament_selection(population: List[Chromosome], tournament_size: int) -> Chromosome:
+ """锦标赛选择"""
+ candidates = random.sample(population, tournament_size)
+ # 选择Pareto支配最优的个体
+ best = candidates[0]
+ best_cost, best_delay = best.calculate_fitness()
+ for cand in candidates[1:]:
+ cand_cost, cand_delay = cand.calculate_fitness()
+ # cand支配best
+ if (cand_cost <= best_cost and cand_delay <= best_delay) and (cand_cost < best_cost or cand_delay < best_delay):
+ best = cand
+ best_cost, best_delay = cand_cost, cand_delay
+ return best
+
+
+# ============================= 主算法函数=============================
+def improved_nsga2() -> Tuple[List[Chromosome], List[Dict]]:
+ """改进NSGA-II算法主函数(文档1.4.2节流程)"""
+ # 1. 初始化数据和参数
+ order = OrderData()
+ risk_ent = RiskEnterpriseData()
+ supplier = SupplierData(order)
+ params = AlgorithmParams()
+
+ # 2. 初始化种群(第0代)
+ pop = Population(order, risk_ent, supplier, params)
+ population = pop.population
+
+ # 3. 迭代优化
+ iteration_history = [] # 记录每代最优目标值
+ for iter_idx in range(params.MaxIter):
+ # 4. 生成子代种群
+ offspring = []
+ while len(offspring) < params.N:
+ # 锦标赛选择父代
+ parent1 = tournament_selection(population, params.tournament_size)
+ parent2 = tournament_selection(population, params.tournament_size)
+ # 交叉
+ child1, child2 = crossover(parent1, parent2, params.Pc)
+ # 变异
+ child1 = mutate(child1, params.Pm)
+ child2 = mutate(child2, params.Pm)
+ # 添加到子代
+ offspring.append(child1)
+ if len(offspring) < params.N:
+ offspring.append(child2)
+
+ # 5. 合并父代和子代
+ combined = population + offspring
+
+ # 6. 快速非支配排序
+ fronts = pop._fast_non_dominated_sort(combined)
+
+ # 7. 拥挤度计算与精英保留
+ next_population = []
+ for front in fronts:
+ if len(next_population) + len(front) <= params.N:
+ # 该前沿全部入选
+ next_population.extend(front)
+ else:
+ # 计算拥挤度,选择拥挤度大的个体
+ distances = pop._calculate_crowding_distance(front)
+ # 按拥挤度降序排序
+ front_sorted = [front[i] for i in np.argsort(distances)[::-1]]
+ # 选择剩余名额
+ need = params.N - len(next_population)
+ next_population.extend(front_sorted[:need])
+ break
+
+ # 8. 更新种群
+ population = next_population
+
+ # 9. 记录迭代历史
+ current_front = pop._fast_non_dominated_sort(population)[0]
+ min_cost = min([chrom.calculate_fitness()[0] for chrom in current_front])
+ min_delay = min([chrom.calculate_fitness()[1] for chrom in current_front])
+ iteration_history.append({
+ "iter": iter_idx,
+ "min_cost": min_cost,
+ "min_delay": min_delay,
+ "pareto_size": len(current_front)
+ })
+
+ # 打印迭代信息
+ if (iter_idx + 1) % 20 == 0:
+ print(
+ f"迭代 {iter_idx + 1:3d} | 最优成本: {min_cost:8.2f} | 最优延期: {min_delay:6.2f} | Pareto解数: {len(current_front)}")
+
+ # 10. 输出最终Pareto解集
+ final_front = pop._fast_non_dominated_sort(population)[0]
+ return final_front, iteration_history
+
+
+# ============================= 结果可视化与输出=============================
+def visualize_results(pareto_front: List[Chromosome], iteration_history: List[Dict]):
+ """结果可视化"""
+ # 1. Pareto前沿曲线
+ costs = [chrom.calculate_fitness()[0] for chrom in pareto_front]
+ delays = [chrom.calculate_fitness()[1] for chrom in pareto_front]
+
+ plt.figure(figsize=(12, 5))
+
+ # 子图1:Pareto前沿
+ plt.subplot(1, 2, 1)
+ plt.scatter(costs, delays, c='red', s=50, alpha=0.7)
+ plt.xlabel('变更成本', fontsize=12)
+ plt.ylabel('交付延期', fontsize=12)
+ plt.title('Pareto最优前沿', fontsize=14)
+ plt.grid(True, alpha=0.3)
+
+ # 子图2:迭代收敛曲线
+ plt.subplot(1, 2, 2)
+ iters = [item["iter"] for item in iteration_history]
+ min_costs = [item["min_cost"] for item in iteration_history]
+ min_delays = [item["min_delay"] for item in iteration_history]
+
+ plt.plot(iters, min_costs, label='最优变更成本', linewidth=2)
+ plt.plot(iters, min_delays, label='最优交付延期', linewidth=2, linestyle='--')
+ plt.xlabel('迭代次数', fontsize=12)
+ plt.ylabel('目标值', fontsize=12)
+ plt.title('迭代收敛曲线', fontsize=14)
+ plt.legend()
+ plt.grid(True, alpha=0.3)
+
+ plt.tight_layout()
+ plt.show()
+
+
+def print_pareto_solutions(pareto_front: List[Chromosome]):
+ """打印Pareto解的详细信息"""
+ print("\n" + "=" * 80)
+ print("Pareto最优解集详细信息")
+ print("=" * 80)
+ for idx, chrom in enumerate(pareto_front):
+ cost, delay = chrom.calculate_fitness()
+ print(f"\n解 {idx + 1}:")
+ print(f" 变更成本: {cost:.2f} | 交付延期: {delay:.2f}")
+ print(" 订单分配方案:")
+ for i in range(chrom.I):
+ print(f" 物料{i + 1} (待生产: {chrom.order.Q[i]}):")
+ # 风险企业分配
+ if chrom.enterprise_layer[i][0] == 1:
+ qty = chrom.quantity_layer[i][0]
+ cap = chrom.capacity_layer[i][0]
+ print(f" 风险企业: 数量={qty:.1f}, 生产能力={cap:.1f}")
+ # 供应商分配
+ Ji = chrom.J_list[i]
+ for j in range(1, Ji + 1):
+ if chrom.enterprise_layer[i][j] == 1:
+ qty = chrom.quantity_layer[i][j]
+ cap = chrom.capacity_layer[i][j]
+ supp_info = chrom.supplier.suppliers[i][j - 1]
+ print(f" 供应商{j}: 数量={qty:.1f}, 生产能力={cap:.1f}, 采购价={supp_info['P_ij']:.1f}")
+
+
+# ============================= 主函数调用=============================
+if __name__ == "__main__":
+ # 运行算法
+ print("改进NSGA-II算法开始运行...")
+ print("=" * 60)
+ pareto_front, iteration_history = improved_nsga2()
+
+ # 结果输出与可视化
+ print_pareto_solutions(pareto_front)
+ visualize_results(pareto_front, iteration_history)
\ No newline at end of file