import random import numpy as np from data_structures import OrderData, RiskEnterpriseData, SupplierData # 数据结构类 class ChromosomeUtils: """染色体工具类:提供染色体的拆分、合并、修复等功能,确保满足约束条件""" def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData): """ 初始化工具类 :param order_data: 订单数据(需求、交货期等) :param risk_data: 风险企业数据(产能等) :param supplier_data: 供应商数据(产能、价格等) """ self.order = order_data # 订单数据 self.risk = risk_data # 风险企业数据 self.supplier = supplier_data # 供应商数据 self.I = order_data.I # 物料种类数 # 预计算每个物料的可选企业列表(0=风险企业,1+为供应商ID) self.material_optional_enterprises = self._get_material_optional_enterprises() # 每个物料的可选企业数量 self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises] # 染色体总长度(3层:企业层+能力层+数量层) self.chromosome_length = 3 * sum(self.material_enterprise_count) def _get_material_optional_enterprises(self) -> list[list[int]]: """ 生成每个物料的可选企业列表(内部方法) :return: 二维列表,每个子列表为对应物料的可选企业ID """ optional = [] for i in range(self.I): # 遍历每种物料 ents = [0] # 先加入风险企业(ID=0) # 加入可生产该物料的供应商(ID=1+供应商索引) for j in range(self.supplier.supplier_count): if self.supplier.can_produce[j][i] == 1: # 供应商j可生产物料i ents.append(j + 1) # 供应商ID为j+1(区分风险企业) optional.append(ents) return optional def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ 拆分染色体为三层 :param chromosome: 完整染色体 :return: 企业层、能力层、数量层(均为numpy数组) """ total_ent_count = sum(self.material_enterprise_count) # 所有物料的企业总数 enterprise_layer = chromosome[:total_ent_count] # 第一层:企业选择(0/1) capacity_layer = chromosome[total_ent_count:2 * total_ent_count] # 第二层:产能 quantity_layer = chromosome[2 * total_ent_count:] # 第三层:数量 return enterprise_layer, capacity_layer, quantity_layer def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: """ 合并三层为完整染色体 :param enterprise_layer: 企业层 :param capacity_layer: 能力层 :param quantity_layer: 数量层 :return: 完整染色体 """ return np.hstack([enterprise_layer, capacity_layer, quantity_layer]) def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray: """ 修复企业层:确保每种物料至少选择一个企业 :param enterprise_layer: 待修复的企业层 :return: 修复后的企业层 """ repaired = enterprise_layer.copy() # 计算各物料的企业编码分割点 split_points = np.cumsum(self.material_enterprise_count) start = 0 # 起始索引 for i in range(self.I): # 遍历每种物料 end = split_points[i] # 当前物料的企业编码结束索引 segment = repaired[start:end] # 当前物料的企业选择片段 if np.sum(segment) == 0: # 未选择任何企业时,随机选一个 select_idx = random.randint(0, len(segment) - 1) segment[select_idx] = 1 repaired[start:end] = segment start = end # 更新起始索引 return repaired def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray: """ 修复能力层:确保满足单物料产能约束和企业总产能约束 :param enterprise_layer: 企业层(用于确定哪些企业被选中) :param capacity_layer: 待修复的能力层 :return: 修复后的能力层 """ repaired = capacity_layer.copy() split_points = np.cumsum(self.material_enterprise_count) start = 0 # 1. 修复单物料产能约束(每个企业的单物料产能不超过上限) for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] # 可选企业列表 segment = repaired[start:end] # 当前物料的产能片段 enterprise_segment = enterprise_layer[start:end] # 企业选择状态 for idx, ent in enumerate(ents): if enterprise_segment[idx] == 1: # 仅处理被选中的企业 # 确定该企业的单物料最大产能 if ent == 0: max_cap = self.risk.C0_i_std[i] # 风险企业 else: supplier_id = ent - 1 max_cap = self.supplier.Cj_i_std[supplier_id][i] # 供应商 # 若产能不合法,重置为1到max_cap之间的随机值 if segment[idx] <= 0 or segment[idx] > max_cap: segment[idx] = random.uniform(1, max_cap) repaired[start:end] = segment start = end # 2. 修复企业总产能约束(企业所有物料的总产能不超过上限) # 先计算每个企业的当前总产能 enterprise_total_capacity = {} # {企业ID: 总产能} start = 0 for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] enterprise_segment = enterprise_layer[start:end] capacity_segment = repaired[start:end] for idx, ent in enumerate(ents): if enterprise_segment[idx] == 1: # 仅统计选中的企业 if ent not in enterprise_total_capacity: enterprise_total_capacity[ent] = 0 enterprise_total_capacity[ent] += capacity_segment[idx] start = end # 调整超出总产能上限的企业(按比例缩放) for ent, total_cap in enterprise_total_capacity.items(): # 确定企业的总产能上限 if ent == 0: max_total_cap = self.risk.C0_total_max # 风险企业总产能上限 else: supplier_id = ent - 1 max_total_cap = self.supplier.Cj_total_max[supplier_id] # 供应商总产能上限 if total_cap > max_total_cap: # 超出上限时缩放 scale = max_total_cap / total_cap # 缩放比例(会出现浮点) start = 0 # 重新遍历,按比例缩小该企业所有物料的产能 for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] enterprise_segment = enterprise_layer[start:end] capacity_segment = repaired[start:end] for idx, e in enumerate(ents): if e == ent and enterprise_segment[idx] == 1: capacity_segment[idx] *= scale # 按比例缩小 repaired[start:end] = capacity_segment start = end return repaired def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: """修复数量编码层:满足总量需求、起订量和最大供应量约束""" repaired = quantity_layer.copy() split_points = np.cumsum(self.material_enterprise_count) start = 0 for i in range(self.I): # 遍历每种物料 qi = self.order.Q[i] # 物料i的需求量 end = split_points[i] ents = self.material_optional_enterprises[i] segment = repaired[start:end] enterprise_segment = enterprise_layer[start:end] selected_indices = np.where(enterprise_segment == 1)[0] if len(selected_indices) == 0: start = end continue selected_ents = [ents[idx] for idx in selected_indices] # 步骤1:初始化合法范围(起订量~最大供应量) for idx, ent in zip(selected_indices, selected_ents): if ent == 0: min_q = 0 max_q = qi else: supplier_id = ent - 1 min_q = self.supplier.MinOrder[supplier_id][i] max_q = self.supplier.MaxOrder[supplier_id][i] if segment[idx] < min_q: segment[idx] = min_q elif segment[idx] > max_q: segment[idx] = max_q # 步骤2:强制总量等于需求数量 current_total = np.sum(segment[selected_indices]) if current_total <= 0: weights = np.random.rand(len(selected_indices)) weights /= np.sum(weights) segment[selected_indices] = qi * weights else: scale = qi / current_total segment[selected_indices] *= scale # 步骤3:处理超出最大供应量的情况(添加最大迭代次数限制) need_adjust = True max_iter = 10 # 最大迭代次数,避免无限循环 iter_count = 0 while need_adjust and iter_count < max_iter: # 增加迭代次数限制 iter_count += 1 need_adjust = False total_excess = 0 # 截断超出max_q的数量 for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_q = qi else: supplier_id = ent - 1 max_q = self.supplier.MaxOrder[supplier_id][i] if segment[idx] > max_q: excess = segment[idx] - max_q total_excess += excess segment[idx] = max_q need_adjust = True # 分配超出量到有剩余的企业 if total_excess > 0: available_indices = [] available_max = [] for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_q = qi else: supplier_id = ent - 1 max_q = self.supplier.MaxOrder[supplier_id][i] remaining = max_q - segment[idx] if remaining > 0: available_indices.append(idx) available_max.append(remaining) if len(available_indices) > 0: available_total = sum(available_max) if available_total > 0: alloc_ratio = np.array(available_max) / available_total alloc_excess = total_excess * alloc_ratio for idx, alloc in zip(available_indices, alloc_excess): segment[idx] += alloc need_adjust = True # 分配后可能需要再次检查 else: # 无剩余容量,强制截断并重新缩放总量 segment[selected_indices] = np.minimum(segment[selected_indices], [self.supplier.MaxOrder[ent - 1][i] if ent != 0 else qi for ent in selected_ents]) need_adjust = True # 检查总量误差,重新缩放 current_total = np.sum(segment[selected_indices]) if abs(current_total - qi) > 1e-6: scale = qi / current_total segment[selected_indices] *= scale need_adjust = True # 缩放后可能需要再次检查 # 最终强制总量等于需求(避免迭代次数到限后仍有误差) current_total = np.sum(segment[selected_indices]) if abs(current_total - qi) > 1e-6: scale = qi / current_total segment[selected_indices] *= scale repaired[start:end] = segment start = end return repaired def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray: """ 完整修复染色体(按顺序修复三层) :param chromosome: 待修复的染色体 :return: 修复后的染色体 """ # 拆分三层 enterprise_layer, capacity_layer, quantity_layer = self._split_chromosome(chromosome) # 依次修复(企业层→能力层→数量层,依赖关系) enterprise_layer = self.repair_enterprise_layer(enterprise_layer) capacity_layer = self.repair_capacity_layer(enterprise_layer, capacity_layer) quantity_layer = self.repair_quantity_layer(enterprise_layer, quantity_layer) # 合并修复后的三层 return self._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer)