OrderReallocation-HeavyTruc.../objective_calculator.py

211 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config
from chromosome_utils import ChromosomeUtils
class ObjectiveCalculator:
"""目标函数计算器:计算双目标值(变更成本 + 交付延期)"""
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData,
utils: ChromosomeUtils, config: Config):
"""
初始化计算器
:param order_data: 订单数据
:param risk_data: 风险企业数据
:param supplier_data: 供应商数据
:param utils: 染色体工具类
:param config: 算法配置(惩罚系数等)
"""
self.order = order_data
self.risk = risk_data
self.supplier = supplier_data
self.utils = utils
self.config = config
# 预计算物料企业编码的分割点(提高效率)
self.split_points = np.cumsum(utils.material_enterprise_count)
def calculate_objectives(self, chromosome: np.ndarray) -> tuple[float, float]:
"""
计算双目标值
:param chromosome: 染色体(解)
:return: (变更成本C, 交付延期T)
"""
# 拆分染色体为三层
enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(chromosome)
# 计算变更成本和交付延期
C = self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer)
T = self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer)
return C, T
def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算变更成本 C = C1 + C2 + C3 + C4按新规则实现
- C1: 变更惩罚成本(含α系数)
- C2: 采购成本差异(保持原有逻辑)
- C3: 运输成本差异(保持原有逻辑)
- C4: 提前交付惩罚成本(新公式)
"""
C1 = 0.0
C2 = 0.0
C3 = 0.0
C4 = 0.0
# -------------------------- 基础数据计算(复用+新增)--------------------------
# 1. 原始成本(全风险企业生产时的成本)
original_purchase_cost = sum(self.order.Q[i] * self.order.P0[i] for i in range(self.order.I))
original_transport_cost = sum(self.order.Q[i] * self.order.T0[i] for i in range(self.order.I))
original_total_cost = original_purchase_cost + original_transport_cost # 全风险生产总成本用于C4
# 2. 变更后成本(当前解的成本)
new_purchase_cost = 0.0
new_transport_cost = 0.0
# 3. 关键变量收集(风险企业/供应商的产量、交货时间)
risk_production = np.zeros(self.order.I) # 风险企业生产的各物料数量xi0
supplier_production = np.zeros(self.order.I) # 供应商生产的各物料数量Qi - xi0
risk_delivery_times = [] # 风险企业的各物料交货时间Di0 = 生产时间 + 运输时间)
supplier_delivery_times = {} # 供应商的各物料交货时间 {供应商ID: [Dij1, Dij2, ...]}
start = 0
for i in range(self.order.I): # 遍历每种物料
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i] # 可选企业
e_segment = enterprise_layer[start:end] # 企业选择状态
q_segment = quantity_layer[start:end] # 数量分配
c_segment = capacity_layer[start:end] # 产能(用于计算生产时间)
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理被选中的企业
q = q_segment[idx] # 分配的数量
c = c_segment[idx] # 产能
production_time = q / c if c != 0 else 0 # 生产时间
if ent == 0: # 风险企业
risk_production[i] += q
# 风险企业的采购/运输成本
new_purchase_cost += q * self.order.P0[i]
new_transport_cost += q * self.order.T0[i]
# 风险企业的交货时间Di0
transport_time = self.risk.distance / self.order.transport_speed
risk_delivery_times.append(production_time + transport_time)
else: # 供应商
supplier_id = ent - 1
supplier_production[i] += q
# 供应商的采购/运输成本
new_purchase_cost += q * self.supplier.P_ij[supplier_id][i]
new_transport_cost += q * self.supplier.T_ij[supplier_id][i]
# 供应商的交货时间Dij
transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed
if supplier_id not in supplier_delivery_times:
supplier_delivery_times[supplier_id] = []
supplier_delivery_times[supplier_id].append(production_time + transport_time)
start = end
# -------------------------- C1变更惩罚成本计算新规则--------------------------
# 计算α的两个分子
sum_xi0 = np.sum(risk_production) # 所有物料的风险企业总产量
sum_Qi = np.sum(self.order.Q) # 所有物料的订单总需求
if sum_Qi == 0:
ratio_risk_q = 1.0 # 避免除零
else:
ratio_risk_q = sum_xi0 / sum_Qi
D_original = self.order.Dd # 原定交货时间Q1确认需求交货期
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) # 实际交货期Q2确认
if T_actual == 0:
ratio_delivery = 1.0 # 避免除零
else:
ratio_delivery = D_original / T_actual
# 计算α并应用约束0, 1]≤0取1
alpha = max(ratio_risk_q, ratio_delivery)
alpha = 1.0 if alpha > 1.0 else alpha # 超过1取1
alpha = 1.0 if alpha <= 0.0 else alpha # ≤0取1
# 计算C1按物料求和
for i in range(self.order.I):
supplier_q = supplier_production[i] # 物料i的供应商产量Qi - xi0
risk_unit_cost = self.order.P0[i] + self.order.T0[i] # 风险企业单位采运成本
C1 += self.config.delta * alpha * supplier_q * risk_unit_cost
# -------------------------- C2、C3保持原有逻辑--------------------------
C2 = new_purchase_cost - original_purchase_cost # 采购成本差异
C3 = new_transport_cost - original_transport_cost # 运输成本差异
# -------------------------- C4提前交付惩罚成本计算新规则--------------------------
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
if T_actual <= self.order.Dd: # 不延期时才计算Q7确认
# 步骤1计算基础值 = 四舍五入(全风险生产总成本 / 需求交货期)
Dd = self.order.Dd
if Dd == 0:
base_value = 0.0
else:
base_value = original_total_cost / Dd
base_value_rounded = round(base_value) # 四舍五入取整
# 步骤2计算风险企业提前天数 = max(0, Dd - D0)D0为风险企业最长交货时间
D0 = max(risk_delivery_times) if risk_delivery_times else 0.0
risk_early_days = max(0.0, Dd - D0)
# 步骤3计算供应商最大提前天数 = max(0, Dd - Dj)的最大值Dj为每个供应商的最长交货时间
max_supplier_early = 0.0
for supplier_id, times in supplier_delivery_times.items():
Dj = max(times) if times else 0.0
supplier_early = max(0.0, Dd - Dj)
if supplier_early > max_supplier_early:
max_supplier_early = supplier_early
# 步骤4计算C4
C4 = base_value_rounded * 0.1 * (risk_early_days + max_supplier_early)
C4 = round(C4) # 最终结果四舍五入取整
# -------------------------- 总变更成本 --------------------------
return C1 + C2 + C3 + C4
def _calculate_actual_delivery_time(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算实际交货期:所有企业中最长的(生产时间 + 运输时间)
:return: 实际交货期
"""
max_time = 0.0 # 最大时间(实际交货期)
start = 0
for i in range(self.order.I):
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i]
e_segment = enterprise_layer[start:end]
c_segment = capacity_layer[start:end] # 产能
q_segment = quantity_layer[start:end] # 数量
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理选中的企业
q = q_segment[idx]
c = c_segment[idx]
# 生产时间 = 数量 / 产能产能为0时按0处理
production_time = q / c if c != 0 else 0
# 运输时间
if ent == 0:
transport_time = self.risk.distance / self.order.transport_speed
else:
supplier_id = ent - 1
transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed
# 总时间 = 生产时间 + 运输时间
total_time = production_time + transport_time
if total_time > max_time:
max_time = total_time # 更新最大时间
start = end
return max_time
def _calculate_tardiness(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算交付延期max(0, 实际交货期 - 需求交货期)
:return: 延期时间(非负)
"""
actual_delivery = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
return max(0.0, actual_delivery - self.order.Dd)