Compare commits
No commits in common. "master" and "1.2" have entirely different histories.
48
Decode.py
48
Decode.py
|
|
@ -1,4 +1,5 @@
|
|||
import numpy as np
|
||||
|
||||
from Job import Job
|
||||
from Machine import Machine_Time_window
|
||||
|
||||
|
|
@ -15,8 +16,7 @@ class Decode:
|
|||
self.J = J
|
||||
self.Machines = [] # 存储机器类
|
||||
self.Scheduled = [] # 已经排产过的工序
|
||||
self.fitness = 0 # 适应度(最大完工时间)
|
||||
self.Machine_Load = np.zeros(M_num, dtype=int) # 机器总负载
|
||||
self.fitness = 0 # 适应度
|
||||
self.Machine_State = np.zeros(M_num, dtype=int) # 在机器上加工的工件是哪个
|
||||
self.Jobs = [] # 存储工件类
|
||||
for j in range(M_num):
|
||||
|
|
@ -32,7 +32,7 @@ class Decode:
|
|||
Site = 0
|
||||
# 按照基因的MS部分按工件序号划分
|
||||
for S_i in self.J.values():
|
||||
Ms_decompose.append(MS[Site:Site + S_i]) # 对MS按照每道工序数进行切分
|
||||
Ms_decompose.append(MS[Site:Site + S_i]) #对MS按照每道工序数进行切分
|
||||
Site += S_i
|
||||
for i in range(len(Ms_decompose)): # len(Ms_decompose)表示工件数
|
||||
JM_i = []
|
||||
|
|
@ -68,11 +68,11 @@ class Decode:
|
|||
for le_i in range(len(M_Tlen)):
|
||||
# 当前空格时间比加工时间大可插入
|
||||
if M_Tlen[le_i] >= P_t:
|
||||
# 当前空格开始时间比该工件上一工序结束时间大可插入该空格
|
||||
# 当前空格开始时间比该工件上一工序结束时间大可插入该空格,以空格开始时间为这一工序开始
|
||||
if M_Tstart[le_i] >= last_O_end:
|
||||
ealiest_start = M_Tstart[le_i]
|
||||
break
|
||||
# 当前空格开始时间比该工件上一工序结束时间小但空格可满足插入
|
||||
# 当前空格开始时间比该工件上一工序结束时间小但空格可满足插入该工序,以该工序的上一工序的结束为开始
|
||||
if M_Tstart[le_i] < last_O_end and M_Tend[le_i] - last_O_end >= P_t:
|
||||
ealiest_start = last_O_end
|
||||
break
|
||||
|
|
@ -80,46 +80,24 @@ class Decode:
|
|||
End_work_time = M_Ealiest + P_t # 当前工件当前工序的结束时间
|
||||
return M_Ealiest, Selected_Machine, P_t, O_num, last_O_end, End_work_time
|
||||
|
||||
# 解码操作(适配新编码:前半部分为OS,后半部分为MS)
|
||||
# 解码操作
|
||||
def decode(self, CHS, Len_Chromo):
|
||||
"""
|
||||
:param CHS: 种群基因
|
||||
:param Len_Chromo: 工序总数(OS和MS各占一半长度)
|
||||
:return: 双目标值 [最大加工时间, 负载标准差]
|
||||
:param Len_Chromo: MS与OS的分解线
|
||||
:return: 适应度,即最大加工时间
|
||||
"""
|
||||
# 重置状态
|
||||
self.fitness = 0
|
||||
self.Machine_Load = np.zeros(self.M_num, dtype=int)
|
||||
for machine in self.Machines:
|
||||
machine.assigned_task = []
|
||||
machine.O_start = []
|
||||
machine.O_end = []
|
||||
machine.End_time = 0
|
||||
for job in self.Jobs:
|
||||
job.Processed = []
|
||||
job.J_start = []
|
||||
job.J_end = []
|
||||
job.J_machine = []
|
||||
job.Last_Processing_Machine = None
|
||||
job.Last_Processing_end_time = 0
|
||||
|
||||
OS = list(CHS[0:Len_Chromo]) # 前半部分为工件排列
|
||||
MS = list(CHS[Len_Chromo:2 * Len_Chromo]) # 后半部分为机器索引
|
||||
MS = list(CHS[0:Len_Chromo])
|
||||
OS = list(CHS[Len_Chromo:2 * Len_Chromo])
|
||||
Needed_Matrix = self.Order_Matrix(MS)
|
||||
JM, TM = Needed_Matrix[0], Needed_Matrix[1]
|
||||
|
||||
JM = Needed_Matrix[0]
|
||||
for i in OS:
|
||||
Job = i
|
||||
O_num = self.Jobs[Job].Current_Processed() # 现在加工的工序
|
||||
Machine = JM[Job][O_num] # 确定加工机器
|
||||
Process_time = TM[Job][O_num] # 确定加工时间
|
||||
Machine = JM[Job][O_num] # 用基因的OS部分的工件序号以及工序序号索引机器顺序矩阵的机器序号
|
||||
Para = self.Earliest_Start(Job, O_num, Machine)
|
||||
self.Jobs[Job]._Input(Para[0], Para[5], Para[1]) # 工件完成该工序
|
||||
if Para[5] > self.fitness:
|
||||
self.fitness = Para[5]
|
||||
self.Machines[Machine]._Input(Job, Para[0], Para[2], Para[3]) # 机器完成该工件该工序
|
||||
self.Machine_Load[Machine] += Process_time # 累加机器负载
|
||||
|
||||
# 计算负载均衡度(标准差)
|
||||
load_std = np.std(self.Machine_Load)
|
||||
return [self.fitness, load_std]
|
||||
return self.fitness
|
||||
|
|
|
|||
149
Encode.py
149
Encode.py
|
|
@ -1,4 +1,5 @@
|
|||
import random
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
|
|
@ -7,7 +8,7 @@ class Encode:
|
|||
"""
|
||||
:param Matrix: 机器加工时间矩阵
|
||||
:param Pop_size: 种群数量
|
||||
:param J: 各工件对应的工序数字典
|
||||
:param J: 各工件对应的工序数
|
||||
:param J_num: 工件数
|
||||
:param M_num: 机器数
|
||||
"""
|
||||
|
|
@ -16,25 +17,24 @@ class Encode:
|
|||
self.J_num = J_num
|
||||
self.M_num = M_num
|
||||
self.CHS = []
|
||||
# 调整初始化比例:增加随机选择比例到50%
|
||||
self.GS_num = int(0.3 * Pop_size) # 全局选择初始化
|
||||
self.GS_num = int(0.6 * Pop_size) # 全局选择初始化
|
||||
self.LS_num = int(0.2 * Pop_size) # 局部选择初始化
|
||||
self.RS_num = int(0.5 * Pop_size) # 随机选择初始化(提高比例)
|
||||
self.RS_num = int(0.2 * Pop_size) # 随机选择初始化
|
||||
self.Len_Chromo = 0
|
||||
for i in J.values():
|
||||
self.Len_Chromo += i # 遍历字典J的所有值并求和,即工序数之和
|
||||
self.Len_Chromo += i # 遍历字典J的所有值并求和,即工序数之和
|
||||
|
||||
# 生成工序准备的部分(工件排列,天然满足工序顺序约束)
|
||||
# 生成工序准备的部分
|
||||
def OS_List(self):
|
||||
OS_list = []
|
||||
for k, v in self.J.items(): # 遍历字典J的所有键值对,初始化工序矩阵
|
||||
OS_add = [k - 1 for j in range(v)] # 工件索引从0开始
|
||||
for k, v in self.J.items(): # 遍历字典J的所有键值对,初始化工序矩阵
|
||||
OS_add = [k - 1 for j in range(v)]
|
||||
OS_list.extend(OS_add)
|
||||
return OS_list
|
||||
|
||||
# 生成初始化矩阵
|
||||
def CHS_Matrix(self, C_num):
|
||||
return np.zeros([C_num, self.Len_Chromo * 2], dtype=int) # 工件排列+机器索引
|
||||
return np.zeros([C_num, self.Len_Chromo], dtype=int)
|
||||
|
||||
# 定位每个工件的每道工序的位置,Job第几个工件,Operation第几道工序
|
||||
def Site(self, Job, Operation):
|
||||
|
|
@ -48,74 +48,89 @@ class Encode:
|
|||
|
||||
# 全局初始化
|
||||
def Global_initial(self):
|
||||
CHS = self.CHS_Matrix(self.GS_num)
|
||||
MS = self.CHS_Matrix(self.GS_num) # 根据GS_num生成种群
|
||||
OS_list = self.OS_List()
|
||||
OS = self.CHS_Matrix(self.GS_num)
|
||||
for i in range(self.GS_num):
|
||||
random.shuffle(OS_list)
|
||||
OS = OS_list.copy()
|
||||
MS = [0] * self.Len_Chromo
|
||||
|
||||
Machine_time = np.zeros(self.M_num, dtype=int)
|
||||
GJ_list = [i_1 for i_1 in range(self.J_num)]
|
||||
random.shuffle(GJ_list)
|
||||
|
||||
for g in GJ_list:
|
||||
h = self.Matrix[g]
|
||||
for j in range(len(h)):
|
||||
D = h[j]
|
||||
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
|
||||
Machine_Select = [Machine_time[m] + D[m] for m in List_Machine_weizhi]
|
||||
Min_time = min(Machine_Select)
|
||||
K = Machine_Select.index(Min_time)
|
||||
MS[self.Site(g, j)] = K
|
||||
Machine_time[List_Machine_weizhi[K]] += D[List_Machine_weizhi[K]]
|
||||
|
||||
CHS[i] = np.hstack((OS, MS)) # 工件排列 + 机器索引(紧凑编码)
|
||||
return CHS
|
||||
Machine_time = np.zeros(self.M_num, dtype=int) # 步骤1 生成一个整型数组,长度为机器数,且初始化每个元素为0
|
||||
random.shuffle(OS_list) # 生成工序排序部分
|
||||
OS[i] = np.array(OS_list) # 随机打乱后将其赋值给OS的某一行(因为有一个种群,第i则是赋值在OS的第i行,以此生成完整的OS)
|
||||
GJ_list = [i_1 for i_1 in range(self.J_num)] # 生成工件集
|
||||
random.shuffle(GJ_list) # 随机打乱工件集,为的是下一步可以随机抽出第一个工件
|
||||
for g in GJ_list: # 选择第一个工件(由于上一步已经打乱工件集,抽出第一个也是“随机”)
|
||||
h = self.Matrix[g] # h为第一个工件包含的工序对应的时间矩阵
|
||||
for j in range(len(h)): # 从此工件的第一个工序开始
|
||||
D = h[j] # D为第一个工件的第一个工序对应的时间矩阵
|
||||
List_Machine_weizhi = []
|
||||
for k in range(len(D)): # 确定工序可用的机器位于第几个位置
|
||||
Useing_Machine = D[k]
|
||||
if Useing_Machine != 9999:
|
||||
List_Machine_weizhi.append(k)
|
||||
Machine_Select = []
|
||||
for Machine_add in List_Machine_weizhi: # 将机器时间数组对应位置和工序可选机器的时间相加
|
||||
Machine_Select.append(Machine_time[Machine_add] + D[Machine_add])
|
||||
Min_time = min(Machine_Select) # 选出时间最小的机器
|
||||
K = Machine_Select.index(Min_time) # 第一次出现最小时间的位置,确定最小负荷为哪个机器,即为该工序可选择的机器里的第K个机器,并非Mk
|
||||
I = List_Machine_weizhi[K] # 所有机器里的第I个机器,即Mi
|
||||
Machine_time[I] += Min_time # 相应的机器位置加上最小时间
|
||||
site = self.Site(g, j) # 定位每个工件的每道工序的位置
|
||||
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去 即生成MS的染色体
|
||||
CHS1 = np.hstack((MS, OS)) # 将MS和OS整合为一个矩阵
|
||||
return CHS1
|
||||
|
||||
# 局部初始化
|
||||
def Local_initial(self):
|
||||
CHS = self.CHS_Matrix(self.LS_num)
|
||||
MS = self.CHS_Matrix(self.LS_num) # 根据LS_num生成局部选择的种群大小
|
||||
OS_list = self.OS_List()
|
||||
OS = self.CHS_Matrix(self.LS_num)
|
||||
for i in range(self.LS_num):
|
||||
random.shuffle(OS_list)
|
||||
OS = OS_list.copy()
|
||||
MS = [0] * self.Len_Chromo
|
||||
random.shuffle(OS_list) # (随机打乱)生成工序排序部分
|
||||
OS[i] = np.array(OS_list) # 随机打乱后将其赋值给OS的某一行(因为有一个种群,第i则是赋值在OS的第i行,以此生成完整的OS)
|
||||
GJ_List = [i_1 for i_1 in range(self.J_num)] # 生成工件集
|
||||
for g in GJ_List: # 选择第一个工件(注意:不用随机打乱了)
|
||||
Machine_time = np.zeros(self.M_num,
|
||||
dtype=int) # 设置一个整型数组 并初始化每一个元素为0,由于局部初始化,每个工件的所有工序结束后都要重新初始化,所以和全局初始化不同,此步骤应放在此处
|
||||
h = self.Matrix[g] # h为第一个工件包含的工序对应的时间矩阵
|
||||
for j in range(len(h)): # 从选择的工件的第一个工序开始
|
||||
D = h[j] # 此工件第一个工序对应的机器加工时间矩阵
|
||||
List_Machine_weizhi = []
|
||||
for k in range(len(D)): # 确定工序可用的机器位于第几个位置
|
||||
Useing_Machine = D[k]
|
||||
if Useing_Machine != 9999:
|
||||
List_Machine_weizhi.append(k)
|
||||
Machine_Select = []
|
||||
for Machine_add in List_Machine_weizhi: # 将机器时间数组对应位置和工序可选机器的时间相加
|
||||
Machine_Select.append(Machine_time[Machine_add] + D[Machine_add])
|
||||
Min_time = min(Machine_Select) # 选出这些时间里最小的
|
||||
K = Machine_Select.index(Min_time) # 第一次出现最小时间的位置,确定最小负荷为哪个机器,即为该工序可选择的机器里的第K个机器,并非Mk
|
||||
I = List_Machine_weizhi[K] # 所有机器里的第I个机器,即Mi
|
||||
Machine_time[I] += Min_time
|
||||
site = self.Site(g, j) # 定位每个工件的每道工序的位置
|
||||
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去
|
||||
CHS1 = np.hstack((MS, OS)) # 将MS和OS整合为一个矩阵
|
||||
return CHS1
|
||||
|
||||
GJ_List = [i_1 for i_1 in range(self.J_num)]
|
||||
for g in GJ_List:
|
||||
Machine_time = np.zeros(self.M_num, dtype=int)
|
||||
h = self.Matrix[g]
|
||||
for j in range(len(h)):
|
||||
D = h[j]
|
||||
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
|
||||
Machine_Select = [Machine_time[m] + D[m] for m in List_Machine_weizhi]
|
||||
Min_time = min(Machine_Select)
|
||||
K = Machine_Select.index(Min_time)
|
||||
MS[self.Site(g, j)] = K
|
||||
Machine_time[List_Machine_weizhi[K]] += D[List_Machine_weizhi[K]]
|
||||
|
||||
CHS[i] = np.hstack((OS, MS))
|
||||
return CHS
|
||||
|
||||
# 随机初始化(提高比例到50%)
|
||||
# 随机初始化
|
||||
def Random_initial(self):
|
||||
CHS = self.CHS_Matrix(self.RS_num)
|
||||
MS = self.CHS_Matrix(self.RS_num) # 根据RS_num生成随机选择的种群大小
|
||||
OS_list = self.OS_List()
|
||||
OS = self.CHS_Matrix(self.RS_num)
|
||||
for i in range(self.RS_num):
|
||||
random.shuffle(OS_list)
|
||||
OS = OS_list.copy()
|
||||
MS = [0] * self.Len_Chromo
|
||||
|
||||
for g in range(self.J_num):
|
||||
OS[i] = np.array(OS_list)
|
||||
GJ_List = [i_1 for i_1 in range(self.J_num)] # 生成工件集
|
||||
for g in GJ_List: # 选择第一个工件
|
||||
h = self.Matrix[g]
|
||||
for j in range(len(h)):
|
||||
D = h[j]
|
||||
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
|
||||
# 随机选择可用机器
|
||||
selected = random.choice(List_Machine_weizhi)
|
||||
K = List_Machine_weizhi.index(selected)
|
||||
MS[self.Site(g, j)] = K
|
||||
|
||||
CHS[i] = np.hstack((OS, MS))
|
||||
return CHS
|
||||
for j in range(len(h)): # 选择第一个工件的第一个工序
|
||||
D = h[j] # 此工件第一个工序可加工的机器对应的时间矩阵
|
||||
List_Machine_weizhi = []
|
||||
for k in range(len(D)):
|
||||
Useing_Machine = D[k]
|
||||
if Useing_Machine != 9999:
|
||||
List_Machine_weizhi.append(k)
|
||||
number = random.choice(List_Machine_weizhi) # 从可选择的机器编号中随机选择一个(此编号就是机器编号)
|
||||
K = List_Machine_weizhi.index(number) # 即为该工序可选择的机器里的第K个机器,并非Mk
|
||||
site = self.Site(g, j) # 定位每个工件的每道工序的位置
|
||||
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去
|
||||
CHS1 = np.hstack((MS, OS))
|
||||
return CHS1
|
||||
|
|
|
|||
219
GA.py
219
GA.py
|
|
@ -9,156 +9,143 @@ from Instance import *
|
|||
|
||||
class GA():
|
||||
def __init__(self):
|
||||
self.Pop_size = 500 # 种群数量
|
||||
self.Pop_size = 300 # 种群数量
|
||||
self.Pc = 0.8 # 交叉概率
|
||||
self.Pm = 0.15 # 降低变异概率到0.15
|
||||
self.Pm = 0.3 # 变异概率
|
||||
self.Pv = 0.5 # 选择何种方式进行交叉的概率阈值
|
||||
self.Pw = 0.95 # 选择何种方式进行变异的概率阈值
|
||||
self.Max_Itertions = 100
|
||||
self.Max_Itertions = 20 # 最大迭代次数
|
||||
|
||||
# 适应度
|
||||
def fitness(self, CHS, J, Processing_time, M_num, Len):
|
||||
Fit = []
|
||||
for i in range(len(CHS)):
|
||||
d = Decode(J, Processing_time, M_num) # 实例化一个解码器,传入问题参数
|
||||
d = Decode(J, Processing_time, M_num) #实例化一个解码器,传入问题参数
|
||||
Fit.append(d.decode(CHS[i], Len))
|
||||
return Fit
|
||||
|
||||
def pmx_crossover(self, parent1, parent2, length):
|
||||
if length <= 1:
|
||||
return parent1.copy(), parent2.copy()
|
||||
|
||||
# 确保交叉区间[a, b]有效(a < b)
|
||||
a = random.randint(0, length - 2)
|
||||
b = random.randint(a + 1, length - 1)
|
||||
|
||||
child1 = parent1.copy()
|
||||
child2 = parent2.copy()
|
||||
|
||||
# 建立映射表(parent1[a:b+1]与parent2[a:b+1]的对应关系)
|
||||
map1 = {parent1[i]: parent2[i] for i in range(a, b + 1)} # p1→p2的映射
|
||||
map2 = {parent2[i]: parent1[i] for i in range(a, b + 1)} # p2→p1的映射
|
||||
|
||||
# 处理child1:交叉区间外的元素替换
|
||||
for i in range(length):
|
||||
if i < a or i > b:
|
||||
val = child1[i]
|
||||
# 限制循环次数,避免无限循环(最多循环len(map1)次)
|
||||
loop_count = 0
|
||||
max_loop = len(map1)
|
||||
while val in map1 and map1[val] in parent1[a:b + 1] and loop_count < max_loop:
|
||||
val = map1[val]
|
||||
loop_count += 1
|
||||
child1[i] = map1.get(val, val) # 若超出循环次数,直接使用当前val
|
||||
|
||||
# 处理child2:交叉区间外的元素替换
|
||||
for i in range(length):
|
||||
if i < a or i > b:
|
||||
val = child2[i]
|
||||
loop_count = 0
|
||||
max_loop = len(map2)
|
||||
while val in map2 and map2[val] in parent2[a:b + 1] and loop_count < max_loop:
|
||||
val = map2[val]
|
||||
loop_count += 1
|
||||
child2[i] = map2.get(val, val)
|
||||
|
||||
return child1, child2
|
||||
|
||||
# 机器部分交叉
|
||||
def machine_cross(self, CHS1, CHS2, T0):
|
||||
"""
|
||||
:param CHS1: 基因1
|
||||
:param CHS2: 基因2
|
||||
:param CHS1: 机器选择部分的基因1
|
||||
:param CHS2: 机器选择部分的基因2
|
||||
:param T0: 工序总数
|
||||
:return: 交叉后的基因
|
||||
:return: 交叉后的机器选择部分的基因
|
||||
"""
|
||||
OS1, MS1 = CHS1[:T0], CHS1[T0:]
|
||||
OS2, MS2 = CHS2[:T0], CHS2[T0:]
|
||||
|
||||
# 随机选择交叉位置
|
||||
T_r = list(range(T0))
|
||||
random.shuffle(T_r)
|
||||
r = random.randint(1, T0 // 2) # 交叉部分长度
|
||||
R = T_r[:r]
|
||||
|
||||
# 交换机器选择部分
|
||||
T_r = [j for j in range(T0)]
|
||||
r = random.randint(1, 10) # 在区间[1,T0]内产生一个整数r
|
||||
random.shuffle(T_r) #直接打乱T_r,原地修改
|
||||
R = T_r[0:r] # 按照随机数r产生r个互不相等的整数
|
||||
OS_1 = CHS1[O_num:2 * T0]
|
||||
OS_2 = CHS2[O_num:2 * T0]
|
||||
MS_1 = CHS2[0:T0]
|
||||
MS_2 = CHS1[0:T0]
|
||||
for i in R:
|
||||
MS1[i], MS2[i] = MS2[i], MS1[i]
|
||||
|
||||
return np.hstack((OS1, MS1)), np.hstack((OS2, MS2))
|
||||
|
||||
# 工序部分交叉(使用PMX交叉)
|
||||
def operation_cross(self, CHS1, CHS2, T0, J_num):
|
||||
OS1 = list(CHS1[T0:2 * T0]) # 确保转换为列表
|
||||
OS2 = list(CHS2[T0:2 * T0])
|
||||
MS1 = CHS1[0:T0].copy()
|
||||
MS2 = CHS2[0:T0].copy()
|
||||
|
||||
# 调用修复后的PMX交叉
|
||||
new_os1, new_os2 = self.pmx_crossover(OS1, OS2, T0)
|
||||
|
||||
CHS1 = np.hstack((MS1, new_os1))
|
||||
CHS2 = np.hstack((MS2, new_os2))
|
||||
K, K_2 = MS_1[i], MS_2[i]
|
||||
MS_1[i], MS_2[i] = K_2, K
|
||||
CHS1 = np.hstack((MS_1, OS_1))
|
||||
CHS2 = np.hstack((MS_2, OS_2))
|
||||
return CHS1, CHS2
|
||||
|
||||
# 机器部分变异(仅在可用机器集中选择)
|
||||
# 工序部分交叉
|
||||
def operation_cross(self, CHS1, CHS2, T0, J_num):
|
||||
"""
|
||||
:param CHS1: 工序选择部分的基因1
|
||||
:param CHS2: 工序选择部分的基因2
|
||||
:param T0: 工序总数
|
||||
:param J_num: 工件总数
|
||||
:return: 交叉后的工序选择部分的基因
|
||||
"""
|
||||
OS_1 = CHS1[T0:2 * T0]
|
||||
OS_2 = CHS2[T0:2 * T0]
|
||||
MS_1 = CHS1[0:T0]
|
||||
MS_2 = CHS2[0:T0]
|
||||
Job_list = [i for i in range(J_num)]
|
||||
random.shuffle(Job_list)
|
||||
r = random.randint(1, J_num - 1)
|
||||
Set1 = Job_list[0:r]
|
||||
new_os = list(np.zeros(T0, dtype=int))
|
||||
for k, v in enumerate(OS_1):
|
||||
if v in Set1:
|
||||
new_os[k] = v + 1
|
||||
for i in OS_2:
|
||||
if i not in Set1:
|
||||
Site = new_os.index(0)
|
||||
new_os[Site] = i + 1
|
||||
new_os = np.array([j - 1 for j in new_os])
|
||||
CHS1 = np.hstack((MS_1, new_os))
|
||||
CHS2 = np.hstack((MS_2, new_os))
|
||||
return CHS1, CHS2
|
||||
|
||||
# 机器部分变异
|
||||
def machine_variation(self, CHS, O, T0, J):
|
||||
"""
|
||||
:param CHS: 基因
|
||||
:param CHS: 机器选择部分的基因
|
||||
:param O: 加工时间矩阵
|
||||
:param T0: 工序总数
|
||||
:param J: 各工件加工信息
|
||||
:return: 变异后的基因
|
||||
:return: 变异后的机器选择部分的基因
|
||||
"""
|
||||
OS = CHS[:T0]
|
||||
MS = CHS[T0:]
|
||||
|
||||
# 确定变异位置数量
|
||||
r = random.randint(1, max(1, T0 // 10)) # 最多变异10%的位置
|
||||
positions = random.sample(range(T0), r)
|
||||
|
||||
for pos in positions:
|
||||
# 找到该位置对应的工件和工序
|
||||
site = 0
|
||||
job = -1
|
||||
op = -1
|
||||
Tr = [i_num for i_num in range(T0)]
|
||||
MS = CHS[0:T0]
|
||||
OS = CHS[T0:2 * T0]
|
||||
# 机器选择部分
|
||||
r = random.randint(1, T0 - 1) # 在变异染色体中选择r个位置
|
||||
random.shuffle(Tr)
|
||||
T_r = Tr[0:r]
|
||||
for num in T_r:
|
||||
T_0 = [j for j in range(T0)]
|
||||
K = []
|
||||
Site = 0
|
||||
for k, v in J.items():
|
||||
if site + v > pos:
|
||||
job = k - 1 # 转换为0索引
|
||||
op = pos - site
|
||||
K.append(T_0[Site:Site + v])
|
||||
Site += v
|
||||
for i in range(len(K)):
|
||||
if num in K[i]:
|
||||
O_i = i
|
||||
O_j = K[i].index(num)
|
||||
break
|
||||
site += v
|
||||
|
||||
# 获取该工序的可用机器
|
||||
D = O[job][op]
|
||||
available_machines = [k for k, val in enumerate(D) if val != 9999]
|
||||
if len(available_machines) <= 1:
|
||||
continue # 只有一个可用机器时不变异
|
||||
|
||||
# 随机选择一个不同的可用机器
|
||||
current_idx = MS[pos]
|
||||
current_machine = available_machines[current_idx]
|
||||
new_machine = random.choice([m for m in available_machines if m != current_machine])
|
||||
MS[pos] = available_machines.index(new_machine)
|
||||
|
||||
return np.hstack((OS, MS))
|
||||
Machine_using = O[O_i][O_j]
|
||||
Machine_time = []
|
||||
for j in Machine_using:
|
||||
if j != 9999:
|
||||
Machine_time.append(j)
|
||||
Min_index = Machine_time.index(min(Machine_time))
|
||||
MS[num] = Min_index
|
||||
CHS = np.hstack((MS, OS))
|
||||
return CHS
|
||||
|
||||
# 工序部分变异
|
||||
def operation_variation(self, CHS, T0, J_num, J, O, M_num):
|
||||
"""
|
||||
:param CHS: 基因
|
||||
:param CHS: 工序选择部分的基因
|
||||
:param T0: 工序总数
|
||||
:param J_num: 工件总数
|
||||
:param J: 各工件加工信息
|
||||
:param O: 加工时间矩阵
|
||||
:param M_num: 机器总数
|
||||
:return: 变异后的基因
|
||||
:return: 变异后的工序选择部分的基因
|
||||
"""
|
||||
OS = list(CHS[:T0])
|
||||
MS = CHS[T0:]
|
||||
|
||||
# 随机选择两个位置交换
|
||||
i, j = random.sample(range(T0), 2)
|
||||
OS[i], OS[j] = OS[j], OS[i]
|
||||
|
||||
return np.hstack((OS, MS))
|
||||
MS = CHS[0:T0]
|
||||
OS = list(CHS[T0:2 * T0])
|
||||
r = random.randint(1, J_num - 1)
|
||||
Tr = [i for i in range(J_num)]
|
||||
random.shuffle(Tr)
|
||||
Tr = Tr[0:r]
|
||||
J_os = dict(enumerate(OS)) # 随机选择r个不同的基因
|
||||
J_os = sorted(J_os.items(), key=lambda d: d[1])
|
||||
Site = []
|
||||
for i in range(r):
|
||||
Site.append(OS.index(Tr[i]))
|
||||
A = list(itertools.permutations(Tr, r))
|
||||
A_CHS = []
|
||||
for i in range(len(A)):
|
||||
for j in range(len(A[i])):
|
||||
OS[Site[j]] = A[i][j]
|
||||
C_I = np.hstack((MS, OS))
|
||||
A_CHS.append(C_I)
|
||||
Fit = []
|
||||
for i in range(len(A_CHS)):
|
||||
d = Decode(J, O, M_num)
|
||||
Fit.append(d.decode(CHS, T0))
|
||||
return A_CHS[Fit.index(min(Fit))]
|
||||
|
|
|
|||
2
Job.py
2
Job.py
|
|
@ -30,4 +30,4 @@ class Job:
|
|||
self.Processed.append(1)
|
||||
self.J_start.append(W_Eailiest)
|
||||
self.J_end.append(End_time)
|
||||
self.J_machine.append(Machine)
|
||||
self.J_machine.append(Machine)
|
||||
|
|
|
|||
|
|
@ -50,4 +50,4 @@ class Machine_Time_window:
|
|||
self.O_start.sort()
|
||||
self.O_end.append(M_Ealiest + P_t)
|
||||
self.O_end.sort()
|
||||
self.End_time = self.O_end[-1]
|
||||
self.End_time = self.O_end[-1]
|
||||
|
|
|
|||
111
NSGA2.py
111
NSGA2.py
|
|
@ -1,111 +0,0 @@
|
|||
import random
|
||||
import numpy as np
|
||||
|
||||
class NSGA2:
|
||||
def __init__(self, pop_size, obj_num):
|
||||
self.pop_size = pop_size
|
||||
self.obj_num = obj_num
|
||||
|
||||
def fast_non_dominated_sort(self, pop_obj):
|
||||
"""快速非支配排序"""
|
||||
pop_size = len(pop_obj)
|
||||
dominated = [[] for _ in range(pop_size)] # 被支配个体列表
|
||||
rank = [0] * pop_size # 个体的非支配等级
|
||||
n = [0] * pop_size # 支配该个体的个体数量
|
||||
|
||||
# 计算每个个体的支配关系
|
||||
for p in range(pop_size):
|
||||
for q in range(pop_size):
|
||||
if p != q:
|
||||
# p支配q
|
||||
if all(pop_obj[p][i] <= pop_obj[q][i] for i in range(self.obj_num)) and \
|
||||
any(pop_obj[p][i] < pop_obj[q][i] for i in range(self.obj_num)):
|
||||
dominated[p].append(q)
|
||||
# q支配p
|
||||
elif all(pop_obj[q][i] <= pop_obj[p][i] for i in range(self.obj_num)) and \
|
||||
any(pop_obj[q][i] < pop_obj[p][i] for i in range(self.obj_num)):
|
||||
n[p] += 1
|
||||
|
||||
# 找到等级为0的个体
|
||||
if n[p] == 0:
|
||||
rank[p] = 0
|
||||
|
||||
# 计算其他等级
|
||||
current_rank = 0
|
||||
while True:
|
||||
next_rank = []
|
||||
for p in range(pop_size):
|
||||
if rank[p] == current_rank:
|
||||
for q in dominated[p]:
|
||||
n[q] -= 1
|
||||
if n[q] == 0 and rank[q] == 0:
|
||||
rank[q] = current_rank + 1
|
||||
next_rank.append(q)
|
||||
if not next_rank:
|
||||
break
|
||||
current_rank += 1
|
||||
|
||||
return rank
|
||||
|
||||
def crowding_distance(self, pop_obj, rank):
|
||||
"""计算拥挤度距离"""
|
||||
pop_size = len(pop_obj)
|
||||
distance = [0.0] * pop_size
|
||||
max_rank = max(rank) if rank else 0
|
||||
|
||||
# 对每个等级的个体计算拥挤度
|
||||
for r in range(max_rank + 1):
|
||||
# 获取当前等级的个体索引
|
||||
current_indices = [i for i in range(pop_size) if rank[i] == r]
|
||||
if len(current_indices) <= 1:
|
||||
continue
|
||||
|
||||
# 对每个目标函数进行排序
|
||||
for m in range(self.obj_num):
|
||||
# 按目标m的值排序
|
||||
sorted_indices = sorted(current_indices, key=lambda x: pop_obj[x][m])
|
||||
min_val = pop_obj[sorted_indices[0]][m]
|
||||
max_val = pop_obj[sorted_indices[-1]][m]
|
||||
|
||||
# 边界个体的拥挤度设为无穷大
|
||||
distance[sorted_indices[0]] = float('inf')
|
||||
distance[sorted_indices[-1]] = float('inf')
|
||||
|
||||
# 计算中间个体的拥挤度
|
||||
for i in range(1, len(sorted_indices) - 1):
|
||||
if max_val - min_val == 0:
|
||||
continue
|
||||
distance[sorted_indices[i]] += (pop_obj[sorted_indices[i + 1]][m] - pop_obj[sorted_indices[i - 1]][m]) / (max_val - min_val)
|
||||
|
||||
return distance
|
||||
|
||||
def selection(self, pop, pop_obj):
|
||||
"""选择操作:基于非支配排序和拥挤度的锦标赛选择,惩罚拥挤度为0的个体"""
|
||||
pop_size = len(pop)
|
||||
rank = self.fast_non_dominated_sort(pop_obj)
|
||||
distance = self.crowding_distance(pop_obj, rank)
|
||||
selected = []
|
||||
|
||||
for _ in range(pop_size):
|
||||
# 随机选择两个个体进行锦标赛
|
||||
i = random.randint(0, pop_size - 1)
|
||||
j = random.randint(0, pop_size - 1)
|
||||
|
||||
# 优先选择等级低(更优)的个体
|
||||
if rank[i] < rank[j]:
|
||||
selected.append(pop[i])
|
||||
elif rank[i] > rank[j]:
|
||||
selected.append(pop[j])
|
||||
# 等级相同则选择拥挤度大的个体(惩罚拥挤度为0的重复解)
|
||||
else:
|
||||
# 对拥挤度为0的个体进行惩罚
|
||||
if distance[i] == 0 and distance[j] > 0:
|
||||
selected.append(pop[j])
|
||||
elif distance[j] == 0 and distance[i] > 0:
|
||||
selected.append(pop[i])
|
||||
elif distance[i] >= distance[j]:
|
||||
selected.append(pop[i])
|
||||
else:
|
||||
selected.append(pop[j])
|
||||
|
||||
return selected
|
||||
181
main.py
181
main.py
|
|
@ -1,11 +1,12 @@
|
|||
import random
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
|
||||
from Decode import Decode
|
||||
from Encode import Encode
|
||||
from GA import GA
|
||||
from Instance import *
|
||||
from NSGA2 import NSGA2
|
||||
|
||||
|
||||
# 绘制甘特图
|
||||
def Gantt(Machines):
|
||||
|
|
@ -28,157 +29,67 @@ def Gantt(Machines):
|
|||
plt.savefig('优化后排程方案的甘特图.png')
|
||||
plt.show()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
# 初始化参数
|
||||
Optimal_fit = 9999 # 最佳适应度(初始化)
|
||||
Optimal_CHS = 0 # 最佳适应度对应的基因个体(初始化)
|
||||
g = GA()
|
||||
e = Encode(Processing_time, g.Pop_size, J, J_num, M_num)
|
||||
CHS1 = e.Global_initial()
|
||||
CHS2 = e.Random_initial()
|
||||
CHS3 = e.Local_initial()
|
||||
C = np.vstack((CHS1, CHS2, CHS3))
|
||||
|
||||
# 双目标优化相关
|
||||
nsga2 = NSGA2(g.Pop_size, 2) # 2个优化目标
|
||||
Optimal_solutions = [] # 存储非支配解
|
||||
Optimal_fit_values = [] # 存储非支配解的目标值
|
||||
|
||||
# 新增:用于存储所有代的所有解
|
||||
all_solutions = [] # 存储所有个体(染色体)
|
||||
all_fitnesses = [] # 存储所有个体的目标值
|
||||
|
||||
# 早停策略参数
|
||||
early_stop_counter = 0
|
||||
max_stagnation = 50 # 连续50代无更新则停止
|
||||
best_fitness_history = []
|
||||
|
||||
Best_fit = [] # 记录适应度在迭代过程中的变化,便于绘图
|
||||
for i in range(g.Max_Itertions):
|
||||
print(f"iter_{i} start!")
|
||||
print("iter_{} start!".format(i))
|
||||
Fit = g.fitness(C, J, Processing_time, M_num, O_num)
|
||||
|
||||
# 记录当前代的所有解
|
||||
all_solutions.extend(C)
|
||||
all_fitnesses.extend(Fit)
|
||||
|
||||
# 非支配排序
|
||||
rank = nsga2.fast_non_dominated_sort(Fit)
|
||||
current_non_dominated = [C[j] for j in range(len(C)) if rank[j] == 0]
|
||||
current_non_dominated_fit = [Fit[j] for j in range(len(C)) if rank[j] == 0]
|
||||
|
||||
# 更新全局非支配解
|
||||
new_non_dominated = False
|
||||
for sol, fit in zip(current_non_dominated, current_non_dominated_fit):
|
||||
is_dominated = False
|
||||
for existing_fit in Optimal_fit_values:
|
||||
if all(existing_fit[d] <= fit[d] for d in range(2)) and any(existing_fit[d] < fit[d] for d in range(2)):
|
||||
is_dominated = True
|
||||
break
|
||||
if not is_dominated:
|
||||
Optimal_solutions.append(sol)
|
||||
Optimal_fit_values.append(fit)
|
||||
new_non_dominated = True
|
||||
|
||||
# 对全局解重新筛选非支配解
|
||||
if Optimal_solutions:
|
||||
rank_all = nsga2.fast_non_dominated_sort(Optimal_fit_values)
|
||||
# 保留等级为0的解
|
||||
Optimal_solutions = [Optimal_solutions[j] for j in range(len(Optimal_solutions)) if rank_all[j] == 0]
|
||||
Optimal_fit_values = [Optimal_fit_values[j] for j in range(len(Optimal_fit_values)) if rank_all[j] == 0]
|
||||
# 控制解的数量,过多时保留拥挤度高的解
|
||||
if len(Optimal_solutions) > g.Pop_size:
|
||||
distance = nsga2.crowding_distance(Optimal_fit_values, rank_all)
|
||||
# 按拥挤度排序并保留前Pop_size个解
|
||||
sorted_indices = sorted(range(len(distance)), key=lambda k: distance[k], reverse=True)
|
||||
Optimal_solutions = [Optimal_solutions[j] for j in sorted_indices[:g.Pop_size]]
|
||||
Optimal_fit_values = [Optimal_fit_values[j] for j in sorted_indices[:g.Pop_size]]
|
||||
|
||||
# 早停策略检查
|
||||
if new_non_dominated:
|
||||
early_stop_counter = 0
|
||||
else:
|
||||
early_stop_counter += 1
|
||||
if early_stop_counter >= max_stagnation:
|
||||
print(f"早停于第{i}代,连续{max_stagnation}代无新的非支配解")
|
||||
break
|
||||
|
||||
# 定期引入随机解(每20代)
|
||||
if i % 20 == 0 and i > 0:
|
||||
random_solutions = e.Random_initial()[:int(g.Pop_size * 0.1)] # 引入10%的随机解
|
||||
C = np.vstack((C[:-len(random_solutions)], random_solutions))
|
||||
|
||||
# 选择操作(基于NSGA-II)
|
||||
selected = nsga2.selection(C, Fit)
|
||||
|
||||
# 交叉变异操作
|
||||
new_pop = []
|
||||
for j in range(len(selected)):
|
||||
Best = C[Fit.index(min(Fit))]
|
||||
best_fitness = min(Fit)
|
||||
if best_fitness < Optimal_fit:
|
||||
Optimal_fit = best_fitness
|
||||
Optimal_CHS = Best
|
||||
print('iter_{}: new best_fitness = {}'.format(i, best_fitness))
|
||||
Best_fit.append(Optimal_fit) # 始终记录当前最优适应度
|
||||
for j in range(len(C)):
|
||||
Cafter = []
|
||||
if random.random() < g.Pc:
|
||||
# 选择另一个个体进行交叉
|
||||
mate_idx = random.randint(0, len(selected) - 1)
|
||||
N_i = random.choice(np.arange(len(C)))
|
||||
if random.random() < g.Pv:
|
||||
offspring1, offspring2 = g.machine_cross(selected[j], selected[mate_idx], O_num)
|
||||
Cross = g.machine_cross(C[j], C[N_i], O_num)
|
||||
else:
|
||||
offspring1, offspring2 = g.operation_cross(selected[j], selected[mate_idx], O_num, J_num)
|
||||
new_pop.append(offspring1)
|
||||
new_pop.append(offspring2)
|
||||
else:
|
||||
new_pop.append(selected[j])
|
||||
|
||||
# 变异操作
|
||||
Cross = g.operation_cross(C[j], C[N_i], O_num, J_num)
|
||||
Cafter.append(Cross[0])
|
||||
Cafter.append(Cross[1])
|
||||
Cafter.append(C[j])
|
||||
if random.random() < g.Pm:
|
||||
if random.random() < g.Pw:
|
||||
mutated = g.machine_variation(selected[j], Processing_time, O_num, J)
|
||||
Variance = g.machine_variation(C[j], Processing_time, O_num, J)
|
||||
else:
|
||||
mutated = g.operation_variation(selected[j], O_num, J_num, J, Processing_time, M_num)
|
||||
new_pop.append(mutated)
|
||||
Variance = g.operation_variation(C[j], O_num, J_num, J, Processing_time, M_num)
|
||||
Cafter.append(Variance)
|
||||
if Cafter != []:
|
||||
Fit = g.fitness(Cafter, J, Processing_time, M_num, O_num)
|
||||
C[j] = Cafter[Fit.index(min(Fit))]
|
||||
|
||||
# 保持种群规模
|
||||
if len(new_pop) > g.Pop_size:
|
||||
# 对新种群进行筛选
|
||||
new_fit = g.fitness(new_pop, J, Processing_time, M_num, O_num)
|
||||
new_rank = nsga2.fast_non_dominated_sort(new_fit)
|
||||
new_distance = nsga2.crowding_distance(new_fit, new_rank)
|
||||
# 按等级和拥挤度排序
|
||||
sorted_indices = sorted(range(len(new_pop)), key=lambda k: (new_rank[k], -new_distance[k]))
|
||||
C = [new_pop[j] for j in sorted_indices[:g.Pop_size]]
|
||||
else:
|
||||
C = new_pop
|
||||
|
||||
# 输出结果
|
||||
# 迭代结束后输出最终结果
|
||||
print("\n=== 优化结果 ===")
|
||||
print(f"非支配解数量: {len(Optimal_solutions)}")
|
||||
print("非支配解目标值 (最大完工时间, 负载标准差):")
|
||||
for fit in Optimal_fit_values:
|
||||
print(f"({fit[0]}, {fit[1]:.2f})")
|
||||
print("最优适应度 (最少完成时间):", Optimal_fit)
|
||||
|
||||
# 选择一个折中解绘制甘特图(例如Cmax最小的解)
|
||||
if Optimal_solutions:
|
||||
# 找到Cmax最小的解
|
||||
cmax_values = [fit[0] for fit in Optimal_fit_values]
|
||||
best_idx = cmax_values.index(min(cmax_values))
|
||||
Optimal_CHS = Optimal_solutions[best_idx]
|
||||
# 解码并绘图
|
||||
d = Decode(J, Processing_time, M_num)
|
||||
final_fitness = d.decode(Optimal_CHS, O_num)
|
||||
print(f"\n选中解的目标值: (最大完工时间: {final_fitness[0]}, 负载标准差: {final_fitness[1]:.2f})")
|
||||
Gantt(d.Machines)
|
||||
# 解码最优解并绘制甘特图
|
||||
d = Decode(J, Processing_time, M_num)
|
||||
final_fitness = d.decode(Optimal_CHS, O_num)
|
||||
print("解码验证适应度:", final_fitness)
|
||||
|
||||
# 绘制帕累托前沿(非支配解)
|
||||
if Optimal_fit_values:
|
||||
plt.figure(figsize=(10, 6))
|
||||
cmax_nd = [fit[0] for fit in Optimal_fit_values]
|
||||
load_std_nd = [fit[1] for fit in Optimal_fit_values]
|
||||
plt.scatter(cmax_nd, load_std_nd, color='red', label='Non-dominated solutions', zorder=5)
|
||||
# 绘制最优甘特图
|
||||
Gantt(d.Machines)
|
||||
|
||||
# 绘制所有解的点图(所有代的所有个体)
|
||||
if all_fitnesses:
|
||||
cmax_all = [fit[0] for fit in all_fitnesses]
|
||||
load_std_all = [fit[1] for fit in all_fitnesses]
|
||||
plt.scatter(cmax_all, load_std_all, color='blue', alpha=0.5, label='All solutions', zorder=1)
|
||||
|
||||
plt.title('All Solutions and Pareto Front')
|
||||
plt.xlabel('Max Completion Time (Cmax)')
|
||||
plt.ylabel('Load Standard Deviation')
|
||||
plt.legend()
|
||||
plt.grid(True)
|
||||
plt.savefig('all_solutions_pareto.png')
|
||||
plt.show()
|
||||
# 绘制收敛曲线
|
||||
x = np.linspace(0, g.Max_Itertions, g.Max_Itertions)
|
||||
plt.figure()
|
||||
plt.plot(x, Best_fit, '-k')
|
||||
plt.title('The maximum completion time of each iteration')
|
||||
plt.ylabel('Cmax')
|
||||
plt.xlabel('Iteration')
|
||||
plt.grid(True)
|
||||
plt.savefig('最大完成时间的优化过程.png')
|
||||
plt.show()
|
||||
BIN
优化后排程方案的甘特图.png
BIN
优化后排程方案的甘特图.png
Binary file not shown.
|
Before Width: | Height: | Size: 19 KiB After Width: | Height: | Size: 19 KiB |
Loading…
Reference in New Issue