-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathART2.py
512 lines (415 loc) · 15.8 KB
/
ART2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
"""
# ART2
Adaptive Resonance Theory Neural Networks
by Aman Ahuja | github.com/amanahuja | twitter: @amanqa
## Notes
1. ART that accommodates patterns with continuous valued components. Since
continuous input patterns may be arbitrarily close together, ART2
introduces normalization and noise suppression.
2. ART2 treats small components as noise and does not distinguish between
patterns that are merely scaled versions of each other.
## F1 layer
The F1 layer consists of six types of units:
- W, X, U, V, P, and Q
- There are n units of each type: ($6n$ units)
- There are 3 supplemental units, one each between
W and X units, P and Q units, and V and U units. Designated
WN, VN, and PN, respectively.
- Thus, there are 6n + 3 units in the F1 layer.
Unit roles:
- $U$ are analogous to input phase of ART1's input layer F1 [(F0? -AA)]
- Units X and Q inhibit any vector components that fall below
  the user-selected parameter theta, for noise suppression.
## F2 layer
The F2 competition layer forms the network output. F2 nodes are called cluster
units.
- F2 nodes are designated Y, same as in ART1
- Cluster Unit compete to learn each input pattern. Candidacy and selection
are similar to ART1.
- As in ART1, learning occurs IFF top-down weight vector for the candidate
unit J is sufficiently similar to input vector, as determined by
the vigilance parameter rho.
- The reset test for ART2 is different from ART1
- Activation of the winning/candidate F2 unit is designated D
## Resonance Learning
Learning consists of the adjustment of weights between F1 and F2 units.
- Unlike the ART1 case, which deals with only binary values, the differential
equations for learning do not simplify. Instead the weights for ART1
reach equilibrium through iteration.
- In "fast learning" mode, a single input pattern is used to adjust weights
until they achieve stability.
Where:
- n is len(input pattern)
"""
import numpy as np
from numpy.linalg import norm
# Logging conf
import logging
import sys
class ART2(object):
    """ART2: Adaptive Resonance Theory network for continuous-valued input.

    See the module docstring for a description of the F1/F2 layers and
    the resonance-learning procedure (after Fausett, "Fundamentals of
    Neural Networks", ch. 5).
    """

    def __init__(self, n=5, m=3, rho=0.9, theta=None, n_epochs=10):
        """
        Create ART2 network with specified shape

        For Input array I of size n, we need n input nodes in F1.

        Parameters:
        -----------
        n : int
            feature dimension of input; number of nodes in F1
        m : int
            Number of neurons in F2 competition layer
            max number of categories
            compare to n_class
        rho : float
            Vigilance parameter
            larger rho: less inclusive prototypes
            smaller rho: more generalization
        theta : float, optional
            Noise-suppression parameter: vector components below theta
            are zeroed by _thresh(). Defaults to 1 / sqrt(n).
        n_epochs : int
            Number of passes over the data performed by compute().
            (BUG FIX: compute() read self.n_epochs, which was never set.)

        internal parameters
        ----------
        Bij: array of shape (n x m)
            Feed-Forward (bottom-up) weights
        Tji: array of shape (m x n)
            Feed-back (top-down) weights
        """
        self.input_size = n
        self.output_size = m
        self.n_epochs = n_epochs

        """init layers
        F0 --> F1 --> F2
        S --> X --> Y
        """
        # F2
        self.yj = np.zeros(self.output_size)
        self.active_cluster_units = []
        # F1
        self.xi = np.zeros(self.input_size)
        # F0
        self.si = np.zeros(self.input_size)

        """init parameters"""
        self.params = {}
        # a,b fixed weights in F1 layer; should not be zero
        self.params['a'] = 10
        self.params['b'] = 10
        # c fixed weight used in testing for reset
        self.params['c'] = 0.1
        # d activation of winning F2 unit
        self.params['d'] = 0.9
        # c*d / (1-d) must be less than or equal to one
        # as ratio --> 1 for greater vigilance
        self.params['e'] = 0.00001
        # small param to prevent division by zero

        # rho : vigilance parameter
        self.rho = rho

        # theta: noise suppression parameter
        # e.g. theta = 1 / sqrt(n)
        if theta is None:
            self.theta = 1 / np.sqrt(self.input_size)
        else:
            self.theta = theta

        # alpha: learning rate. Small value : slower learning,
        # but also more likely to reach equilibrium in slow
        # learning mode
        self.alpha = 0.6

        """init weights"""
        # Bij initially uniform (5.0) for each cluster unit
        # (original comment said 7.0 but the code has always used 5.0)
        self.Bij = np.ones((n, m)) * 5.0
        # Tji initially 0
        self.Tji = np.zeros((m, n))

        """init other activations"""
        self.ui = np.zeros(self.input_size)
        self.vi = None

        """Other helpers"""
        self.log = None

    def compute(self, all_data):
        """Process and learn from all data.

        fast learning: repeat epochs until placement of patterns on
        cluster units does not change from one epoch to the next.
        """
        for iepoch in range(self.n_epochs):
            self._training_epoch(all_data)
            # TODO: stop early once cluster assignments are stable
        return True

    def learning_trial(self, idata):
        """Present one input pattern and learn from it (steps 3-11).

        idata is a single row of input.
        A learning trial consists of one presentation of one input pattern.
        V and P will reach equilibrium after two updates of F1.
        """
        self.log.info("Starting Learning Trial.")
        self.log.debug("input pattern: {}".format(idata))
        self.log.debug("theta: {}".format(self.theta))

        # at beginning of learning trial, set all activations to zero
        self._zero_activations()
        self.si = idata

        # Update F1 activations, no candidate cluster unit
        self._update_F1_activation()
        # Update F1 activations again
        self._update_F1_activation()

        """
        After F1 activations achieve equilibrium
        TMP: Assume only two F1 updates needed for now
        Then proceed feed-forward to F2
        """
        # TODO: instead check if ui or pi will change significantly

        # now P units send signals to F2 layer
        self.yj = np.dot(self.Bij.T, self.pi)

        J = self._select_candidate_cluster_unit()

        """step 8 (resonance)
        reset cannot occur during resonance
        new winning unit (J) cannot be chosen during resonance
        """
        if len(self.active_cluster_units) == 0:
            # first pattern ever: equilibrium weights have a closed form
            self._update_weights_first_pattern(J)
        else:
            self._resonance_learning(J)

        # add J to active list
        if J not in self.active_cluster_units:
            self.active_cluster_units.append(J)

        return True

    def _training_epoch(self, all_data):
        """One pass over all_data, presenting each pattern once."""
        for idata in all_data:
            self.si = idata  # input vector F0
            # BUG FIX: original called self.learning_trial() with no
            # argument, which raised TypeError on every epoch.
            self.learning_trial(idata)
        return True

    def _select_candidate_cluster_unit(self):
        """ RESET LOOP
        This loop selects an appropriate candidate cluster unit for learning
        - Each iteration selects a candidate unit.
        - Iterations continue until reset condition is met (reset is False)
        - if a candidate unit does not satisfy, it is inhibited and can not be
          selected again in this presentation of the input pattern.
        No learning occurs in this phase.
        returns:
            J, the index of the selected cluster unit
        """
        c = self.params['c']
        d = self.params['d']
        e = self.params['e']

        self.reset = True
        while self.reset:
            self.log.info("candidate selection loop iter start")

            # Guard: if every unit has been inhibited, no candidate can
            # ever satisfy vigilance; fail loudly instead of looping
            # forever (the original noted "break inf loop manually").
            if np.all(self.yj < 0):
                raise RuntimeError(
                    "All cluster units inhibited; increase m or lower rho")

            # Select best active candidate
            # ... largest element of Y that is not inhibited
            J = np.argmax(self.yj)  # J current candidate, not same as index jj
            self.log.debug("\tyj: {}".format(self.yj))
            self.log.debug("\tpicking J = {}".format(J))

            # confirm candidate: inhibit or proceed
            if (self.vi == 0).all():
                self.ui = np.zeros(self.input_size)
            else:
                self.ui = self.vi / (e + norm(self.vi))

            # P receives the top-down signal of the candidate unit
            # (the original left this step unfinished: "# pi =")
            self.pi = self.ui + d * self.Tji[J, :]

            # calculate ri (reset node).
            # BUG FIX: original computed norm(u + c*u) / (norm(u) + c*norm(u)),
            # which is identically 1, so reset could never fire. Fausett's
            # ART2 reset test is r = (u + c*p) / (e + ||u|| + c*||p||).
            term1 = norm(self.ui + c * self.pi)
            term2 = e + norm(self.ui) + c * norm(self.pi)
            self.ri = term1 / term2

            if self.ri >= (self.rho - e):
                self.log.info("\tReset is False: Candidate is good.")
                # Reset condition satisfied: cluster unit may learn
                self.reset = False
                # finish updating F1 activations
                self._update_F1_activation()
                # TODO: this will update ui twice. Confirm ok
            else:
                self.reset = True
                self.log.info("\treset is True")
                # inhibit J for the remainder of this presentation
                self.yj[J] = -1.0
        return J

    def _resonance_learning(self, J, n_iter=20):
        """
        Learn on confirmed candidate
        In slow learning, only one update of weights in this trial
        n_learning_iterations = 1
        we then present the next input pattern
        In fast learning, present input again (same learning trial)
        - until weights reach equilibrium for this trial
        - presentation is: "weight-update-F1-update"
        """
        self.log.info("Entering Resonance phase with J = {}".format(J))
        for ilearn in range(n_iter):
            self.log.info("learning iter start")
            self._update_weights(J)
            # in slow learning, this step not required?
            D = np.ones(self.output_size)
            self._update_F1_activation(J, D)
            # TODO: stop when change in weights falls below some tolerance
        return True

    def _update_weights_first_pattern(self, J):
        """Equilibrium weights for the first pattern presented
        converge to these values. This shortcut can save many
        iterations.
        """
        self.log.info("Weight update using first-pattern shortcut")
        # Fast learning first pattern simplification:
        # both weight vectors converge to u / (1 - d)
        d = self.params['d']
        self.Tji[J, :] = self.ui / (1 - d)
        self.Bij[:, J] = self.ui / (1 - d)
        # log
        self.log.debug("Tji[J]: {}".format(self.Tji[J, :]))
        self.log.debug("Bij[J]: {}".format(self.Bij[:, J]))
        return

    def _update_weights(self, J):
        """One iterative update of Tji and Bij for winning unit J."""
        self.log.info("Updating Weights")
        # get useful terms
        alpha = self.alpha
        d = self.params['d']
        term1 = alpha*d*self.ui
        term2 = (1 + alpha*d*(d - 1))
        self.Tji[J, :] = term1 + term2*self.Tji[J, :]
        self.Bij[:, J] = term1 + term2*self.Bij[:, J]
        # log
        self.log.debug("Tji[J]: {}".format(self.Tji[J, :]))
        self.log.debug("Bij[J]: {}".format(self.Bij[:, J]))
        return

    def _update_F1_activation(self, J=None, D=None):
        """
        if winning unit has been selected
            J is winning cluster unit
            D is F2 activation
        else if no winning unit selected
            J is None
            D is zero vector
        """
        # NOTE(review): J-xor-D check was disabled in the original;
        # D is currently unused beyond the call signature.
        msg = "Updating F1 activations"
        if J is not None:
            msg = msg + " with J = {}".format(J)
        self.log.info(msg)

        a = self.params['a']
        b = self.params['b']
        d = self.params['d']
        e = self.params['e']

        # compute activation of Unit Ui
        # - activation of Vi normalized to unit length
        if self.vi is None:
            self.ui = np.zeros(self.input_size)
        else:
            self.ui = self.vi / (e + norm(self.vi))

        # signal sent from each unit Ui to associated Wi and Pi
        # compute activation of Wi
        self.wi = self.si + a * self.ui
        # compute activation of pi
        if J is not None:
            self.pi = self.ui + d * self.Tji[J, :]
        else:
            self.pi = self.ui
        # TODO: consider RESET here

        # compute activation of Xi
        self.xi = self.wi / (e + norm(self.wi))
        # compute activation of Qi
        self.qi = self.pi / (e + norm(self.pi))
        # send signal to Vi (noise suppression applied here)
        self.vi = self._thresh(self.xi) + b * self._thresh(self.qi)

        self._log_values()
        return True

    """Helper methods"""

    def _zero_activations(self):
        """Set activations to zero
        common operation, e.g. beginning of a learning trial
        """
        self.log.debug("zero'ing activations")
        self.si = np.zeros(self.input_size)
        self.ui = np.zeros(self.input_size)
        self.vi = np.zeros(self.input_size)
        return

    def _thresh(self, vec):
        """
        This function treats any signal that is less than theta
        as noise and suppresses it (sets it to zero). The value
        of the parameter theta is specified by the user.
        """
        assert isinstance(vec, np.ndarray), "type check"
        cpy = vec.copy()
        cpy[cpy < self.theta] = 0
        return cpy

    def _clean_input_pattern(self, idata):
        """Validate one input row: correct length and ndarray type."""
        assert len(idata) == self.input_size, "size check"
        assert isinstance(idata, np.ndarray), "type check"
        return idata

    """Logging Functions"""

    def stop_logging(self):
        """Close and detach all log handlers, then drop the logger."""
        self.log.info('Stop Logging.')
        handlers = self.log.handlers[:]
        for handler in handlers:
            handler.close()
            self.log.removeHandler(handler)
        self.log = None

    def start_logging(self, to_file=True, to_console=True):
        """Initialize logging handlers.
        to_file and to_console are booleans
        # TODO: accept logging level
        """
        # remove any existing logger
        if self.log is not None:
            self.stop_logging()
            self.log = None

        # Create logger and configure
        self.log = logging.getLogger('ann.art.art2')
        self.log.setLevel(logging.DEBUG)
        self.log.propagate = False
        formatter = logging.Formatter(
            fmt='%(levelname)8s:%(message)s'
        )

        # add file logging
        if to_file:
            fh = logging.FileHandler(
                filename='ART_LOG.log',
                mode='w',
            )
            fh.setFormatter(formatter)
            fh.setLevel(logging.WARN)
            self.log.addHandler(fh)

        # create console handler with a lower log level for debugging
        if to_console:
            ch = logging.StreamHandler(sys.stdout)
            ch.setFormatter(formatter)
            ch.setLevel(logging.DEBUG)
            self.log.addHandler(ch)

        self.log.info('Start Logging')

    def getlogger(self):
        """Return the configured logger (None if logging not started)."""
        return self.log

    def _log_values(self, J=None):
        """Debug-log all F1 activations, and weights for J if given."""
        self.log.debug("\t--- debug values --- ")
        self.log.debug("\tui : {}".format(self.ui))
        self.log.debug("\twi : {}".format(self.wi))
        self.log.debug("\tpi : {}".format(self.pi))
        self.log.debug("\txi : {}".format(self.xi))
        self.log.debug("\tqi : {}".format(self.qi))
        self.log.debug("\tvi : {}".format(self.vi))
        if J is not None:
            self.log.debug("\tWeights with J = {}".format(J))
            # BUG FIX: original referenced self.bij / self.tji (wrong
            # case), which would raise AttributeError.
            self.log.debug("\tBij: {}".format(self.Bij[:, J]))
            self.log.debug("\tTji: {}".format(self.Tji[J, :]))