Skip to content

Commit bfd80f7

Browse files
authored
Merge pull request #1914 from josephschorr/rw-tx-metadata
Implement support for metadata associated with read-write transactions
2 parents 0d882c7 + fb28510 commit bfd80f7

29 files changed

+606
-71
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ require (
110110
)
111111

112112
require (
113+
github.com/Masterminds/semver v1.5.0
113114
github.com/Yiling-J/theine-go v0.4.1
114115
github.com/ccoveille/go-safecast v1.1.0
115116
github.com/gosimple/slug v1.14.0

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -654,6 +654,8 @@ github.com/IBM/pgxpoolprometheus v1.1.1/go.mod h1:GFJDkHbidFfB2APbhBTSy2X4PKH3bL
654654
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
655655
github.com/KimMachineGun/automemlimit v0.6.1 h1:ILa9j1onAAMadBsyyUJv5cack8Y1WT26yLj/V+ulKp8=
656656
github.com/KimMachineGun/automemlimit v0.6.1/go.mod h1:T7xYht7B8r6AG/AqFcUdc7fzd2bIdBKmepfP2S1svPY=
657+
github.com/Masterminds/semver v1.5.0 h1:H65muMkzWKEuNDnfl9d70GUjFniHKHRbFPGBuZ3QEww=
658+
github.com/Masterminds/semver v1.5.0/go.mod h1:MB6lktGJrhw8PrUyiEoblNEGEQ+RzHPF078ddwwvV3Y=
657659
github.com/Masterminds/semver/v3 v3.3.0 h1:B8LGeaivUe71a5qox1ICM/JLl0NqZSW5CHyL+hmvYS0=
658660
github.com/Masterminds/semver/v3 v3.3.0/go.mod h1:4V+yj/TJE1HU9XfppCwVMZq3I84lprf4nC11bSS5beM=
659661
github.com/Masterminds/squirrel v1.5.4 h1:uUcX/aBc8O7Fg9kaISIUsHXdKuqehiXAMQTYX8afzqM=

internal/datastore/common/changes.go

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"sort"
66

77
"golang.org/x/exp/maps"
8+
"google.golang.org/protobuf/types/known/structpb"
89

910
"github.com/ccoveille/go-safecast"
1011

@@ -37,6 +38,7 @@ type changeRecord[R datastore.Revision] struct {
3738
definitionsChanged map[string]datastore.SchemaDefinition
3839
namespacesDeleted map[string]struct{}
3940
caveatsDeleted map[string]struct{}
41+
metadata map[string]any
4042
}
4143

4244
// NewChanges creates a new Changes object for change tracking and de-duplication.
@@ -132,6 +134,25 @@ func (ch *Changes[R, K]) adjustByteSize(item sized, delta int) error {
132134
return nil
133135
}
134136

137+
// SetRevisionMetadata sets the metadata for the given revision.
138+
func (ch *Changes[R, K]) SetRevisionMetadata(ctx context.Context, rev R, metadata map[string]any) error {
139+
if len(metadata) == 0 {
140+
return nil
141+
}
142+
143+
record, err := ch.recordForRevision(rev)
144+
if err != nil {
145+
return err
146+
}
147+
148+
if len(record.metadata) > 0 {
149+
return spiceerrors.MustBugf("metadata already set for revision")
150+
}
151+
152+
maps.Copy(record.metadata, metadata)
153+
return nil
154+
}
155+
135156
func (ch *Changes[R, K]) recordForRevision(rev R) (changeRecord[R], error) {
136157
k := ch.keyFunc(rev)
137158
revisionChanges, ok := ch.records[k]
@@ -143,6 +164,7 @@ func (ch *Changes[R, K]) recordForRevision(rev R) (changeRecord[R], error) {
143164
make(map[string]datastore.SchemaDefinition),
144165
make(map[string]struct{}),
145166
make(map[string]struct{}),
167+
make(map[string]any),
146168
}
147169
ch.records[k] = revisionChanges
148170
}
@@ -248,21 +270,25 @@ func (ch *Changes[R, K]) AddChangedDefinition(
248270

249271
// AsRevisionChanges returns the list of changes processed so far as a datastore watch
250272
// compatible, ordered, changelist.
251-
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) []datastore.RevisionChanges {
273+
func (ch *Changes[R, K]) AsRevisionChanges(lessThanFunc func(lhs, rhs K) bool) ([]datastore.RevisionChanges, error) {
252274
return ch.revisionChanges(lessThanFunc, *new(R), false)
253275
}
254276

255277
// FilterAndRemoveRevisionChanges filters a list of changes processed up to the bound revision from the changes list, removing them
256278
// and returning the filtered changes.
257-
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) []datastore.RevisionChanges {
258-
changes := ch.revisionChanges(lessThanFunc, boundRev, true)
279+
func (ch *Changes[R, K]) FilterAndRemoveRevisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R) ([]datastore.RevisionChanges, error) {
280+
changes, err := ch.revisionChanges(lessThanFunc, boundRev, true)
281+
if err != nil {
282+
return nil, err
283+
}
284+
259285
ch.removeAllChangesBefore(boundRev)
260-
return changes
286+
return changes, nil
261287
}
262288

263-
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) []datastore.RevisionChanges {
289+
func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, boundRev R, withBound bool) ([]datastore.RevisionChanges, error) {
264290
if ch.IsEmpty() {
265-
return nil
291+
return nil, nil
266292
}
267293

268294
revisionsWithChanges := make([]K, 0, len(ch.records))
@@ -273,7 +299,7 @@ func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, bou
273299
}
274300

275301
if len(revisionsWithChanges) == 0 {
276-
return nil
302+
return nil, nil
277303
}
278304

279305
sort.Slice(revisionsWithChanges, func(i int, j int) bool {
@@ -299,9 +325,18 @@ func (ch *Changes[R, K]) revisionChanges(lessThanFunc func(lhs, rhs K) bool, bou
299325
changes[i].ChangedDefinitions = maps.Values(revisionChangeRecord.definitionsChanged)
300326
changes[i].DeletedNamespaces = maps.Keys(revisionChangeRecord.namespacesDeleted)
301327
changes[i].DeletedCaveats = maps.Keys(revisionChangeRecord.caveatsDeleted)
328+
329+
if len(revisionChangeRecord.metadata) > 0 {
330+
metadata, err := structpb.NewStruct(revisionChangeRecord.metadata)
331+
if err != nil {
332+
return nil, spiceerrors.MustBugf("failed to convert metadata to structpb: %v", err)
333+
}
334+
335+
changes[i].Metadata = metadata
336+
}
302337
}
303338

304-
return changes
339+
return changes, nil
305340
}
306341

307342
func (ch *Changes[R, K]) removeAllChangesBefore(boundRev R) {

internal/datastore/common/changes_test.go

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -330,9 +330,12 @@ func TestChanges(t *testing.T) {
330330
}
331331
}
332332

333+
actual, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
334+
require.NoError(err)
335+
333336
require.Equal(
334337
canonicalize(tc.expected),
335-
canonicalize(ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)),
338+
canonicalize(actual),
336339
)
337340
})
338341
}
@@ -347,6 +350,23 @@ func TestFilteredSchemaChanges(t *testing.T) {
347350
require.True(t, ch.IsEmpty())
348351
}
349352

353+
func TestSetMetadata(t *testing.T) {
354+
ctx := context.Background()
355+
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships|datastore.WatchSchema, 0)
356+
require.True(t, ch.IsEmpty())
357+
358+
err := ch.SetRevisionMetadata(ctx, rev1, map[string]any{"foo": "bar"})
359+
require.NoError(t, err)
360+
require.False(t, ch.IsEmpty())
361+
362+
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev2)
363+
require.NoError(t, err)
364+
require.Equal(t, 1, len(results))
365+
require.True(t, ch.IsEmpty())
366+
367+
require.Equal(t, map[string]any{"foo": "bar"}, results[0].Metadata.AsMap())
368+
}
369+
350370
func TestFilteredRelationshipChanges(t *testing.T) {
351371
ctx := context.Background()
352372
ch := NewChanges(revisions.TransactionIDKeyFunc, datastore.WatchRelationships, 0)
@@ -374,7 +394,8 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
374394

375395
require.False(t, ch.IsEmpty())
376396

377-
results := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)
397+
results, err := ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, rev3)
398+
require.NoError(t, err)
378399
require.Equal(t, 2, len(results))
379400
require.False(t, ch.IsEmpty())
380401

@@ -393,8 +414,9 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
393414
},
394415
}, results)
395416

396-
remaining := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
417+
remaining, err := ch.AsRevisionChanges(revisions.TransactionIDKeyLessThanFunc)
397418
require.Equal(t, 1, len(remaining))
419+
require.NoError(t, err)
398420

399421
require.Equal(t, []datastore.RevisionChanges{
400422
{
@@ -405,11 +427,13 @@ func TestFilterAndRemoveRevisionChanges(t *testing.T) {
405427
},
406428
}, remaining)
407429

408-
results = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion)
430+
results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillion)
431+
require.NoError(t, err)
409432
require.Equal(t, 1, len(results))
410433
require.True(t, ch.IsEmpty())
411434

412-
results = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne)
435+
results, err = ch.FilterAndRemoveRevisionChanges(revisions.TransactionIDKeyLessThanFunc, revOneMillionOne)
436+
require.NoError(t, err)
413437
require.Equal(t, 0, len(results))
414438
require.True(t, ch.IsEmpty())
415439
}
@@ -432,7 +456,8 @@ func TestHLCOrdering(t *testing.T) {
432456
err = ch.AddRelationshipChange(ctx, rev0, tuple.MustParse("document:foo#viewer@user:tom"), core.RelationTupleUpdate_TOUCH)
433457
require.NoError(t, err)
434458

435-
remaining := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
459+
remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
460+
require.NoError(t, err)
436461
require.Equal(t, 2, len(remaining))
437462

438463
require.Equal(t, []datastore.RevisionChanges{
@@ -475,7 +500,8 @@ func TestHLCSameRevision(t *testing.T) {
475500
err = ch.AddRelationshipChange(ctx, rev0again, tuple.MustParse("document:foo#viewer@user:sarah"), core.RelationTupleUpdate_TOUCH)
476501
require.NoError(t, err)
477502

478-
remaining := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
503+
remaining, err := ch.AsRevisionChanges(revisions.HLCKeyLessThanFunc)
504+
require.NoError(t, err)
479505
require.Equal(t, 1, len(remaining))
480506

481507
expected := []*core.RelationTupleUpdate{

internal/datastore/crdb/crdb.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ const (
5454
tableTransactions = "transactions"
5555
tableCaveat = "caveat"
5656
tableRelationshipCounter = "relationship_counter"
57+
tableTransactionMetadata = "transaction_metadata"
5758

5859
colNamespace = "namespace"
5960
colConfig = "serialized_config"
@@ -79,6 +80,8 @@ const (
7980
colCounterSerializedFilter = "serialized_filter"
8081
colCounterCurrentCount = "current_count"
8182
colCounterUpdatedAt = "updated_at_timestamp"
83+
colExpiresAt = "expires_at"
84+
colMetadata = "metadata"
8285

8386
errUnableToInstantiate = "unable to instantiate datastore"
8487
errRevision = "unable to find revision: %w"
@@ -207,6 +210,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas
207210
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
208211
filterMaximumIDCount: config.filterMaximumIDCount,
209212
supportsIntegrity: config.withIntegrity,
213+
gcWindow: config.gcWindow,
210214
}
211215
ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal)
212216

@@ -289,6 +293,7 @@ type crdbDatastore struct {
289293
writeOverlapKeyer overlapKeyer
290294
overlapKeyInit func(ctx context.Context) keySet
291295
analyzeBeforeStatistics bool
296+
gcWindow time.Duration
292297

293298
beginChangefeedQuery string
294299
transactionNowQuery string
@@ -332,6 +337,23 @@ func (cds *crdbDatastore) ReadWriteTx(
332337
Executor: pgxcommon.NewPGXExecutorWithIntegrityOption(querier, cds.supportsIntegrity),
333338
}
334339

340+
// If metadata is to be attached, write that row now.
341+
if config.Metadata != nil {
342+
expiresAt := time.Now().Add(cds.gcWindow).Add(1 * time.Minute)
343+
insertTransactionMetadata := psql.Insert(tableTransactionMetadata).
344+
Columns(colExpiresAt, colMetadata).
345+
Values(expiresAt, config.Metadata.AsMap())
346+
347+
sql, args, err := insertTransactionMetadata.ToSql()
348+
if err != nil {
349+
return fmt.Errorf("error building metadata insert: %w", err)
350+
}
351+
352+
if _, err := tx.Exec(ctx, sql, args...); err != nil {
353+
return fmt.Errorf("error writing metadata: %w", err)
354+
}
355+
}
356+
335357
rwt := &crdbReadWriteTXN{
336358
&crdbReader{
337359
querier,
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package migrations
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"regexp"
7+
8+
"github.com/Masterminds/semver"
9+
"github.com/jackc/pgx/v5"
10+
)
11+
12+
const (
13+
// ttl_expiration_expression support was added in CRDB v22.2, but the E2E tests
14+
// use v21.2.
15+
addTransactionMetadataTableQueryWithBasicTTL = `
16+
CREATE TABLE transaction_metadata (
17+
key UUID PRIMARY KEY DEFAULT gen_random_uuid(),
18+
expires_at TIMESTAMPTZ,
19+
metadata JSONB
20+
) WITH (ttl_expire_after = '1d');
21+
`
22+
23+
addTransactionMetadataTableQuery = `
24+
CREATE TABLE transaction_metadata (
25+
key UUID PRIMARY KEY DEFAULT gen_random_uuid(),
26+
expires_at TIMESTAMPTZ,
27+
metadata JSONB
28+
) WITH (ttl_expiration_expression = 'expires_at', ttl_job_cron = '@daily');
29+
`
30+
31+
// See: https://www.cockroachlabs.com/docs/stable/changefeed-messages#prevent-changefeeds-from-emitting-row-level-ttl-deletes
32+
// for why we set ttl_disable_changefeed_replication = 'true'. This isn't stricly necessary as the Watch API will ignore the
33+
// deletions of these metadata rows, but no reason to even have it in the changefeed.
34+
// NOTE: This only applies on CRDB v24 and later.
35+
addTransactionMetadataTableQueryWithTTLIgnore = `
36+
CREATE TABLE transaction_metadata (
37+
key UUID PRIMARY KEY DEFAULT gen_random_uuid(),
38+
expires_at TIMESTAMPTZ,
39+
metadata JSONB
40+
) WITH (ttl_expiration_expression = 'expires_at', ttl_job_cron = '@daily', ttl_disable_changefeed_replication = 'true');
41+
`
42+
)
43+
44+
func init() {
45+
err := CRDBMigrations.Register("add-transaction-metadata-table", "add-integrity-relationtuple-table", addTransactionMetadataTable, noAtomicMigration)
46+
if err != nil {
47+
panic("failed to register migration: " + err.Error())
48+
}
49+
}
50+
51+
func addTransactionMetadataTable(ctx context.Context, conn *pgx.Conn) error {
52+
row := conn.QueryRow(ctx, "select version()")
53+
var fullVersionString string
54+
if err := row.Scan(&fullVersionString); err != nil {
55+
return err
56+
}
57+
58+
re, err := regexp.Compile(semver.SemVerRegex)
59+
if err != nil {
60+
return fmt.Errorf("failed to compile regex: %w", err)
61+
}
62+
63+
version := re.FindString(fullVersionString)
64+
v, err := semver.NewVersion(version)
65+
if err != nil {
66+
return fmt.Errorf("failed to parse version %q: %w", version, err)
67+
}
68+
69+
if v.Major() < 22 {
70+
return fmt.Errorf("unsupported version %q", version)
71+
}
72+
73+
// v22.1 doesn't support `ttl_expiration_expression`; it was added in v22.2.
74+
if v.Major() == 22 && v.Minor() == 1 {
75+
_, err := conn.Exec(ctx, addTransactionMetadataTableQueryWithBasicTTL)
76+
return err
77+
}
78+
79+
// `ttl_disable_changefeed_replication` was added in v24.
80+
if v.Major() < 24 {
81+
_, err := conn.Exec(ctx, addTransactionMetadataTableQuery)
82+
return err
83+
}
84+
85+
// v24 and later
86+
_, err = conn.Exec(ctx, addTransactionMetadataTableQueryWithTTLIgnore)
87+
return err
88+
}

0 commit comments

Comments
 (0)