services: add StateFetcher service for state snapshot downloading from NeoFS #3844
base: master
Conversation
Force-pushed from b000eb3 to e39c22a.
// GetMPTNodes searches for the state object and returns MPT nodes.
func (bfs *Service) GetMPTNodes() ([][]byte, error) {
	headerHeight := bfs.heightFunc() - 1
Return early if the service is shut down.
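For illustration, a minimal sketch of such a guard; Service, heightFunc and the shutdown flag are simplified stand-ins, and the exact check used in the PR may differ:

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// Service is a simplified stand-in for the StateFetcher service from the PR.
type Service struct {
	shutdown   atomic.Bool
	heightFunc func() uint32
}

// GetMPTNodes returns early when the service is already shut down instead of
// starting a NeoFS search that can no longer complete.
func (bfs *Service) GetMPTNodes() ([][]byte, error) {
	if bfs.shutdown.Load() {
		return nil, errors.New("state fetcher service is shut down")
	}
	headerHeight := bfs.heightFunc() - 1
	_ = headerHeight // the actual NeoFS state object search goes here
	return nil, nil
}

func main() {
	s := &Service{heightFunc: func() uint32 { return 100 }}
	s.shutdown.Store(true)
	_, err := s.GetMPTNodes()
	fmt.Println(err) // state fetcher service is shut down
}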
pkg/network/server.go
Outdated
@@ -902,6 +906,13 @@ func (s *Server) requestBlocksOrHeaders(p Peer) error {
		return fmt.Errorf("%w: %w", errBlocksRequestFailed, err)
	}
	if requestMPTNodes {
		if s.config.NeoFSStateSyncExtensions {
			nodes, err := s.syncHeaderFetcher.GetMPTNodes()
Not syncHeaderFetcher, but syncBlockFetcher.
startIndex := bfs.heightFunc()/bfs.cfg.IndexFileSize + 1
for {
	prm := client.PrmObjectSearch{}
	filters := object.NewSearchFilters()
	filters.AddFilter(bfs.cfg.IndexFileAttribute, fmt.Sprintf("%d", startIndex), object.MatchStringEqual)
	filters.AddFilter("IndexSize", fmt.Sprintf("%d", bfs.cfg.IndexFileSize), object.MatchStringEqual)
	prm.SetFilters(filters)

	ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
	blockOidsObject, err := bfs.objectSearch(ctx, prm)
After the PR is finalised it will be useful to add a comment about this code to #3744, because it will be affected by the removal of index file support.
Force-pushed from e39c22a to ea9e2ee.
GetMPTNodes
Force-pushed from ea9e2ee to 11a2e66.
Force-pushed from f93aa3c to 2c617ce.
@AliceInHunterland, what's the state of the current PR? Can I review?
Force-pushed from 2c617ce to 0fa015c.
Tests are not fixed and not tested with blocks with …
var err error
ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout)
defer cancel()
for i := headerHeight; true; i-- {
We have already discussed this: we need to search for the nearest state sync point, not for every block.
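For reference, a minimal sketch of the idea, assuming the sync interval (e.g. the protocol's state sync interval) is known to the service; the function name is hypothetical:

package main

import "fmt"

// nearestStateSyncPoint returns the latest multiple of syncInterval that does
// not exceed headerHeight, so the fetcher can jump straight to it instead of
// probing every block downwards.
func nearestStateSyncPoint(headerHeight, syncInterval uint32) uint32 {
	if syncInterval == 0 {
		return headerHeight
	}
	return headerHeight / syncInterval * syncInterval
}

func main() {
	fmt.Println(nearestStateSyncPoint(123456, 40000)) // 120000
}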
	return rc, err
}

func (bfs *Service) retry(action func() error) error {
It's also common code. It should be placed in the same place where the common fetchers' settings are handled, and then reused.
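A sketch of what such a shared helper could look like, placed next to the common fetchers' configuration; the package name, function name and backoff parameters are assumptions:

package neofs

import "time"

// Retry runs action up to attempts times with a simple exponential backoff,
// returning the last error. Both BlockFetcher and StateFetcher could reuse it
// instead of keeping their own copies.
func Retry(action func() error, attempts int, initialDelay time.Duration) error {
	var err error
	delay := initialDelay
	for i := 0; i < attempts; i++ {
		if err = action(); err == nil {
			return nil
		}
		time.Sleep(delay)
		delay *= 2
	}
	return err
}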
bfs.stateModule.Store.Seek(storage.SeekRange{Prefix: tempPrefix}, func(k, v []byte) bool {
	newKey := append(permPrefix, k[1:]...)
	cache.Put(newKey, v)
	cache.Delete(k)
	return true
See how it's handled by the StateSync module: first remove all items stored under the old key prefix, then switch the active prefix from the old one to the new one, and that's it.
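Roughly, the approach looks like the sketch below, with hypothetical names and a plain map instead of the real store, for illustration only: drop everything stored under the stale prefix and flip a single active-prefix marker, rather than copying each item to a new key.

package main

import "fmt"

// switchActivePrefix removes all items stored under the stale prefix and then
// records the new prefix as the active one, so already-written data becomes
// current without rewriting every key.
func switchActivePrefix(store map[string][]byte, stalePrefix byte, activePrefixKey string, newPrefix byte) {
	for k := range store {
		if k != activePrefixKey && len(k) > 0 && k[0] == stalePrefix {
			delete(store, k)
		}
	}
	store[activePrefixKey] = []byte{newPrefix}
}

func main() {
	store := map[string][]byte{
		string([]byte{0x01, 0xaa}): []byte("stale item"),
		string([]byte{0x02, 0xaa}): []byte("fresh item"),
	}
	switchActivePrefix(store, 0x01, "active-prefix", 0x02)
	fmt.Println(len(store)) // 2: the fresh item and the active-prefix marker
}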
	return true
})

if n, err := cache.PersistSync(); err != nil {
Definitely not a single persist for the whole batch; persisting should be batched. Have you seen the same pattern somewhere else? The general idea is that this code must be robust to node shutdown.
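A minimal sketch of batched persisting, using a hypothetical persister interface that mimics a small subset of a memory-cached store (the batch size is an assumption): flush every N puts so that a node shutdown loses at most one partial batch instead of the whole migration.

package statesync

// persister is a hypothetical subset of a memory-cached store API, used for
// illustration only.
type persister interface {
	Put(k, v []byte)
	PersistSync() (int, error)
}

const persistBatchSize = 10000 // assumed batch size, not a real config value

// migrateBatched writes items to the cache and persists them in fixed-size
// batches instead of doing a single persist at the very end.
func migrateBatched(cache persister, items map[string][]byte) error {
	count := 0
	for k, v := range items {
		cache.Put([]byte(k), v)
		count++
		if count%persistBatchSize == 0 {
			if _, err := cache.PersistSync(); err != nil {
				return err
			}
		}
	}
	_, err := cache.PersistSync() // flush the remaining tail
	return err
}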
Tests/linter are failing. We also need an extension of the default node's configuration.
Force-pushed from 9c45630 to ef0500d.
Force-pushed from 719d675 to 9b1616f.
Force-pushed from 9b1616f to c9ac8bd.
The main point is: since we decided to move storage-items-based state synchronisation to the StateSync module, it should be completely the module's duty to properly update storage/MPT, store intermediate synchronisation state and properly recover from this state after a node restart. StateFetcher should only manage storage items downloading from NeoFS.
So all the code is written, but it should be properly organised and moved to the proper places to keep the StateSync module abstracted from the storage items source. See the relevant comments in the statesync module and statefetcher before refactoring.
pkg/config/application_config.go
Outdated
if err := a.NeoFSStateFetcher.NeoFSService.Validate(); err != nil {
	a.NeoFSStateFetcher.NeoFSService = a.NeoFSBlockFetcher.NeoFSService
}
if err := a.NeoFSStateFetcher.Validate(); err != nil {
Explicitly check whether NeoFSStateFetcher contains empty container parameters and fill them in from the NeoFSBlockFetcher section prior to NeoFSStateFetcher validation.
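A sketch of that flow with simplified, hypothetical config types (the real neo-go structures are richer): copy the container parameters from the block fetcher section when the state fetcher section leaves them empty, and only then validate.

package main

import (
	"errors"
	"fmt"
)

// NeoFSService is a simplified stand-in for the shared NeoFS service config.
type NeoFSService struct {
	Addresses   []string
	ContainerID string
}

// AppConfig is a simplified stand-in for ApplicationConfiguration.
type AppConfig struct {
	NeoFSBlockFetcher NeoFSService
	NeoFSStateFetcher NeoFSService
}

// fillAndValidateStateFetcher fills empty container parameters from the block
// fetcher section and validates the result afterwards.
func fillAndValidateStateFetcher(a *AppConfig) error {
	if a.NeoFSStateFetcher.ContainerID == "" {
		a.NeoFSStateFetcher.ContainerID = a.NeoFSBlockFetcher.ContainerID
	}
	if len(a.NeoFSStateFetcher.Addresses) == 0 {
		a.NeoFSStateFetcher.Addresses = a.NeoFSBlockFetcher.Addresses
	}
	if a.NeoFSStateFetcher.ContainerID == "" || len(a.NeoFSStateFetcher.Addresses) == 0 {
		return errors.New("invalid NeoFSStateFetcher config: container or addresses missing")
	}
	return nil
}

func main() {
	cfg := AppConfig{NeoFSBlockFetcher: NeoFSService{
		ContainerID: "ExampleContainerID",
		Addresses:   []string{"st1.example.org:8080"},
	}}
	if err := fillAndValidateStateFetcher(&cfg); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(cfg.NeoFSStateFetcher.ContainerID) // ExampleContainerID
}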
pkg/config/neofsstorage_config.go
Outdated
@@ -0,0 +1,95 @@
package config
Rename pkg/config/neofsstorage_config.go to pkg/config/neofsservice_config.go.
pkg/config/neofsstorage_config.go
Outdated
	cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
)

// NeoFSService represents the configuration for services connected with NeoFS.
Suggestion: "services connected with NeoFS" → "services interacting with NeoFS block/state storage".
pkg/config/neofsstorage_config.go
Outdated
// Equals compares two NeoFSService instances and returns true if
// they're equal.
func (cfg *NeoFSService) Equals(o NeoFSService) bool {
This method should be used to compare two application config instances. Right now it's unused.
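For example, a hypothetical caller could live in the application config comparison code; the types below are simplified stand-ins and the helper name is made up:

package main

import "fmt"

// Simplified stand-ins; the real NeoFSService.Equals from the PR compares all
// connection parameters, not just the container ID.
type NeoFSService struct{ ContainerID string }

func (cfg NeoFSService) Equals(o NeoFSService) bool { return cfg.ContainerID == o.ContainerID }

type ApplicationConfiguration struct {
	NeoFSBlockFetcher NeoFSService
	NeoFSStateFetcher NeoFSService
}

// equalsNeoFSServices is a hypothetical caller: application config comparison
// is the natural place for the otherwise unused Equals method.
func (a ApplicationConfiguration) equalsNeoFSServices(o ApplicationConfiguration) bool {
	return a.NeoFSBlockFetcher.Equals(o.NeoFSBlockFetcher) &&
		a.NeoFSStateFetcher.Equals(o.NeoFSStateFetcher)
}

func main() {
	a := ApplicationConfiguration{NeoFSBlockFetcher: NeoFSService{ContainerID: "X"}}
	fmt.Println(a.equalsNeoFSServices(a)) // true
}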
pkg/config/neofsstorage_config.go
Outdated
}

// NeoFSStateFetcher represents the configuration for the NeoFS StateFetcher service.
// if NeoFSService configuration is omitted the NeoFSBlockFetcher.NeoFSService will be used.
The line "if NeoFSService configuration is omitted the NeoFSBlockFetcher.NeoFSService will be used" is not related to the structure documentation. I'd move it either to ApplicationConfiguration.Validate() or to ApplicationConfiguration.NeoFSStateFetcher.
@@ -394,6 +406,7 @@ func (s *Server) Shutdown() {
func (s *Server) stateSyncCallBack() {
	needHeaders := s.stateSync.NeedHeaders()
	needBlocks := s.stateSync.NeedBlocks()
	needMPTs := s.stateSync.NeedMPTNodes()
And given that, starting from this commit, the statesync module is able to use both MPT nodes and raw storage items, I'd rename NeedMPTNodes to something like NeedStorageData or NeedContractStorageData in a separate commit.
pkg/network/server.go
Outdated
for p := range s.peers {
	if p.Handshaked() {
		heights = append(heights, p.LastBlockIndex())
		if !s.ServerConfig.NeoFSBlockFetcherCfg.Enabled || s.syncStateFetcher.IsShutdown() {
This part needs some polishing, but let's first deal with the final statesync module implementation; after that I'll review the network part one more time.
if err := sfs.saveCheckpointState(tempState); err != nil {
	sfs.log.Error("failed to save state checkpoint", zap.Error(err))
}
This part should be managed by the statesync module (see the related review comments in the statesync module file). StateFetcher should only manage storage items downloading and should then pass these raw storage items to the statesync module (exactly like the P2P layer passes raw MPT data to the statesync module).
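A rough sketch of that separation with hypothetical names: StateFetcher only downloads raw key-value pairs from NeoFS and hands them to the statesync module through a narrow interface, mirroring how the P2P layer hands raw MPT nodes to the module; persisting, checkpointing and recovery stay on the module side.

package statefetcher

// KV is a raw storage item downloaded from NeoFS.
type KV struct {
	Key   []byte
	Value []byte
}

// StorageItemConsumer is a hypothetical interface implemented by the statesync
// module; it owns storage/MPT updates, intermediate state and restart recovery.
type StorageItemConsumer interface {
	AddContractStorageItems(kvs []KV, syncHeight uint32) error
}

// Service only fetches data and forwards it; it keeps no synchronisation state
// of its own.
type Service struct {
	consumer StorageItemConsumer
}

func (s *Service) onBatchDownloaded(kvs []KV, syncHeight uint32) error {
	return s.consumer.AddContractStorageItems(kvs, syncHeight)
}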
if err := sfs.chain.AddMPTBatch(batch, syncHeight); err != nil {
	sfs.log.Error("failed to process final batch during shutdown", zap.Error(err))
}
if _, err := localMPT.PutBatch(mpt.MapToMPTBatch(batch)); err != nil {
	sfs.log.Error("failed to update local MPT during shutdown", zap.Error(err))
}
localMPT.Flush(syncHeight)
}
And this part should also be managed by the StateSync module. I'm sure I saw similar code in the module, so there shouldn't be duplicated functionality between StateFetcher and the module.
Force-pushed from 3d603a2 to 7beed76.
Codecov Report
Attention: patch coverage is …

Additional details and impacted files:

@@            Coverage Diff             @@
##           master    #3844      +/-   ##
==========================================
- Coverage   82.52%   81.66%   -0.86%
==========================================
  Files         342      343       +1
  Lines       47887    48598     +711
==========================================
+ Hits        39518    39687     +169
- Misses       6734     7266     +532
- Partials     1635     1645      +10

View full report in Codecov by Sentry.
Close #3793
Signed-off-by: Ekaterina Pavlova <[email protected]>
Force-pushed from 7beed76 to 13f91b2.
Close #3793