154 lines
8.5 KiB
Python
154 lines
8.5 KiB
Python
import random
|
||
import numpy as np
|
||
from data_structures import Config
|
||
from chromosome_utils import ChromosomeUtils
|
||
from objective_calculator import ObjectiveCalculator
|
||
from nsga2 import NSGA2
|
||
class Encoder:
|
||
"""种群初始化编码器:生成初始种群,包含多种初始化策略(整数化)"""
|
||
def __init__(self, config: Config, utils: ChromosomeUtils):
|
||
"""
|
||
初始化编码器
|
||
:param config: 算法配置(如种群大小、各策略比例等)
|
||
:param utils: 染色体工具类实例
|
||
"""
|
||
self.config = config
|
||
self.utils = utils
|
||
self.pop_size = config.pop_size # 种群总大小
|
||
# 按比例分配四种初始化策略的个体数量
|
||
self.N1 = int(config.N1_ratio * self.pop_size) # 优先最小化成本的个体数
|
||
self.N2 = int(config.N2_ratio * self.pop_size) # 优先最小化延期的个体数
|
||
self.N3 = int(config.N3_ratio * self.pop_size) # 强制选择风险企业的个体数
|
||
self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 # 随机生成的个体数
|
||
def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray:
|
||
"""
|
||
生成随机染色体(内部方法,整数化)
|
||
:param force_risk_enterprise: 是否强制选择风险企业(用于N3策略)
|
||
:return: 生成的染色体(经过修复,整数)
|
||
"""
|
||
enterprise_layer = [] # 企业选择层(0/1)
|
||
capacity_layer = [] # 产能层(整数)
|
||
quantity_layer = [] # 数量层(整数)
|
||
for i in range(self.utils.I): # 遍历每种物料
|
||
ent_count = self.utils.material_enterprise_count[i] # 当前物料的可选企业数量
|
||
ents = self.utils.material_optional_enterprises[i] # 可选企业列表(0为风险企业)
|
||
# 1. 生成企业层(随机选择至少1个企业)
|
||
e_genes = np.zeros(ent_count) # 初始化企业选择为0(未选择)
|
||
select_count = random.randint(1, ent_count) # 随机选择1到ent_count个企业
|
||
select_indices = random.sample(range(ent_count), select_count) # 随机选择索引
|
||
e_genes[select_indices] = 1 # 标记选中的企业
|
||
if force_risk_enterprise:
|
||
e_genes[0] = 1 # 强制选择风险企业(索引0)
|
||
enterprise_layer.extend(e_genes)
|
||
# 2. 生成能力层(整数,不低于最小产能)
|
||
c_genes = np.zeros(ent_count, dtype=int)
|
||
for idx, ent in enumerate(ents):
|
||
if e_genes[idx] == 1: # 仅为选中的企业分配产能
|
||
if ent == 0:
|
||
min_cap = self.utils.risk.C0_i_min[i] # 风险企业的最小产能
|
||
max_cap = self.utils.risk.C0_total_max # 最大产能=总产能上限
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_cap = self.utils.supplier.Cj_i_min[supplier_id][i] # 供应商的最小产能
|
||
max_cap = self.utils.supplier.Cj_total_max[supplier_id] # 最大产能=总产能上限
|
||
# 生成整数产能
|
||
c_genes[idx] = random.randint(min_cap, max_cap)
|
||
capacity_layer.extend(c_genes)
|
||
# 3. 生成数量层(整数,在合法范围内)
|
||
q_genes = np.zeros(ent_count, dtype=int)
|
||
qi = self.utils.order.Q[i]
|
||
for idx, ent in enumerate(ents):
|
||
if e_genes[idx] == 1: # 仅为选中的企业分配数量
|
||
if ent == 0:
|
||
min_q = 0
|
||
max_q = qi
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_q = self.utils.supplier.MinOrder[supplier_id][i]
|
||
max_q = self.utils.supplier.MaxOrder[supplier_id][i]
|
||
# 生成整数数量
|
||
q_genes[idx] = random.randint(min_q, max_q)
|
||
quantity_layer.extend(q_genes)
|
||
# 合并三层并修复染色体(确保满足所有约束)
|
||
chromosome = self.utils._merge_chromosome(
|
||
np.array(enterprise_layer), np.array(capacity_layer), np.array(quantity_layer)
|
||
)
|
||
return self.utils.repair_chromosome(chromosome)
|
||
def initialize_population(self) -> np.ndarray:
|
||
"""
|
||
初始化完整种群(四种策略组合)
|
||
:return: 初始化后的种群(numpy数组,整数)
|
||
"""
|
||
# 按四种策略生成子种群
|
||
pop1 = self._initialize_by_objective(self.N1, "cost") # 优先成本
|
||
pop2 = self._initialize_by_objective(self.N2, "tardiness") # 优先延期
|
||
pop3 = self._initialize_by_risk_enterprise(self.N3) # 强制风险企业
|
||
pop4 = self._initialize_random(self.N4) # 随机生成
|
||
# 过滤空数组(避免合并时报错)
|
||
population_list = [p for p in [pop1, pop2, pop3, pop4] if len(p) > 0]
|
||
if len(population_list) == 0: # 所有子种群都为空时返回空数组
|
||
return np.array([])
|
||
# 合并子种群并打乱顺序
|
||
population = np.vstack(population_list)
|
||
np.random.shuffle(population)
|
||
return population[:self.pop_size].astype(int) # 确保为整数
|
||
def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray:
|
||
"""
|
||
基于目标函数初始化(生成候选解后选择最优的count个)
|
||
:param count: 需生成的个体数量
|
||
:param objective_type: 优化目标("cost"或"tardiness")
|
||
:return: 筛选后的子种群(整数)
|
||
"""
|
||
if count <= 0: # 数量为0时返回空数组
|
||
return np.array([])
|
||
# 生成候选解(数量为count的3倍或至少20个,确保有足够选择空间)
|
||
candidate_count = max(3 * count, 20)
|
||
candidates = [self._generate_random_chromosome() for _ in range(candidate_count)]
|
||
# 计算候选解的目标函数值
|
||
calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils,
|
||
self.config)
|
||
objectives = [calculator.calculate_objectives(chrom) for chrom in candidates]
|
||
# 按目标函数排序并选择前count个
|
||
if objective_type == "cost":
|
||
# 按成本升序排序(成本越小越优)
|
||
sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][0])
|
||
else:
|
||
# 按延期升序排序(延期越小越优)
|
||
sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][1])
|
||
return np.array([candidates[i] for i in sorted_indices[:count]]).astype(int)
|
||
def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray:
|
||
"""
|
||
基于风险企业初始化(强制选择风险企业,用NSGA-II筛选)
|
||
:param count: 需生成的个体数量
|
||
:return: 筛选后的子种群(整数)
|
||
"""
|
||
if count <= 0:
|
||
return np.array([])
|
||
# 生成强制选择风险企业的候选解
|
||
candidate_count = max(2 * count, 20)
|
||
candidates = [self._generate_random_chromosome(force_risk_enterprise=True) for _ in range(candidate_count)]
|
||
# 计算目标函数并进行非支配排序
|
||
calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils,
|
||
self.config)
|
||
objectives = [calculator.calculate_objectives(chrom) for chrom in candidates]
|
||
nsga2 = NSGA2(candidate_count, 2) # 2个目标函数
|
||
ranks, fronts = nsga2.fast_non_dominated_sort(objectives) # 非支配排序
|
||
# 从帕累托前沿开始选择,直到满足数量
|
||
selected = []
|
||
for front in fronts:
|
||
if len(selected) + len(front) <= count:
|
||
selected.extend([candidates[i] for i in front])
|
||
else:
|
||
# 前沿数量超过剩余需求时,取部分
|
||
selected.extend([candidates[i] for i in front[:count - len(selected)]])
|
||
break
|
||
return np.array(selected).astype(int)
|
||
def _initialize_random(self, count: int) -> np.ndarray:
|
||
"""
|
||
随机初始化(直接生成count个随机染色体)
|
||
:param count: 需生成的个体数量
|
||
:return: 随机子种群(整数)
|
||
"""
|
||
if count <= 0:
|
||
return np.array([])
|
||
return np.array([self._generate_random_chromosome() for _ in range(count)]).astype(int) |