Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -1043,10 +1043,7 @@ func (sc *SchemaChanger) distIndexBackfill(
)
indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
chunkSize := sc.getChunkSize(indexBatchSize)
spec, err := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
if err != nil {
return err
}
spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
return err
}); err != nil {
Expand Down Expand Up @@ -1351,10 +1348,7 @@ func (sc *SchemaChanger) distColumnBackfill(
planCtx := sc.distSQLPlanner.NewPlanningCtx(
ctx, &evalCtx, nil /* planner */, txn.KV(), FullDistribution,
)
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
if err != nil {
return err
}
spec := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
plan, err := sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
if err != nil {
return err
Expand Down Expand Up @@ -2917,15 +2911,18 @@ func indexBackfillInTxn(
tableDesc catalog.TableDescriptor,
traceKV bool,
) error {
var indexBackfillerMon *mon.BytesMonitor
if evalCtx.Planner.Mon() != nil {
indexBackfillerMon = execinfra.NewMonitor(ctx, evalCtx.Planner.Mon(),
mon.MakeName("local-index-backfill-mon"))
}
indexBackfillerMon := execinfra.NewMonitor(
ctx, evalCtx.Planner.Mon(), mon.MakeName("local-index-backfill-mon"),
)

var backfiller backfill.IndexBackfiller
if err := backfiller.InitForLocalUse(
ctx, evalCtx, semaCtx, tableDesc, indexBackfillerMon,
ctx,
evalCtx,
semaCtx,
tableDesc,
indexBackfillerMon,
evalCtx.Planner.ExecutorConfig().(*ExecutorConfig).VecIndexManager,
); err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/backfill/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/sql/vecindex/vecencoding",
"//pkg/util/admission/admissionpb",
"//pkg/util/ctxgroup",
"//pkg/util/errorutil/unimplemented",
"//pkg/util/hlc",
"//pkg/util/intsets",
"//pkg/util/log",
Expand Down
56 changes: 46 additions & 10 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/vecindex"
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/cspann"
"github.com/cockroachdb/cockroach/pkg/sql/vecindex/vecencoding"
"github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
Expand Down Expand Up @@ -588,16 +589,30 @@ func (ib *IndexBackfiller) ContainsInvertedIndex() bool {
// InitForLocalUse initializes an IndexBackfiller for use during local execution
// within a transaction. In this case, the entire backfill process is occurring
// on the gateway as part of the user's transaction.
//
// Non-nil memory monitor must be provided. If an error is returned, it'll be
// stopped automatically; otherwise, the backfiller takes ownership of the
// monitor.
func (ib *IndexBackfiller) InitForLocalUse(
ctx context.Context,
evalCtx *eval.Context,
semaCtx *tree.SemaContext,
desc catalog.TableDescriptor,
mon *mon.BytesMonitor,
) error {
vecIndexManager *vecindex.Manager,
) (retErr error) {
if mon == nil {
return errors.AssertionFailedf("memory monitor must be provided")
}
defer func() {
if retErr != nil {
mon.Stop(ctx)
}
}()

// Initialize ib.added.
if err := ib.initIndexes(ctx, evalCtx.Codec, desc, nil /* allowList */, 0 /*sourceIndex*/, nil); err != nil {
// TODO(150163): Pass vecIndexManager once vector index build is supported with the legacy schema changer.
if err := ib.initIndexes(ctx, evalCtx.Codec, desc, nil /* allowList */, 0 /*sourceIndex*/, nil /*vecIndexManager*/); err != nil {
return err
}

Expand All @@ -619,7 +634,8 @@ func (ib *IndexBackfiller) InitForLocalUse(
ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, mon)
ib.init(evalCtx, predicates, colExprs, mon)
return nil
}

// constructExprs is a helper to construct the index and column expressions
Expand Down Expand Up @@ -730,14 +746,27 @@ func constructExprs(
// backfill operation manages its own transactions. This separation is necessary
// due to the different procedure for accessing user defined type metadata as
// part of a distributed flow.
//
// Non-nil memory monitor must be provided. If an error is returned, it'll be
// stopped automatically; otherwise, the backfiller takes ownership of the
// monitor.
func (ib *IndexBackfiller) InitForDistributedUse(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
desc catalog.TableDescriptor,
allowList []catid.IndexID,
sourceIndexID catid.IndexID,
mon *mon.BytesMonitor,
) error {
) (retErr error) {
if mon == nil {
return errors.AssertionFailedf("memory monitor must be provided")
}
defer func() {
if retErr != nil {
mon.Stop(ctx)
}
}()

// We'll be modifying the eval.Context in BuildIndexEntriesChunk, so we need
// to make a copy.
evalCtx := flowCtx.NewEvalCtx()
Expand Down Expand Up @@ -786,7 +815,8 @@ func (ib *IndexBackfiller) InitForDistributedUse(
ib.valNeededForCol.Add(ib.colIdxMap.GetDefault(col))
})

return ib.init(evalCtx, predicates, colExprs, mon)
ib.init(evalCtx, predicates, colExprs, mon)
return nil
}

// Close releases the resources used by the IndexBackfiller. It can be called
Expand Down Expand Up @@ -879,6 +909,13 @@ func (ib *IndexBackfiller) initIndexes(
continue
}

if vecIndexManager == nil {
return unimplemented.NewWithIssue(
150163,
"vector index build not supported with the legacy schema changer",
)
}

if ib.VectorIndexes == nil {
ib.VectorIndexes = make(map[descpb.IndexID]VectorIndexHelper)
}
Expand Down Expand Up @@ -907,12 +944,15 @@ func (ib *IndexBackfiller) initIndexes(
}

// init completes the initialization of an IndexBackfiller.
//
// The IndexBackfiller takes ownership of the monitor which must be non-nil.
// It'll be closed when the backfiller is closed.
func (ib *IndexBackfiller) init(
evalCtx *eval.Context,
predicateExprs map[descpb.IndexID]tree.TypedExpr,
colExprs map[descpb.ColumnID]tree.TypedExpr,
mon *mon.BytesMonitor,
) error {
) {
ib.evalCtx = evalCtx
ib.predicates = predicateExprs
ib.colExprs = colExprs
Expand All @@ -933,12 +973,8 @@ func (ib *IndexBackfiller) init(
}

// Create a bound account associated with the index backfiller monitor.
if mon == nil {
return errors.AssertionFailedf("no memory monitor linked to IndexBackfiller during init")
}
ib.mon = mon
ib.muBoundAccount.boundAccount = mon.MakeBoundAccount()
return nil
}

// BuildIndexEntriesChunk reads a chunk of rows from a table using the span sp
Expand Down
12 changes: 6 additions & 6 deletions pkg/sql/distsql_plan_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@ func initColumnBackfillerSpec(
chunkSize int64,
updateChunkSizeThresholdBytes uint64,
readAsOf hlc.Timestamp,
) (execinfrapb.BackfillerSpec, error) {
) execinfrapb.BackfillerSpec {
return execinfrapb.BackfillerSpec{
Table: *tbl.TableDesc(),
Duration: duration,
ChunkSize: chunkSize,
UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
ReadAsOf: readAsOf,
Type: execinfrapb.BackfillerSpec_Column,
}, nil
}
}

func initIndexBackfillerSpec(
Expand All @@ -46,7 +46,7 @@ func initIndexBackfillerSpec(
chunkSize int64,
indexesToBackfill []descpb.IndexID,
sourceIndexID descpb.IndexID,
) (execinfrapb.BackfillerSpec, error) {
) execinfrapb.BackfillerSpec {
return execinfrapb.BackfillerSpec{
Table: desc,
WriteAsOf: writeAsOf,
Expand All @@ -55,21 +55,21 @@ func initIndexBackfillerSpec(
ChunkSize: chunkSize,
IndexesToBackfill: indexesToBackfill,
SourceIndexID: sourceIndexID,
}, nil
}
}

func initIndexBackfillMergerSpec(
desc descpb.TableDescriptor,
addedIndexes []descpb.IndexID,
temporaryIndexes []descpb.IndexID,
mergeTimestamp hlc.Timestamp,
) (execinfrapb.IndexBackfillMergerSpec, error) {
) execinfrapb.IndexBackfillMergerSpec {
return execinfrapb.IndexBackfillMergerSpec{
Table: desc,
AddedIndexes: addedIndexes,
TemporaryIndexes: temporaryIndexes,
MergeTimestamp: mergeTimestamp,
}, nil
}
}

var initialSplitsPerProcessor = settings.RegisterIntSetting(
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/index_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,13 +207,11 @@ func (ib *IndexBackfillPlanner) plan(
// batch size. Also plumb in a testing knob.
chunkSize := indexBackfillBatchSize.Get(&ib.execCfg.Settings.SV)
const writeAtRequestTimestamp = true
spec, err := initIndexBackfillerSpec(
spec := initIndexBackfillerSpec(
*td.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize,
indexesToBackfill, sourceIndexID,
)
if err != nil {
return err
}
var err error
p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, sourceSpans)
return err
}); err != nil {
Expand Down
22 changes: 22 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/vector_index
Original file line number Diff line number Diff line change
Expand Up @@ -688,3 +688,25 @@ statement ok
CREATE VECTOR INDEX vec_idx ON test_145973 (v)

subtest end

subtest test_backfill_149236

statement ok
set autocommit_before_ddl=off;

statement ok
BEGIN;

statement ok
CREATE TABLE test_backfill_149236 (a INT PRIMARY KEY, b INT, vec1 VECTOR(3));

statement error pgcode 0A000 vector index build not supported with the legacy schema changer
CREATE VECTOR INDEX idx ON test_backfill_149236 (vec1);

statement ok
COMMIT;

statement ok
SET autocommit_before_ddl=on;

subtest end
6 changes: 2 additions & 4 deletions pkg/sql/mvcc_backfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,8 @@ func (im *IndexBackfillerMergePlanner) plan(
ctx, &extEvalCtx, nil /* planner */, txn.KV(), FullDistribution,
)

spec, err := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp)
if err != nil {
return err
}
spec := initIndexBackfillMergerSpec(*tableDesc.TableDesc(), addedIndexes, temporaryIndexes, mergeTimestamp)
var err error
p, err = im.execCfg.DistSQLPlanner.createIndexBackfillerMergePhysicalPlan(ctx, planCtx, spec, todoSpanList)
return err
}); err != nil {
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ func newIndexBackfiller(
processorID int32,
spec execinfrapb.BackfillerSpec,
) (*indexBackfiller, error) {
indexBackfillerMon := execinfra.NewMonitor(ctx, flowCtx.Cfg.BackfillerMonitor,
mon.MakeName("index-backfill-mon"))
indexBackfillerMon := execinfra.NewMonitor(
ctx, flowCtx.Cfg.BackfillerMonitor, mon.MakeName("index-backfill-mon"),
)
ib := &indexBackfiller{
desc: flowCtx.TableDescriptor(ctx, &spec.Table),
spec: spec,
Expand All @@ -94,8 +95,9 @@ func newIndexBackfiller(
filter: backfill.IndexMutationFilter,
}

if err := ib.IndexBackfiller.InitForDistributedUse(ctx, flowCtx, ib.desc,
ib.spec.IndexesToBackfill, ib.spec.SourceIndexID, indexBackfillerMon); err != nil {
if err := ib.IndexBackfiller.InitForDistributedUse(
ctx, flowCtx, ib.desc, ib.spec.IndexesToBackfill, ib.spec.SourceIndexID, indexBackfillerMon,
); err != nil {
return nil, err
}

Expand Down