|
| 1 | +peer模块包含了downloader使用的peer节点,封装了吞吐量,是否空闲,并记录了之前失败的信息。 |
| 2 | + |
| 3 | + |
| 4 | +## peer |
| 5 | + |
| 6 | + // peerConnection represents an active peer from which hashes and blocks are retrieved. |
| 7 | + type peerConnection struct { |
| 8 | + id string // Unique identifier of the peer |
| 9 | + |
| 10 | + headerIdle int32 // Current header activity state of the peer (idle = 0, active = 1) 当前的header获取的工作状态。 |
| 11 | + blockIdle int32 // Current block activity state of the peer (idle = 0, active = 1) 当前的区块获取的工作状态 |
| 12 | + receiptIdle int32 // Current receipt activity state of the peer (idle = 0, active = 1) 当前的收据获取的工作状态 |
| 13 | + stateIdle int32 // Current node data activity state of the peer (idle = 0, active = 1) 当前节点状态的工作状态 |
| 14 | + |
| 15 | + headerThroughput float64 // Number of headers measured to be retrievable per second //记录每秒能够接收多少个区块头的度量值 |
| 16 | + blockThroughput float64 // Number of blocks (bodies) measured to be retrievable per second //记录每秒能够接收多少个区块的度量值 |
| 17 | + receiptThroughput float64 // Number of receipts measured to be retrievable per second 记录每秒能够接收多少个收据的度量值 |
| 18 | + stateThroughput float64 // Number of node data pieces measured to be retrievable per second 记录每秒能够接收多少个账户状态的度量值 |
| 19 | + |
| 20 | + rtt time.Duration // Request round trip time to track responsiveness (QoS) 请求回应时间 |
| 21 | + |
| 22 | + headerStarted time.Time // Time instance when the last header fetch was started 记录最后一个header fetch的请求时间 |
| 23 | + blockStarted time.Time // Time instance when the last block (body) fetch was started |
| 24 | + receiptStarted time.Time // Time instance when the last receipt fetch was started |
| 25 | + stateStarted time.Time // Time instance when the last node data fetch was started |
| 26 | + |
| 27 | + lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) 记录的Hash值不会去请求,一般是因为之前的请求失败 |
| 28 | + |
| 29 | + peer Peer // eth的peer |
| 30 | + |
| 31 | + version int // Eth protocol version number to switch strategies |
| 32 | + log log.Logger // Contextual logger to add extra infos to peer logs |
| 33 | + lock sync.RWMutex |
| 34 | + } |
| 35 | + |
| 36 | + |
| 37 | + |
| 38 | +FetchXXX |
| 39 | +FetchHeaders FetchBodies等函数 主要调用了eth.peer的功能来进行发送数据请求。 |
| 40 | + |
| 41 | + // FetchHeaders sends a header retrieval request to the remote peer. |
| 42 | + func (p *peerConnection) FetchHeaders(from uint64, count int) error { |
| 43 | + // Sanity check the protocol version |
| 44 | + if p.version < 62 { |
| 45 | + panic(fmt.Sprintf("header fetch [eth/62+] requested on eth/%d", p.version)) |
| 46 | + } |
| 47 | + // Short circuit if the peer is already fetching |
| 48 | + if !atomic.CompareAndSwapInt32(&p.headerIdle, 0, 1) { |
| 49 | + return errAlreadyFetching |
| 50 | + } |
| 51 | + p.headerStarted = time.Now() |
| 52 | + |
| 53 | + // Issue the header retrieval request (absolut upwards without gaps) |
| 54 | + go p.peer.RequestHeadersByNumber(from, count, 0, false) |
| 55 | + |
| 56 | + return nil |
| 57 | + } |
| 58 | + |
| 59 | +SetXXXIdle函数 |
| 60 | +SetHeadersIdle, SetBlocksIdle 等函数 设置peer的状态为空闲状态,允许它执行新的请求。 同时还会通过本次传输的数据的多少来重新评估链路的吞吐量。 |
| 61 | + |
| 62 | + // SetHeadersIdle sets the peer to idle, allowing it to execute new header retrieval |
| 63 | + // requests. Its estimated header retrieval throughput is updated with that measured |
| 64 | + // just now. |
| 65 | + func (p *peerConnection) SetHeadersIdle(delivered int) { |
| 66 | + p.setIdle(p.headerStarted, delivered, &p.headerThroughput, &p.headerIdle) |
| 67 | + } |
| 68 | + |
| 69 | +setIdle |
| 70 | + |
| 71 | + // setIdle sets the peer to idle, allowing it to execute new retrieval requests. |
| 72 | + // Its estimated retrieval throughput is updated with that measured just now. |
| 73 | + func (p *peerConnection) setIdle(started time.Time, delivered int, throughput *float64, idle *int32) { |
| 74 | + // Irrelevant of the scaling, make sure the peer ends up idle |
| 75 | + defer atomic.StoreInt32(idle, 0) |
| 76 | + |
| 77 | + p.lock.Lock() |
| 78 | + defer p.lock.Unlock() |
| 79 | + |
| 80 | + // If nothing was delivered (hard timeout / unavailable data), reduce throughput to minimum |
| 81 | + if delivered == 0 { |
| 82 | + *throughput = 0 |
| 83 | + return |
| 84 | + } |
| 85 | + // Otherwise update the throughput with a new measurement |
| 86 | + elapsed := time.Since(started) + 1 // +1 (ns) to ensure non-zero divisor |
| 87 | + measured := float64(delivered) / (float64(elapsed) / float64(time.Second)) |
| 88 | + |
| 89 | + // measurementImpact = 0.1 , 新的吞吐量=老的吞吐量*0.9 + 这次的吞吐量*0.1 |
| 90 | + *throughput = (1-measurementImpact)*(*throughput) + measurementImpact*measured |
| 91 | + // 更新RTT |
| 92 | + p.rtt = time.Duration((1-measurementImpact)*float64(p.rtt) + measurementImpact*float64(elapsed)) |
| 93 | + |
| 94 | + p.log.Trace("Peer throughput measurements updated", |
| 95 | + "hps", p.headerThroughput, "bps", p.blockThroughput, |
| 96 | + "rps", p.receiptThroughput, "sps", p.stateThroughput, |
| 97 | + "miss", len(p.lacking), "rtt", p.rtt) |
| 98 | + } |
| 99 | + |
| 100 | + |
| 101 | +XXXCapacity函数,用来返回当前的链接允许的吞吐量。 |
| 102 | + |
| 103 | + // HeaderCapacity retrieves the peers header download allowance based on its |
| 104 | + // previously discovered throughput. |
| 105 | + func (p *peerConnection) HeaderCapacity(targetRTT time.Duration) int { |
| 106 | + p.lock.RLock() |
| 107 | + defer p.lock.RUnlock() |
| 108 | + // 这里有点奇怪,targetRTT越大,请求的数量就越多。 |
| 109 | + return int(math.Min(1+math.Max(1, p.headerThroughput*float64(targetRTT)/float64(time.Second)), float64(MaxHeaderFetch))) |
| 110 | + } |
| 111 | + |
| 112 | + |
| 113 | +Lacks 用来标记上次是否失败,以便下次同样的请求不通过这个peer |
| 114 | + |
| 115 | + // MarkLacking appends a new entity to the set of items (blocks, receipts, states) |
| 116 | + // that a peer is known not to have (i.e. have been requested before). If the |
| 117 | + // set reaches its maximum allowed capacity, items are randomly dropped off. |
| 118 | + func (p *peerConnection) MarkLacking(hash common.Hash) { |
| 119 | + p.lock.Lock() |
| 120 | + defer p.lock.Unlock() |
| 121 | + |
| 122 | + for len(p.lacking) >= maxLackingHashes { |
| 123 | + for drop := range p.lacking { |
| 124 | + delete(p.lacking, drop) |
| 125 | + break |
| 126 | + } |
| 127 | + } |
| 128 | + p.lacking[hash] = struct{}{} |
| 129 | + } |
| 130 | + |
| 131 | + // Lacks retrieves whether the hash of a blockchain item is on the peers lacking |
| 132 | + // list (i.e. whether we know that the peer does not have it). |
| 133 | + func (p *peerConnection) Lacks(hash common.Hash) bool { |
| 134 | + p.lock.RLock() |
| 135 | + defer p.lock.RUnlock() |
| 136 | + |
| 137 | + _, ok := p.lacking[hash] |
| 138 | + return ok |
| 139 | + } |
| 140 | + |
| 141 | + |
| 142 | +## peerSet |
| 143 | + |
| 144 | + // peerSet represents the collection of active peer participating in the chain |
| 145 | + // download procedure. |
| 146 | + type peerSet struct { |
| 147 | + peers map[string]*peerConnection |
| 148 | + newPeerFeed event.Feed |
| 149 | + peerDropFeed event.Feed |
| 150 | + lock sync.RWMutex |
| 151 | + } |
| 152 | + |
| 153 | + |
| 154 | +Register 和 UnRegister |
| 155 | + |
| 156 | + // Register injects a new peer into the working set, or returns an error if the |
| 157 | + // peer is already known. |
| 158 | + // |
| 159 | + // The method also sets the starting throughput values of the new peer to the |
| 160 | + // average of all existing peers, to give it a realistic chance of being used |
| 161 | + // for data retrievals. |
| 162 | + func (ps *peerSet) Register(p *peerConnection) error { |
| 163 | + // Retrieve the current median RTT as a sane default |
| 164 | + p.rtt = ps.medianRTT() |
| 165 | + |
| 166 | + // Register the new peer with some meaningful defaults |
| 167 | + ps.lock.Lock() |
| 168 | + if _, ok := ps.peers[p.id]; ok { |
| 169 | + ps.lock.Unlock() |
| 170 | + return errAlreadyRegistered |
| 171 | + } |
| 172 | + if len(ps.peers) > 0 { |
| 173 | + p.headerThroughput, p.blockThroughput, p.receiptThroughput, p.stateThroughput = 0, 0, 0, 0 |
| 174 | + |
| 175 | + for _, peer := range ps.peers { |
| 176 | + peer.lock.RLock() |
| 177 | + p.headerThroughput += peer.headerThroughput |
| 178 | + p.blockThroughput += peer.blockThroughput |
| 179 | + p.receiptThroughput += peer.receiptThroughput |
| 180 | + p.stateThroughput += peer.stateThroughput |
| 181 | + peer.lock.RUnlock() |
| 182 | + } |
| 183 | + p.headerThroughput /= float64(len(ps.peers)) |
| 184 | + p.blockThroughput /= float64(len(ps.peers)) |
| 185 | + p.receiptThroughput /= float64(len(ps.peers)) |
| 186 | + p.stateThroughput /= float64(len(ps.peers)) |
| 187 | + } |
| 188 | + ps.peers[p.id] = p |
| 189 | + ps.lock.Unlock() |
| 190 | + |
| 191 | + ps.newPeerFeed.Send(p) |
| 192 | + return nil |
| 193 | + } |
| 194 | + |
| 195 | + // Unregister removes a remote peer from the active set, disabling any further |
| 196 | + // actions to/from that particular entity. |
| 197 | + func (ps *peerSet) Unregister(id string) error { |
| 198 | + ps.lock.Lock() |
| 199 | + p, ok := ps.peers[id] |
| 200 | + if !ok { |
| 201 | + defer ps.lock.Unlock() |
| 202 | + return errNotRegistered |
| 203 | + } |
| 204 | + delete(ps.peers, id) |
| 205 | + ps.lock.Unlock() |
| 206 | + |
| 207 | + ps.peerDropFeed.Send(p) |
| 208 | + return nil |
| 209 | + } |
| 210 | + |
| 211 | +XXXIdlePeers |
| 212 | + |
| 213 | + // HeaderIdlePeers retrieves a flat list of all the currently header-idle peers |
| 214 | + // within the active peer set, ordered by their reputation. |
| 215 | + func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { |
| 216 | + idle := func(p *peerConnection) bool { |
| 217 | + return atomic.LoadInt32(&p.headerIdle) == 0 |
| 218 | + } |
| 219 | + throughput := func(p *peerConnection) float64 { |
| 220 | + p.lock.RLock() |
| 221 | + defer p.lock.RUnlock() |
| 222 | + return p.headerThroughput |
| 223 | + } |
| 224 | + return ps.idlePeers(62, 64, idle, throughput) |
| 225 | + } |
| 226 | + |
| 227 | + // idlePeers retrieves a flat list of all currently idle peers satisfying the |
| 228 | + // protocol version constraints, using the provided function to check idleness. |
| 229 | + // The resulting set of peers are sorted by their measure throughput. |
| 230 | + func (ps *peerSet) idlePeers(minProtocol, maxProtocol int, idleCheck func(*peerConnection) bool, throughput func(*peerConnection) float64) ([]*peerConnection, int) { |
| 231 | + ps.lock.RLock() |
| 232 | + defer ps.lock.RUnlock() |
| 233 | + |
| 234 | + idle, total := make([]*peerConnection, 0, len(ps.peers)), 0 |
| 235 | + for _, p := range ps.peers { //首先抽取idle的peer |
| 236 | + if p.version >= minProtocol && p.version <= maxProtocol { |
| 237 | + if idleCheck(p) { |
| 238 | + idle = append(idle, p) |
| 239 | + } |
| 240 | + total++ |
| 241 | + } |
| 242 | + } |
| 243 | + for i := 0; i < len(idle); i++ { // 冒泡排序, 从吞吐量大到吞吐量小。 |
| 244 | + for j := i + 1; j < len(idle); j++ { |
| 245 | + if throughput(idle[i]) < throughput(idle[j]) { |
| 246 | + idle[i], idle[j] = idle[j], idle[i] |
| 247 | + } |
| 248 | + } |
| 249 | + } |
| 250 | + return idle, total |
| 251 | + } |
| 252 | + |
| 253 | +medianRTT,求得peerset的RTT的中位数, |
| 254 | + |
| 255 | + // medianRTT returns the median RTT of te peerset, considering only the tuning |
| 256 | + // peers if there are more peers available. |
| 257 | + func (ps *peerSet) medianRTT() time.Duration { |
| 258 | + // Gather all the currnetly measured round trip times |
| 259 | + ps.lock.RLock() |
| 260 | + defer ps.lock.RUnlock() |
| 261 | + |
| 262 | + rtts := make([]float64, 0, len(ps.peers)) |
| 263 | + for _, p := range ps.peers { |
| 264 | + p.lock.RLock() |
| 265 | + rtts = append(rtts, float64(p.rtt)) |
| 266 | + p.lock.RUnlock() |
| 267 | + } |
| 268 | + sort.Float64s(rtts) |
| 269 | + |
| 270 | + median := rttMaxEstimate |
| 271 | + if qosTuningPeers <= len(rtts) { |
| 272 | + median = time.Duration(rtts[qosTuningPeers/2]) // Median of our tuning peers |
| 273 | + } else if len(rtts) > 0 { |
| 274 | + median = time.Duration(rtts[len(rtts)/2]) // Median of our connected peers (maintain even like this some baseline qos) |
| 275 | + } |
| 276 | + // Restrict the RTT into some QoS defaults, irrelevant of true RTT |
| 277 | + if median < rttMinEstimate { |
| 278 | + median = rttMinEstimate |
| 279 | + } |
| 280 | + if median > rttMaxEstimate { |
| 281 | + median = rttMaxEstimate |
| 282 | + } |
| 283 | + return median |
| 284 | + } |
0 commit comments