import numpy as np import random from typing import List, Dict, Tuple, Optional import matplotlib.pyplot as plt # ============================= 数据输入模块(用户需填写)============================= class OrderData: """订单基础数据类(对应文档表四7符号)""" def __init__(self): # 1. 订单核心参数 self.I = 3 # 物料种类数(用户修改) self.Q = [100, 150, 200] # 物料i待生产量Qi(索引0对应物料1,用户修改) self.Dd = 10 # 订单需求交货期(时长,用户修改) # 2. 物料基础成本(风险企业供应) self.P = [50.0, 80.0, 60.0] # 物料i单位采购价格P_i(用户修改) self.T0 = [5.0, 8.0, 6.0] # 物料i风险企业运输成本T_i^0(用户修改) # 3. 惩罚系数(用户可按文档建议调整) self.alpha = 2.0 # C1增长系数 self.beta = 1.0 # 时间尺度系数 self.delta = 1.3 # 惩罚倍数(1.2-1.5) self.gamma = 0.5 # 提前交付惩罚系数 class RiskEnterpriseData: """风险企业数据类(对应文档表四7符号)""" def __init__(self): self.C0_max = 30.0 # 风险企业单位时间最大生产能力C0^max(用户修改) self.C0_std = 20.0 # 风险企业单位时间标准生产能力(用户修改) self.L1 = 3.0 # 扰动持续时长系数下限(关键订单3,一般订单2,用户修改) self.L2 = 10.0 # 扰动持续时长系数上限(关键订单10,一般订单5,用户修改) self.distance = 10.0 # 风险企业位置距离(用户修改) class SupplierData: """供应商数据类(对应文档表四7符号)""" def __init__(self, order: OrderData): self.I = order.I # Ji:物料i的供应商个数(索引0对应物料1,用户修改) self.J = [2, 3, 2] # 物料1有2个供应商,物料2有3个,物料3有2个 # 供应商j的基础参数(按物料i分组,用户修改) # 结构:self.suppliers[i][j] = {参数},i=物料索引,j=供应商索引(0开始) self.suppliers = [ # 物料1的2个供应商 [ {"Cj_max": 40.0, "Cj_std": 30.0, "P_ij": 55.0, "T_ij": 7.0, "MinOrder": 20, "MaxOrder": 100, "distance": 15.0}, {"Cj_max": 35.0, "Cj_std": 25.0, "P_ij": 52.0, "T_ij": 9.0, "MinOrder": 15, "MaxOrder": 90, "distance": 20.0} ], # 物料2的3个供应商 [ {"Cj_max": 50.0, "Cj_std": 40.0, "P_ij": 85.0, "T_ij": 6.0, "MinOrder": 25, "MaxOrder": 150, "distance": 12.0}, {"Cj_max": 45.0, "Cj_std": 35.0, "P_ij": 82.0, "T_ij": 8.0, "MinOrder": 20, "MaxOrder": 120, "distance": 18.0}, {"Cj_max": 55.0, "Cj_std": 45.0, "P_ij": 88.0, "T_ij": 5.0, "MinOrder": 30, "MaxOrder": 160, "distance": 10.0} ], # 物料3的2个供应商 [ {"Cj_max": 42.0, "Cj_std": 32.0, "P_ij": 65.0, "T_ij": 8.0, "MinOrder": 22, "MaxOrder": 200, "distance": 16.0}, {"Cj_max": 38.0, "Cj_std": 28.0, "P_ij": 63.0, "T_ij": 10.0, "MinOrder": 18, "MaxOrder": 180, "distance": 22.0} ] ] class AlgorithmParams: """算法参数类(默认值可修改)""" def __init__(self): self.N = 100 # 种群规模 self.Pm = 0.05 # 变异概率 self.Pc = 0.8 # 交叉概率 self.MaxIter = 200 # 最大迭代次数 self.tournament_size = 3 # 锦标赛选择规模 # 种群比例 N1:N2:N3:N4 = 3:3:2:2 self.N1_ratio = 0.3 self.N2_ratio = 0.3 self.N3_ratio = 0.2 self.N4_ratio = 0.2 # ============================= 辅助计算函数(文档对应函数)============================= def calculate_RNum(P_Num: float, A_Num: float) -> float: """计算剩余待生产数量R_Num = A_Num - P_Num(文档1.2.2节)""" return max(0.0, A_Num - P_Num) def calculate_ResTime(DueTime: float, riskTime: float) -> float: """计算剩余交货期限ResTime = DueTime - riskTime(文档1.2.2节)""" return max(0.0, DueTime - riskTime) def calculate_TimeLevel(Td: float, ResTime: float, L1: float, L2: float) -> str: """计算扰动持续程度TimeLevel(文档1.2.2节)""" if ResTime == 0: return "长" ratio = Td / ResTime if ratio <= L1: return "短" elif ratio >= L2: return "长" else: return "中" def calculate_CapLevel(R_Num: float, ResTime: float, C: float) -> str: """计算订单生产能力等级CapLevel(文档1.2.2节)""" if ResTime == 0: return "不足" capacity = C * ResTime # 总生产能力 if capacity >= R_Num * 1.2: # 预留20%缓冲 return "充足" elif capacity <= R_Num * 0.8: # 缺口20%以上 return "不足" else: return "一般" # ============================= 染色体与种群类============================= class Chromosome: """染色体类(三层编码结构)""" def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData): self.order = order self.risk_ent = risk_ent self.supplier = supplier self.I = order.I self.J_list = supplier.J # 各物料的供应商个数 # 三层编码初始化(随机生成基础结构) self.enterprise_layer: List[List[int]] = [] # 企业编码层:I×(Ji+1),0/1 self.capacity_layer: List[List[float]] = [] # 能力编码层:I×(Ji+1),生产能力 self.quantity_layer: List[List[float]] = [] # 数量编码层:I×(Ji+1),分配数量 for i in range(self.I): Ji = self.J_list[i] total_ents = Ji + 1 # 1个风险企业 + Ji个供应商 # 企业编码层:随机生成0/1,确保至少有1个1 ent_codes = [random.randint(0, 1) for _ in range(total_ents)] if sum(ent_codes) == 0: ent_codes[random.randint(0, total_ents - 1)] = 1 # 强制激活1个 self.enterprise_layer.append(ent_codes) # 能力编码层:基于企业标准生产能力随机生成 cap_codes = [] for j in range(total_ents): if j == 0: # 风险企业 max_cap = risk_ent.C0_max std_cap = risk_ent.C0_std else: # 供应商j-1(索引转换) max_cap = supplier.suppliers[i][j - 1]["Cj_max"] std_cap = supplier.suppliers[i][j - 1]["Cj_std"] # 在[0.5×std_cap, max_cap]范围内随机 cap = random.uniform(0.5 * std_cap, max_cap) cap_codes.append(cap) self.capacity_layer.append(cap_codes) # 数量编码层:基于物料待生产量随机分配 qty_codes = [0.0] * total_ents Qi = order.Q[i] selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1] if len(selected_ents) > 0: # 随机分配数量(确保总和=Qi) qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi for idx, ent_idx in enumerate(selected_ents): qty_codes[ent_idx] = qtys[idx] self.quantity_layer.append(qty_codes) # 修复初始染色体(确保满足约束) self.repair() def repair(self): """染色体修复机制(文档1.4.1节)""" # 步骤1:检查每类物料是否有供应方(物料生产约束) for i in range(self.I): ent_codes = self.enterprise_layer[i] if sum(ent_codes) == 0: # 随机激活1个企业 active_idx = random.randint(0, len(ent_codes) - 1) self.enterprise_layer[i][active_idx] = 1 # 步骤2:检查企业最大生产能力约束 self._repair_capacity_constraint() # 步骤3:检查物料数量约束(总和=Qi,供应商起订量/最大供应量) self._repair_quantity_constraint() def _repair_capacity_constraint(self): """修复生产能力约束(修复属性名不匹配问题)""" # 步骤1:修复风险企业生产能力(原逻辑不变) risk_total_cap = 0.0 risk_assignments = [] # 存储(i, cap):物料i分配给风险企业的能力 for i in range(self.I): if self.enterprise_layer[i][0] == 1: # 风险企业被选中 cap = self.capacity_layer[i][0] risk_total_cap += cap risk_assignments.append((i, cap)) # 若超出最大产能,剔除最小能力的物料 while risk_total_cap > self.risk_ent.C0_max and len(risk_assignments) > 0: risk_assignments.sort(key=lambda x: x[1]) min_i, min_cap = risk_assignments.pop(0) self.enterprise_layer[min_i][0] = 0 # 取消选择风险企业 self.capacity_layer[min_i][0] = 0.0 self.quantity_layer[min_i][0] = 0.0 risk_total_cap -= min_cap # 步骤2:修复各供应商生产能力(核心修复:J_list → J) for i in range(self.I): # 遍历每个物料i,找到该物料的所有供应商编码索引 Ji = self.J_list[i] # 物料i的供应商个数(注:此处self.J_list是Chromosome类自身属性,无需修改) for j in range(1, Ji + 1): # j=1~Ji:物料i的供应商编码索引(对应供应商j-1) supplier_idx = j - 1 # 供应商在suppliers列表中的索引(0开始) supp_total_cap = 0.0 supp_assignments = [] # 遍历所有物料k,计算该供应商(j索引)在各物料上的总产能 for k in range(self.I): # 核心修复:self.supplier.J_list[k] → self.supplier.J[k] if j >= len(self.enterprise_layer[k]) or supplier_idx >= self.supplier.J[k]: continue # 跳过无该供应商的物料,避免索引越界 # 若物料k选择了该供应商(j索引),累加产能 if self.enterprise_layer[k][j] == 1: cap = self.capacity_layer[k][j] supp_total_cap += cap supp_assignments.append((k, cap)) # 若该供应商总产能超出最大产能,剔除最小能力的物料 if len(supp_assignments) == 0: continue # 无物料分配给该供应商,跳过 # 获取该供应商在当前物料k上的最大产能(修正:用第一个分配的物料k) first_k = supp_assignments[0][0] supp_max_cap = self.supplier.suppliers[first_k][supplier_idx]["Cj_max"] # 剔除超出产能的物料分配 while supp_total_cap > supp_max_cap and len(supp_assignments) > 0: supp_assignments.sort(key=lambda x: x[1]) min_k, min_cap = supp_assignments.pop(0) self.enterprise_layer[min_k][j] = 0 # 取消该物料对该供应商的选择 self.capacity_layer[min_k][j] = 0.0 self.quantity_layer[min_k][j] = 0.0 supp_total_cap -= min_cap def _repair_quantity_constraint(self): """修复物料数量约束(修复除以零错误)""" for i in range(self.I): Qi = self.order.Q[i] ent_codes = self.enterprise_layer[i] qty_codes = self.quantity_layer[i] selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1] current_total = sum(qty_codes) # 调整总量至Qi(允许微小误差) if abs(current_total - Qi) > 1e-6: ratio = Qi / (current_total + 1e-10) # 避免初始总量为0时除以零 for idx in selected_ents: qty_codes[idx] *= ratio # 检查供应商起订量和最大供应量 Ji = self.J_list[i] for j in range(1, Ji + 1): # 供应商j-1 if ent_codes[j] == 1: min_order = self.supplier.suppliers[i][j - 1]["MinOrder"] max_order = self.supplier.suppliers[i][j - 1]["MaxOrder"] qty = qty_codes[j] if qty < min_order: # 低于起订量:要么增加到起订量,要么取消选择 if sum(qty_codes) + (min_order - qty) <= Qi * 1.1: # 允许10%缓冲 qty_codes[j] = min_order # 从其他选中企业中减少对应数量 surplus_reduce = min_order - qty other_ents = [idx for idx in selected_ents if idx != j] if other_ents: sum_other_qty = sum(qty_codes[idx] for idx in other_ents) if sum_other_qty > 1e-10: # 避免除以零 reduce_ratio = surplus_reduce / sum_other_qty for idx in other_ents: qty_codes[idx] *= (1 - reduce_ratio) else: # 其他企业无数量可减,直接取消当前供应商选择 ent_codes[j] = 0 qty_codes[j] = 0.0 selected_ents.remove(j) else: ent_codes[j] = 0 qty_codes[j] = 0.0 selected_ents.remove(j) elif qty > max_order: # 超过最大供应量:调整到最大值,剩余差额分配给其他企业 qty_codes[j] = max_order surplus_add = qty - max_order # 此处原逻辑笔误:应为当前qty - max_order,即超出部分转为需补充的差额 other_ents = [idx for idx in selected_ents if idx != j] if other_ents: # 核心修复:检查other_ents的数量总和是否为零 sum_other_qty = sum(qty_codes[idx] for idx in other_ents) if sum_other_qty > 1e-10: # 总和非零,按比例分配 add_ratio = surplus_add / sum_other_qty for idx in other_ents: qty_codes[idx] *= (1 + add_ratio) else: # 总和为零,直接将差额分配给第一个其他企业 target_ent = other_ents[0] # 确保不超过目标企业的最大供应量(若为供应商) if target_ent == 0: # 目标是风险企业(无最大供应量限制,仅需满足产能) qty_codes[target_ent] += surplus_add else: # 目标是供应商,需检查最大供应量 supp_max_order = self.supplier.suppliers[i][target_ent - 1]["MaxOrder"] add_qty = min(surplus_add, supp_max_order - qty_codes[target_ent]) qty_codes[target_ent] += add_qty # 若仍有剩余差额(目标企业已达最大供应量),递归处理 remaining_surplus = surplus_add - add_qty if remaining_surplus > 1e-6: other_ents.remove(target_ent) if other_ents: # 重新计算剩余差额分配 sum_other_qty_new = sum(qty_codes[idx] for idx in other_ents) if sum_other_qty_new > 1e-10: add_ratio_new = remaining_surplus / sum_other_qty_new for idx in other_ents: qty_codes[idx] *= (1 + add_ratio_new) else: # 继续分配给下一个企业 for next_ent in other_ents: if remaining_surplus <= 1e-6: break if next_ent == 0: qty_codes[next_ent] += remaining_surplus remaining_surplus = 0 else: supp_max = self.supplier.suppliers[i][next_ent - 1]["MaxOrder"] add = min(remaining_surplus, supp_max - qty_codes[next_ent]) qty_codes[next_ent] += add remaining_surplus -= add # 最终确保总和=Qi(避免累积误差) current_total = sum(qty_codes) if abs(current_total - Qi) > 1e-6 and selected_ents: ratio = Qi / (current_total + 1e-10) # 加微小值避免除以零 for idx in selected_ents: qty_codes[idx] *= ratio def calculate_fitness(self) -> Tuple[float, float]: """计算适应度(变更成本C,交付延期T)""" C1 = self._calculate_C1() C2 = self._calculate_C2() C3 = self._calculate_C3() C4 = self._calculate_C4() C = C1 + C2 + C3 + C4 T = self._calculate_T() return C, T def _calculate_C1(self) -> float: """计算变更惩罚成本C1""" # 简化:假设ResTime=Dd(风险识别时刻为0),R_Num=Qi ResTime = self.order.Dd total_C1 = 0.0 for i in range(self.I): Qi = self.order.Q[i] # 最大惩罚成本=δ*(采购成本+运输成本)*Qi max_penalty = self.order.delta * (self.order.P[i] + self.order.T0[i]) * Qi # 指数惩罚项 exp_penalty = self.order.delta * Qi * np.exp(self.order.alpha / (ResTime + self.order.beta)) total_C1 += min(max_penalty, exp_penalty) return total_C1 def _calculate_C2(self) -> float: """计算采购变更成本C2""" total_purchase_risk = 0.0 # 原风险企业采购成本 total_purchase_supp = 0.0 # 变更后供应商采购成本 for i in range(self.I): # 风险企业采购成本 if self.enterprise_layer[i][0] == 1: total_purchase_risk += self.quantity_layer[i][0] * self.order.P[i] # 供应商采购成本 Ji = self.J_list[i] for j in range(1, Ji + 1): if self.enterprise_layer[i][j] == 1: supp_p = self.supplier.suppliers[i][j - 1]["P_ij"] total_purchase_supp += self.quantity_layer[i][j] * supp_p return total_purchase_supp - total_purchase_risk def _calculate_C3(self) -> float: """计算运输变更成本C3""" total_trans_risk = 0.0 # 原风险企业运输成本 total_trans_supp = 0.0 # 变更后供应商运输成本 for i in range(self.I): # 风险企业运输成本 if self.enterprise_layer[i][0] == 1: total_trans_risk += self.quantity_layer[i][0] * self.order.T0[i] # 供应商运输成本 Ji = self.J_list[i] for j in range(1, Ji + 1): if self.enterprise_layer[i][j] == 1: supp_t = self.supplier.suppliers[i][j - 1]["T_ij"] total_trans_supp += self.quantity_layer[i][j] * supp_t return total_trans_supp - total_trans_risk def _calculate_C4(self) -> float: """计算提前交付惩罚成本C4""" delivery_dates = self._get_delivery_dates() actual_delivery = max(delivery_dates) advance = self.order.Dd - actual_delivery return self.order.gamma * max(0.0, advance) def _calculate_T(self) -> float: """计算交付延期T""" delivery_dates = self._get_delivery_dates() actual_delivery = max(delivery_dates) delay = actual_delivery - self.order.Dd return max(0.0, delay) def _get_delivery_dates(self) -> List[float]: """计算各企业的交付日期(生产耗时+运输耗时)""" delivery_dates = [] for i in range(self.I): # 风险企业交付日期 if self.enterprise_layer[i][0] == 1: qty = self.quantity_layer[i][0] cap = self.capacity_layer[i][0] if cap == 0: prod_time = 0.0 else: prod_time = qty / cap # 生产耗时 trans_time = self.risk_ent.distance / 10.0 # 简化运输耗时(距离/10) delivery_dates.append(prod_time + trans_time) # 供应商交付日期 Ji = self.J_list[i] for j in range(1, Ji + 1): if self.enterprise_layer[i][j] == 1: qty = self.quantity_layer[i][j] cap = self.capacity_layer[i][j] if cap == 0: prod_time = 0.0 else: prod_time = qty / cap trans_time = self.supplier.suppliers[i][j - 1]["distance"] / 10.0 delivery_dates.append(prod_time + trans_time) # 新增:如果没有任何交付日期(空列表),返回一个默认值 if not delivery_dates: # 返回一个很大的值表示无法交付,触发最大延期惩罚 return [float('inf')] return delivery_dates class Population: """种群类""" def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData, params: AlgorithmParams): self.order = order self.risk_ent = risk_ent self.supplier = supplier self.params = params self.N = params.N self.population: List[Chromosome] = [] # 初始化种群(混合策略) self._initialize_population() def _initialize_population(self): """混合种群初始化(文档1.4.1节)""" N1 = int(self.N * self.params.N1_ratio) # 变更成本最小导向 N2 = int(self.N * self.params.N2_ratio) # 交付延期最短导向 N3 = int(self.N * self.params.N3_ratio) # 风险企业导向 N4 = self.N - N1 - N2 - N3 # 随机种群 # 1. 生成N1:变更成本最小导向 self.population.extend(self._generate_target_oriented_pop(N1, target="cost")) # 2. 生成N2:交付延期最短导向 self.population.extend(self._generate_target_oriented_pop(N2, target="delay")) # 3. 生成N3:风险企业导向 self.population.extend(self._generate_risk_ent_oriented_pop(N3)) # 4. 生成N4:随机种群 self.population.extend([Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(N4)]) def _generate_target_oriented_pop(self, size: int, target: str) -> List[Chromosome]: """生成目标函数导向种群(文档1.4.1节)""" if size <= 0: return [] # 随机生成2*size个候选解 candidates = [Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(2 * size)] # 计算适应度并排序 if target == "cost": # 按变更成本升序排序 candidates.sort(key=lambda x: x.calculate_fitness()[0]) else: # target == "delay" # 按交付延期升序排序 candidates.sort(key=lambda x: x.calculate_fitness()[1]) # 选择前size个 return candidates[:size] def _generate_risk_ent_oriented_pop(self, size: int) -> List[Chromosome]: """生成风险企业导向种群(文档1.4.1节)""" if size <= 0: return [] candidates = [] for _ in range(2 * size): chrom = Chromosome(self.order, self.risk_ent, self.supplier) # 强制风险企业选中所有物料 for i in range(self.order.I): chrom.enterprise_layer[i][0] = 1 # yi0=1 # 修复染色体 chrom.repair() candidates.append(chrom) # 计算Pareto Rank,选择Rank最低的size个 ranked_candidates = self._fast_non_dominated_sort(candidates)[0] # Rank=0的解 if len(ranked_candidates) >= size: return random.sample(ranked_candidates, size) else: # 不足时补充其他Rank的解 all_ranked = self._fast_non_dominated_sort(candidates) selected = [] for rank in all_ranked: selected.extend(rank) if len(selected) >= size: return selected[:size] return selected def _fast_non_dominated_sort(self, population: List[Chromosome]) -> List[List[Chromosome]]: """快速非支配排序(标准NSGA-II算法)""" dominated_sets = [[] for _ in range(len(population))] domination_counts = [0] * len(population) ranks = [0] * len(population) fronts = [[]] # 计算每个个体的支配关系 for p_idx in range(len(population)): p_cost, p_delay = population[p_idx].calculate_fitness() for q_idx in range(len(population)): if p_idx == q_idx: continue q_cost, q_delay = population[q_idx].calculate_fitness() # p支配q:p的两个目标都不劣于q,且至少一个目标更优 if (p_cost <= q_cost and p_delay <= q_delay) and (p_cost < q_cost or p_delay < q_delay): dominated_sets[p_idx].append(q_idx) # q支配p elif (q_cost <= p_cost and q_delay <= p_delay) and (q_cost < p_cost or q_delay < q_delay): domination_counts[p_idx] += 1 # 初始前沿(Rank=0) if domination_counts[p_idx] == 0: ranks[p_idx] = 0 fronts[0].append(population[p_idx]) # 生成后续前沿 rank = 1 while True: current_front = [] for p_idx in range(len(population)): if ranks[p_idx] == rank - 1: for q_idx in dominated_sets[p_idx]: domination_counts[q_idx] -= 1 if domination_counts[q_idx] == 0: ranks[q_idx] = rank current_front.append(population[q_idx]) if not current_front: break fronts.append(current_front) rank += 1 return fronts def _calculate_crowding_distance(self, front: List[Chromosome]) -> List[float]: """计算拥挤度(标准NSGA-II算法)""" n = len(front) if n == 0: return [] distances = [0.0] * n # 按变更成本排序 cost_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[0]) # 边界个体拥挤度设为无穷大 distances[cost_sorted[0][0]] = float('inf') distances[cost_sorted[-1][0]] = float('inf') # 计算中间个体拥挤度 max_cost = cost_sorted[-1][1].calculate_fitness()[0] min_cost = cost_sorted[0][1].calculate_fitness()[0] if max_cost - min_cost > 1e-6: for i in range(1, n - 1): prev_cost = cost_sorted[i - 1][1].calculate_fitness()[0] next_cost = cost_sorted[i + 1][1].calculate_fitness()[0] distances[cost_sorted[i][0]] += (next_cost - prev_cost) / (max_cost - min_cost) # 按交付延期排序 delay_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[1]) # 边界个体拥挤度设为无穷大 distances[delay_sorted[0][0]] = float('inf') distances[delay_sorted[-1][0]] = float('inf') # 计算中间个体拥挤度 max_delay = delay_sorted[-1][1].calculate_fitness()[1] min_delay = delay_sorted[0][1].calculate_fitness()[1] if max_delay - min_delay > 1e-6: for i in range(1, n - 1): prev_delay = delay_sorted[i - 1][1].calculate_fitness()[1] next_delay = delay_sorted[i + 1][1].calculate_fitness()[1] distances[delay_sorted[i][0]] += (next_delay - prev_delay) / (max_delay - min_delay) return distances # ============================= 遗传操作函数============================= def crossover(parent1: Chromosome, parent2: Chromosome, Pc: float) -> Tuple[Chromosome, Chromosome]: """两点交叉(文档1.4.1节)""" if random.random() > Pc: return parent1, parent2 # 复制父代染色体 child1 = Chromosome(parent1.order, parent1.risk_ent, parent1.supplier) child2 = Chromosome(parent2.order, parent2.risk_ent, parent2.supplier) # 确定染色体总基因长度(每层基因数相同) total_genes_per_layer = sum([len(layer) for layer in parent1.enterprise_layer]) if total_genes_per_layer < 2: return child1, child2 # 随机选择两个交叉位置 pos1 = random.randint(0, total_genes_per_layer - 2) pos2 = random.randint(pos1 + 1, total_genes_per_layer - 1) # 对三层编码分别执行交叉 for layer_idx, (layer1, layer2) in enumerate([ (child1.enterprise_layer, child2.enterprise_layer), (child1.capacity_layer, child2.capacity_layer), (child1.quantity_layer, child2.quantity_layer) ]): # 展平层编码 flat1 = [] flat2 = [] for segment in layer1: flat1.extend(segment) for segment in layer2: flat2.extend(segment) # 交换pos1-pos2区间的基因 flat1[pos1:pos2 + 1], flat2[pos1:pos2 + 1] = flat2[pos1:pos2 + 1], flat1[pos1:pos2 + 1] # 重构层编码 ptr = 0 for i in range(len(layer1)): segment_len = len(layer1[i]) layer1[i] = flat1[ptr:ptr + segment_len] layer2[i] = flat2[ptr:ptr + segment_len] ptr += segment_len # 修复子代染色体 child1.repair() child2.repair() return child1, child2 def mutate(chrom: Chromosome, Pm: float) -> Chromosome: """均匀变异(文档1.4.1节)""" # 对三层编码分别执行变异 # 1. 企业编码层(0/1翻转) for i in range(chrom.I): for j in range(len(chrom.enterprise_layer[i])): if random.random() < Pm: chrom.enterprise_layer[i][j] = 1 - chrom.enterprise_layer[i][j] # 2. 能力编码层(随机重置) for i in range(chrom.I): Ji = chrom.J_list[i] for j in range(len(chrom.capacity_layer[i])): if random.random() < Pm: if j == 0: # 风险企业 max_cap = chrom.risk_ent.C0_max std_cap = chrom.risk_ent.C0_std else: # 供应商j-1 max_cap = chrom.supplier.suppliers[i][j - 1]["Cj_max"] std_cap = chrom.supplier.suppliers[i][j - 1]["Cj_std"] chrom.capacity_layer[i][j] = random.uniform(0.5 * std_cap, max_cap) # 3. 数量编码层(随机重置) for i in range(chrom.I): Qi = chrom.order.Q[i] selected_ents = [idx for idx, val in enumerate(chrom.enterprise_layer[i]) if val == 1] if len(selected_ents) > 0 and random.random() < Pm: # 重新分配数量 qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi for idx, ent_idx in enumerate(selected_ents): chrom.quantity_layer[i][ent_idx] = qtys[idx] # 修复变异后的染色体 chrom.repair() return chrom def tournament_selection(population: List[Chromosome], tournament_size: int) -> Chromosome: """锦标赛选择""" candidates = random.sample(population, tournament_size) # 选择Pareto支配最优的个体 best = candidates[0] best_cost, best_delay = best.calculate_fitness() for cand in candidates[1:]: cand_cost, cand_delay = cand.calculate_fitness() # cand支配best if (cand_cost <= best_cost and cand_delay <= best_delay) and (cand_cost < best_cost or cand_delay < best_delay): best = cand best_cost, best_delay = cand_cost, cand_delay return best # ============================= 主算法函数============================= def improved_nsga2() -> Tuple[List[Chromosome], List[Dict]]: """改进NSGA-II算法主函数(文档1.4.2节流程)""" # 1. 初始化数据和参数 order = OrderData() risk_ent = RiskEnterpriseData() supplier = SupplierData(order) params = AlgorithmParams() # 2. 初始化种群(第0代) pop = Population(order, risk_ent, supplier, params) population = pop.population # 3. 迭代优化 iteration_history = [] # 记录每代最优目标值 for iter_idx in range(params.MaxIter): # 4. 生成子代种群 offspring = [] while len(offspring) < params.N: # 锦标赛选择父代 parent1 = tournament_selection(population, params.tournament_size) parent2 = tournament_selection(population, params.tournament_size) # 交叉 child1, child2 = crossover(parent1, parent2, params.Pc) # 变异 child1 = mutate(child1, params.Pm) child2 = mutate(child2, params.Pm) # 添加到子代 offspring.append(child1) if len(offspring) < params.N: offspring.append(child2) # 5. 合并父代和子代 combined = population + offspring # 6. 快速非支配排序 fronts = pop._fast_non_dominated_sort(combined) # 7. 拥挤度计算与精英保留 next_population = [] for front in fronts: if len(next_population) + len(front) <= params.N: # 该前沿全部入选 next_population.extend(front) else: # 计算拥挤度,选择拥挤度大的个体 distances = pop._calculate_crowding_distance(front) # 按拥挤度降序排序 front_sorted = [front[i] for i in np.argsort(distances)[::-1]] # 选择剩余名额 need = params.N - len(next_population) next_population.extend(front_sorted[:need]) break # 8. 更新种群 population = next_population # 9. 记录迭代历史 current_front = pop._fast_non_dominated_sort(population)[0] min_cost = min([chrom.calculate_fitness()[0] for chrom in current_front]) min_delay = min([chrom.calculate_fitness()[1] for chrom in current_front]) iteration_history.append({ "iter": iter_idx, "min_cost": min_cost, "min_delay": min_delay, "pareto_size": len(current_front) }) # 打印迭代信息 if (iter_idx + 1) % 20 == 0: print( f"迭代 {iter_idx + 1:3d} | 最优成本: {min_cost:8.2f} | 最优延期: {min_delay:6.2f} | Pareto解数: {len(current_front)}") # 10. 输出最终Pareto解集 final_front = pop._fast_non_dominated_sort(population)[0] return final_front, iteration_history # ============================= 结果可视化与输出============================= def visualize_results(pareto_front: List[Chromosome], iteration_history: List[Dict]): """结果可视化""" # 1. Pareto前沿曲线 costs = [chrom.calculate_fitness()[0] for chrom in pareto_front] delays = [chrom.calculate_fitness()[1] for chrom in pareto_front] plt.figure(figsize=(12, 5)) # 子图1:Pareto前沿 plt.subplot(1, 2, 1) plt.scatter(costs, delays, c='red', s=50, alpha=0.7) plt.xlabel('变更成本', fontsize=12) plt.ylabel('交付延期', fontsize=12) plt.title('Pareto最优前沿', fontsize=14) plt.grid(True, alpha=0.3) # 子图2:迭代收敛曲线 plt.subplot(1, 2, 2) iters = [item["iter"] for item in iteration_history] min_costs = [item["min_cost"] for item in iteration_history] min_delays = [item["min_delay"] for item in iteration_history] plt.plot(iters, min_costs, label='最优变更成本', linewidth=2) plt.plot(iters, min_delays, label='最优交付延期', linewidth=2, linestyle='--') plt.xlabel('迭代次数', fontsize=12) plt.ylabel('目标值', fontsize=12) plt.title('迭代收敛曲线', fontsize=14) plt.legend() plt.grid(True, alpha=0.3) plt.tight_layout() plt.show() def print_pareto_solutions(pareto_front: List[Chromosome]): """打印Pareto解的详细信息""" print("\n" + "=" * 80) print("Pareto最优解集详细信息") print("=" * 80) for idx, chrom in enumerate(pareto_front): cost, delay = chrom.calculate_fitness() print(f"\n解 {idx + 1}:") print(f" 变更成本: {cost:.2f} | 交付延期: {delay:.2f}") print(" 订单分配方案:") for i in range(chrom.I): print(f" 物料{i + 1} (待生产: {chrom.order.Q[i]}):") # 风险企业分配 if chrom.enterprise_layer[i][0] == 1: qty = chrom.quantity_layer[i][0] cap = chrom.capacity_layer[i][0] print(f" 风险企业: 数量={qty:.1f}, 生产能力={cap:.1f}") # 供应商分配 Ji = chrom.J_list[i] for j in range(1, Ji + 1): if chrom.enterprise_layer[i][j] == 1: qty = chrom.quantity_layer[i][j] cap = chrom.capacity_layer[i][j] supp_info = chrom.supplier.suppliers[i][j - 1] print(f" 供应商{j}: 数量={qty:.1f}, 生产能力={cap:.1f}, 采购价={supp_info['P_ij']:.1f}") # ============================= 主函数调用============================= if __name__ == "__main__": # 运行算法 print("改进NSGA-II算法开始运行...") print("=" * 60) pareto_front, iteration_history = improved_nsga2() # 结果输出与可视化 print_pareto_solutions(pareto_front) visualize_results(pareto_front, iteration_history)