OrderReallocation-HeavyTruc.../objective_calculator.py

184 lines
8.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config
from chromosome_utils import ChromosomeUtils
class ObjectiveCalculator:
"""目标函数计算器:计算双目标值(变更成本 + 交付延期)"""
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData,
utils: ChromosomeUtils, config: Config):
"""
初始化计算器
:param order_data: 订单数据
:param risk_data: 风险企业数据
:param supplier_data: 供应商数据
:param utils: 染色体工具类
:param config: 算法配置(惩罚系数等)
"""
self.order = order_data
self.risk = risk_data
self.supplier = supplier_data
self.utils = utils
self.config = config
# 预计算物料企业编码的分割点(提高效率)
self.split_points = np.cumsum(utils.material_enterprise_count)
def calculate_objectives(self, chromosome: np.ndarray) -> tuple[float, float]:
"""
计算双目标值
:param chromosome: 染色体(解)
:return: (变更成本C, 交付延期T)
"""
# 拆分染色体为三层
enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(chromosome)
# 计算变更成本和交付延期
C = self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer)
T = self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer)
return C, T
def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算变更成本 C = C1 + C2 + C3 + C4
- C1: 变更惩罚成本(使用供应商替代风险企业的惩罚)
- C2: 采购成本差异(变更后 - 原始)
- C3: 运输成本差异(变更后 - 原始)
- C4: 提前交付惩罚成本
"""
C1 = 0.0
C2 = 0.0
C3 = 0.0
C4 = 0.0
# 原始成本(全部由风险企业生产时的成本)
original_purchase_cost = sum(self.order.Q[i] * self.order.P0[i] for i in range(self.order.I))
original_transport_cost = sum(self.order.Q[i] * self.order.T0[i] for i in range(self.order.I))
# 变更后成本(当前解的成本)
new_purchase_cost = 0.0
new_transport_cost = 0.0
risk_production = np.zeros(self.order.I) # 风险企业生产的数量
supplier_production = np.zeros(self.order.I) # 供应商生产的数量
start = 0
for i in range(self.order.I): # 遍历每种物料
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i] # 可选企业
e_segment = enterprise_layer[start:end] # 企业选择状态
q_segment = quantity_layer[start:end] # 数量分配
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理被选中的企业
q = q_segment[idx] # 分配的数量
if ent == 0: # 风险企业
risk_production[i] += q
new_purchase_cost += q * self.order.P0[i] # 采购成本
new_transport_cost += q * self.order.T0[i] # 运输成本
else: # 供应商
supplier_id = ent - 1
supplier_production[i] += q
new_purchase_cost += q * self.supplier.P_ij[supplier_id][i] # 采购成本
new_transport_cost += q * self.supplier.T_ij[supplier_id][i] # 运输成本
start = end
# 计算C1变更惩罚成本对供应商生产的部分收取惩罚
for i in range(self.order.I):
# 惩罚系数×供应商生产数量×(风险企业的单位采购+运输成本)
C1 += self.config.delta * supplier_production[i] * (self.order.P0[i] + self.order.T0[i])
# 计算C2采购成本差异变更后 - 原始)
C2 = new_purchase_cost - original_purchase_cost
# 计算C3运输成本差异变更后 - 原始)
C3 = new_transport_cost - original_transport_cost
# 计算C4提前交付惩罚成本若实际交货期早于需求交货期
actual_delivery_time = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
if actual_delivery_time < self.order.Dd:
# 计算风险企业和供应商的交货时间
risk_delivery = [] # 风险企业的各物料交货时间
supplier_deliveries = {} # {供应商ID: 各物料交货时间}
start = 0
for i in range(self.order.I):
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i]
e_segment = enterprise_layer[start:end]
c_segment = capacity_layer[start:end] # 产能(用于计算生产时间)
q_segment = quantity_layer[start:end] # 数量
for idx, ent in enumerate(ents):
if e_segment[idx] == 1:
q = q_segment[idx]
c = c_segment[idx]
# 生产时间 = 数量 / 产能产能为0时按0处理
production_time = q / c if c != 0 else 0
# 运输时间 = 距离 / 运输速度
if ent == 0:
transport_time = self.risk.distance / self.order.transport_speed
risk_delivery.append(production_time + transport_time)
else:
supplier_id = ent - 1
transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed
if supplier_id not in supplier_deliveries:
supplier_deliveries[supplier_id] = []
supplier_deliveries[supplier_id].append(production_time + transport_time)
start = end
# 风险企业的最大交货时间(取最长)
D0 = max(risk_delivery) if risk_delivery else 0
# 所有供应商的最大交货时间之和
Dj_sum = sum(max(times) for times in supplier_deliveries.values()) if supplier_deliveries else 0
# 提前交付惩罚 = 惩罚系数 ×(需求交货期 - 风险企业交货时间 + 供应商总交货时间)
C4 = self.config.gamma * ((self.order.Dd - D0) + Dj_sum)
return C1 + C2 + C3 + C4
def _calculate_actual_delivery_time(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算实际交货期:所有企业中最长的(生产时间 + 运输时间)
:return: 实际交货期
"""
max_time = 0.0 # 最大时间(实际交货期)
start = 0
for i in range(self.order.I):
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i]
e_segment = enterprise_layer[start:end]
c_segment = capacity_layer[start:end] # 产能
q_segment = quantity_layer[start:end] # 数量
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理选中的企业
q = q_segment[idx]
c = c_segment[idx]
# 生产时间 = 数量 / 产能产能为0时按0处理
production_time = q / c if c != 0 else 0
# 运输时间
if ent == 0:
transport_time = self.risk.distance / self.order.transport_speed
else:
supplier_id = ent - 1
transport_time = self.supplier.distance[supplier_id] / self.order.transport_speed
# 总时间 = 生产时间 + 运输时间
total_time = production_time + transport_time
if total_time > max_time:
max_time = total_time # 更新最大时间
start = end
return max_time
def _calculate_tardiness(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算交付延期max(0, 实际交货期 - 需求交货期)
:return: 延期时间(非负)
"""
actual_delivery = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
return max(0.0, actual_delivery - self.order.Dd)