Compare commits

...

3 Commits
1.2 ... master

8 changed files with 464 additions and 244 deletions

View File

@ -1,5 +1,4 @@
import numpy as np
from Job import Job
from Machine import Machine_Time_window
@ -16,7 +15,8 @@ class Decode:
self.J = J
self.Machines = [] # 存储机器类
self.Scheduled = [] # 已经排产过的工序
self.fitness = 0 # 适应度
self.fitness = 0 # 适应度(最大完工时间)
self.Machine_Load = np.zeros(M_num, dtype=int) # 机器总负载
self.Machine_State = np.zeros(M_num, dtype=int) # 在机器上加工的工件是哪个
self.Jobs = [] # 存储工件类
for j in range(M_num):
@ -32,7 +32,7 @@ class Decode:
Site = 0
# 按照基因的MS部分按工件序号划分
for S_i in self.J.values():
Ms_decompose.append(MS[Site:Site + S_i]) #对MS按照每道工序数进行切分
Ms_decompose.append(MS[Site:Site + S_i]) # 对MS按照每道工序数进行切分
Site += S_i
for i in range(len(Ms_decompose)): # len(Ms_decompose)表示工件数
JM_i = []
@ -68,11 +68,11 @@ class Decode:
for le_i in range(len(M_Tlen)):
# 当前空格时间比加工时间大可插入
if M_Tlen[le_i] >= P_t:
# 当前空格开始时间比该工件上一工序结束时间大可插入该空格,以空格开始时间为这一工序开始
# 当前空格开始时间比该工件上一工序结束时间大可插入该空格
if M_Tstart[le_i] >= last_O_end:
ealiest_start = M_Tstart[le_i]
break
# 当前空格开始时间比该工件上一工序结束时间小但空格可满足插入该工序,以该工序的上一工序的结束为开始
# 当前空格开始时间比该工件上一工序结束时间小但空格可满足插入
if M_Tstart[le_i] < last_O_end and M_Tend[le_i] - last_O_end >= P_t:
ealiest_start = last_O_end
break
@ -80,24 +80,46 @@ class Decode:
End_work_time = M_Ealiest + P_t # 当前工件当前工序的结束时间
return M_Ealiest, Selected_Machine, P_t, O_num, last_O_end, End_work_time
# 解码操作
# 解码操作适配新编码前半部分为OS后半部分为MS
def decode(self, CHS, Len_Chromo):
"""
:param CHS: 种群基因
:param Len_Chromo: MS与OS的分解线
:return: 适应度即最大加工时间
:param Len_Chromo: 工序总数OS和MS各占一半长度
:return: 双目标值 [最大加工时间, 负载标准差]
"""
MS = list(CHS[0:Len_Chromo])
OS = list(CHS[Len_Chromo:2 * Len_Chromo])
# 重置状态
self.fitness = 0
self.Machine_Load = np.zeros(self.M_num, dtype=int)
for machine in self.Machines:
machine.assigned_task = []
machine.O_start = []
machine.O_end = []
machine.End_time = 0
for job in self.Jobs:
job.Processed = []
job.J_start = []
job.J_end = []
job.J_machine = []
job.Last_Processing_Machine = None
job.Last_Processing_end_time = 0
OS = list(CHS[0:Len_Chromo]) # 前半部分为工件排列
MS = list(CHS[Len_Chromo:2 * Len_Chromo]) # 后半部分为机器索引
Needed_Matrix = self.Order_Matrix(MS)
JM = Needed_Matrix[0]
JM, TM = Needed_Matrix[0], Needed_Matrix[1]
for i in OS:
Job = i
O_num = self.Jobs[Job].Current_Processed() # 现在加工的工序
Machine = JM[Job][O_num] # 用基因的OS部分的工件序号以及工序序号索引机器顺序矩阵的机器序号
Machine = JM[Job][O_num] # 确定加工机器
Process_time = TM[Job][O_num] # 确定加工时间
Para = self.Earliest_Start(Job, O_num, Machine)
self.Jobs[Job]._Input(Para[0], Para[5], Para[1]) # 工件完成该工序
if Para[5] > self.fitness:
self.fitness = Para[5]
self.Machines[Machine]._Input(Job, Para[0], Para[2], Para[3]) # 机器完成该工件该工序
return self.fitness
self.Machine_Load[Machine] += Process_time # 累加机器负载
# 计算负载均衡度(标准差)
load_std = np.std(self.Machine_Load)
return [self.fitness, load_std]

145
Encode.py
View File

@ -1,5 +1,4 @@
import random
import numpy as np
@ -8,7 +7,7 @@ class Encode:
"""
:param Matrix: 机器加工时间矩阵
:param Pop_size: 种群数量
:param J: 各工件对应的工序数
:param J: 各工件对应的工序数字典
:param J_num: 工件数
:param M_num: 机器数
"""
@ -17,24 +16,25 @@ class Encode:
self.J_num = J_num
self.M_num = M_num
self.CHS = []
self.GS_num = int(0.6 * Pop_size) # 全局选择初始化
# 调整初始化比例增加随机选择比例到50%
self.GS_num = int(0.3 * Pop_size) # 全局选择初始化
self.LS_num = int(0.2 * Pop_size) # 局部选择初始化
self.RS_num = int(0.2 * Pop_size) # 随机选择初始化
self.RS_num = int(0.5 * Pop_size) # 随机选择初始化(提高比例)
self.Len_Chromo = 0
for i in J.values():
self.Len_Chromo += i # 遍历字典J的所有值并求和即工序数之和
# 生成工序准备的部分
# 生成工序准备的部分(工件排列,天然满足工序顺序约束)
def OS_List(self):
OS_list = []
for k, v in self.J.items(): # 遍历字典J的所有键值对初始化工序矩阵
OS_add = [k - 1 for j in range(v)]
OS_add = [k - 1 for j in range(v)] # 工件索引从0开始
OS_list.extend(OS_add)
return OS_list
# 生成初始化矩阵
def CHS_Matrix(self, C_num):
return np.zeros([C_num, self.Len_Chromo], dtype=int)
return np.zeros([C_num, self.Len_Chromo * 2], dtype=int) # 工件排列+机器索引
# 定位每个工件的每道工序的位置Job第几个工件Operation第几道工序
def Site(self, Job, Operation):
@ -48,89 +48,74 @@ class Encode:
# 全局初始化
def Global_initial(self):
MS = self.CHS_Matrix(self.GS_num) # 根据GS_num生成种群
CHS = self.CHS_Matrix(self.GS_num)
OS_list = self.OS_List()
OS = self.CHS_Matrix(self.GS_num)
for i in range(self.GS_num):
Machine_time = np.zeros(self.M_num, dtype=int) # 步骤1 生成一个整型数组长度为机器数且初始化每个元素为0
random.shuffle(OS_list) # 生成工序排序部分
OS[i] = np.array(OS_list) # 随机打乱后将其赋值给OS的某一行因为有一个种群第i则是赋值在OS的第i行以此生成完整的OS
GJ_list = [i_1 for i_1 in range(self.J_num)] # 生成工件集
random.shuffle(GJ_list) # 随机打乱工件集,为的是下一步可以随机抽出第一个工件
for g in GJ_list: # 选择第一个工件(由于上一步已经打乱工件集,抽出第一个也是“随机”)
h = self.Matrix[g] # h为第一个工件包含的工序对应的时间矩阵
for j in range(len(h)): # 从此工件的第一个工序开始
D = h[j] # D为第一个工件的第一个工序对应的时间矩阵
List_Machine_weizhi = []
for k in range(len(D)): # 确定工序可用的机器位于第几个位置
Useing_Machine = D[k]
if Useing_Machine != 9999:
List_Machine_weizhi.append(k)
Machine_Select = []
for Machine_add in List_Machine_weizhi: # 将机器时间数组对应位置和工序可选机器的时间相加
Machine_Select.append(Machine_time[Machine_add] + D[Machine_add])
Min_time = min(Machine_Select) # 选出时间最小的机器
K = Machine_Select.index(Min_time) # 第一次出现最小时间的位置,确定最小负荷为哪个机器,即为该工序可选择的机器里的第K个机器并非Mk
I = List_Machine_weizhi[K] # 所有机器里的第I个机器即Mi
Machine_time[I] += Min_time # 相应的机器位置加上最小时间
site = self.Site(g, j) # 定位每个工件的每道工序的位置
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去 即生成MS的染色体
CHS1 = np.hstack((MS, OS)) # 将MS和OS整合为一个矩阵
return CHS1
random.shuffle(OS_list)
OS = OS_list.copy()
MS = [0] * self.Len_Chromo
Machine_time = np.zeros(self.M_num, dtype=int)
GJ_list = [i_1 for i_1 in range(self.J_num)]
random.shuffle(GJ_list)
for g in GJ_list:
h = self.Matrix[g]
for j in range(len(h)):
D = h[j]
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
Machine_Select = [Machine_time[m] + D[m] for m in List_Machine_weizhi]
Min_time = min(Machine_Select)
K = Machine_Select.index(Min_time)
MS[self.Site(g, j)] = K
Machine_time[List_Machine_weizhi[K]] += D[List_Machine_weizhi[K]]
CHS[i] = np.hstack((OS, MS)) # 工件排列 + 机器索引(紧凑编码)
return CHS
# 局部初始化
def Local_initial(self):
MS = self.CHS_Matrix(self.LS_num) # 根据LS_num生成局部选择的种群大小
CHS = self.CHS_Matrix(self.LS_num)
OS_list = self.OS_List()
OS = self.CHS_Matrix(self.LS_num)
for i in range(self.LS_num):
random.shuffle(OS_list) # (随机打乱)生成工序排序部分
OS[i] = np.array(OS_list) # 随机打乱后将其赋值给OS的某一行因为有一个种群第i则是赋值在OS的第i行以此生成完整的OS
GJ_List = [i_1 for i_1 in range(self.J_num)] # 生成工件集
for g in GJ_List: # 选择第一个工件(注意:不用随机打乱了)
Machine_time = np.zeros(self.M_num,
dtype=int) # 设置一个整型数组 并初始化每一个元素为0由于局部初始化每个工件的所有工序结束后都要重新初始化所以和全局初始化不同此步骤应放在此处
h = self.Matrix[g] # h为第一个工件包含的工序对应的时间矩阵
for j in range(len(h)): # 从选择的工件的第一个工序开始
D = h[j] # 此工件第一个工序对应的机器加工时间矩阵
List_Machine_weizhi = []
for k in range(len(D)): # 确定工序可用的机器位于第几个位置
Useing_Machine = D[k]
if Useing_Machine != 9999:
List_Machine_weizhi.append(k)
Machine_Select = []
for Machine_add in List_Machine_weizhi: # 将机器时间数组对应位置和工序可选机器的时间相加
Machine_Select.append(Machine_time[Machine_add] + D[Machine_add])
Min_time = min(Machine_Select) # 选出这些时间里最小的
K = Machine_Select.index(Min_time) # 第一次出现最小时间的位置,确定最小负荷为哪个机器,即为该工序可选择的机器里的第K个机器并非Mk
I = List_Machine_weizhi[K] # 所有机器里的第I个机器即Mi
Machine_time[I] += Min_time
site = self.Site(g, j) # 定位每个工件的每道工序的位置
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去
CHS1 = np.hstack((MS, OS)) # 将MS和OS整合为一个矩阵
return CHS1
random.shuffle(OS_list)
OS = OS_list.copy()
MS = [0] * self.Len_Chromo
# 随机初始化
GJ_List = [i_1 for i_1 in range(self.J_num)]
for g in GJ_List:
Machine_time = np.zeros(self.M_num, dtype=int)
h = self.Matrix[g]
for j in range(len(h)):
D = h[j]
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
Machine_Select = [Machine_time[m] + D[m] for m in List_Machine_weizhi]
Min_time = min(Machine_Select)
K = Machine_Select.index(Min_time)
MS[self.Site(g, j)] = K
Machine_time[List_Machine_weizhi[K]] += D[List_Machine_weizhi[K]]
CHS[i] = np.hstack((OS, MS))
return CHS
# 随机初始化提高比例到50%
def Random_initial(self):
MS = self.CHS_Matrix(self.RS_num) # 根据RS_num生成随机选择的种群大小
CHS = self.CHS_Matrix(self.RS_num)
OS_list = self.OS_List()
OS = self.CHS_Matrix(self.RS_num)
for i in range(self.RS_num):
random.shuffle(OS_list)
OS[i] = np.array(OS_list)
GJ_List = [i_1 for i_1 in range(self.J_num)] # 生成工件集
for g in GJ_List: # 选择第一个工件
OS = OS_list.copy()
MS = [0] * self.Len_Chromo
for g in range(self.J_num):
h = self.Matrix[g]
for j in range(len(h)): # 选择第一个工件的第一个工序
D = h[j] # 此工件第一个工序可加工的机器对应的时间矩阵
List_Machine_weizhi = []
for k in range(len(D)):
Useing_Machine = D[k]
if Useing_Machine != 9999:
List_Machine_weizhi.append(k)
number = random.choice(List_Machine_weizhi) # 从可选择的机器编号中随机选择一个(此编号就是机器编号)
K = List_Machine_weizhi.index(number) # 即为该工序可选择的机器里的第K个机器并非Mk
site = self.Site(g, j) # 定位每个工件的每道工序的位置
MS[i][site] = K # 即将每个工序选择的第K个机器赋值到每个工件的每道工序的位置上去
CHS1 = np.hstack((MS, OS))
return CHS1
for j in range(len(h)):
D = h[j]
List_Machine_weizhi = [k for k, val in enumerate(D) if val != 9999]
# 随机选择可用机器
selected = random.choice(List_Machine_weizhi)
K = List_Machine_weizhi.index(selected)
MS[self.Site(g, j)] = K
CHS[i] = np.hstack((OS, MS))
return CHS

215
GA.py
View File

@ -9,143 +9,156 @@ from Instance import *
class GA():
def __init__(self):
self.Pop_size = 300 # 种群数量
self.Pop_size = 500 # 种群数量
self.Pc = 0.8 # 交叉概率
self.Pm = 0.3 # 变异概率
self.Pm = 0.15 # 降低变异概率到0.15
self.Pv = 0.5 # 选择何种方式进行交叉的概率阈值
self.Pw = 0.95 # 选择何种方式进行变异的概率阈值
self.Max_Itertions = 20 # 最大迭代次数
self.Max_Itertions = 100
# 适应度
def fitness(self, CHS, J, Processing_time, M_num, Len):
Fit = []
for i in range(len(CHS)):
d = Decode(J, Processing_time, M_num) #实例化一个解码器,传入问题参数
d = Decode(J, Processing_time, M_num) # 实例化一个解码器,传入问题参数
Fit.append(d.decode(CHS[i], Len))
return Fit
def pmx_crossover(self, parent1, parent2, length):
if length <= 1:
return parent1.copy(), parent2.copy()
# 确保交叉区间[a, b]有效a < b
a = random.randint(0, length - 2)
b = random.randint(a + 1, length - 1)
child1 = parent1.copy()
child2 = parent2.copy()
# 建立映射表parent1[a:b+1]与parent2[a:b+1]的对应关系)
map1 = {parent1[i]: parent2[i] for i in range(a, b + 1)} # p1→p2的映射
map2 = {parent2[i]: parent1[i] for i in range(a, b + 1)} # p2→p1的映射
# 处理child1交叉区间外的元素替换
for i in range(length):
if i < a or i > b:
val = child1[i]
# 限制循环次数避免无限循环最多循环len(map1)次)
loop_count = 0
max_loop = len(map1)
while val in map1 and map1[val] in parent1[a:b + 1] and loop_count < max_loop:
val = map1[val]
loop_count += 1
child1[i] = map1.get(val, val) # 若超出循环次数直接使用当前val
# 处理child2交叉区间外的元素替换
for i in range(length):
if i < a or i > b:
val = child2[i]
loop_count = 0
max_loop = len(map2)
while val in map2 and map2[val] in parent2[a:b + 1] and loop_count < max_loop:
val = map2[val]
loop_count += 1
child2[i] = map2.get(val, val)
return child1, child2
# 机器部分交叉
def machine_cross(self, CHS1, CHS2, T0):
"""
:param CHS1: 机器选择部分的基因1
:param CHS2: 机器选择部分的基因2
:param CHS1: 基因1
:param CHS2: 基因2
:param T0: 工序总数
:return: 交叉后的机器选择部分的基因
:return: 交叉后的基因
"""
T_r = [j for j in range(T0)]
r = random.randint(1, 10) # 在区间[1,T0]内产生一个整数r
random.shuffle(T_r) #直接打乱T_r原地修改
R = T_r[0:r] # 按照随机数r产生r个互不相等的整数
OS_1 = CHS1[O_num:2 * T0]
OS_2 = CHS2[O_num:2 * T0]
MS_1 = CHS2[0:T0]
MS_2 = CHS1[0:T0]
OS1, MS1 = CHS1[:T0], CHS1[T0:]
OS2, MS2 = CHS2[:T0], CHS2[T0:]
# 随机选择交叉位置
T_r = list(range(T0))
random.shuffle(T_r)
r = random.randint(1, T0 // 2) # 交叉部分长度
R = T_r[:r]
# 交换机器选择部分
for i in R:
K, K_2 = MS_1[i], MS_2[i]
MS_1[i], MS_2[i] = K_2, K
CHS1 = np.hstack((MS_1, OS_1))
CHS2 = np.hstack((MS_2, OS_2))
return CHS1, CHS2
MS1[i], MS2[i] = MS2[i], MS1[i]
# 工序部分交叉
return np.hstack((OS1, MS1)), np.hstack((OS2, MS2))
# 工序部分交叉使用PMX交叉
def operation_cross(self, CHS1, CHS2, T0, J_num):
"""
:param CHS1: 工序选择部分的基因1
:param CHS2: 工序选择部分的基因2
:param T0: 工序总数
:param J_num: 工件总数
:return: 交叉后的工序选择部分的基因
"""
OS_1 = CHS1[T0:2 * T0]
OS_2 = CHS2[T0:2 * T0]
MS_1 = CHS1[0:T0]
MS_2 = CHS2[0:T0]
Job_list = [i for i in range(J_num)]
random.shuffle(Job_list)
r = random.randint(1, J_num - 1)
Set1 = Job_list[0:r]
new_os = list(np.zeros(T0, dtype=int))
for k, v in enumerate(OS_1):
if v in Set1:
new_os[k] = v + 1
for i in OS_2:
if i not in Set1:
Site = new_os.index(0)
new_os[Site] = i + 1
new_os = np.array([j - 1 for j in new_os])
CHS1 = np.hstack((MS_1, new_os))
CHS2 = np.hstack((MS_2, new_os))
OS1 = list(CHS1[T0:2 * T0]) # 确保转换为列表
OS2 = list(CHS2[T0:2 * T0])
MS1 = CHS1[0:T0].copy()
MS2 = CHS2[0:T0].copy()
# 调用修复后的PMX交叉
new_os1, new_os2 = self.pmx_crossover(OS1, OS2, T0)
CHS1 = np.hstack((MS1, new_os1))
CHS2 = np.hstack((MS2, new_os2))
return CHS1, CHS2
# 机器部分变异
# 机器部分变异(仅在可用机器集中选择)
def machine_variation(self, CHS, O, T0, J):
"""
:param CHS: 机器选择部分的基因
:param CHS: 基因
:param O: 加工时间矩阵
:param T0: 工序总数
:param J: 各工件加工信息
:return: 变异后的机器选择部分的基因
:return: 变异后的基因
"""
Tr = [i_num for i_num in range(T0)]
MS = CHS[0:T0]
OS = CHS[T0:2 * T0]
# 机器选择部分
r = random.randint(1, T0 - 1) # 在变异染色体中选择r个位置
random.shuffle(Tr)
T_r = Tr[0:r]
for num in T_r:
T_0 = [j for j in range(T0)]
K = []
Site = 0
OS = CHS[:T0]
MS = CHS[T0:]
# 确定变异位置数量
r = random.randint(1, max(1, T0 // 10)) # 最多变异10%的位置
positions = random.sample(range(T0), r)
for pos in positions:
# 找到该位置对应的工件和工序
site = 0
job = -1
op = -1
for k, v in J.items():
K.append(T_0[Site:Site + v])
Site += v
for i in range(len(K)):
if num in K[i]:
O_i = i
O_j = K[i].index(num)
if site + v > pos:
job = k - 1 # 转换为0索引
op = pos - site
break
Machine_using = O[O_i][O_j]
Machine_time = []
for j in Machine_using:
if j != 9999:
Machine_time.append(j)
Min_index = Machine_time.index(min(Machine_time))
MS[num] = Min_index
CHS = np.hstack((MS, OS))
return CHS
site += v
# 获取该工序的可用机器
D = O[job][op]
available_machines = [k for k, val in enumerate(D) if val != 9999]
if len(available_machines) <= 1:
continue # 只有一个可用机器时不变异
# 随机选择一个不同的可用机器
current_idx = MS[pos]
current_machine = available_machines[current_idx]
new_machine = random.choice([m for m in available_machines if m != current_machine])
MS[pos] = available_machines.index(new_machine)
return np.hstack((OS, MS))
# 工序部分变异
def operation_variation(self, CHS, T0, J_num, J, O, M_num):
"""
:param CHS: 工序选择部分的基因
:param CHS: 基因
:param T0: 工序总数
:param J_num: 工件总数
:param J: 各工件加工信息
:param O: 加工时间矩阵
:param M_num: 机器总数
:return: 变异后的工序选择部分的基因
:return: 变异后的基因
"""
MS = CHS[0:T0]
OS = list(CHS[T0:2 * T0])
r = random.randint(1, J_num - 1)
Tr = [i for i in range(J_num)]
random.shuffle(Tr)
Tr = Tr[0:r]
J_os = dict(enumerate(OS)) # 随机选择r个不同的基因
J_os = sorted(J_os.items(), key=lambda d: d[1])
Site = []
for i in range(r):
Site.append(OS.index(Tr[i]))
A = list(itertools.permutations(Tr, r))
A_CHS = []
for i in range(len(A)):
for j in range(len(A[i])):
OS[Site[j]] = A[i][j]
C_I = np.hstack((MS, OS))
A_CHS.append(C_I)
Fit = []
for i in range(len(A_CHS)):
d = Decode(J, O, M_num)
Fit.append(d.decode(CHS, T0))
return A_CHS[Fit.index(min(Fit))]
OS = list(CHS[:T0])
MS = CHS[T0:]
# 随机选择两个位置交换
i, j = random.sample(range(T0), 2)
OS[i], OS[j] = OS[j], OS[i]
return np.hstack((OS, MS))

111
NSGA2.py Normal file
View File

@ -0,0 +1,111 @@
import random
import numpy as np
class NSGA2:
def __init__(self, pop_size, obj_num):
self.pop_size = pop_size
self.obj_num = obj_num
def fast_non_dominated_sort(self, pop_obj):
"""快速非支配排序"""
pop_size = len(pop_obj)
dominated = [[] for _ in range(pop_size)] # 被支配个体列表
rank = [0] * pop_size # 个体的非支配等级
n = [0] * pop_size # 支配该个体的个体数量
# 计算每个个体的支配关系
for p in range(pop_size):
for q in range(pop_size):
if p != q:
# p支配q
if all(pop_obj[p][i] <= pop_obj[q][i] for i in range(self.obj_num)) and \
any(pop_obj[p][i] < pop_obj[q][i] for i in range(self.obj_num)):
dominated[p].append(q)
# q支配p
elif all(pop_obj[q][i] <= pop_obj[p][i] for i in range(self.obj_num)) and \
any(pop_obj[q][i] < pop_obj[p][i] for i in range(self.obj_num)):
n[p] += 1
# 找到等级为0的个体
if n[p] == 0:
rank[p] = 0
# 计算其他等级
current_rank = 0
while True:
next_rank = []
for p in range(pop_size):
if rank[p] == current_rank:
for q in dominated[p]:
n[q] -= 1
if n[q] == 0 and rank[q] == 0:
rank[q] = current_rank + 1
next_rank.append(q)
if not next_rank:
break
current_rank += 1
return rank
def crowding_distance(self, pop_obj, rank):
"""计算拥挤度距离"""
pop_size = len(pop_obj)
distance = [0.0] * pop_size
max_rank = max(rank) if rank else 0
# 对每个等级的个体计算拥挤度
for r in range(max_rank + 1):
# 获取当前等级的个体索引
current_indices = [i for i in range(pop_size) if rank[i] == r]
if len(current_indices) <= 1:
continue
# 对每个目标函数进行排序
for m in range(self.obj_num):
# 按目标m的值排序
sorted_indices = sorted(current_indices, key=lambda x: pop_obj[x][m])
min_val = pop_obj[sorted_indices[0]][m]
max_val = pop_obj[sorted_indices[-1]][m]
# 边界个体的拥挤度设为无穷大
distance[sorted_indices[0]] = float('inf')
distance[sorted_indices[-1]] = float('inf')
# 计算中间个体的拥挤度
for i in range(1, len(sorted_indices) - 1):
if max_val - min_val == 0:
continue
distance[sorted_indices[i]] += (pop_obj[sorted_indices[i + 1]][m] - pop_obj[sorted_indices[i - 1]][m]) / (max_val - min_val)
return distance
def selection(self, pop, pop_obj):
"""选择操作基于非支配排序和拥挤度的锦标赛选择惩罚拥挤度为0的个体"""
pop_size = len(pop)
rank = self.fast_non_dominated_sort(pop_obj)
distance = self.crowding_distance(pop_obj, rank)
selected = []
for _ in range(pop_size):
# 随机选择两个个体进行锦标赛
i = random.randint(0, pop_size - 1)
j = random.randint(0, pop_size - 1)
# 优先选择等级低(更优)的个体
if rank[i] < rank[j]:
selected.append(pop[i])
elif rank[i] > rank[j]:
selected.append(pop[j])
# 等级相同则选择拥挤度大的个体惩罚拥挤度为0的重复解
else:
# 对拥挤度为0的个体进行惩罚
if distance[i] == 0 and distance[j] > 0:
selected.append(pop[j])
elif distance[j] == 0 and distance[i] > 0:
selected.append(pop[i])
elif distance[i] >= distance[j]:
selected.append(pop[i])
else:
selected.append(pop[j])
return selected

177
main.py
View File

@ -1,12 +1,11 @@
import random
import matplotlib.pyplot as plt
import numpy as np
from Decode import Decode
from Encode import Encode
from GA import GA
from Instance import *
from NSGA2 import NSGA2
# 绘制甘特图
def Gantt(Machines):
@ -29,67 +28,157 @@ def Gantt(Machines):
plt.savefig('优化后排程方案的甘特图.png')
plt.show()
if __name__ == '__main__':
Optimal_fit = 9999 # 最佳适应度(初始化)
Optimal_CHS = 0 # 最佳适应度对应的基因个体(初始化)
# 初始化参数
g = GA()
e = Encode(Processing_time, g.Pop_size, J, J_num, M_num)
CHS1 = e.Global_initial()
CHS2 = e.Random_initial()
CHS3 = e.Local_initial()
C = np.vstack((CHS1, CHS2, CHS3))
Best_fit = [] # 记录适应度在迭代过程中的变化,便于绘图
# 双目标优化相关
nsga2 = NSGA2(g.Pop_size, 2) # 2个优化目标
Optimal_solutions = [] # 存储非支配解
Optimal_fit_values = [] # 存储非支配解的目标值
# 新增:用于存储所有代的所有解
all_solutions = [] # 存储所有个体(染色体)
all_fitnesses = [] # 存储所有个体的目标值
# 早停策略参数
early_stop_counter = 0
max_stagnation = 50 # 连续50代无更新则停止
best_fitness_history = []
for i in range(g.Max_Itertions):
print("iter_{} start!".format(i))
print(f"iter_{i} start!")
Fit = g.fitness(C, J, Processing_time, M_num, O_num)
Best = C[Fit.index(min(Fit))]
best_fitness = min(Fit)
if best_fitness < Optimal_fit:
Optimal_fit = best_fitness
Optimal_CHS = Best
print('iter_{}: new best_fitness = {}'.format(i, best_fitness))
Best_fit.append(Optimal_fit) # 始终记录当前最优适应度
for j in range(len(C)):
Cafter = []
if random.random() < g.Pc:
N_i = random.choice(np.arange(len(C)))
if random.random() < g.Pv:
Cross = g.machine_cross(C[j], C[N_i], O_num)
# 记录当前代的所有解
all_solutions.extend(C)
all_fitnesses.extend(Fit)
# 非支配排序
rank = nsga2.fast_non_dominated_sort(Fit)
current_non_dominated = [C[j] for j in range(len(C)) if rank[j] == 0]
current_non_dominated_fit = [Fit[j] for j in range(len(C)) if rank[j] == 0]
# 更新全局非支配解
new_non_dominated = False
for sol, fit in zip(current_non_dominated, current_non_dominated_fit):
is_dominated = False
for existing_fit in Optimal_fit_values:
if all(existing_fit[d] <= fit[d] for d in range(2)) and any(existing_fit[d] < fit[d] for d in range(2)):
is_dominated = True
break
if not is_dominated:
Optimal_solutions.append(sol)
Optimal_fit_values.append(fit)
new_non_dominated = True
# 对全局解重新筛选非支配解
if Optimal_solutions:
rank_all = nsga2.fast_non_dominated_sort(Optimal_fit_values)
# 保留等级为0的解
Optimal_solutions = [Optimal_solutions[j] for j in range(len(Optimal_solutions)) if rank_all[j] == 0]
Optimal_fit_values = [Optimal_fit_values[j] for j in range(len(Optimal_fit_values)) if rank_all[j] == 0]
# 控制解的数量,过多时保留拥挤度高的解
if len(Optimal_solutions) > g.Pop_size:
distance = nsga2.crowding_distance(Optimal_fit_values, rank_all)
# 按拥挤度排序并保留前Pop_size个解
sorted_indices = sorted(range(len(distance)), key=lambda k: distance[k], reverse=True)
Optimal_solutions = [Optimal_solutions[j] for j in sorted_indices[:g.Pop_size]]
Optimal_fit_values = [Optimal_fit_values[j] for j in sorted_indices[:g.Pop_size]]
# 早停策略检查
if new_non_dominated:
early_stop_counter = 0
else:
Cross = g.operation_cross(C[j], C[N_i], O_num, J_num)
Cafter.append(Cross[0])
Cafter.append(Cross[1])
Cafter.append(C[j])
early_stop_counter += 1
if early_stop_counter >= max_stagnation:
print(f"早停于第{i}代,连续{max_stagnation}代无新的非支配解")
break
# 定期引入随机解每20代
if i % 20 == 0 and i > 0:
random_solutions = e.Random_initial()[:int(g.Pop_size * 0.1)] # 引入10%的随机解
C = np.vstack((C[:-len(random_solutions)], random_solutions))
# 选择操作基于NSGA-II
selected = nsga2.selection(C, Fit)
# 交叉变异操作
new_pop = []
for j in range(len(selected)):
if random.random() < g.Pc:
# 选择另一个个体进行交叉
mate_idx = random.randint(0, len(selected) - 1)
if random.random() < g.Pv:
offspring1, offspring2 = g.machine_cross(selected[j], selected[mate_idx], O_num)
else:
offspring1, offspring2 = g.operation_cross(selected[j], selected[mate_idx], O_num, J_num)
new_pop.append(offspring1)
new_pop.append(offspring2)
else:
new_pop.append(selected[j])
# 变异操作
if random.random() < g.Pm:
if random.random() < g.Pw:
Variance = g.machine_variation(C[j], Processing_time, O_num, J)
mutated = g.machine_variation(selected[j], Processing_time, O_num, J)
else:
Variance = g.operation_variation(C[j], O_num, J_num, J, Processing_time, M_num)
Cafter.append(Variance)
if Cafter != []:
Fit = g.fitness(Cafter, J, Processing_time, M_num, O_num)
C[j] = Cafter[Fit.index(min(Fit))]
mutated = g.operation_variation(selected[j], O_num, J_num, J, Processing_time, M_num)
new_pop.append(mutated)
# 迭代结束后输出最终结果
# 保持种群规模
if len(new_pop) > g.Pop_size:
# 对新种群进行筛选
new_fit = g.fitness(new_pop, J, Processing_time, M_num, O_num)
new_rank = nsga2.fast_non_dominated_sort(new_fit)
new_distance = nsga2.crowding_distance(new_fit, new_rank)
# 按等级和拥挤度排序
sorted_indices = sorted(range(len(new_pop)), key=lambda k: (new_rank[k], -new_distance[k]))
C = [new_pop[j] for j in sorted_indices[:g.Pop_size]]
else:
C = new_pop
# 输出结果
print("\n=== 优化结果 ===")
print("最优适应度 (最少完成时间):", Optimal_fit)
print(f"非支配解数量: {len(Optimal_solutions)}")
print("非支配解目标值 (最大完工时间, 负载标准差):")
for fit in Optimal_fit_values:
print(f"({fit[0]}, {fit[1]:.2f})")
# 解码最优解并绘制甘特图
# 选择一个折中解绘制甘特图例如Cmax最小的解
if Optimal_solutions:
# 找到Cmax最小的解
cmax_values = [fit[0] for fit in Optimal_fit_values]
best_idx = cmax_values.index(min(cmax_values))
Optimal_CHS = Optimal_solutions[best_idx]
# 解码并绘图
d = Decode(J, Processing_time, M_num)
final_fitness = d.decode(Optimal_CHS, O_num)
print("解码验证适应度:", final_fitness)
# 绘制最优甘特图
print(f"\n选中解的目标值: (最大完工时间: {final_fitness[0]}, 负载标准差: {final_fitness[1]:.2f})")
Gantt(d.Machines)
# 绘制收敛曲线
x = np.linspace(0, g.Max_Itertions, g.Max_Itertions)
plt.figure()
plt.plot(x, Best_fit, '-k')
plt.title('The maximum completion time of each iteration')
plt.ylabel('Cmax')
plt.xlabel('Iteration')
# 绘制帕累托前沿(非支配解)
if Optimal_fit_values:
plt.figure(figsize=(10, 6))
cmax_nd = [fit[0] for fit in Optimal_fit_values]
load_std_nd = [fit[1] for fit in Optimal_fit_values]
plt.scatter(cmax_nd, load_std_nd, color='red', label='Non-dominated solutions', zorder=5)
# 绘制所有解的点图(所有代的所有个体)
if all_fitnesses:
cmax_all = [fit[0] for fit in all_fitnesses]
load_std_all = [fit[1] for fit in all_fitnesses]
plt.scatter(cmax_all, load_std_all, color='blue', alpha=0.5, label='All solutions', zorder=1)
plt.title('All Solutions and Pareto Front')
plt.xlabel('Max Completion Time (Cmax)')
plt.ylabel('Load Standard Deviation')
plt.legend()
plt.grid(True)
plt.savefig('最大完成时间的优化过程.png')
plt.savefig('all_solutions_pareto.png')
plt.show()

Binary file not shown.

Before

Width:  |  Height:  |  Size: 19 KiB

After

Width:  |  Height:  |  Size: 19 KiB