import random import numpy as np from data_structures import Config from chromosome_utils import ChromosomeUtils from objective_calculator import ObjectiveCalculator from nsga2 import NSGA2 class Encoder: """种群初始化编码器:生成初始种群,包含多种初始化策略(整数化)""" def __init__(self, config: Config, utils: ChromosomeUtils): """ 初始化编码器 :param config: 算法配置(如种群大小、各策略比例等) :param utils: 染色体工具类实例 """ self.config = config self.utils = utils self.pop_size = config.pop_size # 种群总大小 # 按比例分配四种初始化策略的个体数量 self.N1 = int(config.N1_ratio * self.pop_size) # 优先最小化成本的个体数 self.N2 = int(config.N2_ratio * self.pop_size) # 优先最小化延期的个体数 self.N3 = int(config.N3_ratio * self.pop_size) # 强制选择风险企业的个体数 self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 # 随机生成的个体数 def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray: """ 生成随机染色体(内部方法,整数化) :param force_risk_enterprise: 是否强制选择风险企业(用于N3策略) :return: 生成的染色体(经过修复,整数) """ enterprise_layer = [] # 企业选择层(0/1) capacity_layer = [] # 产能层(整数) quantity_layer = [] # 数量层(整数) for i in range(self.utils.I): # 遍历每种物料 ent_count = self.utils.material_enterprise_count[i] # 当前物料的可选企业数量 ents = self.utils.material_optional_enterprises[i] # 可选企业列表(0为风险企业) # 1. 生成企业层(随机选择至少1个企业) e_genes = np.zeros(ent_count) # 初始化企业选择为0(未选择) select_count = random.randint(1, ent_count) # 随机选择1到ent_count个企业 select_indices = random.sample(range(ent_count), select_count) # 随机选择索引 e_genes[select_indices] = 1 # 标记选中的企业 if force_risk_enterprise: e_genes[0] = 1 # 强制选择风险企业(索引0) enterprise_layer.extend(e_genes) # 2. 生成能力层(整数,不低于最小产能) c_genes = np.zeros(ent_count, dtype=int) for idx, ent in enumerate(ents): if e_genes[idx] == 1: # 仅为选中的企业分配产能 if ent == 0: min_cap = self.utils.risk.C0_i_min[i] # 风险企业的最小产能 max_cap = self.utils.risk.C0_total_max # 最大产能=总产能上限 else: supplier_id = ent - 1 min_cap = self.utils.supplier.Cj_i_min[supplier_id][i] # 供应商的最小产能 max_cap = self.utils.supplier.Cj_total_max[supplier_id] # 最大产能=总产能上限 # 生成整数产能 c_genes[idx] = random.randint(min_cap, max_cap) capacity_layer.extend(c_genes) # 3. 生成数量层(整数,在合法范围内) q_genes = np.zeros(ent_count, dtype=int) qi = self.utils.order.Q[i] for idx, ent in enumerate(ents): if e_genes[idx] == 1: # 仅为选中的企业分配数量 if ent == 0: min_q = 0 max_q = qi else: supplier_id = ent - 1 min_q = self.utils.supplier.MinOrder[supplier_id][i] max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 生成整数数量 q_genes[idx] = random.randint(min_q, max_q) quantity_layer.extend(q_genes) # 合并三层并修复染色体(确保满足所有约束) chromosome = self.utils._merge_chromosome( np.array(enterprise_layer), np.array(capacity_layer), np.array(quantity_layer) ) return self.utils.repair_chromosome(chromosome) def initialize_population(self) -> np.ndarray: """ 初始化完整种群(四种策略组合) :return: 初始化后的种群(numpy数组,整数) """ # 按四种策略生成子种群 pop1 = self._initialize_by_objective(self.N1, "cost") # 优先成本 pop2 = self._initialize_by_objective(self.N2, "tardiness") # 优先延期 pop3 = self._initialize_by_risk_enterprise(self.N3) # 强制风险企业 pop4 = self._initialize_random(self.N4) # 随机生成 # 过滤空数组(避免合并时报错) population_list = [p for p in [pop1, pop2, pop3, pop4] if len(p) > 0] if len(population_list) == 0: # 所有子种群都为空时返回空数组 return np.array([]) # 合并子种群并打乱顺序 population = np.vstack(population_list) np.random.shuffle(population) return population[:self.pop_size].astype(int) # 确保为整数 def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray: """ 基于目标函数初始化(生成候选解后选择最优的count个) :param count: 需生成的个体数量 :param objective_type: 优化目标("cost"或"tardiness") :return: 筛选后的子种群(整数) """ if count <= 0: # 数量为0时返回空数组 return np.array([]) # 生成候选解(数量为count的3倍或至少20个,确保有足够选择空间) candidate_count = max(3 * count, 20) candidates = [self._generate_random_chromosome() for _ in range(candidate_count)] # 计算候选解的目标函数值 calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, self.config) objectives = [calculator.calculate_objectives(chrom) for chrom in candidates] # 按目标函数排序并选择前count个 if objective_type == "cost": # 按成本升序排序(成本越小越优) sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][0]) else: # 按延期升序排序(延期越小越优) sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][1]) return np.array([candidates[i] for i in sorted_indices[:count]]).astype(int) def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray: """ 基于风险企业初始化(强制选择风险企业,用NSGA-II筛选) :param count: 需生成的个体数量 :return: 筛选后的子种群(整数) """ if count <= 0: return np.array([]) # 生成强制选择风险企业的候选解 candidate_count = max(2 * count, 20) candidates = [self._generate_random_chromosome(force_risk_enterprise=True) for _ in range(candidate_count)] # 计算目标函数并进行非支配排序 calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, self.config) objectives = [calculator.calculate_objectives(chrom) for chrom in candidates] nsga2 = NSGA2(candidate_count, 2) # 2个目标函数 ranks, fronts = nsga2.fast_non_dominated_sort(objectives) # 非支配排序 # 从帕累托前沿开始选择,直到满足数量 selected = [] for front in fronts: if len(selected) + len(front) <= count: selected.extend([candidates[i] for i in front]) else: # 前沿数量超过剩余需求时,取部分 selected.extend([candidates[i] for i in front[:count - len(selected)]]) break return np.array(selected).astype(int) def _initialize_random(self, count: int) -> np.ndarray: """ 随机初始化(直接生成count个随机染色体) :param count: 需生成的个体数量 :return: 随机子种群(整数) """ if count <= 0: return np.array([]) return np.array([self._generate_random_chromosome() for _ in range(count)]).astype(int)