sql-server: Add behavior: auto_gc_behavior: enable. #8849

Status: Open. Wants to merge 17 commits into base: main.

Commits (17):
dc4b94d  sql-server: Add behavior: auto_gc_behavior: enable. (reltuk, Feb 10, 2025)
9b53d20  go: cmd: sqlserver: Fix nil SIGSEGV on AutoGCBehavior read in init. (reltuk, Feb 11, 2025)
62e5032  integration-tests/go-sql-server-driver: auto_gc_test.go: Tweak assert… (reltuk, Feb 11, 2025)
f509d98  go: sqle: auto_gc.go: Make sure the safepoint controller works on the… (reltuk, Feb 12, 2025)
10dfcec  [ga-format-pr] Run go/utils/repofmt/format_repo.sh and go/Godeps/upda… (reltuk, Feb 12, 2025)
2a47d40  go: sqle: auto_gc: Responding to PR feedback. Comments and cleanup. (reltuk, Feb 14, 2025)
7c9c808  Merge remote-tracking branch 'origin/main' into aaron/autogc (reltuk, Feb 14, 2025)
1e20b67  Merge remote-tracking branch 'origin/main' into aaron/autogc (reltuk, Feb 14, 2025)
20f38ca  Merge remote-tracking branch 'origin/main' into aaron/autogc (reltuk, Feb 19, 2025)
69630e1  go: auto_gc: Add a heuristic which allows followers in cluster replic… (reltuk, Feb 19, 2025)
1438933  integration-tests: go-sql-server-driver: auto_gc_test: Add a skipped … (reltuk, Feb 21, 2025)
efb5edc  go: sqle: auto_gc: Move to a background thread which periodically che… (reltuk, Feb 21, 2025)
97fd0f8  Merge remote-tracking branch 'origin/main' into aaron/autogc (reltuk, Feb 21, 2025)
e4a4b5d  go: store,remotesrv: Improve logging. Improve gRPC error statuses for… (reltuk, Feb 21, 2025)
4ce49e6  go: store/nbs: store.go: Fix GetManyCompressed to not redeliver chunk… (reltuk, Feb 22, 2025)
837ae43  integration-tests/go-sql-server-driver: auto_gc_test.go: Bring down a… (reltuk, Feb 22, 2025)
36b39df  Merge remote-tracking branch 'origin/main' into aaron/autogc (reltuk, Feb 22, 2025)
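For reference, a minimal sketch of the sql-server YAML this option corresponds to, assuming the keys follow the behavior: auto_gc_behavior: enable path named in the PR title (the key spelling is inferred from the title and from the AutoGCBehaviorYAMLConfig default added in serverconfig.go below, not taken from documentation):

```yaml
# Sketch only: key names inferred from the PR title and from
# defaultServerConfigYAML in this diff.
behavior:
  auto_gc_behavior:
    enable: true
```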
23 changes: 22 additions & 1 deletion go/cmd/dolt/commands/engine/sqlengine.go
@@ -41,6 +41,7 @@ import (
dsqle "github.com/dolthub/dolt/go/libraries/doltcore/sqle"
dblr "github.com/dolthub/dolt/go/libraries/doltcore/sqle/binlogreplication"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/cluster"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dprocedures"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/dsess"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/kvexec"
"github.com/dolthub/dolt/go/libraries/doltcore/sqle/mysql_file_handler"
@@ -81,6 +82,7 @@ type SqlEngineConfig struct {
JwksConfig []servercfg.JwksConfig
SystemVariables SystemVariables
ClusterController *cluster.Controller
AutoGCController *dsqle.AutoGCController
BinlogReplicaController binlogreplication.BinlogReplicaController
EventSchedulerStatus eventscheduler.SchedulerStatus
}
@@ -115,7 +117,15 @@ func NewSqlEngine(
return nil, err
}

all := dbs[:]
// Make a copy of the databases. |all| is going to be provided
// as the set of all initial databases to dsqle
// DatabaseProvider. |dbs| is only the databases that came
// from MultiRepoEnv, and they are all real databases based on
// DoltDB instances. |all| is going to include some extension,
// informational databases like |dolt_cluster| sometimes,
// depending on config.
all := make([]dsess.SqlDatabase, len(dbs))
copy(all, dbs)
Review comment (Member): Might add a note why making a copy here is important

// this is overwritten only for server sessions
for _, db := range dbs {
@@ -194,6 +204,17 @@ func NewSqlEngine(
statsPro := statspro.NewProvider(pro, statsnoms.NewNomsStatsFactory(mrEnv.RemoteDialProvider()))
engine.Analyzer.Catalog.StatsProvider = statsPro

if config.AutoGCController != nil {
err = config.AutoGCController.RunBackgroundThread(bThreads, sqlEngine.NewDefaultContext)
if err != nil {
return nil, err
}
config.AutoGCController.ApplyCommitHooks(ctx, mrEnv, dbs...)
pro.InitDatabaseHooks = append(pro.InitDatabaseHooks, config.AutoGCController.InitDatabaseHook())
// XXX: We force session aware safepoint controller if auto_gc is on.
dprocedures.UseSessionAwareSafepointController = true
}

engine.Analyzer.ExecBuilder = rowexec.NewOverrideBuilder(kvexec.Builder{})
sessFactory := doltSessionFactory(pro, statsPro, mrEnv.Config(), bcController, gcSafepointController, config.Autocommit)
sqlEngine.provider = pro
11 changes: 11 additions & 0 deletions go/cmd/dolt/commands/sqlserver/command_line_config.go
@@ -489,6 +489,10 @@ func (cfg *commandLineServerConfig) ValueSet(value string) bool {
return ok
}

func (cfg *commandLineServerConfig) AutoGCBehavior() servercfg.AutoGCBehavior {
return stubAutoGCBehavior{}
}

// DoltServerConfigReader is the default implementation of ServerConfigReader suitable for parsing Dolt config files
// and command line options.
type DoltServerConfigReader struct{}
@@ -510,3 +514,10 @@ func (d DoltServerConfigReader) ReadConfigFile(cwdFS filesys.Filesys, file strin
func (d DoltServerConfigReader) ReadConfigArgs(args *argparser.ArgParseResults, dataDirOverride string) (servercfg.ServerConfig, error) {
return NewCommandLineConfig(nil, args, dataDirOverride)
}

type stubAutoGCBehavior struct {
}

func (stubAutoGCBehavior) Enable() bool {
return false
}
11 changes: 11 additions & 0 deletions go/cmd/dolt/commands/sqlserver/server.go
@@ -257,6 +257,17 @@ func ConfigureServices(
}
controller.Register(InitEventSchedulerStatus)

InitAutoGCController := &svcs.AnonService{
InitF: func(context.Context) error {
if serverConfig.AutoGCBehavior() != nil &&
serverConfig.AutoGCBehavior().Enable() {
config.AutoGCController = sqle.NewAutoGCController(lgr)
}
return nil
},
}
controller.Register(InitAutoGCController)

var sqlEngine *engine.SqlEngine
InitSqlEngine := &svcs.AnonService{
InitF: func(ctx context.Context) (err error) {
70 changes: 59 additions & 11 deletions go/libraries/doltcore/doltdb/doltdb.go
@@ -1917,23 +1917,71 @@ func (ddb *DoltDB) IsTableFileStore() bool {

// ChunkJournal returns the ChunkJournal for this DoltDB, if one is in use.
func (ddb *DoltDB) ChunkJournal() *nbs.ChunkJournal {
tableFileStore, ok := datas.ChunkStoreFromDatabase(ddb.db).(chunks.TableFileStore)
if !ok {
return nil
}
cs := datas.ChunkStoreFromDatabase(ddb.db)

generationalNbs, ok := tableFileStore.(*nbs.GenerationalNBS)
if !ok {
return nil
if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok {
cs = generationalNBS.NewGen()
}

newGen := generationalNbs.NewGen()
nbs, ok := newGen.(*nbs.NomsBlockStore)
if !ok {
if nbsStore, ok := cs.(*nbs.NomsBlockStore); ok {
return nbsStore.ChunkJournal()
} else {
return nil
Review comment (Member): Isn't this impossible? Store is set in both cases above

}
}

// An approximate representation of how large the on-disk storage is for a DoltDB.
type StoreSizes struct {
// For ChunkJournal stores, this will be the size of the journal file. A size
// of zero does not mean the store is not journaled. The store could be
// journaled, and the journal could be empty.
JournalBytes uint64
// For generational stores, this will be the size of the new gen. It will
// include any JournalBytes. A size of zero does not mean the store is not
// generational, since it could be the case that the store is generational
// but everything in it is in the old gen. In practice, given how we build
// oldgen references today, this never happens--there is always a little
// bit of data that only goes in the newgen.
NewGenBytes uint64
// This is the approximate total on-disk storage overhead of the store.
// It includes JournalBytes and NewGenBytes, if there are any.
TotalBytes uint64
}

return nbs.ChunkJournal()
func (ddb *DoltDB) StoreSizes(ctx context.Context) (StoreSizes, error) {
cs := datas.ChunkStoreFromDatabase(ddb.db)
if generationalNBS, ok := cs.(*nbs.GenerationalNBS); ok {
newgen := generationalNBS.NewGen()
newgenSz, err := newgen.(chunks.TableFileStore).Size(ctx)
if err != nil {
return StoreSizes{}, err
}
totalSz, err := cs.(chunks.TableFileStore).Size(ctx)
if err != nil {
return StoreSizes{}, err
}
journal := newgen.(*nbs.NomsBlockStore).ChunkJournal()
if journal != nil {
return StoreSizes{
JournalBytes: uint64(journal.Size()),
NewGenBytes: newgenSz,
TotalBytes: totalSz,
}, nil
} else {
return StoreSizes{
NewGenBytes: newgenSz,
TotalBytes: totalSz,
}, nil
}
} else {
totalSz, err := cs.(chunks.TableFileStore).Size(ctx)
if err != nil {
return StoreSizes{}, err
}
return StoreSizes{
TotalBytes: totalSz,
}, nil
}
}

func (ddb *DoltDB) TableFileStoreHasJournal(ctx context.Context) (bool, error) {
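The new StoreSizes method lends itself to size-based auto-GC heuristics. Below is a minimal sketch of one possible caller; the shouldRunAutoGC helper and the journal-size threshold are illustrative assumptions, not part of this PR:

```go
package example

import (
	"context"

	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
)

// shouldRunAutoGC is a hypothetical helper showing one way StoreSizes could
// drive an auto-GC decision: trigger a GC once the chunk journal grows past a
// fixed threshold. The 256 MiB threshold is an arbitrary illustrative value.
func shouldRunAutoGC(ctx context.Context, ddb *doltdb.DoltDB) (bool, error) {
	sizes, err := ddb.StoreSizes(ctx)
	if err != nil {
		return false, err
	}
	const journalThreshold = 256 * 1024 * 1024 // 256 MiB
	return sizes.JournalBytes > journalThreshold, nil
}
```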
41 changes: 27 additions & 14 deletions go/libraries/doltcore/remotesrv/grpc.go
@@ -36,6 +36,7 @@ import (
"github.com/dolthub/dolt/go/libraries/utils/filesys"
"github.com/dolthub/dolt/go/store/chunks"
"github.com/dolthub/dolt/go/store/hash"
"github.com/dolthub/dolt/go/store/nbs"
"github.com/dolthub/dolt/go/store/types"
)

@@ -97,7 +98,7 @@ func (rs *RemoteChunkStore) HasChunks(ctx context.Context, req *remotesapi.HasCh
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -155,7 +156,7 @@ func (rs *RemoteChunkStore) GetDownloadLocations(ctx context.Context, req *remot
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -233,7 +234,7 @@ func (rs *RemoteChunkStore) StreamDownloadLocations(stream remotesapi.ChunkStore
"num_requested": numHashes,
"num_urls": numUrls,
"num_ranges": numRanges,
}).Info("finished")
}).Trace("finished")
}()
logger := ologger

@@ -387,7 +388,7 @@ func (rs *RemoteChunkStore) GetUploadLocations(ctx context.Context, req *remotes
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

_, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -445,7 +446,7 @@ func (rs *RemoteChunkStore) Rebase(ctx context.Context, req *remotesapi.RebaseRe
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

_, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -462,7 +463,7 @@ func (rs *RemoteChunkStore) Root(ctx context.Context, req *remotesapi.RootReques
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -485,7 +486,7 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -500,7 +501,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version()))
if err != nil {
logger.WithError(err).Error("error calling AddTableFilesToManifest")
return nil, status.Errorf(codes.Internal, "manifest update error: %v", err)
code := codes.Internal
if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
code = codes.FailedPrecondition
}
return nil, status.Errorf(code, "manifest update error: %v", err)
}

currHash := hash.New(req.Current)
@@ -513,7 +518,11 @@ func (rs *RemoteChunkStore) Commit(ctx context.Context, req *remotesapi.CommitRe
"last_hash": lastHash.String(),
"curr_hash": currHash.String(),
}).Error("error calling Commit")
return nil, status.Errorf(codes.Internal, "failed to commit: %v", err)
code := codes.Internal
if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
code = codes.FailedPrecondition
}
return nil, status.Errorf(code, "failed to commit: %v", err)
}

logger.Tracef("Commit success; moved from %s -> %s", lastHash.String(), currHash.String())
@@ -528,7 +537,7 @@ func (rs *RemoteChunkStore) GetRepoMetadata(ctx context.Context, req *remotesapi

repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getOrCreateStore(ctx, logger, repoPath, req.ClientRepoFormat.NbfVersion)
if err != nil {
@@ -556,7 +565,7 @@ func (rs *RemoteChunkStore) ListTableFiles(ctx context.Context, req *remotesapi.
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -634,7 +643,7 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
}
repoPath := getRepoPath(req)
logger = logger.WithField(RepoPathField, repoPath)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

cs, err := rs.getStore(ctx, logger, repoPath)
if err != nil {
@@ -649,7 +658,11 @@ func (rs *RemoteChunkStore) AddTableFiles(ctx context.Context, req *remotesapi.A
err = cs.AddTableFilesToManifest(ctx, updates, rs.getAddrs(cs.Version()))
if err != nil {
logger.WithError(err).Error("error occurred updating the manifest")
return nil, status.Error(codes.Internal, "manifest update error")
code := codes.Internal
if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
code = codes.FailedPrecondition
}
return nil, status.Error(code, "manifest update error")
}

logger = logger.WithFields(logrus.Fields{
@@ -707,7 +720,7 @@ func getReqLogger(lgr *logrus.Entry, method string) *logrus.Entry {
"method": method,
"request_num": strconv.Itoa(incReqId()),
})
lgr.Info("starting request")
lgr.Trace("starting request")
return lgr
}

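The same Internal-versus-FailedPrecondition choice is repeated in the Commit and AddTableFiles handlers above. Here is a minimal sketch of that mapping factored into a helper; the helper itself is illustrative and not part of the PR:

```go
package example

import (
	"errors"

	"google.golang.org/grpc/codes"

	"github.com/dolthub/dolt/go/store/nbs"
)

// manifestUpdateCode mirrors the status-code choice made in this PR when a
// manifest update fails: dangling refs and missing table files are conditions
// the client can observe and correct (for example by retrying against current
// remote state), so they map to FailedPrecondition rather than Internal.
func manifestUpdateCode(err error) codes.Code {
	if errors.Is(err, nbs.ErrDanglingRef) || errors.Is(err, nbs.ErrTableFileNotFound) {
		return codes.FailedPrecondition
	}
	return codes.Internal
}
```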
2 changes: 1 addition & 1 deletion go/libraries/doltcore/remotesrv/http.go
@@ -66,7 +66,7 @@ func newFileHandler(lgr *logrus.Entry, dbCache DBCache, fs filesys.Filesys, read

func (fh filehandler) ServeHTTP(respWr http.ResponseWriter, req *http.Request) {
logger := getReqLogger(fh.lgr, req.Method+"_"+req.RequestURI)
defer func() { logger.Info("finished") }()
defer func() { logger.Trace("finished") }()

var err error
req.URL, err = fh.sealer.Unseal(req.URL)
10 changes: 10 additions & 0 deletions go/libraries/doltcore/servercfg/serverconfig.go
@@ -48,6 +48,7 @@ const (
DefaultReadOnly = false
DefaultLogLevel = LogLevel_Info
DefaultAutoCommit = true
DefaultAutoGCBehaviorEnable = false
DefaultDoltTransactionCommit = false
DefaultMaxConnections = 100
DefaultDataDir = "."
@@ -198,6 +199,8 @@ type ServerConfig interface {
EventSchedulerStatus() string
// ValueSet returns whether the value string provided was explicitly set in the config
ValueSet(value string) bool
// AutoGCBehavior defines parameters around how auto-GC works for the running server.
AutoGCBehavior() AutoGCBehavior
}

// DefaultServerConfig creates a `*ServerConfig` that has all of the options set to their default values.
@@ -214,6 +217,9 @@ func defaultServerConfigYAML() *YAMLConfig {
ReadOnly: ptr(DefaultReadOnly),
AutoCommit: ptr(DefaultAutoCommit),
DoltTransactionCommit: ptr(DefaultDoltTransactionCommit),
AutoGCBehavior: &AutoGCBehaviorYAMLConfig{
Enable_: ptr(DefaultAutoGCBehaviorEnable),
},
},
UserConfig: UserYAMLConfig{
Name: ptr(""),
@@ -445,3 +451,7 @@ func CheckForUnixSocket(config ServerConfig) (string, bool, error) {

return "", false, nil
}

type AutoGCBehavior interface {
Enable() bool
}
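defaultServerConfigYAML above references an AutoGCBehaviorYAMLConfig with an Enable_ field. A minimal sketch of how such a type might satisfy this interface, assuming a yaml "enable" tag and nil-safe defaulting (the actual definition lives elsewhere in the PR and may differ):

```go
// Sketch only: the yaml tag and the nil handling here are assumptions inferred
// from defaultServerConfigYAML, not the PR's actual AutoGCBehaviorYAMLConfig.
type AutoGCBehaviorYAMLConfig struct {
	Enable_ *bool `yaml:"enable"`
}

func (a *AutoGCBehaviorYAMLConfig) Enable() bool {
	if a == nil || a.Enable_ == nil {
		return DefaultAutoGCBehaviorEnable
	}
	return *a.Enable_
}
```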