@@ -52,6 +52,10 @@ type ArchiverService struct {
52
52
api * API
53
53
}
54
54
55
+ // Start starts the archiver service. It begins polling the beacon node for the latest blocks and persisting blobs for
56
+ // them. Concurrently it'll also begin a backfill process (see backfillBlobs) to store all blobs from the current head
57
+ // to the previously stored blocks. This ensures that during restarts or outages of an archiver, any gaps will be
58
+ // filled in.
55
59
func (a * ArchiverService ) Start (ctx context.Context ) error {
56
60
if a .cfg .MetricsConfig .Enabled {
57
61
a .log .Info ("starting metrics server" , "addr" , a .cfg .MetricsConfig .ListenAddr , "port" , a .cfg .MetricsConfig .ListenPort )
@@ -82,6 +86,11 @@ func (a *ArchiverService) Start(ctx context.Context) error {
82
86
return a .trackLatestBlocks (ctx )
83
87
}
84
88
89
+ // persistBlobsForBlockToS3 fetches the blobs for a given block and persists them to S3. It returns the block header
90
+ // and a boolean indicating whether the blobs already existed in S3, along with any error that occurred.
91
+ // If the blobs are already stored, it will not overwrite the data. Currently, the archiver does not
92
+ // perform any validation of the blobs, it assumes a trusted beacon node. See:
93
+ // https://github.com/base-org/blob-archiver/issues/4.
85
94
func (a * ArchiverService ) persistBlobsForBlockToS3 (ctx context.Context , blockIdentifier string ) (* v1.BeaconBlockHeader , bool , error ) {
86
95
currentHeader , err := a .beaconClient .BeaconBlockHeader (ctx , & api.BeaconBlockHeaderOpts {
87
96
Block : blockIdentifier ,
@@ -121,6 +130,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde
121
130
BlobSidecars : storage.BlobSidecars {Data : blobSidecars .Data },
122
131
}
123
132
133
+ // The blob that is being written has not been validated. It is assumed that the beacon node is trusted.
124
134
err = a .dataStoreClient .Write (ctx , blobData )
125
135
126
136
if err != nil {
@@ -133,6 +143,7 @@ func (a *ArchiverService) persistBlobsForBlockToS3(ctx context.Context, blockIde
133
143
return currentHeader .Data , false , nil
134
144
}
135
145
146
+ // Stop stops the archiver service.
136
147
func (a * ArchiverService ) Stop (ctx context.Context ) error {
137
148
if a .stopped .Load () {
138
149
return ErrAlreadyStopped
@@ -155,6 +166,9 @@ func (a *ArchiverService) Stopped() bool {
155
166
return a .stopped .Load ()
156
167
}
157
168
169
+ // backfillBlobs will persist all blobs from the provided beacon block header, to either the last block that was persisted
170
+ // to the archiver's storage or the origin block in the configuration. This is used to ensure that any gaps can be filled.
171
+ // If an error is encountered persisting a block, it will retry after waiting for a period of time.
158
172
func (a * ArchiverService ) backfillBlobs (ctx context.Context , latest * v1.BeaconBlockHeader ) {
159
173
current , alreadyExists , err := latest , false , error (nil )
160
174
@@ -182,6 +196,7 @@ func (a *ArchiverService) backfillBlobs(ctx context.Context, latest *v1.BeaconBl
182
196
a .log .Info ("backfill complete" , "endHash" , current .Root .String (), "startHash" , latest .Root .String ())
183
197
}
184
198
199
+ // trackLatestBlocks will poll the beacon node for the latest blocks and persist blobs for them.
185
200
func (a * ArchiverService ) trackLatestBlocks (ctx context.Context ) error {
186
201
t := time .NewTicker (a .cfg .PollInterval )
187
202
defer t .Stop ()
@@ -198,6 +213,9 @@ func (a *ArchiverService) trackLatestBlocks(ctx context.Context) error {
198
213
}
199
214
}
200
215
216
+ // processBlocksUntilKnownBlock will fetch and persist blobs for blocks until it finds a block that has been stored before.
217
+ // In the case of a reorg, it will fetch the new head and then walk back the chain, storing all blobs until it finds a
218
+ // known block -- one that already exists in the archiver's storage.
201
219
func (a * ArchiverService ) processBlocksUntilKnownBlock (ctx context.Context ) {
202
220
a .log .Debug ("refreshing live data" )
203
221
0 commit comments