OrderReallocation-HeavyTruc.../chromosome_utils.py

273 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import numpy as np
from data_structures import OrderData, RiskEnterpriseData, SupplierData # 数据结构类
class ChromosomeUtils:
"""染色体工具类:提供染色体的拆分、合并、修复等功能,确保满足约束条件"""
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData):
"""
初始化工具类
:param order_data: 订单数据(需求、交货期等)
:param risk_data: 风险企业数据(产能等)
:param supplier_data: 供应商数据(产能、价格等)
"""
self.order = order_data # 订单数据
self.risk = risk_data # 风险企业数据
self.supplier = supplier_data # 供应商数据
self.I = order_data.I # 物料种类数
# 预计算每个物料的可选企业列表0=风险企业1+为供应商ID
self.material_optional_enterprises = self._get_material_optional_enterprises()
# 每个物料的可选企业数量
self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises]
# 染色体总长度3层企业层+能力层+数量层)
self.chromosome_length = 3 * sum(self.material_enterprise_count)
def _get_material_optional_enterprises(self) -> list[list[int]]:
"""
生成每个物料的可选企业列表(内部方法)
:return: 二维列表每个子列表为对应物料的可选企业ID
"""
optional = []
for i in range(self.I): # 遍历每种物料
ents = [0] # 先加入风险企业ID=0
# 加入可生产该物料的供应商ID=1+供应商索引)
for j in range(self.supplier.supplier_count):
if self.supplier.can_produce[j][i] == 1: # 供应商j可生产物料i
ents.append(j + 1) # 供应商ID为j+1区分风险企业
optional.append(ents)
return optional
def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
拆分染色体为三层
:param chromosome: 完整染色体
:return: 企业层、能力层、数量层均为numpy数组
"""
total_ent_count = sum(self.material_enterprise_count) # 所有物料的企业总数
enterprise_layer = chromosome[:total_ent_count] # 第一层企业选择0/1
capacity_layer = chromosome[total_ent_count:2 * total_ent_count] # 第二层:产能
quantity_layer = chromosome[2 * total_ent_count:] # 第三层:数量
return enterprise_layer, capacity_layer, quantity_layer
def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> np.ndarray:
"""
合并三层为完整染色体
:param enterprise_layer: 企业层
:param capacity_layer: 能力层
:param quantity_layer: 数量层
:return: 完整染色体
"""
return np.hstack([enterprise_layer, capacity_layer, quantity_layer])
def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray:
"""
修复企业层:确保每种物料至少选择一个企业
:param enterprise_layer: 待修复的企业层
:return: 修复后的企业层
"""
repaired = enterprise_layer.copy()
# 计算各物料的企业编码分割点
split_points = np.cumsum(self.material_enterprise_count)
start = 0 # 起始索引
for i in range(self.I): # 遍历每种物料
end = split_points[i] # 当前物料的企业编码结束索引
segment = repaired[start:end] # 当前物料的企业选择片段
if np.sum(segment) == 0: # 未选择任何企业时,随机选一个
select_idx = random.randint(0, len(segment) - 1)
segment[select_idx] = 1
repaired[start:end] = segment
start = end # 更新起始索引
return repaired
def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray:
"""
修复能力层:确保满足单物料产能约束和企业总产能约束
:param enterprise_layer: 企业层(用于确定哪些企业被选中)
:param capacity_layer: 待修复的能力层
:return: 修复后的能力层
"""
repaired = capacity_layer.copy()
split_points = np.cumsum(self.material_enterprise_count)
start = 0
# 1. 修复单物料产能约束(每个企业的单物料产能不超过上限)
for i in range(self.I):
end = split_points[i]
ents = self.material_optional_enterprises[i] # 可选企业列表
segment = repaired[start:end] # 当前物料的产能片段
enterprise_segment = enterprise_layer[start:end] # 企业选择状态
for idx, ent in enumerate(ents):
if enterprise_segment[idx] == 1: # 仅处理被选中的企业
# 确定该企业的单物料最大产能
if ent == 0:
max_cap = self.risk.C0_i_std[i] # 风险企业
else:
supplier_id = ent - 1
max_cap = self.supplier.Cj_i_std[supplier_id][i] # 供应商
# 若产能不合法重置为1到max_cap之间的随机值
if segment[idx] <= 0 or segment[idx] > max_cap:
segment[idx] = random.uniform(1, max_cap)
repaired[start:end] = segment
start = end
# 2. 修复企业总产能约束(企业所有物料的总产能不超过上限)
# 先计算每个企业的当前总产能
enterprise_total_capacity = {} # {企业ID: 总产能}
start = 0
for i in range(self.I):
end = split_points[i]
ents = self.material_optional_enterprises[i]
enterprise_segment = enterprise_layer[start:end]
capacity_segment = repaired[start:end]
for idx, ent in enumerate(ents):
if enterprise_segment[idx] == 1: # 仅统计选中的企业
if ent not in enterprise_total_capacity:
enterprise_total_capacity[ent] = 0
enterprise_total_capacity[ent] += capacity_segment[idx]
start = end
# 调整超出总产能上限的企业(按比例缩放)
for ent, total_cap in enterprise_total_capacity.items():
# 确定企业的总产能上限
if ent == 0:
max_total_cap = self.risk.C0_total_max # 风险企业总产能上限
else:
supplier_id = ent - 1
max_total_cap = self.supplier.Cj_total_max[supplier_id] # 供应商总产能上限
if total_cap > max_total_cap: # 超出上限时缩放
scale = max_total_cap / total_cap # 缩放比例(会出现浮点)
start = 0
# 重新遍历,按比例缩小该企业所有物料的产能
for i in range(self.I):
end = split_points[i]
ents = self.material_optional_enterprises[i]
enterprise_segment = enterprise_layer[start:end]
capacity_segment = repaired[start:end]
for idx, e in enumerate(ents):
if e == ent and enterprise_segment[idx] == 1:
capacity_segment[idx] *= scale # 按比例缩小
repaired[start:end] = capacity_segment
start = end
return repaired
def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray:
"""修复数量编码层:满足总量需求、起订量和最大供应量约束"""
repaired = quantity_layer.copy()
split_points = np.cumsum(self.material_enterprise_count)
start = 0
for i in range(self.I): # 遍历每种物料
qi = self.order.Q[i] # 物料i的需求量
end = split_points[i]
ents = self.material_optional_enterprises[i]
segment = repaired[start:end]
enterprise_segment = enterprise_layer[start:end]
selected_indices = np.where(enterprise_segment == 1)[0]
if len(selected_indices) == 0:
start = end
continue
selected_ents = [ents[idx] for idx in selected_indices]
# 步骤1初始化合法范围起订量~最大供应量)
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
min_q = 0
max_q = qi
else:
supplier_id = ent - 1
min_q = self.supplier.MinOrder[supplier_id][i]
max_q = self.supplier.MaxOrder[supplier_id][i]
if segment[idx] < min_q:
segment[idx] = min_q
elif segment[idx] > max_q:
segment[idx] = max_q
# 步骤2强制总量等于需求数量
current_total = np.sum(segment[selected_indices])
if current_total <= 0:
weights = np.random.rand(len(selected_indices))
weights /= np.sum(weights)
segment[selected_indices] = qi * weights
else:
scale = qi / current_total
segment[selected_indices] *= scale
# 步骤3处理超出最大供应量的情况添加最大迭代次数限制
need_adjust = True
max_iter = 10 # 最大迭代次数,避免无限循环
iter_count = 0
while need_adjust and iter_count < max_iter: # 增加迭代次数限制
iter_count += 1
need_adjust = False
total_excess = 0
# 截断超出max_q的数量
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_q = qi
else:
supplier_id = ent - 1
max_q = self.supplier.MaxOrder[supplier_id][i]
if segment[idx] > max_q:
excess = segment[idx] - max_q
total_excess += excess
segment[idx] = max_q
need_adjust = True
# 分配超出量到有剩余的企业
if total_excess > 0:
available_indices = []
available_max = []
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_q = qi
else:
supplier_id = ent - 1
max_q = self.supplier.MaxOrder[supplier_id][i]
remaining = max_q - segment[idx]
if remaining > 0:
available_indices.append(idx)
available_max.append(remaining)
if len(available_indices) > 0:
available_total = sum(available_max)
if available_total > 0:
alloc_ratio = np.array(available_max) / available_total
alloc_excess = total_excess * alloc_ratio
for idx, alloc in zip(available_indices, alloc_excess):
segment[idx] += alloc
need_adjust = True # 分配后可能需要再次检查
else:
# 无剩余容量,强制截断并重新缩放总量
segment[selected_indices] = np.minimum(segment[selected_indices],
[self.supplier.MaxOrder[ent - 1][i] if ent != 0 else qi
for ent in selected_ents])
need_adjust = True
# 检查总量误差,重新缩放
current_total = np.sum(segment[selected_indices])
if abs(current_total - qi) > 1e-6:
scale = qi / current_total
segment[selected_indices] *= scale
need_adjust = True # 缩放后可能需要再次检查
# 最终强制总量等于需求(避免迭代次数到限后仍有误差)
current_total = np.sum(segment[selected_indices])
if abs(current_total - qi) > 1e-6:
scale = qi / current_total
segment[selected_indices] *= scale
repaired[start:end] = segment
start = end
return repaired
def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray:
"""
完整修复染色体(按顺序修复三层)
:param chromosome: 待修复的染色体
:return: 修复后的染色体
"""
# 拆分三层
enterprise_layer, capacity_layer, quantity_layer = self._split_chromosome(chromosome)
# 依次修复(企业层→能力层→数量层,依赖关系)
enterprise_layer = self.repair_enterprise_layer(enterprise_layer)
capacity_layer = self.repair_capacity_layer(enterprise_layer, capacity_layer)
quantity_layer = self.repair_quantity_layer(enterprise_layer, quantity_layer)
# 合并修复后的三层
return self._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer)