Skip to content

Commit 7765e64

Browse files
craig[bot]jeffswenson
andcommitted
Merge #155999
155999: isession: add a helper for mutating the session r=jeffswenson a=jeffswenson ### sessionmutator: extract sessionDataMutator into a package This extracts the sessionDataMutator into its own package so that it can be exposed on the isql.Session interface. I tried pushing it into the pre-existing sessiondata package, but that creates a circular dependency between the sessiondata and tree package. Release note: none Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647) ### isession: add a helper for mutating the session This adds a MutateSession method that allows modifying the session without going through the SQL layer. It is possible to modify the session using `SET ...` statements, but LDR must mutate the session before every statement and modifying the session consumes ~10% of all CPU cycles in a cluster that is using the crud writer to replicate data. An alternative to adding MutateSession is we could optimize the Set statement. The main reasons its inefficient for LDR are: 1. It only supports strings, so we can't set the hlc origin timestamp without roundtripping through a string. 2. The optimizer does not support generic query plans for prepared statements that include a set statement. Release note: none Epic: [CRDB-48647](https://cockroachlabs.atlassian.net/browse/CRDB-48647) Co-authored-by: Jeff Swenson <[email protected]>
2 parents e3751cf + 78309d8 commit 7765e64

28 files changed

+1557
-1426
lines changed

pkg/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,6 +2348,7 @@ GO_TARGETS = [
23482348
"//pkg/sql/sessiondatapb:sessiondatapb_test",
23492349
"//pkg/sql/sessioninit:sessioninit",
23502350
"//pkg/sql/sessioninit:sessioninit_test",
2351+
"//pkg/sql/sessionmutator:sessionmutator",
23512352
"//pkg/sql/sessionphase:sessionphase",
23522353
"//pkg/sql/sessionprotectedts:sessionprotectedts",
23532354
"//pkg/sql/sessionprotectedts:sessionprotectedts_test",

pkg/sql/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,7 @@ go_library(
525525
"//pkg/sql/sessiondata",
526526
"//pkg/sql/sessiondatapb",
527527
"//pkg/sql/sessioninit",
528+
"//pkg/sql/sessionmutator",
528529
"//pkg/sql/sessionphase",
529530
"//pkg/sql/sessionprotectedts",
530531
"//pkg/sql/span",
@@ -918,6 +919,7 @@ go_test(
918919
"//pkg/sql/sem/tree",
919920
"//pkg/sql/sessiondata",
920921
"//pkg/sql/sessiondatapb",
922+
"//pkg/sql/sessionmutator",
921923
"//pkg/sql/sessionphase",
922924
"//pkg/sql/sqlclustersettings",
923925
"//pkg/sql/sqlcommenter",

pkg/sql/conn_executor.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ import (
6262
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
6363
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
6464
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
65+
"github.com/cockroachdb/cockroach/pkg/sql/sessionmutator"
6566
"github.com/cockroachdb/cockroach/pkg/sql/sessionphase"
6667
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
6768
"github.com/cockroachdb/cockroach/pkg/sql/sqlstats"
@@ -901,12 +902,12 @@ func (s *Server) SetupConn(
901902
sds := sessiondata.NewStack(sd)
902903
// Set the SessionData from args.SessionDefaults. This also validates the
903904
// respective values.
904-
sdMutIterator := makeSessionDataMutatorIterator(sds, args.SessionDefaults, s.cfg.Settings)
905-
sdMutIterator.onDefaultIntSizeChange = onDefaultIntSizeChange
906-
if err := sdMutIterator.applyOnEachMutatorError(func(m sessionDataMutator) error {
905+
sdMutIterator := sessionmutator.MakeSessionDataMutatorIterator(sds, args.SessionDefaults, s.cfg.Settings)
906+
sdMutIterator.OnDefaultIntSizeChange = onDefaultIntSizeChange
907+
if err := sdMutIterator.ApplyOnEachMutatorError(func(m sessionmutator.SessionDataMutator) error {
907908
for varName, v := range varGen {
908909
if v.Set != nil {
909-
hasDefault, defVal := getSessionVarDefaultString(varName, v, m.sessionDataMutatorBase)
910+
hasDefault, defVal := getSessionVarDefaultString(varName, v, m.SessionDataMutatorBase)
910911
if hasDefault {
911912
if err := v.Set(ctx, m, defVal); err != nil {
912913
return err
@@ -1032,7 +1033,7 @@ func (h ConnectionHandler) GetParamStatus(ctx context.Context, varName string) s
10321033
log.Dev.Fatalf(ctx, "programming error: status param %q must be defined session var", varName)
10331034
return ""
10341035
}
1035-
hasDefault, defVal := getSessionVarDefaultString(name, v, h.ex.dataMutatorIterator.sessionDataMutatorBase)
1036+
hasDefault, defVal := getSessionVarDefaultString(name, v, h.ex.dataMutatorIterator.SessionDataMutatorBase)
10361037
if !hasDefault {
10371038
log.Dev.Fatalf(ctx, "programming error: status param %q must have a default value", varName)
10381039
return ""
@@ -1119,7 +1120,7 @@ func populateMinimalSessionData(sd *sessiondata.SessionData) {
11191120
func (s *Server) newConnExecutor(
11201121
ctx context.Context,
11211122
executorType executorType,
1122-
sdMutIterator *sessionDataMutatorIterator,
1123+
sdMutIterator *sessionmutator.SessionDataMutatorIterator,
11231124
stmtBuf *StmtBuf,
11241125
clientComm ClientComm,
11251126
memMetrics MemoryMetrics,
@@ -1168,7 +1169,7 @@ func (s *Server) newConnExecutor(
11681169
mon: sessionRootMon,
11691170
sessionMon: sessionMon,
11701171
sessionPreparedMon: sessionPreparedMon,
1171-
sessionDataStack: sdMutIterator.sds,
1172+
sessionDataStack: sdMutIterator.Sds,
11721173
dataMutatorIterator: sdMutIterator,
11731174
state: txnState{
11741175
mon: txnMon,
@@ -1213,7 +1214,7 @@ func (s *Server) newConnExecutor(
12131214

12141215
// The transaction_read_only variable is special; its updates need to be
12151216
// hooked-up to the executor.
1216-
ex.dataMutatorIterator.setCurTxnReadOnly = func(readOnly bool) error {
1217+
ex.dataMutatorIterator.SetCurTxnReadOnly = func(readOnly bool) error {
12171218
mode := tree.ReadWrite
12181219
if readOnly {
12191220
mode = tree.ReadOnly
@@ -1222,7 +1223,7 @@ func (s *Server) newConnExecutor(
12221223
}
12231224
// kv_transaction_buffered_writes_enabled is special since it won't affect
12241225
// the current explicit txn, so we want to let the user know.
1225-
ex.dataMutatorIterator.setBufferedWritesEnabled = func(enabled bool) {
1226+
ex.dataMutatorIterator.SetBufferedWritesEnabled = func(enabled bool) {
12261227
if ex.state.mu.txn.BufferedWritesEnabled() == enabled {
12271228
return
12281229
}
@@ -1234,11 +1235,11 @@ func (s *Server) newConnExecutor(
12341235
ex.planner.BufferClientNotice(ctx, pgnotice.Newf("%s buffered writes will apply after the current txn commits", action))
12351236
}
12361237
}
1237-
ex.dataMutatorIterator.onTempSchemaCreation = func() {
1238+
ex.dataMutatorIterator.OnTempSchemaCreation = func() {
12381239
ex.hasCreatedTemporarySchema = true
12391240
}
12401241

1241-
ex.dataMutatorIterator.upgradedIsolationLevel = func(
1242+
ex.dataMutatorIterator.UpgradedIsolationLevel = func(
12421243
ctx context.Context, upgradedFrom tree.IsolationLevel, requiresNotice bool,
12431244
) {
12441245
telemetry.Inc(sqltelemetry.IsolationLevelUpgradedCounter(ctx, upgradedFrom))
@@ -1270,7 +1271,7 @@ func (s *Server) newConnExecutor(
12701271
s.localSqlStats.GetCounters(),
12711272
s.cfg.SQLStatsTestingKnobs,
12721273
)
1273-
ex.dataMutatorIterator.onApplicationNameChange = func(newName string) {
1274+
ex.dataMutatorIterator.OnApplicationNameChange = func(newName string) {
12741275
ex.applicationName.Store(newName)
12751276
ex.applicationStats = ex.server.localSqlStats.GetApplicationStats(newName)
12761277
if strings.HasPrefix(newName, catconstants.InternalAppNamePrefix) {
@@ -1285,7 +1286,7 @@ func (s *Server) newConnExecutor(
12851286
ex.extraTxnState.prepStmtsNamespace.portals = make(map[string]PreparedPortal)
12861287
ex.extraTxnState.prepStmtsNamespace.portalsSnapshot = make(map[string]PreparedPortal)
12871288
ex.extraTxnState.prepStmtsNamespaceMemAcc = ex.sessionMon.MakeBoundAccount()
1288-
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(sdMutIterator.sds)
1289+
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(sdMutIterator.Sds)
12891290
ex.extraTxnState.descCollection = s.cfg.CollectionFactory.NewCollection(
12901291
ctx, descs.WithDescriptorSessionDataProvider(dsdp), descs.WithMonitor(ex.sessionMon),
12911292
)
@@ -1758,7 +1759,7 @@ type connExecutor struct {
17581759
// dataMutatorIterator is nil for session-bound internal executors; we
17591760
// shouldn't issue statements that manipulate session state to an internal
17601761
// executor.
1761-
dataMutatorIterator *sessionDataMutatorIterator
1762+
dataMutatorIterator *sessionmutator.SessionDataMutatorIterator
17621763

17631764
// applicationStats records per-application SQL usage statistics. It is
17641765
// maintained to represent statistics for the application currently identified
@@ -3674,7 +3675,7 @@ func (ex *connExecutor) txnIsolationLevelToKV(
36743675
hasLicense := base.CCLDistributionAndEnterpriseEnabled(ex.server.cfg.Settings)
36753676
level, upgraded, upgradedDueToLicense := level.UpgradeToEnabledLevel(
36763677
allowReadCommitted, allowRepeatableRead, hasLicense)
3677-
if f := ex.dataMutatorIterator.upgradedIsolationLevel; upgraded && f != nil {
3678+
if f := ex.dataMutatorIterator.UpgradedIsolationLevel; upgraded && f != nil {
36783679
f(ctx, originalLevel, upgradedDueToLicense)
36793680
}
36803681

pkg/sql/conn_executor_exec.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,7 @@ func (ex *connExecutor) execStmtInOpenState(
451451
stmtTS := ex.server.cfg.Clock.PhysicalTime()
452452
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
453453
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
454-
p.sessionDataMutatorIterator.paramStatusUpdater = res
454+
p.sessionDataMutatorIterator.ParamStatusUpdater = res
455455
p.noticeSender = res
456456
ih := &p.instrumentation
457457

@@ -1337,7 +1337,7 @@ func (ex *connExecutor) execStmtInOpenStateWithPausablePortal(
13371337
stmtTS := ex.server.cfg.Clock.PhysicalTime()
13381338
ex.statsCollector.Reset(ex.applicationStats, ex.phaseTimes)
13391339
ex.resetPlanner(ctx, p, ex.state.mu.txn, stmtTS)
1340-
p.sessionDataMutatorIterator.paramStatusUpdater = res
1340+
p.sessionDataMutatorIterator.ParamStatusUpdater = res
13411341
p.noticeSender = res
13421342
ih := &p.instrumentation
13431343

@@ -2331,7 +2331,7 @@ func (ex *connExecutor) reportSessionDataChanges(fn func() error) error {
23312331
return err
23322332
}
23332333
after := ex.sessionDataStack.Top()
2334-
if ex.dataMutatorIterator.paramStatusUpdater != nil {
2334+
if ex.dataMutatorIterator.ParamStatusUpdater != nil {
23352335
for _, param := range bufferableParamStatusUpdates {
23362336
if param.sv.Equal == nil {
23372337
return errors.AssertionFailedf("Equal for %s must be set", param.name)
@@ -2340,18 +2340,18 @@ func (ex *connExecutor) reportSessionDataChanges(fn func() error) error {
23402340
return errors.AssertionFailedf("GetFromSessionData for %s must be set", param.name)
23412341
}
23422342
if !param.sv.Equal(before, after) {
2343-
ex.dataMutatorIterator.paramStatusUpdater.BufferParamStatusUpdate(
2343+
ex.dataMutatorIterator.ParamStatusUpdater.BufferParamStatusUpdate(
23442344
param.name,
23452345
param.sv.GetFromSessionData(after),
23462346
)
23472347
}
23482348
}
23492349
}
2350-
if before.DefaultIntSize != after.DefaultIntSize && ex.dataMutatorIterator.onDefaultIntSizeChange != nil {
2351-
ex.dataMutatorIterator.onDefaultIntSizeChange(after.DefaultIntSize)
2350+
if before.DefaultIntSize != after.DefaultIntSize && ex.dataMutatorIterator.OnDefaultIntSizeChange != nil {
2351+
ex.dataMutatorIterator.OnDefaultIntSizeChange(after.DefaultIntSize)
23522352
}
2353-
if before.ApplicationName != after.ApplicationName && ex.dataMutatorIterator.onApplicationNameChange != nil {
2354-
ex.dataMutatorIterator.onApplicationNameChange(after.ApplicationName)
2353+
if before.ApplicationName != after.ApplicationName && ex.dataMutatorIterator.OnApplicationNameChange != nil {
2354+
ex.dataMutatorIterator.OnApplicationNameChange(after.ApplicationName)
23552355
}
23562356
return nil
23572357
}

pkg/sql/discard.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
1313
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
1414
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
15+
"github.com/cockroachdb/cockroach/pkg/sql/sessionmutator"
1516
"github.com/cockroachdb/errors"
1617
)
1718

@@ -51,9 +52,9 @@ func (n *discardNode) startExec(params runParams) error {
5152
params.p.preparedStatements.DeleteAll(params.ctx)
5253

5354
// DISCARD SEQUENCES
54-
params.p.sessionDataMutatorIterator.applyOnEachMutator(func(m sessionDataMutator) {
55-
m.data.SequenceState = sessiondata.NewSequenceState()
56-
m.initSequenceCache()
55+
params.p.sessionDataMutatorIterator.ApplyOnEachMutator(func(m sessionmutator.SessionDataMutator) {
56+
m.Data.SequenceState = sessiondata.NewSequenceState()
57+
m.InitSequenceCache()
5758
})
5859

5960
// DISCARD TEMP
@@ -63,9 +64,9 @@ func (n *discardNode) startExec(params runParams) error {
6364
}
6465

6566
case tree.DiscardModeSequences:
66-
params.p.sessionDataMutatorIterator.applyOnEachMutator(func(m sessionDataMutator) {
67-
m.data.SequenceState = sessiondata.NewSequenceState()
68-
m.initSequenceCache()
67+
params.p.sessionDataMutatorIterator.ApplyOnEachMutator(func(m sessionmutator.SessionDataMutator) {
68+
m.Data.SequenceState = sessiondata.NewSequenceState()
69+
m.InitSequenceCache()
6970
})
7071
case tree.DiscardModeTemp:
7172
err := deleteTempTables(params.ctx, params.p)

0 commit comments

Comments
 (0)