OrderReallocation-HeavyTruc.../objective_calculator.py

180 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import numpy as np
import math
from data_structures import OrderData, RiskEnterpriseData, SupplierData, Config
from chromosome_utils import ChromosomeUtils
class ObjectiveCalculator:
"""目标函数计算器:计算双目标值(变更成本 + 交付延期)(整数化)"""
def __init__(self, order_data: OrderData, risk_data: RiskEnterpriseData, supplier_data: SupplierData,
utils: ChromosomeUtils, config: Config):
"""
初始化计算器
:param order_data: 订单数据
:param risk_data: 风险企业数据
:param supplier_data: 供应商数据
:param utils: 染色体工具类
:param config: 算法配置(惩罚系数等)
"""
self.order = order_data
self.risk = risk_data
self.supplier = supplier_data
self.utils = utils
self.config = config
# 预计算物料企业编码的分割点(提高效率)
self.split_points = np.cumsum(utils.material_enterprise_count)
def calculate_objectives(self, chromosome: np.ndarray) -> tuple[int, int]:
"""
计算双目标值(整数)
:param chromosome: 染色体(解)
:return: (变更成本C, 交付延期T)(均为整数)
"""
# 拆分染色体为三层
enterprise_layer, capacity_layer, quantity_layer = self.utils._split_chromosome(chromosome)
# 计算变更成本和交付延期(整数)
C = math.ceil(self._calculate_change_cost(enterprise_layer, capacity_layer, quantity_layer))
T = math.ceil(self._calculate_tardiness(enterprise_layer, capacity_layer, quantity_layer))
return int(C), int(T)
def _calculate_change_cost(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> float:
"""
计算变更成本 C = C1 + C2 + C3 + C4按新规则实现整数化前
"""
C1 = 0.0
C2 = 0.0
C3 = 0.0
C4 = 0.0
# -------------------------- 基础数据计算(复用+新增)--------------------------
# 1. 原始成本(全风险企业生产时的成本)
original_purchase_cost = sum(self.order.Q[i] * self.order.P0[i] for i in range(self.order.I))
original_transport_cost = sum(self.order.Q[i] * self.order.T0[i] for i in range(self.order.I))
original_total_cost = original_purchase_cost + original_transport_cost # 全风险生产总成本用于C4
# 2. 变更后成本(当前解的成本)
new_purchase_cost = 0.0
new_transport_cost = 0.0
# 3. 关键变量收集(风险企业/供应商的产量、交货时间)
risk_production = np.zeros(self.order.I, dtype=int) # 风险企业生产的各物料数量xi0整数
supplier_production = np.zeros(self.order.I, dtype=int) # 供应商生产的各物料数量Qi - xi0整数
risk_delivery_times = [] # 风险企业的各物料交货时间(整数)
supplier_delivery_times = {} # 供应商的各物料交货时间 {供应商ID: [Dij1, Dij2, ...]}(整数)
start = 0
for i in range(self.order.I): # 遍历每种物料
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i] # 可选企业
e_segment = enterprise_layer[start:end] # 企业选择状态
q_segment = quantity_layer[start:end].astype(int) # 数量分配(整数)
c_segment = capacity_layer[start:end].astype(int) # 产能(整数)
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理被选中的企业
q = q_segment[idx] # 分配的数量(整数)
c = c_segment[idx] # 产能(整数)
# 生产时间(整数,向上取整)
production_time = math.ceil(q / c) if c != 0 else 0
if ent == 0: # 风险企业
risk_production[i] += q
# 风险企业的采购/运输成本
new_purchase_cost += q * self.order.P0[i]
new_transport_cost += q * self.order.T0[i]
# 风险企业的交货时间Di0整数
transport_time = math.ceil(self.risk.distance / self.order.transport_speed)
risk_delivery_times.append(production_time + transport_time)
else: # 供应商
supplier_id = ent - 1
supplier_production[i] += q
# 供应商的采购/运输成本
new_purchase_cost += q * self.supplier.P_ij[supplier_id][i]
new_transport_cost += q * self.supplier.T_ij[supplier_id][i]
# 供应商的交货时间Dij整数
transport_time = math.ceil(self.supplier.distance[supplier_id] / self.order.transport_speed)
if supplier_id not in supplier_delivery_times:
supplier_delivery_times[supplier_id] = []
supplier_delivery_times[supplier_id].append(production_time + transport_time)
start = end
# -------------------------- C1变更惩罚成本计算新规则--------------------------
# 计算α的两个分子
sum_xi0 = np.sum(risk_production) # 所有物料的风险企业总产量(整数)
sum_Qi = np.sum(self.order.Q) # 所有物料的订单总需求(整数)
if sum_Qi == 0:
ratio_risk_q = 1.0 # 避免除零
else:
ratio_risk_q = sum_xi0 / sum_Qi
D_original = self.order.Dd # 原定交货时间(整数)
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer) # 实际交货期(整数)
if T_actual == 0:
ratio_delivery = 1.0 # 避免除零
else:
ratio_delivery = D_original / T_actual
# 计算α并应用约束0, 1]≤0取1
alpha = max(ratio_risk_q, ratio_delivery)
alpha = 1.0 if alpha > 1.0 else alpha # 超过1取1
alpha = 1.0 if alpha <= 0.0 else alpha # ≤0取1
# 计算C1按物料求和
for i in range(self.order.I):
supplier_q = supplier_production[i] # 物料i的供应商产量整数
risk_unit_cost = self.order.P0[i] + self.order.T0[i] # 风险企业单位采运成本(整数)
C1 += self.config.delta * alpha * supplier_q * risk_unit_cost
# -------------------------- C2、C3保持原有逻辑--------------------------
C2 = new_purchase_cost - original_purchase_cost # 采购成本差异(整数相关)
C3 = new_transport_cost - original_transport_cost # 运输成本差异(整数相关)
# -------------------------- C4提前交付惩罚成本计算新规则--------------------------
T_actual = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
if T_actual <= self.order.Dd: # 不延期时才计算Q7确认
# 步骤1计算基础值 = 四舍五入(全风险生产总成本 / 需求交货期)
Dd = self.order.Dd
if Dd == 0:
base_value = 0.0
else:
base_value = original_total_cost / Dd
base_value_rounded = round(base_value) # 四舍五入取整
# 步骤2计算风险企业提前天数 = max(0, Dd - D0)D0为风险企业最长交货时间整数
D0 = max(risk_delivery_times) if risk_delivery_times else 0
risk_early_days = max(0, Dd - D0)
# 步骤3计算供应商最大提前天数 = max(0, Dd - Dj)的最大值(整数)
max_supplier_early = 0
for supplier_id, times in supplier_delivery_times.items():
Dj = max(times) if times else 0
supplier_early = max(0, Dd - Dj)
if supplier_early > max_supplier_early:
max_supplier_early = supplier_early
# 步骤4计算C4整数
C4 = base_value_rounded * 0.1 * (risk_early_days + max_supplier_early)
# -------------------------- 总变更成本 --------------------------
return C1 + C2 + C3 + C4
def _calculate_actual_delivery_time(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> int:
"""
计算实际交货期(整数,向上取整)
:return: 实际交货期(整数)
"""
max_time = 0 # 最大时间(实际交货期)
start = 0
for i in range(self.order.I):
end = self.split_points[i]
ents = self.utils.material_optional_enterprises[i]
e_segment = enterprise_layer[start:end]
c_segment = capacity_layer[start:end].astype(int) # 产能(整数)
q_segment = quantity_layer[start:end].astype(int) # 数量(整数)
for idx, ent in enumerate(ents):
if e_segment[idx] == 1: # 仅处理选中的企业
q = q_segment[idx]
c = c_segment[idx]
# 生产时间(整数,向上取整)
production_time = math.ceil(q / c) if c != 0 else 0
# 运输时间(整数,向上取整)
if ent == 0:
transport_time = math.ceil(self.risk.distance / self.order.transport_speed)
else:
supplier_id = ent - 1
transport_time = math.ceil(self.supplier.distance[supplier_id] / self.order.transport_speed)
# 总时间(整数)
total_time = production_time + transport_time
if total_time > max_time:
max_time = total_time # 更新最大时间
start = end
return int(max_time)
def _calculate_tardiness(self, enterprise_layer: np.ndarray, capacity_layer: np.ndarray,
quantity_layer: np.ndarray) -> int:
"""
计算交付延期(整数)
:return: 延期时间(非负整数)
"""
actual_delivery = self._calculate_actual_delivery_time(enterprise_layer, capacity_layer, quantity_layer)
return max(0, actual_delivery - self.order.Dd)