Skip to content

Commit 50f437b

Browse files
authored
Extract and append config block in assembler (#330)
Signed-off-by: Yoav Tock <[email protected]>
1 parent 24d2fe4 commit 50f437b

File tree

2 files changed

+171
-68
lines changed

2 files changed

+171
-68
lines changed

node/assembler/assembler_role.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@ import (
1111
"sync"
1212
"time"
1313

14+
"github.com/hyperledger/fabric-protos-go-apiv2/common"
1415
"github.com/hyperledger/fabric-x-orderer/common/types"
1516
"github.com/hyperledger/fabric-x-orderer/common/utils"
17+
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
1618
)
1719

1820
type AssemblerIndex interface {
@@ -23,6 +25,7 @@ type AssemblerIndex interface {
2325

2426
type AssemblerLedgerWriter interface {
2527
Append(batch types.Batch, orderingInfo types.OrderingInfo)
28+
AppendConfig(configBlock *common.Block, decisionNum types.DecisionNum)
2629
Close()
2730
}
2831

@@ -62,7 +65,13 @@ func (a *AssemblerRole) processOrderedBatchAttestations() {
6265
a.Logger.Infof("Received ordered batch attestation with BatchID: %s; OrderingInfo: %s", types.BatchIDToString(oba.BatchAttestation()), oba.OrderingInfo().String())
6366

6467
if oba.BatchAttestation().Shard() == types.ShardIDConsensus {
65-
a.Logger.Infof("Config decision: shard: %d, primary: %d, Ignoring!", oba.BatchAttestation().Shard(), oba.BatchAttestation().Primary())
68+
orderingInfo := oba.OrderingInfo()
69+
a.Logger.Infof("Config decision: shard: %d, Ordering Info: %s", oba.BatchAttestation().Shard(), oba.OrderingInfo().String())
70+
// TODO break the abstraction of oba.OrderingInfo().String()
71+
ordInfo := orderingInfo.(*state.OrderingInformation)
72+
block := ordInfo.CommonBlock
73+
a.Ledger.AppendConfig(block, ordInfo.DecisionNum)
74+
// TODO apply new config
6675
return
6776
}
6877

node/assembler/assembler_role_test.go

Lines changed: 161 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,20 @@ SPDX-License-Identifier: Apache-2.0
77
package assembler_test
88

99
import (
10-
"encoding/binary"
1110
"fmt"
1211
"sync"
1312
"testing"
1413
"time"
1514

15+
"github.com/hyperledger/fabric-protos-go-apiv2/common"
16+
"github.com/hyperledger/fabric-x-common/protoutil"
1617
"github.com/hyperledger/fabric-x-orderer/common/types"
18+
"github.com/hyperledger/fabric-x-orderer/common/utils"
1719
"github.com/hyperledger/fabric-x-orderer/node/assembler"
1820
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
21+
node_ledger "github.com/hyperledger/fabric-x-orderer/node/ledger"
1922
"github.com/hyperledger/fabric-x-orderer/testutil"
20-
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
2124
)
2225

2326
type naiveOrderedBatchAttestationReplicator chan types.OrderedBatchAttestation
@@ -52,108 +55,199 @@ func (n *naiveIndex) PopOrWait(batchId types.BatchID) (types.Batch, error) {
5255

5356
func (n *naiveIndex) Stop() {}
5457

55-
type naiveAssemblerLedger chan types.OrderedBatchAttestation
58+
func TestAssemblerRole_Batches(t *testing.T) {
59+
shardCount := 4
60+
batchNum := 20
61+
primaryID := types.PartyID(1)
5662

57-
func (n naiveAssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingInfo) {
58-
noba := &state.AvailableBatchOrdered{
59-
AvailableBatch: state.NewAvailableBatch(batch.Primary(), batch.Shard(), batch.Seq(), batch.Digest()),
60-
OrderingInformation: orderingInfo.(*state.OrderingInformation),
61-
}
63+
// create test batches from all shards
64+
digestsSet, batches := createTestBatches(t, shardCount, batchNum, primaryID)
65+
// create test setup
66+
index, ledger, ordBARep, assemblerRole := createAssemblerRole(t, shardCount)
6267

63-
n <- noba
64-
}
68+
assemblerRole.Run()
69+
70+
totalOrder := make(chan types.OrderedBatchAttestation)
71+
72+
genBlock, err := ledger.LedgerReader().RetrieveBlockByNumber(0)
73+
prevBlock := genBlock
74+
require.NoError(t, err)
75+
require.True(t, protoutil.IsConfigBlock(prevBlock), "ledger includes an empty genesis block")
76+
decNum := types.DecisionNum(1)
77+
78+
simulateDecisions(t, batches, index, prevBlock, decNum, totalOrder)
79+
80+
go func() {
81+
for ordererBatchAttestation := range totalOrder {
82+
ordBARep <- ordererBatchAttestation
83+
}
84+
}()
85+
86+
require.Eventually(t, func() bool {
87+
return ledger.LedgerReader().Height() == uint64(shardCount)*uint64(batchNum)+1
88+
}, 10*time.Second, 100*time.Millisecond)
89+
90+
// verify all batches received
91+
for blockNUm := uint64(0); blockNUm <= uint64(batchNum*shardCount); blockNUm++ {
92+
block, err := ledger.LedgerReader().RetrieveBlockByNumber(blockNUm)
93+
require.NoError(t, err)
94+
if blockNUm == 0 {
95+
require.True(t, protoutil.IsConfigBlock(block), "ledger includes an empty genesis block")
96+
continue
97+
}
98+
99+
batchID, ordInfo, _, _ := node_ledger.AssemblerBatchIdOrderingInfoAndTxCountFromBlock(block)
100+
101+
expectedBatch := batches[batchID.Shard()][batchID.Seq()]
102+
require.Equal(t, types.BatchIDToString(expectedBatch), types.BatchIDToString(batchID))
103+
require.Contains(t, ordInfo.String(), fmt.Sprintf("DecisionNum: %d, BatchIndex: 0, BatchCount: 1; No. Sigs: 0, BlockHeader: Number: %d", blockNUm, blockNUm), ordInfo.String())
65104

66-
func (n naiveAssemblerLedger) Close() {
67-
close(n)
105+
delete(digestsSet, string(batchID.Digest()))
106+
}
107+
108+
require.Len(t, digestsSet, 0)
68109
}
69110

70-
func TestAssembler(t *testing.T) {
111+
func TestAssemblerRole_Config(t *testing.T) {
71112
shardCount := 4
72-
batchNum := 20
113+
batchNum := 3
114+
primaryID := types.PartyID(1)
73115

74-
digests := make(map[string]struct{})
116+
// create test batches from all shards
117+
_, batches := createTestBatches(t, shardCount, batchNum, primaryID)
118+
// create test setup
119+
index, ledger, ordBARep, assemblerRole := createAssemblerRole(t, shardCount)
75120

76-
var batches [][]types.Batch
77-
for shardID := types.ShardID(0); shardID < types.ShardID(shardCount); shardID++ {
78-
var batchesForShard []types.Batch
79-
for seq := types.BatchSequence(0); seq < types.BatchSequence(batchNum); seq++ {
80-
buff := make([]byte, 1024)
81-
binary.BigEndian.PutUint16(buff, uint16(shardID))
82-
binary.BigEndian.PutUint16(buff[100:], uint16(seq))
83-
batch := types.NewSimpleBatch(shardID, 1, seq, [][]byte{buff}, 0)
84-
digests[string(batch.Digest())] = struct{}{}
85-
batchesForShard = append(batchesForShard, batch)
121+
assemblerRole.Run()
122+
123+
totalOrder := make(chan types.OrderedBatchAttestation)
124+
125+
genBlock, err := ledger.LedgerReader().RetrieveBlockByNumber(0)
126+
prevBlock := genBlock
127+
require.NoError(t, err)
128+
require.True(t, protoutil.IsConfigBlock(prevBlock), "ledger includes an empty genesis block")
129+
decNum := types.DecisionNum(1)
130+
131+
simulateDecisions(t, batches, index, prevBlock, decNum, totalOrder)
132+
133+
go func() {
134+
for ordererBatchAttestation := range totalOrder {
135+
ordBARep <- ordererBatchAttestation
86136
}
87-
batches = append(batches, batchesForShard)
137+
}()
138+
139+
require.Eventually(t, func() bool {
140+
return ledger.LedgerReader().Height() == uint64(shardCount)*uint64(batchNum)+1
141+
}, 10*time.Second, 100*time.Millisecond)
142+
143+
// consensus emits a config decision
144+
lastBlock, err := ledger.LedgerReader().RetrieveBlockByNumber(ledger.LedgerReader().Height() - 1)
145+
require.NoError(t, err)
146+
configBlock := protoutil.UnmarshalBlockOrPanic(protoutil.MarshalOrPanic(genBlock)) // clone the block
147+
configBlock.Header.PreviousHash = protoutil.BlockHeaderHash(lastBlock.Header)
148+
configBlock.Header.Number = lastBlock.Header.Number + 1
149+
150+
configABO := &state.AvailableBatchOrdered{
151+
AvailableBatch: state.NewAvailableBatch(0, types.ShardIDConsensus, 0, []byte{}),
152+
OrderingInformation: &state.OrderingInformation{
153+
CommonBlock: configBlock,
154+
DecisionNum: decNum,
155+
BatchIndex: 0,
156+
BatchCount: 1,
157+
},
88158
}
159+
totalOrder <- configABO
89160

90-
index, ledger, nobar, assembler := createAssemblerRole(t, shardCount)
161+
require.Eventually(t, func() bool {
162+
return ledger.LedgerReader().Height() == uint64(shardCount)*uint64(batchNum)+2
163+
}, 10*time.Second, 100*time.Millisecond)
91164

92-
assembler.Run()
165+
lastBlock, err = ledger.LedgerReader().RetrieveBlockByNumber(ledger.LedgerReader().Height() - 1)
166+
require.NoError(t, err)
167+
require.True(t, protoutil.IsConfigBlock(lastBlock))
168+
}
93169

94-
totalOrder := make(chan *state.AvailableBatch)
170+
func simulateDecisions(
171+
t *testing.T,
172+
batches [][]types.Batch,
173+
index *naiveIndex,
174+
prevBlock *common.Block,
175+
decNum types.DecisionNum,
176+
totalOrder chan types.OrderedBatchAttestation,
177+
) {
178+
go func() {
179+
for shardID := 0; shardID < len(batches); shardID++ {
180+
for _, batch := range batches[shardID] {
95181

96-
for shardID := 0; shardID < shardCount; shardID++ {
97-
go func(shard int) {
98-
for _, batch := range batches[shard] {
99182
index.Put(batch)
100-
nba := state.NewAvailableBatch(batch.Primary(), batch.Shard(), batch.Seq(), batch.Digest())
101-
totalOrder <- nba
102-
}
103-
}(shardID)
104-
}
105183

106-
go func() {
107-
var num uint64
108-
109-
for nba := range totalOrder {
110-
noba := &state.AvailableBatchOrdered{
111-
AvailableBatch: nba,
112-
OrderingInformation: &state.OrderingInformation{
113-
BlockHeader: &state.BlockHeader{Number: num, PrevHash: []byte{0x08}, Digest: []byte{0x09}},
114-
DecisionNum: types.DecisionNum(num),
115-
BatchIndex: 0,
116-
BatchCount: 1,
117-
},
184+
ab := state.NewAvailableBatch(batch.Primary(), batch.Shard(), batch.Seq(), batch.Digest())
185+
block := protoutil.NewBlock(prevBlock.Header.Number+1, protoutil.BlockHeaderHash(prevBlock.Header))
186+
block.Header.DataHash = ab.Digest()
187+
188+
abo := &state.AvailableBatchOrdered{
189+
AvailableBatch: ab,
190+
OrderingInformation: &state.OrderingInformation{
191+
CommonBlock: block,
192+
DecisionNum: decNum,
193+
BatchIndex: 0,
194+
BatchCount: 1,
195+
},
196+
}
197+
totalOrder <- abo
198+
199+
prevBlock = block
200+
decNum++
118201
}
119-
nobar <- noba
120-
num++
121202
}
203+
204+
t.Logf("Simulated %d decisions", decNum)
122205
}()
206+
}
123207

124-
for i := uint64(0); i < uint64(batchNum*shardCount); i++ {
125-
noba := <-ledger
126-
assert.Equal(t, fmt.Sprintf("DecisionNum: %d, BatchIndex: 0, BatchCount: 1; No. Sigs: 0, BlockHeader: Number: %d, PrevHash: 08, Digest: 09, Common Block: <nil>", i, i), noba.OrderingInfo().String())
127-
delete(digests, string(noba.BatchAttestation().Digest()))
208+
func createTestBatches(t *testing.T, shardCount int, batchNum int, primaryID types.PartyID) (map[string]bool, [][]types.Batch) {
209+
digestsSet := make(map[string]bool)
210+
var batches [][]types.Batch
211+
for shardID := types.ShardID(0); shardID < types.ShardID(shardCount); shardID++ {
212+
var batchesForShard []types.Batch
213+
for seq := types.BatchSequence(0); seq < types.BatchSequence(batchNum); seq++ {
214+
buff := generateRandomBytes(t, 1024)
215+
batch := types.NewSimpleBatch(shardID, primaryID, seq, [][]byte{buff}, 0)
216+
digestsSet[string(batch.Digest())] = true
217+
batchesForShard = append(batchesForShard, batch)
218+
}
219+
batches = append(batches, batchesForShard)
128220
}
129221

130-
assert.Len(t, digests, 0)
222+
return digestsSet, batches
131223
}
132224

133-
func createAssemblerRole(t *testing.T, shardCount int) (
134-
*naiveIndex,
135-
naiveAssemblerLedger,
136-
naiveOrderedBatchAttestationReplicator,
137-
*assembler.AssemblerRole,
138-
) {
225+
func createAssemblerRole(t *testing.T, shardCount int) (*naiveIndex, node_ledger.AssemblerLedgerReaderWriter, naiveOrderedBatchAttestationReplicator, *assembler.AssemblerRole) {
226+
tempDir := t.TempDir()
227+
228+
logger := testutil.CreateLogger(t, 0)
229+
139230
index := &naiveIndex{}
140231

141232
var shards []types.ShardID
142233
for i := 0; i < shardCount; i++ {
143234
shards = append(shards, types.ShardID(i))
144235
}
145236

146-
ledger := make(naiveAssemblerLedger, 10)
237+
ledgerFactory := &node_ledger.DefaultAssemblerLedgerFactory{}
238+
ledger, err := ledgerFactory.Create(logger, tempDir)
239+
require.NoError(t, err)
240+
ledger.AppendConfig(utils.EmptyGenesisBlock("test"), 0)
147241

148-
nobar := make(naiveOrderedBatchAttestationReplicator)
242+
ordBARep := make(naiveOrderedBatchAttestationReplicator)
149243

150-
assembler := &assembler.AssemblerRole{
244+
assemblerRole := &assembler.AssemblerRole{
151245
Shards: shards,
152246
Logger: testutil.CreateLogger(t, 0),
153247
Ledger: ledger,
154248
ShardCount: shardCount,
155-
OrderedBatchAttestationReplicator: nobar,
249+
OrderedBatchAttestationReplicator: ordBARep,
156250
Index: index,
157251
}
158-
return index, ledger, nobar, assembler
252+
return index, ledger, ordBARep, assemblerRole
159253
}

0 commit comments

Comments
 (0)