OrderReallocation-HeavyTruc.../encoder.py

154 lines
8.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import numpy as np
from data_structures import Config
from chromosome_utils import ChromosomeUtils
from objective_calculator import ObjectiveCalculator
from nsga2 import NSGA2
class Encoder:
"""种群初始化编码器:生成初始种群,包含多种初始化策略"""
def __init__(self, config: Config, utils: ChromosomeUtils):
"""
初始化编码器
:param config: 算法配置(如种群大小、各策略比例等)
:param utils: 染色体工具类实例
"""
self.config = config
self.utils = utils
self.pop_size = config.pop_size # 种群总大小
# 按比例分配四种初始化策略的个体数量
self.N1 = int(config.N1_ratio * self.pop_size) # 优先最小化成本的个体数
self.N2 = int(config.N2_ratio * self.pop_size) # 优先最小化延期的个体数
self.N3 = int(config.N3_ratio * self.pop_size) # 强制选择风险企业的个体数
self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 # 随机生成的个体数
def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray:
"""
生成随机染色体(内部方法)
:param force_risk_enterprise: 是否强制选择风险企业用于N3策略
:return: 生成的染色体(经过修复)
"""
enterprise_layer = [] # 企业选择层0/1
capacity_layer = [] # 产能层
quantity_layer = [] # 数量层
for i in range(self.utils.I): # 遍历每种物料
ent_count = self.utils.material_enterprise_count[i] # 当前物料的可选企业数量
ents = self.utils.material_optional_enterprises[i] # 可选企业列表0为风险企业
# 1. 生成企业层随机选择至少1个企业
e_genes = np.zeros(ent_count) # 初始化企业选择为0未选择
select_count = random.randint(1, ent_count) # 随机选择1到ent_count个企业
select_indices = random.sample(range(ent_count), select_count) # 随机选择索引
e_genes[select_indices] = 1 # 标记选中的企业
if force_risk_enterprise:
e_genes[0] = 1 # 强制选择风险企业索引0
enterprise_layer.extend(e_genes)
# 2. 生成能力层(为选中的企业分配不低于最小产能的随机产能)
c_genes = np.zeros(ent_count)
for idx, ent in enumerate(ents):
if e_genes[idx] == 1: # 仅为选中的企业分配产能
if ent == 0:
min_cap = self.utils.risk.C0_i_min[i] # 风险企业的最小产能
else:
supplier_id = ent - 1
min_cap = self.utils.supplier.Cj_i_min[supplier_id][i] # 供应商的最小产能
# 生成不低于最小产能的随机值无上限用2倍最小产能控制初始范围
c_genes[idx] = random.uniform(min_cap, min_cap * 2)
capacity_layer.extend(c_genes)
# 3. 生成数量层(为选中的企业随机分配数量)
q_genes = np.zeros(ent_count)
for idx, ent in enumerate(ents):
if e_genes[idx] == 1: # 仅为选中的企业分配数量
if ent == 0:
max_q = self.utils.order.Q[i] # 风险企业最多分配全部需求
else:
supplier_id = ent - 1
max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 供应商的最大供应量
q_genes[idx] = random.uniform(1, max_q) # 随机数量
quantity_layer.extend(q_genes)
# 合并三层并修复染色体(确保满足所有约束)
chromosome = self.utils._merge_chromosome(
np.array(enterprise_layer), np.array(capacity_layer), np.array(quantity_layer)
)
return self.utils.repair_chromosome(chromosome)
def initialize_population(self) -> np.ndarray:
"""
初始化完整种群(四种策略组合)
:return: 初始化后的种群numpy数组
"""
# 按四种策略生成子种群
pop1 = self._initialize_by_objective(self.N1, "cost") # 优先成本
pop2 = self._initialize_by_objective(self.N2, "tardiness") # 优先延期
pop3 = self._initialize_by_risk_enterprise(self.N3) # 强制风险企业
pop4 = self._initialize_random(self.N4) # 随机生成
# 过滤空数组(避免合并时报错)
population_list = [p for p in [pop1, pop2, pop3, pop4] if len(p) > 0]
if len(population_list) == 0: # 所有子种群都为空时返回空数组
return np.array([])
# 合并子种群并打乱顺序
population = np.vstack(population_list)
np.random.shuffle(population)
return population[:self.pop_size] # 确保种群大小正确
def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray:
"""
基于目标函数初始化生成候选解后选择最优的count个
:param count: 需生成的个体数量
:param objective_type: 优化目标("cost""tardiness"
:return: 筛选后的子种群
"""
if count <= 0: # 数量为0时返回空数组
return np.array([])
# 生成候选解数量为count的3倍或至少20个确保有足够选择空间
candidate_count = max(3 * count, 20)
candidates = [self._generate_random_chromosome() for _ in range(candidate_count)]
# 计算候选解的目标函数值
calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils,
self.config)
objectives = [calculator.calculate_objectives(chrom) for chrom in candidates]
# 按目标函数排序并选择前count个
if objective_type == "cost":
# 按成本升序排序(成本越小越优)
sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][0])
else:
# 按延期升序排序(延期越小越优)
sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][1])
return np.array([candidates[i] for i in sorted_indices[:count]])
def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray:
"""
基于风险企业初始化强制选择风险企业用NSGA-II筛选
:param count: 需生成的个体数量
:return: 筛选后的子种群
"""
if count <= 0:
return np.array([])
# 生成强制选择风险企业的候选解
candidate_count = max(2 * count, 20)
candidates = [self._generate_random_chromosome(force_risk_enterprise=True) for _ in range(candidate_count)]
# 计算目标函数并进行非支配排序
calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils,
self.config)
objectives = [calculator.calculate_objectives(chrom) for chrom in candidates]
nsga2 = NSGA2(candidate_count, 2) # 2个目标函数
ranks, fronts = nsga2.fast_non_dominated_sort(objectives) # 非支配排序
# 从帕累托前沿开始选择,直到满足数量
selected = []
for front in fronts:
if len(selected) + len(front) <= count:
selected.extend([candidates[i] for i in front])
else:
# 前沿数量超过剩余需求时,取部分
selected.extend([candidates[i] for i in front[:count - len(selected)]])
break
return np.array(selected)
def _initialize_random(self, count: int) -> np.ndarray:
"""
随机初始化直接生成count个随机染色体
:param count: 需生成的个体数量
:return: 随机子种群
"""
if count <= 0:
return np.array([])
return np.array([self._generate_random_chromosome() for _ in range(count)])