没有过滤解,但是功能正常

This commit is contained in:
Hgq 2025-12-07 16:41:26 +08:00
parent c21683083d
commit 6a482e6af1
7 changed files with 459 additions and 361 deletions

View File

@ -1,11 +1,9 @@
import random
import numpy as np
import math
from data_structures import OrderData, RiskEnterpriseData, SupplierData # 数据结构类
class ChromosomeUtils:
"""染色体工具类:提供染色体的拆分、合并、修复等功能,确保满足约束条件"""
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData):
"""
初始化工具类
@ -23,7 +21,6 @@ class ChromosomeUtils:
self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises]
# 染色体总长度3层企业层+能力层+数量层)
self.chromosome_length = 3 * sum(self.material_enterprise_count)
def _get_material_optional_enterprises(self) -> list[list[int]]:
"""
生成每个物料的可选企业列表内部方法
@ -38,7 +35,6 @@ class ChromosomeUtils:
ents.append(j + 1) # 供应商ID为j+1区分风险企业
optional.append(ents)
return optional
def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
拆分染色体为三层
@ -50,7 +46,6 @@ class ChromosomeUtils:
capacity_layer = chromosome[total_ent_count:2 * total_ent_count] # 第二层:产能
quantity_layer = chromosome[2 * total_ent_count:] # 第三层:数量
return enterprise_layer, capacity_layer, quantity_layer
def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> np.ndarray:
"""
@ -61,7 +56,6 @@ class ChromosomeUtils:
:return: 完整染色体
"""
return np.hstack([enterprise_layer, capacity_layer, quantity_layer])
def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray:
"""
修复企业层确保每种物料至少选择一个企业
@ -84,16 +78,16 @@ class ChromosomeUtils:
def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray:
"""
修复能力层满足单物料最小产能约束和企业总产能约束
修复能力层满足单物料最小产能约束和企业总产能约束整数化
:param enterprise_layer: 企业层用于确定哪些企业被选中
:param capacity_layer: 待修复的能力层
:return: 修复后的能力层
:return: 修复后的能力层整数
"""
repaired = capacity_layer.copy()
repaired = capacity_layer.copy().astype(int) # 强制转为整数
split_points = np.cumsum(self.material_enterprise_count)
start = 0
# 1. 修复单物料最小产能约束(每个企业的单物料产能不低于最小产能
# 1. 修复单物料最小产能约束(每个企业的单物料产能不低于最小产能,整数
for i in range(self.I):
end = split_points[i]
ents = self.material_optional_enterprises[i] # 可选企业列表
@ -101,21 +95,27 @@ class ChromosomeUtils:
enterprise_segment = enterprise_layer[start:end] # 企业选择状态
for idx, ent in enumerate(ents):
if enterprise_segment[idx] == 1: # 仅处理被选中的企业
# 确定该企业的单物料最小产能
# 确定该企业的单物料最小产能和最大产能
if ent == 0:
min_cap = self.risk.C0_i_min[i] # 风险企业最小产能
max_cap = self.risk.C0_total_max # 单物料最大产能=总产能上限
else:
supplier_id = ent - 1
min_cap = self.supplier.Cj_i_min[supplier_id][i] # 供应商最小产能
max_cap = self.supplier.Cj_total_max[supplier_id] # 单物料最大产能=总产能上限
# 若产能低于最小产能重置为最小产能到2倍最小产能之间的随机值无上限约束合理范围控制
# 确保不低于最小产能,不超过最大产能
if segment[idx] < min_cap:
segment[idx] = random.uniform(min_cap, min_cap * 2)
segment[idx] = min_cap
if segment[idx] > max_cap:
segment[idx] = max_cap
repaired[start:end] = segment
start = end
# 2. 修复企业总产能约束(企业所有物料的总产能不超过上限)
# 先计算每个企业的当前总产能
# 2. 修复企业总产能约束(企业所有物料的总产能不超过上限,整数处理)
max_adjust_iter = 10 # 最大调整迭代次数
for _ in range(max_adjust_iter):
# 计算每个企业的当前总产能
enterprise_total_capacity = {} # {企业ID: 总产能}
start = 0
for i in range(self.I):
@ -130,19 +130,29 @@ class ChromosomeUtils:
enterprise_total_capacity[ent] += capacity_segment[idx]
start = end
# 调整超出总产能上限的企业(按比例缩放)
# 检查是否有企业超出总产能上限
over_cap_ents = []
for ent, total_cap in enterprise_total_capacity.items():
# 确定企业的总产能上限
if ent == 0:
max_total_cap = self.risk.C0_total_max # 风险企业总产能上限
max_total_cap = self.risk.C0_total_max
else:
supplier_id = ent - 1
max_total_cap = self.supplier.Cj_total_max[supplier_id] # 供应商总产能上限
max_total_cap = self.supplier.Cj_total_max[supplier_id]
if total_cap > max_total_cap:
over_cap_ents.append((ent, total_cap, max_total_cap))
if total_cap > max_total_cap: # 超出上限时缩放
scale = max_total_cap / total_cap # 缩放比例
if not over_cap_ents:
break # 无超出则退出循环
# 处理超出总产能的企业
for ent, total_cap, max_total_cap in over_cap_ents:
excess = total_cap - max_total_cap # 超出量(整数)
if excess <= 0:
continue
# 收集该企业的所有产能分配(含最小产能约束)
cap_assignments = [] # [(物料索引i, 产能索引idx, 当前产能, 最小产能)]
start = 0
# 重新遍历,按比例缩小该企业所有物料的产能
for i in range(self.I):
end = split_points[i]
ents = self.material_optional_enterprises[i]
@ -150,26 +160,72 @@ class ChromosomeUtils:
capacity_segment = repaired[start:end]
for idx, e in enumerate(ents):
if e == ent and enterprise_segment[idx] == 1:
# 缩放后仍需保证不低于最小产能
scaled_cap = capacity_segment[idx] * scale
# 获取该物料的最小产能
if ent == 0:
min_cap = self.risk.C0_i_min[i]
else:
supplier_id = e - 1
supplier_id = ent - 1
min_cap = self.supplier.Cj_i_min[supplier_id][i]
capacity_segment[idx] = max(scaled_cap, min_cap)
repaired[start:end] = capacity_segment
cap_assignments.append((i, start + idx, capacity_segment[idx], min_cap))
start = end
return repaired
# 循环减少超出产能每次减少1直到超出量为0
current_excess = excess
while current_excess > 0 and cap_assignments:
# 本轮可减少的物料列表(排除已达最小产能的)
reducible = [item for item in cap_assignments if item[2] > item[3]]
if not reducible:
break # 无可减少的产能,退出循环
# 遍历可减少的物料每次减少1
for idx in range(len(reducible)):
i, pos, curr_cap, min_cap = reducible[idx]
# 减少1个单位产能
new_cap = curr_cap - 1
if new_cap < min_cap:
new_cap = min_cap # 确保不低于最小产能
# 更新产能值
repaired[pos] = new_cap
# 更新分配记录
cap_assignments = [
(i, pos, new_cap, min_cap) if (item[1] == pos) else item
for item in cap_assignments
]
reducible[idx] = (i, pos, new_cap, min_cap)
current_excess -= 1
if current_excess <= 0:
break # 超出量处理完毕,退出循环
# 最终确保所有产能为整数且不低于最小产能
start = 0
for i in range(self.I):
end = split_points[i]
ents = self.material_optional_enterprises[i]
segment = repaired[start:end]
enterprise_segment = enterprise_layer[start:end]
for idx, ent in enumerate(ents):
if enterprise_segment[idx] == 1:
if ent == 0:
min_cap = self.risk.C0_i_min[i]
else:
supplier_id = ent - 1
min_cap = self.supplier.Cj_i_min[supplier_id][i]
if segment[idx] < min_cap:
segment[idx] = min_cap
repaired[start:end] = segment
start = end
return repaired.astype(int)
def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray:
"""修复数量编码层:满足总量需求、起订量和最大供应量约束"""
repaired = quantity_layer.copy()
"""修复数量编码层:满足总量需求、起订量和最大供应量约束(整数化)"""
repaired = quantity_layer.copy().astype(int) # 强制转为整数
split_points = np.cumsum(self.material_enterprise_count)
start = 0
max_iter = 20 # 增加迭代次数
for i in range(self.I): # 遍历每种物料
qi = self.order.Q[i] # 物料i的需求量
qi = self.order.Q[i] # 物料i的需求量(整数)
end = split_points[i]
ents = self.material_optional_enterprises[i]
segment = repaired[start:end]
@ -179,7 +235,7 @@ class ChromosomeUtils:
start = end
continue
selected_ents = [ents[idx] for idx in selected_indices]
# 步骤1初始化合法范围起订量~最大供应量
# 步骤1初始化合法范围起订量~最大供应量,整数
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
min_q = 0
@ -188,28 +244,75 @@ class ChromosomeUtils:
supplier_id = ent - 1
min_q = self.supplier.MinOrder[supplier_id][i]
max_q = self.supplier.MaxOrder[supplier_id][i]
# 确保在合法范围内
if segment[idx] < min_q:
segment[idx] = min_q
elif segment[idx] > max_q:
segment[idx] = max_q
# 步骤2强制总量等于需求数量
# 步骤2强制总量等于需求数量(整数)
current_total = np.sum(segment[selected_indices])
if current_total <= 0:
# 随机分配初始数量
weights = np.random.rand(len(selected_indices))
weights /= np.sum(weights)
segment[selected_indices] = qi * weights
base = qi // len(selected_indices)
remainder = qi % len(selected_indices)
segment[selected_indices] = base
# 分配余数(优先给任务占比低的)
if remainder > 0:
# 计算各企业任务占比(当前分配/最大产能)
ratios = []
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_cap = self.risk.C0_total_max
else:
supplier_id = ent - 1
max_cap = self.supplier.Cj_total_max[supplier_id]
ratio = segment[idx] / max_cap if max_cap > 0 else 1.0
ratios.append(ratio)
# 按占比升序排序,优先分配余数
sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0])]
for j in range(remainder):
segment[sorted_indices[j]] += 1
else:
# 按比例缩放(整数化)
scale = qi / current_total
segment[selected_indices] *= scale
# 步骤3处理超出最大供应量的情况添加最大迭代次数限制
scaled = np.array([int(round(x * scale)) for x in segment[selected_indices]])
segment[selected_indices] = scaled
# 调整差额
current_total = np.sum(segment[selected_indices])
diff = qi - current_total
if diff != 0:
# 按任务占比分配差额
ratios = []
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_cap = self.risk.C0_total_max
else:
supplier_id = ent - 1
max_cap = self.supplier.Cj_total_max[supplier_id]
ratio = segment[idx] / max_cap if max_cap > 0 else 1.0
ratios.append(ratio)
if diff > 0:
# 正差额:优先分配给占比低的
sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0])]
for j in range(diff):
if j < len(sorted_indices):
segment[sorted_indices[j]] += 1
else:
# 负差额:优先从占比高的扣除
sorted_indices = [idx for _, idx in sorted(zip(ratios, selected_indices), key=lambda x: x[0], reverse=True)]
for j in range(-diff):
if j < len(sorted_indices) and segment[sorted_indices[j]] > min_q:
segment[sorted_indices[j]] -= 1
# 步骤3处理超出最大供应量的情况整数
need_adjust = True
max_iter = 10 # 最大迭代次数,避免无限循环
iter_count = 0
while need_adjust and iter_count < max_iter: # 增加迭代次数限制
while need_adjust and iter_count < max_iter:
iter_count += 1
need_adjust = False
total_excess = 0
# 截断超出max_q的数量
# 截断超出max_q的数量(整数)
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_q = qi
@ -221,10 +324,10 @@ class ChromosomeUtils:
total_excess += excess
segment[idx] = max_q
need_adjust = True
# 分配超出量到有剩余的企业
# 分配超出量(整数)
if total_excess > 0:
available_indices = []
available_max = []
available_remaining = []
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_q = qi
@ -234,36 +337,97 @@ class ChromosomeUtils:
remaining = max_q - segment[idx]
if remaining > 0:
available_indices.append(idx)
available_max.append(remaining)
available_remaining.append(remaining)
if len(available_indices) > 0:
available_total = sum(available_max)
if available_total > 0:
alloc_ratio = np.array(available_max) / available_total
alloc_excess = total_excess * alloc_ratio
for idx, alloc in zip(available_indices, alloc_excess):
segment[idx] += alloc
need_adjust = True # 分配后可能需要再次检查
# 按剩余容量分配
total_available = sum(available_remaining)
if total_available > 0:
quotient = total_excess // total_available
remainder = total_excess % total_available
# 分配整数商
for idx, rem in zip(available_indices, available_remaining):
assign = min(quotient, rem)
segment[idx] += assign
total_excess -= assign
# 分配余数(优先给剩余多的)
if remainder > 0:
sorted_pairs = sorted(zip(available_indices, available_remaining), key=lambda x: x[1], reverse=True)
for idx, _ in sorted_pairs:
if remainder <= 0:
break
rem = max_q - segment[idx]
assign = min(remainder, rem)
segment[idx] += assign
remainder -= assign
total_excess = remainder
if total_excess > 0:
need_adjust = True
else:
# 无剩余容量,强制截断并重新缩放总量
# 无剩余容量,强制截断并重新缩放
segment[selected_indices] = np.minimum(segment[selected_indices],
[self.supplier.MaxOrder[ent - 1][i] if ent != 0 else qi
for ent in selected_ents])
# 重新调整总量
current_total = np.sum(segment[selected_indices])
diff = qi - current_total
if diff != 0:
# 兜底分配给剩余容量最大的即使为0强制调整
if diff > 0:
# 找最小起订量最低的企业
min_min_order = float('inf')
target_idx = selected_indices[0]
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
min_q = 0
else:
supplier_id = ent - 1
min_q = self.supplier.MinOrder[supplier_id][i]
if min_q < min_min_order:
min_min_order = min_q
target_idx = idx
segment[target_idx] += diff
else:
# 找分配量最大的企业扣除
max_qty = -1
target_idx = selected_indices[0]
for idx in selected_indices:
if segment[idx] > max_qty:
max_qty = segment[idx]
target_idx = idx
segment[target_idx] += diff # diff为负
need_adjust = True
# 检查总量误差,重新缩放
# 检查总量(整数相等)
current_total = np.sum(segment[selected_indices])
if abs(current_total - qi) > 1e-6:
scale = qi / current_total
segment[selected_indices] *= scale
need_adjust = True # 缩放后可能需要再次检查
# 最终强制总量等于需求(避免迭代次数到限后仍有误差)
current_total = np.sum(segment[selected_indices])
if abs(current_total - qi) > 1e-6:
scale = qi / current_total
segment[selected_indices] *= scale
if current_total != qi:
diff = qi - current_total
if diff > 0:
# 分配差额给剩余容量最大的
max_remaining = -1
target_idx = selected_indices[0]
for idx, ent in zip(selected_indices, selected_ents):
if ent == 0:
max_q = qi
else:
supplier_id = ent - 1
max_q = self.supplier.MaxOrder[supplier_id][i]
remaining = max_q - segment[idx]
if remaining > max_remaining:
max_remaining = remaining
target_idx = idx
segment[target_idx] += diff
else:
# 扣除差额从分配量最大的
max_qty = -1
target_idx = selected_indices[0]
for idx in selected_indices:
if segment[idx] > max_qty:
max_qty = segment[idx]
target_idx = idx
segment[target_idx] += diff
need_adjust = True
repaired[start:end] = segment
start = end
return repaired
return repaired.astype(int)
def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray:
"""
完整修复染色体按顺序修复三层

View File

@ -3,20 +3,18 @@ class OrderData:
"""订单数据类:存储物料需求、交货期、成本等信息"""
def __init__(self):
self.I = 5 # 物料种类数
self.Q = [6000, 12000, 20000, 7500, 13500] # 各物料的需求数量
self.Dd = 30 # 需求交货期(单位:时间)
self.P0 = [45, 30, 30, 50, 40] # 风险企业的单位采购价
self.T0 = [5, 8, 6, 7, 9] # 风险企业的单位运输成本
self.transport_speed = 10 # 运输速度(单位:距离/时间)
self.Q = [6000, 12000, 20000, 7500, 13500] # 各物料的需求数量(整数)
self.Dd = 30 # 需求交货期(单位:时间,整数)
self.P0 = [45, 30, 30, 50, 40] # 风险企业的单位采购价(整数)
self.T0 = [5, 8, 6, 7, 9] # 风险企业的单位运输成本(整数)
self.transport_speed = 10 # 运输速度(单位:距离/时间,整数)
class RiskEnterpriseData:
"""风险企业数据类:存储风险企业的产能、距离等信息"""
def __init__(self):
self.I = 5 # 物料种类数(与订单一致)
self.C0_i_min = [50, 100, 150, 80, 100] # 单物料的单位时间最小产能
self.C0_total_max = 900 # 总产能上限(单位时间)
self.distance = 20 # 与需求点的距离
self.C0_i_min = [50, 100, 150, 80, 100] # 单物料的单位时间最小产能(整数)
self.C0_total_max = 900 # 总产能上限(单位时间,整数)
self.distance = 20 # 与需求点的距离(整数)
class SupplierData:
"""供应商数据类:存储各供应商的产能、价格、距离等信息"""
def __init__(self, I=5):
@ -30,51 +28,50 @@ class SupplierData:
[0, 1, 0, 1, 0],
[0, 0, 1, 1, 1]
]
# 单物料单位时间最小产能supplier_count × I0表示不能生产该物料
# 单物料单位时间最小产能supplier_count × I0表示不能生产该物料(整数)
self.Cj_i_min = [
[30, 80, 100, 60, 80],
[60, 0, 180, 0, 120],
[0, 150, 0, 120, 0],
[0, 0, 170, 105, 115]
]
# 供应商单位时间的最大总产能supplier_count
# 供应商单位时间的最大总产能supplier_count,整数
self.Cj_total_max = [700, 800, 600, 850]
# 最小起订量supplier_count × I
# 最小起订量supplier_count × I,整数
self.MinOrder = [
[800, 1500, 3000, 800, 1500],
[1000, 0, 3500, 0, 1800],
[0, 1700, 0, 1000, 0],
[0, 0, 2500, 500, 1000]
]
# 最大供应量supplier_count × I
# 最大供应量supplier_count × I,整数
self.MaxOrder = [
[5000, 10000, 18000, 6500, 11000],
[8000, 0, 25000, 0, 15000],
[0, 8000, 0, 6000, 0],
[0, 0, 20000, 7500, 13500]
]
# 单位采购价格supplier_count × I
# 单位采购价格supplier_count × I,整数
self.P_ij = [
[50, 35, 28, 47, 38],
[43, 0, 28, 0, 36],
[0, 31, 0, 52, 0],
[0, 0, 32, 52, 43]
]
# 单位运输成本supplier_count × I
# 单位运输成本supplier_count × I,整数
self.T_ij = [
[6, 9, 8, 9, 12],
[4, 0, 5, 0, 15],
[0, 10, 0, 7, 0],
[0, 0, 8, 9, 11]
]
# 供应商与需求点的距离supplier_count
# 供应商与需求点的距离supplier_count,整数
self.distance = [60, 50, 70, 40]
class Config:
"""算法参数配置类存储NSGA-II的各类参数"""
def __init__(self):
# 种群参数
self.pop_size = 100 # 种群大小
self.pop_size = 200 # 种群大小
self.N1_ratio = 0.2 # 优先成本的种群比例
self.N2_ratio = 0.2 # 优先延期的种群比例
self.N3_ratio = 0.3 # 强制风险企业的种群比例
@ -82,10 +79,9 @@ class Config:
# 遗传操作参数
self.crossover_prob = 0.8 # 交叉概率
self.mutation_prob = 0.3 # 变异概率
self.max_generations = 300 # 最大进化代数
self.max_generations = 100 # 最大进化代数
# 惩罚系数
self.delta = 1.3 # 变更惩罚系数
self.gamma = 500 # 提前交付惩罚系数
# 早停参数
self.early_stop_patience = 50 # 连续多少代无改进则早停
# 目标函数数量

View File

@ -4,9 +4,8 @@ from data_structures import Config
from chromosome_utils import ChromosomeUtils
from objective_calculator import ObjectiveCalculator
from nsga2 import NSGA2
class Encoder:
"""种群初始化编码器:生成初始种群,包含多种初始化策略"""
"""种群初始化编码器:生成初始种群,包含多种初始化策略(整数化)"""
def __init__(self, config: Config, utils: ChromosomeUtils):
"""
初始化编码器
@ -21,16 +20,15 @@ class Encoder:
self.N2 = int(config.N2_ratio * self.pop_size) # 优先最小化延期的个体数
self.N3 = int(config.N3_ratio * self.pop_size) # 强制选择风险企业的个体数
self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 # 随机生成的个体数
def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray:
"""
生成随机染色体内部方法
生成随机染色体内部方法整数化
:param force_risk_enterprise: 是否强制选择风险企业用于N3策略
:return: 生成的染色体经过修复
:return: 生成的染色体经过修复整数
"""
enterprise_layer = [] # 企业选择层0/1
capacity_layer = [] # 产能层
quantity_layer = [] # 数量层
capacity_layer = [] # 产能层(整数)
quantity_layer = [] # 数量层(整数)
for i in range(self.utils.I): # 遍历每种物料
ent_count = self.utils.material_enterprise_count[i] # 当前物料的可选企业数量
ents = self.utils.material_optional_enterprises[i] # 可选企业列表0为风险企业
@ -42,39 +40,44 @@ class Encoder:
if force_risk_enterprise:
e_genes[0] = 1 # 强制选择风险企业索引0
enterprise_layer.extend(e_genes)
# 2. 生成能力层(为选中的企业分配不低于最小产能的随机产能)
c_genes = np.zeros(ent_count)
# 2. 生成能力层(整数,不低于最小产能)
c_genes = np.zeros(ent_count, dtype=int)
for idx, ent in enumerate(ents):
if e_genes[idx] == 1: # 仅为选中的企业分配产能
if ent == 0:
min_cap = self.utils.risk.C0_i_min[i] # 风险企业的最小产能
max_cap = self.utils.risk.C0_total_max # 最大产能=总产能上限
else:
supplier_id = ent - 1
min_cap = self.utils.supplier.Cj_i_min[supplier_id][i] # 供应商的最小产能
# 生成不低于最小产能的随机值无上限用2倍最小产能控制初始范围
c_genes[idx] = random.uniform(min_cap, min_cap * 2)
max_cap = self.utils.supplier.Cj_total_max[supplier_id] # 最大产能=总产能上限
# 生成整数产能
c_genes[idx] = random.randint(min_cap, max_cap)
capacity_layer.extend(c_genes)
# 3. 生成数量层(为选中的企业随机分配数量)
q_genes = np.zeros(ent_count)
# 3. 生成数量层(整数,在合法范围内)
q_genes = np.zeros(ent_count, dtype=int)
qi = self.utils.order.Q[i]
for idx, ent in enumerate(ents):
if e_genes[idx] == 1: # 仅为选中的企业分配数量
if ent == 0:
max_q = self.utils.order.Q[i] # 风险企业最多分配全部需求
min_q = 0
max_q = qi
else:
supplier_id = ent - 1
max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 供应商的最大供应量
q_genes[idx] = random.uniform(1, max_q) # 随机数量
min_q = self.utils.supplier.MinOrder[supplier_id][i]
max_q = self.utils.supplier.MaxOrder[supplier_id][i]
# 生成整数数量
q_genes[idx] = random.randint(min_q, max_q)
quantity_layer.extend(q_genes)
# 合并三层并修复染色体(确保满足所有约束)
chromosome = self.utils._merge_chromosome(
np.array(enterprise_layer), np.array(capacity_layer), np.array(quantity_layer)
)
return self.utils.repair_chromosome(chromosome)
def initialize_population(self) -> np.ndarray:
"""
初始化完整种群四种策略组合
:return: 初始化后的种群numpy数组
:return: 初始化后的种群numpy数组整数
"""
# 按四种策略生成子种群
pop1 = self._initialize_by_objective(self.N1, "cost") # 优先成本
@ -88,14 +91,13 @@ class Encoder:
# 合并子种群并打乱顺序
population = np.vstack(population_list)
np.random.shuffle(population)
return population[:self.pop_size] # 确保种群大小正确
return population[:self.pop_size].astype(int) # 确保为整数
def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray:
"""
基于目标函数初始化生成候选解后选择最优的count个
:param count: 需生成的个体数量
:param objective_type: 优化目标"cost""tardiness"
:return: 筛选后的子种群
:return: 筛选后的子种群整数
"""
if count <= 0: # 数量为0时返回空数组
return np.array([])
@ -113,13 +115,12 @@ class Encoder:
else:
# 按延期升序排序(延期越小越优)
sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][1])
return np.array([candidates[i] for i in sorted_indices[:count]])
return np.array([candidates[i] for i in sorted_indices[:count]]).astype(int)
def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray:
"""
基于风险企业初始化强制选择风险企业用NSGA-II筛选
:param count: 需生成的个体数量
:return: 筛选后的子种群
:return: 筛选后的子种群整数
"""
if count <= 0:
return np.array([])
@ -141,14 +142,13 @@ class Encoder:
# 前沿数量超过剩余需求时,取部分
selected.extend([candidates[i] for i in front[:count - len(selected)]])
break
return np.array(selected)
return np.array(selected).astype(int)
def _initialize_random(self, count: int) -> np.ndarray:
"""
随机初始化直接生成count个随机染色体
:param count: 需生成的个体数量
:return: 随机子种群
:return: 随机子种群整数
"""
if count <= 0:
return np.array([])
return np.array([self._generate_random_chromosome() for _ in range(count)])
return np.array([self._generate_random_chromosome() for _ in range(count)]).astype(int)

View File

@ -2,9 +2,8 @@ import random
import numpy as np
from data_structures import Config # 算法配置参数类
from chromosome_utils import ChromosomeUtils # 染色体工具类
class GeneticOperator:
"""遗传操作类:实现交叉和变异操作,用于产生新个体"""
"""遗传操作类:实现交叉和变异操作,用于产生新个体(整数化)"""
def __init__(self, config: Config, utils: ChromosomeUtils):
"""
初始化遗传操作器
@ -13,17 +12,16 @@ class GeneticOperator:
"""
self.config = config # 配置参数
self.utils = utils # 染色体工具
def two_point_crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
"""
两点交叉在染色体上随机选择两个点交换两点之间的基因片段
:param parent1: 父代染色体1
:param parent2: 父代染色体2
:return: 两个子代染色体经过修复
:param parent1: 父代染色体1整数
:param parent2: 父代染色体2整数
:return: 两个子代染色体经过修复整数
"""
length = self.utils.chromosome_length # 染色体总长度
if length < 2: # 染色体长度不足2时无法交叉直接返回父代副本
return parent1.copy(), parent2.copy()
return parent1.copy().astype(int), parent2.copy().astype(int)
# 随机选择两个交叉点point1 < point2
point1 = random.randint(0, length // 2)
point2 = random.randint(point1 + 1, length - 1)
@ -35,15 +33,14 @@ class GeneticOperator:
# 修复染色体(确保满足约束条件)
child1 = self.utils.repair_chromosome(child1)
child2 = self.utils.repair_chromosome(child2)
return child1, child2
return child1.astype(int), child2.astype(int)
def uniform_mutation(self, chromosome: np.ndarray) -> np.ndarray:
"""
均匀变异对染色体的三层基因企业层能力层数量层进行随机变异
:param chromosome: 待变异的染色体
:return: 变异后的染色体经过修复
均匀变异对染色体的三层基因企业层能力层数量层进行随机变异整数化
:param chromosome: 待变异的染色体整数
:return: 变异后的染色体经过修复整数
"""
mutated = chromosome.copy() # 复制原始染色体,避免直接修改
mutated = chromosome.copy().astype(int) # 确保为整数
# 拆分染色体为三层
enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(mutated)
split_points = np.cumsum(self.utils.material_enterprise_count)
@ -59,7 +56,7 @@ class GeneticOperator:
start = end
# 修复企业层
enterprise_layer = self.utils.repair_enterprise_layer(enterprise_layer)
# ========== 第三步:同步更新能力和数量层 ==========
# ========== 第三步:同步更新能力和数量层(整数) ==========
start = 0
for i in range(self.utils.I):
end = split_points[i]
@ -69,30 +66,34 @@ class GeneticOperator:
original_ent_selected = original_enterprise_layer[idx] # 变异前的选择状态
# 情况1企业从选中变为未选中1→0
if original_ent_selected == 1 and current_ent_selected == 0:
capacity_layer[idx] = 0 # 产能设为0
quantity_layer[idx] = 0 # 数量设为0
capacity_layer[idx] = 0 # 产能设为0(整数)
quantity_layer[idx] = 0 # 数量设为0(整数)
# 情况2企业从未选中变为选中0→1
elif original_ent_selected == 0 and current_ent_selected == 1:
ent = ents[idx - start]
# 初始化产能(不低于最小产能)
qi = self.utils.order.Q[i]
# 初始化产能(整数)
if ent == 0:
min_cap = self.utils.risk.C0_i_min[i]
max_cap = self.utils.risk.C0_total_max
else:
supplier_id = ent - 1
min_cap = self.utils.supplier.Cj_i_min[supplier_id][i]
capacity_layer[idx] = random.uniform(min_cap, min_cap * 2)
# 初始化数量
qi = self.utils.order.Q[i]
max_cap = self.utils.supplier.Cj_total_max[supplier_id]
capacity_layer[idx] = random.randint(min_cap, max_cap)
# 初始化数量(整数)
if ent == 0:
min_q = 0
max_q = qi
else:
supplier_id = ent - 1
min_q = self.utils.supplier.MinOrder[supplier_id][i]
max_q = self.utils.supplier.MaxOrder[supplier_id][i]
quantity_layer[idx] = random.uniform(1, max_q)
quantity_layer[idx] = random.randint(min_q, max_q)
# 情况3企业保持选中1→1或保持未选中0→0
# 保持不变,后续的变异步骤会处理
start = end
# ========== 第四步:能力层变异(仅对保持选中的企业 ==========
# ========== 第四步:能力层变异(仅对保持选中的企业,整数 ==========
start = 0
for i in range(self.utils.I):
end = split_points[i]
@ -105,15 +106,17 @@ class GeneticOperator:
ent = ents[idx - start]
if ent == 0:
min_cap = self.utils.risk.C0_i_min[i]
max_cap = self.utils.risk.C0_total_max
else:
supplier_id = ent - 1
min_cap = self.utils.supplier.Cj_i_min[supplier_id][i]
# 变异后产能不低于最小产能
capacity_layer[idx] = random.uniform(min_cap, min_cap * 2)
max_cap = self.utils.supplier.Cj_total_max[supplier_id]
# 变异后产能为整数
capacity_layer[idx] = random.randint(min_cap, max_cap)
start = end
# 修复能力层(考虑所有选中企业的产能)
capacity_layer = self.utils.repair_capacity_layer(enterprise_layer, capacity_layer)
# ========== 第五步:数量层变异(仅对保持选中的企业 ==========
# ========== 第五步:数量层变异(仅对保持选中的企业,整数 ==========
start = 0
for i in range(self.utils.I):
end = split_points[i]
@ -125,15 +128,18 @@ class GeneticOperator:
if random.random() < self.config.mutation_prob:
ent = ents[idx - start]
qi = self.utils.order.Q[i]
# 变异后数量为整数
if ent == 0:
min_q = 0
max_q = qi
else:
supplier_id = ent - 1
min_q = self.utils.supplier.MinOrder[supplier_id][i]
max_q = self.utils.supplier.MaxOrder[supplier_id][i]
quantity_layer[idx] = random.uniform(1, max_q)
quantity_layer[idx] = random.randint(min_q, max_q)
start = end
# 修复数量层(考虑所有选中企业的分配)
quantity_layer = self.utils.repair_quantity_layer(enterprise_layer, quantity_layer)
# 合并三层
mutated = self.utils._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer)
return mutated
return mutated.astype(int)

99
main.py
View File

@ -7,71 +7,62 @@ from encoder import Encoder
from genetic_operators import GeneticOperator
from nsga2 import NSGA2
from visualizer import ResultVisualizer
def main():
"""主函数执行NSGA-II算法求解多目标优化问题"""
"""主函数执行NSGA-II算法求解多目标优化问题整数化版本"""
try:
# 1. 初始化随机种子(确保结果可复现)
random.seed(42)
np.random.seed(42)
# 2. 初始化数据(订单、风险企业、供应商、算法配置)
print("初始化数据结构...")
order_data = OrderData() # 订单数据(需求、交货期等
risk_data = RiskEnterpriseData() # 风险企业数据
supplier_data = SupplierData() # 供应商数据
order_data = OrderData() # 订单数据(需求、交货期等,整数
risk_data = RiskEnterpriseData() # 风险企业数据(整数)
supplier_data = SupplierData() # 供应商数据(整数)
config = Config() # 算法参数配置
# 3. 初始化工具类和算法组件
print("初始化算法组件...")
utils = ChromosomeUtils(order_data, risk_data, supplier_data) # 染色体工具
calculator = ObjectiveCalculator(order_data, risk_data, supplier_data, utils, config) # 目标函数计算器
encoder = Encoder(config, utils) # 种群初始化编码器
genetic_op = GeneticOperator(config, utils) # 遗传操作器(交叉、变异
utils = ChromosomeUtils(order_data, risk_data, supplier_data) # 染色体工具(整数化)
calculator = ObjectiveCalculator(order_data, risk_data, supplier_data, utils, config) # 目标函数计算器(整数化)
encoder = Encoder(config, utils) # 种群初始化编码器(整数化)
genetic_op = GeneticOperator(config, utils) # 遗传操作器(整数化
nsga2 = NSGA2(config.pop_size, config.objective_num) # NSGA-II算法实例
visualizer = ResultVisualizer(utils) # 结果可视化工具
visualizer = ResultVisualizer(utils) # 结果可视化工具(适配整数化)
# 4. 初始化种群
print("初始化种群...")
print("初始化种群(整数化)...")
population = encoder.initialize_population()
print(f"初始化种群完成,(种群大小,染色体长度): {population.shape if population.size > 0 else ''}")
# 若种群初始化失败(为空),直接退出
if population.size == 0:
print("错误:种群初始化失败,无法继续进化")
return
# 5. 记录进化过程中的历史数据
all_objectives = [] # 所有代的目标函数值
convergence_history = [] # 收敛趋势(每代最优前沿的平均目标值)
best_front = [] # 最终帕累托前沿解
best_front_objs = [] # 最终帕累托前沿的目标值
# 5. 记录进化过程中的历史数据(整数化)
all_objectives = [] # 所有代的目标函数值(整数)
convergence_history = [] # 收敛趋势(每代最优前沿的平均目标值,整数)
best_front = [] # 最终帕累托前沿解(整数)
best_front_objs = [] # 最终帕累托前沿的目标值(整数)
no_improve_count = 0 # 无改进计数器(用于早停)
prev_best_avg = float('inf') # 上一代的最优平均目标值
prev_best_avg = float('inf') # 上一代的最优平均目标值(整数求和)
# 6. 进化主循环
print(f"开始进化(最大代数:{config.max_generations},早停耐心:{config.early_stop_patience}...")
for generation in range(config.max_generations):
try:
# 计算当前种群的目标函数值
# 计算当前种群的目标函数值(整数)
objectives = [calculator.calculate_objectives(chrom) for chrom in population]
all_objectives.extend(objectives) # 记录所有目标值
all_objectives.extend(objectives) # 记录所有目标值(整数)
# 非支配排序,获取当前代的帕累托前沿
ranks, fronts = nsga2.fast_non_dominated_sort(objectives)
current_front = fronts[0] if fronts else [] # 第0层为最优前沿
current_front_objs = [objectives[i] for i in current_front] if current_front else []
best_front = population[current_front] if current_front else [] # 更新当前最优前沿解
best_front_objs = current_front_objs # 更新当前最优前沿目标值
# 记录收敛趋势(基于最优前沿的平均目标值)
best_front = population[current_front] if current_front else [] # 更新当前最优前沿解(整数)
best_front_objs = current_front_objs # 更新当前最优前沿目标值(整数)
# 记录收敛趋势(基于最优前沿的平均目标值,整数)
if len(current_front_objs) > 0:
avg_cost = sum(obj[0] for obj in current_front_objs) / len(current_front_objs)
avg_tardiness = sum(obj[1] for obj in current_front_objs) / len(current_front_objs)
avg_cost = sum(obj[0] for obj in current_front_objs) // len(current_front_objs) # 整数平均
avg_tardiness = sum(obj[1] for obj in current_front_objs) // len(current_front_objs) # 整数平均
convergence_history.append((avg_cost, avg_tardiness))
# 判断是否改进(用于早停)
current_avg = avg_cost + avg_tardiness # 合并两个目标的平均值
current_avg = avg_cost + avg_tardiness # 合并两个目标的平均值(整数)
if abs(current_avg - prev_best_avg) < 1e-4: # 变化小于阈值,视为无改进
no_improve_count += 1
else:
@ -79,15 +70,13 @@ def main():
prev_best_avg = current_avg
else:
no_improve_count += 1 # 无前沿解,视为无改进
# 选择操作(锦标赛选择)
selected = nsga2.selection(population, objectives)
# 校验选择后的种群大小
assert len(selected) == config.pop_size, \
f"选择后的种群大小({len(selected)})与目标大小({config.pop_size})不符"
# 交叉操作(两点交叉)- 修复索引越界问题
offspring = [] # 子代种群
offspring = [] # 子代种群(整数)
selected_len = len(selected) # selected的长度等于pop_size
i = 0
max_iter = 2 * config.pop_size # 最大迭代次数,避免无限循环
@ -113,59 +102,49 @@ def main():
# 直接添加当前父代避免i+1越界
offspring.append(selected[i])
i += 1 # 处理下一个个体步长改为1避免快速越界
# 若迭代次数用尽仍未生成足够子代,补充随机个体(健壮性处理)
# 若迭代次数用尽仍未生成足够子代,补充随机个体(健壮性处理,整数)
while len(offspring) < config.pop_size:
offspring.append(encoder._generate_random_chromosome()) # 需确保该方法可访问
# 变异操作(均匀变异)
offspring.append(encoder._generate_random_chromosome()) # 整数化随机染色体
# 变异操作(均匀变异,整数化)
offspring = [
genetic_op.uniform_mutation(chrom) if random.random() < config.mutation_prob else chrom
for chrom in offspring
]
offspring = np.array(offspring[:config.pop_size]) # 确保子代大小严格等于pop_size
offspring = np.array(offspring[:config.pop_size]).astype(int) # 确保子代大小和整数类型
# 合并父代和子代,准备环境选择
combined = np.vstack([population, offspring]) # 合并种群
# 计算合并种群的目标函数值
combined = np.vstack([population, offspring]).astype(int) # 合并种群(整数)
# 计算合并种群的目标函数值(整数)
combined_objs = objectives + [calculator.calculate_objectives(chrom) for chrom in offspring]
# 环境选择保留最优的pop_size个个体
population, objectives = nsga2.environmental_selection(combined, combined_objs)
# 校验环境选择后的种群大小
# 校验环境选择后的种群大小和整数类型
population = population.astype(int)
assert len(population) == config.pop_size, \
f"环境选择后的种群大小({len(population)})与目标大小({config.pop_size})不符"
# 早停检查(连续多代无改进则停止)
if no_improve_count >= config.early_stop_patience:
print(f"早停触发:连续{no_improve_count}代无改进,终止于第{generation}")
break
# 每50代打印一次进度
if generation % 50 == 0:
front_size = len(current_front) if current_front else 0
print(f"{generation}代完成,当前最优前沿大小: {front_size},无改进计数: {no_improve_count}")
except Exception as e:
print(f"{generation}代进化出错:{str(e)},跳过当前代")
continue
# 7. 结果可视化与输出
print("进化完成,处理结果...")
# 7. 结果可视化与输出(整数化)
print("进化完成,处理结果(整数化)...")
if len(best_front_objs) > 0:
visualizer.plot_pareto_front(all_objectives, best_front_objs) # 绘制帕累托前沿
visualizer.plot_convergence(convergence_history) # 绘制收敛趋势
visualizer.print_pareto_solutions(best_front, best_front_objs) # 打印最优解详情
visualizer.plot_pareto_front(all_objectives, best_front_objs) # 绘制帕累托前沿(整数)
visualizer.plot_convergence(convergence_history) # 绘制收敛趋势(整数)
visualizer.print_pareto_solutions(best_front, best_front_objs) # 打印最优解详情(整数)
else:
print("未找到有效帕累托前沿解")
except Exception as e:
print(f"程序运行出错:{str(e)}")
import traceback
traceback.print_exc() # 打印详细错误栈
if __name__ == "__main__":
print("程序启动...")
print("程序启动(整数化版本)...")
main()
print("程序结束")

View File

@ -1,11 +1,9 @@
import numpy as np
import math
from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config
from chromosome_utils import ChromosomeUtils
class ObjectiveCalculator:
"""目标函数计算器:计算双目标值(变更成本 + 交付延期)"""
"""目标函数计算器:计算双目标值(变更成本 + 交付延期)(整数化)"""
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData,
utils: ChromosomeUtils, config: Config):
"""
@ -23,71 +21,60 @@ class ObjectiveCalculator:
self.config = config
# 预计算物料企业编码的分割点(提高效率)
self.split_points = np.cumsum(utils.material_enterprise_count)
def calculate_objectives(self, chromosome: np.ndarray) -> tuple[float, float]:
def calculate_objectives(self, chromosome: np.ndarray) -> tuple[int, int]:
"""
计算双目标值
计算双目标值整数
:param chromosome: 染色体
:return: (变更成本C, 交付延期T)
:return: (变更成本C, 交付延期T)均为整数
"""
# 拆分染色体为三层
enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(chromosome)
# 计算变更成本和交付延期
C = self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer)
T = self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer)
return C, T
# 计算变更成本和交付延期(整数)
C = math.ceil(self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer))
T = math.ceil(self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer))
return int(C), int(T)
def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算变更成本 C = C1 + C2 + C3 + C4按新规则实现
- C1: 变更惩罚成本含α系数
- C2: 采购成本差异保持原有逻辑
- C3: 运输成本差异保持原有逻辑
- C4: 提前交付惩罚成本新公式
计算变更成本 C = C1 + C2 + C3 + C4按新规则实现整数化前
"""
C1 = 0.0
C2 = 0.0
C3 = 0.0
C4 = 0.0
# -------------------------- 基础数据计算(复用+新增)--------------------------
# 1. 原始成本(全风险企业生产时的成本)
original_purchase_cost = sum(self.order.Q[i] * self.order.P0[i] for i in range(self.order.I))
original_transport_cost = sum(self.order.Q[i] * self.order.T0[i] for i in range(self.order.I))
original_total_cost = original_purchase_cost + original_transport_cost # 全风险生产总成本用于C4
# 2. 变更后成本(当前解的成本)
new_purchase_cost = 0.0
new_transport_cost = 0.0
# 3. 关键变量收集(风险企业/供应商的产量、交货时间)
risk_production = np.zeros(self.order.I) # 风险企业生产的各物料数量xi0
supplier_production = np.zeros(self.order.I) # 供应商生产的各物料数量Qi - xi0
risk_delivery_times = [] # 风险企业的各物料交货时间Di0 = 生产时间 + 运输时间)
supplier_delivery_times = {} # 供应商的各物料交货时间 {供应商ID: [Dij1, Dij2, ...]}
risk_production = np.zeros(self.order.I, dtype=int) # 风险企业生产的各物料数量xi0整数
supplier_production = np.zeros(self.order.I, dtype=int) # 供应商生产的各物料数量Qi - xi0整数
risk_delivery_times = [] # 风险企业的各物料交货时间(整数)
supplier_delivery_times = {} # 供应商的各物料交货时间 {供应商ID: [Dij1, Dij2, ...]}(整数)
start = 0
for i in range(self.order.I): # 遍历每种物料
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i] # 可选企业
e_segment = enterprise_layer[start:end] # 企业选择状态
q_segment = quantity_layer[start:end] # 数量分配
c_segment = capacity_layer[start:end] # 产能(用于计算生产时间)
q_segment = quantity_layer[start:end].astype(int) # 数量分配(整数)
c_segment = capacity_layer[start:end].astype(int) # 产能(整数)
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理被选中的企业
q = q_segment[idx] # 分配的数量
c = c_segment[idx] # 产能
production_time = q / c if c != 0 else 0 # 生产时间
q = q_segment[idx] # 分配的数量(整数)
c = c_segment[idx] # 产能(整数)
# 生产时间(整数,向上取整)
production_time = math.ceil(q / c) if c != 0 else 0
if ent == 0: # 风险企业
risk_production[i] += q
# 风险企业的采购/运输成本
new_purchase_cost += q * self.order.P0[i]
new_transport_cost += q * self.order.T0[i]
# 风险企业的交货时间Di0
transport_time = self.risk.distance / self.order.transport_speed
# 风险企业的交货时间Di0,整数
transport_time = math.ceil(self.risk.distance / self.order.transport_speed)
risk_delivery_times.append(production_time + transport_time)
else: # 供应商
supplier_id = ent - 1
@ -95,44 +82,38 @@ class ObjectiveCalculator:
# 供应商的采购/运输成本
new_purchase_cost += q * self.supplier.P_ij[supplier_id][i]
new_transport_cost += q * self.supplier.T_ij[supplier_id][i]
# 供应商的交货时间Dij
transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed
# 供应商的交货时间Dij,整数
transport_time = math.ceil(self.supplier.distance[supplier_id] / self.order.transport_speed)
if supplier_id not in supplier_delivery_times:
supplier_delivery_times[supplier_id] = []
supplier_delivery_times[supplier_id].append(production_time + transport_time)
start = end
# -------------------------- C1变更惩罚成本计算新规则--------------------------
# 计算α的两个分子
sum_xi0 = np.sum(risk_production) # 所有物料的风险企业总产量
sum_Qi = np.sum(self.order.Q) # 所有物料的订单总需求
sum_xi0 = np.sum(risk_production) # 所有物料的风险企业总产量(整数)
sum_Qi = np.sum(self.order.Q) # 所有物料的订单总需求(整数)
if sum_Qi == 0:
ratio_risk_q = 1.0 # 避免除零
else:
ratio_risk_q = sum_xi0 / sum_Qi
D_original = self.order.Dd # 原定交货时间Q1确认需求交货期
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) # 实际交货期Q2确认
D_original = self.order.Dd # 原定交货时间(整数)
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) # 实际交货期(整数)
if T_actual == 0:
ratio_delivery = 1.0 # 避免除零
else:
ratio_delivery = D_original / T_actual
# 计算α并应用约束0, 1]≤0取1
alpha = max(ratio_risk_q, ratio_delivery)
alpha = 1.0 if alpha > 1.0 else alpha # 超过1取1
alpha = 1.0 if alpha <= 0.0 else alpha # ≤0取1
# 计算C1按物料求和
for i in range(self.order.I):
supplier_q = supplier_production[i] # 物料i的供应商产量Qi - xi0
risk_unit_cost = self.order.P0[i] + self.order.T0[i] # 风险企业单位采运成本
supplier_q = supplier_production[i] # 物料i的供应商产量整数
risk_unit_cost = self.order.P0[i] + self.order.T0[i] # 风险企业单位采运成本(整数)
C1 += self.config.delta * alpha * supplier_q * risk_unit_cost
# -------------------------- C2、C3保持原有逻辑--------------------------
C2 = new_purchase_cost - original_purchase_cost # 采购成本差异
C3 = new_transport_cost - original_transport_cost # 运输成本差异
C2 = new_purchase_cost - original_purchase_cost # 采购成本差异(整数相关)
C3 = new_transport_cost - original_transport_cost # 运输成本差异(整数相关)
# -------------------------- C4提前交付惩罚成本计算新规则--------------------------
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
if T_actual <= self.order.Dd: # 不延期时才计算Q7确认
@ -143,69 +124,57 @@ class ObjectiveCalculator:
else:
base_value = original_total_cost / Dd
base_value_rounded = round(base_value) # 四舍五入取整
# 步骤2计算风险企业提前天数 = max(0, Dd - D0)D0为风险企业最长交货时间
D0 = max(risk_delivery_times) if risk_delivery_times else 0.0
risk_early_days = max(0.0, Dd - D0)
# 步骤3计算供应商最大提前天数 = max(0, Dd - Dj)的最大值Dj为每个供应商的最长交货时间
max_supplier_early = 0.0
# 步骤2计算风险企业提前天数 = max(0, Dd - D0)D0为风险企业最长交货时间整数
D0 = max(risk_delivery_times) if risk_delivery_times else 0
risk_early_days = max(0, Dd - D0)
# 步骤3计算供应商最大提前天数 = max(0, Dd - Dj)的最大值(整数)
max_supplier_early = 0
for supplier_id, times in supplier_delivery_times.items():
Dj = max(times) if times else 0.0
supplier_early = max(0.0, Dd - Dj)
Dj = max(times) if times else 0
supplier_early = max(0, Dd - Dj)
if supplier_early > max_supplier_early:
max_supplier_early = supplier_early
# 步骤4计算C4
# 步骤4计算C4整数
C4 = base_value_rounded * 0.1 * (risk_early_days + max_supplier_early)
C4 = round(C4) # 最终结果四舍五入取整
# -------------------------- 总变更成本 --------------------------
return C1 + C2 + C3 + C4
def _calculate_actual_delivery_time(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
quantity_layer: np.ndarray) -> int:
"""
计算实际交货期所有企业中最长的生产时间 + 运输时间
:return: 实际交货期
计算实际交货期整数向上取整
:return: 实际交货期整数
"""
max_time = 0.0 # 最大时间(实际交货期)
max_time = 0 # 最大时间(实际交货期)
start = 0
for i in range(self.order.I):
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i]
e_segment = enterprise_layer[start:end]
c_segment = capacity_layer[start:end] # 产能
q_segment = quantity_layer[start:end] # 数量
c_segment = capacity_layer[start:end].astype(int) # 产能(整数)
q_segment = quantity_layer[start:end].astype(int) # 数量(整数)
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理选中的企业
q = q_segment[idx]
c = c_segment[idx]
# 生产时间 = 数量 / 产能产能为0时按0处理
production_time = q / c if c != 0 else 0
# 运输时间
# 生产时间(整数,向上取整)
production_time = math.ceil(q / c) if c != 0 else 0
# 运输时间(整数,向上取整)
if ent == 0:
transport_time = self.risk.distance / self.order.transport_speed
transport_time = math.ceil(self.risk.distance / self.order.transport_speed)
else:
supplier_id = ent - 1
transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed
# 总时间 = 生产时间 + 运输时间
transport_time = math.ceil(self.supplier.distance[supplier_id] / self.order.transport_speed)
# 总时间(整数)
total_time = production_time + transport_time
if total_time > max_time:
max_time = total_time # 更新最大时间
start = end
return max_time
return int(max_time)
def _calculate_tardiness(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
quantity_layer: np.ndarray) -> int:
"""
计算交付延期max(0, 实际交货期 - 需求交货期)
:return: 延期时间非负
计算交付延期整数
:return: 延期时间非负整数
"""
actual_delivery = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
return max(0.0, actual_delivery - self.order.Dd)
return max(0, actual_delivery - self.order.Dd)

View File

@ -1,11 +1,8 @@
import matplotlib.pyplot as plt
import numpy as np
from chromosome_utils import ChromosomeUtils # 染色体工具类
class ResultVisualizer:
"""结果可视化工具类:绘制帕累托前沿、收敛曲线,打印最优解详情"""
"""结果可视化工具类:绘制帕累托前沿、收敛曲线,打印最优解详情(适配整数化)"""
def __init__(self, utils: ChromosomeUtils):
"""
初始化可视化工具
@ -15,19 +12,18 @@ class ResultVisualizer:
self.figsize = (12, 8) # 图表大小
# 企业名称列表(风险企业+供应商)
self.supplier_names = ["RiskEnterprise"] + self.utils.supplier.names
def plot_pareto_front(self, all_objectives: list[tuple[float, float]],
non_dominated_objectives: list[tuple[float, float]]):
def plot_pareto_front(self, all_objectives: list[tuple[int, int]],
non_dominated_objectives: list[tuple[int, int]]):
"""
绘制帕累托前沿所有解 vs 帕累托最优解
:param all_objectives: 所有解的目标值
:param non_dominated_objectives: 帕累托最优解的目标值
:param all_objectives: 所有解的目标值整数
:param non_dominated_objectives: 帕累托最优解的目标值整数
"""
plt.figure(figsize=self.figsize)
# 绘制所有解(蓝色点)
if all_objectives:
c_all = [obj[0] for obj in all_objectives] # 成本
t_all = [obj[1] for obj in all_objectives] # 延期
c_all = [obj[0] for obj in all_objectives] # 成本(整数)
t_all = [obj[1] for obj in all_objectives] # 延期(整数)
plt.scatter(c_all, t_all, color='blue', alpha=0.3, label='All Solutions', zorder=1)
# 绘制帕累托前沿(红色点)
if non_dominated_objectives:
@ -35,96 +31,84 @@ class ResultVisualizer:
t_nd = [obj[1] for obj in non_dominated_objectives]
plt.scatter(c_nd, t_nd, color='red', s=50, label='Pareto Front', zorder=5)
# 图表设置
plt.title('Pareto Front: Change Cost vs Tardiness')
plt.xlabel('Change Cost')
plt.ylabel('Tardiness')
plt.title('Pareto Front: Change Cost vs Tardiness (Integerized)')
plt.xlabel('Change Cost (Integer)')
plt.ylabel('Tardiness (Integer)')
plt.legend()
plt.grid(True, alpha=0.5)
plt.savefig('pareto_front.png', dpi=300, bbox_inches='tight') # 保存图片
plt.savefig('pareto_front_integerized.png', dpi=300, bbox_inches='tight') # 保存图片
plt.show()
def plot_convergence(self, convergence_history: list[tuple[float, float]]):
def plot_convergence(self, convergence_history: list[tuple[int, int]]):
"""
绘制收敛趋势图每代最优前沿的平均成本和延期
:param convergence_history: 收敛历史数据(平均成本, 平均延期)
绘制收敛趋势图每代最优前沿的平均成本和延期整数化
:param convergence_history: 收敛历史数据(平均成本, 平均延期)整数
"""
if len(convergence_history) == 0: # 无数据时不绘制
return
plt.figure(figsize=self.figsize)
generations = list(range(len(convergence_history))) # 代数
costs = [h[0] for h in convergence_history] # 平均成本
tardiness = [h[1] for h in convergence_history] # 平均延期
costs = [h[0] for h in convergence_history] # 平均成本(整数)
tardiness = [h[1] for h in convergence_history] # 平均延期(整数)
# 子图1平均成本趋势
plt.subplot(2, 1, 1)
plt.plot(generations, costs, 'b-')
plt.title('Convergence History')
plt.ylabel('Average Change Cost')
plt.title('Convergence History (Integerized)')
plt.ylabel('Average Change Cost (Integer)')
plt.grid(True, alpha=0.5)
# 子图2平均延期趋势
plt.subplot(2, 1, 2)
plt.plot(generations, tardiness, 'r-')
plt.xlabel('Generation')
plt.ylabel('Average Tardiness')
plt.ylabel('Average Tardiness (Integer)')
plt.grid(True, alpha=0.5)
plt.tight_layout() # 调整布局
plt.savefig('convergence_history.png', dpi=300, bbox_inches='tight') # 保存图片
plt.savefig('convergence_history_integerized.png', dpi=300, bbox_inches='tight') # 保存图片
plt.show()
def print_solution_details(self, solution: np.ndarray, objectives: tuple[float, float]):
def print_solution_details(self, solution: np.ndarray, objectives: tuple[int, int]):
"""
打印单个解的详细信息企业选择产能数量等
:param solution: 染色体
:param objectives: 该解的目标值成本, 延期
打印单个解的详细信息企业选择产能数量等整数化
:param solution: 染色体整数
:param objectives: 该解的目标值成本, 延期整数
"""
# 拆分染色体为三层
enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(solution)
split_points = np.cumsum(self.utils.material_enterprise_count) # 分割点
print(f"\n变更成本: {objectives[0]:.2f}, 交付延期: {objectives[1]:.2f}")
print(f"\n变更成本: {objectives[0]}, 交付延期: {objectives[1]}") # 整数输出
print("=" * 80)
total_q_check = [] # 检查各物料的总数量是否满足需求
total_q_check = [] # 检查各物料的总数量是否满足需求(整数)
start = 0
for i in range(self.utils.I): # 遍历每种物料
end = split_points[i]
ents = self.utils.material_optional_enterprises[i] # 可选企业
e_segment = enterprise_layer[start:end] # 企业选择状态
c_segment = capacity_layer[start:end] # 产能
q_segment = quantity_layer[start:end] # 数量
demand_q = self.utils.order.Q[i] # 需求数量
allocated_q = np.sum(q_segment[e_segment == 1]) # 分配的总数量
print(f"物料 {i} - 需求数量: {demand_q}, 分配总量: {allocated_q:.2f}")
total_q_check.append(abs(allocated_q - demand_q) < 1e-2) # 检查是否满足需求
c_segment = capacity_layer[start:end].astype(int) # 产能(整数)
q_segment = quantity_layer[start:end].astype(int) # 数量(整数)
demand_q = self.utils.order.Q[i] # 需求数量(整数)
allocated_q = np.sum(q_segment[e_segment == 1]) # 分配的总数量(整数)
print(f"物料 {i} - 需求数量: {demand_q}, 分配总量: {allocated_q}")
total_q_check.append(allocated_q == demand_q) # 整数相等检查
print(f" 选择的企业及其分配:")
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅打印被选中的企业
ent_name = self.supplier_names[ent] # 企业名称
cap = c_segment[idx] # 产能
qty = q_segment[idx] # 数量
print(f" 企业: {ent_name}, 产能: {cap:.2f}, 分配数量: {qty:.2f}")
cap = c_segment[idx] # 产能(整数)
qty = q_segment[idx] # 数量(整数)
print(f" 企业: {ent_name}, 产能: {cap}, 分配数量: {qty}")
start = end
print("-" * 80)
# 验证数量约束是否满足
if all(total_q_check):
print("✅ 所有物料数量满足需求约束")
print("✅ 所有物料数量满足需求约束(整数匹配)")
else:
print("❌ 部分物料数量未满足需求约束")
def print_pareto_solutions(self, population: np.ndarray, objectives: list[tuple[float, float]]):
def print_pareto_solutions(self, population: np.ndarray, objectives: list[tuple[int, int]]):
"""
打印帕累托前沿解的详细信息最多打印前5个
:param population: 帕累托前沿解的种群
:param objectives: 对应的目标值列表
打印帕累托前沿解的详细信息最多打印前5个整数化
:param population: 帕累托前沿解的种群整数
:param objectives: 对应的目标值列表整数
"""
print("\n" + "=" * 100)
print("帕累托前沿解详细方案")
print("帕累托前沿解详细方案(整数化)")
print("=" * 100)
# 打印前5个解或所有解取较少者
for i in range(min(5, len(population))):