Skip to content

Commit f489cad

Browse files
committed
backport cronos custom changes (block-stm, object store etc)
1 parent d55608c commit f489cad

File tree

92 files changed

+1893
-1219
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

92 files changed

+1893
-1219
lines changed

baseapp/abci.go

Lines changed: 68 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -791,48 +791,47 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
791791

792792
// Reset the gas meter so that the AnteHandlers aren't required to
793793
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
794-
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))
794+
app.finalizeBlockState.SetContext(
795+
app.finalizeBlockState.Context().
796+
WithBlockGasMeter(gasMeter).
797+
WithTxCount(len(req.Txs)),
798+
)
795799

796800
// Iterate over all raw transactions in the proposal and attempt to execute
797801
// them, gathering the execution results.
798802
//
799803
// NOTE: Not all raw transactions may adhere to the sdk.Tx interface, e.g.
800804
// vote extensions, so skip those.
801-
txResults := make([]*abci.ExecTxResult, 0, len(req.Txs))
802-
for _, rawTx := range req.Txs {
803-
var response *abci.ExecTxResult
804-
805-
if _, err := app.txDecoder(rawTx); err == nil {
806-
response = app.deliverTx(rawTx)
807-
} else {
808-
// In the case where a transaction included in a block proposal is malformed,
809-
// we still want to return a default response to comet. This is because comet
810-
// expects a response for each transaction included in a block proposal.
811-
response = sdkerrors.ResponseExecTxResultWithEvents(
812-
sdkerrors.ErrTxDecode,
813-
0,
814-
0,
815-
nil,
816-
false,
817-
)
818-
}
819-
820-
// check after every tx if we should abort
821-
select {
822-
case <-ctx.Done():
823-
return nil, ctx.Err()
824-
default:
825-
// continue
826-
}
827-
828-
txResults = append(txResults, response)
805+
txResults, err := app.executeTxs(ctx, req.Txs)
806+
if err != nil {
807+
// usually due to canceled
808+
return nil, err
829809
}
830810

831811
if app.finalizeBlockState.ms.TracingEnabled() {
832812
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
833813
}
834814

835-
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
815+
var (
816+
blockGasUsed uint64
817+
blockGasWanted uint64
818+
)
819+
for _, res := range txResults {
820+
// GasUsed should not be -1 but just in case
821+
if res.GasUsed > 0 {
822+
blockGasUsed += uint64(res.GasUsed)
823+
}
824+
// GasWanted could be -1 if the tx is invalid
825+
if res.GasWanted > 0 {
826+
blockGasWanted += uint64(res.GasWanted)
827+
}
828+
}
829+
app.finalizeBlockState.SetContext(
830+
app.finalizeBlockState.Context().
831+
WithBlockGasUsed(blockGasUsed).
832+
WithBlockGasWanted(blockGasWanted),
833+
)
834+
endBlock, err := app.endBlock(ctx)
836835
if err != nil {
837836
return nil, err
838837
}
@@ -856,6 +855,44 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
856855
}, nil
857856
}
858857

858+
func (app *BaseApp) executeTxs(ctx context.Context, txs [][]byte) ([]*abci.ExecTxResult, error) {
859+
if app.txExecutor != nil {
860+
return app.txExecutor(ctx, txs, app.finalizeBlockState.ms, func(i int, memTx sdk.Tx, ms storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
861+
return app.deliverTxWithMultiStore(txs[i], memTx, i, ms, incarnationCache)
862+
})
863+
}
864+
865+
txResults := make([]*abci.ExecTxResult, 0, len(txs))
866+
for i, rawTx := range txs {
867+
var response *abci.ExecTxResult
868+
869+
if memTx, err := app.txDecoder(rawTx); err == nil {
870+
response = app.deliverTx(rawTx, memTx, i)
871+
} else {
872+
// In the case where a transaction included in a block proposal is malformed,
873+
// we still want to return a default response to comet. This is because comet
874+
// expects a response for each transaction included in a block proposal.
875+
response = sdkerrors.ResponseExecTxResultWithEvents(
876+
sdkerrors.ErrTxDecode,
877+
0,
878+
0,
879+
nil,
880+
false,
881+
)
882+
}
883+
// check after every tx if we should abort
884+
select {
885+
case <-ctx.Done():
886+
return nil, ctx.Err()
887+
default:
888+
// continue
889+
}
890+
891+
txResults = append(txResults, response)
892+
}
893+
return txResults, nil
894+
}
895+
859896
// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
860897
// Specifically, it will execute an application's BeginBlock (if defined), followed
861898
// by the transactions in the proposal, finally followed by the application's
@@ -1213,7 +1250,7 @@ func (app *BaseApp) CreateQueryContextWithCheckHeader(height int64, prove, check
12131250
// use custom query multi-store if provided
12141251
qms := app.qms
12151252
if qms == nil {
1216-
qms = app.cms.(storetypes.MultiStore)
1253+
qms = storetypes.RootMultiStore(app.cms)
12171254
}
12181255

12191256
lastBlockHeight := qms.LatestVersion()

baseapp/abci_utils.go

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ type (
201201
// to verify a transaction.
202202
ProposalTxVerifier interface {
203203
PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error)
204-
ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error)
204+
ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, uint64, error)
205205
TxDecode(txBz []byte) (sdk.Tx, error)
206206
TxEncode(tx sdk.Tx) ([]byte, error)
207207
}
@@ -271,7 +271,12 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
271271
return nil, err
272272
}
273273

274-
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz)
274+
var txGasLimit uint64
275+
if gasTx, ok := tx.(mempool.GasTx); ok {
276+
txGasLimit = gasTx.GetGas()
277+
}
278+
279+
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, tx, txBz, txGasLimit)
275280
if stop {
276281
break
277282
}
@@ -286,14 +291,14 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
286291
selectedTxsNums int
287292
invalidTxs []sdk.Tx // invalid txs to be removed out of the loop to avoid dead lock
288293
)
289-
mempool.SelectBy(ctx, h.mempool, req.Txs, func(memTx sdk.Tx) bool {
290-
unorderedTx, ok := memTx.(sdk.TxWithUnordered)
294+
mempool.SelectBy(ctx, h.mempool, req.Txs, func(memTx mempool.Tx) bool {
295+
unorderedTx, ok := memTx.Tx.(sdk.TxWithUnordered)
291296
isUnordered := ok && unorderedTx.GetUnordered()
292297
txSignersSeqs := make(map[string]uint64)
293298

294299
// if the tx is unordered, we don't need to check the sequence, we just add it
295300
if !isUnordered {
296-
signerData, err := h.signerExtAdapter.GetSigners(memTx)
301+
signerData, err := h.signerExtAdapter.GetSigners(memTx.Tx)
297302
if err != nil {
298303
// propagate the error to the caller
299304
resError = err
@@ -328,11 +333,11 @@ func (h *DefaultProposalHandler) PrepareProposalHandler() sdk.PrepareProposalHan
328333
// which calls mempool.Insert, in theory everything in the pool should be
329334
// valid. But some mempool implementations may insert invalid txs, so we
330335
// check again.
331-
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx)
336+
txBz, err := h.txVerifier.PrepareProposalVerifyTx(memTx.Tx)
332337
if err != nil {
333-
invalidTxs = append(invalidTxs, memTx)
338+
invalidTxs = append(invalidTxs, memTx.Tx)
334339
} else {
335-
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx, txBz)
340+
stop := h.txSelector.SelectTxForProposal(ctx, uint64(req.MaxTxBytes), maxBlockGas, memTx.Tx, txBz, memTx.GasWanted)
336341
if stop {
337342
return false
338343
}
@@ -404,17 +409,13 @@ func (h *DefaultProposalHandler) ProcessProposalHandler() sdk.ProcessProposalHan
404409
}
405410

406411
for _, txBytes := range req.Txs {
407-
tx, err := h.txVerifier.ProcessProposalVerifyTx(txBytes)
412+
_, gasWanted, err := h.txVerifier.ProcessProposalVerifyTx(txBytes)
408413
if err != nil {
409414
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
410415
}
411416

412417
if maxBlockGas > 0 {
413-
gasTx, ok := tx.(GasTx)
414-
if ok {
415-
totalTxGas += gasTx.GetGas()
416-
}
417-
418+
totalTxGas += gasWanted
418419
if totalTxGas > uint64(maxBlockGas) {
419420
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
420421
}
@@ -472,7 +473,7 @@ type TxSelector interface {
472473
// a proposal based on inclusion criteria defined by the TxSelector. It must
473474
// return <true> if the caller should halt the transaction selection loop
474475
// (typically over a mempool) or <false> otherwise.
475-
SelectTxForProposal(ctx context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool
476+
SelectTxForProposal(ctx context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte, gasWanted uint64) bool
476477
}
477478

478479
type defaultTxSelector struct {
@@ -497,23 +498,16 @@ func (ts *defaultTxSelector) Clear() {
497498
ts.selectedTxs = nil
498499
}
499500

500-
func (ts *defaultTxSelector) SelectTxForProposal(_ context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte) bool {
501+
func (ts *defaultTxSelector) SelectTxForProposal(_ context.Context, maxTxBytes, maxBlockGas uint64, memTx sdk.Tx, txBz []byte, gasWanted uint64) bool {
501502
txSize := uint64(cmttypes.ComputeProtoSizeForTxs([]cmttypes.Tx{txBz}))
502503

503-
var txGasLimit uint64
504-
if memTx != nil {
505-
if gasTx, ok := memTx.(GasTx); ok {
506-
txGasLimit = gasTx.GetGas()
507-
}
508-
}
509-
510504
// only add the transaction to the proposal if we have enough capacity
511505
if (txSize + ts.totalTxBytes) <= maxTxBytes {
512506
// If there is a max block gas limit, add the tx only if the limit has
513507
// not been met.
514508
if maxBlockGas > 0 {
515-
if (txGasLimit + ts.totalTxGas) <= maxBlockGas {
516-
ts.totalTxGas += txGasLimit
509+
if (gasWanted + ts.totalTxGas) <= maxBlockGas {
510+
ts.totalTxGas += gasWanted
517511
ts.totalTxBytes += txSize
518512
ts.selectedTxs = append(ts.selectedTxs, txBz)
519513
}

baseapp/baseapp.go

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ type BaseApp struct {
6868
name string // application name from abci.BlockInfo
6969
db dbm.DB // common DB backend
7070
cms storetypes.CommitMultiStore // Main (uncached) state
71-
qms storetypes.MultiStore // Optional alternative multistore for querying only.
71+
qms storetypes.RootMultiStore // Optional alternative multistore for querying only.
7272
storeLoader StoreLoader // function to handle store loading, may be overridden with SetStoreLoader()
7373
grpcQueryRouter *GRPCQueryRouter // router for redirecting gRPC query calls
7474
msgServiceRouter *MsgServiceRouter // router for redirecting Msg service messages
@@ -198,6 +198,9 @@ type BaseApp struct {
198198
//
199199
// SAFETY: it's safe to do if validators validate the total gas wanted in the `ProcessProposal`, which is the case in the default handler.
200200
disableBlockGasMeter bool
201+
202+
// Optional alternative tx executor, used for block-stm parallel transaction execution.
203+
txExecutor TxExecutor
201204
}
202205

203206
// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
@@ -318,6 +321,9 @@ func (app *BaseApp) MountStores(keys ...storetypes.StoreKey) {
318321
case *storetypes.MemoryStoreKey:
319322
app.MountStore(key, storetypes.StoreTypeMemory)
320323

324+
case *storetypes.ObjectStoreKey:
325+
app.MountStore(key, storetypes.StoreTypeObject)
326+
321327
default:
322328
panic(fmt.Sprintf("Unrecognized store key type :%T", key))
323329
}
@@ -356,6 +362,16 @@ func (app *BaseApp) MountMemoryStores(keys map[string]*storetypes.MemoryStoreKey
356362
}
357363
}
358364

365+
// MountObjectStores mounts all transient object stores with the BaseApp's internal
366+
// commit multi-store.
367+
func (app *BaseApp) MountObjectStores(keys map[string]*storetypes.ObjectStoreKey) {
368+
skeys := slices.Sorted(maps.Keys(keys))
369+
for _, key := range skeys {
370+
memKey := keys[key]
371+
app.MountStore(memKey, storetypes.StoreTypeObject)
372+
}
373+
}
374+
359375
// MountStore mounts a store to the provided key in the BaseApp multistore,
360376
// using the default DB.
361377
func (app *BaseApp) MountStore(key storetypes.StoreKey, typ storetypes.StoreType) {
@@ -674,7 +690,7 @@ func (app *BaseApp) getBlockGasMeter(ctx sdk.Context) storetypes.GasMeter {
674690
}
675691

676692
// retrieve the context for the tx w/ txBytes and other memoized values.
677-
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
693+
func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte, txIndex int) sdk.Context {
678694
app.mu.Lock()
679695
defer app.mu.Unlock()
680696

@@ -684,7 +700,8 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
684700
}
685701
ctx := modeState.Context().
686702
WithTxBytes(txBytes).
687-
WithGasMeter(storetypes.NewInfiniteGasMeter())
703+
WithGasMeter(storetypes.NewInfiniteGasMeter()).
704+
WithTxIndex(txIndex)
688705
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed
689706

690707
ctx = ctx.WithIsSigverifyTx(app.sigverifyTx)
@@ -769,7 +786,11 @@ func (app *BaseApp) beginBlock(_ *abci.RequestFinalizeBlock) (sdk.BeginBlock, er
769786
return resp, nil
770787
}
771788

772-
func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
789+
func (app *BaseApp) deliverTx(tx []byte, memTx sdk.Tx, txIndex int) *abci.ExecTxResult {
790+
return app.deliverTxWithMultiStore(tx, memTx, txIndex, nil, nil)
791+
}
792+
793+
func (app *BaseApp) deliverTxWithMultiStore(tx []byte, memTx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) *abci.ExecTxResult {
773794
gInfo := sdk.GasInfo{}
774795
resultStr := "successful"
775796

@@ -782,7 +803,7 @@ func (app *BaseApp) deliverTx(tx []byte) *abci.ExecTxResult {
782803
telemetry.SetGauge(float32(gInfo.GasWanted), "tx", "gas", "wanted")
783804
}()
784805

785-
gInfo, result, anteEvents, err := app.runTx(execModeFinalize, tx, nil)
806+
gInfo, result, anteEvents, err := app.runTxWithMultiStore(execModeFinalize, tx, memTx, txIndex, txMultiStore, incarnationCache)
786807
if err != nil {
787808
resultStr = "failed"
788809
resp = sdkerrors.ResponseExecTxResultWithEvents(
@@ -842,12 +863,20 @@ func (app *BaseApp) endBlock(_ context.Context) (sdk.EndBlock, error) {
842863
// both txbytes and the decoded tx are passed to runTx to avoid the state machine encoding the tx and decoding the transaction twice
843864
// passing the decoded tx to runTX is optional, it will be decoded if the tx is nil
844865
func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
866+
return app.runTxWithMultiStore(mode, txBytes, tx, -1, nil, nil)
867+
}
868+
869+
func (app *BaseApp) runTxWithMultiStore(mode execMode, txBytes []byte, tx sdk.Tx, txIndex int, txMultiStore storetypes.MultiStore, incarnationCache map[string]any) (gInfo sdk.GasInfo, result *sdk.Result, anteEvents []abci.Event, err error) {
845870
// NOTE: GasWanted should be returned by the AnteHandler. GasUsed is
846871
// determined by the GasMeter. We need access to the context to get the gas
847872
// meter, so we initialize upfront.
848873
var gasWanted uint64
849874

850-
ctx := app.getContextForTx(mode, txBytes)
875+
ctx := app.getContextForTx(mode, txBytes, txIndex)
876+
ctx = ctx.WithIncarnationCache(incarnationCache)
877+
if txMultiStore != nil {
878+
ctx = ctx.WithMultiStore(txMultiStore)
879+
}
851880
ms := ctx.MultiStore()
852881

853882
// only run the tx if there is block gas remaining
@@ -957,7 +986,7 @@ func (app *BaseApp) runTx(mode execMode, txBytes []byte, tx sdk.Tx) (gInfo sdk.G
957986

958987
switch mode {
959988
case execModeCheck:
960-
err = app.mempool.Insert(ctx, tx)
989+
err = app.mempool.InsertWithGasWanted(ctx, tx, gasWanted)
961990
if err != nil {
962991
return gInfo, nil, anteEvents, err
963992
}
@@ -1040,6 +1069,8 @@ func (app *BaseApp) runMsgs(ctx sdk.Context, msgs []sdk.Msg, msgsV2 []protov2.Me
10401069
break
10411070
}
10421071

1072+
ctx = ctx.WithMsgIndex(i)
1073+
10431074
handler := app.msgServiceRouter.Handler(msg)
10441075
if handler == nil {
10451076
return nil, errorsmod.Wrapf(sdkerrors.ErrUnknownRequest, "no message handler found for %T", msg)
@@ -1151,18 +1182,18 @@ func (app *BaseApp) PrepareProposalVerifyTx(tx sdk.Tx) ([]byte, error) {
11511182
// ProcessProposal state internally will be discarded. <nil, err> will be
11521183
// returned if the transaction cannot be decoded. <Tx, nil> will be returned if
11531184
// the transaction is valid, otherwise <Tx, err> will be returned.
1154-
func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, error) {
1185+
func (app *BaseApp) ProcessProposalVerifyTx(txBz []byte) (sdk.Tx, uint64, error) {
11551186
tx, err := app.txDecoder(txBz)
11561187
if err != nil {
1157-
return nil, err
1188+
return nil, 0, err
11581189
}
11591190

1160-
_, _, _, err = app.runTx(execModeProcessProposal, txBz, tx)
1191+
gInfo, _, _, err := app.runTx(execModeProcessProposal, txBz, tx)
11611192
if err != nil {
1162-
return nil, err
1193+
return nil, 0, err
11631194
}
11641195

1165-
return tx, nil
1196+
return tx, gInfo.GasWanted, nil
11661197
}
11671198

11681199
func (app *BaseApp) TxDecode(txBytes []byte) (sdk.Tx, error) {

0 commit comments

Comments
 (0)