import random import numpy as np import math from data_structures import OrderData, RiskEnterpriseData, SupplierData # 数据结构类 class ChromosomeUtils: """染色体工具类:提供染色体的拆分、合并、修复等功能,确保满足约束条件""" def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData): """ 初始化工具类 :param order_data: 订单数据(需求、交货期等) :param risk_data: 风险企业数据(产能等) :param supplier_data: 供应商数据(产能、价格等) """ self.order = order_data # 订单数据 self.risk = risk_data # 风险企业数据 self.supplier = supplier_data # 供应商数据 self.I = order_data.I # 物料种类数 # 预计算每个物料的可选企业列表(0=风险企业,1+为供应商ID) self.material_optional_enterprises = self._get_material_optional_enterprises() # 每个物料的可选企业数量 self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises] # 染色体总长度(3层:企业层+能力层+数量层) self.chromosome_length = 3 * sum(self.material_enterprise_count) def _get_material_optional_enterprises(self) -> list[list[int]]: """ 生成每个物料的可选企业列表(内部方法) :return: 二维列表,每个子列表为对应物料的可选企业ID """ optional = [] for i in range(self.I): # 遍历每种物料 ents = [0] # 先加入风险企业(ID=0) # 加入可生产该物料的供应商(ID=1+供应商索引) for j in range(self.supplier.supplier_count): if self.supplier.can_produce[j][i] == 1: # 供应商j可生产物料i ents.append(j + 1) # 供应商ID为j+1(区分风险企业) optional.append(ents) return optional def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """ 拆分染色体为三层 :param chromosome: 完整染色体 :return: 企业层、能力层、数量层(均为numpy数组) """ total_ent_count = sum(self.material_enterprise_count) # 所有物料的企业总数 enterprise_layer = chromosome[:total_ent_count] # 第一层:企业选择(0/1) capacity_layer = chromosome[total_ent_count:2 * total_ent_count] # 第二层:产能 quantity_layer = chromosome[2 * total_ent_count:] # 第三层:数量 return enterprise_layer, capacity_layer, quantity_layer def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: """ 合并三层为完整染色体 :param enterprise_layer: 企业层 :param capacity_layer: 能力层 :param quantity_layer: 数量层 :return: 完整染色体 """ return np.hstack([enterprise_layer, capacity_layer, quantity_layer]) def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray: """ 修复企业层:确保每种物料至少选择一个企业 :param enterprise_layer: 待修复的企业层 :return: 修复后的企业层 """ repaired = enterprise_layer.copy() # 计算各物料的企业编码分割点 split_points = np.cumsum(self.material_enterprise_count) start = 0 # 起始索引 for i in range(self.I): # 遍历每种物料 end = split_points[i] # 当前物料的企业编码结束索引 segment = repaired[start:end] # 当前物料的企业选择片段 if np.sum(segment) == 0: # 未选择任何企业时,随机选一个 select_idx = random.randint(0, len(segment) - 1) segment[select_idx] = 1 repaired[start:end] = segment start = end # 更新起始索引 return repaired def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray: """ 修复能力层:满足单物料最小产能约束和企业总产能约束(整数化) :param enterprise_layer: 企业层(用于确定哪些企业被选中) :param capacity_layer: 待修复的能力层 :return: 修复后的能力层(整数) """ repaired = capacity_layer.copy().astype(int) # 强制转为整数 split_points = np.cumsum(self.material_enterprise_count) start = 0 # 1. 修复单物料最小产能约束(每个企业的单物料产能不低于最小产能,整数) for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] # 可选企业列表 segment = repaired[start:end] # 当前物料的产能片段 enterprise_segment = enterprise_layer[start:end] # 企业选择状态 for idx, ent in enumerate(ents): if enterprise_segment[idx] == 1: # 仅处理被选中的企业 # 确定该企业的单物料最小产能和最大产能 if ent == 0: min_cap = self.risk.C0_i_min[i] # 风险企业最小产能 max_cap = self.risk.C0_total_max # 单物料最大产能=总产能上限 else: supplier_id = ent - 1 min_cap = self.supplier.Cj_i_min[supplier_id][i] # 供应商最小产能 max_cap = self.supplier.Cj_total_max[supplier_id] # 单物料最大产能=总产能上限 # 确保不低于最小产能,不超过最大产能 if segment[idx] < min_cap: segment[idx] = min_cap if segment[idx] > max_cap: segment[idx] = max_cap repaired[start:end] = segment start = end # 2. 修复企业总产能约束(企业所有物料的总产能不超过上限,整数处理) max_adjust_iter = 10 # 最大调整迭代次数 for _ in range(max_adjust_iter): # 计算每个企业的当前总产能 enterprise_total_capacity = {} # {企业ID: 总产能} start = 0 for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] enterprise_segment = enterprise_layer[start:end] capacity_segment = repaired[start:end] for idx, ent in enumerate(ents): if enterprise_segment[idx] == 1: # 仅统计选中的企业 if ent not in enterprise_total_capacity: enterprise_total_capacity[ent] = 0 enterprise_total_capacity[ent] += capacity_segment[idx] start = end # 检查是否有企业超出总产能上限 over_cap_ents = [] for ent, total_cap in enterprise_total_capacity.items(): if ent == 0: max_total_cap = self.risk.C0_total_max else: supplier_id = ent - 1 max_total_cap = self.supplier.Cj_total_max[supplier_id] if total_cap > max_total_cap: over_cap_ents.append((ent, total_cap, max_total_cap)) if not over_cap_ents: break # 无超出则退出循环 # 处理超出总产能的企业 for ent, total_cap, max_total_cap in over_cap_ents: excess = total_cap - max_total_cap # 超出量(整数) if excess <= 0: continue # 收集该企业的所有产能分配(含最小产能约束) cap_assignments = [] # [(物料索引i, 产能索引idx, 当前产能, 最小产能)] start = 0 for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] enterprise_segment = enterprise_layer[start:end] capacity_segment = repaired[start:end] for idx, e in enumerate(ents): if e == ent and enterprise_segment[idx] == 1: # 获取该物料的最小产能 if ent == 0: min_cap = self.risk.C0_i_min[i] else: supplier_id = ent - 1 min_cap = self.supplier.Cj_i_min[supplier_id][i] cap_assignments.append((i, start + idx, capacity_segment[idx], min_cap)) start = end # 循环减少超出产能:每次减少1,直到超出量为0 current_excess = excess while current_excess > 0 and cap_assignments: # 本轮可减少的物料列表(排除已达最小产能的) reducible = [item for item in cap_assignments if item[2] > item[3]] if not reducible: break # 无可减少的产能,退出循环 # 遍历可减少的物料,每次减少1 for idx in range(len(reducible)): i, pos, curr_cap, min_cap = reducible[idx] # 减少1个单位产能 new_cap = curr_cap - 1 if new_cap < min_cap: new_cap = min_cap # 确保不低于最小产能 # 更新产能值 repaired[pos] = new_cap # 更新分配记录 cap_assignments = [ (i, pos, new_cap, min_cap) if (item[1] == pos) else item for item in cap_assignments ] reducible[idx] = (i, pos, new_cap, min_cap) current_excess -= 1 if current_excess <= 0: break # 超出量处理完毕,退出循环 # 最终确保所有产能为整数且不低于最小产能 start = 0 for i in range(self.I): end = split_points[i] ents = self.material_optional_enterprises[i] segment = repaired[start:end] enterprise_segment = enterprise_layer[start:end] for idx, ent in enumerate(ents): if enterprise_segment[idx] == 1: if ent == 0: min_cap = self.risk.C0_i_min[i] else: supplier_id = ent - 1 min_cap = self.supplier.Cj_i_min[supplier_id][i] if segment[idx] < min_cap: segment[idx] = min_cap repaired[start:end] = segment start = end return repaired.astype(int) def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: """修复数量编码层:满足总量需求、起订量和最大供应量约束(整数化)""" repaired = quantity_layer.copy().astype(int) # 强制转为整数 split_points = np.cumsum(self.material_enterprise_count) start = 0 max_iter = 20 # 增加迭代次数 for i in range(self.I): # 遍历每种物料 qi = self.order.Q[i] # 物料i的需求量(整数) end = split_points[i] ents = self.material_optional_enterprises[i] segment = repaired[start:end] enterprise_segment = enterprise_layer[start:end] selected_indices = np.where(enterprise_segment == 1)[0] if len(selected_indices) == 0: start = end continue selected_ents = [ents[idx] for idx in selected_indices] # 步骤1:初始化合法范围(起订量~最大供应量,整数) for idx, ent in zip(selected_indices, selected_ents): if ent == 0: min_q = 0 max_q = qi else: supplier_id = ent - 1 min_q = self.supplier.MinOrder[supplier_id][i] max_q = self.supplier.MaxOrder[supplier_id][i] # 确保在合法范围内 if segment[idx] < min_q: segment[idx] = min_q elif segment[idx] > max_q: segment[idx] = max_q # 步骤2:强制总量等于需求数量(整数) current_total = np.sum(segment[selected_indices]) if current_total <= 0: # 随机分配初始数量 weights = np.random.rand(len(selected_indices)) weights /= np.sum(weights) base = qi // len(selected_indices) remainder = qi % len(selected_indices) segment[selected_indices] = base # 分配余数(优先给任务占比低的) if remainder > 0: # 计算各企业任务占比(当前分配/最大产能) ratios = [] for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_cap = self.risk.C0_total_max else: supplier_id = ent - 1 max_cap = self.supplier.Cj_total_max[supplier_id] ratio = segment[idx] / max_cap if max_cap > 0 else 1.0 ratios.append(ratio) # 按占比升序排序,优先分配余数 sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0])] for j in range(remainder): segment[sorted_indices[j]] += 1 else: # 按比例缩放(整数化) scale = qi / current_total scaled = np.array([int(round(x * scale)) for x in segment[selected_indices]]) segment[selected_indices] = scaled # 调整差额 current_total = np.sum(segment[selected_indices]) diff = qi - current_total if diff != 0: # 按任务占比分配差额 ratios = [] for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_cap = self.risk.C0_total_max else: supplier_id = ent - 1 max_cap = self.supplier.Cj_total_max[supplier_id] ratio = segment[idx] / max_cap if max_cap > 0 else 1.0 ratios.append(ratio) if diff > 0: # 正差额:优先分配给占比低的 sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0])] for j in range(diff): if j < len(sorted_indices): segment[sorted_indices[j]] += 1 else: # 负差额:优先从占比高的扣除 sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0], reverse=True)] for j in range(-diff): if j < len(sorted_indices) and segment[sorted_indices[j]] > min_q: segment[sorted_indices[j]] -= 1 # 步骤3:处理超出最大供应量的情况(整数) need_adjust = True iter_count = 0 while need_adjust and iter_count < max_iter: iter_count += 1 need_adjust = False total_excess = 0 # 截断超出max_q的数量(整数) for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_q = qi else: supplier_id = ent - 1 max_q = self.supplier.MaxOrder[supplier_id][i] if segment[idx] > max_q: excess = segment[idx] - max_q total_excess += excess segment[idx] = max_q need_adjust = True # 分配超出量(整数) if total_excess > 0: available_indices = [] available_remaining = [] for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_q = qi else: supplier_id = ent - 1 max_q = self.supplier.MaxOrder[supplier_id][i] remaining = max_q - segment[idx] if remaining > 0: available_indices.append(idx) available_remaining.append(remaining) if len(available_indices) > 0: # 按剩余容量分配 total_available = sum(available_remaining) if total_available > 0: quotient = total_excess // total_available remainder = total_excess % total_available # 分配整数商 for idx, rem in zip(available_indices, available_remaining): assign = min(quotient, rem) segment[idx] += assign total_excess -= assign # 分配余数(优先给剩余多的) if remainder > 0: sorted_pairs = sorted(zip(available_indices, available_remaining), key=lambda x: x[1], reverse=True) for idx, _ in sorted_pairs: if remainder <= 0: break rem = max_q - segment[idx] assign = min(remainder, rem) segment[idx] += assign remainder -= assign total_excess = remainder if total_excess > 0: need_adjust = True else: # 无剩余容量,强制截断并重新缩放 segment[selected_indices] = np.minimum(segment[selected_indices], [self.supplier.MaxOrder[ent - 1][i] if ent != 0 else qi for ent in selected_ents]) # 重新调整总量 current_total = np.sum(segment[selected_indices]) diff = qi - current_total if diff != 0: # 兜底:分配给剩余容量最大的(即使为0,强制调整) if diff > 0: # 找最小起订量最低的企业 min_min_order = float('inf') target_idx = selected_indices[0] for idx, ent in zip(selected_indices, selected_ents): if ent == 0: min_q = 0 else: supplier_id = ent - 1 min_q = self.supplier.MinOrder[supplier_id][i] if min_q < min_min_order: min_min_order = min_q target_idx = idx segment[target_idx] += diff else: # 找分配量最大的企业扣除 max_qty = -1 target_idx = selected_indices[0] for idx in selected_indices: if segment[idx] > max_qty: max_qty = segment[idx] target_idx = idx segment[target_idx] += diff # diff为负 need_adjust = True # 检查总量(整数相等) current_total = np.sum(segment[selected_indices]) if current_total != qi: diff = qi - current_total if diff > 0: # 分配差额给剩余容量最大的 max_remaining = -1 target_idx = selected_indices[0] for idx, ent in zip(selected_indices, selected_ents): if ent == 0: max_q = qi else: supplier_id = ent - 1 max_q = self.supplier.MaxOrder[supplier_id][i] remaining = max_q - segment[idx] if remaining > max_remaining: max_remaining = remaining target_idx = idx segment[target_idx] += diff else: # 扣除差额从分配量最大的 max_qty = -1 target_idx = selected_indices[0] for idx in selected_indices: if segment[idx] > max_qty: max_qty = segment[idx] target_idx = idx segment[target_idx] += diff need_adjust = True repaired[start:end] = segment start = end return repaired.astype(int) def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray: """ 完整修复染色体(按顺序修复三层) :param chromosome: 待修复的染色体 :return: 修复后的染色体 """ # 拆分三层 enterprise_layer, capacity_layer, quantity_layer = self._split_chromosome(chromosome) # 依次修复(企业层→能力层→数量层,依赖关系) enterprise_layer = self.repair_enterprise_layer(enterprise_layer) capacity_layer = self.repair_capacity_layer(enterprise_layer, capacity_layer) quantity_layer = self.repair_quantity_layer(enterprise_layer, quantity_layer) # 合并修复后的三层 return self._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer)