OrderReallocation-HeavyTruc.../main.py

235 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
import numpy as np
from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config
from chromosome_utils import ChromosomeUtils
from objective_calculator import ObjectiveCalculator
from encoder import Encoder
from genetic_operators import GeneticOperator
from nsga2 import NSGA2
from visualizer import ResultVisualizer
from data_structures import DataStructures
def main():
"""主函数执行NSGA-II算法求解多目标优化问题"""
try:
# 1. 初始化随机种子(确保结果可复现)
random.seed(42)
np.random.seed(42)
# 2. 初始化数据(订单、风险企业、供应商、算法配置)
print("初始化数据结构...")
order_data = OrderData() # 订单数据(需求、交货期等,整数)
risk_data = RiskEnterpriseData() # 风险企业数据
supplier_data = SupplierData() # 供应商数据
config = Config() # 算法参数配置
# 3. 初始化工具类和算法组件
print("初始化算法组件...")
utils = ChromosomeUtils(order_data, risk_data, supplier_data) # 染色体工具
calculator = ObjectiveCalculator(order_data, risk_data, supplier_data, utils, config) # 目标函数计算器
encoder = Encoder(config, utils) # 种群初始化编码器
genetic_op = GeneticOperator(config, utils) # 遗传操作器
nsga2 = NSGA2(config.pop_size, config.objective_num) # NSGA-II算法实例
visualizer = ResultVisualizer(utils) # 结果可视化工具
# 4. 初始化种群
print("初始化种群...")
population = encoder.initialize_population()
print(f"初始化种群完成,(种群大小,染色体长度): {population.shape if population.size > 0 else ''}")
# 若种群初始化失败(为空),直接退出
if population.size == 0:
print("错误:种群初始化失败,无法继续进化")
return
# 5. 记录进化过程中的历史数据
all_objectives = [] # 所有代的目标函数值
convergence_history = [] # 收敛趋势(每代最优前沿的平均目标值,整数)
best_front = [] # 最终帕累托前沿解
best_front_objs = [] # 最终帕累托前沿的目标值
prev_avg_cost = None # 上一代的平均成本
prev_avg_tardiness = None # 上一代的平均延期
no_improve_count = 0 # 无改进计数器
# 6. 进化主循环
print(f"开始进化(最大代数:{config.max_generations},早停耐心:{config.early_stop_patience}...")
for generation in range(config.max_generations):
try:
# 计算当前种群的目标函数值
objectives = [calculator.calculate_objectives(chrom) for chrom in population]
all_objectives.extend(objectives) # 记录所有目标值
# 非支配排序,获取当前代的帕累托前沿
ranks, fronts = nsga2.fast_non_dominated_sort(objectives)
current_front = fronts[0] if fronts else [] # 第0层为最优前沿
current_front_objs = [objectives[i] for i in current_front] if current_front else []
best_front = population[current_front] if current_front else [] # 更新当前最优前沿解
best_front_objs = current_front_objs # 更新当前最优前沿目标值
# 记录收敛趋势(基于最优前沿的平均目标值,整数)
# 记录收敛趋势并判断早停条件
if len(current_front_objs) > 0:
avg_cost = sum(obj[0] for obj in current_front_objs) // len(current_front_objs)
avg_tardiness = sum(obj[1] for obj in current_front_objs) // len(current_front_objs)
convergence_history.append((avg_cost, avg_tardiness))
# 早停判断逻辑
if prev_avg_cost is not None and prev_avg_tardiness is not None:
# 计算两个目标的变化量
cost_change = abs(avg_cost - prev_avg_cost)
tardiness_change = abs(avg_tardiness - prev_avg_tardiness)
# 检查是否两个目标的变化率
if (cost_change < config.early_stop_threshold * prev_avg_cost and
tardiness_change < config.early_stop_threshold * prev_avg_tardiness):
no_improve_count += 1
else:
no_improve_count = 0 # 有改进,重置计数器
prev_avg_cost = avg_cost
prev_avg_tardiness = avg_tardiness
else:
# 初始化历史值(第一代)
prev_avg_cost = avg_cost
prev_avg_tardiness = avg_tardiness
no_improve_count = 0
else:
no_improve_count += 1 # 无前沿解,视为无改进
# 早停检查(连续多代无改进则停止)
if no_improve_count >= config.early_stop_patience:
print(
f"早停触发:连续{no_improve_count}代两个目标值变化均小于{config.early_stop_threshold},终止于第{generation}")
break
# 选择操作(锦标赛选择)
selected = nsga2.selection(population, objectives)
# 校验选择后的种群大小
assert len(selected) == config.pop_size, \
f"选择后的种群大小({len(selected)})与目标大小({config.pop_size})不符"
# 交叉操作(两点交叉)- 修复索引越界问题
offspring = [] # 子代种群
selected_len = len(selected) # selected的长度等于pop_size
i = 0
max_iter = 2 * config.pop_size # 最大迭代次数,避免无限循环
iter_count = 0
while len(offspring) < config.pop_size and iter_count < max_iter:
iter_count += 1
# 检查i是否超出selected范围若超出则重置循环使用个体
if i >= selected_len:
i = 0
# 尝试两两交叉需i+1在范围内且满足交叉概率
if i + 1 < selected_len and random.random() < config.crossover_prob:
parent1 = selected[i]
parent2 = selected[i + 1]
child1, child2 = genetic_op.two_point_crossover(parent1, parent2)
# 添加child1若未达到种群大小
if len(offspring) < config.pop_size:
offspring.append(child1)
# 添加child2若未达到种群大小
if len(offspring) < config.pop_size:
offspring.append(child2)
i += 2 # 处理下一对个体
else:
# 直接添加当前父代避免i+1越界
offspring.append(selected[i])
i += 1 # 处理下一个个体步长改为1避免快速越界
# 若迭代次数用尽仍未生成足够子代,补充随机个体(健壮性处理,整数)
while len(offspring) < config.pop_size:
offspring.append(encoder._generate_random_chromosome()) # 整数化随机染色体
# 变异操作(均匀变异,整数化)
offspring = [
genetic_op.uniform_mutation(chrom) if random.random() < config.mutation_prob else chrom
for chrom in offspring
]
offspring = np.array(offspring[:config.pop_size]).astype(int) # 确保子代大小和整数类型
# 合并父代和子代,准备环境选择
combined = np.vstack([population, offspring]).astype(int) # 合并种群
# 计算合并种群的目标函数值
combined_objs = objectives + [calculator.calculate_objectives(chrom) for chrom in offspring]
# 环境选择保留最优的pop_size个个体
population, objectives = nsga2.environmental_selection(combined, combined_objs)
# 校验环境选择后的种群大小和整数类型
population = population.astype(int)
assert len(population) == config.pop_size, \
f"环境选择后的种群大小({len(population)})与目标大小({config.pop_size})不符"
# 早停检查(连续多代无改进则停止)
if no_improve_count >= config.early_stop_patience:
print(f"早停触发:连续{no_improve_count}代无改进,终止于第{generation}")
break
# 每50代打印一次进度
if generation % 50 == 0:
front_size = len(current_front) if current_front else 0
print(f"{generation}代完成,当前最优前沿大小: {front_size},无改进计数: {no_improve_count}")
except Exception as e:
print(f"{generation}代进化出错:{str(e)},跳过当前代")
continue
# 7. 结果可视化与输出
print("进化完成,处理结果...")
if len(best_front_objs) > 0:
# 1. 过滤重复解(关键改进:基于目标值去重,确保相同目标值只保留一个解)
unique_front = []
unique_front_objs = []
seen_obj = set() # 仅跟踪目标值,确保相同目标值只保留一个
for sol, obj in zip(best_front, best_front_objs):
obj_tuple = tuple(obj) # 将目标值转为元组用于查重
if obj_tuple not in seen_obj:
seen_obj.add(obj_tuple)
unique_front.append(sol)
unique_front_objs.append(obj)
# 2. 计算评价指标并排序
if unique_front_objs:
# 找出各目标的最优值
optimal_cost = min(obj[0] for obj in unique_front_objs)
optimal_tardiness = min(obj[1] for obj in unique_front_objs)
max_cost = max(obj[0] for obj in unique_front_objs)
max_tardiness = max(obj[1] for obj in unique_front_objs)
# 计算每个解的评价指标
evaluated_solutions = []
for sol, obj in zip(unique_front, unique_front_objs):
index = DataStructures.calculate_evaluation_index(
obj, optimal_cost, optimal_tardiness, max_cost, max_tardiness
)
evaluated_solutions.append((sol, obj, index))
# 按评价指标升序排序
evaluated_solutions.sort(key=lambda x: x[2])
# 选择前n个解
top_n = min(config.print_top_n, len(evaluated_solutions))
top_solutions = evaluated_solutions[:top_n]
top_population = [sol for sol, _, _ in top_solutions]
top_objectives = [obj for _, obj, _ in top_solutions]
# 打印排序后的解的评价指标
print("\n" + "=" * 100)
print(f"排序后的前{top_n}个最优解(评价指标从小到大)")
print("=" * 100)
for i, (_, obj, idx) in enumerate(top_solutions):
print(f"{i + 1}: 成本={obj[0]}, 延期={obj[1]}, 评价指标={idx:.4f}")
else:
top_population = []
top_objectives = []
# 保持原有图表绘制逻辑不变
visualizer.plot_pareto_front(all_objectives, best_front_objs) # 绘制帕累托前沿
visualizer.plot_convergence(convergence_history) # 绘制收敛趋势
# 打印处理后的最优解详情
if top_population:
visualizer.print_pareto_solutions(top_population, top_objectives)
else:
print("未找到有效帕累托前沿解")
else:
print("未找到有效帕累托前沿解")
except Exception as e:
print(f"程序运行出错:{str(e)}")
import traceback
traceback.print_exc() # 打印详细错误栈
if __name__ == "__main__":
print("程序启动(整数化版本)...")
main()
print("程序结束")