diff --git a/cmd/e2e/api_test.go b/cmd/e2e/api_test.go index b8b6114d..90d2c9c8 100644 --- a/cmd/e2e/api_test.go +++ b/cmd/e2e/api_test.go @@ -1861,3 +1861,295 @@ func makeDestinationDisabledValidator(id string, disabled bool) map[string]any { }, } } + +func (suite *basicSuite) TestLogStoreAPI() { + tenantID := idgen.String() + destinationID := idgen.Destination() + eventID := idgen.Event() + + tests := []APITest{ + // Setup: Create tenant + { + Name: "PUT /:tenantID - Create tenant", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPUT, + Path: "/" + tenantID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + // Setup: Configure mock server destination + { + Name: "PUT mockserver/destinations", + Request: httpclient.Request{ + Method: httpclient.MethodPUT, + BaseURL: suite.mockServerBaseURL, + Path: "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }, + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + }, + }, + // Setup: Create destination in Outpost + { + Name: "POST /:tenantID/destinations", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/" + tenantID + "/destinations", + Body: map[string]interface{}{ + "id": destinationID, + "type": "webhook", + "topics": "*", + "config": map[string]interface{}{ + "url": fmt.Sprintf("%s/webhook/%s", suite.mockServerBaseURL, destinationID), + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusCreated, + }, + }, + }, + // Publish an event + { + Name: "POST /publish - Publish event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodPOST, + Path: "/publish", + Body: map[string]interface{}{ + "tenant_id": tenantID, + "topic": "user.created", + "eligible_for_retry": true, + "id": eventID, + "metadata": map[string]any{ + "source": "test", + }, + "data": map[string]any{ + "user_id": "123", + "email": "test@example.com", + }, + }, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusAccepted, + }, + }, + }, + // Wait for event to be processed and logged, then list events + { + Name: "GET /:tenantID/events - List events (with delay)", + Delay: 3 * time.Second, + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/events", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{ + "type": "integer", + "const": 200, + }, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + }, + "count": map[string]interface{}{ + "type": "integer", + "minimum": 1, + }, + }, + "required": []any{"data", "count"}, + }, + }, + "required": []any{"statusCode", "body"}, + }, + }, + }, + // Retrieve specific event + { + Name: "GET /:tenantID/events/:eventID - Retrieve event", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/events/" + eventID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + 
StatusCode: http.StatusOK, + }, + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{ + "type": "integer", + "const": 200, + }, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "id": map[string]interface{}{ + "type": "string", + "const": eventID, + }, + "tenant_id": map[string]interface{}{ + "type": "string", + "const": tenantID, + }, + "topic": map[string]interface{}{ + "type": "string", + "const": "user.created", + }, + "status": map[string]interface{}{ + "type": "string", + "enum": []any{"pending", "success", "failed"}, + }, + }, + "required": []any{"id", "tenant_id", "topic"}, + }, + }, + "required": []any{"statusCode", "body"}, + }, + }, + }, + // List deliveries for the event + { + Name: "GET /:tenantID/events/:eventID/deliveries - List deliveries", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/events/" + eventID + "/deliveries", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{ + "type": "integer", + "const": 200, + }, + "body": map[string]interface{}{ + "type": "array", + "minItems": 1, + "items": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "id": map[string]interface{}{ + "type": "string", + }, + "status": map[string]interface{}{ + "type": "string", + "enum": []any{"pending", "success", "failed"}, + }, + "delivered_at": map[string]interface{}{ + "type": "string", + }, + }, + "required": []any{"id", "status", "delivered_at"}, + }, + }, + }, + "required": []any{"statusCode", "body"}, + }, + }, + }, + // List events by destination + { + Name: "GET /:tenantID/destinations/:destinationID/events - List events by destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/destinations/" + destinationID + "/events", + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{ + "type": "integer", + "const": 200, + }, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "data": map[string]interface{}{ + "type": "array", + "minItems": 1, + }, + "count": map[string]interface{}{ + "type": "integer", + "minimum": 1, + }, + }, + "required": []any{"data", "count"}, + }, + }, + "required": []any{"statusCode", "body"}, + }, + }, + }, + // Retrieve event by destination + { + Name: "GET /:tenantID/destinations/:destinationID/events/:eventID - Retrieve event by destination", + Request: suite.AuthRequest(httpclient.Request{ + Method: httpclient.MethodGET, + Path: "/" + tenantID + "/destinations/" + destinationID + "/events/" + eventID, + }), + Expected: APITestExpectation{ + Match: &httpclient.Response{ + StatusCode: http.StatusOK, + }, + Validate: map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "statusCode": map[string]interface{}{ + "type": "integer", + "const": 200, + }, + "body": map[string]interface{}{ + "type": "object", + "properties": map[string]interface{}{ + "id": map[string]interface{}{ + "type": "string", + "const": eventID, + }, + "destination_id": 
map[string]interface{}{ + "type": "string", + "const": destinationID, + }, + }, + "required": []any{"id", "destination_id"}, + }, + }, + "required": []any{"statusCode", "body"}, + }, + }, + }, + } + + suite.RunAPITests(suite.T(), tests) +} diff --git a/cmd/e2e/suites_test.go b/cmd/e2e/suites_test.go index 89d5d75c..c48ff087 100644 --- a/cmd/e2e/suites_test.go +++ b/cmd/e2e/suites_test.go @@ -219,7 +219,7 @@ func TestBasicSuiteWithDeploymentID(t *testing.T) { } suite.Run(t, &basicSuite{ - logStorageType: configs.LogStorageTypePostgres, + logStorageType: configs.LogStorageTypeClickHouse, deploymentID: "dp_e2e_test", }) } diff --git a/internal/config/config.go b/internal/config/config.go index 79901389..f369837a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -473,10 +473,11 @@ func (c *Config) ToMigratorOpts() migrator.MigrationOpts { URL: c.PostgresURL, }, CH: migrator.MigrationOptsCH{ - Addr: c.ClickHouse.Addr, - Username: c.ClickHouse.Username, - Password: c.ClickHouse.Password, - Database: c.ClickHouse.Database, + Addr: c.ClickHouse.Addr, + Username: c.ClickHouse.Username, + Password: c.ClickHouse.Password, + Database: c.ClickHouse.Database, + DeploymentID: c.DeploymentID, }, } } diff --git a/internal/logstore/chlogstore/chlogstore.go b/internal/logstore/chlogstore/chlogstore.go index 581c0b21..1752a547 100644 --- a/internal/logstore/chlogstore/chlogstore.go +++ b/internal/logstore/chlogstore/chlogstore.go @@ -13,13 +13,21 @@ import ( ) type logStoreImpl struct { - chDB clickhouse.DB + chDB clickhouse.DB + tableName string } var _ driver.LogStore = (*logStoreImpl)(nil) -func NewLogStore(chDB clickhouse.DB) driver.LogStore { - return &logStoreImpl{chDB: chDB} +// NewLogStore creates a new ClickHouse log store. +// If deploymentID is provided, it uses a deployment-specific table ({deploymentID}_event_log). +// Otherwise, it uses the default "event_log" table. +func NewLogStore(chDB clickhouse.DB, deploymentID string) (driver.LogStore, error) { + tableName := "event_log" + if deploymentID != "" { + tableName = fmt.Sprintf("%s_event_log", deploymentID) + } + return &logStoreImpl{chDB: chDB, tableName: tableName}, nil } func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRequest) (driver.ListEventResponse, error) { @@ -120,7 +128,7 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe any(e.metadata) as metadata, any(e.data) as data, argMax(e.status, e.delivery_time) as status - FROM event_log AS e + FROM %s AS e WHERE e.tenant_id = ? AND e.event_time >= fromUnixTimestamp64Milli(?) AND e.event_time <= fromUnixTimestamp64Milli(?) @@ -130,7 +138,7 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe %s %s LIMIT %d - `, destFilter, topicFilter, havingClause, orderBy, limit+1) + `, s.tableName, destFilter, topicFilter, havingClause, orderBy, limit+1) // Append having args after the main args args = append(args, havingArgs...) @@ -239,7 +247,7 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe countQuery = fmt.Sprintf(` SELECT count(*) FROM ( SELECT event_id - FROM event_log + FROM %s WHERE tenant_id = ? AND event_time >= fromUnixTimestamp64Milli(?) AND event_time <= fromUnixTimestamp64Milli(?) @@ -248,17 +256,17 @@ func (s *logStoreImpl) ListEvent(ctx context.Context, request driver.ListEventRe GROUP BY event_id HAVING argMax(status, delivery_time) = ? 
) - `, countDestFilter, countTopicFilter) + `, s.tableName, countDestFilter, countTopicFilter) } else { countQuery = fmt.Sprintf(` SELECT count(DISTINCT event_id) - FROM event_log + FROM %s WHERE tenant_id = ? AND event_time >= fromUnixTimestamp64Milli(?) AND event_time <= fromUnixTimestamp64Milli(?) %s %s - `, countDestFilter, countTopicFilter) + `, s.tableName, countDestFilter, countTopicFilter) } var totalCount uint64 @@ -294,7 +302,7 @@ func parseCursor(cursor string) (time.Time, string, error) { } func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID string) (*models.Event, error) { - query := ` + query := fmt.Sprintf(` SELECT event_id, tenant_id, @@ -305,11 +313,11 @@ func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID stri metadata, data, argMax(status, delivery_time) as status - FROM event_log + FROM %s WHERE tenant_id = ? AND event_id = ? GROUP BY event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data LIMIT 1 - ` + `, s.tableName) row := s.chDB.QueryRow(ctx, query, tenantID, eventID) @@ -348,7 +356,7 @@ func (s *logStoreImpl) RetrieveEvent(ctx context.Context, tenantID, eventID stri } func (s *logStoreImpl) RetrieveEventByDestination(ctx context.Context, tenantID, destinationID, eventID string) (*models.Event, error) { - query := ` + query := fmt.Sprintf(` SELECT event_id, tenant_id, @@ -359,11 +367,11 @@ func (s *logStoreImpl) RetrieveEventByDestination(ctx context.Context, tenantID, metadata, data, argMax(status, delivery_time) as status - FROM event_log + FROM %s WHERE tenant_id = ? AND destination_id = ? AND event_id = ? GROUP BY event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data LIMIT 1 - ` + `, s.tableName) row := s.chDB.QueryRow(ctx, query, tenantID, destinationID, eventID) @@ -402,7 +410,7 @@ func (s *logStoreImpl) RetrieveEventByDestination(ctx context.Context, tenantID, } func (s *logStoreImpl) ListDelivery(ctx context.Context, request driver.ListDeliveryRequest) ([]*models.Delivery, error) { - query := ` + query := fmt.Sprintf(` SELECT delivery_id, delivery_event_id, @@ -412,12 +420,12 @@ func (s *logStoreImpl) ListDelivery(ctx context.Context, request driver.ListDeli delivery_time, code, response_data - FROM event_log + FROM %s WHERE tenant_id = ? AND event_id = ? 
AND delivery_id != '' ORDER BY delivery_time DESC - ` + `, s.tableName) rows, err := s.chDB.Query(ctx, query, request.TenantID, request.EventID) if err != nil { @@ -460,10 +468,10 @@ func (s *logStoreImpl) InsertManyDeliveryEvent(ctx context.Context, deliveryEven } batch, err := s.chDB.PrepareBatch(ctx, - `INSERT INTO event_log ( + fmt.Sprintf(`INSERT INTO %s ( event_id, tenant_id, destination_id, topic, eligible_for_retry, event_time, metadata, data, delivery_id, delivery_event_id, status, delivery_time, code, response_data - )`, + )`, s.tableName), ) if err != nil { return err diff --git a/internal/logstore/chlogstore/chlogstore_test.go b/internal/logstore/chlogstore/chlogstore_test.go index 2fab1d2d..6f41873d 100644 --- a/internal/logstore/chlogstore/chlogstore_test.go +++ b/internal/logstore/chlogstore/chlogstore_test.go @@ -72,5 +72,62 @@ func (h *harness) Close() { } func (h *harness) MakeDriver(ctx context.Context) (driver.LogStore, error) { - return NewLogStore(h.chDB), nil + return NewLogStore(h.chDB, "") +} + +func TestConformance_WithDeploymentID(t *testing.T) { + t.Parallel() + + drivertest.RunConformanceTests(t, newHarnessWithDeploymentID) +} + +func newHarnessWithDeploymentID(_ context.Context, t *testing.T) (drivertest.Harness, error) { + t.Helper() + + t.Cleanup(testinfra.Start(t)) + + chConfig := testinfra.NewClickHouseConfig(t) + deploymentID := "test_deployment" + + chDB, err := clickhouse.New(&chConfig) + require.NoError(t, err) + + // Use the migrator with DeploymentID to create deployment-specific tables + ctx := context.Background() + m, err := migrator.New(migrator.MigrationOpts{ + CH: migrator.MigrationOptsCH{ + Addr: chConfig.Addr, + Username: chConfig.Username, + Password: chConfig.Password, + Database: chConfig.Database, + DeploymentID: deploymentID, + }, + }) + require.NoError(t, err) + _, _, err = m.Up(ctx, -1) + require.NoError(t, err) + + return &harnessWithDeployment{ + chDB: chDB, + deploymentID: deploymentID, + migrator: m, + }, nil +} + +type harnessWithDeployment struct { + chDB clickhouse.DB + deploymentID string + migrator *migrator.Migrator +} + +func (h *harnessWithDeployment) Close() { + ctx := context.Background() + // Roll back migrations (drops deployment-specific tables) + h.migrator.Down(ctx, -1) + h.migrator.Close(ctx) + h.chDB.Close() +} + +func (h *harnessWithDeployment) MakeDriver(ctx context.Context) (driver.LogStore, error) { + return NewLogStore(h.chDB, h.deploymentID) } diff --git a/internal/logstore/logstore.go b/internal/logstore/logstore.go index dad027e1..687fb977 100644 --- a/internal/logstore/logstore.go +++ b/internal/logstore/logstore.go @@ -25,8 +25,9 @@ type LogStore interface { } type DriverOpts struct { - CH clickhouse.DB - PG *pgxpool.Pool + CH clickhouse.DB + PG *pgxpool.Pool + DeploymentID string } func (d *DriverOpts) Close() error { @@ -41,22 +42,25 @@ func (d *DriverOpts) Close() error { func NewLogStore(ctx context.Context, driverOpts DriverOpts) (LogStore, error) { if driverOpts.CH != nil { - return chlogstore.NewLogStore(driverOpts.CH), nil + return chlogstore.NewLogStore(driverOpts.CH, driverOpts.DeploymentID) } if driverOpts.PG != nil { - return pglogstore.NewLogStore(driverOpts.PG), nil + return pglogstore.NewLogStore(driverOpts.PG, driverOpts.DeploymentID) } return nil, errors.New("no driver provided") } type Config struct { - ClickHouse *clickhouse.ClickHouseConfig - Postgres *string + ClickHouse *clickhouse.ClickHouseConfig + Postgres *string + DeploymentID string } func MakeDriverOpts(cfg Config) 
(DriverOpts, error) { - driverOpts := DriverOpts{} + driverOpts := DriverOpts{ + DeploymentID: cfg.DeploymentID, + } if cfg.ClickHouse != nil { chDB, err := clickhouse.New(cfg.ClickHouse) diff --git a/internal/logstore/pglogstore/pglogstore.go b/internal/logstore/pglogstore/pglogstore.go index fc1aaa42..116f2690 100644 --- a/internal/logstore/pglogstore/pglogstore.go +++ b/internal/logstore/pglogstore/pglogstore.go @@ -2,6 +2,7 @@ package pglogstore import ( "context" + "errors" "fmt" "time" @@ -11,16 +12,22 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) +// ErrDeploymentIDNotSupported is returned when deployment_id is provided but not supported. +var ErrDeploymentIDNotSupported = errors.New("postgres logstore does not support deployment_id") + type logStore struct { db *pgxpool.Pool cursorParser eventCursorParser } -func NewLogStore(db *pgxpool.Pool) driver.LogStore { +func NewLogStore(db *pgxpool.Pool, deploymentID string) (driver.LogStore, error) { + if deploymentID != "" { + return nil, ErrDeploymentIDNotSupported + } return &logStore{ db: db, cursorParser: newEventCursorParser(), - } + }, nil } func (s *logStore) ListEvent(ctx context.Context, req driver.ListEventRequest) (driver.ListEventResponse, error) { diff --git a/internal/logstore/pglogstore/pglogstore_test.go b/internal/logstore/pglogstore/pglogstore_test.go index 7dcfe0de..5e276d86 100644 --- a/internal/logstore/pglogstore/pglogstore_test.go +++ b/internal/logstore/pglogstore/pglogstore_test.go @@ -71,5 +71,16 @@ func (h *harness) Close() { } func (h *harness) MakeDriver(ctx context.Context) (driver.LogStore, error) { - return NewLogStore(h.db), nil + return NewLogStore(h.db, "") +} + +func TestNewLogStore_DeploymentIDNotSupported(t *testing.T) { + testutil.CheckIntegrationTest(t) + t.Parallel() + + db := setupPGConnection(t) + defer db.Close() + + _, err := NewLogStore(db, "some-deployment-id") + require.ErrorIs(t, err, ErrDeploymentIDNotSupported) } diff --git a/internal/migrator/deployment_source.go b/internal/migrator/deployment_source.go new file mode 100644 index 00000000..e5b3a1b1 --- /dev/null +++ b/internal/migrator/deployment_source.go @@ -0,0 +1,154 @@ +package migrator + +import ( + "bytes" + "fmt" + "io" + "io/fs" + "strings" + + "github.com/golang-migrate/migrate/v4/source" +) + +// deploymentSource wraps an embed.FS and replaces {deployment_prefix} placeholders +// with deployment-specific prefixes. It implements the source.Driver interface for golang-migrate. +type deploymentSource struct { + fs fs.FS + path string + deploymentID string + migrations *source.Migrations +} + +// newDeploymentSource creates a new deployment source driver. 
+// It reads migrations from the embedded FS and replaces the "{deployment_prefix}" placeholder:
+//   - If deploymentID is set: {deployment_prefix} -> {deploymentID}_
+//   - If deploymentID is empty: {deployment_prefix} -> "" (empty string)
+func newDeploymentSource(fsys fs.FS, path string, deploymentID string) (source.Driver, error) {
+	ds := &deploymentSource{
+		fs:           fsys,
+		path:         path,
+		deploymentID: deploymentID,
+		migrations:   source.NewMigrations(),
+	}
+
+	if err := ds.init(); err != nil {
+		return nil, err
+	}
+
+	return ds, nil
+}
+
+func (ds *deploymentSource) init() error {
+	entries, err := fs.ReadDir(ds.fs, ds.path)
+	if err != nil {
+		return fmt.Errorf("failed to read migrations directory: %w", err)
+	}
+
+	for _, entry := range entries {
+		if entry.IsDir() {
+			continue
+		}
+
+		name := entry.Name()
+		m, err := source.DefaultParse(name)
+		if err != nil {
+			continue // skip files that don't match migration pattern
+		}
+
+		if !ds.migrations.Append(m) {
+			return fmt.Errorf("unable to parse migration file: %s", name)
+		}
+	}
+
+	return nil
+}
+
+// Open is part of the source.Driver interface.
+func (ds *deploymentSource) Open(url string) (source.Driver, error) {
+	return nil, fmt.Errorf("Open is not implemented for deploymentSource; use newDeploymentSource instead")
+}
+
+// Close is part of the source.Driver interface.
+func (ds *deploymentSource) Close() error {
+	return nil
+}
+
+// First returns the first migration version.
+func (ds *deploymentSource) First() (version uint, err error) {
+	v, ok := ds.migrations.First()
+	if !ok {
+		return 0, &fs.PathError{Op: "first", Path: ds.path, Err: fs.ErrNotExist}
+	}
+	return v, nil
+}
+
+// Prev returns the previous migration version.
+func (ds *deploymentSource) Prev(version uint) (prevVersion uint, err error) {
+	v, ok := ds.migrations.Prev(version)
+	if !ok {
+		return 0, &fs.PathError{Op: "prev", Path: ds.path, Err: fs.ErrNotExist}
+	}
+	return v, nil
+}
+
+// Next returns the next migration version.
+func (ds *deploymentSource) Next(version uint) (nextVersion uint, err error) {
+	v, ok := ds.migrations.Next(version)
+	if !ok {
+		return 0, &fs.PathError{Op: "next", Path: ds.path, Err: fs.ErrNotExist}
+	}
+	return v, nil
+}
+
+// ReadUp reads the up migration for the given version and performs deployment prefix replacement.
+func (ds *deploymentSource) ReadUp(version uint) (r io.ReadCloser, identifier string, err error) {
+	m, ok := ds.migrations.Up(version)
+	if !ok {
+		return nil, "", &fs.PathError{Op: "readup", Path: ds.path, Err: fs.ErrNotExist}
+	}
+
+	content, err := ds.readAndTransform(m.Raw)
+	if err != nil {
+		return nil, "", err
+	}
+
+	return io.NopCloser(bytes.NewReader(content)), m.Identifier, nil
+}
+
+// ReadDown reads the down migration for the given version and performs deployment prefix replacement.
+func (ds *deploymentSource) ReadDown(version uint) (r io.ReadCloser, identifier string, err error) {
+	m, ok := ds.migrations.Down(version)
+	if !ok {
+		return nil, "", &fs.PathError{Op: "readdown", Path: ds.path, Err: fs.ErrNotExist}
+	}
+
+	content, err := ds.readAndTransform(m.Raw)
+	if err != nil {
+		return nil, "", err
+	}
+
+	return io.NopCloser(bytes.NewReader(content)), m.Identifier, nil
+}
+
+// readAndTransform reads a migration file and replaces deployment placeholders.
+func (ds *deploymentSource) readAndTransform(filename string) ([]byte, error) { + filepath := ds.path + "/" + filename + content, err := fs.ReadFile(ds.fs, filepath) + if err != nil { + return nil, fmt.Errorf("failed to read migration file %s: %w", filepath, err) + } + + // Replace {deployment_prefix} with actual prefix (or empty string) + transformed := ds.replaceDeploymentPrefix(string(content)) + return []byte(transformed), nil +} + +// replaceDeploymentPrefix replaces "{deployment_prefix}" placeholder with the actual prefix. +// If deploymentID is set, it becomes "{deploymentID}_". Otherwise, it becomes empty string. +func (ds *deploymentSource) replaceDeploymentPrefix(sql string) string { + prefix := "" + if ds.deploymentID != "" { + prefix = fmt.Sprintf("%s_", ds.deploymentID) + } + return strings.ReplaceAll(sql, "{deployment_prefix}", prefix) +} diff --git a/internal/migrator/migrations/clickhouse/000001_init.down.sql b/internal/migrator/migrations/clickhouse/000001_init.down.sql index f3db6fbf..622ff350 100644 --- a/internal/migrator/migrations/clickhouse/000001_init.down.sql +++ b/internal/migrator/migrations/clickhouse/000001_init.down.sql @@ -1 +1 @@ -DROP TABLE IF EXISTS event_log; +DROP TABLE IF EXISTS {deployment_prefix}event_log; diff --git a/internal/migrator/migrations/clickhouse/000001_init.up.sql b/internal/migrator/migrations/clickhouse/000001_init.up.sql index 311412e1..5df187c9 100644 --- a/internal/migrator/migrations/clickhouse/000001_init.up.sql +++ b/internal/migrator/migrations/clickhouse/000001_init.up.sql @@ -1,8 +1,9 @@ -- Single denormalized table for events and deliveries -- Each row represents a delivery attempt (or pending state) for an event -- This avoids JOINs and leverages ClickHouse's columnar storage efficiently +-- Note: {deployment_prefix} is replaced with {deployment_id}_ when running migrations with a deployment ID -CREATE TABLE IF NOT EXISTS event_log ( +CREATE TABLE IF NOT EXISTS {deployment_prefix}event_log ( -- Event fields event_id String, tenant_id String, diff --git a/internal/migrator/migrator.go b/internal/migrator/migrator.go index f89d270e..bbb0e5fd 100644 --- a/internal/migrator/migrator.go +++ b/internal/migrator/migrator.go @@ -91,8 +91,6 @@ func (m *Migrator) Up(ctx context.Context, n int) (int, int, error) { // Down migrates the database down by n migrations. It returns the updated version, // the number of migrations rolled back, and an error. 
 func (m *Migrator) Down(ctx context.Context, n int) (int, int, error) {
-	fmt.Println("down", n)
-
 	initVersion, err := m.Version(ctx)
 	if err != nil {
 		return 0, 0, err
 	}
@@ -134,10 +132,11 @@ type MigrationOptsPG struct {
 }
 
 type MigrationOptsCH struct {
-	Addr     string
-	Username string
-	Password string
-	Database string
+	Addr         string
+	Username     string
+	Password     string
+	Database     string
+	DeploymentID string // If set, creates deployment-specific tables (e.g., {deploymentID}_event_log)
 }
 
 type MigrationOpts struct {
@@ -167,9 +166,10 @@ func (opts *MigrationOpts) getDriver() (source.Driver, error) {
 	}
 
 	if opts.CH.Addr != "" {
-		d, err := iofs.New(chMigrations, "migrations/clickhouse")
+		// Always use deployment source to handle {deployment_prefix} placeholder
+		d, err := newDeploymentSource(chMigrations, "migrations/clickhouse", opts.CH.DeploymentID)
 		if err != nil {
-			return nil, fmt.Errorf("failed to create clickhouse migration source: %w", err)
+			return nil, fmt.Errorf("failed to create clickhouse deployment migration source: %w", err)
 		}
 		return d, nil
 	}
@@ -187,7 +187,12 @@ func (opts *MigrationOpts) databaseURL() string {
 	}
 
 	if opts.CH.Addr != "" {
-		return fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true", opts.CH.Username, opts.CH.Password, opts.CH.Addr, opts.CH.Database)
+		url := fmt.Sprintf("clickhouse://%s:%s@%s/%s?x-multi-statement=true", opts.CH.Username, opts.CH.Password, opts.CH.Addr, opts.CH.Database)
+		// Use deployment-specific migrations table to avoid conflicts between deployments
+		if opts.CH.DeploymentID != "" {
+			url += fmt.Sprintf("&x-migrations-table=%s_schema_migrations", opts.CH.DeploymentID)
+		}
+		return url
 	}
 
 	return ""
diff --git a/internal/migrator/migrator_test.go b/internal/migrator/migrator_test.go
index d930809a..07496b5d 100644
--- a/internal/migrator/migrator_test.go
+++ b/internal/migrator/migrator_test.go
@@ -5,6 +5,8 @@ import (
 	"strings"
 	"testing"
 
+	"github.com/hookdeck/outpost/internal/clickhouse"
+	"github.com/hookdeck/outpost/internal/util/testinfra"
 	"github.com/hookdeck/outpost/internal/util/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -302,3 +304,146 @@ func sanitizeURLForTesting(dbURL string) string {
 	}
 	return dbURL
 }
+
+// TestMigrator_ClickHouse_DeploymentPrefix tests that ClickHouse migrations
+// correctly create tables with or without the deployment prefix.
+func TestMigrator_ClickHouse_DeploymentPrefix(t *testing.T) {
+	testutil.CheckIntegrationTest(t)
+	t.Cleanup(testinfra.Start(t))
+
+	chConfig := testinfra.NewClickHouseConfig(t)
+
+	tests := []struct {
+		name          string
+		deploymentID  string
+		expectedTable string
+	}{
+		{
+			name:          "without deployment ID creates event_log table",
+			deploymentID:  "",
+			expectedTable: "event_log",
+		},
+		{
+			name:          "with deployment ID creates {deploymentID}_event_log table",
+			deploymentID:  "test_deployment",
+			expectedTable: "test_deployment_event_log",
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			ctx := context.Background()
+
+			// Create migrator with optional deployment ID
+			m, err := New(MigrationOpts{
+				CH: MigrationOptsCH{
+					Addr:         chConfig.Addr,
+					Username:     chConfig.Username,
+					Password:     chConfig.Password,
+					Database:     chConfig.Database,
+					DeploymentID: tt.deploymentID,
+				},
+			})
+			require.NoError(t, err)
+
+			// Run migrations up
+			version, applied, err := m.Up(ctx, -1)
+			require.NoError(t, err)
+			assert.Equal(t, 1, version, "should be at version 1")
+			assert.Equal(t, 1, applied, "should have applied 1 migration")
+
+			// Verify the correct table was created
+			chDB, err := clickhouse.New(&chConfig)
+			require.NoError(t, err)
+			defer chDB.Close()
+
+			// Check table exists by querying it
+			var count uint64
+			err = chDB.QueryRow(ctx, "SELECT count() FROM "+tt.expectedTable).Scan(&count)
+			require.NoError(t, err, "table %s should exist", tt.expectedTable)
+
+			// Roll back migrations
+			version, rolledBack, err := m.Down(ctx, -1)
+			require.NoError(t, err)
+			assert.Equal(t, 0, version, "should be at version 0 after rollback")
+			assert.Equal(t, 1, rolledBack, "should have rolled back 1 migration")
+
+			// Verify table was dropped
+			err = chDB.QueryRow(ctx, "SELECT count() FROM "+tt.expectedTable).Scan(&count)
+			require.Error(t, err, "table %s should not exist after rollback", tt.expectedTable)
+
+			m.Close(ctx)
+		})
+	}
+}
+
+// TestMigrator_ClickHouse_DeploymentPrefix_Isolation tests that different deployments
+// have isolated tables and migrations tracking.
+func TestMigrator_ClickHouse_DeploymentPrefix_Isolation(t *testing.T) {
+	testutil.CheckIntegrationTest(t)
+	t.Cleanup(testinfra.Start(t))
+
+	chConfig := testinfra.NewClickHouseConfig(t)
+	ctx := context.Background()
+
+	// Create two migrators with different deployment IDs
+	m1, err := New(MigrationOpts{
+		CH: MigrationOptsCH{
+			Addr:         chConfig.Addr,
+			Username:     chConfig.Username,
+			Password:     chConfig.Password,
+			Database:     chConfig.Database,
+			DeploymentID: "deployment_a",
+		},
+	})
+	require.NoError(t, err)
+	defer m1.Close(ctx)
+
+	m2, err := New(MigrationOpts{
+		CH: MigrationOptsCH{
+			Addr:         chConfig.Addr,
+			Username:     chConfig.Username,
+			Password:     chConfig.Password,
+			Database:     chConfig.Database,
+			DeploymentID: "deployment_b",
+		},
+	})
+	require.NoError(t, err)
+	defer m2.Close(ctx)
+
+	// Run migrations for deployment_a
+	_, _, err = m1.Up(ctx, -1)
+	require.NoError(t, err)
+
+	// Run migrations for deployment_b
+	_, _, err = m2.Up(ctx, -1)
+	require.NoError(t, err)
+
+	// Verify both tables exist independently
+	chDB, err := clickhouse.New(&chConfig)
+	require.NoError(t, err)
+	defer chDB.Close()
+
+	var count uint64
+	err = chDB.QueryRow(ctx, "SELECT count() FROM deployment_a_event_log").Scan(&count)
+	require.NoError(t, err, "deployment_a_event_log should exist")
+
+	err = chDB.QueryRow(ctx, "SELECT count() FROM deployment_b_event_log").Scan(&count)
+	require.NoError(t, err, "deployment_b_event_log should exist")
+
+	// Roll back deployment_a - should not affect deployment_b
+	_, _, err = m1.Down(ctx, -1)
+	require.NoError(t, err)
+
+	// deployment_a table should be gone
+	err = chDB.QueryRow(ctx, "SELECT count() FROM deployment_a_event_log").Scan(&count)
+	require.Error(t, err, "deployment_a_event_log should not exist after rollback")
+
+	// deployment_b table should still exist
+	err = chDB.QueryRow(ctx, "SELECT count() FROM deployment_b_event_log").Scan(&count)
+	require.NoError(t, err, "deployment_b_event_log should still exist")
+
+	// Clean up deployment_b
+	_, _, err = m2.Down(ctx, -1)
+	require.NoError(t, err)
+}
diff --git a/internal/services/builder.go b/internal/services/builder.go
index ac570ddd..12f29844 100644
--- a/internal/services/builder.go
+++ b/internal/services/builder.go
@@ -502,8 +502,9 @@ func (s *serviceInstance) initRedis(ctx context.Context, cfg *config.Config, log
 func (s *serviceInstance) initLogStore(ctx context.Context, cfg *config.Config, logger *logging.Logger) error {
 	logger.Debug("configuring log store driver", zap.String("service", s.name))
 	logStoreDriverOpts, err := logstore.MakeDriverOpts(logstore.Config{
-		ClickHouse: cfg.ClickHouse.ToConfig(),
-		Postgres:   &cfg.PostgresURL,
+		ClickHouse:   cfg.ClickHouse.ToConfig(),
+		Postgres:     &cfg.PostgresURL,
+		DeploymentID: cfg.DeploymentID,
 	})
 	if err != nil {
 		logger.Error("log store driver configuration failed", zap.String("service", s.name), zap.Error(err))
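
The placeholder rewrite in deployment_source.go reduces to a single strings.ReplaceAll over the migration SQL. A minimal standalone sketch of the transform (the SQL literal is abbreviated here; dp_e2e_test is the deployment ID used by the e2e suite):

```go
package main

import (
	"fmt"
	"strings"
)

// replaceDeploymentPrefix mirrors the helper in deployment_source.go:
// "{deployment_prefix}" becomes "{deploymentID}_" when a deployment ID
// is set, and the empty string otherwise.
func replaceDeploymentPrefix(sql, deploymentID string) string {
	prefix := ""
	if deploymentID != "" {
		prefix = deploymentID + "_"
	}
	return strings.ReplaceAll(sql, "{deployment_prefix}", prefix)
}

func main() {
	up := "CREATE TABLE IF NOT EXISTS {deployment_prefix}event_log (event_id String)"

	// Without a deployment ID, the shared table name is used.
	fmt.Println(replaceDeploymentPrefix(up, ""))
	// CREATE TABLE IF NOT EXISTS event_log (event_id String)

	// With a deployment ID, the table is scoped to that deployment.
	fmt.Println(replaceDeploymentPrefix(up, "dp_e2e_test"))
	// CREATE TABLE IF NOT EXISTS dp_e2e_test_event_log (event_id String)
}
```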
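Putting the pieces together: the migrator and the log store derive the table name from the deployment ID independently, so both must be constructed with the same value. A sketch of the wiring, assuming the internal import paths seen in the test files (the chlogstore path is inferred from its directory), a hypothetical local ClickHouse address, and ClickHouseConfig field names matching their usage in the tests:

```go
package main

import (
	"context"
	"log"

	"github.com/hookdeck/outpost/internal/clickhouse"
	"github.com/hookdeck/outpost/internal/logstore/chlogstore"
	"github.com/hookdeck/outpost/internal/migrator"
)

func main() {
	ctx := context.Background()
	deploymentID := "dp_e2e_test" // "" selects the shared event_log table

	// Placeholder connection settings (assumed values).
	chConfig := clickhouse.ClickHouseConfig{
		Addr:     "localhost:9000",
		Username: "default",
		Password: "",
		Database: "outpost",
	}

	m, err := migrator.New(migrator.MigrationOpts{
		CH: migrator.MigrationOptsCH{
			Addr:         chConfig.Addr,
			Username:     chConfig.Username,
			Password:     chConfig.Password,
			Database:     chConfig.Database,
			DeploymentID: deploymentID,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	// With a deployment ID this creates dp_e2e_test_event_log and tracks
	// schema versions in dp_e2e_test_schema_migrations.
	if _, _, err := m.Up(ctx, -1); err != nil {
		log.Fatal(err)
	}

	chDB, err := clickhouse.New(&chConfig)
	if err != nil {
		log.Fatal(err)
	}
	defer chDB.Close()

	// The store derives the same {deploymentID}_event_log table name.
	store, err := chlogstore.NewLogStore(chDB, deploymentID)
	if err != nil {
		log.Fatal(err)
	}
	_ = store
}
```

Note that the Postgres driver takes the opposite stance: pglogstore.NewLogStore returns ErrDeploymentIDNotSupported for any non-empty deployment ID, which is why the deployment-scoped e2e suite switches logStorageType to ClickHouse.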