444 lines
23 KiB
Python
444 lines
23 KiB
Python
import random
|
||
import numpy as np
|
||
import math
|
||
from data_structures import OrderData, RiskEnterpriseData, SupplierData # 数据结构类
|
||
class ChromosomeUtils:
|
||
"""染色体工具类:提供染色体的拆分、合并、修复等功能,确保满足约束条件"""
|
||
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData):
|
||
"""
|
||
初始化工具类
|
||
:param order_data: 订单数据(需求、交货期等)
|
||
:param risk_data: 风险企业数据(产能等)
|
||
:param supplier_data: 供应商数据(产能、价格等)
|
||
"""
|
||
self.order = order_data # 订单数据
|
||
self.risk = risk_data # 风险企业数据
|
||
self.supplier = supplier_data # 供应商数据
|
||
self.I = order_data.I # 物料种类数
|
||
# 预计算每个物料的可选企业列表(0=风险企业,1+为供应商ID)
|
||
self.material_optional_enterprises = self._get_material_optional_enterprises()
|
||
# 每个物料的可选企业数量
|
||
self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises]
|
||
# 染色体总长度(3层:企业层+能力层+数量层)
|
||
self.chromosome_length = 3 * sum(self.material_enterprise_count)
|
||
def _get_material_optional_enterprises(self) -> list[list[int]]:
|
||
"""
|
||
生成每个物料的可选企业列表(内部方法)
|
||
:return: 二维列表,每个子列表为对应物料的可选企业ID
|
||
"""
|
||
optional = []
|
||
for i in range(self.I): # 遍历每种物料
|
||
ents = [0] # 先加入风险企业(ID=0)
|
||
# 加入可生产该物料的供应商(ID=1+供应商索引)
|
||
for j in range(self.supplier.supplier_count):
|
||
if self.supplier.can_produce[j][i] == 1: # 供应商j可生产物料i
|
||
ents.append(j + 1) # 供应商ID为j+1(区分风险企业)
|
||
optional.append(ents)
|
||
return optional
|
||
def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
|
||
"""
|
||
拆分染色体为三层
|
||
:param chromosome: 完整染色体
|
||
:return: 企业层、能力层、数量层(均为numpy数组)
|
||
"""
|
||
total_ent_count = sum(self.material_enterprise_count) # 所有物料的企业总数
|
||
enterprise_layer = chromosome[:total_ent_count] # 第一层:企业选择(0/1)
|
||
capacity_layer = chromosome[total_ent_count:2 * total_ent_count] # 第二层:产能
|
||
quantity_layer = chromosome[2 * total_ent_count:] # 第三层:数量
|
||
return enterprise_layer, capacity_layer, quantity_layer
|
||
def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
|
||
quantity_layer: np.ndarray) -> np.ndarray:
|
||
"""
|
||
合并三层为完整染色体
|
||
:param enterprise_layer: 企业层
|
||
:param capacity_layer: 能力层
|
||
:param quantity_layer: 数量层
|
||
:return: 完整染色体
|
||
"""
|
||
return np.hstack([enterprise_layer, capacity_layer, quantity_layer])
|
||
def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray:
|
||
"""
|
||
修复企业层:确保每种物料至少选择一个企业
|
||
:param enterprise_layer: 待修复的企业层
|
||
:return: 修复后的企业层
|
||
"""
|
||
repaired = enterprise_layer.copy()
|
||
# 计算各物料的企业编码分割点
|
||
split_points = np.cumsum(self.material_enterprise_count)
|
||
start = 0 # 起始索引
|
||
for i in range(self.I): # 遍历每种物料
|
||
end = split_points[i] # 当前物料的企业编码结束索引
|
||
segment = repaired[start:end] # 当前物料的企业选择片段
|
||
if np.sum(segment) == 0: # 未选择任何企业时,随机选一个
|
||
select_idx = random.randint(0, len(segment) - 1)
|
||
segment[select_idx] = 1
|
||
repaired[start:end] = segment
|
||
start = end # 更新起始索引
|
||
return repaired
|
||
|
||
def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray:
|
||
"""
|
||
修复能力层:满足单物料最小产能约束和企业总产能约束(整数化)
|
||
:param enterprise_layer: 企业层(用于确定哪些企业被选中)
|
||
:param capacity_layer: 待修复的能力层
|
||
:return: 修复后的能力层(整数)
|
||
"""
|
||
repaired = capacity_layer.copy().astype(int) # 强制转为整数
|
||
split_points = np.cumsum(self.material_enterprise_count)
|
||
start = 0
|
||
|
||
# 1. 修复单物料最小产能约束(每个企业的单物料产能不低于最小产能,整数)
|
||
for i in range(self.I):
|
||
end = split_points[i]
|
||
ents = self.material_optional_enterprises[i] # 可选企业列表
|
||
segment = repaired[start:end] # 当前物料的产能片段
|
||
enterprise_segment = enterprise_layer[start:end] # 企业选择状态
|
||
for idx, ent in enumerate(ents):
|
||
if enterprise_segment[idx] == 1: # 仅处理被选中的企业
|
||
# 确定该企业的单物料最小产能和最大产能
|
||
if ent == 0:
|
||
min_cap = self.risk.C0_i_min[i] # 风险企业最小产能
|
||
max_cap = self.risk.C0_total_max # 单物料最大产能=总产能上限
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_cap = self.supplier.Cj_i_min[supplier_id][i] # 供应商最小产能
|
||
max_cap = self.supplier.Cj_total_max[supplier_id] # 单物料最大产能=总产能上限
|
||
|
||
# 确保不低于最小产能,不超过最大产能
|
||
if segment[idx] < min_cap:
|
||
segment[idx] = min_cap
|
||
if segment[idx] > max_cap:
|
||
segment[idx] = max_cap
|
||
repaired[start:end] = segment
|
||
start = end
|
||
|
||
# 2. 修复企业总产能约束(企业所有物料的总产能不超过上限,整数处理)
|
||
max_adjust_iter = 10 # 最大调整迭代次数
|
||
for _ in range(max_adjust_iter):
|
||
# 计算每个企业的当前总产能
|
||
enterprise_total_capacity = {} # {企业ID: 总产能}
|
||
start = 0
|
||
for i in range(self.I):
|
||
end = split_points[i]
|
||
ents = self.material_optional_enterprises[i]
|
||
enterprise_segment = enterprise_layer[start:end]
|
||
capacity_segment = repaired[start:end]
|
||
for idx, ent in enumerate(ents):
|
||
if enterprise_segment[idx] == 1: # 仅统计选中的企业
|
||
if ent not in enterprise_total_capacity:
|
||
enterprise_total_capacity[ent] = 0
|
||
enterprise_total_capacity[ent] += capacity_segment[idx]
|
||
start = end
|
||
|
||
# 检查是否有企业超出总产能上限
|
||
over_cap_ents = []
|
||
for ent, total_cap in enterprise_total_capacity.items():
|
||
if ent == 0:
|
||
max_total_cap = self.risk.C0_total_max
|
||
else:
|
||
supplier_id = ent - 1
|
||
max_total_cap = self.supplier.Cj_total_max[supplier_id]
|
||
if total_cap > max_total_cap:
|
||
over_cap_ents.append((ent, total_cap, max_total_cap))
|
||
|
||
if not over_cap_ents:
|
||
break # 无超出则退出循环
|
||
|
||
# 处理超出总产能的企业
|
||
for ent, total_cap, max_total_cap in over_cap_ents:
|
||
excess = total_cap - max_total_cap # 超出量(整数)
|
||
if excess <= 0:
|
||
continue
|
||
|
||
# 收集该企业的所有产能分配(含最小产能约束)
|
||
cap_assignments = [] # [(物料索引i, 产能索引idx, 当前产能, 最小产能)]
|
||
start = 0
|
||
for i in range(self.I):
|
||
end = split_points[i]
|
||
ents = self.material_optional_enterprises[i]
|
||
enterprise_segment = enterprise_layer[start:end]
|
||
capacity_segment = repaired[start:end]
|
||
for idx, e in enumerate(ents):
|
||
if e == ent and enterprise_segment[idx] == 1:
|
||
# 获取该物料的最小产能
|
||
if ent == 0:
|
||
min_cap = self.risk.C0_i_min[i]
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_cap = self.supplier.Cj_i_min[supplier_id][i]
|
||
cap_assignments.append((i, start + idx, capacity_segment[idx], min_cap))
|
||
start = end
|
||
|
||
# 循环减少超出产能:每次减少1,直到超出量为0
|
||
current_excess = excess
|
||
while current_excess > 0 and cap_assignments:
|
||
# 本轮可减少的物料列表(排除已达最小产能的)
|
||
reducible = [item for item in cap_assignments if item[2] > item[3]]
|
||
if not reducible:
|
||
break # 无可减少的产能,退出循环
|
||
|
||
# 遍历可减少的物料,每次减少1
|
||
for idx in range(len(reducible)):
|
||
i, pos, curr_cap, min_cap = reducible[idx]
|
||
# 减少1个单位产能
|
||
new_cap = curr_cap - 1
|
||
if new_cap < min_cap:
|
||
new_cap = min_cap # 确保不低于最小产能
|
||
|
||
# 更新产能值
|
||
repaired[pos] = new_cap
|
||
# 更新分配记录
|
||
cap_assignments = [
|
||
(i, pos, new_cap, min_cap) if (item[1] == pos) else item
|
||
for item in cap_assignments
|
||
]
|
||
reducible[idx] = (i, pos, new_cap, min_cap)
|
||
|
||
current_excess -= 1
|
||
if current_excess <= 0:
|
||
break # 超出量处理完毕,退出循环
|
||
|
||
# 最终确保所有产能为整数且不低于最小产能
|
||
start = 0
|
||
for i in range(self.I):
|
||
end = split_points[i]
|
||
ents = self.material_optional_enterprises[i]
|
||
segment = repaired[start:end]
|
||
enterprise_segment = enterprise_layer[start:end]
|
||
for idx, ent in enumerate(ents):
|
||
if enterprise_segment[idx] == 1:
|
||
if ent == 0:
|
||
min_cap = self.risk.C0_i_min[i]
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_cap = self.supplier.Cj_i_min[supplier_id][i]
|
||
if segment[idx] < min_cap:
|
||
segment[idx] = min_cap
|
||
repaired[start:end] = segment
|
||
start = end
|
||
|
||
return repaired.astype(int)
|
||
def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray:
|
||
"""修复数量编码层:满足总量需求、起订量和最大供应量约束(整数化)"""
|
||
repaired = quantity_layer.copy().astype(int) # 强制转为整数
|
||
split_points = np.cumsum(self.material_enterprise_count)
|
||
start = 0
|
||
max_iter = 20 # 增加迭代次数
|
||
for i in range(self.I): # 遍历每种物料
|
||
qi = self.order.Q[i] # 物料i的需求量(整数)
|
||
end = split_points[i]
|
||
ents = self.material_optional_enterprises[i]
|
||
segment = repaired[start:end]
|
||
enterprise_segment = enterprise_layer[start:end]
|
||
selected_indices = np.where(enterprise_segment == 1)[0]
|
||
if len(selected_indices) == 0:
|
||
start = end
|
||
continue
|
||
selected_ents = [ents[idx] for idx in selected_indices]
|
||
# 步骤1:初始化合法范围(起订量~最大供应量,整数)
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
min_q = 0
|
||
max_q = qi
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_q = self.supplier.MinOrder[supplier_id][i]
|
||
max_q = self.supplier.MaxOrder[supplier_id][i]
|
||
# 确保在合法范围内
|
||
if segment[idx] < min_q:
|
||
segment[idx] = min_q
|
||
elif segment[idx] > max_q:
|
||
segment[idx] = max_q
|
||
# 步骤2:强制总量等于需求数量(整数)
|
||
current_total = np.sum(segment[selected_indices])
|
||
if current_total <= 0:
|
||
# 随机分配初始数量
|
||
weights = np.random.rand(len(selected_indices))
|
||
weights /= np.sum(weights)
|
||
base = qi // len(selected_indices)
|
||
remainder = qi % len(selected_indices)
|
||
segment[selected_indices] = base
|
||
# 分配余数(优先给任务占比低的)
|
||
if remainder > 0:
|
||
# 计算各企业任务占比(当前分配/最大产能)
|
||
ratios = []
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
max_cap = self.risk.C0_total_max
|
||
else:
|
||
supplier_id = ent - 1
|
||
max_cap = self.supplier.Cj_total_max[supplier_id]
|
||
ratio = segment[idx] / max_cap if max_cap > 0 else 1.0
|
||
ratios.append(ratio)
|
||
# 按占比升序排序,优先分配余数
|
||
sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0])]
|
||
for j in range(remainder):
|
||
segment[sorted_indices[j]] += 1
|
||
else:
|
||
# 按比例缩放(整数化)
|
||
scale = qi / current_total
|
||
scaled = np.array([int(round(x * scale)) for x in segment[selected_indices]])
|
||
segment[selected_indices] = scaled
|
||
# 调整差额
|
||
current_total = np.sum(segment[selected_indices])
|
||
diff = qi - current_total
|
||
if diff != 0:
|
||
# 按任务占比分配差额
|
||
ratios = []
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
max_cap = self.risk.C0_total_max
|
||
else:
|
||
supplier_id = ent - 1
|
||
max_cap = self.supplier.Cj_total_max[supplier_id]
|
||
ratio = segment[idx] / max_cap if max_cap > 0 else 1.0
|
||
ratios.append(ratio)
|
||
if diff > 0:
|
||
# 正差额:优先分配给占比低的
|
||
sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0])]
|
||
for j in range(diff):
|
||
if j < len(sorted_indices):
|
||
segment[sorted_indices[j]] += 1
|
||
else:
|
||
# 负差额:优先从占比高的扣除
|
||
sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0], reverse=True)]
|
||
for j in range(-diff):
|
||
if j < len(sorted_indices) and segment[sorted_indices[j]] > min_q:
|
||
segment[sorted_indices[j]] -= 1
|
||
# 步骤3:处理超出最大供应量的情况(整数)
|
||
need_adjust = True
|
||
iter_count = 0
|
||
while need_adjust and iter_count < max_iter:
|
||
iter_count += 1
|
||
need_adjust = False
|
||
total_excess = 0
|
||
# 截断超出max_q的数量(整数)
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
max_q = qi
|
||
else:
|
||
supplier_id = ent - 1
|
||
max_q = self.supplier.MaxOrder[supplier_id][i]
|
||
if segment[idx] > max_q:
|
||
excess = segment[idx] - max_q
|
||
total_excess += excess
|
||
segment[idx] = max_q
|
||
need_adjust = True
|
||
# 分配超出量(整数)
|
||
if total_excess > 0:
|
||
available_indices = []
|
||
available_remaining = []
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
max_q = qi
|
||
else:
|
||
supplier_id = ent - 1
|
||
max_q = self.supplier.MaxOrder[supplier_id][i]
|
||
remaining = max_q - segment[idx]
|
||
if remaining > 0:
|
||
available_indices.append(idx)
|
||
available_remaining.append(remaining)
|
||
if len(available_indices) > 0:
|
||
# 按剩余容量分配
|
||
total_available = sum(available_remaining)
|
||
if total_available > 0:
|
||
quotient = total_excess // total_available
|
||
remainder = total_excess % total_available
|
||
# 分配整数商
|
||
for idx, rem in zip(available_indices, available_remaining):
|
||
assign = min(quotient, rem)
|
||
segment[idx] += assign
|
||
total_excess -= assign
|
||
# 分配余数(优先给剩余多的)
|
||
if remainder > 0:
|
||
sorted_pairs = sorted(zip(available_indices, available_remaining), key=lambda x: x[1], reverse=True)
|
||
for idx, _ in sorted_pairs:
|
||
if remainder <= 0:
|
||
break
|
||
rem = max_q - segment[idx]
|
||
assign = min(remainder, rem)
|
||
segment[idx] += assign
|
||
remainder -= assign
|
||
total_excess = remainder
|
||
if total_excess > 0:
|
||
need_adjust = True
|
||
else:
|
||
# 无剩余容量,强制截断并重新缩放
|
||
segment[selected_indices] = np.minimum(segment[selected_indices],
|
||
[self.supplier.MaxOrder[ent - 1][i] if ent != 0 else qi
|
||
for ent in selected_ents])
|
||
# 重新调整总量
|
||
current_total = np.sum(segment[selected_indices])
|
||
diff = qi - current_total
|
||
if diff != 0:
|
||
# 兜底:分配给剩余容量最大的(即使为0,强制调整)
|
||
if diff > 0:
|
||
# 找最小起订量最低的企业
|
||
min_min_order = float('inf')
|
||
target_idx = selected_indices[0]
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
min_q = 0
|
||
else:
|
||
supplier_id = ent - 1
|
||
min_q = self.supplier.MinOrder[supplier_id][i]
|
||
if min_q < min_min_order:
|
||
min_min_order = min_q
|
||
target_idx = idx
|
||
segment[target_idx] += diff
|
||
else:
|
||
# 找分配量最大的企业扣除
|
||
max_qty = -1
|
||
target_idx = selected_indices[0]
|
||
for idx in selected_indices:
|
||
if segment[idx] > max_qty:
|
||
max_qty = segment[idx]
|
||
target_idx = idx
|
||
segment[target_idx] += diff # diff为负
|
||
need_adjust = True
|
||
# 检查总量(整数相等)
|
||
current_total = np.sum(segment[selected_indices])
|
||
if current_total != qi:
|
||
diff = qi - current_total
|
||
if diff > 0:
|
||
# 分配差额给剩余容量最大的
|
||
max_remaining = -1
|
||
target_idx = selected_indices[0]
|
||
for idx, ent in zip(selected_indices, selected_ents):
|
||
if ent == 0:
|
||
max_q = qi
|
||
else:
|
||
supplier_id = ent - 1
|
||
max_q = self.supplier.MaxOrder[supplier_id][i]
|
||
remaining = max_q - segment[idx]
|
||
if remaining > max_remaining:
|
||
max_remaining = remaining
|
||
target_idx = idx
|
||
segment[target_idx] += diff
|
||
else:
|
||
# 扣除差额从分配量最大的
|
||
max_qty = -1
|
||
target_idx = selected_indices[0]
|
||
for idx in selected_indices:
|
||
if segment[idx] > max_qty:
|
||
max_qty = segment[idx]
|
||
target_idx = idx
|
||
segment[target_idx] += diff
|
||
need_adjust = True
|
||
repaired[start:end] = segment
|
||
start = end
|
||
return repaired.astype(int)
|
||
def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray:
|
||
"""
|
||
完整修复染色体(按顺序修复三层)
|
||
:param chromosome: 待修复的染色体
|
||
:return: 修复后的染色体
|
||
"""
|
||
# 拆分三层
|
||
enterprise_layer, capacity_layer, quantity_layer = self._split_chromosome(chromosome)
|
||
# 依次修复(企业层→能力层→数量层,依赖关系)
|
||
enterprise_layer = self.repair_enterprise_layer(enterprise_layer)
|
||
capacity_layer = self.repair_capacity_layer(enterprise_layer, capacity_layer)
|
||
quantity_layer = self.repair_quantity_layer(enterprise_layer, quantity_layer)
|
||
# 合并修复后的三层
|
||
return self._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer) |