Compare commits

..

No commits in common. "master" and "1.2" have entirely different histories.
master ... 1.2

8 changed files with 246 additions and 466 deletions

View File

@ -1,4 +1,5 @@
import numpy as np import numpy as np
from Job import Job from Job import Job
from Machine import Machine_Time_window from Machine import Machine_Time_window
@ -15,8 +16,7 @@ class Decode:
self.J = J self.J = J
self.Machines = [] # 存储机器类 self.Machines = [] # 存储机器类
self.Scheduled = [] # 已经排产过的工序 self.Scheduled = [] # 已经排产过的工序
self.fitness = 0 # 适应度(最大完工时间) self.fitness = 0 # 适应度
self.Machine_Load = np.zeros(M_num, dtype=int) # 机器总负载
self.Machine_State = np.zeros(M_num, dtype=int) # 在机器上加工的工件是哪个 self.Machine_State = np.zeros(M_num, dtype=int) # 在机器上加工的工件是哪个
self.Jobs = [] # 存储工件类 self.Jobs = [] # 存储工件类
for j in range(M_num): for j in range(M_num):
@ -32,7 +32,7 @@ class Decode:
Site = 0 Site = 0
# 按照基因的MS部分按工件序号划分 # 按照基因的MS部分按工件序号划分
for S_i in self.J.values(): for S_i in self.J.values():
Ms_decompose.append(MS[Site:Site + S_i]) # 对MS按照每道工序数进行切分 Ms_decompose.append(MS[Site:Site + S_i]) #对MS按照每道工序数进行切分
Site += S_i Site += S_i
for i in range(len(Ms_decompose)): # len(Ms_decompose)表示工件数 for i in range(len(Ms_decompose)): # len(Ms_decompose)表示工件数
JM_i = [] JM_i = []
@ -68,11 +68,11 @@ class Decode:
for le_i in range(len(M_Tlen)): for le_i in range(len(M_Tlen)):
# 当前空格时间比加工时间大可插入 # 当前空格时间比加工时间大可插入
if M_Tlen[le_i] >= P_t: if M_Tlen[le_i] >= P_t:
# 当前空格开始时间比该工件上一工序结束时间大可插入该空格 # 当前空格开始时间比该工件上一工序结束时间大可插入该空格,以空格开始时间为这一工序开始
if M_Tstart[le_i] >= last_O_end: if M_Tstart[le_i] >= last_O_end:
ealiest_start = M_Tstart[le_i] ealiest_start = M_Tstart[le_i]
break break
# 当前空格开始时间比该工件上一工序结束时间小但空格可满足插入 # 当前空格开始时间比该工件上一工序结束时间小但空格可满足插入该工序,以该工序的上一工序的结束为开始
if M_Tstart[le_i] < last_O_end and M_Tend[le_i] - last_O_end >= P_t: if M_Tstart[le_i] < last_O_end and M_Tend[le_i] - last_O_end >= P_t:
ealiest_start = last_O_end ealiest_start = last_O_end
break break
@ -80,46 +80,24 @@ class Decode:
End_work_time = M_Ealiest + P_t # 当前工件当前工序的结束时间 End_work_time = M_Ealiest + P_t # 当前工件当前工序的结束时间
return M_Ealiest, Selected_Machine, P_t, O_num, last_O_end, End_work_time return M_Ealiest, Selected_Machine, P_t, O_num, last_O_end, End_work_time
# 解码操作适配新编码前半部分为OS后半部分为MS # 解码操作
def decode(self, CHS, Len_Chromo): def decode(self, CHS, Len_Chromo):
""" """
:param CHS: 种群基因 :param CHS: 种群基因
:param Len_Chromo: 工序总数OS和MS各占一半长度 :param Len_Chromo: MS与OS的分解线
:return: 双目标值 [最大加工时间, 负载标准差] :return: 适应度即最大加工时间
""" """
# 重置状态 MS = list(CHS[0:Len_Chromo])
self.fitness = 0 OS = list(CHS[Len_Chromo:2 * Len_Chromo])
self.Machine_Load = np.zeros(self.M_num, dtype=int)
for machine in self.Machines:
machine.assigned_task = []
machine.O_start = []
machine.O_end = []
machine.End_time = 0
for job in self.Jobs:
job.Processed = []
job.J_start = []
job.J_end = []
job.J_machine = []
job.Last_Processing_Machine = None
job.Last_Processing_end_time = 0
OS = list(CHS[0:Len_Chromo]) # 前半部分为工件排列
MS = list(CHS[Len_Chromo:2 * Len_Chromo]) # 后半部分为机器索引
Needed_Matrix = self.Order_Matrix(MS) Needed_Matrix = self.Order_Matrix(MS)
JM, TM = Needed_Matrix[0], Needed_Matrix[1] JM = Needed_Matrix[0]
for i in OS: for i in OS:
Job = i Job = i
O_num = self.Jobs[Job].Current_Processed() # 现在加工的工序 O_num = self.Jobs[Job].Current_Processed() # 现在加工的工序
Machine = JM[Job][O_num] # 确定加工机器 Machine = JM[Job][O_num] # 用基因的OS部分的工件序号以及工序序号索引机器顺序矩阵的机器序号
Process_time = TM[Job][O_num] # 确定加工时间
Para = self.Earliest_Start(Job, O_num, Machine) Para = self.Earliest_Start(Job, O_num, Machine)
self.Jobs[Job]._Input(Para[0], Para[5], Para[1]) # 工件完成该工序 self.Jobs[Job]._Input(Para[0], Para[5], Para[1]) # 工件完成该工序
if Para[5] > self.fitness: if Para[5] > self.fitness:
self.fitness = Para[5] self.fitness = Para[5]
self.Machines[Machine]._Input(Job, Para[0], Para[2], Para[3]) # 机器完成该工件该工序 self.Machines[Machine]._Input(Job, Para[0], Para[2], Para[3]) # 机器完成该工件该工序
self.Machine_Load[Machine] += Process_time # 累加机器负载 return self.fitness
# 计算负载均衡度(标准差)
load_std = np.std(self.Machine_Load)
return [self.fitness, load_std]

149
Encode.py
View File

@ -1,4 +1,5 @@
import random import random
import numpy as np import numpy as np
@ -7,7 +8,7 @@ class Encode:
""" """
:param Matrix: 机器加工时间矩阵 :param Matrix: 机器加工时间矩阵
:param Pop_size: 种群数量 :param Pop_size: 种群数量
:param J: 各工件对应的工序数字典 :param J: 各工件对应的工序数
:param J_num: 工件数 :param J_num: 工件数
:param M_num: 机器数 :param M_num: 机器数
""" """
@ -16,25 +17,24 @@ class Encode:
self.J_num = J_num self.J_num = J_num
self.M_num = M_num self.M_num = M_num
self.CHS = [] self.CHS = []
# 调整初始化比例增加随机选择比例到50% self.GS_num = int(0.6 * Pop_size) # 全局选择初始化
self.GS_num = int(0.3 * Pop_size) # 全局选择初始化
self.LS_num = int(0.2 * Pop_size) # 局部选择初始化 self.LS_num = int(0.2 * Pop_size) # 局部选择初始化
self.RS_num = int(0.5 * Pop_size) # 随机选择初始化(提高比例) self.RS_num = int(0.2 * Pop_size) # 随机选择初始化
self.Len_Chromo = 0 self.Len_Chromo = 0
for i in J.values(): for i in J.values():
self.Len_Chromo += i # 遍历字典J的所有值并求和即工序数之和 self.Len_Chromo += i # 遍历字典J的所有值并求和即工序数之和
# 生成工序准备的部分(工件排列,天然满足工序顺序约束) # 生成工序准备的部分
def OS_List(self): def OS_List(self):
OS_list = [] OS_list = []
for k, v in self.J.items(): # 遍历字典J的所有键值对初始化工序矩阵 for k, v in self.J.items(): # 遍历字典J的所有键值对初始化工序矩阵
OS_add = [k - 1 for j in range(v)] # 工件索引从0开始 OS_add = [k - 1 for j in range(v)]
OS_list.extend(OS_add) OS_list.extend(OS_add)
return OS_list return OS_list
# 生成初始化矩阵 # 生成初始化矩阵
def CHS_Matrix(self, C_num): def CHS_Matrix(self, C_num):
return np.zeros([C_num, self.Len_Chromo * 2], dtype=int) # 工件排列+机器索引 return np.zeros([C_num, self.Len_Chromo], dtype=int)
# 定位每个工件的每道工序的位置Job第几个工件Operation第几道工序 # 定位每个工件的每道工序的位置Job第几个工件Operation第几道工序
def Site(self, Job, Operation): def Site(self, Job, Operation):
@ -48,74 +48,89 @@ class Encode:
# 全局初始化 # 全局初始化
def Global_initial(self): def Global_initial(self):
CHS = self.CHS_Matrix(self.GS_num) MS = self.CHS_Matrix(self.GS_num) # 根据GS_num生成种群
OS_list = self.OS_List() OS_list = self.OS_List()
OS = self.CHS_Matrix(self.GS_num)
for i in range(self.GS_num): for i in range(self.GS_num):
random.shuffle(OS_list) Machine_time = np.zeros(self.M_num, dtype=int) # 步骤1 生成一个整型数组长度为机器数且初始化每个元素为0
OS = OS_list.copy() random.shuffle(OS_list) # 生成工序排序部分
MS = [0] * self.Len_Chromo OS[i] = np.array(OS_list) # 随机打乱后将其赋值给OS的某一行因为有一个种群第i则是赋值在OS的第i行以此生成完整的OS
GJ_list = [i_1 for i_1 in range(self.J_num)] # 生成工件集
Machine_time = np.zeros(self.M_num, dtype=int) random.shuffle(GJ_list) # 随机打乱工件集,为的是下一步可以随机抽出第一个工件
GJ_list = [i_1 for i_1 in range(self.J_num)] for g in GJ_list: # 选择第一个工件(由于上一步已经打乱工件集,抽出第一个也是“随机”)
random.shuffle(GJ_list) h = self.Matrix[g] # h为第一个工件包含的工序对应的时间矩阵
for j in range(len(h)): # 从此工件的第一个工序开始
for g in GJ_list: D = h[j] # D为第一个工件的第一个工序对应的时间矩阵
h = self.Matrix[g] List_Machine_weizhi = []
for j in range(len(h)): for k in range(len(D)): # 确定工序可用的机器位于第几个位置
D = h[j] Useing_Machine = D[k]
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999] if Useing_Machine != 9999:
Machine_Select = [Machine_time[m] + D[m] for m in List_Machine_weizhi] List_Machine_weizhi.append(k)
Min_time = min(Machine_Select) Machine_Select = []
K = Machine_Select.index(Min_time) for Machine_add in List_Machine_weizhi: # 将机器时间数组对应位置和工序可选机器的时间相加
MS[self.Site(g, j)] = K Machine_Select.append(Machine_time[Machine_add] + D[Machine_add])
Machine_time[List_Machine_weizhi[K]] += D[List_Machine_weizhi[K]] Min_time = min(Machine_Select) # 选出时间最小的机器
K = Machine_Select.index(Min_time) # 第一次出现最小时间的位置,确定最小负荷为哪个机器,即为该工序可选择的机器里的第K个机器并非Mk
CHS[i] = np.hstack((OS, MS)) # 工件排列 + 机器索引(紧凑编码) I = List_Machine_weizhi[K] # 所有机器里的第I个机器即Mi
return CHS Machine_time[I] += Min_time # 相应的机器位置加上最小时间
site = self.Site(g, j) # 定位每个工件的每道工序的位置
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去 即生成MS的染色体
CHS1 = np.hstack((MS, OS)) # 将MS和OS整合为一个矩阵
return CHS1
# 局部初始化 # 局部初始化
def Local_initial(self): def Local_initial(self):
CHS = self.CHS_Matrix(self.LS_num) MS = self.CHS_Matrix(self.LS_num) # 根据LS_num生成局部选择的种群大小
OS_list = self.OS_List() OS_list = self.OS_List()
OS = self.CHS_Matrix(self.LS_num)
for i in range(self.LS_num): for i in range(self.LS_num):
random.shuffle(OS_list) random.shuffle(OS_list) # (随机打乱)生成工序排序部分
OS = OS_list.copy() OS[i] = np.array(OS_list) # 随机打乱后将其赋值给OS的某一行因为有一个种群第i则是赋值在OS的第i行以此生成完整的OS
MS = [0] * self.Len_Chromo GJ_List = [i_1 for i_1 in range(self.J_num)] # 生成工件集
for g in GJ_List: # 选择第一个工件(注意:不用随机打乱了)
Machine_time = np.zeros(self.M_num,
dtype=int) # 设置一个整型数组 并初始化每一个元素为0由于局部初始化每个工件的所有工序结束后都要重新初始化所以和全局初始化不同此步骤应放在此处
h = self.Matrix[g] # h为第一个工件包含的工序对应的时间矩阵
for j in range(len(h)): # 从选择的工件的第一个工序开始
D = h[j] # 此工件第一个工序对应的机器加工时间矩阵
List_Machine_weizhi = []
for k in range(len(D)): # 确定工序可用的机器位于第几个位置
Useing_Machine = D[k]
if Useing_Machine != 9999:
List_Machine_weizhi.append(k)
Machine_Select = []
for Machine_add in List_Machine_weizhi: # 将机器时间数组对应位置和工序可选机器的时间相加
Machine_Select.append(Machine_time[Machine_add] + D[Machine_add])
Min_time = min(Machine_Select) # 选出这些时间里最小的
K = Machine_Select.index(Min_time) # 第一次出现最小时间的位置,确定最小负荷为哪个机器,即为该工序可选择的机器里的第K个机器并非Mk
I = List_Machine_weizhi[K] # 所有机器里的第I个机器即Mi
Machine_time[I] += Min_time
site = self.Site(g, j) # 定位每个工件的每道工序的位置
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去
CHS1 = np.hstack((MS, OS)) # 将MS和OS整合为一个矩阵
return CHS1
GJ_List = [i_1 for i_1 in range(self.J_num)] # 随机初始化
for g in GJ_List:
Machine_time = np.zeros(self.M_num, dtype=int)
h = self.Matrix[g]
for j in range(len(h)):
D = h[j]
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
Machine_Select = [Machine_time[m] + D[m] for m in List_Machine_weizhi]
Min_time = min(Machine_Select)
K = Machine_Select.index(Min_time)
MS[self.Site(g, j)] = K
Machine_time[List_Machine_weizhi[K]] += D[List_Machine_weizhi[K]]
CHS[i] = np.hstack((OS, MS))
return CHS
# 随机初始化提高比例到50%
def Random_initial(self): def Random_initial(self):
CHS = self.CHS_Matrix(self.RS_num) MS = self.CHS_Matrix(self.RS_num) # 根据RS_num生成随机选择的种群大小
OS_list = self.OS_List() OS_list = self.OS_List()
OS = self.CHS_Matrix(self.RS_num)
for i in range(self.RS_num): for i in range(self.RS_num):
random.shuffle(OS_list) random.shuffle(OS_list)
OS = OS_list.copy() OS[i] = np.array(OS_list)
MS = [0] * self.Len_Chromo GJ_List = [i_1 for i_1 in range(self.J_num)] # 生成工件集
for g in GJ_List: # 选择第一个工件
for g in range(self.J_num):
h = self.Matrix[g] h = self.Matrix[g]
for j in range(len(h)): for j in range(len(h)): # 选择第一个工件的第一个工序
D = h[j] D = h[j] # 此工件第一个工序可加工的机器对应的时间矩阵
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999] List_Machine_weizhi = []
# 随机选择可用机器 for k in range(len(D)):
selected = random.choice(List_Machine_weizhi) Useing_Machine = D[k]
K = List_Machine_weizhi.index(selected) if Useing_Machine != 9999:
MS[self.Site(g, j)] = K List_Machine_weizhi.append(k)
number = random.choice(List_Machine_weizhi) # 从可选择的机器编号中随机选择一个(此编号就是机器编号)
CHS[i] = np.hstack((OS, MS)) K = List_Machine_weizhi.index(number) # 即为该工序可选择的机器里的第K个机器并非Mk
return CHS site = self.Site(g, j) # 定位每个工件的每道工序的位置
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去
CHS1 = np.hstack((MS, OS))
return CHS1

219
GA.py
View File

@ -9,156 +9,143 @@ from Instance import *
class GA(): class GA():
def __init__(self): def __init__(self):
self.Pop_size = 500 # 种群数量 self.Pop_size = 300 # 种群数量
self.Pc = 0.8 # 交叉概率 self.Pc = 0.8 # 交叉概率
self.Pm = 0.15 # 降低变异概率到0.15 self.Pm = 0.3 # 变异概率
self.Pv = 0.5 # 选择何种方式进行交叉的概率阈值 self.Pv = 0.5 # 选择何种方式进行交叉的概率阈值
self.Pw = 0.95 # 选择何种方式进行变异的概率阈值 self.Pw = 0.95 # 选择何种方式进行变异的概率阈值
self.Max_Itertions = 100 self.Max_Itertions = 20 # 最大迭代次数
# 适应度 # 适应度
def fitness(self, CHS, J, Processing_time, M_num, Len): def fitness(self, CHS, J, Processing_time, M_num, Len):
Fit = [] Fit = []
for i in range(len(CHS)): for i in range(len(CHS)):
d = Decode(J, Processing_time, M_num) # 实例化一个解码器,传入问题参数 d = Decode(J, Processing_time, M_num) #实例化一个解码器,传入问题参数
Fit.append(d.decode(CHS[i], Len)) Fit.append(d.decode(CHS[i], Len))
return Fit return Fit
def pmx_crossover(self, parent1, parent2, length):
if length <= 1:
return parent1.copy(), parent2.copy()
# 确保交叉区间[a, b]有效a < b
a = random.randint(0, length - 2)
b = random.randint(a + 1, length - 1)
child1 = parent1.copy()
child2 = parent2.copy()
# 建立映射表parent1[a:b+1]与parent2[a:b+1]的对应关系)
map1 = {parent1[i]: parent2[i] for i in range(a, b + 1)} # p1→p2的映射
map2 = {parent2[i]: parent1[i] for i in range(a, b + 1)} # p2→p1的映射
# 处理child1交叉区间外的元素替换
for i in range(length):
if i < a or i > b:
val = child1[i]
# 限制循环次数避免无限循环最多循环len(map1)次)
loop_count = 0
max_loop = len(map1)
while val in map1 and map1[val] in parent1[a:b + 1] and loop_count < max_loop:
val = map1[val]
loop_count += 1
child1[i] = map1.get(val, val) # 若超出循环次数直接使用当前val
# 处理child2交叉区间外的元素替换
for i in range(length):
if i < a or i > b:
val = child2[i]
loop_count = 0
max_loop = len(map2)
while val in map2 and map2[val] in parent2[a:b + 1] and loop_count < max_loop:
val = map2[val]
loop_count += 1
child2[i] = map2.get(val, val)
return child1, child2
# 机器部分交叉 # 机器部分交叉
def machine_cross(self, CHS1, CHS2, T0): def machine_cross(self, CHS1, CHS2, T0):
""" """
:param CHS1: 基因1 :param CHS1: 机器选择部分的基因1
:param CHS2: 基因2 :param CHS2: 机器选择部分的基因2
:param T0: 工序总数 :param T0: 工序总数
:return: 交叉后的基因 :return: 交叉后的机器选择部分的基因
""" """
OS1, MS1 = CHS1[:T0], CHS1[T0:] T_r = [j for j in range(T0)]
OS2, MS2 = CHS2[:T0], CHS2[T0:] r = random.randint(1, 10) # 在区间[1,T0]内产生一个整数r
random.shuffle(T_r) #直接打乱T_r原地修改
# 随机选择交叉位置 R = T_r[0:r] # 按照随机数r产生r个互不相等的整数
T_r = list(range(T0)) OS_1 = CHS1[O_num:2 * T0]
random.shuffle(T_r) OS_2 = CHS2[O_num:2 * T0]
r = random.randint(1, T0 // 2) # 交叉部分长度 MS_1 = CHS2[0:T0]
R = T_r[:r] MS_2 = CHS1[0:T0]
# 交换机器选择部分
for i in R: for i in R:
MS1[i], MS2[i] = MS2[i], MS1[i] K, K_2 = MS_1[i], MS_2[i]
MS_1[i], MS_2[i] = K_2, K
return np.hstack((OS1, MS1)), np.hstack((OS2, MS2)) CHS1 = np.hstack((MS_1, OS_1))
CHS2 = np.hstack((MS_2, OS_2))
# 工序部分交叉使用PMX交叉
def operation_cross(self, CHS1, CHS2, T0, J_num):
OS1 = list(CHS1[T0:2 * T0]) # 确保转换为列表
OS2 = list(CHS2[T0:2 * T0])
MS1 = CHS1[0:T0].copy()
MS2 = CHS2[0:T0].copy()
# 调用修复后的PMX交叉
new_os1, new_os2 = self.pmx_crossover(OS1, OS2, T0)
CHS1 = np.hstack((MS1, new_os1))
CHS2 = np.hstack((MS2, new_os2))
return CHS1, CHS2 return CHS1, CHS2
# 机器部分变异(仅在可用机器集中选择) # 工序部分交叉
def operation_cross(self, CHS1, CHS2, T0, J_num):
"""
:param CHS1: 工序选择部分的基因1
:param CHS2: 工序选择部分的基因2
:param T0: 工序总数
:param J_num: 工件总数
:return: 交叉后的工序选择部分的基因
"""
OS_1 = CHS1[T0:2 * T0]
OS_2 = CHS2[T0:2 * T0]
MS_1 = CHS1[0:T0]
MS_2 = CHS2[0:T0]
Job_list = [i for i in range(J_num)]
random.shuffle(Job_list)
r = random.randint(1, J_num - 1)
Set1 = Job_list[0:r]
new_os = list(np.zeros(T0, dtype=int))
for k, v in enumerate(OS_1):
if v in Set1:
new_os[k] = v + 1
for i in OS_2:
if i not in Set1:
Site = new_os.index(0)
new_os[Site] = i + 1
new_os = np.array([j - 1 for j in new_os])
CHS1 = np.hstack((MS_1, new_os))
CHS2 = np.hstack((MS_2, new_os))
return CHS1, CHS2
# 机器部分变异
def machine_variation(self, CHS, O, T0, J): def machine_variation(self, CHS, O, T0, J):
""" """
:param CHS: 基因 :param CHS: 机器选择部分的基因
:param O: 加工时间矩阵 :param O: 加工时间矩阵
:param T0: 工序总数 :param T0: 工序总数
:param J: 各工件加工信息 :param J: 各工件加工信息
:return: 变异后的基因 :return: 变异后的机器选择部分的基因
""" """
OS = CHS[:T0] Tr = [i_num for i_num in range(T0)]
MS = CHS[T0:] MS = CHS[0:T0]
OS = CHS[T0:2 * T0]
# 确定变异位置数量 # 机器选择部分
r = random.randint(1, max(1, T0 // 10)) # 最多变异10%的位置 r = random.randint(1, T0 - 1) # 在变异染色体中选择r个位置
positions = random.sample(range(T0), r) random.shuffle(Tr)
T_r = Tr[0:r]
for pos in positions: for num in T_r:
# 找到该位置对应的工件和工序 T_0 = [j for j in range(T0)]
site = 0 K = []
job = -1 Site = 0
op = -1
for k, v in J.items(): for k, v in J.items():
if site + v > pos: K.append(T_0[Site:Site + v])
job = k - 1 # 转换为0索引 Site += v
op = pos - site for i in range(len(K)):
if num in K[i]:
O_i = i
O_j = K[i].index(num)
break break
site += v Machine_using = O[O_i][O_j]
Machine_time = []
# 获取该工序的可用机器 for j in Machine_using:
D = O[job][op] if j != 9999:
available_machines = [k for k, val in enumerate(D) if val != 9999] Machine_time.append(j)
if len(available_machines) <= 1: Min_index = Machine_time.index(min(Machine_time))
continue # 只有一个可用机器时不变异 MS[num] = Min_index
CHS = np.hstack((MS, OS))
# 随机选择一个不同的可用机器 return CHS
current_idx = MS[pos]
current_machine = available_machines[current_idx]
new_machine = random.choice([m for m in available_machines if m != current_machine])
MS[pos] = available_machines.index(new_machine)
return np.hstack((OS, MS))
# 工序部分变异 # 工序部分变异
def operation_variation(self, CHS, T0, J_num, J, O, M_num): def operation_variation(self, CHS, T0, J_num, J, O, M_num):
""" """
:param CHS: 基因 :param CHS: 工序选择部分的基因
:param T0: 工序总数 :param T0: 工序总数
:param J_num: 工件总数 :param J_num: 工件总数
:param J: 各工件加工信息 :param J: 各工件加工信息
:param O: 加工时间矩阵 :param O: 加工时间矩阵
:param M_num: 机器总数 :param M_num: 机器总数
:return: 变异后的基因 :return: 变异后的工序选择部分的基因
""" """
OS = list(CHS[:T0]) MS = CHS[0:T0]
MS = CHS[T0:] OS = list(CHS[T0:2 * T0])
r = random.randint(1, J_num - 1)
# 随机选择两个位置交换 Tr = [i for i in range(J_num)]
i, j = random.sample(range(T0), 2) random.shuffle(Tr)
OS[i], OS[j] = OS[j], OS[i] Tr = Tr[0:r]
J_os = dict(enumerate(OS)) # 随机选择r个不同的基因
return np.hstack((OS, MS)) J_os = sorted(J_os.items(), key=lambda d: d[1])
Site = []
for i in range(r):
Site.append(OS.index(Tr[i]))
A = list(itertools.permutations(Tr, r))
A_CHS = []
for i in range(len(A)):
for j in range(len(A[i])):
OS[Site[j]] = A[i][j]
C_I = np.hstack((MS, OS))
A_CHS.append(C_I)
Fit = []
for i in range(len(A_CHS)):
d = Decode(J, O, M_num)
Fit.append(d.decode(CHS, T0))
return A_CHS[Fit.index(min(Fit))]

2
Job.py
View File

@ -30,4 +30,4 @@ class Job:
self.Processed.append(1) self.Processed.append(1)
self.J_start.append(W_Eailiest) self.J_start.append(W_Eailiest)
self.J_end.append(End_time) self.J_end.append(End_time)
self.J_machine.append(Machine) self.J_machine.append(Machine)

View File

@ -50,4 +50,4 @@ class Machine_Time_window:
self.O_start.sort() self.O_start.sort()
self.O_end.append(M_Ealiest + P_t) self.O_end.append(M_Ealiest + P_t)
self.O_end.sort() self.O_end.sort()
self.End_time = self.O_end[-1] self.End_time = self.O_end[-1]

111
NSGA2.py
View File

@ -1,111 +0,0 @@
import random
import numpy as np
class NSGA2:
def __init__(self, pop_size, obj_num):
self.pop_size = pop_size
self.obj_num = obj_num
def fast_non_dominated_sort(self, pop_obj):
"""快速非支配排序"""
pop_size = len(pop_obj)
dominated = [[] for _ in range(pop_size)] # 被支配个体列表
rank = [0] * pop_size # 个体的非支配等级
n = [0] * pop_size # 支配该个体的个体数量
# 计算每个个体的支配关系
for p in range(pop_size):
for q in range(pop_size):
if p != q:
# p支配q
if all(pop_obj[p][i] <= pop_obj[q][i] for i in range(self.obj_num)) and \
any(pop_obj[p][i] < pop_obj[q][i] for i in range(self.obj_num)):
dominated[p].append(q)
# q支配p
elif all(pop_obj[q][i] <= pop_obj[p][i] for i in range(self.obj_num)) and \
any(pop_obj[q][i] < pop_obj[p][i] for i in range(self.obj_num)):
n[p] += 1
# 找到等级为0的个体
if n[p] == 0:
rank[p] = 0
# 计算其他等级
current_rank = 0
while True:
next_rank = []
for p in range(pop_size):
if rank[p] == current_rank:
for q in dominated[p]:
n[q] -= 1
if n[q] == 0 and rank[q] == 0:
rank[q] = current_rank + 1
next_rank.append(q)
if not next_rank:
break
current_rank += 1
return rank
def crowding_distance(self, pop_obj, rank):
"""计算拥挤度距离"""
pop_size = len(pop_obj)
distance = [0.0] * pop_size
max_rank = max(rank) if rank else 0
# 对每个等级的个体计算拥挤度
for r in range(max_rank + 1):
# 获取当前等级的个体索引
current_indices = [i for i in range(pop_size) if rank[i] == r]
if len(current_indices) <= 1:
continue
# 对每个目标函数进行排序
for m in range(self.obj_num):
# 按目标m的值排序
sorted_indices = sorted(current_indices, key=lambda x: pop_obj[x][m])
min_val = pop_obj[sorted_indices[0]][m]
max_val = pop_obj[sorted_indices[-1]][m]
# 边界个体的拥挤度设为无穷大
distance[sorted_indices[0]] = float('inf')
distance[sorted_indices[-1]] = float('inf')
# 计算中间个体的拥挤度
for i in range(1, len(sorted_indices) - 1):
if max_val - min_val == 0:
continue
distance[sorted_indices[i]] += (pop_obj[sorted_indices[i + 1]][m] - pop_obj[sorted_indices[i - 1]][m]) / (max_val - min_val)
return distance
def selection(self, pop, pop_obj):
"""选择操作基于非支配排序和拥挤度的锦标赛选择惩罚拥挤度为0的个体"""
pop_size = len(pop)
rank = self.fast_non_dominated_sort(pop_obj)
distance = self.crowding_distance(pop_obj, rank)
selected = []
for _ in range(pop_size):
# 随机选择两个个体进行锦标赛
i = random.randint(0, pop_size - 1)
j = random.randint(0, pop_size - 1)
# 优先选择等级低(更优)的个体
if rank[i] < rank[j]:
selected.append(pop[i])
elif rank[i] > rank[j]:
selected.append(pop[j])
# 等级相同则选择拥挤度大的个体惩罚拥挤度为0的重复解
else:
# 对拥挤度为0的个体进行惩罚
if distance[i] == 0 and distance[j] > 0:
selected.append(pop[j])
elif distance[j] == 0 and distance[i] > 0:
selected.append(pop[i])
elif distance[i] >= distance[j]:
selected.append(pop[i])
else:
selected.append(pop[j])
return selected

181
main.py
View File

@ -1,11 +1,12 @@
import random import random
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
from Decode import Decode from Decode import Decode
from Encode import Encode from Encode import Encode
from GA import GA from GA import GA
from Instance import * from Instance import *
from NSGA2 import NSGA2
# 绘制甘特图 # 绘制甘特图
def Gantt(Machines): def Gantt(Machines):
@ -28,157 +29,67 @@ def Gantt(Machines):
plt.savefig('优化后排程方案的甘特图.png') plt.savefig('优化后排程方案的甘特图.png')
plt.show() plt.show()
if __name__ == '__main__': if __name__ == '__main__':
# 初始化参数 Optimal_fit = 9999 # 最佳适应度(初始化)
Optimal_CHS = 0 # 最佳适应度对应的基因个体(初始化)
g = GA() g = GA()
e = Encode(Processing_time, g.Pop_size, J, J_num, M_num) e = Encode(Processing_time, g.Pop_size, J, J_num, M_num)
CHS1 = e.Global_initial() CHS1 = e.Global_initial()
CHS2 = e.Random_initial() CHS2 = e.Random_initial()
CHS3 = e.Local_initial() CHS3 = e.Local_initial()
C = np.vstack((CHS1, CHS2, CHS3)) C = np.vstack((CHS1, CHS2, CHS3))
Best_fit = [] # 记录适应度在迭代过程中的变化,便于绘图
# 双目标优化相关
nsga2 = NSGA2(g.Pop_size, 2) # 2个优化目标
Optimal_solutions = [] # 存储非支配解
Optimal_fit_values = [] # 存储非支配解的目标值
# 新增:用于存储所有代的所有解
all_solutions = [] # 存储所有个体(染色体)
all_fitnesses = [] # 存储所有个体的目标值
# 早停策略参数
early_stop_counter = 0
max_stagnation = 50 # 连续50代无更新则停止
best_fitness_history = []
for i in range(g.Max_Itertions): for i in range(g.Max_Itertions):
print(f"iter_{i} start!") print("iter_{} start!".format(i))
Fit = g.fitness(C, J, Processing_time, M_num, O_num) Fit = g.fitness(C, J, Processing_time, M_num, O_num)
Best = C[Fit.index(min(Fit))]
# 记录当前代的所有解 best_fitness = min(Fit)
all_solutions.extend(C) if best_fitness < Optimal_fit:
all_fitnesses.extend(Fit) Optimal_fit = best_fitness
Optimal_CHS = Best
# 非支配排序 print('iter_{}: new best_fitness = {}'.format(i, best_fitness))
rank = nsga2.fast_non_dominated_sort(Fit) Best_fit.append(Optimal_fit) # 始终记录当前最优适应度
current_non_dominated = [C[j] for j in range(len(C)) if rank[j] == 0] for j in range(len(C)):
current_non_dominated_fit = [Fit[j] for j in range(len(C)) if rank[j] == 0] Cafter = []
# 更新全局非支配解
new_non_dominated = False
for sol, fit in zip(current_non_dominated, current_non_dominated_fit):
is_dominated = False
for existing_fit in Optimal_fit_values:
if all(existing_fit[d] <= fit[d] for d in range(2)) and any(existing_fit[d] < fit[d] for d in range(2)):
is_dominated = True
break
if not is_dominated:
Optimal_solutions.append(sol)
Optimal_fit_values.append(fit)
new_non_dominated = True
# 对全局解重新筛选非支配解
if Optimal_solutions:
rank_all = nsga2.fast_non_dominated_sort(Optimal_fit_values)
# 保留等级为0的解
Optimal_solutions = [Optimal_solutions[j] for j in range(len(Optimal_solutions)) if rank_all[j] == 0]
Optimal_fit_values = [Optimal_fit_values[j] for j in range(len(Optimal_fit_values)) if rank_all[j] == 0]
# 控制解的数量,过多时保留拥挤度高的解
if len(Optimal_solutions) > g.Pop_size:
distance = nsga2.crowding_distance(Optimal_fit_values, rank_all)
# 按拥挤度排序并保留前Pop_size个解
sorted_indices = sorted(range(len(distance)), key=lambda k: distance[k], reverse=True)
Optimal_solutions = [Optimal_solutions[j] for j in sorted_indices[:g.Pop_size]]
Optimal_fit_values = [Optimal_fit_values[j] for j in sorted_indices[:g.Pop_size]]
# 早停策略检查
if new_non_dominated:
early_stop_counter = 0
else:
early_stop_counter += 1
if early_stop_counter >= max_stagnation:
print(f"早停于第{i}代,连续{max_stagnation}代无新的非支配解")
break
# 定期引入随机解每20代
if i % 20 == 0 and i > 0:
random_solutions = e.Random_initial()[:int(g.Pop_size * 0.1)] # 引入10%的随机解
C = np.vstack((C[:-len(random_solutions)], random_solutions))
# 选择操作基于NSGA-II
selected = nsga2.selection(C, Fit)
# 交叉变异操作
new_pop = []
for j in range(len(selected)):
if random.random() < g.Pc: if random.random() < g.Pc:
# 选择另一个个体进行交叉 N_i = random.choice(np.arange(len(C)))
mate_idx = random.randint(0, len(selected) - 1)
if random.random() < g.Pv: if random.random() < g.Pv:
offspring1, offspring2 = g.machine_cross(selected[j], selected[mate_idx], O_num) Cross = g.machine_cross(C[j], C[N_i], O_num)
else: else:
offspring1, offspring2 = g.operation_cross(selected[j], selected[mate_idx], O_num, J_num) Cross = g.operation_cross(C[j], C[N_i], O_num, J_num)
new_pop.append(offspring1) Cafter.append(Cross[0])
new_pop.append(offspring2) Cafter.append(Cross[1])
else: Cafter.append(C[j])
new_pop.append(selected[j])
# 变异操作
if random.random() < g.Pm: if random.random() < g.Pm:
if random.random() < g.Pw: if random.random() < g.Pw:
mutated = g.machine_variation(selected[j], Processing_time, O_num, J) Variance = g.machine_variation(C[j], Processing_time, O_num, J)
else: else:
mutated = g.operation_variation(selected[j], O_num, J_num, J, Processing_time, M_num) Variance = g.operation_variation(C[j], O_num, J_num, J, Processing_time, M_num)
new_pop.append(mutated) Cafter.append(Variance)
if Cafter != []:
Fit = g.fitness(Cafter, J, Processing_time, M_num, O_num)
C[j] = Cafter[Fit.index(min(Fit))]
# 保持种群规模 # 迭代结束后输出最终结果
if len(new_pop) > g.Pop_size:
# 对新种群进行筛选
new_fit = g.fitness(new_pop, J, Processing_time, M_num, O_num)
new_rank = nsga2.fast_non_dominated_sort(new_fit)
new_distance = nsga2.crowding_distance(new_fit, new_rank)
# 按等级和拥挤度排序
sorted_indices = sorted(range(len(new_pop)), key=lambda k: (new_rank[k], -new_distance[k]))
C = [new_pop[j] for j in sorted_indices[:g.Pop_size]]
else:
C = new_pop
# 输出结果
print("\n=== 优化结果 ===") print("\n=== 优化结果 ===")
print(f"非支配解数量: {len(Optimal_solutions)}") print("最优适应度 (最少完成时间):", Optimal_fit)
print("非支配解目标值 (最大完工时间, 负载标准差):")
for fit in Optimal_fit_values:
print(f"({fit[0]}, {fit[1]:.2f})")
# 选择一个折中解绘制甘特图例如Cmax最小的解 # 解码最优解并绘制甘特图
if Optimal_solutions: d = Decode(J, Processing_time, M_num)
# 找到Cmax最小的解 final_fitness = d.decode(Optimal_CHS, O_num)
cmax_values = [fit[0] for fit in Optimal_fit_values] print("解码验证适应度:", final_fitness)
best_idx = cmax_values.index(min(cmax_values))
Optimal_CHS = Optimal_solutions[best_idx]
# 解码并绘图
d = Decode(J, Processing_time, M_num)
final_fitness = d.decode(Optimal_CHS, O_num)
print(f"\n选中解的目标值: (最大完工时间: {final_fitness[0]}, 负载标准差: {final_fitness[1]:.2f})")
Gantt(d.Machines)
# 绘制帕累托前沿(非支配解) # 绘制最优甘特图
if Optimal_fit_values: Gantt(d.Machines)
plt.figure(figsize=(10, 6))
cmax_nd = [fit[0] for fit in Optimal_fit_values]
load_std_nd = [fit[1] for fit in Optimal_fit_values]
plt.scatter(cmax_nd, load_std_nd, color='red', label='Non-dominated solutions', zorder=5)
# 绘制所有解的点图(所有代的所有个体) # 绘制收敛曲线
if all_fitnesses: x = np.linspace(0, g.Max_Itertions, g.Max_Itertions)
cmax_all = [fit[0] for fit in all_fitnesses] plt.figure()
load_std_all = [fit[1] for fit in all_fitnesses] plt.plot(x, Best_fit, '-k')
plt.scatter(cmax_all, load_std_all, color='blue', alpha=0.5, label='All solutions', zorder=1) plt.title('The maximum completion time of each iteration')
plt.ylabel('Cmax')
plt.title('All Solutions and Pareto Front') plt.xlabel('Iteration')
plt.xlabel('Max Completion Time (Cmax)') plt.grid(True)
plt.ylabel('Load Standard Deviation') plt.savefig('最大完成时间的优化过程.png')
plt.legend() plt.show()
plt.grid(True)
plt.savefig('all_solutions_pareto.png')
plt.show()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 19 KiB