diff --git a/chromosome_utils.py b/chromosome_utils.py new file mode 100644 index 0000000..5ee478b --- /dev/null +++ b/chromosome_utils.py @@ -0,0 +1,206 @@ +import random +import numpy as np +from data_structures import OrderData, RiskEnterpriseData, SupplierData + + +class ChromosomeUtils: + """染色体编码、解码、修复工具类""" + def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData): + self.order = order_data + self.risk = risk_data + self.supplier = supplier_data + self.I = order_data.I + self.material_optional_enterprises = self._get_material_optional_enterprises() + self.material_enterprise_count = [len(ents) for ents in self.material_optional_enterprises] + self.chromosome_length = 3 * sum(self.material_enterprise_count) + + def _get_material_optional_enterprises(self) -> list[list[int]]: + """获取每个物料的可选企业列表:0=风险企业,1~supplier_count=供应商""" + optional = [] + for i in range(self.I): + ents = [0] # 先加入风险企业 + for j in range(self.supplier.supplier_count): + if self.supplier.can_produce[j][i] == 1: + ents.append(j + 1) + optional.append(ents) + return optional + + def _split_chromosome(self, chromosome: np.ndarray) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """拆分染色体为三层:企业编码层、能力编码层、数量编码层""" + total_ent_count = sum(self.material_enterprise_count) + enterprise_layer = chromosome[:total_ent_count] + capacity_layer = chromosome[total_ent_count:2*total_ent_count] + quantity_layer = chromosome[2*total_ent_count:] + return enterprise_layer, capacity_layer, quantity_layer + + def _merge_chromosome(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, + quantity_layer: np.ndarray) -> np.ndarray: + """合并三层为完整染色体""" + return np.hstack([enterprise_layer, capacity_layer, quantity_layer]) + + def repair_enterprise_layer(self, enterprise_layer: np.ndarray) -> np.ndarray: + """修复企业编码层:确保每种物料至少有一个企业生产""" + repaired = enterprise_layer.copy() + split_points = np.cumsum(self.material_enterprise_count) + start = 0 + for i in range(self.I): + end = split_points[i] + segment = repaired[start:end] + if np.sum(segment) == 0: + select_idx = random.randint(0, len(segment)-1) + segment[select_idx] = 1 + repaired[start:end] = segment + start = end + return repaired + + def repair_capacity_layer(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray) -> np.ndarray: + """修复能力编码层:满足单物料产能和总产能约束""" + repaired = capacity_layer.copy() + split_points = np.cumsum(self.material_enterprise_count) + start = 0 + # 单物料产能约束 + for i in range(self.I): + end = split_points[i] + ents = self.material_optional_enterprises[i] + segment = repaired[start:end] + enterprise_segment = enterprise_layer[start:end] + for idx, ent in enumerate(ents): + if enterprise_segment[idx] == 1: + if ent == 0: + max_cap = self.risk.C0_i_std[i] + else: + supplier_id = ent - 1 + max_cap = self.supplier.Cj_i_std[supplier_id][i] + if segment[idx] <= 0 or segment[idx] > max_cap: + segment[idx] = random.uniform(1, max_cap) + repaired[start:end] = segment + start = end + # 总产能约束 + enterprise_total_capacity = {} + start = 0 + for i in range(self.I): + end = split_points[i] + ents = self.material_optional_enterprises[i] + enterprise_segment = enterprise_layer[start:end] + capacity_segment = repaired[start:end] + for idx, ent in enumerate(ents): + if enterprise_segment[idx] == 1: + if ent not in enterprise_total_capacity: + enterprise_total_capacity[ent] = 0 + enterprise_total_capacity[ent] += capacity_segment[idx] + start = end + # 调整超出总产能的企业 + for ent, total_cap in enterprise_total_capacity.items(): + if ent == 0: + max_total_cap = self.risk.C0_total_max + else: + supplier_id = ent - 1 + max_total_cap = self.supplier.Cj_total_max[supplier_id] + if total_cap > max_total_cap: + scale = max_total_cap / total_cap + start = 0 + for i in range(self.I): + end = split_points[i] + ents = self.material_optional_enterprises[i] + enterprise_segment = enterprise_layer[start:end] + capacity_segment = repaired[start:end] + for idx, e in enumerate(ents): + if e == ent and enterprise_segment[idx] == 1: + capacity_segment[idx] *= scale + repaired[start:end] = capacity_segment + start = end + return repaired + + def repair_quantity_layer(self, enterprise_layer: np.ndarray, quantity_layer: np.ndarray) -> np.ndarray: + """修复数量编码层:满足总量需求、起订量和最大供应量约束""" + repaired = quantity_layer.copy() + split_points = np.cumsum(self.material_enterprise_count) + start = 0 + for i in range(self.I): + qi = self.order.Q[i] + end = split_points[i] + ents = self.material_optional_enterprises[i] + segment = repaired[start:end] + enterprise_segment = enterprise_layer[start:end] + selected_indices = np.where(enterprise_segment == 1)[0] + if len(selected_indices) == 0: + start = end + continue + selected_ents = [ents[idx] for idx in selected_indices] + # 步骤1:初始化合法范围(起订量~最大供应量) + for idx, ent in zip(selected_indices, selected_ents): + if ent == 0: + min_q = 0 + max_q = qi + else: + supplier_id = ent - 1 + min_q = self.supplier.MinOrder[supplier_id][i] + max_q = self.supplier.MaxOrder[supplier_id][i] + if segment[idx] < min_q: + segment[idx] = min_q + elif segment[idx] > max_q: + segment[idx] = max_q + # 步骤2:强制总量等于需求数量(核心修复) + current_total = np.sum(segment[selected_indices]) + if current_total <= 0: + weights = np.random.rand(len(selected_indices)) + weights /= np.sum(weights) + segment[selected_indices] = qi * weights + else: + scale = qi / current_total + segment[selected_indices] *= scale + # 步骤3:处理超出最大供应量的情况(修复中文变量名+逻辑) + need_adjust = True + while need_adjust: + need_adjust = False + total_excess = 0 + for idx, ent in zip(selected_indices, selected_ents): + if ent == 0: + max_q = qi + else: + supplier_id = ent - 1 + max_q = self.supplier.MaxOrder[supplier_id][i] + if segment[idx] > max_q: + excess = segment[idx] - max_q + total_excess += excess + segment[idx] = max_q + need_adjust = True + if total_excess > 0: + available_indices = [] + available_max = [] + for idx, ent in zip(selected_indices, selected_ents): + if ent == 0: + max_q = qi + else: + supplier_id = ent - 1 + max_q = self.supplier.MaxOrder[supplier_id][i] + remaining = max_q - segment[idx] + if remaining > 0: + available_indices.append(idx) + available_max.append(remaining) + if len(available_indices) > 0: + available_total = sum(available_max) + if available_total > 0: + alloc_ratio = np.array(available_max) / available_total + alloc_excess = total_excess * alloc_ratio + for idx, alloc in zip(available_indices, alloc_excess): + segment[idx] += alloc + else: + segment[selected_indices] = np.minimum(segment[selected_indices], + [self.supplier.MaxOrder[ent-1][i] if ent !=0 else qi for ent in selected_ents]) + current_total = np.sum(segment[selected_indices]) + if abs(current_total - qi) > 1e-6: + scale = qi / current_total + segment[selected_indices] *= scale + need_adjust = True + repaired[start:end] = segment + start = end + return repaired + + def repair_chromosome(self, chromosome: np.ndarray) -> np.ndarray: + """完整修复染色体:企业层→能力层→数量层""" + enterprise_layer, capacity_layer, quantity_layer = self._split_chromosome(chromosome) + enterprise_layer = self.repair_enterprise_layer(enterprise_layer) + capacity_layer = self.repair_capacity_layer(enterprise_layer, capacity_layer) + quantity_layer = self.repair_quantity_layer(enterprise_layer, quantity_layer) + return self._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer) \ No newline at end of file diff --git a/data_structures.py b/data_structures.py new file mode 100644 index 0000000..b633486 --- /dev/null +++ b/data_structures.py @@ -0,0 +1,99 @@ +# 数据结构定义 +class OrderData: + """订单数据类""" + + def __init__(self): + self.I = 5 # 物料种类数 + self.Q = [250, 300, 200, 350, 280] # 各物料待生产量 + self.Dd = 12 # 需求交货期(时长) + self.P0 = [50.0, 80.0, 60.0, 70.0, 90.0] # 风险企业单位采购价 + self.T0 = [5.0, 8.0, 6.0, 7.0, 9.0] # 风险企业单位运输成本 + self.transport_speed = 10.0 # 运输速度(单位/时间) + + +class RiskEnterpriseData: + """风险企业数据类(单物料产能约束+总产能约束)""" + + def __init__(self): + self.I = 5 # 物料种类数(与订单一致) + self.C0_i_std = [40.0, 50.0, 35.0, 45.0, 48.0] # 单物料单位时间标准产能 + self.C0_total_max = 100.0 # 总产能上限 + self.distance = 10.0 # 位置距离 + + +class SupplierData: + """供应商数据类(按约束文档定义)""" + + def __init__(self, I=5): + self.I = I + self.supplier_count = 4 + self.names = ["S0", "S1", "S2", "S3"] + # 能否生产某物料矩阵 (supplier_count × I) + self.can_produce = [ + [1, 1, 1, 1, 1], + [1, 0, 1, 0, 1], + [0, 1, 0, 1, 0], + [0, 0, 1, 1, 1] + ] + # 单物料单位时间标准生产能力 + self.Cj_i_std = [ + [20.0, 18.0, 15.0, 22.0, 25.0], + [25.0, 0.0, 30.0, 0.0, 28.0], + [0.0, 22.0, 0.0, 35.0, 0.0], + [0.0, 0.0, 20.0, 30.0, 22.0] + ] + # 供应商单位时间最大总生产能力 + self.Cj_total_max = [120.0, 110.0, 100.0, 95.0] + # 最小起订量 + self.MinOrder = [ + [20.0, 20.0, 15.0, 25.0, 20.0], + [30.0, 0.0, 25.0, 0.0, 30.0], + [0.0, 25.0, 0.0, 30.0, 0.0], + [0.0, 0.0, 20.0, 35.0, 25.0] + ] + # 最大供应量 + self.MaxOrder = [ + [100.0, 150.0, 80.0, 120.0, 130.0], + [120.0, 0.0, 100.0, 0.0, 110.0], + [0.0, 140.0, 0.0, 150.0, 0.0], + [0.0, 0.0, 90.0, 130.0, 100.0] + ] + # 单位采购价格 + self.P_ij = [ + [60.0, 85.0, 70.0, 80.0, 100.0], + [65.0, 0.0, 75.0, 0.0, 105.0], + [0.0, 90.0, 0.0, 85.0, 0.0], + [0.0, 0.0, 78.0, 88.0, 98.0] + ] + # 单位运输成本 + self.T_ij = [ + [7.0, 9.0, 8.0, 10.0, 12.0], + [6.0, 0.0, 9.0, 0.0, 11.0], + [0.0, 10.0, 0.0, 12.0, 0.0], + [0.0, 0.0, 10.0, 13.0, 14.0] + ] + # 供应商位置距离 + self.distance = [45.0, 35.0, 60.0, 50.0] + + +class Config: + """算法参数配置类""" + + def __init__(self): + # 种群参数 + self.pop_size = 50 + self.N1_ratio = 0.2 # 变更成本最小种群比例 + self.N2_ratio = 0.2 # 交付延期最短种群比例 + self.N3_ratio = 0.3 # 基于风险企业种群比例 + self.N4_ratio = 0.3 # 随机种群比例 + # 遗传操作参数 + self.crossover_prob = 0.8 + self.mutation_prob = 0.3 + self.max_generations = 100 + # 惩罚系数 + self.delta = 1.3 # 变更惩罚系数 + self.gamma = 0.8 # 提前交付惩罚系数 + # 早停参数 + self.early_stop_patience = 50 + # 目标函数数量 + self.objective_num = 2 \ No newline at end of file diff --git a/encoder.py b/encoder.py new file mode 100644 index 0000000..8c79ff8 --- /dev/null +++ b/encoder.py @@ -0,0 +1,114 @@ +import random +import numpy as np +from data_structures import Config +from chromosome_utils import ChromosomeUtils +from objective_calculator import ObjectiveCalculator +from nsga2 import NSGA2 + + +class Encoder: + """种群初始化编码器""" + def __init__(self, config: Config, utils: ChromosomeUtils): + self.config = config + self.utils = utils + self.pop_size = config.pop_size + self.N1 = int(config.N1_ratio * self.pop_size) + self.N2 = int(config.N2_ratio * self.pop_size) + self.N3 = int(config.N3_ratio * self.pop_size) + self.N4 = self.pop_size - self.N1 - self.N2 - self.N3 + + def _generate_random_chromosome(self, force_risk_enterprise: bool = False) -> np.ndarray: + """生成随机染色体""" + enterprise_layer = [] + capacity_layer = [] + quantity_layer = [] + for i in range(self.utils.I): + ent_count = self.utils.material_enterprise_count[i] + ents = self.utils.material_optional_enterprises[i] + # 企业层 + e_genes = np.zeros(ent_count) + select_count = random.randint(1, ent_count) + select_indices = random.sample(range(ent_count), select_count) + e_genes[select_indices] = 1 + if force_risk_enterprise: + e_genes[0] = 1 + enterprise_layer.extend(e_genes) + # 能力层 + c_genes = np.zeros(ent_count) + for idx, ent in enumerate(ents): + if e_genes[idx] == 1: + if ent == 0: + max_cap = self.utils.risk.C0_i_std[i] # 修复:self.utils.risk + else: + supplier_id = ent - 1 + max_cap = self.utils.supplier.Cj_i_std[supplier_id][i] # 修复:self.utils.supplier + c_genes[idx] = random.uniform(1, max_cap) + capacity_layer.extend(c_genes) + # 数量层 + q_genes = np.zeros(ent_count) + for idx, ent in enumerate(ents): + if e_genes[idx] == 1: + if ent == 0: + max_q = self.utils.order.Q[i] # 修复:self.utils.order + else: + supplier_id = ent - 1 + max_q = self.utils.supplier.MaxOrder[supplier_id][i] # 修复:self.utils.supplier + q_genes[idx] = random.uniform(1, max_q) + quantity_layer.extend(q_genes) + chromosome = self.utils._merge_chromosome( + np.array(enterprise_layer), np.array(capacity_layer), np.array(quantity_layer) + ) + return self.utils.repair_chromosome(chromosome) + + def initialize_population(self) -> np.ndarray: + """初始化种群:N1+N2+N3+N4""" + pop1 = self._initialize_by_objective(self.N1, "cost") + pop2 = self._initialize_by_objective(self.N2, "tardiness") + pop3 = self._initialize_by_risk_enterprise(self.N3) + pop4 = self._initialize_random(self.N4) + # 处理空数组情况 + population_list = [p for p in [pop1, pop2, pop3, pop4] if len(p) > 0] + if len(population_list) == 0: + return np.array([]) + population = np.vstack(population_list) + np.random.shuffle(population) + return population[:self.pop_size] + + def _initialize_by_objective(self, count: int, objective_type: str) -> np.ndarray: + """基于目标函数初始化""" + if count <= 0: + return np.array([]) + candidate_count = max(2*count, 20) + candidates = [self._generate_random_chromosome() for _ in range(candidate_count)] + calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, self.config) # 修复:self.utils的属性 + objectives = [calculator.calculate_objectives(chrom) for chrom in candidates] + if objective_type == "cost": + sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][0]) + else: + sorted_indices = sorted(range(candidate_count), key=lambda x: objectives[x][1]) + return np.array([candidates[i] for i in sorted_indices[:count]]) + + def _initialize_by_risk_enterprise(self, count: int) -> np.ndarray: + """基于风险企业初始化""" + if count <= 0: + return np.array([]) + candidate_count = max(2*count, 20) + candidates = [self._generate_random_chromosome(force_risk_enterprise=True) for _ in range(candidate_count)] + calculator = ObjectiveCalculator(self.utils.order, self.utils.risk, self.utils.supplier, self.utils, self.config) # 修复:self.utils的属性 + objectives = [calculator.calculate_objectives(chrom) for chrom in candidates] + nsga2 = NSGA2(candidate_count, 2) + ranks, fronts = nsga2.fast_non_dominated_sort(objectives) + selected = [] + for front in fronts: + if len(selected) + len(front) <= count: + selected.extend([candidates[i] for i in front]) + else: + selected.extend([candidates[i] for i in front[:count-len(selected)]]) + break + return np.array(selected) + + def _initialize_random(self, count: int) -> np.ndarray: + """随机初始化""" + if count <= 0: + return np.array([]) + return np.array([self._generate_random_chromosome() for _ in range(count)]) \ No newline at end of file diff --git a/genetic_operators.py b/genetic_operators.py new file mode 100644 index 0000000..adf2d30 --- /dev/null +++ b/genetic_operators.py @@ -0,0 +1,79 @@ +import random +import numpy as np +from data_structures import Config +from chromosome_utils import ChromosomeUtils + + +class GeneticOperator: + """遗传操作:交叉、变异""" + def __init__(self, config: Config, utils: ChromosomeUtils): + self.config = config + self.utils = utils + + def two_point_crossover(self, parent1: np.ndarray, parent2: np.ndarray) -> tuple[np.ndarray, np.ndarray]: + """两点交叉""" + length = self.utils.chromosome_length + if length < 2: + return parent1.copy(), parent2.copy() + point1 = random.randint(0, length//2) + point2 = random.randint(point1+1, length-1) + child1 = parent1.copy() + child2 = parent2.copy() + child1[point1:point2] = parent2[point1:point2] + child2[point1:point2] = parent1[point1:point2] + child1 = self.utils.repair_chromosome(child1) + child2 = self.utils.repair_chromosome(child2) + return child1, child2 + + def uniform_mutation(self, chromosome: np.ndarray) -> np.ndarray: + """均匀变异:遍历三层每个基因""" + mutated = chromosome.copy() + enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(mutated) + split_points = np.cumsum(self.utils.material_enterprise_count) + # 企业层变异(0/1翻转) + start = 0 + for i in range(self.utils.I): + end = split_points[i] + for idx in range(start, end): + if random.random() < self.config.mutation_prob: + enterprise_layer[idx] = 1 - enterprise_layer[idx] + start = end + enterprise_layer = self.utils.repair_enterprise_layer(enterprise_layer) + # 能力层变异 + start = 0 + for i in range(self.utils.I): + end = split_points[i] + ents = self.utils.material_optional_enterprises[i] + e_segment = enterprise_layer[start:end] + for idx in range(start, end): + if random.random() < self.config.mutation_prob and e_segment[idx-start] == 1: + ent = ents[idx-start] + if ent == 0: + max_cap = self.utils.risk.C0_i_std[i] + else: + supplier_id = ent - 1 + max_cap = self.utils.supplier.Cj_i_std[supplier_id][i] + capacity_layer[idx] = random.uniform(1, max_cap) + start = end + capacity_layer = self.utils.repair_capacity_layer(enterprise_layer, capacity_layer) + # 数量层变异 + start = 0 + for i in range(self.utils.I): + end = split_points[i] + ents = self.utils.material_optional_enterprises[i] + e_segment = enterprise_layer[start:end] + for idx in range(start, end): + if random.random() < self.config.mutation_prob and e_segment[idx-start] == 1: + ent = ents[idx-start] + qi = self.utils.order.Q[i] + if ent == 0: + max_q = qi + else: + supplier_id = ent - 1 + max_q = self.supplier.MaxOrder[supplier_id][i] + quantity_layer[idx] = random.uniform(1, max_q) + start = end + quantity_layer = self.utils.repair_quantity_layer(enterprise_layer, quantity_layer) + # 合并并修复 + mutated = self.utils._merge_chromosome(enterprise_layer, capacity_layer, quantity_layer) + return mutated \ No newline at end of file diff --git a/main.py b/main.py index 3747c9e..53b470a 100644 --- a/main.py +++ b/main.py @@ -1,876 +1,114 @@ -import numpy as np import random -from typing import List, Dict, Tuple, Optional -import matplotlib.pyplot as plt - - -# ============================= 数据输入模块(用户需填写)============================= -class OrderData: - """订单基础数据类(对应文档表四7符号)""" - - def __init__(self): - # 1. 订单核心参数 - self.I = 3 # 物料种类数(用户修改) - self.Q = [100, 150, 200] # 物料i待生产量Qi(索引0对应物料1,用户修改) - self.Dd = 10 # 订单需求交货期(时长,用户修改) - - # 2. 物料基础成本(风险企业供应) - self.P = [50.0, 80.0, 60.0] # 物料i单位采购价格P_i(用户修改) - self.T0 = [5.0, 8.0, 6.0] # 物料i风险企业运输成本T_i^0(用户修改) - - # 3. 惩罚系数(用户可按文档建议调整) - self.alpha = 2.0 # C1增长系数 - self.beta = 1.0 # 时间尺度系数 - self.delta = 1.3 # 惩罚倍数(1.2-1.5) - self.gamma = 0.5 # 提前交付惩罚系数 - - -class RiskEnterpriseData: - """风险企业数据类(对应文档表四7符号)""" - - def __init__(self): - self.C0_max = 30.0 # 风险企业单位时间最大生产能力C0^max(用户修改) - self.C0_std = 20.0 # 风险企业单位时间标准生产能力(用户修改) - self.L1 = 3.0 # 扰动持续时长系数下限(关键订单3,一般订单2,用户修改) - self.L2 = 10.0 # 扰动持续时长系数上限(关键订单10,一般订单5,用户修改) - self.distance = 10.0 # 风险企业位置距离(用户修改) - - -class SupplierData: - """供应商数据类(对应文档表四7符号)""" - - def __init__(self, order: OrderData): - self.I = order.I - # Ji:物料i的供应商个数(索引0对应物料1,用户修改) - self.J = [2, 3, 2] # 物料1有2个供应商,物料2有3个,物料3有2个 - # 供应商j的基础参数(按物料i分组,用户修改) - # 结构:self.suppliers[i][j] = {参数},i=物料索引,j=供应商索引(0开始) - self.suppliers = [ - # 物料1的2个供应商 - [ - {"Cj_max": 40.0, "Cj_std": 30.0, "P_ij": 55.0, "T_ij": 7.0, "MinOrder": 20, "MaxOrder": 100, - "distance": 15.0}, - {"Cj_max": 35.0, "Cj_std": 25.0, "P_ij": 52.0, "T_ij": 9.0, "MinOrder": 15, "MaxOrder": 90, - "distance": 20.0} - ], - # 物料2的3个供应商 - [ - {"Cj_max": 50.0, "Cj_std": 40.0, "P_ij": 85.0, "T_ij": 6.0, "MinOrder": 25, "MaxOrder": 150, - "distance": 12.0}, - {"Cj_max": 45.0, "Cj_std": 35.0, "P_ij": 82.0, "T_ij": 8.0, "MinOrder": 20, "MaxOrder": 120, - "distance": 18.0}, - {"Cj_max": 55.0, "Cj_std": 45.0, "P_ij": 88.0, "T_ij": 5.0, "MinOrder": 30, "MaxOrder": 160, - "distance": 10.0} - ], - # 物料3的2个供应商 - [ - {"Cj_max": 42.0, "Cj_std": 32.0, "P_ij": 65.0, "T_ij": 8.0, "MinOrder": 22, "MaxOrder": 200, - "distance": 16.0}, - {"Cj_max": 38.0, "Cj_std": 28.0, "P_ij": 63.0, "T_ij": 10.0, "MinOrder": 18, "MaxOrder": 180, - "distance": 22.0} - ] - ] - - -class AlgorithmParams: - """算法参数类(默认值可修改)""" - - def __init__(self): - self.N = 100 # 种群规模 - self.Pm = 0.05 # 变异概率 - self.Pc = 0.8 # 交叉概率 - self.MaxIter = 200 # 最大迭代次数 - self.tournament_size = 3 # 锦标赛选择规模 - # 种群比例 N1:N2:N3:N4 = 3:3:2:2 - self.N1_ratio = 0.3 - self.N2_ratio = 0.3 - self.N3_ratio = 0.2 - self.N4_ratio = 0.2 - - -# ============================= 辅助计算函数(文档对应函数)============================= -def calculate_RNum(P_Num: float, A_Num: float) -> float: - """计算剩余待生产数量R_Num = A_Num - P_Num(文档1.2.2节)""" - return max(0.0, A_Num - P_Num) - - -def calculate_ResTime(DueTime: float, riskTime: float) -> float: - """计算剩余交货期限ResTime = DueTime - riskTime(文档1.2.2节)""" - return max(0.0, DueTime - riskTime) - - -def calculate_TimeLevel(Td: float, ResTime: float, L1: float, L2: float) -> str: - """计算扰动持续程度TimeLevel(文档1.2.2节)""" - if ResTime == 0: - return "长" - ratio = Td / ResTime - if ratio <= L1: - return "短" - elif ratio >= L2: - return "长" - else: - return "中" - - -def calculate_CapLevel(R_Num: float, ResTime: float, C: float) -> str: - """计算订单生产能力等级CapLevel(文档1.2.2节)""" - if ResTime == 0: - return "不足" - capacity = C * ResTime # 总生产能力 - if capacity >= R_Num * 1.2: # 预留20%缓冲 - return "充足" - elif capacity <= R_Num * 0.8: # 缺口20%以上 - return "不足" - else: - return "一般" - - -# ============================= 染色体与种群类============================= -class Chromosome: - """染色体类(三层编码结构)""" - - def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData): - self.order = order - self.risk_ent = risk_ent - self.supplier = supplier - self.I = order.I - self.J_list = supplier.J # 各物料的供应商个数 - - # 三层编码初始化(随机生成基础结构) - self.enterprise_layer: List[List[int]] = [] # 企业编码层:I×(Ji+1),0/1 - self.capacity_layer: List[List[float]] = [] # 能力编码层:I×(Ji+1),生产能力 - self.quantity_layer: List[List[float]] = [] # 数量编码层:I×(Ji+1),分配数量 - - for i in range(self.I): - Ji = self.J_list[i] - total_ents = Ji + 1 # 1个风险企业 + Ji个供应商 - - # 企业编码层:随机生成0/1,确保至少有1个1 - ent_codes = [random.randint(0, 1) for _ in range(total_ents)] - if sum(ent_codes) == 0: - ent_codes[random.randint(0, total_ents - 1)] = 1 # 强制激活1个 - self.enterprise_layer.append(ent_codes) - - # 能力编码层:基于企业标准生产能力随机生成 - cap_codes = [] - for j in range(total_ents): - if j == 0: # 风险企业 - max_cap = risk_ent.C0_max - std_cap = risk_ent.C0_std - else: # 供应商j-1(索引转换) - max_cap = supplier.suppliers[i][j - 1]["Cj_max"] - std_cap = supplier.suppliers[i][j - 1]["Cj_std"] - # 在[0.5×std_cap, max_cap]范围内随机 - cap = random.uniform(0.5 * std_cap, max_cap) - cap_codes.append(cap) - self.capacity_layer.append(cap_codes) - - # 数量编码层:基于物料待生产量随机分配 - qty_codes = [0.0] * total_ents - Qi = order.Q[i] - selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1] - if len(selected_ents) > 0: - # 随机分配数量(确保总和=Qi) - qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi - for idx, ent_idx in enumerate(selected_ents): - qty_codes[ent_idx] = qtys[idx] - self.quantity_layer.append(qty_codes) - - # 修复初始染色体(确保满足约束) - self.repair() - - def repair(self): - """染色体修复机制(文档1.4.1节)""" - # 步骤1:检查每类物料是否有供应方(物料生产约束) - for i in range(self.I): - ent_codes = self.enterprise_layer[i] - if sum(ent_codes) == 0: - # 随机激活1个企业 - active_idx = random.randint(0, len(ent_codes) - 1) - self.enterprise_layer[i][active_idx] = 1 - - # 步骤2:检查企业最大生产能力约束 - self._repair_capacity_constraint() - - # 步骤3:检查物料数量约束(总和=Qi,供应商起订量/最大供应量) - self._repair_quantity_constraint() - - def _repair_capacity_constraint(self): - """修复生产能力约束(修复属性名不匹配问题)""" - # 步骤1:修复风险企业生产能力(原逻辑不变) - risk_total_cap = 0.0 - risk_assignments = [] # 存储(i, cap):物料i分配给风险企业的能力 - for i in range(self.I): - if self.enterprise_layer[i][0] == 1: # 风险企业被选中 - cap = self.capacity_layer[i][0] - risk_total_cap += cap - risk_assignments.append((i, cap)) - - # 若超出最大产能,剔除最小能力的物料 - while risk_total_cap > self.risk_ent.C0_max and len(risk_assignments) > 0: - risk_assignments.sort(key=lambda x: x[1]) - min_i, min_cap = risk_assignments.pop(0) - self.enterprise_layer[min_i][0] = 0 # 取消选择风险企业 - self.capacity_layer[min_i][0] = 0.0 - self.quantity_layer[min_i][0] = 0.0 - risk_total_cap -= min_cap - - # 步骤2:修复各供应商生产能力(核心修复:J_list → J) - for i in range(self.I): # 遍历每个物料i,找到该物料的所有供应商编码索引 - Ji = self.J_list[i] # 物料i的供应商个数(注:此处self.J_list是Chromosome类自身属性,无需修改) - for j in range(1, Ji + 1): # j=1~Ji:物料i的供应商编码索引(对应供应商j-1) - supplier_idx = j - 1 # 供应商在suppliers列表中的索引(0开始) - supp_total_cap = 0.0 - supp_assignments = [] - - # 遍历所有物料k,计算该供应商(j索引)在各物料上的总产能 - for k in range(self.I): - # 核心修复:self.supplier.J_list[k] → self.supplier.J[k] - if j >= len(self.enterprise_layer[k]) or supplier_idx >= self.supplier.J[k]: - continue # 跳过无该供应商的物料,避免索引越界 - - # 若物料k选择了该供应商(j索引),累加产能 - if self.enterprise_layer[k][j] == 1: - cap = self.capacity_layer[k][j] - supp_total_cap += cap - supp_assignments.append((k, cap)) - - # 若该供应商总产能超出最大产能,剔除最小能力的物料 - if len(supp_assignments) == 0: - continue # 无物料分配给该供应商,跳过 - - # 获取该供应商在当前物料k上的最大产能(修正:用第一个分配的物料k) - first_k = supp_assignments[0][0] - supp_max_cap = self.supplier.suppliers[first_k][supplier_idx]["Cj_max"] - - # 剔除超出产能的物料分配 - while supp_total_cap > supp_max_cap and len(supp_assignments) > 0: - supp_assignments.sort(key=lambda x: x[1]) - min_k, min_cap = supp_assignments.pop(0) - self.enterprise_layer[min_k][j] = 0 # 取消该物料对该供应商的选择 - self.capacity_layer[min_k][j] = 0.0 - self.quantity_layer[min_k][j] = 0.0 - supp_total_cap -= min_cap - - def _repair_quantity_constraint(self): - """修复物料数量约束(修复除以零错误)""" - for i in range(self.I): - Qi = self.order.Q[i] - ent_codes = self.enterprise_layer[i] - qty_codes = self.quantity_layer[i] - selected_ents = [idx for idx, val in enumerate(ent_codes) if val == 1] - current_total = sum(qty_codes) - - # 调整总量至Qi(允许微小误差) - if abs(current_total - Qi) > 1e-6: - ratio = Qi / (current_total + 1e-10) # 避免初始总量为0时除以零 - for idx in selected_ents: - qty_codes[idx] *= ratio - - # 检查供应商起订量和最大供应量 - Ji = self.J_list[i] - for j in range(1, Ji + 1): # 供应商j-1 - if ent_codes[j] == 1: - min_order = self.supplier.suppliers[i][j - 1]["MinOrder"] - max_order = self.supplier.suppliers[i][j - 1]["MaxOrder"] - qty = qty_codes[j] - - if qty < min_order: - # 低于起订量:要么增加到起订量,要么取消选择 - if sum(qty_codes) + (min_order - qty) <= Qi * 1.1: # 允许10%缓冲 - qty_codes[j] = min_order - # 从其他选中企业中减少对应数量 - surplus_reduce = min_order - qty - other_ents = [idx for idx in selected_ents if idx != j] - if other_ents: - sum_other_qty = sum(qty_codes[idx] for idx in other_ents) - if sum_other_qty > 1e-10: # 避免除以零 - reduce_ratio = surplus_reduce / sum_other_qty - for idx in other_ents: - qty_codes[idx] *= (1 - reduce_ratio) - else: - # 其他企业无数量可减,直接取消当前供应商选择 - ent_codes[j] = 0 - qty_codes[j] = 0.0 - selected_ents.remove(j) - else: - ent_codes[j] = 0 - qty_codes[j] = 0.0 - selected_ents.remove(j) - - elif qty > max_order: - # 超过最大供应量:调整到最大值,剩余差额分配给其他企业 - qty_codes[j] = max_order - surplus_add = qty - max_order # 此处原逻辑笔误:应为当前qty - max_order,即超出部分转为需补充的差额 - other_ents = [idx for idx in selected_ents if idx != j] - if other_ents: - # 核心修复:检查other_ents的数量总和是否为零 - sum_other_qty = sum(qty_codes[idx] for idx in other_ents) - if sum_other_qty > 1e-10: # 总和非零,按比例分配 - add_ratio = surplus_add / sum_other_qty - for idx in other_ents: - qty_codes[idx] *= (1 + add_ratio) - else: - # 总和为零,直接将差额分配给第一个其他企业 - target_ent = other_ents[0] - # 确保不超过目标企业的最大供应量(若为供应商) - if target_ent == 0: # 目标是风险企业(无最大供应量限制,仅需满足产能) - qty_codes[target_ent] += surplus_add - else: # 目标是供应商,需检查最大供应量 - supp_max_order = self.supplier.suppliers[i][target_ent - 1]["MaxOrder"] - add_qty = min(surplus_add, supp_max_order - qty_codes[target_ent]) - qty_codes[target_ent] += add_qty - # 若仍有剩余差额(目标企业已达最大供应量),递归处理 - remaining_surplus = surplus_add - add_qty - if remaining_surplus > 1e-6: - other_ents.remove(target_ent) - if other_ents: - # 重新计算剩余差额分配 - sum_other_qty_new = sum(qty_codes[idx] for idx in other_ents) - if sum_other_qty_new > 1e-10: - add_ratio_new = remaining_surplus / sum_other_qty_new - for idx in other_ents: - qty_codes[idx] *= (1 + add_ratio_new) - else: - # 继续分配给下一个企业 - for next_ent in other_ents: - if remaining_surplus <= 1e-6: - break - if next_ent == 0: - qty_codes[next_ent] += remaining_surplus - remaining_surplus = 0 - else: - supp_max = self.supplier.suppliers[i][next_ent - 1]["MaxOrder"] - add = min(remaining_surplus, supp_max - qty_codes[next_ent]) - qty_codes[next_ent] += add - remaining_surplus -= add - - # 最终确保总和=Qi(避免累积误差) - current_total = sum(qty_codes) - if abs(current_total - Qi) > 1e-6 and selected_ents: - ratio = Qi / (current_total + 1e-10) # 加微小值避免除以零 - for idx in selected_ents: - qty_codes[idx] *= ratio - - def calculate_fitness(self) -> Tuple[float, float]: - """计算适应度(变更成本C,交付延期T)""" - C1 = self._calculate_C1() - C2 = self._calculate_C2() - C3 = self._calculate_C3() - C4 = self._calculate_C4() - C = C1 + C2 + C3 + C4 - - T = self._calculate_T() - return C, T - - def _calculate_C1(self) -> float: - """计算变更惩罚成本C1""" - # 简化:假设ResTime=Dd(风险识别时刻为0),R_Num=Qi - ResTime = self.order.Dd - total_C1 = 0.0 - for i in range(self.I): - Qi = self.order.Q[i] - # 最大惩罚成本=δ*(采购成本+运输成本)*Qi - max_penalty = self.order.delta * (self.order.P[i] + self.order.T0[i]) * Qi - # 指数惩罚项 - exp_penalty = self.order.delta * Qi * np.exp(self.order.alpha / (ResTime + self.order.beta)) - total_C1 += min(max_penalty, exp_penalty) - return total_C1 - - def _calculate_C2(self) -> float: - """计算采购变更成本C2""" - total_purchase_risk = 0.0 # 原风险企业采购成本 - total_purchase_supp = 0.0 # 变更后供应商采购成本 - for i in range(self.I): - # 风险企业采购成本 - if self.enterprise_layer[i][0] == 1: - total_purchase_risk += self.quantity_layer[i][0] * self.order.P[i] - # 供应商采购成本 - Ji = self.J_list[i] - for j in range(1, Ji + 1): - if self.enterprise_layer[i][j] == 1: - supp_p = self.supplier.suppliers[i][j - 1]["P_ij"] - total_purchase_supp += self.quantity_layer[i][j] * supp_p - return total_purchase_supp - total_purchase_risk - - def _calculate_C3(self) -> float: - """计算运输变更成本C3""" - total_trans_risk = 0.0 # 原风险企业运输成本 - total_trans_supp = 0.0 # 变更后供应商运输成本 - for i in range(self.I): - # 风险企业运输成本 - if self.enterprise_layer[i][0] == 1: - total_trans_risk += self.quantity_layer[i][0] * self.order.T0[i] - # 供应商运输成本 - Ji = self.J_list[i] - for j in range(1, Ji + 1): - if self.enterprise_layer[i][j] == 1: - supp_t = self.supplier.suppliers[i][j - 1]["T_ij"] - total_trans_supp += self.quantity_layer[i][j] * supp_t - return total_trans_supp - total_trans_risk - - def _calculate_C4(self) -> float: - """计算提前交付惩罚成本C4""" - delivery_dates = self._get_delivery_dates() - actual_delivery = max(delivery_dates) - advance = self.order.Dd - actual_delivery - return self.order.gamma * max(0.0, advance) - - def _calculate_T(self) -> float: - """计算交付延期T""" - delivery_dates = self._get_delivery_dates() - actual_delivery = max(delivery_dates) - delay = actual_delivery - self.order.Dd - return max(0.0, delay) - - def _get_delivery_dates(self) -> List[float]: - """计算各企业的交付日期(生产耗时+运输耗时)""" - delivery_dates = [] - for i in range(self.I): - # 风险企业交付日期 - if self.enterprise_layer[i][0] == 1: - qty = self.quantity_layer[i][0] - cap = self.capacity_layer[i][0] - if cap == 0: - prod_time = 0.0 - else: - prod_time = qty / cap # 生产耗时 - trans_time = self.risk_ent.distance / 10.0 # 简化运输耗时(距离/10) - delivery_dates.append(prod_time + trans_time) - # 供应商交付日期 - Ji = self.J_list[i] - for j in range(1, Ji + 1): - if self.enterprise_layer[i][j] == 1: - qty = self.quantity_layer[i][j] - cap = self.capacity_layer[i][j] - if cap == 0: - prod_time = 0.0 - else: - prod_time = qty / cap - trans_time = self.supplier.suppliers[i][j - 1]["distance"] / 10.0 - delivery_dates.append(prod_time + trans_time) - - # 新增:如果没有任何交付日期(空列表),返回一个默认值 - if not delivery_dates: - # 返回一个很大的值表示无法交付,触发最大延期惩罚 - return [float('inf')] - return delivery_dates - - -class Population: - """种群类""" - - def __init__(self, order: OrderData, risk_ent: RiskEnterpriseData, supplier: SupplierData, params: AlgorithmParams): - self.order = order - self.risk_ent = risk_ent - self.supplier = supplier - self.params = params - self.N = params.N - self.population: List[Chromosome] = [] - - # 初始化种群(混合策略) - self._initialize_population() - - def _initialize_population(self): - """混合种群初始化(文档1.4.1节)""" - N1 = int(self.N * self.params.N1_ratio) # 变更成本最小导向 - N2 = int(self.N * self.params.N2_ratio) # 交付延期最短导向 - N3 = int(self.N * self.params.N3_ratio) # 风险企业导向 - N4 = self.N - N1 - N2 - N3 # 随机种群 - - # 1. 生成N1:变更成本最小导向 - self.population.extend(self._generate_target_oriented_pop(N1, target="cost")) - - # 2. 生成N2:交付延期最短导向 - self.population.extend(self._generate_target_oriented_pop(N2, target="delay")) - - # 3. 生成N3:风险企业导向 - self.population.extend(self._generate_risk_ent_oriented_pop(N3)) - - # 4. 生成N4:随机种群 - self.population.extend([Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(N4)]) - - def _generate_target_oriented_pop(self, size: int, target: str) -> List[Chromosome]: - """生成目标函数导向种群(文档1.4.1节)""" - if size <= 0: - return [] - - # 随机生成2*size个候选解 - candidates = [Chromosome(self.order, self.risk_ent, self.supplier) for _ in range(2 * size)] - - # 计算适应度并排序 - if target == "cost": - # 按变更成本升序排序 - candidates.sort(key=lambda x: x.calculate_fitness()[0]) - else: # target == "delay" - # 按交付延期升序排序 - candidates.sort(key=lambda x: x.calculate_fitness()[1]) - - # 选择前size个 - return candidates[:size] - - def _generate_risk_ent_oriented_pop(self, size: int) -> List[Chromosome]: - """生成风险企业导向种群(文档1.4.1节)""" - if size <= 0: - return [] - - candidates = [] - for _ in range(2 * size): - chrom = Chromosome(self.order, self.risk_ent, self.supplier) - # 强制风险企业选中所有物料 - for i in range(self.order.I): - chrom.enterprise_layer[i][0] = 1 # yi0=1 - # 修复染色体 - chrom.repair() - candidates.append(chrom) - - # 计算Pareto Rank,选择Rank最低的size个 - ranked_candidates = self._fast_non_dominated_sort(candidates)[0] # Rank=0的解 - if len(ranked_candidates) >= size: - return random.sample(ranked_candidates, size) - else: - # 不足时补充其他Rank的解 - all_ranked = self._fast_non_dominated_sort(candidates) - selected = [] - for rank in all_ranked: - selected.extend(rank) - if len(selected) >= size: - return selected[:size] - return selected - - def _fast_non_dominated_sort(self, population: List[Chromosome]) -> List[List[Chromosome]]: - """快速非支配排序(标准NSGA-II算法)""" - dominated_sets = [[] for _ in range(len(population))] - domination_counts = [0] * len(population) - ranks = [0] * len(population) - fronts = [[]] - - # 计算每个个体的支配关系 - for p_idx in range(len(population)): - p_cost, p_delay = population[p_idx].calculate_fitness() - for q_idx in range(len(population)): - if p_idx == q_idx: - continue - q_cost, q_delay = population[q_idx].calculate_fitness() - - # p支配q:p的两个目标都不劣于q,且至少一个目标更优 - if (p_cost <= q_cost and p_delay <= q_delay) and (p_cost < q_cost or p_delay < q_delay): - dominated_sets[p_idx].append(q_idx) - # q支配p - elif (q_cost <= p_cost and q_delay <= p_delay) and (q_cost < p_cost or q_delay < q_delay): - domination_counts[p_idx] += 1 - - # 初始前沿(Rank=0) - if domination_counts[p_idx] == 0: - ranks[p_idx] = 0 - fronts[0].append(population[p_idx]) - - # 生成后续前沿 - rank = 1 - while True: - current_front = [] - for p_idx in range(len(population)): - if ranks[p_idx] == rank - 1: - for q_idx in dominated_sets[p_idx]: - domination_counts[q_idx] -= 1 - if domination_counts[q_idx] == 0: - ranks[q_idx] = rank - current_front.append(population[q_idx]) - if not current_front: - break - fronts.append(current_front) - rank += 1 - - return fronts - - def _calculate_crowding_distance(self, front: List[Chromosome]) -> List[float]: - """计算拥挤度(标准NSGA-II算法)""" - n = len(front) - if n == 0: - return [] - distances = [0.0] * n - - # 按变更成本排序 - cost_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[0]) - # 边界个体拥挤度设为无穷大 - distances[cost_sorted[0][0]] = float('inf') - distances[cost_sorted[-1][0]] = float('inf') - # 计算中间个体拥挤度 - max_cost = cost_sorted[-1][1].calculate_fitness()[0] - min_cost = cost_sorted[0][1].calculate_fitness()[0] - if max_cost - min_cost > 1e-6: - for i in range(1, n - 1): - prev_cost = cost_sorted[i - 1][1].calculate_fitness()[0] - next_cost = cost_sorted[i + 1][1].calculate_fitness()[0] - distances[cost_sorted[i][0]] += (next_cost - prev_cost) / (max_cost - min_cost) - - # 按交付延期排序 - delay_sorted = sorted(enumerate(front), key=lambda x: x[1].calculate_fitness()[1]) - # 边界个体拥挤度设为无穷大 - distances[delay_sorted[0][0]] = float('inf') - distances[delay_sorted[-1][0]] = float('inf') - # 计算中间个体拥挤度 - max_delay = delay_sorted[-1][1].calculate_fitness()[1] - min_delay = delay_sorted[0][1].calculate_fitness()[1] - if max_delay - min_delay > 1e-6: - for i in range(1, n - 1): - prev_delay = delay_sorted[i - 1][1].calculate_fitness()[1] - next_delay = delay_sorted[i + 1][1].calculate_fitness()[1] - distances[delay_sorted[i][0]] += (next_delay - prev_delay) / (max_delay - min_delay) - - return distances - - -# ============================= 遗传操作函数============================= -def crossover(parent1: Chromosome, parent2: Chromosome, Pc: float) -> Tuple[Chromosome, Chromosome]: - """两点交叉(文档1.4.1节)""" - if random.random() > Pc: - return parent1, parent2 - - # 复制父代染色体 - child1 = Chromosome(parent1.order, parent1.risk_ent, parent1.supplier) - child2 = Chromosome(parent2.order, parent2.risk_ent, parent2.supplier) - - # 确定染色体总基因长度(每层基因数相同) - total_genes_per_layer = sum([len(layer) for layer in parent1.enterprise_layer]) - if total_genes_per_layer < 2: - return child1, child2 - - # 随机选择两个交叉位置 - pos1 = random.randint(0, total_genes_per_layer - 2) - pos2 = random.randint(pos1 + 1, total_genes_per_layer - 1) - - # 对三层编码分别执行交叉 - for layer_idx, (layer1, layer2) in enumerate([ - (child1.enterprise_layer, child2.enterprise_layer), - (child1.capacity_layer, child2.capacity_layer), - (child1.quantity_layer, child2.quantity_layer) - ]): - # 展平层编码 - flat1 = [] - flat2 = [] - for segment in layer1: - flat1.extend(segment) - for segment in layer2: - flat2.extend(segment) - - # 交换pos1-pos2区间的基因 - flat1[pos1:pos2 + 1], flat2[pos1:pos2 + 1] = flat2[pos1:pos2 + 1], flat1[pos1:pos2 + 1] - - # 重构层编码 - ptr = 0 - for i in range(len(layer1)): - segment_len = len(layer1[i]) - layer1[i] = flat1[ptr:ptr + segment_len] - layer2[i] = flat2[ptr:ptr + segment_len] - ptr += segment_len - - # 修复子代染色体 - child1.repair() - child2.repair() - - return child1, child2 - - -def mutate(chrom: Chromosome, Pm: float) -> Chromosome: - """均匀变异(文档1.4.1节)""" - # 对三层编码分别执行变异 - # 1. 企业编码层(0/1翻转) - for i in range(chrom.I): - for j in range(len(chrom.enterprise_layer[i])): - if random.random() < Pm: - chrom.enterprise_layer[i][j] = 1 - chrom.enterprise_layer[i][j] - - # 2. 能力编码层(随机重置) - for i in range(chrom.I): - Ji = chrom.J_list[i] - for j in range(len(chrom.capacity_layer[i])): - if random.random() < Pm: - if j == 0: # 风险企业 - max_cap = chrom.risk_ent.C0_max - std_cap = chrom.risk_ent.C0_std - else: # 供应商j-1 - max_cap = chrom.supplier.suppliers[i][j - 1]["Cj_max"] - std_cap = chrom.supplier.suppliers[i][j - 1]["Cj_std"] - chrom.capacity_layer[i][j] = random.uniform(0.5 * std_cap, max_cap) - - # 3. 数量编码层(随机重置) - for i in range(chrom.I): - Qi = chrom.order.Q[i] - selected_ents = [idx for idx, val in enumerate(chrom.enterprise_layer[i]) if val == 1] - if len(selected_ents) > 0 and random.random() < Pm: - # 重新分配数量 - qtys = np.random.dirichlet(np.ones(len(selected_ents))) * Qi - for idx, ent_idx in enumerate(selected_ents): - chrom.quantity_layer[i][ent_idx] = qtys[idx] - - # 修复变异后的染色体 - chrom.repair() - return chrom - - -def tournament_selection(population: List[Chromosome], tournament_size: int) -> Chromosome: - """锦标赛选择""" - candidates = random.sample(population, tournament_size) - # 选择Pareto支配最优的个体 - best = candidates[0] - best_cost, best_delay = best.calculate_fitness() - for cand in candidates[1:]: - cand_cost, cand_delay = cand.calculate_fitness() - # cand支配best - if (cand_cost <= best_cost and cand_delay <= best_delay) and (cand_cost < best_cost or cand_delay < best_delay): - best = cand - best_cost, best_delay = cand_cost, cand_delay - return best - - -# ============================= 主算法函数============================= -def improved_nsga2() -> Tuple[List[Chromosome], List[Dict]]: - """改进NSGA-II算法主函数(文档1.4.2节流程)""" - # 1. 初始化数据和参数 - order = OrderData() - risk_ent = RiskEnterpriseData() - supplier = SupplierData(order) - params = AlgorithmParams() - - # 2. 初始化种群(第0代) - pop = Population(order, risk_ent, supplier, params) - population = pop.population - - # 3. 迭代优化 - iteration_history = [] # 记录每代最优目标值 - for iter_idx in range(params.MaxIter): - # 4. 生成子代种群 - offspring = [] - while len(offspring) < params.N: - # 锦标赛选择父代 - parent1 = tournament_selection(population, params.tournament_size) - parent2 = tournament_selection(population, params.tournament_size) - # 交叉 - child1, child2 = crossover(parent1, parent2, params.Pc) - # 变异 - child1 = mutate(child1, params.Pm) - child2 = mutate(child2, params.Pm) - # 添加到子代 - offspring.append(child1) - if len(offspring) < params.N: - offspring.append(child2) - - # 5. 合并父代和子代 - combined = population + offspring - - # 6. 快速非支配排序 - fronts = pop._fast_non_dominated_sort(combined) - - # 7. 拥挤度计算与精英保留 - next_population = [] - for front in fronts: - if len(next_population) + len(front) <= params.N: - # 该前沿全部入选 - next_population.extend(front) +import numpy as np +from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config +from chromosome_utils import ChromosomeUtils +from objective_calculator import ObjectiveCalculator +from encoder import Encoder +from genetic_operators import GeneticOperator +from nsga2 import NSGA2 +from visualizer import ResultVisualizer + + +def main(): + # 初始化数据 + order_data = OrderData() + risk_data = RiskEnterpriseData() + supplier_data = SupplierData() + config = Config() + + # 初始化工具类 + utils = ChromosomeUtils(order_data, risk_data, supplier_data) + calculator = ObjectiveCalculator(order_data, risk_data, supplier_data, utils, config) + encoder = Encoder(config, utils) + genetic_op = GeneticOperator(config, utils) + nsga2 = NSGA2(config.pop_size, config.objective_num) + visualizer = ResultVisualizer(utils) + + # 初始化种群 + population = encoder.initialize_population() + print(f"初始化种群完成,种群大小: {population.shape}") + + # 记录历史 + all_objectives = [] + convergence_history = [] + best_front = [] + best_front_objs = [] + no_improve_count = 0 + prev_best_avg = float('inf') + + # 进化过程 + for generation in range(config.max_generations): + # 计算目标函数 + objectives = [calculator.calculate_objectives(chrom) for chrom in population] + all_objectives.extend(objectives) + + # 记录当前代的最优前沿 + ranks, fronts = nsga2.fast_non_dominated_sort(objectives) + current_front = fronts[0] + current_front_objs = [objectives[i] for i in current_front] + best_front = population[current_front] + best_front_objs = current_front_objs + + # 收敛判断(基于前沿平均目标值) + if len(current_front_objs) > 0: + avg_cost = sum(obj[0] for obj in current_front_objs) / len(current_front_objs) + avg_tardiness = sum(obj[1] for obj in current_front_objs) / len(current_front_objs) + convergence_history.append((avg_cost, avg_tardiness)) + current_avg = avg_cost + avg_tardiness + if abs(current_avg - prev_best_avg) < 1e-4: + no_improve_count += 1 else: - # 计算拥挤度,选择拥挤度大的个体 - distances = pop._calculate_crowding_distance(front) - # 按拥挤度降序排序 - front_sorted = [front[i] for i in np.argsort(distances)[::-1]] - # 选择剩余名额 - need = params.N - len(next_population) - next_population.extend(front_sorted[:need]) - break + no_improve_count = 0 + prev_best_avg = current_avg + else: + no_improve_count += 1 - # 8. 更新种群 - population = next_population + # 选择(锦标赛选择) + selected = nsga2.selection(population, objectives) - # 9. 记录迭代历史 - current_front = pop._fast_non_dominated_sort(population)[0] - min_cost = min([chrom.calculate_fitness()[0] for chrom in current_front]) - min_delay = min([chrom.calculate_fitness()[1] for chrom in current_front]) - iteration_history.append({ - "iter": iter_idx, - "min_cost": min_cost, - "min_delay": min_delay, - "pareto_size": len(current_front) - }) + # 交叉 + offspring = [] + i = 0 + while len(offspring) < config.pop_size: + if i+1 < config.pop_size and random.random() < config.crossover_prob: + parent1 = selected[i] + parent2 = selected[i+1] + child1, child2 = genetic_op.two_point_crossover(parent1, parent2) + offspring.append(child1) + if len(offspring) < config.pop_size: + offspring.append(child2) + else: + offspring.append(selected[i]) + i += 2 - # 打印迭代信息 - if (iter_idx + 1) % 20 == 0: - print( - f"迭代 {iter_idx + 1:3d} | 最优成本: {min_cost:8.2f} | 最优延期: {min_delay:6.2f} | Pareto解数: {len(current_front)}") + # 变异 + offspring = [genetic_op.uniform_mutation(chrom) if random.random() < config.mutation_prob else chrom + for chrom in offspring] + offspring = np.array(offspring[:config.pop_size]) - # 10. 输出最终Pareto解集 - final_front = pop._fast_non_dominated_sort(population)[0] - return final_front, iteration_history + # 合并父代和子代 + combined = np.vstack([population, offspring]) + combined_objs = objectives + [calculator.calculate_objectives(chrom) for chrom in offspring] + + # 环境选择(核心:NSGA-II的环境选择) + population, objectives = nsga2.environmental_selection(combined, combined_objs) + + # 早停检查 + if no_improve_count >= config.early_stop_patience: + print(f"早停于第 {generation} 代") + break + + if generation % 50 == 0: + print(f"第 {generation} 代完成,当前最优前沿大小: {len(current_front)}") + + # 结果可视化与打印 + print("进化完成,绘制结果...") + visualizer.plot_pareto_front(all_objectives, best_front_objs) + visualizer.plot_convergence(convergence_history) + visualizer.print_pareto_solutions(best_front, best_front_objs) -# ============================= 结果可视化与输出============================= -def visualize_results(pareto_front: List[Chromosome], iteration_history: List[Dict]): - """结果可视化""" - # 1. Pareto前沿曲线 - costs = [chrom.calculate_fitness()[0] for chrom in pareto_front] - delays = [chrom.calculate_fitness()[1] for chrom in pareto_front] - - plt.figure(figsize=(12, 5)) - - # 子图1:Pareto前沿 - plt.subplot(1, 2, 1) - plt.scatter(costs, delays, c='red', s=50, alpha=0.7) - plt.xlabel('变更成本', fontsize=12) - plt.ylabel('交付延期', fontsize=12) - plt.title('Pareto最优前沿', fontsize=14) - plt.grid(True, alpha=0.3) - - # 子图2:迭代收敛曲线 - plt.subplot(1, 2, 2) - iters = [item["iter"] for item in iteration_history] - min_costs = [item["min_cost"] for item in iteration_history] - min_delays = [item["min_delay"] for item in iteration_history] - - plt.plot(iters, min_costs, label='最优变更成本', linewidth=2) - plt.plot(iters, min_delays, label='最优交付延期', linewidth=2, linestyle='--') - plt.xlabel('迭代次数', fontsize=12) - plt.ylabel('目标值', fontsize=12) - plt.title('迭代收敛曲线', fontsize=14) - plt.legend() - plt.grid(True, alpha=0.3) - - plt.tight_layout() - plt.show() - - -def print_pareto_solutions(pareto_front: List[Chromosome]): - """打印Pareto解的详细信息""" - print("\n" + "=" * 80) - print("Pareto最优解集详细信息") - print("=" * 80) - for idx, chrom in enumerate(pareto_front): - cost, delay = chrom.calculate_fitness() - print(f"\n解 {idx + 1}:") - print(f" 变更成本: {cost:.2f} | 交付延期: {delay:.2f}") - print(" 订单分配方案:") - for i in range(chrom.I): - print(f" 物料{i + 1} (待生产: {chrom.order.Q[i]}):") - # 风险企业分配 - if chrom.enterprise_layer[i][0] == 1: - qty = chrom.quantity_layer[i][0] - cap = chrom.capacity_layer[i][0] - print(f" 风险企业: 数量={qty:.1f}, 生产能力={cap:.1f}") - # 供应商分配 - Ji = chrom.J_list[i] - for j in range(1, Ji + 1): - if chrom.enterprise_layer[i][j] == 1: - qty = chrom.quantity_layer[i][j] - cap = chrom.capacity_layer[i][j] - supp_info = chrom.supplier.suppliers[i][j - 1] - print(f" 供应商{j}: 数量={qty:.1f}, 生产能力={cap:.1f}, 采购价={supp_info['P_ij']:.1f}") - - -# ============================= 主函数调用============================= if __name__ == "__main__": - # 运行算法 - print("改进NSGA-II算法开始运行...") - print("=" * 60) - pareto_front, iteration_history = improved_nsga2() - - # 结果输出与可视化 - print_pareto_solutions(pareto_front) - visualize_results(pareto_front, iteration_history) \ No newline at end of file + random.seed(42) + np.random.seed(42) + main() \ No newline at end of file diff --git a/nsga2.py b/nsga2.py new file mode 100644 index 0000000..c74c61a --- /dev/null +++ b/nsga2.py @@ -0,0 +1,121 @@ +import random +import numpy as np + + +class NSGA2: + def __init__(self, pop_size, objective_num): + self.pop_size = pop_size + self.objective_num = objective_num + + def fast_non_dominated_sort(self, objective_values): + """标准快速非支配排序实现""" + pop_size = len(objective_values) + domination_count = [0] * pop_size + dominated_solutions = [[] for _ in range(pop_size)] + ranks = [0] * pop_size + front = [[]] # 第0层 + + # 计算支配关系 + for p in range(pop_size): + for q in range(pop_size): + if p == q: + continue + # p支配q? + if self._dominates(objective_values[p], objective_values[q]): + dominated_solutions[p].append(q) + # q支配p? + elif self._dominates(objective_values[q], objective_values[p]): + domination_count[p] += 1 + if domination_count[p] == 0: + ranks[p] = 0 + front[0].append(p) + + # 处理后续层 + i = 0 + while len(front[i]) > 0: + next_front = [] + for p in front[i]: + for q in dominated_solutions[p]: + domination_count[q] -= 1 + if domination_count[q] == 0: + ranks[q] = i + 1 + next_front.append(q) + i += 1 + front.append(next_front) + + return ranks, front[:-1] # 返回rank数组和各层front列表 + + def _dominates(self, a, b): + """判断a是否支配b:a的所有目标≤b,且至少一个目标= distances[j]): + selected.append(population[i]) + else: + selected.append(population[j]) + return np.array(selected) + + def _calculate_all_crowding_distances(self, objective_values, ranks): + """计算所有个体的拥挤度""" + max_rank = max(ranks) if ranks else 0 + all_distances = [0.0] * len(objective_values) + for r in range(max_rank + 1): + front = [i for i in range(len(objective_values)) if ranks[i] == r] + if len(front) <= 1: + for i in front: + all_distances[i] = float('inf') + continue + distances = self.crowding_distance(objective_values, front) + for i, idx in enumerate(front): + all_distances[idx] = distances[i] + return all_distances + + def environmental_selection(self, population, objective_values): + """环境选择:从合并种群中选择pop_size个个体""" + ranks, fronts = self.fast_non_dominated_sort(objective_values) + selected = [] + for front in fronts: + if len(selected) + len(front) <= self.pop_size: + selected.extend(front) + else: + # 计算当前front的拥挤度,选择剩余数量的个体 + distances = self.crowding_distance(objective_values, front) + sorted_front = [x for _, x in sorted(zip(distances, front), reverse=True)] + selected.extend(sorted_front[:self.pop_size - len(selected)]) + break + return population[selected], [objective_values[i] for i in selected] \ No newline at end of file diff --git a/objective_calculator.py b/objective_calculator.py new file mode 100644 index 0000000..60218ab --- /dev/null +++ b/objective_calculator.py @@ -0,0 +1,156 @@ +import numpy as np +from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config +from chromosome_utils import ChromosomeUtils + + +class ObjectiveCalculator: + """目标函数计算器:变更成本C + 交付延期T""" + + def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData, + utils: ChromosomeUtils, config: Config): + self.order = order_data + self.risk = risk_data + self.supplier = supplier_data + self.utils = utils + self.config = config + self.split_points = np.cumsum(utils.material_enterprise_count) + + def calculate_objectives(self, chromosome: np.ndarray) -> tuple[float, float]: + """计算双目标值:(变更成本C, 交付延期T)""" + enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(chromosome) + # 修复:传入capacity_layer参数 + C = self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer) + T = self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer) + return C, T + + # 修复:添加capacity_layer参数 + def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, quantity_layer: np.ndarray) -> float: + """计算变更成本C = C1 + C2 + C3 + C4""" + C1 = 0.0 # 变更惩罚成本 + C2 = 0.0 # 采购变更成本 + C3 = 0.0 # 运输变更成本 + C4 = 0.0 # 提前交付惩罚成本 + + # 原始成本(全部由风险企业生产) + original_purchase_cost = sum(self.order.Q[i] * self.order.P0[i] for i in range(self.order.I)) + original_transport_cost = sum(self.order.Q[i] * self.order.T0[i] for i in range(self.order.I)) + + # 变更后成本 + new_purchase_cost = 0.0 + new_transport_cost = 0.0 + risk_production = np.zeros(self.order.I) # 风险企业生产数量 + supplier_production = np.zeros(self.order.I) # 供应商生产数量 + + start = 0 + for i in range(self.order.I): + end = self.split_points[i] + ents = self.utils.material_optional_enterprises[i] + e_segment = enterprise_layer[start:end] + q_segment = quantity_layer[start:end] + + for idx, ent in enumerate(ents): + if e_segment[idx] == 1: + q = q_segment[idx] + if ent == 0: + # 风险企业 + risk_production[i] += q + new_purchase_cost += q * self.order.P0[i] + new_transport_cost += q * self.order.T0[i] + else: + # 供应商 + supplier_id = ent - 1 + supplier_production[i] += q + new_purchase_cost += q * self.supplier.P_ij[supplier_id][i] + new_transport_cost += q * self.supplier.T_ij[supplier_id][i] + start = end + + # 计算C1:变更惩罚成本 + for i in range(self.order.I): + C1 += self.config.delta * supplier_production[i] * (self.order.P0[i] + self.order.T0[i]) + + # 计算C2:采购变更成本 + C2 = new_purchase_cost - original_purchase_cost + + # 计算C3:运输变更成本 + C3 = new_transport_cost - original_transport_cost + + # 计算C4:提前交付惩罚成本 + # 修复:传入capacity_layer参数 + actual_delivery_time = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) + if actual_delivery_time < self.order.Dd: + # 计算风险企业和各供应商的交货时间 + risk_delivery = [] + supplier_deliveries = {} + start = 0 + for i in range(self.order.I): + end = self.split_points[i] + ents = self.utils.material_optional_enterprises[i] + e_segment = enterprise_layer[start:end] + c_segment = capacity_layer[start:end] + q_segment = quantity_layer[start:end] + + for idx, ent in enumerate(ents): + if e_segment[idx] == 1: + q = q_segment[idx] + c = c_segment[idx] + if c == 0: + production_time = 0 + else: + production_time = q / c + + if ent == 0: + transport_time = self.risk.distance / self.order.transport_speed + risk_delivery.append(production_time + transport_time) + else: + supplier_id = ent - 1 + transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed + if supplier_id not in supplier_deliveries: + supplier_deliveries[supplier_id] = [] + supplier_deliveries[supplier_id].append(production_time + transport_time) + start = end + + D0 = max(risk_delivery) if risk_delivery else 0 + Dj_sum = sum(max(times) for times in supplier_deliveries.values()) if supplier_deliveries else 0 + C4 = self.config.gamma * ((self.order.Dd - D0) + Dj_sum) + + return C1 + C2 + C3 + C4 + + def _calculate_actual_delivery_time(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, + quantity_layer: np.ndarray) -> float: + """计算实际交货期:所有企业中最长的生产+运输时间""" + max_time = 0.0 + start = 0 + for i in range(self.order.I): + end = self.split_points[i] + ents = self.utils.material_optional_enterprises[i] + e_segment = enterprise_layer[start:end] + c_segment = capacity_layer[start:end] + q_segment = quantity_layer[start:end] + + for idx, ent in enumerate(ents): + if e_segment[idx] == 1: + q = q_segment[idx] + c = c_segment[idx] + if c == 0: + production_time = 0 + else: + production_time = q / c + + # 计算运输时间 + if ent == 0: + transport_time = self.risk.distance / self.order.transport_speed + else: + supplier_id = ent - 1 + transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed + + total_time = production_time + transport_time + if total_time > max_time: + max_time = total_time + start = end + return max_time + + def _calculate_tardiness(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray, + quantity_layer: np.ndarray) -> float: + """计算交付延期T = max(0, 实际交货期 - 需求交货期)""" + actual_delivery = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) + return max(0.0, actual_delivery - self.order.Dd) \ No newline at end of file diff --git a/visualizer.py b/visualizer.py new file mode 100644 index 0000000..31e70d7 --- /dev/null +++ b/visualizer.py @@ -0,0 +1,93 @@ +import matplotlib.pyplot as plt +import numpy as np +from chromosome_utils import ChromosomeUtils + + +class ResultVisualizer: + """结果可视化工具""" + def __init__(self, utils: ChromosomeUtils): + self.utils = utils + self.figsize = (12, 8) + self.supplier_names = ["RiskEnterprise"] + self.utils.supplier.names + + def plot_pareto_front(self, all_objectives: list[tuple[float, float]], + non_dominated_objectives: list[tuple[float, float]]): + """绘制帕累托前沿""" + plt.figure(figsize=self.figsize) + if all_objectives: + c_all = [obj[0] for obj in all_objectives] + t_all = [obj[1] for obj in all_objectives] + plt.scatter(c_all, t_all, color='blue', alpha=0.3, label='All Solutions', zorder=1) + if non_dominated_objectives: + c_nd = [obj[0] for obj in non_dominated_objectives] + t_nd = [obj[1] for obj in non_dominated_objectives] + plt.scatter(c_nd, t_nd, color='red', s=50, label='Pareto Front', zorder=5) + plt.title('Pareto Front: Change Cost vs Tardiness') + plt.xlabel('Change Cost') + plt.ylabel('Tardiness') + plt.legend() + plt.grid(True, alpha=0.5) + plt.savefig('pareto_front.png', dpi=300, bbox_inches='tight') + plt.show() + + def plot_convergence(self, convergence_history: list[tuple[float, float]]): + """绘制收敛趋势图""" + if len(convergence_history) == 0: + return + plt.figure(figsize=self.figsize) + generations = list(range(len(convergence_history))) + costs = [h[0] for h in convergence_history] + tardiness = [h[1] for h in convergence_history] + plt.subplot(2, 1, 1) + plt.plot(generations, costs, 'b-') + plt.title('Convergence History') + plt.ylabel('Average Change Cost') + plt.grid(True, alpha=0.5) + plt.subplot(2, 1, 2) + plt.plot(generations, tardiness, 'r-') + plt.xlabel('Generation') + plt.ylabel('Average Tardiness') + plt.grid(True, alpha=0.5) + plt.tight_layout() + plt.savefig('convergence_history.png', dpi=300, bbox_inches='tight') + plt.show() + + def print_solution_details(self, solution: np.ndarray, objectives: tuple[float, float]): + """打印单个解的详细信息(含数量校验)""" + enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(solution) + split_points = np.cumsum(self.utils.material_enterprise_count) + print(f"\n变更成本: {objectives[0]:.2f}, 交付延期: {objectives[1]:.2f}") + print("=" * 80) + total_q_check = [] + start = 0 + for i in range(self.utils.I): + end = split_points[i] + ents = self.utils.material_optional_enterprises[i] + e_segment = enterprise_layer[start:end] + c_segment = capacity_layer[start:end] + q_segment = quantity_layer[start:end] + demand_q = self.utils.order.Q[i] + print(f"物料 {i} - 需求数量: {demand_q}, 分配总量: {np.sum(q_segment[e_segment==1]):.2f}") + total_q_check.append(abs(np.sum(q_segment[e_segment==1]) - demand_q) < 1e-2) + print(f" 选择的企业及其分配:") + for idx, ent in enumerate(ents): + if e_segment[idx] == 1: + ent_name = self.supplier_names[ent] + cap = c_segment[idx] + qty = q_segment[idx] + print(f" 企业: {ent_name}, 产能: {cap:.2f}, 分配数量: {qty:.2f}") + start = end + print("-" * 80) + if all(total_q_check): + print("✅ 所有物料数量满足需求约束") + else: + print("❌ 部分物料数量未满足需求约束") + + def print_pareto_solutions(self, population: np.ndarray, objectives: list[tuple[float, float]]): + """打印帕累托前沿解的详细信息""" + print("\n" + "=" * 100) + print("帕累托前沿解详细方案") + print("=" * 100) + for i in range(min(5, len(population))): + print(f"\n===== 最优解 {i+1} =====") + self.print_solution_details(population[i], objectives[i]) \ No newline at end of file