import random

import numpy as np

from data_structures import Config
from chromosome_utils import ChromosomeUtils
from objective_calculator import ObjectiveCalculator
from nsga2 import NSGA2


class Encoder:
    """Population-initialisation encoder.

    Builds the initial population from four sub-populations:

    * ``N1`` chromosomes chosen for the lowest first objective ("cost"),
    * ``N2`` chromosomes chosen for the lowest second objective
      ("tardiness"),
    * ``N3`` chromosomes generated with the risk enterprise (enterprise
      id 0) forced on and chosen by non-dominated rank,
    * ``N4`` purely random chromosomes (whatever remains of ``pop_size``).
    """

    def __init__(self, config: Config, utils: ChromosomeUtils):
        """Store collaborators and derive the four sub-population sizes.

        Args:
            config: Provides ``pop_size`` and the ``N1_ratio``/``N2_ratio``/
                ``N3_ratio`` fractions.
            utils: Chromosome helper exposing the problem data (``I``,
                ``material_enterprise_count``,
                ``material_optional_enterprises``, ``order``, ``risk``,
                ``supplier``) plus ``_merge_chromosome`` and
                ``repair_chromosome``.
        """
        self.config = config
        self.utils = utils
        self.pop_size = config.pop_size
        # Each share is truncated towards zero; N4 absorbs the remainder.
        self.N1 = int(config.N1_ratio * self.pop_size)
        self.N2 = int(config.N2_ratio * self.pop_size)
        self.N3 = int(config.N3_ratio * self.pop_size)
        self.N4 = self.pop_size - self.N1 - self.N2 - self.N3

    def _capacity_limit(self, ent, material):
        """Return the capacity upper bound of enterprise ``ent`` for ``material``."""
        if ent == 0:
            return self.utils.risk.C0_i_std[material]
        # Supplier tables are indexed by ``enterprise id - 1``.
        return self.utils.supplier.Cj_i_std[ent - 1][material]

    def _quantity_limit(self, ent, material):
        """Return the quantity upper bound of enterprise ``ent`` for ``material``."""
        if ent == 0:
            return self.utils.order.Q[material]
        # Supplier tables are indexed by ``enterprise id - 1``.
        return self.utils.supplier.MaxOrder[ent - 1][material]

    @staticmethod
    def _draw_selected(e_genes, ents, material, limit_of):
        """Draw ``uniform(1, limit)`` for every selected enterprise, 0 elsewhere.

        Args:
            e_genes: 0/1 selection genes of one material.
            ents: Enterprise ids aligned position-by-position with ``e_genes``.
            material: Material index forwarded to ``limit_of``.
            limit_of: Callable ``(ent, material) -> upper bound``.

        Returns:
            Array with the same length as ``e_genes``.
        """
        genes = np.zeros(len(e_genes))
        for idx, ent in enumerate(ents):
            if e_genes[idx] == 1:
                genes[idx] = random.uniform(1, limit_of(ent, material))
        return genes

    def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray:
        """Generate one random chromosome.

        For every material a random non-empty subset of its optional
        enterprises is switched on (enterprise layer); each selected
        enterprise then receives a random capacity gene and a random
        quantity gene. The three layers are merged with
        ``utils._merge_chromosome`` and the result is passed through
        ``utils.repair_chromosome``.

        Args:
            force_risk_enterprise: When true, the risk enterprise
                (enterprise id 0) is switched on for every material that
                lists it among its optional enterprises.

        Returns:
            The value returned by ``utils.repair_chromosome``.
        """
        enterprise_layer = []
        capacity_layer = []
        quantity_layer = []

        for i in range(self.utils.I):
            ent_count = self.utils.material_enterprise_count[i]
            ents = self.utils.material_optional_enterprises[i]

            # Enterprise layer: pick between 1 and ent_count enterprises.
            e_genes = np.zeros(ent_count)
            select_count = random.randint(1, ent_count)
            select_indices = random.sample(range(ent_count), select_count)
            e_genes[select_indices] = 1
            if force_risk_enterprise:
                # Locate the risk enterprise by its id (0), the same way the
                # capacity/quantity lookups identify it, rather than
                # assuming it occupies position 0 of the option list.
                for idx, ent in enumerate(ents):
                    if ent == 0:
                        e_genes[idx] = 1
            enterprise_layer.extend(e_genes)

            # Capacity layer first, then quantity layer, so the order of
            # random draws is: all capacities of this material, then all
            # quantities.
            capacity_layer.extend(
                self._draw_selected(e_genes, ents, i, self._capacity_limit)
            )
            quantity_layer.extend(
                self._draw_selected(e_genes, ents, i, self._quantity_limit)
            )

        chromosome = self.utils._merge_chromosome(
            np.array(enterprise_layer),
            np.array(capacity_layer),
            np.array(quantity_layer),
        )
        return self.utils.repair_chromosome(chromosome)

    def initialize_population(self) -> np.ndarray:
        """Initialise the population as N1 + N2 + N3 + N4 chromosomes.

        Returns:
            The stacked sub-populations, shuffled along the first axis and
            truncated to at most ``pop_size`` rows, or an empty array when
            every sub-population is empty.
        """
        groups = [
            self._initialize_by_objective(self.N1, "cost"),
            self._initialize_by_objective(self.N2, "tardiness"),
            self._initialize_by_risk_enterprise(self.N3),
            self._initialize_random(self.N4),
        ]

        # Empty sub-populations are dropped before stacking.
        non_empty = [group for group in groups if len(group) > 0]
        if not non_empty:
            return np.array([])

        population = np.vstack(non_empty)
        np.random.shuffle(population)
        return population[:self.pop_size]

    def _generate_and_evaluate(self, count: int, force_risk_enterprise: bool = False):
        """Generate an oversampled candidate pool and evaluate it.

        The pool holds ``max(2 * count, 20)`` chromosomes.

        Args:
            count: Number of chromosomes the caller wants to keep.
            force_risk_enterprise: Forwarded to
                ``_generate_random_chromosome``.

        Returns:
            ``(candidates, objectives)`` where ``objectives[k]`` is the
            result of ``calculate_objectives`` for ``candidates[k]``.
        """
        candidate_count = max(2 * count, 20)
        candidates = [
            self._generate_random_chromosome(force_risk_enterprise=force_risk_enterprise)
            for _ in range(candidate_count)
        ]
        calculator = ObjectiveCalculator(
            self.utils.order,
            self.utils.risk,
            self.utils.supplier,
            self.utils,
            self.config,
        )
        objectives = [calculator.calculate_objectives(chrom) for chrom in candidates]
        return candidates, objectives

    def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray:
        """Keep the ``count`` candidates with the smallest value of one objective.

        Args:
            count: Number of chromosomes to return; ``<= 0`` yields an
                empty array.
            objective_type: ``"cost"`` ranks by objective index 0; any other
                value (callers pass ``"tardiness"``) ranks by index 1.

        Returns:
            Array of the selected chromosomes, best first.
        """
        if count <= 0:
            return np.array([])

        candidates, objectives = self._generate_and_evaluate(count)
        column = 0 if objective_type == "cost" else 1
        # sorted() is stable, so ties keep their generation order.
        order = sorted(range(len(candidates)), key=lambda k: objectives[k][column])
        return np.array([candidates[k] for k in order[:count]])

    def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray:
        """Keep ``count`` risk-enterprise chromosomes by non-dominated rank.

        Candidates are generated with ``force_risk_enterprise=True``, sorted
        into fronts by ``NSGA2.fast_non_dominated_sort`` and taken front by
        front, in the order the fronts are returned, until ``count``
        chromosomes are collected.

        Args:
            count: Number of chromosomes to return; ``<= 0`` yields an
                empty array.

        Returns:
            Array of the selected chromosomes.
        """
        if count <= 0:
            return np.array([])

        candidates, objectives = self._generate_and_evaluate(
            count, force_risk_enterprise=True
        )
        # NOTE(review): the constructor arguments are presumably the
        # population size and the number of objectives - confirm against
        # the NSGA2 class.
        nsga2 = NSGA2(len(candidates), 2)
        _ranks, fronts = nsga2.fast_non_dominated_sort(objectives)

        selected = []
        for front in fronts:
            remaining = count - len(selected)
            if remaining <= 0:
                break
            # Slicing takes the whole front when it fits, otherwise only
            # its first ``remaining`` members.
            selected.extend(candidates[k] for k in front[:remaining])
        return np.array(selected)

    def _initialize_random(self, count: int) -> np.ndarray:
        """Return ``count`` random chromosomes (empty array when ``count <= 0``)."""
        if count <= 0:
            return np.array([])
        return np.array([self._generate_random_chromosome() for _ in range(count)])