-
Notifications
You must be signed in to change notification settings - Fork 127
/
batch.go
100 lines (85 loc) · 3.58 KB
/
batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package sphinx
import "errors"
// ErrAlreadyCommitted is the sentinel error returned by Batch.Put when the
// batch's IsCommitted flag is already set, i.e. the batch has been persisted
// and can no longer accept new entries.
var ErrAlreadyCommitted = errors.New("cannot add to batch after committing")
// Batch is an object used to incrementally construct a set of entries to add to
// the replay log. Entries are accumulated in memory via Put and can be walked
// with ForEach. After construction is completed, the batch can be added to the
// log using the PutBatch method.
type Batch struct {
// IsCommitted denotes whether or not this batch has been successfully
// written to disk. Once it is true, Put rejects any further additions
// with ErrAlreadyCommitted.
IsCommitted bool
// ID is a unique, caller chosen identifier for this batch.
ID []byte
// ReplaySet contains the sequence numbers of all entries that were
// detected as replays. Put adds a sequence number here as soon as it
// sees a hash prefix that is already present in this batch. The set is
// finalized upon writing the batch to disk, and merges replays detected
// by the replay cache and the on-disk replay log.
ReplaySet *ReplaySet
// entries stores, keyed by sequence number, the set of all potential
// entries that might get written to the replay log. Some entries may be
// skipped after examining the on-disk content at the time of commit.
entries map[uint16]batchEntry
// replayCache is an in-memory lookup table, which stores the hash
// prefix of entries already added to this batch. This allows a quick
// mechanism for intra-batch duplicate detection.
replayCache map[HashPrefix]struct{}
}
// NewBatch returns an empty, uncommitted Batch identified by id. The returned
// batch has a fresh ReplaySet and empty, ready-to-use entry and replay-cache
// maps, so Put can be called on it immediately. The id slice is stored as
// given, without being copied. Identifying batches by a byte slice is what
// allows the caller to safely process the same batch twice and get an
// idempotent result.
func NewBatch(id []byte) *Batch {
	batch := new(Batch)

	batch.ID = id
	batch.ReplaySet = NewReplaySet()

	// Both maps must be allocated up front: Put writes to them directly.
	batch.entries = make(map[uint16]batchEntry)
	batch.replayCache = make(map[HashPrefix]struct{})

	return batch
}
// Put records the hash-prefix/CLTV pair under seqNum in the current batch.
//
// The only error this method returns is ErrAlreadyCommitted, which is reported
// when the batch has already been written to disk. A hash prefix that was
// already added to this batch is not stored a second time; instead seqNum is
// added to the batch's ReplaySet and nil is returned. Whether or not a given
// sequence number is ultimately considered a replay is reported via the
// ReplaySet after the batch has been committed.
func (b *Batch) Put(seqNum uint16, hashPrefix *HashPrefix, cltv uint32) error {
	// A committed batch no longer accepts additions.
	if b.IsCommitted {
		return ErrAlreadyCommitted
	}

	// Work with a private copy of the prefix from here on, so later changes
	// made by the caller through the pointer cannot affect what is stored.
	prefix := *hashPrefix

	// A prefix that is already known to this batch is an intra-batch
	// duplicate: opportunistically flag the sequence number as a replay
	// rather than queueing a second entry.
	_, seen := b.replayCache[prefix]
	if seen {
		b.ReplaySet.Add(seqNum)
		return nil
	}

	// First sighting of this prefix within the batch. Queue it as a
	// candidate for the replay log; it is checked once more against the
	// on-disk entries when the batch is committed.
	b.entries[seqNum] = batchEntry{
		hashPrefix: prefix,
		cltv:       cltv,
	}

	// Remember the prefix so that subsequent Puts in this batch can detect
	// duplicates of it.
	b.replayCache[prefix] = struct{}{}

	return nil
}
// ForEach calls fn once for every entry currently held in the batch, passing
// the entry's sequence number, a pointer to its hash prefix and its CLTV.
//
// Entries are visited in Go's map iteration order, which is unspecified, so
// callers must not rely on any particular ordering. Iteration stops at the
// first non-nil error returned by fn, and that error is returned unchanged;
// nil is returned once every entry has been visited.
//
// The hashPrefix pointer handed to fn refers to a copy that is private to that
// invocation: it stays valid and unchanged if fn retains it, and writing
// through it does not modify the batch.
func (b *Batch) ForEach(fn func(seqNum uint16, hashPrefix *HashPrefix, cltv uint32) error) error {
	for seqNum, entry := range b.entries {
		// Take a per-iteration copy before handing out its address. On
		// toolchains older than Go 1.22 the range variable is shared by
		// all iterations, so passing &entry.hashPrefix directly would
		// make every retained pointer alias the last visited entry.
		hashPrefix := entry.hashPrefix

		if err := fn(seqNum, &hashPrefix, entry.cltv); err != nil {
			return err
		}
	}

	return nil
}
// batchEntry is a tuple of a secret's hash prefix and the corresponding CLTV at
// which the onion blob from which the secret was derived expires. It is the
// value type of Batch.entries, where it is keyed by sequence number.
type batchEntry struct {
// hashPrefix is the hash prefix supplied to Batch.Put, stored by value.
hashPrefix HashPrefix
// cltv is the CLTV value supplied to Batch.Put alongside the prefix.
cltv uint32
}