Commit a0fa942

Support Nanosecond timestamps (#103)
* Support sub-second report intervals

  This change is backwards compatible with previous versions, so it can be rolled out node-by-node without causing protocol downtime (in theory).

  CAUTION: The retirement report is NOT backwards compatible, so do not call setStagingConfig on a cluster with nodes running older versions!

  This change enables specifying a sub-second report cadence via the onchain config.

* copyloopvar lint
1 parent d33e956 · commit a0fa942

34 files changed · +2361 −1567 lines
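The diffs below rename the report timestamp fields from `...Seconds` (uint32) to `...Nanoseconds` (uint64). As a rough illustration of why the wider type is needed (this is not code from the commit, just a sketch using Go's standard time package):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()

	// Old-style representation: whole seconds since the Unix epoch fit in a uint32 (until 2106).
	seconds := uint32(now.Unix())

	// New-style representation: nanoseconds since the Unix epoch. UnixNano returns an int64;
	// the value is far too large for uint32, hence the widened uint64 fields
	// (ValidAfterNanoseconds, ObservationTimestampNanoseconds) seen in the codec changes below.
	nanoseconds := uint64(now.UnixNano())

	fmt.Printf("seconds=%d nanoseconds=%d\n", seconds, nanoseconds)
}
```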

.github/workflows/push-master.yml (+1 −1)

@@ -55,7 +55,7 @@ jobs:
       use-go-cache: true
       go-cache-dep-path: "**/go.sum"
       go-version-file: go.mod
-      golangci-lint-version: "v1.55.2"
+      golangci-lint-version: "v1.64.5"
       golangci-lint-args: --out-format colored-line-number,checkstyle:golangci-lint-report.xml ${{ needs.init.outputs.lint_args_packages }}

   ci-test:

.golangci.yml (+1)

@@ -15,6 +15,7 @@ linters:
     - whitespace
     - depguard
     - containedctx
+    - copyloopvar
 linters-settings:
   exhaustive:
     default-signifies-exhaustive: true

llo/channel_definitions.go (+4)

@@ -19,6 +19,10 @@ func VerifyChannelDefinitions(ctx context.Context, codecs map[llotypes.ReportFor
 			merr = errors.Join(merr, fmt.Errorf("ChannelDefinition with ID %d has no streams", channelID))
 			continue
 		}
+		if len(cd.Streams) > MaxStreamsPerChannel {
+			merr = errors.Join(merr, fmt.Errorf("ChannelDefinition with ID %d has too many streams, got: %d/%d", channelID, len(cd.Streams), MaxStreamsPerChannel))
+			continue
+		}
 		for _, strm := range cd.Streams {
 			if strm.Aggregator == 0 {
 				merr = errors.Join(merr, fmt.Errorf("ChannelDefinition with ID %d has stream %d with zero aggregator (this may indicate an uninitialized struct)", channelID, strm.StreamID))
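The new guard above caps the number of streams a single channel definition may carry (the test below implies MaxStreamsPerChannel is 10000). A minimal, self-contained sketch of the same check, using simplified stand-in types rather than the real llotypes package:

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for llo.MaxStreamsPerChannel; the test below expects 10000.
const maxStreamsPerChannel = 10000

// verifyStreamCounts mirrors the added guard: any channel whose stream count
// exceeds the cap contributes an error, and all errors are joined together.
func verifyStreamCounts(streamCounts map[uint32]int) error {
	var merr error
	for channelID, n := range streamCounts {
		if n > maxStreamsPerChannel {
			merr = errors.Join(merr, fmt.Errorf("ChannelDefinition with ID %d has too many streams, got: %d/%d", channelID, n, maxStreamsPerChannel))
		}
	}
	return merr
}

func main() {
	err := verifyStreamCounts(map[uint32]int{1: maxStreamsPerChannel + 1})
	fmt.Println(err) // ChannelDefinition with ID 1 has too many streams, got: 10001/10000
}
```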

llo/channel_definitions_test.go (+13 −5)

@@ -36,7 +36,15 @@ func Test_VerifyChannelDefinitions(t *testing.T) {
 		err := VerifyChannelDefinitions(ctx, codecs, channelDefs)
 		assert.EqualError(t, err, "too many channels, got: 2001/2000")
 	})
-
+	t.Run("fails if channel has too many streams", func(t *testing.T) {
+		channelDefs := llotypes.ChannelDefinitions{
+			1: llotypes.ChannelDefinition{
+				Streams: make([]llotypes.Stream, MaxStreamsPerChannel+1),
+			},
+		}
+		err := VerifyChannelDefinitions(ctx, codecs, channelDefs)
+		assert.EqualError(t, err, "ChannelDefinition with ID 1 has too many streams, got: 10001/10000")
+	})
 	t.Run("fails for channel with no streams", func(t *testing.T) {
 		channelDefs := llotypes.ChannelDefinitions{
 			1: llotypes.ChannelDefinition{},
@@ -57,8 +65,8 @@ func Test_VerifyChannelDefinitions(t *testing.T) {
 
 	t.Run("fails if too many total unique stream IDs", func(t *testing.T) {
 		streams := make([]llotypes.Stream, MaxObservationStreamValuesLength)
-		for i := 0; i < MaxObservationStreamValuesLength; i++ {
-			streams[i] = llotypes.Stream{StreamID: uint32(i), Aggregator: llotypes.AggregatorMedian}
+		for i := uint32(0); i < MaxObservationStreamValuesLength; i++ {
+			streams[i] = llotypes.Stream{StreamID: i, Aggregator: llotypes.AggregatorMedian}
 		}
 		channelDefs := llotypes.ChannelDefinitions{
 			1: llotypes.ChannelDefinition{
@@ -105,8 +113,8 @@ func Test_VerifyChannelDefinitions(t *testing.T) {
 
 	t.Run("succeeds with exact maxes", func(t *testing.T) {
 		streams := make([]llotypes.Stream, MaxObservationStreamValuesLength)
-		for i := 0; i < MaxObservationStreamValuesLength; i++ {
-			streams[i] = llotypes.Stream{StreamID: uint32(i), Aggregator: llotypes.AggregatorMedian}
+		for i := uint32(0); i < MaxObservationStreamValuesLength; i++ {
+			streams[i] = llotypes.Stream{StreamID: i, Aggregator: llotypes.AggregatorMedian}
 		}
 		channelDefs := make(llotypes.ChannelDefinitions, MaxOutcomeChannelDefinitionsLength)
 		for i := uint32(0); i < MaxOutcomeChannelDefinitionsLength; i++ {

llo/json_report_codec.go (+28 −28)

@@ -47,13 +47,13 @@ type JSONReportCodec struct{}
 
 func (cdc JSONReportCodec) Encode(_ context.Context, r Report, _ llotypes.ChannelDefinition) ([]byte, error) {
 	type encode struct {
-		ConfigDigest                types.ConfigDigest
-		SeqNr                       uint64
-		ChannelID                   llotypes.ChannelID
-		ValidAfterSeconds           uint32
-		ObservationTimestampSeconds uint32
-		Values                      []JSONStreamValue
-		Specimen                    bool
+		ConfigDigest                    types.ConfigDigest
+		SeqNr                           uint64
+		ChannelID                       llotypes.ChannelID
+		ValidAfterNanoseconds           uint64
+		ObservationTimestampNanoseconds uint64
+		Values                          []JSONStreamValue
+		Specimen                        bool
 	}
 	values := make([]JSONStreamValue, len(r.Values))
 	for i, sv := range r.Values {
@@ -70,13 +70,13 @@ func (cdc JSONReportCodec) Encode(_ context.Context, r Report, _ llotypes.Channe
 		}
 	}
 	e := encode{
-		ConfigDigest:                r.ConfigDigest,
-		SeqNr:                       r.SeqNr,
-		ChannelID:                   r.ChannelID,
-		ValidAfterSeconds:           r.ValidAfterSeconds,
-		ObservationTimestampSeconds: r.ObservationTimestampSeconds,
-		Values:                      values,
-		Specimen:                    r.Specimen,
+		ConfigDigest:                    r.ConfigDigest,
+		SeqNr:                           r.SeqNr,
+		ChannelID:                       r.ChannelID,
+		ValidAfterNanoseconds:           r.ValidAfterNanoseconds,
+		ObservationTimestampNanoseconds: r.ObservationTimestampNanoseconds,
+		Values:                          values,
+		Specimen:                        r.Specimen,
 	}
 	return json.Marshal(e)
 }
@@ -90,13 +90,13 @@ func (cdc JSONReportCodec) Verify(_ context.Context, cd llotypes.ChannelDefiniti
 
 func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) {
 	type decode struct {
-		ConfigDigest                string
-		SeqNr                       uint64
-		ChannelID                   llotypes.ChannelID
-		ValidAfterSeconds           uint32
-		ObservationTimestampSeconds uint32
-		Values                      []JSONStreamValue
-		Specimen                    bool
+		ConfigDigest                    string
+		SeqNr                           uint64
+		ChannelID                       llotypes.ChannelID
+		ValidAfterNanoseconds           uint64
+		ObservationTimestampNanoseconds uint64
+		Values                          []JSONStreamValue
+		Specimen                        bool
 	}
 	d := decode{}
 	err = json.Unmarshal(b, &d)
@@ -125,13 +125,13 @@ func (cdc JSONReportCodec) Decode(b []byte) (r Report, err error) {
 	}
 
 	return Report{
-		ConfigDigest:                cd,
-		SeqNr:                       d.SeqNr,
-		ChannelID:                   d.ChannelID,
-		ValidAfterSeconds:           d.ValidAfterSeconds,
-		ObservationTimestampSeconds: d.ObservationTimestampSeconds,
-		Values:                      values,
-		Specimen:                    d.Specimen,
+		ConfigDigest:                    cd,
+		SeqNr:                           d.SeqNr,
+		ChannelID:                       d.ChannelID,
+		ValidAfterNanoseconds:           d.ValidAfterNanoseconds,
+		ObservationTimestampNanoseconds: d.ObservationTimestampNanoseconds,
+		Values:                          values,
+		Specimen:                        d.Specimen,
 	}, err
 }
 
llo/json_report_codec_test.go (+26 −26)

@@ -32,7 +32,7 @@ func FuzzJSONCodec_Decode_Unpack(f *testing.F) {
 	incompleteJSON := []byte(`{`)
 	notJSON := []byte(`"random string"`)
 	unprintable := []byte{1, 2, 3}
-	validJSONReport := []byte(`{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterSeconds":44,"ObservationTimestampSeconds":45,"Values":[{"Type":0,"Value":"1"},{"Type":0,"Value":"2"},{"Type":1,"Value":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`)
+	validJSONReport := []byte(`{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterNanoseconds":44,"ObservationTimestampNanoseconds":45,"Values":[{"Type":0,"Value":"1"},{"Type":0,"Value":"2"},{"Type":1,"Value":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`)
 	invalidConfigDigest := []byte(`{"SeqNr":42,"ConfigDigest":"foo"}`)
 	invalidConfigDigestNotEnoughBytes := []byte(`{"SeqNr":42,"ConfigDigest":"0xdead"}`)
 	badStreamValues := []byte(`{"SeqNr":42,"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000", "Values":[{"Type":0,"Value":null},{"Type":-1,"Value":"2"}]}`)
@@ -104,13 +104,13 @@ func Test_JSONCodec_Properties(t *testing.T) {
 			return equalReports(r, r2)
 		},
 		gen.StrictStruct(reflect.TypeOf(&Report{}), map[string]gopter.Gen{
-			"ConfigDigest":                genConfigDigest(),
-			"SeqNr":                       genSeqNr(),
-			"ChannelID":                   gen.UInt32(),
-			"ValidAfterSeconds":           gen.UInt32(),
-			"ObservationTimestampSeconds": gen.UInt32(),
-			"Values":                      genStreamValues(),
-			"Specimen":                    gen.Bool(),
+			"ConfigDigest":                    genConfigDigest(),
+			"SeqNr":                           genSeqNr(),
+			"ChannelID":                       gen.UInt32(),
+			"ValidAfterNanoseconds":           gen.UInt64(),
+			"ObservationTimestampNanoseconds": gen.UInt64(),
+			"Values":                          genStreamValues(),
+			"Specimen":                        gen.Bool(),
 		}),
 	))
 
@@ -159,10 +159,10 @@ func equalReports(r, r2 Report) bool {
 	if r.ChannelID != r2.ChannelID {
 		return false
 	}
-	if r.ValidAfterSeconds != r2.ValidAfterSeconds {
+	if r.ValidAfterNanoseconds != r2.ValidAfterNanoseconds {
 		return false
 	}
-	if r.ObservationTimestampSeconds != r2.ObservationTimestampSeconds {
+	if r.ObservationTimestampNanoseconds != r2.ObservationTimestampNanoseconds {
 		return false
 	}
 	if len(r.Values) != len(r2.Values) {
@@ -272,21 +272,21 @@ func Test_JSONCodec(t *testing.T) {
 	t.Run("Encode=>Decode", func(t *testing.T) {
 		ctx := tests.Context(t)
 		r := Report{
-			ConfigDigest:                types.ConfigDigest([32]byte{1, 2, 3}),
-			SeqNr:                       43,
-			ChannelID:                   llotypes.ChannelID(46),
-			ValidAfterSeconds:           44,
-			ObservationTimestampSeconds: 45,
-			Values:                      []StreamValue{ToDecimal(decimal.NewFromInt(1)), ToDecimal(decimal.NewFromInt(2)), &Quote{Bid: decimal.NewFromFloat(3.13), Benchmark: decimal.NewFromFloat(4.4), Ask: decimal.NewFromFloat(5.12)}},
-			Specimen:                    true,
+			ConfigDigest:                    types.ConfigDigest([32]byte{1, 2, 3}),
+			SeqNr:                           43,
+			ChannelID:                       llotypes.ChannelID(46),
+			ValidAfterNanoseconds:           44,
+			ObservationTimestampNanoseconds: 45,
+			Values:                          []StreamValue{ToDecimal(decimal.NewFromInt(1)), ToDecimal(decimal.NewFromInt(2)), &Quote{Bid: decimal.NewFromFloat(3.13), Benchmark: decimal.NewFromFloat(4.4), Ask: decimal.NewFromFloat(5.12)}},
+			Specimen:                        true,
 		}
 
 		cdc := JSONReportCodec{}
 
 		encoded, err := cdc.Encode(ctx, r, llo.ChannelDefinition{})
 		require.NoError(t, err)
 
-		assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterSeconds":44,"ObservationTimestampSeconds":45,"Values":[{"Type":0,"Value":"1"},{"Type":0,"Value":"2"},{"Type":1,"Value":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`, string(encoded))
+		assert.Equal(t, `{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterNanoseconds":44,"ObservationTimestampNanoseconds":45,"Values":[{"Type":0,"Value":"1"},{"Type":0,"Value":"2"},{"Type":1,"Value":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true}`, string(encoded))
 
 		decoded, err := cdc.Decode(encoded)
 		require.NoError(t, err)
@@ -326,7 +326,7 @@ func Test_JSONCodec(t *testing.T) {
 		})
 	})
 	t.Run("UnpackDecode unpacks and decodes report", func(t *testing.T) {
-		b := []byte(`{"configDigest":"0102030000000000000000000000000000000000000000000000000000000000","seqNr":43,"report":{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterSeconds":44,"ObservationTimestampSeconds":45,"Values":[{"Type":0,"Value":"1"},{"Type":0,"Value":"2"},{"Type":1,"Value":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true},"sigs":[{"Signature":"AgME","Signer":2}]}`)
+		b := []byte(`{"configDigest":"0102030000000000000000000000000000000000000000000000000000000000","seqNr":43,"report":{"ConfigDigest":"0102030000000000000000000000000000000000000000000000000000000000","SeqNr":43,"ChannelID":46,"ValidAfterNanoseconds":44,"ObservationTimestampNanoseconds":45,"Values":[{"Type":0,"Value":"1"},{"Type":0,"Value":"2"},{"Type":1,"Value":"Q{Bid: 3.13, Benchmark: 4.4, Ask: 5.12}"}],"Specimen":true},"sigs":[{"Signature":"AgME","Signer":2}]}`)
 
 		cdc := JSONReportCodec{}
 		digest, seqNr, report, sigs, err := cdc.UnpackDecode(b)
@@ -335,13 +335,13 @@ func Test_JSONCodec(t *testing.T) {
 		assert.Equal(t, types.ConfigDigest([32]byte{1, 2, 3}), digest)
 		assert.Equal(t, uint64(43), seqNr)
 		assert.Equal(t, Report{
-			ConfigDigest:                types.ConfigDigest([32]byte{1, 2, 3}),
-			SeqNr:                       43,
-			ChannelID:                   llotypes.ChannelID(46),
-			ValidAfterSeconds:           44,
-			ObservationTimestampSeconds: 45,
-			Values:                      []StreamValue{ToDecimal(decimal.NewFromInt(1)), ToDecimal(decimal.NewFromInt(2)), &Quote{Bid: decimal.NewFromFloat(3.13), Benchmark: decimal.NewFromFloat(4.4), Ask: decimal.NewFromFloat(5.12)}},
-			Specimen:                    true,
+			ConfigDigest:                    types.ConfigDigest([32]byte{1, 2, 3}),
+			SeqNr:                           43,
+			ChannelID:                       llotypes.ChannelID(46),
+			ValidAfterNanoseconds:           44,
+			ObservationTimestampNanoseconds: 45,
+			Values:                          []StreamValue{ToDecimal(decimal.NewFromInt(1)), ToDecimal(decimal.NewFromInt(2)), &Quote{Bid: decimal.NewFromFloat(3.13), Benchmark: decimal.NewFromFloat(4.4), Ask: decimal.NewFromFloat(5.12)}},
+			Specimen:                        true,
 		}, report)
 		assert.Equal(t, []types.AttributedOnchainSignature{{Signature: []byte{2, 3, 4}, Signer: 2}}, sigs)
 	})
