diff --git a/chromosome_utils.py b/chromosome_utils.py index 5ee478b..eee419a 100644 --- a/chromosome_utils.py +++ b/chromosome_utils.py @@ -1,82 +1,122 @@ import random import numpy as np -from data_structures import OrderData, RiskEnterpriseData, SupplierData +from data_structures import OrderData, RiskEnterpriseData, SupplierData # 数据结构类 class ChromosomeUtils: - """染色体编码、解码、修复工具类""" + """染色体工具类:提供染色体的拆分、合并、修复等功能,确保满足约束条件""" + def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData): - self.order = order_data - self.risk = risk_data - self.supplier = supplier_data - self.I = order_data.I + """ + 初始化工具类 + :param order_data: 订单数据(需求、交货期等) + :param risk_data: 风险企业数据(产能等) + :param supplier_data: 供应商数据(产能、价格等) + """ + self.order = order_data # 订单数据 + self.risk = risk_data # 风险企业数据 + self.supplier = supplier_data # 供应商数据 + self.I = order_data.I # 物料种类数 + + # 预计算每个物料的可选企业列表(0=风险企业,1+为供应商ID) self.material_optional_enterprises = self._get_material_optional_enterprises() + # 每个物料的可选企业数量 self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises] + # 染色体总长度(3层:企业层+能力层+数量层) self.chromosome_length = 3 * sum(self.material_enterprise_count) def _get_material_optional_enterprises(self) -> list[list[int]]: - """获取每个物料的可选企业列表:0=风险企业,1~supplier_count=供应商""" + """ + 生成每个物料的可选企业列表(内部方法) + :return: 二维列表,每个子列表为对应物料的可选企业ID + """ optional = [] - for i in range(self.I): - ents = [0] # 先加入风险企业 + for i in range(self.I): # 遍历每种物料 + ents = [0] # 先加入风险企业(ID=0) + # 加入可生产该物料的供应商(ID=1+供应商索引) for j in range(self.supplier.supplier_count): - if self.supplier.can_produce[j][i] == 1: - ents.append(j + 1) + if self.supplier.can_produce[j][i] == 1: # 供应商j可生产物料i + ents.append(j + 1) # 供应商ID为j+1(区分风险企业) optional.append(ents) return optional def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]: - """拆分染色体为三层:企业编码层、能力编码层、数量编码层""" - total_ent_count = sum(self.material_enterprise_count) - enterprise_layer = chromosome[:total_ent_count] - capacity_layer = chromosome[total_ent_count:2*total_ent_count] - quantity_layer = chromosome[2*total_ent_count:] + """ + 拆分染色体为三层 + :param chromosome: 完整染色体 + :return: 企业层、能力层、数量层(均为numpy数组) + """ + total_ent_count = sum(self.material_enterprise_count) # 所有物料的企业总数 + enterprise_layer = chromosome[:total_ent_count] # 第一层:企业选择(0/1) + capacity_layer = chromosome[total_ent_count:2 * total_ent_count] # 第二层:产能 + quantity_layer = chromosome[2 * total_ent_count:] # 第三层:数量 return enterprise_layer, capacity_layer, quantity_layer def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: - """合并三层为完整染色体""" + """ + 合并三层为完整染色体 + :param enterprise_layer: 企业层 + :param capacity_layer: 能力层 + :param quantity_layer: 数量层 + :return: 完整染色体 + """ return np.hstack([enterprise_layer, capacity_layer, quantity_layer]) def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray: - """修复企业编码层:确保每种物料至少有一个企业生产""" + """ + 修复企业层:确保每种物料至少选择一个企业 + :param enterprise_layer: 待修复的企业层 + :return: 修复后的企业层 + """ repaired = enterprise_layer.copy() + # 计算各物料的企业编码分割点 split_points = np.cumsum(self.material_enterprise_count) - start = 0 - for i in range(self.I): - end = split_points[i] - segment = repaired[start:end] - if np.sum(segment) == 0: - select_idx = random.randint(0, len(segment)-1) + start = 0 # 起始索引 + for i in range(self.I): # 遍历每种物料 + end = split_points[i] # 当前物料的企业编码结束索引 + segment = repaired[start:end] # 当前物料的企业选择片段 + if np.sum(segment) == 0: # 未选择任何企业时,随机选一个 + select_idx = random.randint(0, len(segment) - 1) segment[select_idx] = 1 repaired[start:end] = segment - start = end + start = end # 更新起始索引 return repaired def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray: - """修复能力编码层:满足单物料产能和总产能约束""" + """ + 修复能力层:确保满足单物料产能约束和企业总产能约束 + :param enterprise_layer: 企业层(用于确定哪些企业被选中) + :param capacity_layer: 待修复的能力层 + :return: 修复后的能力层 + """ repaired = capacity_layer.copy() split_points = np.cumsum(self.material_enterprise_count) start = 0 - # 单物料产能约束 + + # 1. 修复单物料产能约束(每个企业的单物料产能不超过上限) for i in range(self.I): end = split_points[i] - ents = self.material_optional_enterprises[i] - segment = repaired[start:end] - enterprise_segment = enterprise_layer[start:end] + ents = self.material_optional_enterprises[i] # 可选企业列表 + segment = repaired[start:end] # 当前物料的产能片段 + enterprise_segment = enterprise_layer[start:end] # 企业选择状态 for idx, ent in enumerate(ents): - if enterprise_segment[idx] == 1: + if enterprise_segment[idx] == 1: # 仅处理被选中的企业 + # 确定该企业的单物料最大产能 if ent == 0: - max_cap = self.risk.C0_i_std[i] + max_cap = self.risk.C0_i_std[i] # 风险企业 else: supplier_id = ent - 1 - max_cap = self.supplier.Cj_i_std[supplier_id][i] + max_cap = self.supplier.Cj_i_std[supplier_id][i] # 供应商 + # 若产能不合法,重置为1到max_cap之间的随机值 if segment[idx] <= 0 or segment[idx] > max_cap: segment[idx] = random.uniform(1, max_cap) repaired[start:end] = segment start = end - # 总产能约束 - enterprise_total_capacity = {} + + # 2. 修复企业总产能约束(企业所有物料的总产能不超过上限) + # 先计算每个企业的当前总产能 + enterprise_total_capacity = {} # {企业ID: 总产能} start = 0 for i in range(self.I): end = split_points[i] @@ -84,20 +124,23 @@ class ChromosomeUtils: enterprise_segment = enterprise_layer[start:end] capacity_segment = repaired[start:end] for idx, ent in enumerate(ents): - if enterprise_segment[idx] == 1: + if enterprise_segment[idx] == 1: # 仅统计选中的企业 if ent not in enterprise_total_capacity: enterprise_total_capacity[ent] = 0 enterprise_total_capacity[ent] += capacity_segment[idx] start = end - # 调整超出总产能的企业 + + # 调整超出总产能上限的企业(按比例缩放) for ent, total_cap in enterprise_total_capacity.items(): + # 确定企业的总产能上限 if ent == 0: - max_total_cap = self.risk.C0_total_max + max_total_cap = self.risk.C0_total_max # 风险企业总产能上限 else: supplier_id = ent - 1 - max_total_cap = self.supplier.Cj_total_max[supplier_id] - if total_cap > max_total_cap: - scale = max_total_cap / total_cap + max_total_cap = self.supplier.Cj_total_max[supplier_id] # 供应商总产能上限 + + if total_cap > max_total_cap: # 超出上限时缩放 + scale = max_total_cap / total_cap # 缩放比例 start = 0 for i in range(self.I): end = split_points[i] @@ -106,9 +149,10 @@ class ChromosomeUtils: capacity_segment = repaired[start:end] for idx, e in enumerate(ents): if e == ent and enterprise_segment[idx] == 1: - capacity_segment[idx] *= scale + capacity_segment[idx] *= scale # 按比例缩小 repaired[start:end] = capacity_segment start = end + return repaired def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: @@ -149,11 +193,15 @@ class ChromosomeUtils: else: scale = qi / current_total segment[selected_indices] *= scale - # 步骤3:处理超出最大供应量的情况(修复中文变量名+逻辑) + # 步骤3:处理超出最大供应量的情况(添加最大迭代次数限制) need_adjust = True - while need_adjust: + max_iter = 10 # 最大迭代次数,避免无限循环 + iter_count = 0 + while need_adjust and iter_count < max_iter: # 增加迭代次数限制 + iter_count += 1 need_adjust = False total_excess = 0 + # 截断超出max_q的数量 for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_q = qi @@ -165,6 +213,7 @@ class ChromosomeUtils: total_excess += excess segment[idx] = max_q need_adjust = True + # 分配超出量到有剩余的企业 if total_excess > 0: available_indices = [] available_max = [] @@ -185,22 +234,39 @@ class ChromosomeUtils: alloc_excess = total_excess * alloc_ratio for idx, alloc in zip(available_indices, alloc_excess): segment[idx] += alloc - else: - segment[selected_indices] = np.minimum(segment[selected_indices], - [self.supplier.MaxOrder[ent-1][i] if ent !=0 else qi for ent in selected_ents]) + need_adjust = True # 分配后可能需要再次检查 + else: + # 无剩余容量,强制截断并重新缩放总量 + segment[selected_indices] = np.minimum(segment[selected_indices], + [self.supplier.MaxOrder[ent - 1][i] if ent != 0 else qi + for ent in selected_ents]) + need_adjust = True + # 检查总量误差,重新缩放 current_total = np.sum(segment[selected_indices]) if abs(current_total - qi) > 1e-6: scale = qi / current_total segment[selected_indices] *= scale - need_adjust = True + need_adjust = True # 缩放后可能需要再次检查 + # 最终强制总量等于需求(避免迭代次数到限后仍有误差) + current_total = np.sum(segment[selected_indices]) + if abs(current_total - qi) > 1e-6: + scale = qi / current_total + segment[selected_indices] *= scale repaired[start:end] = segment start = end return repaired def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray: - """完整修复染色体:企业层→能力层→数量层""" + """ + 完整修复染色体(按顺序修复三层) + :param chromosome: 待修复的染色体 + :return: 修复后的染色体 + """ + # 拆分三层 enterprise_layer, capacity_layer, quantity_layer = self._split_chromosome(chromosome) + # 依次修复(企业层→能力层→数量层,依赖关系) enterprise_layer = self.repair_enterprise_layer(enterprise_layer) capacity_layer = self.repair_capacity_layer(enterprise_layer, capacity_layer) quantity_layer = self.repair_quantity_layer(enterprise_layer, quantity_layer) + # 合并修复后的三层 return self._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer) \ No newline at end of file diff --git a/data_structures.py b/data_structures.py index b633486..0bdc4a7 100644 --- a/data_structures.py +++ b/data_structures.py @@ -1,99 +1,113 @@ -# 数据结构定义 +# 数据结构定义:存储订单、企业、供应商数据及算法配置 + + class OrderData: - """订单数据类""" + """订单数据类:存储物料需求、交货期、成本等信息""" def __init__(self): self.I = 5 # 物料种类数 - self.Q = [250, 300, 200, 350, 280] # 各物料待生产量 - self.Dd = 12 # 需求交货期(时长) - self.P0 = [50.0, 80.0, 60.0, 70.0, 90.0] # 风险企业单位采购价 - self.T0 = [5.0, 8.0, 6.0, 7.0, 9.0] # 风险企业单位运输成本 - self.transport_speed = 10.0 # 运输速度(单位/时间) + self.Q = [250, 300, 200, 350, 280] # 各物料的需求数量 + self.Dd = 12 # 需求交货期(单位:时间) + self.P0 = [50.0, 80.0, 60.0, 70.0, 90.0] # 风险企业的单位采购价 + self.T0 = [5.0, 8.0, 6.0, 7.0, 9.0] # 风险企业的单位运输成本 + self.transport_speed = 10.0 # 运输速度(单位:距离/时间) class RiskEnterpriseData: - """风险企业数据类(单物料产能约束+总产能约束)""" + """风险企业数据类:存储风险企业的产能、距离等信息""" def __init__(self): self.I = 5 # 物料种类数(与订单一致) - self.C0_i_std = [40.0, 50.0, 35.0, 45.0, 48.0] # 单物料单位时间标准产能 - self.C0_total_max = 100.0 # 总产能上限 - self.distance = 10.0 # 位置距离 + self.C0_i_std = [40.0, 50.0, 35.0, 45.0, 48.0] # 单物料的单位时间标准产能 + self.C0_total_max = 100.0 # 总产能上限(单位时间) + self.distance = 10.0 # 与需求点的距离 class SupplierData: - """供应商数据类(按约束文档定义)""" + """供应商数据类:存储各供应商的产能、价格、距离等信息""" def __init__(self, I=5): - self.I = I - self.supplier_count = 4 - self.names = ["S0", "S1", "S2", "S3"] - # 能否生产某物料矩阵 (supplier_count × I) + self.I = I # 物料种类数 + self.supplier_count = 4 # 供应商数量 + self.names = ["S0", "S1", "S2", "S3"] # 供应商名称 + + # 能否生产某物料的矩阵(supplier_count × I),1=能生产,0=不能 self.can_produce = [ [1, 1, 1, 1, 1], [1, 0, 1, 0, 1], [0, 1, 0, 1, 0], [0, 0, 1, 1, 1] ] - # 单物料单位时间标准生产能力 + + # 单物料单位时间标准产能(supplier_count × I) self.Cj_i_std = [ [20.0, 18.0, 15.0, 22.0, 25.0], [25.0, 0.0, 30.0, 0.0, 28.0], [0.0, 22.0, 0.0, 35.0, 0.0], [0.0, 0.0, 20.0, 30.0, 22.0] ] - # 供应商单位时间最大总生产能力 + + # 供应商单位时间的最大总产能(supplier_count) self.Cj_total_max = [120.0, 110.0, 100.0, 95.0] - # 最小起订量 + + # 最小起订量(supplier_count × I) self.MinOrder = [ [20.0, 20.0, 15.0, 25.0, 20.0], [30.0, 0.0, 25.0, 0.0, 30.0], [0.0, 25.0, 0.0, 30.0, 0.0], [0.0, 0.0, 20.0, 35.0, 25.0] ] - # 最大供应量 + + # 最大供应量(supplier_count × I) self.MaxOrder = [ [100.0, 150.0, 80.0, 120.0, 130.0], [120.0, 0.0, 100.0, 0.0, 110.0], [0.0, 140.0, 0.0, 150.0, 0.0], [0.0, 0.0, 90.0, 130.0, 100.0] ] - # 单位采购价格 + + # 单位采购价格(supplier_count × I) self.P_ij = [ [60.0, 85.0, 70.0, 80.0, 100.0], [65.0, 0.0, 75.0, 0.0, 105.0], [0.0, 90.0, 0.0, 85.0, 0.0], [0.0, 0.0, 78.0, 88.0, 98.0] ] - # 单位运输成本 + + # 单位运输成本(supplier_count × I) self.T_ij = [ [7.0, 9.0, 8.0, 10.0, 12.0], [6.0, 0.0, 9.0, 0.0, 11.0], [0.0, 10.0, 0.0, 12.0, 0.0], [0.0, 0.0, 10.0, 13.0, 14.0] ] - # 供应商位置距离 + + # 供应商与需求点的距离(supplier_count) self.distance = [45.0, 35.0, 60.0, 50.0] class Config: - """算法参数配置类""" + """算法参数配置类:存储NSGA-II的各类参数""" def __init__(self): # 种群参数 - self.pop_size = 50 - self.N1_ratio = 0.2 # 变更成本最小种群比例 - self.N2_ratio = 0.2 # 交付延期最短种群比例 - self.N3_ratio = 0.3 # 基于风险企业种群比例 + self.pop_size = 50 # 种群大小 + self.N1_ratio = 0.2 # 优先成本的种群比例 + self.N2_ratio = 0.2 # 优先延期的种群比例 + self.N3_ratio = 0.3 # 强制风险企业的种群比例 self.N4_ratio = 0.3 # 随机种群比例 + # 遗传操作参数 - self.crossover_prob = 0.8 - self.mutation_prob = 0.3 - self.max_generations = 100 + self.crossover_prob = 0.8 # 交叉概率 + self.mutation_prob = 0.3 # 变异概率 + self.max_generations = 100 # 最大进化代数 + # 惩罚系数 self.delta = 1.3 # 变更惩罚系数 self.gamma = 0.8 # 提前交付惩罚系数 + # 早停参数 - self.early_stop_patience = 50 + self.early_stop_patience = 50 # 连续多少代无改进则早停 + # 目标函数数量 - self.objective_num = 2 \ No newline at end of file + self.objective_num = 2 # 双目标(成本+延期) \ No newline at end of file diff --git a/encoder.py b/encoder.py index 8c79ff8..d5cfe08 100644 --- a/encoder.py +++ b/encoder.py @@ -1,114 +1,171 @@ import random import numpy as np -from data_structures import Config -from chromosome_utils import ChromosomeUtils -from objective_calculator import ObjectiveCalculator -from nsga2 import NSGA2 +from data_structures import Config # 配置参数类 +from chromosome_utils import ChromosomeUtils # 染色体工具类 +from objective_calculator import ObjectiveCalculator # 目标函数计算器 +from nsga2 import NSGA2 # NSGA-II算法类 class Encoder: - """种群初始化编码器""" + """种群初始化编码器:生成初始种群,包含多种初始化策略""" + def __init__(self, config: Config, utils: ChromosomeUtils): + """ + 初始化编码器 + :param config: 算法配置(如种群大小、各策略比例等) + :param utils: 染色体工具类实例 + """ self.config = config self.utils = utils - self.pop_size = config.pop_size - self.N1 = int(config.N1_ratio * self.pop_size) - self.N2 = int(config.N2_ratio * self.pop_size) - self.N3 = int(config.N3_ratio * self.pop_size) - self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 + self.pop_size = config.pop_size # 种群总大小 + + # 按比例分配四种初始化策略的个体数量 + self.N1 = int(config.N1_ratio * self.pop_size) # 优先最小化成本的个体数 + self.N2 = int(config.N2_ratio * self.pop_size) # 优先最小化延期的个体数 + self.N3 = int(config.N3_ratio * self.pop_size) # 强制选择风险企业的个体数 + self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 # 随机生成的个体数 def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray: - """生成随机染色体""" - enterprise_layer = [] - capacity_layer = [] - quantity_layer = [] - for i in range(self.utils.I): - ent_count = self.utils.material_enterprise_count[i] - ents = self.utils.material_optional_enterprises[i] - # 企业层 - e_genes = np.zeros(ent_count) - select_count = random.randint(1, ent_count) - select_indices = random.sample(range(ent_count), select_count) - e_genes[select_indices] = 1 + """ + 生成随机染色体(内部方法) + :param force_risk_enterprise: 是否强制选择风险企业(用于N3策略) + :return: 生成的染色体(经过修复) + """ + enterprise_layer = [] # 企业选择层(0/1) + capacity_layer = [] # 产能层 + quantity_layer = [] # 数量层 + + for i in range(self.utils.I): # 遍历每种物料 + ent_count = self.utils.material_enterprise_count[i] # 当前物料的可选企业数量 + ents = self.utils.material_optional_enterprises[i] # 可选企业列表(0为风险企业) + + # 1. 生成企业层(随机选择至少1个企业) + e_genes = np.zeros(ent_count) # 初始化企业选择为0(未选择) + select_count = random.randint(1, ent_count) # 随机选择1到ent_count个企业 + select_indices = random.sample(range(ent_count), select_count) # 随机选择索引 + e_genes[select_indices] = 1 # 标记选中的企业 if force_risk_enterprise: - e_genes[0] = 1 + e_genes[0] = 1 # 强制选择风险企业(索引0) enterprise_layer.extend(e_genes) - # 能力层 + + # 2. 生成能力层(为选中的企业随机分配产能) c_genes = np.zeros(ent_count) for idx, ent in enumerate(ents): - if e_genes[idx] == 1: + if e_genes[idx] == 1: # 仅为选中的企业分配产能 if ent == 0: - max_cap = self.utils.risk.C0_i_std[i] # 修复:self.utils.risk + max_cap = self.utils.risk.C0_i_std[i] # 风险企业的最大产能 else: supplier_id = ent - 1 - max_cap = self.utils.supplier.Cj_i_std[supplier_id][i] # 修复:self.utils.supplier - c_genes[idx] = random.uniform(1, max_cap) + max_cap = self.utils.supplier.Cj_i_std[supplier_id][i] # 供应商的最大产能 + c_genes[idx] = random.uniform(1, max_cap) # 随机产能 capacity_layer.extend(c_genes) - # 数量层 + + # 3. 生成数量层(为选中的企业随机分配数量) q_genes = np.zeros(ent_count) for idx, ent in enumerate(ents): - if e_genes[idx] == 1: + if e_genes[idx] == 1: # 仅为选中的企业分配数量 if ent == 0: - max_q = self.utils.order.Q[i] # 修复:self.utils.order + max_q = self.utils.order.Q[i] # 风险企业最多分配全部需求 else: supplier_id = ent - 1 - max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 修复:self.utils.supplier - q_genes[idx] = random.uniform(1, max_q) + max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 供应商的最大供应量 + q_genes[idx] = random.uniform(1, max_q) # 随机数量 quantity_layer.extend(q_genes) + + # 合并三层并修复染色体(确保满足所有约束) chromosome = self.utils._merge_chromosome( np.array(enterprise_layer), np.array(capacity_layer), np.array(quantity_layer) ) return self.utils.repair_chromosome(chromosome) def initialize_population(self) -> np.ndarray: - """初始化种群:N1+N2+N3+N4""" - pop1 = self._initialize_by_objective(self.N1, "cost") - pop2 = self._initialize_by_objective(self.N2, "tardiness") - pop3 = self._initialize_by_risk_enterprise(self.N3) - pop4 = self._initialize_random(self.N4) - # 处理空数组情况 + """ + 初始化完整种群(四种策略组合) + :return: 初始化后的种群(numpy数组) + """ + # 按四种策略生成子种群 + pop1 = self._initialize_by_objective(self.N1, "cost") # 优先成本 + pop2 = self._initialize_by_objective(self.N2, "tardiness") # 优先延期 + pop3 = self._initialize_by_risk_enterprise(self.N3) # 强制风险企业 + pop4 = self._initialize_random(self.N4) # 随机生成 + + # 过滤空数组(避免合并时报错) population_list = [p for p in [pop1, pop2, pop3, pop4] if len(p) > 0] - if len(population_list) == 0: + if len(population_list) == 0: # 所有子种群都为空时返回空数组 return np.array([]) + + # 合并子种群并打乱顺序 population = np.vstack(population_list) np.random.shuffle(population) - return population[:self.pop_size] + return population[:self.pop_size] # 确保种群大小正确 def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray: - """基于目标函数初始化""" - if count <= 0: + """ + 基于目标函数初始化(生成候选解后选择最优的count个) + :param count: 需生成的个体数量 + :param objective_type: 优化目标("cost"或"tardiness") + :return: 筛选后的子种群 + """ + if count <= 0: # 数量为0时返回空数组 return np.array([]) - candidate_count = max(2*count, 20) + + # 生成候选解(数量为count的2倍或至少20个,确保有足够选择空间) + candidate_count = max(2 * count, 20) candidates = [self._generate_random_chromosome() for _ in range(candidate_count)] - calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, self.config) # 修复:self.utils的属性 + + # 计算候选解的目标函数值 + calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, + self.config) objectives = [calculator.calculate_objectives(chrom) for chrom in candidates] + + # 按目标函数排序并选择前count个 if objective_type == "cost": + # 按成本升序排序(成本越小越优) sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][0]) else: + # 按延期升序排序(延期越小越优) sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][1]) + return np.array([candidates[i] for i in sorted_indices[:count]]) def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray: - """基于风险企业初始化""" + """ + 基于风险企业初始化(强制选择风险企业,用NSGA-II筛选) + :param count: 需生成的个体数量 + :return: 筛选后的子种群 + """ if count <= 0: return np.array([]) - candidate_count = max(2*count, 20) + + # 生成强制选择风险企业的候选解 + candidate_count = max(2 * count, 20) candidates = [self._generate_random_chromosome(force_risk_enterprise=True) for _ in range(candidate_count)] - calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, self.config) # 修复:self.utils的属性 + + # 计算目标函数并进行非支配排序 + calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, + self.config) objectives = [calculator.calculate_objectives(chrom) for chrom in candidates] - nsga2 = NSGA2(candidate_count, 2) - ranks, fronts = nsga2.fast_non_dominated_sort(objectives) + nsga2 = NSGA2(candidate_count, 2) # 2个目标函数 + ranks, fronts = nsga2.fast_non_dominated_sort(objectives) # 非支配排序 + + # 从帕累托前沿开始选择,直到满足数量 selected = [] for front in fronts: if len(selected) + len(front) <= count: selected.extend([candidates[i] for i in front]) else: - selected.extend([candidates[i] for i in front[:count-len(selected)]]) + # 前沿数量超过剩余需求时,取部分 + selected.extend([candidates[i] for i in front[:count - len(selected)]]) break + return np.array(selected) def _initialize_random(self, count: int) -> np.ndarray: - """随机初始化""" + """ + 随机初始化(直接生成count个随机染色体) + :param count: 需生成的个体数量 + :return: 随机子种群 + """ if count <= 0: return np.array([]) return np.array([self._generate_random_chromosome() for _ in range(count)]) \ No newline at end of file diff --git a/genetic_operators.py b/genetic_operators.py index adf2d30..dc3ebe3 100644 --- a/genetic_operators.py +++ b/genetic_operators.py @@ -1,79 +1,118 @@ import random import numpy as np -from data_structures import Config -from chromosome_utils import ChromosomeUtils +from data_structures import Config # 算法配置参数类 +from chromosome_utils import ChromosomeUtils # 染色体工具类 class GeneticOperator: - """遗传操作:交叉、变异""" + """遗传操作类:实现交叉和变异操作,用于产生新个体""" + def __init__(self, config: Config, utils: ChromosomeUtils): - self.config = config - self.utils = utils + """ + 初始化遗传操作器 + :param config: 算法配置参数(如交叉概率、变异概率等) + :param utils: 染色体工具类实例(提供染色体修复等功能) + """ + self.config = config # 配置参数 + self.utils = utils # 染色体工具 def two_point_crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> tuple[np.ndarray, np.ndarray]: - """两点交叉""" - length = self.utils.chromosome_length - if length < 2: + """ + 两点交叉:在染色体上随机选择两个点,交换两点之间的基因片段 + :param parent1: 父代染色体1 + :param parent2: 父代染色体2 + :return: 两个子代染色体(经过修复) + """ + length = self.utils.chromosome_length # 染色体总长度 + if length < 2: # 染色体长度不足2时无法交叉,直接返回父代副本 return parent1.copy(), parent2.copy() - point1 = random.randint(0, length//2) - point2 = random.randint(point1+1, length-1) + + # 随机选择两个交叉点(point1 < point2) + point1 = random.randint(0, length // 2) + point2 = random.randint(point1 + 1, length - 1) + + # 复制父代基因并交换片段 child1 = parent1.copy() child2 = parent2.copy() - child1[point1:point2] = parent2[point1:point2] + child1[point1:point2] = parent2[point1:point2] # 交换point1到point2之间的基因 child2[point1:point2] = parent1[point1:point2] + + # 修复染色体(确保满足约束条件) child1 = self.utils.repair_chromosome(child1) child2 = self.utils.repair_chromosome(child2) return child1, child2 def uniform_mutation(self, chromosome: np.ndarray) -> np.ndarray: - """均匀变异:遍历三层每个基因""" - mutated = chromosome.copy() + """ + 均匀变异:对染色体的三层基因(企业层、能力层、数量层)进行随机变异 + :param chromosome: 待变异的染色体 + :return: 变异后的染色体(经过修复) + """ + mutated = chromosome.copy() # 复制原始染色体,避免直接修改 + + # 拆分染色体为三层 enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(mutated) + # 计算各物料的企业编码在染色体中的分割点(用于分层处理) split_points = np.cumsum(self.utils.material_enterprise_count) - # 企业层变异(0/1翻转) - start = 0 - for i in range(self.utils.I): - end = split_points[i] + + # 1. 企业层变异(0/1翻转,即是否选择该企业) + start = 0 # 起始索引 + for i in range(self.utils.I): # 遍历每种物料 + end = split_points[i] # 当前物料的企业编码结束索引 + # 遍历该物料的所有可选企业 for idx in range(start, end): + # 以设定的变异概率进行翻转 if random.random() < self.config.mutation_prob: - enterprise_layer[idx] = 1 - enterprise_layer[idx] - start = end + enterprise_layer[idx] = 1 - enterprise_layer[idx] # 0→1或1→0 + start = end # 更新起始索引为下一个物料 + # 修复企业层(确保每种物料至少选择一个企业) enterprise_layer = self.utils.repair_enterprise_layer(enterprise_layer) - # 能力层变异 + + # 2. 能力层变异(调整选中企业的产能) start = 0 for i in range(self.utils.I): end = split_points[i] - ents = self.utils.material_optional_enterprises[i] - e_segment = enterprise_layer[start:end] + ents = self.utils.material_optional_enterprises[i] # 当前物料的可选企业列表 + e_segment = enterprise_layer[start:end] # 当前物料的企业选择状态 for idx in range(start, end): - if random.random() < self.config.mutation_prob and e_segment[idx-start] == 1: - ent = ents[idx-start] + # 仅对被选中的企业(e_segment[idx-start]==1)进行变异 + if random.random() < self.config.mutation_prob and e_segment[idx - start] == 1: + ent = ents[idx - start] # 企业ID(0为风险企业,1+为供应商) + # 确定该企业的最大产能 if ent == 0: - max_cap = self.utils.risk.C0_i_std[i] + max_cap = self.utils.risk.C0_i_std[i] # 风险企业的单物料产能 else: - supplier_id = ent - 1 - max_cap = self.utils.supplier.Cj_i_std[supplier_id][i] + supplier_id = ent - 1 # 供应商索引(转换为0基) + max_cap = self.utils.supplier.Cj_i_std[supplier_id][i] # 供应商的单物料产能 + # 随机生成1到max_cap之间的新产能 capacity_layer[idx] = random.uniform(1, max_cap) start = end + # 修复能力层(确保不超过企业总产能约束) capacity_layer = self.utils.repair_capacity_layer(enterprise_layer, capacity_layer) - # 数量层变异 + + # 3. 数量层变异(调整选中企业的生产数量) start = 0 for i in range(self.utils.I): end = split_points[i] ents = self.utils.material_optional_enterprises[i] e_segment = enterprise_layer[start:end] for idx in range(start, end): - if random.random() < self.config.mutation_prob and e_segment[idx-start] == 1: - ent = ents[idx-start] - qi = self.utils.order.Q[i] + # 仅对被选中的企业进行变异 + if random.random() < self.config.mutation_prob and e_segment[idx - start] == 1: + ent = ents[idx - start] + qi = self.utils.order.Q[i] # 当前物料的总需求 + # 确定该企业的最大可分配数量 if ent == 0: - max_q = qi + max_q = qi # 风险企业最多分配全部需求 else: supplier_id = ent - 1 - max_q = self.supplier.MaxOrder[supplier_id][i] + max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 供应商的最大供应量 + # 随机生成1到max_q之间的新数量 quantity_layer[idx] = random.uniform(1, max_q) start = end + # 修复数量层(确保总数量满足需求,且不超过企业最大供应) quantity_layer = self.utils.repair_quantity_layer(enterprise_layer, quantity_layer) - # 合并并修复 + + # 合并三层并返回 mutated = self.utils._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer) return mutated \ No newline at end of file diff --git a/main.py b/main.py index 53b470a..80e203b 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ import random import numpy as np +# 导入数据结构和工具类(根据实际项目结构调整导入路径) from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config from chromosome_utils import ChromosomeUtils from objective_calculator import ObjectiveCalculator @@ -10,105 +11,162 @@ from visualizer import ResultVisualizer def main(): - # 初始化数据 - order_data = OrderData() - risk_data = RiskEnterpriseData() - supplier_data = SupplierData() - config = Config() + """主函数:执行NSGA-II算法求解多目标优化问题""" + try: + # 1. 初始化随机种子(确保结果可复现) + random.seed(42) + np.random.seed(42) - # 初始化工具类 - utils = ChromosomeUtils(order_data, risk_data, supplier_data) - calculator = ObjectiveCalculator(order_data, risk_data, supplier_data, utils, config) - encoder = Encoder(config, utils) - genetic_op = GeneticOperator(config, utils) - nsga2 = NSGA2(config.pop_size, config.objective_num) - visualizer = ResultVisualizer(utils) + # 2. 初始化数据(订单、风险企业、供应商、算法配置) + print("初始化数据结构...") + order_data = OrderData() # 订单数据(需求、交货期等) + risk_data = RiskEnterpriseData() # 风险企业数据 + supplier_data = SupplierData() # 供应商数据 + config = Config() # 算法参数配置 - # 初始化种群 - population = encoder.initialize_population() - print(f"初始化种群完成,种群大小: {population.shape}") + # 3. 初始化工具类和算法组件 + print("初始化算法组件...") + utils = ChromosomeUtils(order_data, risk_data, supplier_data) # 染色体工具 + calculator = ObjectiveCalculator(order_data, risk_data, supplier_data, utils, config) # 目标函数计算器 + encoder = Encoder(config, utils) # 种群初始化编码器 + genetic_op = GeneticOperator(config, utils) # 遗传操作器(交叉、变异) + nsga2 = NSGA2(config.pop_size, config.objective_num) # NSGA-II算法实例 + visualizer = ResultVisualizer(utils) # 结果可视化工具 - # 记录历史 - all_objectives = [] - convergence_history = [] - best_front = [] - best_front_objs = [] - no_improve_count = 0 - prev_best_avg = float('inf') + # 4. 初始化种群 + print("初始化种群...") + population = encoder.initialize_population() + print(f"初始化种群完成,种群大小: {population.shape if population.size > 0 else '空'}") + # 若种群初始化失败(为空),直接退出 + if population.size == 0: + print("错误:种群初始化失败,无法继续进化") + return - # 进化过程 - for generation in range(config.max_generations): - # 计算目标函数 - objectives = [calculator.calculate_objectives(chrom) for chrom in population] - all_objectives.extend(objectives) + # 5. 记录进化过程中的历史数据 + all_objectives = [] # 所有代的目标函数值 + convergence_history = [] # 收敛趋势(每代最优前沿的平均目标值) + best_front = [] # 最终帕累托前沿解 + best_front_objs = [] # 最终帕累托前沿的目标值 + no_improve_count = 0 # 无改进计数器(用于早停) + prev_best_avg = float('inf') # 上一代的最优平均目标值 - # 记录当前代的最优前沿 - ranks, fronts = nsga2.fast_non_dominated_sort(objectives) - current_front = fronts[0] - current_front_objs = [objectives[i] for i in current_front] - best_front = population[current_front] - best_front_objs = current_front_objs + # 6. 进化主循环 + print(f"开始进化(最大代数:{config.max_generations},早停耐心:{config.early_stop_patience})...") + for generation in range(config.max_generations): + try: + # 计算当前种群的目标函数值 + objectives = [calculator.calculate_objectives(chrom) for chrom in population] + all_objectives.extend(objectives) # 记录所有目标值 - # 收敛判断(基于前沿平均目标值) - if len(current_front_objs) > 0: - avg_cost = sum(obj[0] for obj in current_front_objs) / len(current_front_objs) - avg_tardiness = sum(obj[1] for obj in current_front_objs) / len(current_front_objs) - convergence_history.append((avg_cost, avg_tardiness)) - current_avg = avg_cost + avg_tardiness - if abs(current_avg - prev_best_avg) < 1e-4: - no_improve_count += 1 - else: - no_improve_count = 0 - prev_best_avg = current_avg + # 非支配排序,获取当前代的帕累托前沿 + ranks, fronts = nsga2.fast_non_dominated_sort(objectives) + current_front = fronts[0] if fronts else [] # 第0层为最优前沿 + current_front_objs = [objectives[i] for i in current_front] if current_front else [] + best_front = population[current_front] if current_front else [] # 更新当前最优前沿解 + best_front_objs = current_front_objs # 更新当前最优前沿目标值 + + # 记录收敛趋势(基于最优前沿的平均目标值) + if len(current_front_objs) > 0: + avg_cost = sum(obj[0] for obj in current_front_objs) / len(current_front_objs) + avg_tardiness = sum(obj[1] for obj in current_front_objs) / len(current_front_objs) + convergence_history.append((avg_cost, avg_tardiness)) + + # 判断是否改进(用于早停) + current_avg = avg_cost + avg_tardiness # 合并两个目标的平均值 + if abs(current_avg - prev_best_avg) < 1e-4: # 变化小于阈值,视为无改进 + no_improve_count += 1 + else: + no_improve_count = 0 # 有改进,重置计数器 + prev_best_avg = current_avg + else: + no_improve_count += 1 # 无前沿解,视为无改进 + + # 选择操作(锦标赛选择) + selected = nsga2.selection(population, objectives) + # 校验选择后的种群大小 + assert len(selected) == config.pop_size, \ + f"选择后的种群大小({len(selected)})与目标大小({config.pop_size})不符" + + # 交叉操作(两点交叉)- 修复索引越界问题 + offspring = [] # 子代种群 + selected_len = len(selected) # selected的长度(等于pop_size) + i = 0 + max_iter = 2 * config.pop_size # 最大迭代次数,避免无限循环 + iter_count = 0 + while len(offspring) < config.pop_size and iter_count < max_iter: + iter_count += 1 + # 检查i是否超出selected范围,若超出则重置(循环使用个体) + if i >= selected_len: + i = 0 + # 尝试两两交叉(需i+1在范围内,且满足交叉概率) + if i + 1 < selected_len and random.random() < config.crossover_prob: + parent1 = selected[i] + parent2 = selected[i + 1] + child1, child2 = genetic_op.two_point_crossover(parent1, parent2) + # 添加child1(若未达到种群大小) + if len(offspring) < config.pop_size: + offspring.append(child1) + # 添加child2(若未达到种群大小) + if len(offspring) < config.pop_size: + offspring.append(child2) + i += 2 # 处理下一对个体 + else: + # 直接添加当前父代(避免i+1越界) + offspring.append(selected[i]) + i += 1 # 处理下一个个体(步长改为1,避免快速越界) + + # 若迭代次数用尽仍未生成足够子代,补充随机个体(健壮性处理) + while len(offspring) < config.pop_size: + offspring.append(encoder._generate_random_chromosome()) # 需确保该方法可访问 + + # 变异操作(均匀变异) + offspring = [ + genetic_op.uniform_mutation(chrom) if random.random() < config.mutation_prob else chrom + for chrom in offspring + ] + offspring = np.array(offspring[:config.pop_size]) # 确保子代大小严格等于pop_size + + # 合并父代和子代,准备环境选择 + combined = np.vstack([population, offspring]) # 合并种群 + # 计算合并种群的目标函数值 + combined_objs = objectives + [calculator.calculate_objectives(chrom) for chrom in offspring] + + # 环境选择(保留最优的pop_size个个体) + population, objectives = nsga2.environmental_selection(combined, combined_objs) + # 校验环境选择后的种群大小 + assert len(population) == config.pop_size, \ + f"环境选择后的种群大小({len(population)})与目标大小({config.pop_size})不符" + + # 早停检查(连续多代无改进则停止) + if no_improve_count >= config.early_stop_patience: + print(f"早停触发:连续{no_improve_count}代无改进,终止于第{generation}代") + break + + # 每50代打印一次进度 + if generation % 50 == 0: + front_size = len(current_front) if current_front else 0 + print(f"第{generation}代完成,当前最优前沿大小: {front_size},无改进计数: {no_improve_count}") + + except Exception as e: + print(f"第{generation}代进化出错:{str(e)},跳过当前代") + continue + + # 7. 结果可视化与输出 + print("进化完成,处理结果...") + if len(best_front_objs) > 0: + visualizer.plot_pareto_front(all_objectives, best_front_objs) # 绘制帕累托前沿 + visualizer.plot_convergence(convergence_history) # 绘制收敛趋势 + visualizer.print_pareto_solutions(best_front, best_front_objs) # 打印最优解详情 else: - no_improve_count += 1 + print("未找到有效帕累托前沿解") - # 选择(锦标赛选择) - selected = nsga2.selection(population, objectives) - - # 交叉 - offspring = [] - i = 0 - while len(offspring) < config.pop_size: - if i+1 < config.pop_size and random.random() < config.crossover_prob: - parent1 = selected[i] - parent2 = selected[i+1] - child1, child2 = genetic_op.two_point_crossover(parent1, parent2) - offspring.append(child1) - if len(offspring) < config.pop_size: - offspring.append(child2) - else: - offspring.append(selected[i]) - i += 2 - - # 变异 - offspring = [genetic_op.uniform_mutation(chrom) if random.random() < config.mutation_prob else chrom - for chrom in offspring] - offspring = np.array(offspring[:config.pop_size]) - - # 合并父代和子代 - combined = np.vstack([population, offspring]) - combined_objs = objectives + [calculator.calculate_objectives(chrom) for chrom in offspring] - - # 环境选择(核心:NSGA-II的环境选择) - population, objectives = nsga2.environmental_selection(combined, combined_objs) - - # 早停检查 - if no_improve_count >= config.early_stop_patience: - print(f"早停于第 {generation} 代") - break - - if generation % 50 == 0: - print(f"第 {generation} 代完成,当前最优前沿大小: {len(current_front)}") - - # 结果可视化与打印 - print("进化完成,绘制结果...") - visualizer.plot_pareto_front(all_objectives, best_front_objs) - visualizer.plot_convergence(convergence_history) - visualizer.print_pareto_solutions(best_front, best_front_objs) + except Exception as e: + print(f"程序运行出错:{str(e)}") + import traceback + traceback.print_exc() # 打印详细错误栈 if __name__ == "__main__": - random.seed(42) - np.random.seed(42) - main() \ No newline at end of file + print("程序启动...") + main() + print("程序结束") \ No newline at end of file diff --git a/nsga2.py b/nsga2.py index c74c61a..21ee758 100644 --- a/nsga2.py +++ b/nsga2.py @@ -3,119 +3,181 @@ import numpy as np class NSGA2: + """NSGA-II算法类:实现多目标优化的非支配排序、选择、环境选择等核心操作""" + def __init__(self, pop_size, objective_num): - self.pop_size = pop_size - self.objective_num = objective_num + """ + 初始化NSGA-II + :param pop_size: 种群大小 + :param objective_num: 目标函数数量 + """ + self.pop_size = pop_size # 种群大小 + self.objective_num = objective_num # 目标数量 def fast_non_dominated_sort(self, objective_values): - """标准快速非支配排序实现""" + """ + 快速非支配排序:将个体按支配关系分层(前沿) + :param objective_values: 所有个体的目标函数值列表 + :return: ranks(每个个体的层级)和 fronts(每层的个体索引列表) + """ pop_size = len(objective_values) - domination_count = [0] * pop_size - dominated_solutions = [[] for _ in range(pop_size)] - ranks = [0] * pop_size - front = [[]] # 第0层 + domination_count = [0] * pop_size # 每个个体被支配的次数 + dominated_solutions = [[] for _ in range(pop_size)] # 每个个体支配的个体列表 + ranks = [0] * pop_size # 每个个体的层级(0为最优) + front = [[]] # 第0层前沿(初始为空) - # 计算支配关系 + # 1. 计算支配关系 for p in range(pop_size): for q in range(pop_size): if p == q: - continue - # p支配q? + continue # 个体不与自身比较 + # 判断p是否支配q if self._dominates(objective_values[p], objective_values[q]): - dominated_solutions[p].append(q) - # q支配p? + dominated_solutions[p].append(q) # p支配q,记录q + # 判断q是否支配p elif self._dominates(objective_values[q], objective_values[p]): - domination_count[p] += 1 + domination_count[p] += 1 # p被q支配,计数+1 + # 若p未被任何个体支配,加入第0层前沿 if domination_count[p] == 0: ranks[p] = 0 front[0].append(p) - # 处理后续层 + # 2. 生成后续层级的前沿 i = 0 - while len(front[i]) > 0: - next_front = [] - for p in front[i]: - for q in dominated_solutions[p]: - domination_count[q] -= 1 - if domination_count[q] == 0: - ranks[q] = i + 1 - next_front.append(q) + while len(front[i]) > 0: # 当前层非空时继续 + next_front = [] # 下一层前沿 + for p in front[i]: # 遍历当前层的每个个体 + for q in dominated_solutions[p]: # p支配的个体q + domination_count[q] -= 1 # q的被支配计数-1 + if domination_count[q] == 0: # q不再被任何个体支配 + ranks[q] = i + 1 # 层级为当前层+1 + next_front.append(q) # 加入下一层 i += 1 - front.append(next_front) + front.append(next_front) # 追加下一层 - return ranks, front[:-1] # 返回rank数组和各层front列表 + return ranks, front[:-1] # 返回层级和非空前沿 def _dominates(self, a, b): - """判断a是否支配b:a的所有目标≤b,且至少一个目标= distances[j]): selected.append(population[i]) else: selected.append(population[j]) + return np.array(selected) def _calculate_all_crowding_distances(self, objective_values, ranks): - """计算所有个体的拥挤度""" - max_rank = max(ranks) if ranks else 0 - all_distances = [0.0] * len(objective_values) + """ + 计算所有个体的拥挤度(内部方法) + :param objective_values: 目标函数值 + :param ranks: 每个个体的层级 + :return: 所有个体的拥挤度列表 + """ + max_rank = max(ranks) if ranks else 0 # 最大层级 + all_distances = [0.0] * len(objective_values) # 所有个体的拥挤度 + + # 按层级计算拥挤度 for r in range(max_rank + 1): - front = [i for i in range(len(objective_values)) if ranks[i] == r] - if len(front) <= 1: + front = [i for i in range(len(objective_values)) if ranks[i] == r] # 当前层级的个体 + if len(front) <= 1: # 单个个体拥挤度为无穷大 for i in front: all_distances[i] = float('inf') continue + # 计算当前层级的拥挤度 distances = self.crowding_distance(objective_values, front) + # 映射到全局索引 for i, idx in enumerate(front): all_distances[idx] = distances[i] + return all_distances def environmental_selection(self, population, objective_values): - """环境选择:从合并种群中选择pop_size个个体""" + """ + 环境选择:从合并的父代和子代中选择最优的pop_size个个体 + :param population: 合并的种群(父代+子代) + :param objective_values: 合并种群的目标函数值 + :return: 选择后的种群和对应的目标值 + """ + # 非支配排序 ranks, fronts = self.fast_non_dominated_sort(objective_values) - selected = [] + selected = [] # 选中的个体索引 + + # 按层级从优到劣选择,直到接近种群大小 for front in fronts: if len(selected) + len(front) <= self.pop_size: - selected.extend(front) + selected.extend(front) # 整层加入 else: - # 计算当前front的拥挤度,选择剩余数量的个体 + # 当前层无法全加入时,按拥挤度选择剩余个体 distances = self.crowding_distance(objective_values, front) + # 按拥挤度降序排序(优先选择分散的个体) sorted_front = [x for _, x in sorted(zip(distances, front), reverse=True)] + # 选择剩余所需数量 selected.extend(sorted_front[:self.pop_size - len(selected)]) break + + # 返回选中的个体和对应的目标值 return population[selected], [objective_values[i] for i in selected] \ No newline at end of file diff --git a/objective_calculator.py b/objective_calculator.py index 60218ab..0bb1259 100644 --- a/objective_calculator.py +++ b/objective_calculator.py @@ -4,100 +4,118 @@ from chromosome_utils import ChromosomeUtils class ObjectiveCalculator: - """目标函数计算器:变更成本C + 交付延期T""" + """目标函数计算器:计算双目标值(变更成本 + 交付延期)""" def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData, utils: ChromosomeUtils, config: Config): + """ + 初始化计算器 + :param order_data: 订单数据 + :param risk_data: 风险企业数据 + :param supplier_data: 供应商数据 + :param utils: 染色体工具类 + :param config: 算法配置(惩罚系数等) + """ self.order = order_data self.risk = risk_data self.supplier = supplier_data self.utils = utils self.config = config + # 预计算物料企业编码的分割点(提高效率) self.split_points = np.cumsum(utils.material_enterprise_count) def calculate_objectives(self, chromosome: np.ndarray) -> tuple[float, float]: - """计算双目标值:(变更成本C, 交付延期T)""" + """ + 计算双目标值 + :param chromosome: 染色体(解) + :return: (变更成本C, 交付延期T) + """ + # 拆分染色体为三层 enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(chromosome) - # 修复:传入capacity_layer参数 + # 计算变更成本和交付延期 C = self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer) T = self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer) return C, T - # 修复:添加capacity_layer参数 - def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> float: - """计算变更成本C = C1 + C2 + C3 + C4""" - C1 = 0.0 # 变更惩罚成本 - C2 = 0.0 # 采购变更成本 - C3 = 0.0 # 运输变更成本 - C4 = 0.0 # 提前交付惩罚成本 + def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, + quantity_layer: np.ndarray) -> float: + """ + 计算变更成本 C = C1 + C2 + C3 + C4 + - C1: 变更惩罚成本(使用供应商替代风险企业的惩罚) + - C2: 采购成本差异(变更后 - 原始) + - C3: 运输成本差异(变更后 - 原始) + - C4: 提前交付惩罚成本 + """ + C1 = 0.0 + C2 = 0.0 + C3 = 0.0 + C4 = 0.0 - # 原始成本(全部由风险企业生产) + # 原始成本(全部由风险企业生产时的成本) original_purchase_cost = sum(self.order.Q[i] * self.order.P0[i] for i in range(self.order.I)) original_transport_cost = sum(self.order.Q[i] * self.order.T0[i] for i in range(self.order.I)) - # 变更后成本 + # 变更后成本(当前解的成本) new_purchase_cost = 0.0 new_transport_cost = 0.0 - risk_production = np.zeros(self.order.I) # 风险企业生产数量 - supplier_production = np.zeros(self.order.I) # 供应商生产数量 + risk_production = np.zeros(self.order.I) # 风险企业生产的数量 + supplier_production = np.zeros(self.order.I) # 供应商生产的数量 start = 0 - for i in range(self.order.I): + for i in range(self.order.I): # 遍历每种物料 end = self.split_points[i] - ents = self.utils.material_optional_enterprises[i] - e_segment = enterprise_layer[start:end] - q_segment = quantity_layer[start:end] + ents = self.utils.material_optional_enterprises[i] # 可选企业 + e_segment = enterprise_layer[start:end] # 企业选择状态 + q_segment = quantity_layer[start:end] # 数量分配 for idx, ent in enumerate(ents): - if e_segment[idx] == 1: - q = q_segment[idx] - if ent == 0: - # 风险企业 + if e_segment[idx] == 1: # 仅处理被选中的企业 + q = q_segment[idx] # 分配的数量 + if ent == 0: # 风险企业 risk_production[i] += q - new_purchase_cost += q * self.order.P0[i] - new_transport_cost += q * self.order.T0[i] - else: - # 供应商 + new_purchase_cost += q * self.order.P0[i] # 采购成本 + new_transport_cost += q * self.order.T0[i] # 运输成本 + else: # 供应商 supplier_id = ent - 1 supplier_production[i] += q - new_purchase_cost += q * self.supplier.P_ij[supplier_id][i] - new_transport_cost += q * self.supplier.T_ij[supplier_id][i] + new_purchase_cost += q * self.supplier.P_ij[supplier_id][i] # 采购成本 + new_transport_cost += q * self.supplier.T_ij[supplier_id][i] # 运输成本 start = end - # 计算C1:变更惩罚成本 + # 计算C1:变更惩罚成本(对供应商生产的部分收取惩罚) for i in range(self.order.I): + # 惩罚系数×供应商生产数量×(风险企业的单位采购+运输成本) C1 += self.config.delta * supplier_production[i] * (self.order.P0[i] + self.order.T0[i]) - # 计算C2:采购变更成本 + # 计算C2:采购成本差异(变更后 - 原始) C2 = new_purchase_cost - original_purchase_cost - # 计算C3:运输变更成本 + # 计算C3:运输成本差异(变更后 - 原始) C3 = new_transport_cost - original_transport_cost - # 计算C4:提前交付惩罚成本 - # 修复:传入capacity_layer参数 + # 计算C4:提前交付惩罚成本(若实际交货期早于需求交货期) actual_delivery_time = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) if actual_delivery_time < self.order.Dd: - # 计算风险企业和各供应商的交货时间 - risk_delivery = [] - supplier_deliveries = {} + # 计算风险企业和供应商的交货时间 + risk_delivery = [] # 风险企业的各物料交货时间 + supplier_deliveries = {} # {供应商ID: 各物料交货时间} + start = 0 for i in range(self.order.I): end = self.split_points[i] ents = self.utils.material_optional_enterprises[i] e_segment = enterprise_layer[start:end] - c_segment = capacity_layer[start:end] - q_segment = quantity_layer[start:end] + c_segment = capacity_layer[start:end] # 产能(用于计算生产时间) + q_segment = quantity_layer[start:end] # 数量 for idx, ent in enumerate(ents): if e_segment[idx] == 1: q = q_segment[idx] c = c_segment[idx] - if c == 0: - production_time = 0 - else: - production_time = q / c + # 生产时间 = 数量 / 产能(产能为0时按0处理) + production_time = q / c if c != 0 else 0 + # 运输时间 = 距离 / 运输速度 if ent == 0: transport_time = self.risk.distance / self.order.transport_speed risk_delivery.append(production_time + transport_time) @@ -109,48 +127,58 @@ class ObjectiveCalculator: supplier_deliveries[supplier_id].append(production_time + transport_time) start = end + # 风险企业的最大交货时间(取最长) D0 = max(risk_delivery) if risk_delivery else 0 + # 所有供应商的最大交货时间之和 Dj_sum = sum(max(times) for times in supplier_deliveries.values()) if supplier_deliveries else 0 + # 提前交付惩罚 = 惩罚系数 ×(需求交货期 - 风险企业交货时间 + 供应商总交货时间) C4 = self.config.gamma * ((self.order.Dd - D0) + Dj_sum) return C1 + C2 + C3 + C4 def _calculate_actual_delivery_time(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> float: - """计算实际交货期:所有企业中最长的生产+运输时间""" - max_time = 0.0 + """ + 计算实际交货期:所有企业中最长的(生产时间 + 运输时间) + :return: 实际交货期 + """ + max_time = 0.0 # 最大时间(实际交货期) start = 0 + for i in range(self.order.I): end = self.split_points[i] ents = self.utils.material_optional_enterprises[i] e_segment = enterprise_layer[start:end] - c_segment = capacity_layer[start:end] - q_segment = quantity_layer[start:end] + c_segment = capacity_layer[start:end] # 产能 + q_segment = quantity_layer[start:end] # 数量 for idx, ent in enumerate(ents): - if e_segment[idx] == 1: + if e_segment[idx] == 1: # 仅处理选中的企业 q = q_segment[idx] c = c_segment[idx] - if c == 0: - production_time = 0 - else: - production_time = q / c + # 生产时间 = 数量 / 产能(产能为0时按0处理) + production_time = q / c if c != 0 else 0 - # 计算运输时间 + # 运输时间 if ent == 0: transport_time = self.risk.distance / self.order.transport_speed else: supplier_id = ent - 1 transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed + # 总时间 = 生产时间 + 运输时间 total_time = production_time + transport_time if total_time > max_time: - max_time = total_time + max_time = total_time # 更新最大时间 start = end + return max_time def _calculate_tardiness(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> float: - """计算交付延期T = max(0, 实际交货期 - 需求交货期)""" + """ + 计算交付延期:max(0, 实际交货期 - 需求交货期) + :return: 延期时间(非负) + """ actual_delivery = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) return max(0.0, actual_delivery - self.order.Dd) \ No newline at end of file diff --git a/visualizer.py b/visualizer.py index 31e70d7..18d9afe 100644 --- a/visualizer.py +++ b/visualizer.py @@ -1,93 +1,132 @@ import matplotlib.pyplot as plt import numpy as np -from chromosome_utils import ChromosomeUtils +from chromosome_utils import ChromosomeUtils # 染色体工具类 class ResultVisualizer: - """结果可视化工具""" + """结果可视化工具类:绘制帕累托前沿、收敛曲线,打印最优解详情""" + def __init__(self, utils: ChromosomeUtils): + """ + 初始化可视化工具 + :param utils: 染色体工具类实例(用于解析染色体) + """ self.utils = utils - self.figsize = (12, 8) + self.figsize = (12, 8) # 图表大小 + # 企业名称列表(风险企业+供应商) self.supplier_names = ["RiskEnterprise"] + self.utils.supplier.names def plot_pareto_front(self, all_objectives: list[tuple[float, float]], non_dominated_objectives: list[tuple[float, float]]): - """绘制帕累托前沿""" + """ + 绘制帕累托前沿(所有解 vs 帕累托最优解) + :param all_objectives: 所有解的目标值 + :param non_dominated_objectives: 帕累托最优解的目标值 + """ plt.figure(figsize=self.figsize) + # 绘制所有解(蓝色点) if all_objectives: - c_all = [obj[0] for obj in all_objectives] - t_all = [obj[1] for obj in all_objectives] + c_all = [obj[0] for obj in all_objectives] # 成本 + t_all = [obj[1] for obj in all_objectives] # 延期 plt.scatter(c_all, t_all, color='blue', alpha=0.3, label='All Solutions', zorder=1) + # 绘制帕累托前沿(红色点) if non_dominated_objectives: c_nd = [obj[0] for obj in non_dominated_objectives] t_nd = [obj[1] for obj in non_dominated_objectives] plt.scatter(c_nd, t_nd, color='red', s=50, label='Pareto Front', zorder=5) + # 图表设置 plt.title('Pareto Front: Change Cost vs Tardiness') plt.xlabel('Change Cost') plt.ylabel('Tardiness') plt.legend() plt.grid(True, alpha=0.5) - plt.savefig('pareto_front.png', dpi=300, bbox_inches='tight') + plt.savefig('pareto_front.png', dpi=300, bbox_inches='tight') # 保存图片 plt.show() def plot_convergence(self, convergence_history: list[tuple[float, float]]): - """绘制收敛趋势图""" - if len(convergence_history) == 0: + """ + 绘制收敛趋势图(每代最优前沿的平均成本和延期) + :param convergence_history: 收敛历史数据((平均成本, 平均延期)) + """ + if len(convergence_history) == 0: # 无数据时不绘制 return plt.figure(figsize=self.figsize) - generations = list(range(len(convergence_history))) - costs = [h[0] for h in convergence_history] - tardiness = [h[1] for h in convergence_history] + generations = list(range(len(convergence_history))) # 代数 + costs = [h[0] for h in convergence_history] # 平均成本 + tardiness = [h[1] for h in convergence_history] # 平均延期 + + # 子图1:平均成本趋势 plt.subplot(2, 1, 1) plt.plot(generations, costs, 'b-') plt.title('Convergence History') plt.ylabel('Average Change Cost') plt.grid(True, alpha=0.5) + + # 子图2:平均延期趋势 plt.subplot(2, 1, 2) plt.plot(generations, tardiness, 'r-') plt.xlabel('Generation') plt.ylabel('Average Tardiness') plt.grid(True, alpha=0.5) - plt.tight_layout() - plt.savefig('convergence_history.png', dpi=300, bbox_inches='tight') + + plt.tight_layout() # 调整布局 + plt.savefig('convergence_history.png', dpi=300, bbox_inches='tight') # 保存图片 plt.show() def print_solution_details(self, solution: np.ndarray, objectives: tuple[float, float]): - """打印单个解的详细信息(含数量校验)""" + """ + 打印单个解的详细信息(企业选择、产能、数量等) + :param solution: 染色体(解) + :param objectives: 该解的目标值(成本, 延期) + """ + # 拆分染色体为三层 enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(solution) - split_points = np.cumsum(self.utils.material_enterprise_count) + split_points = np.cumsum(self.utils.material_enterprise_count) # 分割点 + print(f"\n变更成本: {objectives[0]:.2f}, 交付延期: {objectives[1]:.2f}") print("=" * 80) - total_q_check = [] + total_q_check = [] # 检查各物料的总数量是否满足需求 + start = 0 - for i in range(self.utils.I): + for i in range(self.utils.I): # 遍历每种物料 end = split_points[i] - ents = self.utils.material_optional_enterprises[i] - e_segment = enterprise_layer[start:end] - c_segment = capacity_layer[start:end] - q_segment = quantity_layer[start:end] - demand_q = self.utils.order.Q[i] - print(f"物料 {i} - 需求数量: {demand_q}, 分配总量: {np.sum(q_segment[e_segment==1]):.2f}") - total_q_check.append(abs(np.sum(q_segment[e_segment==1]) - demand_q) < 1e-2) + ents = self.utils.material_optional_enterprises[i] # 可选企业 + e_segment = enterprise_layer[start:end] # 企业选择状态 + c_segment = capacity_layer[start:end] # 产能 + q_segment = quantity_layer[start:end] # 数量 + + demand_q = self.utils.order.Q[i] # 需求数量 + allocated_q = np.sum(q_segment[e_segment == 1]) # 分配的总数量 + print(f"物料 {i} - 需求数量: {demand_q}, 分配总量: {allocated_q:.2f}") + total_q_check.append(abs(allocated_q - demand_q) < 1e-2) # 检查是否满足需求 + print(f" 选择的企业及其分配:") for idx, ent in enumerate(ents): - if e_segment[idx] == 1: - ent_name = self.supplier_names[ent] - cap = c_segment[idx] - qty = q_segment[idx] + if e_segment[idx] == 1: # 仅打印被选中的企业 + ent_name = self.supplier_names[ent] # 企业名称 + cap = c_segment[idx] # 产能 + qty = q_segment[idx] # 数量 print(f" 企业: {ent_name}, 产能: {cap:.2f}, 分配数量: {qty:.2f}") + start = end print("-" * 80) + + # 验证数量约束是否满足 if all(total_q_check): print("✅ 所有物料数量满足需求约束") else: print("❌ 部分物料数量未满足需求约束") def print_pareto_solutions(self, population: np.ndarray, objectives: list[tuple[float, float]]): - """打印帕累托前沿解的详细信息""" + """ + 打印帕累托前沿解的详细信息(最多打印前5个) + :param population: 帕累托前沿解的种群 + :param objectives: 对应的目标值列表 + """ print("\n" + "=" * 100) print("帕累托前沿解详细方案") print("=" * 100) + # 打印前5个解(或所有解,取较少者) for i in range(min(5, len(population))): - print(f"\n===== 最优解 {i+1} =====") + print(f"\n===== 最优解 {i + 1} =====") self.print_solution_details(population[i], objectives[i]) \ No newline at end of file