
Commit f6aa23e

Improvements to the SQL storage indexer for observability and reliability (#1334)
This commit adds several improvements to the SQL storage indexer. All spans related to updating the database are now nested under the same parent for better visibility. The database cleanup process now runs only if the cursor was updated in that iteration. To cope with failures while dropping the database, each row now stores a new cursor column indicating which cursor the package belongs to. All endpoints that return JSON (the root, search, categories, and package endpoints) are unified and use the same response method. Finally, the default batch sizes can now be customized via environment variables.
1 parent ed84701 · commit f6aa23e

16 files changed: +192 -95 lines

CHANGELOG.md

Lines changed: 3 additions & 1 deletion

@@ -9,10 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 ### Breaking changes
 
 ### Bugfixes
+* Fix spans used in SQL storage indexer (technical preview). [#1334](https://github.com/elastic/package-registry/pull/1334)
 
 ### Added
 * Add new method `MustParsePackage` to create new packages, running more validations. [#1333](https://github.com/elastic/package-registry/pull/1333)
-* Allow to customize SQL storage indexer configurations (technical preview). [#1337](https://github.com/elastic/package-registry/pull/1337)
+* Allow to customize settings related to SQL storage indexer (technical preview). [#1334](https://github.com/elastic/package-registry/pull/1334) [#1337](https://github.com/elastic/package-registry/pull/1337)
+* Cleanup SQL storage indexer backup database only when a new index version is downloaded (technical preview). [#1337](https://github.com/elastic/package-registry/pull/1337)
 
 ### Deprecated
 

categories.go

Lines changed: 10 additions & 3 deletions

@@ -92,9 +92,7 @@ func categoriesHandlerWithProxyMode(logger *zap.Logger, indexer Indexer, proxyMo
 			return
 		}
 
-		cacheHeaders(w, cacheTime)
-		jsonHeader(w)
-		w.Write(data)
+		serveJSONResponse(r.Context(), w, cacheTime, data)
 	}
 }
 
@@ -242,3 +240,12 @@ func getCategoriesOutput(ctx context.Context, categories map[string]*packages.Ca
 
 	return util.MarshalJSONPretty(outputCategories)
 }
+
+func serveJSONResponse(ctx context.Context, w http.ResponseWriter, cacheTime time.Duration, data []byte) {
+	span, _ := apm.StartSpan(ctx, "Serve JSON Response", "app")
+	defer span.End()
+
+	cacheHeaders(w, cacheTime)
+	jsonHeader(w)
+	w.Write(data)
+}
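
For context, a minimal sketch of how any of the unified JSON endpoints can delegate to this shared helper. The handler below is illustrative only and not part of the commit; only serveJSONResponse and its signature come from the diff above.

package main

import (
	"net/http"
	"time"
)

// exampleJSONHandler is an illustrative handler: build the JSON payload once,
// then let serveJSONResponse set the cache and Content-Type headers and write
// the body inside its own APM span ("Serve JSON Response").
func exampleJSONHandler(cacheTime time.Duration) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		data := []byte(`{"status":"ok"}`) // placeholder payload
		serveJSONResponse(r.Context(), w, cacheTime, data)
	}
}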

dev/docker-compose-epr-gcs.yml

Lines changed: 2 additions & 0 deletions

@@ -26,6 +26,8 @@ services:
       - EPR_DISABLE_PACKAGE_VALIDATION=true
       - EPR_ADDRESS=0.0.0.0:8080
       # - EPR_LOG_LEVEL=debug
+      # - EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE=2000
+      # - EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE=2000
      # - EPR_SQL_INDEXER_DATABASE_FOLDER_PATH=/tmp
       # - EPR_SQL_INDEXER_SEARCH_CACHE_SIZE=100
       # - EPR_SQL_INDEXER_SEARCH_CACHE_TTL=10m

dev/docker-compose-epr.yml

Lines changed: 2 additions & 0 deletions

@@ -19,6 +19,8 @@ services:
       - EPR_DISABLE_PACKAGE_VALIDATION=true
       - EPR_ADDRESS=0.0.0.0:8080
       # - EPR_LOG_LEVEL=debug
+      # - EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE=2000
+      # - EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE=2000
       # - EPR_SQL_INDEXER_DATABASE_FOLDER_PATH=/tmp
       # - EPR_SQL_INDEXER_SEARCH_CACHE_SIZE=100
       # - EPR_SQL_INDEXER_SEARCH_CACHE_TTL=10m

dev/launch_epr_service_storage_indexer.sh

Lines changed: 2 additions & 0 deletions

@@ -108,6 +108,8 @@ export EPR_CONFIG="${CONFIG_PATH}"
 # export EPR_SQL_INDEXER_DATABASE_FOLDER_PATH=/tmp
 # export EPR_SQL_INDEXER_SEARCH_CACHE_SIZE=100
 # export EPR_SQL_INDEXER_SEARCH_CACHE_TTL=10m
+# export EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE=2000
+# export EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE=2000
 
 ./package-registry
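
As a rough illustration of what these variables control, the sketch below reads a batch-size variable with a fallback default. This is not the registry's actual configuration code; the default of 2000 is taken from defaultMaxBulkAddBatch in the SQLite repository changes further down.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// batchSizeFromEnv returns the integer value of an environment variable,
// falling back to def when it is unset or not a positive integer.
func batchSizeFromEnv(name string, def int) int {
	v := os.Getenv(name)
	if v == "" {
		return def
	}
	n, err := strconv.Atoi(v)
	if err != nil || n <= 0 {
		return def
	}
	return n
}

func main() {
	readBatch := batchSizeFromEnv("EPR_SQL_INDEXER_READ_PACKAGES_BATCH_SIZE", 2000)
	insertBatch := batchSizeFromEnv("EPR_SQL_INDEXER_DB_INSERT_BATCH_SIZE", 2000)
	fmt.Println("read batch:", readBatch, "insert batch:", insertBatch)
}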

index.go

Lines changed: 1 addition & 3 deletions

@@ -26,8 +26,6 @@ func indexHandler(cacheTime time.Duration) (func(w http.ResponseWriter, r *http.
 		return nil, err
 	}
 	return func(w http.ResponseWriter, r *http.Request) {
-		w.Header().Set("Content-Type", "application/json")
-		cacheHeaders(w, cacheTime)
-		w.Write(body)
+		serveJSONResponse(r.Context(), w, cacheTime, body)
 	}, nil
 }

internal/database/memoryrepository.go

Lines changed: 7 additions & 3 deletions

@@ -12,7 +12,11 @@ import (
 	_ "modernc.org/sqlite" // Import the SQLite driver
 )
 
-func NewMemorySQLDB(path string) (*SQLiteRepository, error) {
+type MemorySQLDBOptions struct {
+	Path string
+}
+
+func NewMemorySQLDB(options MemorySQLDBOptions) (*SQLiteRepository, error) {
 	db, err := sql.Open("sqlite", ":memory:")
 	if err != nil {
 		return nil, fmt.Errorf("failed to open database: %w", err)
@@ -22,14 +26,14 @@ func NewMemorySQLDB(path string) (*SQLiteRepository, error) {
 		return nil, fmt.Errorf("failed to connect to database: %w", err)
 	}
 
-	dbRepo, err := newSQLiteRepository(db)
+	dbRepo, err := newSQLiteRepository(sqlDBOptions{db: db})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create SQLite repository: %w", err)
 	}
 
 	if err := dbRepo.Initialize(context.Background()); err != nil {
 		return nil, fmt.Errorf("failed to create database: %w", err)
 	}
-	dbRepo.path = "memory-" + path
+	dbRepo.path = "memory-" + options.Path
 	return dbRepo, nil
 }
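
A small usage sketch of the new constructor signature; the function name below is hypothetical, only NewMemorySQLDB and MemorySQLDBOptions come from the diff.

package database

// exampleNewMemoryRepo shows the options-struct form of the constructor;
// previously NewMemorySQLDB took a bare path string. The resulting
// repository path becomes "memory-" + Path.
func exampleNewMemoryRepo() (*SQLiteRepository, error) {
	return NewMemorySQLDB(MemorySQLDBOptions{Path: "example"})
}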

internal/database/model.go

Lines changed: 1 addition & 0 deletions

@@ -5,6 +5,7 @@
 package database
 
 type Package struct {
+	Cursor        string
 	Name          string
 	Version       string
 	FormatVersion string

internal/database/sqliterepository.go

Lines changed: 64 additions & 21 deletions

@@ -29,6 +29,7 @@ type keyDefinition struct {
 }
 
 var keys = []keyDefinition{
+	{"cursor", "TEXT NOT NULL"},
 	{"name", "TEXT NOT NULL"},
 	{"version", "TEXT NOT NULL"},
 	{"formatVersion", "TEXT NOT NULL"},
@@ -47,20 +48,25 @@ var keys = []keyDefinition{
 const defaultMaxBulkAddBatch = 2000
 
 type SQLiteRepository struct {
-	db              *sql.DB
-	path            string
-	maxBulkAddBatch int
-	numberFields    int
+	db                  *sql.DB
+	path                string
+	maxBulkAddBatchSize int
+	maxTotalArgs        int
 }
 
 var _ Repository = new(SQLiteRepository)
 
-func NewFileSQLDB(path string) (*SQLiteRepository, error) {
+type FileSQLDBOptions struct {
+	Path             string
+	BatchSizeInserts int
+}
+
+func NewFileSQLDB(options FileSQLDBOptions) (*SQLiteRepository, error) {
 	// NOTE: Even using sqlcache (with Ristretto or Redis), data column needs to be processed (Unmarshalled)
 	// again for all the Get queries performed, so there is no advantage in time of using sqlcache with SQLite
 	// for our use case.
 
-	db, err := sql.Open("sqlite", path)
+	db, err := sql.Open("sqlite", options.Path)
 	if err != nil {
 		return nil, fmt.Errorf("failed to open database: %w", err)
 	}
@@ -69,19 +75,27 @@ func NewFileSQLDB(path string) (*SQLiteRepository, error) {
 		return nil, fmt.Errorf("failed to connect to database: %w", err)
 	}
 
-	dbRepo, err := newSQLiteRepository(db)
+	dbRepo, err := newSQLiteRepository(sqlDBOptions{
+		db:               db,
+		batchSizeInserts: options.BatchSizeInserts,
+	})
 	if err != nil {
 		return nil, fmt.Errorf("failed to create SQLite repository: %w", err)
 	}
 	if err := dbRepo.Initialize(context.Background()); err != nil {
 		return nil, fmt.Errorf("failed to create database: %w", err)
 	}
-	dbRepo.path = path
+	dbRepo.path = options.Path
 
 	return dbRepo, nil
 }
 
-func newSQLiteRepository(db *sql.DB) (*SQLiteRepository, error) {
+type sqlDBOptions struct {
+	db               *sql.DB
+	batchSizeInserts int
+}
+
+func newSQLiteRepository(options sqlDBOptions) (*SQLiteRepository, error) {
 	// https://www.sqlite.org/pragma.html#pragma_journal_mode
 	// Not observed any performance difference with WAL mode, so keeping it as DELETE mode for now.
 	// if _, err = db.Exec("PRAGMA journal_mode = WAL;"); err != nil {
@@ -106,10 +120,14 @@ func newSQLiteRepository(db *sql.DB) (*SQLiteRepository, error) {
 	// if _, err := db.Exec("PRAGMA cache_size = -10000;"); err != nil {
 	//	return nil, fmt.Errorf("failed to update cache_size: %w", err)
 	// }
+	batchSize := defaultMaxBulkAddBatch
+	if options.batchSizeInserts > 0 {
+		batchSize = options.batchSizeInserts
+	}
 	return &SQLiteRepository{
-		db:              db,
-		maxBulkAddBatch: defaultMaxBulkAddBatch,
-		numberFields:    len(keys),
+		db:                  options.db,
+		maxBulkAddBatchSize: batchSize,
+		maxTotalArgs:        batchSize * len(keys),
 	}, nil
 }
 
@@ -119,6 +137,7 @@ func (r *SQLiteRepository) File(ctx context.Context) string {
 
 func (r *SQLiteRepository) Ping(ctx context.Context) error {
 	span, ctx := apm.StartSpan(ctx, "SQL: Ping", "app")
+	span.Context.SetLabel("database.path", r.File(ctx))
 	defer span.End()
 	if r.db == nil {
 		return errors.New("database is not initialized")
@@ -131,14 +150,15 @@ func (r *SQLiteRepository) Ping(ctx context.Context) error {
 
 func (r *SQLiteRepository) Initialize(ctx context.Context) error {
 	span, ctx := apm.StartSpan(ctx, "SQL: Initialize", "app")
+	span.Context.SetLabel("database.path", r.File(ctx))
 	defer span.End()
 	createQuery := strings.Builder{}
 	createQuery.WriteString("CREATE TABLE IF NOT EXISTS ")
 	createQuery.WriteString("packages (")
 	for _, i := range keys {
 		createQuery.WriteString(fmt.Sprintf("%s %s, ", i.Name, i.SQLType))
 	}
-	createQuery.WriteString("PRIMARY KEY (name, version));")
+	createQuery.WriteString("PRIMARY KEY (cursor, name, version));")
 	if _, err := r.db.ExecContext(ctx, createQuery.String()); err != nil {
 		return err
 	}
@@ -159,19 +179,18 @@ func (r *SQLiteRepository) Initialize(ctx context.Context) error {
 
 func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []*Package) error {
 	span, ctx := apm.StartSpan(ctx, "SQL: Insert batches", "app")
+	span.Context.SetLabel("insert.batch.size", r.maxBulkAddBatchSize)
+	span.Context.SetLabel("database.path", r.File(ctx))
 	defer span.End()
 
 	if len(pkgs) == 0 {
 		return nil
 	}
 
 	totalProcessed := 0
-	args := make([]any, 0, r.maxBulkAddBatch*r.numberFields)
+	args := make([]any, 0, r.maxTotalArgs)
 	for {
 		read := 0
-		// reuse args slice
-		args = args[:0]
-
 		var sb strings.Builder
 		sb.WriteString("INSERT INTO ")
 		sb.WriteString(database)
@@ -184,7 +203,7 @@ func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []
 		}
 		sb.WriteString(") values")
 
-		endBatch := totalProcessed + r.maxBulkAddBatch
+		endBatch := totalProcessed + r.maxBulkAddBatchSize
 		for i := totalProcessed; i < endBatch && i < len(pkgs); i++ {
 			sb.WriteString("(")
 			for j := range keys {
@@ -206,6 +225,7 @@ func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []
 			discoveryFields := addCommasToString(pkgs[i].DiscoveryFields)
 
 			args = append(args,
+				pkgs[i].Cursor,
 				pkgs[i].Name,
 				pkgs[i].Version,
 				pkgs[i].FormatVersion,
@@ -240,6 +260,9 @@ func (r *SQLiteRepository) BulkAdd(ctx context.Context, database string, pkgs []
 		if totalProcessed >= len(pkgs) {
 			break
 		}
+
+		// reuse args slice
+		args = args[:0]
 	}
 
 	return nil
@@ -263,6 +286,7 @@ func addCommasToString(s string) string {
 
 func (r *SQLiteRepository) All(ctx context.Context, database string, whereOptions WhereOptions) ([]*Package, error) {
 	span, ctx := apm.StartSpan(ctx, "SQL: Get All", "app")
+	span.Context.SetLabel("database.path", r.File(ctx))
 	defer span.End()
 
 	var all []*Package
@@ -276,6 +300,7 @@ func (r *SQLiteRepository) All(ctx context.Context, database string, whereOption
 
 func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOptions WhereOptions, process func(ctx context.Context, pkg *Package) error) error {
 	span, ctx := apm.StartSpan(ctx, "SQL: Get All (process each package)", "app")
+	span.Context.SetLabel("database.path", r.File(ctx))
 	defer span.End()
 
 	useBaseData := whereOptions == nil || !whereOptions.UseFullData()
@@ -301,8 +326,6 @@ func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOp
 		clause, whereArgs = whereOptions.Where()
 		query.WriteString(clause)
 	}
-	// TODO: remove debug
-	// fmt.Println("Query:", query.String())
 	rows, err := r.db.QueryContext(ctx, query.String(), whereArgs...)
 	if err != nil {
 		return err
@@ -313,6 +336,7 @@ func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOp
 	var pkg Package
 	for rows.Next() {
 		if err := rows.Scan(
+			&pkg.Cursor,
 			&pkg.Name,
 			&pkg.Version,
 			&pkg.FormatVersion,
@@ -345,6 +369,7 @@ func (r *SQLiteRepository) AllFunc(ctx context.Context, database string, whereOp
 
 func (r *SQLiteRepository) Drop(ctx context.Context, table string) error {
 	span, ctx := apm.StartSpan(ctx, "SQL: Drop", "app")
+	span.Context.SetLabel("database.path", r.File(ctx))
 	defer span.End()
 	query := fmt.Sprintf("DROP TABLE IF EXISTS %s", table)
 	_, err := r.db.ExecContext(ctx, query)
@@ -370,16 +395,34 @@ type FilterOptions struct {
 type SQLOptions struct {
 	Filter *FilterOptions
 
+	CurrentCursor string
+
 	IncludeFullData bool // If true, the query will return the full data field instead of the base data field
 }
 
 func (o *SQLOptions) Where() (string, []any) {
-	if o == nil || o.Filter == nil {
+	if o == nil {
 		return "", nil
 	}
 	var sb strings.Builder
 	var args []any
+	// Always filter by cursor
+	if o.CurrentCursor != "" {
+		sb.WriteString("cursor = ?")
+		args = append(args, o.CurrentCursor)
+	}
+
+	if o.Filter == nil {
+		if sb.Len() == 0 {
+			return "", nil
+		}
+		return fmt.Sprintf(" WHERE %s", sb.String()), args
+	}
+
 	if o.Filter.Type != "" {
+		if sb.Len() > 0 {
+			sb.WriteString(" AND ")
+		}
 		sb.WriteString("type = ?")
 		args = append(args, o.Filter.Type)
 	}
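
To make the new cursor scoping concrete, here is a sketch of the clause produced when only CurrentCursor is set; the cursor value is arbitrary, and with a non-nil Filter further conditions such as type = ? are ANDed after the cursor condition.

package database

import "fmt"

// exampleCursorWhere illustrates that every query is now scoped to the
// current cursor when one is provided.
func exampleCursorWhere() {
	opts := SQLOptions{CurrentCursor: "cursor-1"}
	clause, args := opts.Where()
	fmt.Printf("%q %v\n", clause, args) // " WHERE cursor = ?" [cursor-1]
}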

internal/storage/index.go

Lines changed: 4 additions & 4 deletions

@@ -116,7 +116,7 @@ func LoadPackagesAndCursorFromIndex(ctx context.Context, logger *zap.Logger, sto
 	return anIndex, storageCursor.Current, nil
 }
 
-func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, bucketName, rootStoragePath string, aCursor cursor, batchSize int, process func(packages.Packages) error) error {
+func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, bucketName, rootStoragePath string, aCursor cursor, batchSize int, process func(context.Context, packages.Packages, string) error) error {
 	span, ctx := apm.StartSpan(ctx, "LoadSearchIndexAll", "app")
 	span.Context.SetLabel("load.method", "batches")
 	span.Context.SetLabel("load.batch.size", batchSize)
@@ -174,7 +174,7 @@ func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageC
 			count++
 
 			if count >= batchSize {
-				err = process(packages)
+				err = process(ctx, packages, aCursor.Current)
 				if err != nil {
 					return fmt.Errorf("error processing batch of packages: %w", err)
 				}
@@ -193,15 +193,15 @@ func loadSearchIndexAllBatches(ctx context.Context, logger *zap.Logger, storageC
 		}
 	}
 	if len(packages) > 0 {
-		err = process(packages)
+		err = process(ctx, packages, aCursor.Current)
 		if err != nil {
 			return fmt.Errorf("error processing final batch of packages: %w", err)
 		}
 	}
 	return nil
 }
 
-func LoadPackagesAndCursorFromIndexBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, storageBucketInternal, currentCursor string, batchSize int, process func(packages.Packages) error) (string, error) {
+func LoadPackagesAndCursorFromIndexBatches(ctx context.Context, logger *zap.Logger, storageClient *storage.Client, storageBucketInternal, currentCursor string, batchSize int, process func(context.Context, packages.Packages, string) error) (string, error) {
 	bucketName, rootStoragePath, err := extractBucketNameFromURL(storageBucketInternal)
 	if err != nil {
 		return "", fmt.Errorf("can't extract bucket name from URL (url: %s): %w", storageBucketInternal, err)
