commit 8a37f3601f6d066ec41f58f7050c8a79afd07066 Author: Hgq <2757430053@qq.com> Date: Wed Dec 3 14:59:14 2025 +0800 无报错仅警告版本 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..359bb53 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..a6218fe --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..51ab974 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/test.iml b/.idea/test.iml new file mode 100644 index 0000000..909438d --- /dev/null +++ b/.idea/test.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..3747c9e --- /dev/null +++ b/main.py @@ -0,0 +1,876 @@ +import numpy as np +import random +from typing import List, Dict, Tuple, Optional +import matplotlib.pyplot as plt + + +# ============================= 数据输入模块(用户需填写)============================= +class OrderData: + """订单基础数据类(对应文档表四7符号)""" + + def __init__(self): + # 1. 订单核心参数 + self.I = 3 # 物料种类数(用户修改) + self.Q = [100, 150, 200] # 物料i待生产量Qi(索引0对应物料1,用户修改) + self.Dd = 10 # 订单需求交货期(时长,用户修改) + + # 2. 物料基础成本(风险企业供应) + self.P = [50.0, 80.0, 60.0] # 物料i单位采购价格P_i(用户修改) + self.T0 = [5.0, 8.0, 6.0] # 物料i风险企业运输成本T_i^0(用户修改) + + # 3. 惩罚系数(用户可按文档建议调整) + self.alpha = 2.0 # C1增长系数 + self.beta = 1.0 # 时间尺度系数 + self.delta = 1.3 # 惩罚倍数(1.2-1.5) + self.gamma = 0.5 # 提前交付惩罚系数 + + +class RiskEnterpriseData: + """风险企业数据类(对应文档表四7符号)""" + + def __init__(self): + self.C0_max = 30.0 # 风险企业单位时间最大生产能力C0^max(用户修改) + self.C0_std = 20.0 # 风险企业单位时间标准生产能力(用户修改) + self.L1 = 3.0 # 扰动持续时长系数下限(关键订单3,一般订单2,用户修改) + self.L2 = 10.0 # 扰动持续时长系数上限(关键订单10,一般订单5,用户修改) + self.distance = 10.0 # 风险企业位置距离(用户修改) + + +class SupplierData: + """供应商数据类(对应文档表四7符号)""" + + def __init__(self, order: OrderData): + self.I = order.I + # Ji:物料i的供应商个数(索引0对应物料1,用户修改) + self.J = [2, 3, 2] # 物料1有2个供应商,物料2有3个,物料3有2个 + # 供应商j的基础参数(按物料i分组,用户修改) + # 结构:self.suppliers[i][j] = {参数},i=物料索引,j=供应商索引(0开始) + self.suppliers = [ + # 物料1的2个供应商 + [ + {"Cj_max": 40.0, "Cj_std": 30.0, "P_ij": 55.0, "T_ij": 7.0, "MinOrder": 20, "MaxOrder": 100, + "distance": 15.0}, + {"Cj_max": 35.0, "Cj_std": 25.0, "P_ij": 52.0, "T_ij": 9.0, "MinOrder": 15, "MaxOrder": 90, + "distance": 20.0} + ], + # 物料2的3个供应商 + [ + {"Cj_max": 50.0, "Cj_std": 40.0, "P_ij": 85.0, "T_ij": 6.0, "MinOrder": 25, "MaxOrder": 150, + "distance": 12.0}, + {"Cj_max": 45.0, "Cj_std": 35.0, "P_ij": 82.0, "T_ij": 8.0, "MinOrder": 20, "MaxOrder": 120, + "distance": 18.0}, + {"Cj_max": 55.0, "Cj_std": 45.0, "P_ij": 88.0, "T_ij": 5.0, "MinOrder": 30, "MaxOrder": 160, + "distance": 10.0} + ], + # 物料3的2个供应商 + [ + {"Cj_max": 42.0, "Cj_std": 32.0, "P_ij": 65.0, "T_ij": 8.0, "MinOrder": 22, "MaxOrder": 200, + "distance": 16.0}, + {"Cj_max": 38.0, "Cj_std": 28.0, "P_ij": 63.0, "T_ij": 10.0, "MinOrder": 18, "MaxOrder": 180, + "distance": 22.0} + ] + ] + + +class AlgorithmParams: + """算法参数类(默认值可修改)""" + + def __init__(self): + self.N = 100 # 种群规模 + self.Pm = 0.05 # 变异概率 + self.Pc = 0.8 # 交叉概率 + self.MaxIter = 200 # 最大迭代次数 + self.tournament_size = 3 # 锦标赛选择规模 + # 种群比例 N1:N2:N3:N4 = 3:3:2:2 + self.N1_ratio = 0.3 + self.N2_ratio = 0.3 + self.N3_ratio = 0.2 + self.N4_ratio = 0.2 + + +# ============================= 辅助计算函数(文档对应函数)============================= +def calculate_RNum(P_Num: float, A_Num: float) -> float: + """计算剩余待生产数量R_Num = A_Num - P_Num(文档1.2.2节)""" + return max(0.0, A_Num - P_Num) + + +def calculate_ResTime(DueTime: float, riskTime: float) -> float: + """计算剩余交货期限ResTime = DueTime - riskTime(文档1.2.2节)""" + return max(0.0, DueTime - riskTime) + + +def calculate_TimeLevel(Td: float, ResTime: float, L1: float, L2: float) -> str: + """计算扰动持续程度TimeLevel(文档1.2.2节)""" + if ResTime == 0: + return "长" + ratio = Td / ResTime + if ratio <= L1: + return "短" + elif ratio >= L2: + return "长" + else: + return "中" + + +def calculate_CapLevel(R_Num: float, ResTime: float, C: float) -> str: + """计算订单生产能力等级CapLevel(文档1.2.2节)""" + if ResTime == 0: + return "不足" + capacity = C * ResTime # 总生产能力 + if capacity >= R_Num * 1.2: # 预留20%缓冲 + return "充足" + elif capacity <= R_Num * 0.8: # 缺口20%以上 + return "不足" + else: + return "一般" + + +# ============================= 染色体与种群类============================= +class Chromosome: + """染色体类(三层编码结构)""" + + def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData): + self.order = order + self.risk_ent = risk_ent + self.supplier = supplier + self.I = order.I + self.J_list = supplier.J # 各物料的供应商个数 + + # 三层编码初始化(随机生成基础结构) + self.enterprise_layer: List[List[int]] = [] # 企业编码层:I×(Ji+1),0/1 + self.capacity_layer: List[List[float]] = [] # 能力编码层:I×(Ji+1),生产能力 + self.quantity_layer: List[List[float]] = [] # 数量编码层:I×(Ji+1),分配数量 + + for i in range(self.I): + Ji = self.J_list[i] + total_ents = Ji + 1 # 1个风险企业 + Ji个供应商 + + # 企业编码层:随机生成0/1,确保至少有1个1 + ent_codes = [random.randint(0, 1) for _ in range(total_ents)] + if sum(ent_codes) == 0: + ent_codes[random.randint(0, total_ents - 1)] = 1 # 强制激活1个 + self.enterprise_layer.append(ent_codes) + + # 能力编码层:基于企业标准生产能力随机生成 + cap_codes = [] + for j in range(total_ents): + if j == 0: # 风险企业 + max_cap = risk_ent.C0_max + std_cap = risk_ent.C0_std + else: # 供应商j-1(索引转换) + max_cap = supplier.suppliers[i][j - 1]["Cj_max"] + std_cap = supplier.suppliers[i][j - 1]["Cj_std"] + # 在[0.5×std_cap, max_cap]范围内随机 + cap = random.uniform(0.5 * std_cap, max_cap) + cap_codes.append(cap) + self.capacity_layer.append(cap_codes) + + # 数量编码层:基于物料待生产量随机分配 + qty_codes = [0.0] * total_ents + Qi = order.Q[i] + selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1] + if len(selected_ents) > 0: + # 随机分配数量(确保总和=Qi) + qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi + for idx, ent_idx in enumerate(selected_ents): + qty_codes[ent_idx] = qtys[idx] + self.quantity_layer.append(qty_codes) + + # 修复初始染色体(确保满足约束) + self.repair() + + def repair(self): + """染色体修复机制(文档1.4.1节)""" + # 步骤1:检查每类物料是否有供应方(物料生产约束) + for i in range(self.I): + ent_codes = self.enterprise_layer[i] + if sum(ent_codes) == 0: + # 随机激活1个企业 + active_idx = random.randint(0, len(ent_codes) - 1) + self.enterprise_layer[i][active_idx] = 1 + + # 步骤2:检查企业最大生产能力约束 + self._repair_capacity_constraint() + + # 步骤3:检查物料数量约束(总和=Qi,供应商起订量/最大供应量) + self._repair_quantity_constraint() + + def _repair_capacity_constraint(self): + """修复生产能力约束(修复属性名不匹配问题)""" + # 步骤1:修复风险企业生产能力(原逻辑不变) + risk_total_cap = 0.0 + risk_assignments = [] # 存储(i, cap):物料i分配给风险企业的能力 + for i in range(self.I): + if self.enterprise_layer[i][0] == 1: # 风险企业被选中 + cap = self.capacity_layer[i][0] + risk_total_cap += cap + risk_assignments.append((i, cap)) + + # 若超出最大产能,剔除最小能力的物料 + while risk_total_cap > self.risk_ent.C0_max and len(risk_assignments) > 0: + risk_assignments.sort(key=lambda x: x[1]) + min_i, min_cap = risk_assignments.pop(0) + self.enterprise_layer[min_i][0] = 0 # 取消选择风险企业 + self.capacity_layer[min_i][0] = 0.0 + self.quantity_layer[min_i][0] = 0.0 + risk_total_cap -= min_cap + + # 步骤2:修复各供应商生产能力(核心修复:J_list → J) + for i in range(self.I): # 遍历每个物料i,找到该物料的所有供应商编码索引 + Ji = self.J_list[i] # 物料i的供应商个数(注:此处self.J_list是Chromosome类自身属性,无需修改) + for j in range(1, Ji + 1): # j=1~Ji:物料i的供应商编码索引(对应供应商j-1) + supplier_idx = j - 1 # 供应商在suppliers列表中的索引(0开始) + supp_total_cap = 0.0 + supp_assignments = [] + + # 遍历所有物料k,计算该供应商(j索引)在各物料上的总产能 + for k in range(self.I): + # 核心修复:self.supplier.J_list[k] → self.supplier.J[k] + if j >= len(self.enterprise_layer[k]) or supplier_idx >= self.supplier.J[k]: + continue # 跳过无该供应商的物料,避免索引越界 + + # 若物料k选择了该供应商(j索引),累加产能 + if self.enterprise_layer[k][j] == 1: + cap = self.capacity_layer[k][j] + supp_total_cap += cap + supp_assignments.append((k, cap)) + + # 若该供应商总产能超出最大产能,剔除最小能力的物料 + if len(supp_assignments) == 0: + continue # 无物料分配给该供应商,跳过 + + # 获取该供应商在当前物料k上的最大产能(修正:用第一个分配的物料k) + first_k = supp_assignments[0][0] + supp_max_cap = self.supplier.suppliers[first_k][supplier_idx]["Cj_max"] + + # 剔除超出产能的物料分配 + while supp_total_cap > supp_max_cap and len(supp_assignments) > 0: + supp_assignments.sort(key=lambda x: x[1]) + min_k, min_cap = supp_assignments.pop(0) + self.enterprise_layer[min_k][j] = 0 # 取消该物料对该供应商的选择 + self.capacity_layer[min_k][j] = 0.0 + self.quantity_layer[min_k][j] = 0.0 + supp_total_cap -= min_cap + + def _repair_quantity_constraint(self): + """修复物料数量约束(修复除以零错误)""" + for i in range(self.I): + Qi = self.order.Q[i] + ent_codes = self.enterprise_layer[i] + qty_codes = self.quantity_layer[i] + selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1] + current_total = sum(qty_codes) + + # 调整总量至Qi(允许微小误差) + if abs(current_total - Qi) > 1e-6: + ratio = Qi / (current_total + 1e-10) # 避免初始总量为0时除以零 + for idx in selected_ents: + qty_codes[idx] *= ratio + + # 检查供应商起订量和最大供应量 + Ji = self.J_list[i] + for j in range(1, Ji + 1): # 供应商j-1 + if ent_codes[j] == 1: + min_order = self.supplier.suppliers[i][j - 1]["MinOrder"] + max_order = self.supplier.suppliers[i][j - 1]["MaxOrder"] + qty = qty_codes[j] + + if qty < min_order: + # 低于起订量:要么增加到起订量,要么取消选择 + if sum(qty_codes) + (min_order - qty) <= Qi * 1.1: # 允许10%缓冲 + qty_codes[j] = min_order + # 从其他选中企业中减少对应数量 + surplus_reduce = min_order - qty + other_ents = [idx for idx in selected_ents if idx != j] + if other_ents: + sum_other_qty = sum(qty_codes[idx] for idx in other_ents) + if sum_other_qty > 1e-10: # 避免除以零 + reduce_ratio = surplus_reduce / sum_other_qty + for idx in other_ents: + qty_codes[idx] *= (1 - reduce_ratio) + else: + # 其他企业无数量可减,直接取消当前供应商选择 + ent_codes[j] = 0 + qty_codes[j] = 0.0 + selected_ents.remove(j) + else: + ent_codes[j] = 0 + qty_codes[j] = 0.0 + selected_ents.remove(j) + + elif qty > max_order: + # 超过最大供应量:调整到最大值,剩余差额分配给其他企业 + qty_codes[j] = max_order + surplus_add = qty - max_order # 此处原逻辑笔误:应为当前qty - max_order,即超出部分转为需补充的差额 + other_ents = [idx for idx in selected_ents if idx != j] + if other_ents: + # 核心修复:检查other_ents的数量总和是否为零 + sum_other_qty = sum(qty_codes[idx] for idx in other_ents) + if sum_other_qty > 1e-10: # 总和非零,按比例分配 + add_ratio = surplus_add / sum_other_qty + for idx in other_ents: + qty_codes[idx] *= (1 + add_ratio) + else: + # 总和为零,直接将差额分配给第一个其他企业 + target_ent = other_ents[0] + # 确保不超过目标企业的最大供应量(若为供应商) + if target_ent == 0: # 目标是风险企业(无最大供应量限制,仅需满足产能) + qty_codes[target_ent] += surplus_add + else: # 目标是供应商,需检查最大供应量 + supp_max_order = self.supplier.suppliers[i][target_ent - 1]["MaxOrder"] + add_qty = min(surplus_add, supp_max_order - qty_codes[target_ent]) + qty_codes[target_ent] += add_qty + # 若仍有剩余差额(目标企业已达最大供应量),递归处理 + remaining_surplus = surplus_add - add_qty + if remaining_surplus > 1e-6: + other_ents.remove(target_ent) + if other_ents: + # 重新计算剩余差额分配 + sum_other_qty_new = sum(qty_codes[idx] for idx in other_ents) + if sum_other_qty_new > 1e-10: + add_ratio_new = remaining_surplus / sum_other_qty_new + for idx in other_ents: + qty_codes[idx] *= (1 + add_ratio_new) + else: + # 继续分配给下一个企业 + for next_ent in other_ents: + if remaining_surplus <= 1e-6: + break + if next_ent == 0: + qty_codes[next_ent] += remaining_surplus + remaining_surplus = 0 + else: + supp_max = self.supplier.suppliers[i][next_ent - 1]["MaxOrder"] + add = min(remaining_surplus, supp_max - qty_codes[next_ent]) + qty_codes[next_ent] += add + remaining_surplus -= add + + # 最终确保总和=Qi(避免累积误差) + current_total = sum(qty_codes) + if abs(current_total - Qi) > 1e-6 and selected_ents: + ratio = Qi / (current_total + 1e-10) # 加微小值避免除以零 + for idx in selected_ents: + qty_codes[idx] *= ratio + + def calculate_fitness(self) -> Tuple[float, float]: + """计算适应度(变更成本C,交付延期T)""" + C1 = self._calculate_C1() + C2 = self._calculate_C2() + C3 = self._calculate_C3() + C4 = self._calculate_C4() + C = C1 + C2 + C3 + C4 + + T = self._calculate_T() + return C, T + + def _calculate_C1(self) -> float: + """计算变更惩罚成本C1""" + # 简化:假设ResTime=Dd(风险识别时刻为0),R_Num=Qi + ResTime = self.order.Dd + total_C1 = 0.0 + for i in range(self.I): + Qi = self.order.Q[i] + # 最大惩罚成本=δ*(采购成本+运输成本)*Qi + max_penalty = self.order.delta * (self.order.P[i] + self.order.T0[i]) * Qi + # 指数惩罚项 + exp_penalty = self.order.delta * Qi * np.exp(self.order.alpha / (ResTime + self.order.beta)) + total_C1 += min(max_penalty, exp_penalty) + return total_C1 + + def _calculate_C2(self) -> float: + """计算采购变更成本C2""" + total_purchase_risk = 0.0 # 原风险企业采购成本 + total_purchase_supp = 0.0 # 变更后供应商采购成本 + for i in range(self.I): + # 风险企业采购成本 + if self.enterprise_layer[i][0] == 1: + total_purchase_risk += self.quantity_layer[i][0] * self.order.P[i] + # 供应商采购成本 + Ji = self.J_list[i] + for j in range(1, Ji + 1): + if self.enterprise_layer[i][j] == 1: + supp_p = self.supplier.suppliers[i][j - 1]["P_ij"] + total_purchase_supp += self.quantity_layer[i][j] * supp_p + return total_purchase_supp - total_purchase_risk + + def _calculate_C3(self) -> float: + """计算运输变更成本C3""" + total_trans_risk = 0.0 # 原风险企业运输成本 + total_trans_supp = 0.0 # 变更后供应商运输成本 + for i in range(self.I): + # 风险企业运输成本 + if self.enterprise_layer[i][0] == 1: + total_trans_risk += self.quantity_layer[i][0] * self.order.T0[i] + # 供应商运输成本 + Ji = self.J_list[i] + for j in range(1, Ji + 1): + if self.enterprise_layer[i][j] == 1: + supp_t = self.supplier.suppliers[i][j - 1]["T_ij"] + total_trans_supp += self.quantity_layer[i][j] * supp_t + return total_trans_supp - total_trans_risk + + def _calculate_C4(self) -> float: + """计算提前交付惩罚成本C4""" + delivery_dates = self._get_delivery_dates() + actual_delivery = max(delivery_dates) + advance = self.order.Dd - actual_delivery + return self.order.gamma * max(0.0, advance) + + def _calculate_T(self) -> float: + """计算交付延期T""" + delivery_dates = self._get_delivery_dates() + actual_delivery = max(delivery_dates) + delay = actual_delivery - self.order.Dd + return max(0.0, delay) + + def _get_delivery_dates(self) -> List[float]: + """计算各企业的交付日期(生产耗时+运输耗时)""" + delivery_dates = [] + for i in range(self.I): + # 风险企业交付日期 + if self.enterprise_layer[i][0] == 1: + qty = self.quantity_layer[i][0] + cap = self.capacity_layer[i][0] + if cap == 0: + prod_time = 0.0 + else: + prod_time = qty / cap # 生产耗时 + trans_time = self.risk_ent.distance / 10.0 # 简化运输耗时(距离/10) + delivery_dates.append(prod_time + trans_time) + # 供应商交付日期 + Ji = self.J_list[i] + for j in range(1, Ji + 1): + if self.enterprise_layer[i][j] == 1: + qty = self.quantity_layer[i][j] + cap = self.capacity_layer[i][j] + if cap == 0: + prod_time = 0.0 + else: + prod_time = qty / cap + trans_time = self.supplier.suppliers[i][j - 1]["distance"] / 10.0 + delivery_dates.append(prod_time + trans_time) + + # 新增:如果没有任何交付日期(空列表),返回一个默认值 + if not delivery_dates: + # 返回一个很大的值表示无法交付,触发最大延期惩罚 + return [float('inf')] + return delivery_dates + + +class Population: + """种群类""" + + def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData, params: AlgorithmParams): + self.order = order + self.risk_ent = risk_ent + self.supplier = supplier + self.params = params + self.N = params.N + self.population: List[Chromosome] = [] + + # 初始化种群(混合策略) + self._initialize_population() + + def _initialize_population(self): + """混合种群初始化(文档1.4.1节)""" + N1 = int(self.N * self.params.N1_ratio) # 变更成本最小导向 + N2 = int(self.N * self.params.N2_ratio) # 交付延期最短导向 + N3 = int(self.N * self.params.N3_ratio) # 风险企业导向 + N4 = self.N - N1 - N2 - N3 # 随机种群 + + # 1. 生成N1:变更成本最小导向 + self.population.extend(self._generate_target_oriented_pop(N1, target="cost")) + + # 2. 生成N2:交付延期最短导向 + self.population.extend(self._generate_target_oriented_pop(N2, target="delay")) + + # 3. 生成N3:风险企业导向 + self.population.extend(self._generate_risk_ent_oriented_pop(N3)) + + # 4. 生成N4:随机种群 + self.population.extend([Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(N4)]) + + def _generate_target_oriented_pop(self, size: int, target: str) -> List[Chromosome]: + """生成目标函数导向种群(文档1.4.1节)""" + if size <= 0: + return [] + + # 随机生成2*size个候选解 + candidates = [Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(2 * size)] + + # 计算适应度并排序 + if target == "cost": + # 按变更成本升序排序 + candidates.sort(key=lambda x: x.calculate_fitness()[0]) + else: # target == "delay" + # 按交付延期升序排序 + candidates.sort(key=lambda x: x.calculate_fitness()[1]) + + # 选择前size个 + return candidates[:size] + + def _generate_risk_ent_oriented_pop(self, size: int) -> List[Chromosome]: + """生成风险企业导向种群(文档1.4.1节)""" + if size <= 0: + return [] + + candidates = [] + for _ in range(2 * size): + chrom = Chromosome(self.order, self.risk_ent, self.supplier) + # 强制风险企业选中所有物料 + for i in range(self.order.I): + chrom.enterprise_layer[i][0] = 1 # yi0=1 + # 修复染色体 + chrom.repair() + candidates.append(chrom) + + # 计算Pareto Rank,选择Rank最低的size个 + ranked_candidates = self._fast_non_dominated_sort(candidates)[0] # Rank=0的解 + if len(ranked_candidates) >= size: + return random.sample(ranked_candidates, size) + else: + # 不足时补充其他Rank的解 + all_ranked = self._fast_non_dominated_sort(candidates) + selected = [] + for rank in all_ranked: + selected.extend(rank) + if len(selected) >= size: + return selected[:size] + return selected + + def _fast_non_dominated_sort(self, population: List[Chromosome]) -> List[List[Chromosome]]: + """快速非支配排序(标准NSGA-II算法)""" + dominated_sets = [[] for _ in range(len(population))] + domination_counts = [0] * len(population) + ranks = [0] * len(population) + fronts = [[]] + + # 计算每个个体的支配关系 + for p_idx in range(len(population)): + p_cost, p_delay = population[p_idx].calculate_fitness() + for q_idx in range(len(population)): + if p_idx == q_idx: + continue + q_cost, q_delay = population[q_idx].calculate_fitness() + + # p支配q:p的两个目标都不劣于q,且至少一个目标更优 + if (p_cost <= q_cost and p_delay <= q_delay) and (p_cost < q_cost or p_delay < q_delay): + dominated_sets[p_idx].append(q_idx) + # q支配p + elif (q_cost <= p_cost and q_delay <= p_delay) and (q_cost < p_cost or q_delay < q_delay): + domination_counts[p_idx] += 1 + + # 初始前沿(Rank=0) + if domination_counts[p_idx] == 0: + ranks[p_idx] = 0 + fronts[0].append(population[p_idx]) + + # 生成后续前沿 + rank = 1 + while True: + current_front = [] + for p_idx in range(len(population)): + if ranks[p_idx] == rank - 1: + for q_idx in dominated_sets[p_idx]: + domination_counts[q_idx] -= 1 + if domination_counts[q_idx] == 0: + ranks[q_idx] = rank + current_front.append(population[q_idx]) + if not current_front: + break + fronts.append(current_front) + rank += 1 + + return fronts + + def _calculate_crowding_distance(self, front: List[Chromosome]) -> List[float]: + """计算拥挤度(标准NSGA-II算法)""" + n = len(front) + if n == 0: + return [] + distances = [0.0] * n + + # 按变更成本排序 + cost_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[0]) + # 边界个体拥挤度设为无穷大 + distances[cost_sorted[0][0]] = float('inf') + distances[cost_sorted[-1][0]] = float('inf') + # 计算中间个体拥挤度 + max_cost = cost_sorted[-1][1].calculate_fitness()[0] + min_cost = cost_sorted[0][1].calculate_fitness()[0] + if max_cost - min_cost > 1e-6: + for i in range(1, n - 1): + prev_cost = cost_sorted[i - 1][1].calculate_fitness()[0] + next_cost = cost_sorted[i + 1][1].calculate_fitness()[0] + distances[cost_sorted[i][0]] += (next_cost - prev_cost) / (max_cost - min_cost) + + # 按交付延期排序 + delay_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[1]) + # 边界个体拥挤度设为无穷大 + distances[delay_sorted[0][0]] = float('inf') + distances[delay_sorted[-1][0]] = float('inf') + # 计算中间个体拥挤度 + max_delay = delay_sorted[-1][1].calculate_fitness()[1] + min_delay = delay_sorted[0][1].calculate_fitness()[1] + if max_delay - min_delay > 1e-6: + for i in range(1, n - 1): + prev_delay = delay_sorted[i - 1][1].calculate_fitness()[1] + next_delay = delay_sorted[i + 1][1].calculate_fitness()[1] + distances[delay_sorted[i][0]] += (next_delay - prev_delay) / (max_delay - min_delay) + + return distances + + +# ============================= 遗传操作函数============================= +def crossover(parent1: Chromosome, parent2: Chromosome, Pc: float) -> Tuple[Chromosome, Chromosome]: + """两点交叉(文档1.4.1节)""" + if random.random() > Pc: + return parent1, parent2 + + # 复制父代染色体 + child1 = Chromosome(parent1.order, parent1.risk_ent, parent1.supplier) + child2 = Chromosome(parent2.order, parent2.risk_ent, parent2.supplier) + + # 确定染色体总基因长度(每层基因数相同) + total_genes_per_layer = sum([len(layer) for layer in parent1.enterprise_layer]) + if total_genes_per_layer < 2: + return child1, child2 + + # 随机选择两个交叉位置 + pos1 = random.randint(0, total_genes_per_layer - 2) + pos2 = random.randint(pos1 + 1, total_genes_per_layer - 1) + + # 对三层编码分别执行交叉 + for layer_idx, (layer1, layer2) in enumerate([ + (child1.enterprise_layer, child2.enterprise_layer), + (child1.capacity_layer, child2.capacity_layer), + (child1.quantity_layer, child2.quantity_layer) + ]): + # 展平层编码 + flat1 = [] + flat2 = [] + for segment in layer1: + flat1.extend(segment) + for segment in layer2: + flat2.extend(segment) + + # 交换pos1-pos2区间的基因 + flat1[pos1:pos2 + 1], flat2[pos1:pos2 + 1] = flat2[pos1:pos2 + 1], flat1[pos1:pos2 + 1] + + # 重构层编码 + ptr = 0 + for i in range(len(layer1)): + segment_len = len(layer1[i]) + layer1[i] = flat1[ptr:ptr + segment_len] + layer2[i] = flat2[ptr:ptr + segment_len] + ptr += segment_len + + # 修复子代染色体 + child1.repair() + child2.repair() + + return child1, child2 + + +def mutate(chrom: Chromosome, Pm: float) -> Chromosome: + """均匀变异(文档1.4.1节)""" + # 对三层编码分别执行变异 + # 1. 企业编码层(0/1翻转) + for i in range(chrom.I): + for j in range(len(chrom.enterprise_layer[i])): + if random.random() < Pm: + chrom.enterprise_layer[i][j] = 1 - chrom.enterprise_layer[i][j] + + # 2. 能力编码层(随机重置) + for i in range(chrom.I): + Ji = chrom.J_list[i] + for j in range(len(chrom.capacity_layer[i])): + if random.random() < Pm: + if j == 0: # 风险企业 + max_cap = chrom.risk_ent.C0_max + std_cap = chrom.risk_ent.C0_std + else: # 供应商j-1 + max_cap = chrom.supplier.suppliers[i][j - 1]["Cj_max"] + std_cap = chrom.supplier.suppliers[i][j - 1]["Cj_std"] + chrom.capacity_layer[i][j] = random.uniform(0.5 * std_cap, max_cap) + + # 3. 数量编码层(随机重置) + for i in range(chrom.I): + Qi = chrom.order.Q[i] + selected_ents = [idx for idx, val in enumerate(chrom.enterprise_layer[i]) if val == 1] + if len(selected_ents) > 0 and random.random() < Pm: + # 重新分配数量 + qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi + for idx, ent_idx in enumerate(selected_ents): + chrom.quantity_layer[i][ent_idx] = qtys[idx] + + # 修复变异后的染色体 + chrom.repair() + return chrom + + +def tournament_selection(population: List[Chromosome], tournament_size: int) -> Chromosome: + """锦标赛选择""" + candidates = random.sample(population, tournament_size) + # 选择Pareto支配最优的个体 + best = candidates[0] + best_cost, best_delay = best.calculate_fitness() + for cand in candidates[1:]: + cand_cost, cand_delay = cand.calculate_fitness() + # cand支配best + if (cand_cost <= best_cost and cand_delay <= best_delay) and (cand_cost < best_cost or cand_delay < best_delay): + best = cand + best_cost, best_delay = cand_cost, cand_delay + return best + + +# ============================= 主算法函数============================= +def improved_nsga2() -> Tuple[List[Chromosome], List[Dict]]: + """改进NSGA-II算法主函数(文档1.4.2节流程)""" + # 1. 初始化数据和参数 + order = OrderData() + risk_ent = RiskEnterpriseData() + supplier = SupplierData(order) + params = AlgorithmParams() + + # 2. 初始化种群(第0代) + pop = Population(order, risk_ent, supplier, params) + population = pop.population + + # 3. 迭代优化 + iteration_history = [] # 记录每代最优目标值 + for iter_idx in range(params.MaxIter): + # 4. 生成子代种群 + offspring = [] + while len(offspring) < params.N: + # 锦标赛选择父代 + parent1 = tournament_selection(population, params.tournament_size) + parent2 = tournament_selection(population, params.tournament_size) + # 交叉 + child1, child2 = crossover(parent1, parent2, params.Pc) + # 变异 + child1 = mutate(child1, params.Pm) + child2 = mutate(child2, params.Pm) + # 添加到子代 + offspring.append(child1) + if len(offspring) < params.N: + offspring.append(child2) + + # 5. 合并父代和子代 + combined = population + offspring + + # 6. 快速非支配排序 + fronts = pop._fast_non_dominated_sort(combined) + + # 7. 拥挤度计算与精英保留 + next_population = [] + for front in fronts: + if len(next_population) + len(front) <= params.N: + # 该前沿全部入选 + next_population.extend(front) + else: + # 计算拥挤度,选择拥挤度大的个体 + distances = pop._calculate_crowding_distance(front) + # 按拥挤度降序排序 + front_sorted = [front[i] for i in np.argsort(distances)[::-1]] + # 选择剩余名额 + need = params.N - len(next_population) + next_population.extend(front_sorted[:need]) + break + + # 8. 更新种群 + population = next_population + + # 9. 记录迭代历史 + current_front = pop._fast_non_dominated_sort(population)[0] + min_cost = min([chrom.calculate_fitness()[0] for chrom in current_front]) + min_delay = min([chrom.calculate_fitness()[1] for chrom in current_front]) + iteration_history.append({ + "iter": iter_idx, + "min_cost": min_cost, + "min_delay": min_delay, + "pareto_size": len(current_front) + }) + + # 打印迭代信息 + if (iter_idx + 1) % 20 == 0: + print( + f"迭代 {iter_idx + 1:3d} | 最优成本: {min_cost:8.2f} | 最优延期: {min_delay:6.2f} | Pareto解数: {len(current_front)}") + + # 10. 输出最终Pareto解集 + final_front = pop._fast_non_dominated_sort(population)[0] + return final_front, iteration_history + + +# ============================= 结果可视化与输出============================= +def visualize_results(pareto_front: List[Chromosome], iteration_history: List[Dict]): + """结果可视化""" + # 1. Pareto前沿曲线 + costs = [chrom.calculate_fitness()[0] for chrom in pareto_front] + delays = [chrom.calculate_fitness()[1] for chrom in pareto_front] + + plt.figure(figsize=(12, 5)) + + # 子图1:Pareto前沿 + plt.subplot(1, 2, 1) + plt.scatter(costs, delays, c='red', s=50, alpha=0.7) + plt.xlabel('变更成本', fontsize=12) + plt.ylabel('交付延期', fontsize=12) + plt.title('Pareto最优前沿', fontsize=14) + plt.grid(True, alpha=0.3) + + # 子图2:迭代收敛曲线 + plt.subplot(1, 2, 2) + iters = [item["iter"] for item in iteration_history] + min_costs = [item["min_cost"] for item in iteration_history] + min_delays = [item["min_delay"] for item in iteration_history] + + plt.plot(iters, min_costs, label='最优变更成本', linewidth=2) + plt.plot(iters, min_delays, label='最优交付延期', linewidth=2, linestyle='--') + plt.xlabel('迭代次数', fontsize=12) + plt.ylabel('目标值', fontsize=12) + plt.title('迭代收敛曲线', fontsize=14) + plt.legend() + plt.grid(True, alpha=0.3) + + plt.tight_layout() + plt.show() + + +def print_pareto_solutions(pareto_front: List[Chromosome]): + """打印Pareto解的详细信息""" + print("\n" + "=" * 80) + print("Pareto最优解集详细信息") + print("=" * 80) + for idx, chrom in enumerate(pareto_front): + cost, delay = chrom.calculate_fitness() + print(f"\n解 {idx + 1}:") + print(f" 变更成本: {cost:.2f} | 交付延期: {delay:.2f}") + print(" 订单分配方案:") + for i in range(chrom.I): + print(f" 物料{i + 1} (待生产: {chrom.order.Q[i]}):") + # 风险企业分配 + if chrom.enterprise_layer[i][0] == 1: + qty = chrom.quantity_layer[i][0] + cap = chrom.capacity_layer[i][0] + print(f" 风险企业: 数量={qty:.1f}, 生产能力={cap:.1f}") + # 供应商分配 + Ji = chrom.J_list[i] + for j in range(1, Ji + 1): + if chrom.enterprise_layer[i][j] == 1: + qty = chrom.quantity_layer[i][j] + cap = chrom.capacity_layer[i][j] + supp_info = chrom.supplier.suppliers[i][j - 1] + print(f" 供应商{j}: 数量={qty:.1f}, 生产能力={cap:.1f}, 采购价={supp_info['P_ij']:.1f}") + + +# ============================= 主函数调用============================= +if __name__ == "__main__": + # 运行算法 + print("改进NSGA-II算法开始运行...") + print("=" * 60) + pareto_front, iteration_history = improved_nsga2() + + # 结果输出与可视化 + print_pareto_solutions(pareto_front) + visualize_results(pareto_front, iteration_history) \ No newline at end of file