-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMOGOMEA.py
372 lines (327 loc) · 17.7 KB
/
MOGOMEA.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
from solution import Solution
from cluster import Cluster
import utilities as util
import random
import copy
import math
class MOGOMEA:
    """MO-GOMEA-style multi-objective evolutionary algorithm.

    Each generation the population is clustered in objective space, a linkage
    model is learned per cluster, and every solution is improved by optimal
    mixing with donors from its cluster. Non-dominated solutions are collected
    in an elitist archive (one archive per generation).

    Objectives are treated as maximised: larger fitness values are preferred
    wherever this class compares fitness values directly.

    Collaborators defined outside this file (interfaces inferred from usage
    here; confirm against their definitions):
      * ``problem``: ``problemSize``, ``numberOfObjectives``,
        ``isValidSolution(solution)``, ``evaluateFitness(solution)``.
      * ``Solution``: ``genotype``, ``fitness``, ``dominates(other)``.
      * ``Cluster``: ``mean``, ``population``, ``changed``, ``linkageModel``,
        ``append``, ``computeMean``, ``clear``, ``learnLinkageModel``.
    """

    def __init__(self, populationSize, amountOfClusters, problem, maxEvaluations):
        """Stores the run configuration and initialises empty algorithm state.

        populationSize: number of solutions in the population.
        amountOfClusters: number of clusters (k) the population is split into.
        problem: problem definition used to create and evaluate solutions.
        maxEvaluations: evaluation budget; no new generation starts once reached.
        """
        self.populationSize = populationSize
        self.amountOfClusters = amountOfClusters
        self.problem = problem
        self.evaluations = 0  # Number of fitness evaluations performed so far.
        self.maxEvaluations = maxEvaluations
        self.currentGeneration = 0  # Index of the current generation.
        self.noImprovementStretch = 0  # Consecutive generations without archive change.
        self.population = []
        # One archive per generation, indexed by generation number.
        # TODO: POSSIBLE CHANGE = REDUCE MEMORY BY ONLY STORING TWO ARCHIVES (t - 1 AND t)
        self.elitistArchive = [[]]
        self.clusters = []
        self.extremeClusters = {}  # Maps an extreme cluster to the objectives it is extreme in.

    def evolve(self):
        """Runs the algorithm until the evaluation budget is spent or the archive stagnates.

        Stagnation means ``1 + 2 * floor(log10(populationSize))`` consecutive
        generations in which the summed fitness of the elitist archive did not
        change. Progress is printed once per generation.
        """
        # Initial population: random valid solutions, each offered to the archive.
        for _ in range(self.populationSize):
            solution = self.createRandomSolution()
            self.population.append(solution)
            self.evaluateFitness(solution)
            self.updateElitistArchive(self.elitistArchive[self.currentGeneration], solution)

        maxNoImprovementStretch = 1 + 2 * math.floor(math.log10(self.populationSize))
        while self.evaluations < self.maxEvaluations and self.noImprovementStretch < maxNoImprovementStretch:
            print(str(self.evaluations / self.maxEvaluations * 100) + "%")
            self.currentGeneration += 1

            # The new generation's archive starts as a copy of the previous one.
            previousArchive = self.elitistArchive[self.currentGeneration - 1]
            currentArchive = copy.deepcopy(previousArchive)
            self.elitistArchive.append(currentArchive)

            self.clusterPopulation()
            self.determineExtremeClusters()
            for cluster in self.clusters:
                selection = self.tournamentSelection(cluster)
                cluster.learnLinkageModel(selection)

            # Extreme clusters optimise a single objective, all others use
            # multi-objective mixing.
            offspring = []
            for solution in self.population:
                cluster = self.determineCluster(solution)
                if cluster in self.extremeClusters:
                    objective = random.choice(self.extremeClusters[cluster])
                    offspring.append(self.singleObjectiveOptimalMixing(objective, solution, cluster, currentArchive))
                else:
                    offspring.append(self.multiObjectiveOptimalMixing(solution, cluster, currentArchive))
            self.population = offspring

            if self.evaluateFitnessElitistArchive(currentArchive) == self.evaluateFitnessElitistArchive(previousArchive):
                self.noImprovementStretch += 1
            else:
                self.noImprovementStretch = 0

    def createRandomSolution(self):
        """Creates a random solution, resampling until the problem accepts it as valid."""
        # TODO: HOW DO YOU CREATE A RANDOM SOLUTION?
        randomSolution = Solution(util.randomGenotype(self.problem.problemSize))
        while not self.problem.isValidSolution(randomSolution):
            randomSolution = Solution(util.randomGenotype(self.problem.problemSize))
        return randomSolution

    def evaluateFitness(self, solution):
        """Evaluates the fitness of the given solution and counts the evaluation."""
        self.evaluations += 1
        self.problem.evaluateFitness(solution)

    @staticmethod
    def _nearestCluster(solution, clusters):
        """Returns the cluster whose mean is closest to the solution's fitness (first one on ties)."""
        return min(clusters, key=lambda cluster: util.euclidianDistance(solution.fitness, cluster.mean))

    def clusterPopulation(self):
        """Clusters the population into k clusters using balanced k-leader-means clustering.

        The result is stored in ``self.clusters``. Clusters may overlap because
        each one is filled with the ``2 / k * populationSize`` solutions
        closest to its mean.
        """
        # TODO: POSSIBLE CHANGE = CALCULATE OPTIMAL k VALUE
        # The first leader is the solution with maximum value in an arbitrary objective.
        leaders = [max(self.population, key=lambda solution: solution.fitness[0])]

        # The solution with the largest nearest-leader distance is chosen as the next leader,
        # repeated k - 1 times to obtain k leaders.
        for _ in range(self.amountOfClusters - 1):
            nearestLeaderDistance = {
                solution: min(util.euclidianDistance(solution.fitness, leader.fitness) for leader in leaders)
                for solution in self.population
                if solution not in leaders
            }
            if not nearestLeaderDistance:
                # Every solution already is a leader, so fewer than k clusters can be formed.
                break
            leaders.append(max(nearestLeaderDistance, key=nearestLeaderDistance.get))

        # k-means clustering is performed with k leaders as the initial cluster means.
        clusters = [Cluster(leader.fitness, self.problem) for leader in leaders]

        # Perform k-means clustering until all clusters are unchanged.
        # NOTE(review): assumes Cluster.computeMean() maintains `changed` and
        # Cluster.clear() empties the cluster's population -- confirm in cluster.py.
        while any(cluster.changed for cluster in clusters):
            for solution in self.population:
                self._nearestCluster(solution, clusters).append(solution)
            for cluster in clusters:
                cluster.computeMean()
                cluster.clear()

        # Expand the clusters with the closest c solutions.
        c = int(2 / self.amountOfClusters * self.populationSize)
        for cluster in clusters:
            distance = {
                solution: util.euclidianDistance(solution.fitness, cluster.mean)
                for solution in self.population
            }
            # A stable sort keeps insertion order on ties, matching repeated min().
            for solution in sorted(distance, key=distance.get)[:c]:
                cluster.append(solution)
        self.clusters = clusters

    def tournamentSelection(self, cluster):
        """Performs binary tournament selection in the given cluster.

        Returns a list with as many entries as the cluster has solutions. Each
        entry is the first of two random picks if it dominates the second,
        otherwise the second.
        """
        selection = []
        for _ in range(len(cluster.population)):
            one = random.choice(cluster.population)
            other = random.choice(cluster.population)
            selection.append(one if one.dominates(other) else other)
        return selection

    def determineCluster(self, solution):
        """Determines the cluster (from ``self.clusters``) the given solution is mixed with.

        Returns a random cluster among those containing the solution, or the
        cluster with the nearest mean if no cluster contains it.
        """
        assignedClusters = [cluster for cluster in self.clusters if solution in cluster.population]
        if assignedClusters:
            # One or more clusters are assigned to the solution, return a random one of those.
            return random.choice(assignedClusters)
        # No clusters are assigned to the solution, return the nearest one.
        return self._nearestCluster(solution, self.clusters)

    def determineExtremeClusters(self):
        """Determines which clusters are extreme clusters.

        A cluster is extreme in an objective when no other cluster has a larger
        mean in that objective. The result is stored in ``self.extremeClusters``
        as a mapping from cluster to the list of such objectives.
        """
        extremeClusters = {}
        for one in self.clusters:
            for objective in range(self.problem.numberOfObjectives):
                surpassed = any(
                    other.mean[objective] > one.mean[objective]
                    for other in self.clusters
                    if other != one
                )
                if not surpassed:
                    extremeClusters.setdefault(one, []).append(objective)
        self.extremeClusters = extremeClusters

    @staticmethod
    def _mixGenes(offspring, backup, donor, linkageGroup):
        """Copies the donor's genes of the linkage group into the offspring.

        Returns True if the offspring now differs from the backup in that group.
        """
        differs = False
        for index in linkageGroup:
            offspring.genotype[index] = donor.genotype[index]
            if offspring.genotype[index] != backup.genotype[index]:
                differs = True
        return differs

    @staticmethod
    def _copyGenes(source, target, linkageGroup):
        """Copies the linkage group's genes and the fitness from source to target.

        Used both to accept a mixing step (offspring -> backup) and to revert
        one (backup -> offspring).
        """
        for index in linkageGroup:
            target.genotype[index] = source.genotype[index]
        target.fitness = copy.deepcopy(source.fitness)

    def multiObjectiveOptimalMixing(self, parent, cluster, elitistArchive):
        """Generates an offspring solution using multi-objective optimal mixing.

        The parent is not modified. The offspring is offered to the given
        elitist archive before it is returned.
        """
        # Clone the parent solution and create a backup solution.
        offspring = copy.deepcopy(parent)
        backup = copy.deepcopy(parent)
        changed = False

        # Traverse the cluster's linkage groups and choose a random donor and apply its genotype to the offspring if
        # the mixing results in an improved, equally good or side-stepped solution.
        for linkageGroup in cluster.linkageModel:
            donor = random.choice(cluster.population)
            if not self._mixGenes(offspring, backup, donor, linkageGroup):
                continue  # Nothing changed, so no evaluation is spent.
            self.evaluateFitness(offspring)
            if (offspring.dominates(backup)
                    or offspring.fitness == backup.fitness
                    or not self.dominatedByElitistArchive(elitistArchive, offspring)):
                self._copyGenes(offspring, backup, linkageGroup)
                changed = True
            else:
                self._copyGenes(backup, offspring, linkageGroup)

        # If the previous mixing step did not change the offspring, repeat the same step, but now pick a random elitist
        # from the elitist archive as a donor. Apply the donor's genotype to the offspring if the mixing results in a
        # direct domination or a pareto front improvement. The step is also forced when the archive has stagnated.
        if not changed or self.noImprovementStretch > 1 + math.floor(math.log10(self.populationSize)):
            changed = False
            for linkageGroup in cluster.linkageModel:
                donor = random.choice(elitistArchive)
                if self._mixGenes(offspring, backup, donor, linkageGroup):
                    self.evaluateFitness(offspring)
                    if offspring.dominates(backup) or (
                            not self.dominatedByElitistArchive(elitistArchive, offspring)
                            and not self.fitnessContainedInElitistArchive(elitistArchive, offspring)):
                        self._copyGenes(offspring, backup, linkageGroup)
                        changed = True
                    else:
                        self._copyGenes(backup, offspring, linkageGroup)
                if changed:
                    break  # Stop at the first accepted change.

        # If both previous mixing steps still did not change the offspring, pick a random elitist from the elitist
        # archive as a donor and apply the full donor's genotype to the offspring.
        if not changed:
            donor = random.choice(elitistArchive)
            offspring.genotype = copy.deepcopy(donor.genotype)
            offspring.fitness = copy.deepcopy(donor.fitness)

        self.updateElitistArchive(elitistArchive, offspring)
        return offspring

    def singleObjectiveOptimalMixing(self, objective, parent, cluster, elitistArchive):
        """Generates an offspring solution using single-objective optimal mixing with the given objective.

        A mixing step is accepted when it does not lower the offspring's value
        in ``objective``. The parent is not modified. The offspring is offered
        to the given elitist archive before it is returned.
        """
        # Clone the parent solution and create a backup solution.
        best = copy.deepcopy(parent)  # Best solution seen so far for the objective.
        offspring = copy.deepcopy(parent)
        backup = copy.deepcopy(parent)
        changed = False

        for linkageGroup in cluster.linkageModel:
            donor = random.choice(cluster.population)
            if self._mixGenes(offspring, backup, donor, linkageGroup):
                self.evaluateFitness(offspring)
                if offspring.fitness[objective] >= backup.fitness[objective]:
                    self._copyGenes(offspring, backup, linkageGroup)
                    changed = True
                else:
                    self._copyGenes(backup, offspring, linkageGroup)
            # NOTE(review): every sampled donor is considered here, also when
            # its genes changed nothing -- confirm this is the intended nesting.
            if donor.fitness[objective] > best.fitness[objective]:
                best = donor

        # If nothing changed (or the archive has stagnated), repeat the step
        # with the best solution seen as the only donor.
        if not changed or self.noImprovementStretch > 1 + math.floor(math.log10(self.populationSize)):
            changed = False
            for linkageGroup in cluster.linkageModel:
                if self._mixGenes(offspring, backup, best, linkageGroup):
                    self.evaluateFitness(offspring)
                    if offspring.fitness[objective] >= backup.fitness[objective]:
                        self._copyGenes(offspring, backup, linkageGroup)
                        changed = True
                    else:
                        self._copyGenes(backup, offspring, linkageGroup)
                if changed:
                    break  # Stop at the first accepted change.

        # Still unchanged: replace the offspring by the best solution seen.
        if not changed:
            offspring.genotype = copy.deepcopy(best.genotype)
            offspring.fitness = copy.deepcopy(best.fitness)

        self.updateElitistArchive(elitistArchive, offspring)
        return offspring

    def updateElitistArchive(self, elitistArchive, solution):
        """Updates the given elitist archive (in place) using the given solution.

        * A solution whose genotype is already archived is discarded.
        * A solution with the same fitness as an elitist replaces that elitist
          only if it increases diversity (see below); it is never added next to it.
        * Otherwise the solution is added unless an elitist dominates it, and
          all elitists it dominates are removed.
        """
        # Discard the solution if it is already in the elitist archive.
        if any(elitist.genotype == solution.genotype for elitist in elitistArchive):
            return

        # Replace elitist by solution if the solution is further away from the nearest archive neighbor of the
        # elitist, based on the Hamming distance.
        # TODO: POSSIBLE CHANGE = CHOOSE A DIFFERENT METRIC TO ENSURE DIVERSITY IN THE ARCHIVE
        for i, elitist in enumerate(elitistArchive):
            if elitist.fitness == solution.fitness:
                others = [other for j, other in enumerate(elitistArchive) if j != i]
                # With no other elitist there is no neighbor to measure
                # diversity against, so the existing elitist is kept.
                if others:
                    nearestElitist = min(
                        others,
                        key=lambda other: util.hammingDistance(elitist.genotype, other.genotype),
                    )
                    nearestNeighborDistance = util.hammingDistance(elitist.genotype, nearestElitist.genotype)
                    solutionDistance = util.hammingDistance(solution.genotype, nearestElitist.genotype)
                    if solutionDistance > nearestNeighborDistance:
                        elitistArchive[i] = solution
                return

        # Determine if the solution is dominated by elitists, and determine which elitists are dominated by
        # the solution.
        dominatedElitists = []
        for elitist in elitistArchive:
            if elitist.dominates(solution):
                return
            if solution.dominates(elitist):
                dominatedElitists.append(elitist)

        # Add the solution and remove elitists that are dominated by it.
        elitistArchive.append(solution)
        for dominatedElitist in dominatedElitists:
            elitistArchive.remove(dominatedElitist)

    def dominatedByElitistArchive(self, elitistArchive, solution):
        """Determines whether the given solution is dominated by any solution in the given elitist archive."""
        return any(elitist.dominates(solution) for elitist in elitistArchive)

    def fitnessContainedInElitistArchive(self, elitistArchive, solution):
        """Determines whether a solution in the elitist archive has the same fitness as the given solution."""
        return any(elitist.fitness == solution.fitness for elitist in elitistArchive)

    def evaluateFitnessElitistArchive(self, elitistArchive):
        """Evaluates the fitness of the given elitist archive.

        Returns the per-objective sum of the fitness of all archived solutions,
        for any number of objectives. An empty archive yields ``[0, 0]``.
        """
        if not elitistArchive:
            return [0, 0]
        return [sum(values) for values in zip(*(solution.fitness for solution in elitistArchive))]