From e14148b8f5f68c1f636fb20297319cd6a5f4873a Mon Sep 17 00:00:00 2001 From: yux0 Date: Fri, 4 Apr 2025 11:08:18 -0700 Subject: [PATCH 1/3] Add a system workflwo to manage config --- common/persistence/cluster_metadata_store.go | 3 +- service/worker/fx.go | 2 + service/worker/systemconfig/activities.go | 64 +++++++++++++++++++ service/worker/systemconfig/fx.go | 64 +++++++++++++++++++ .../systemconfig/update_failover_version.go | 32 ++++++++++ 5 files changed, 163 insertions(+), 2 deletions(-) create mode 100644 service/worker/systemconfig/activities.go create mode 100644 service/worker/systemconfig/fx.go create mode 100644 service/worker/systemconfig/update_failover_version.go diff --git a/common/persistence/cluster_metadata_store.go b/common/persistence/cluster_metadata_store.go index 3e6184612b8..6eb1d6eeaeb 100644 --- a/common/persistence/cluster_metadata_store.go +++ b/common/persistence/cluster_metadata_store.go @@ -238,8 +238,7 @@ func immutableFieldsChanged(old *persistencespb.ClusterMetadata, cur *persistenc return true } if old.IsGlobalNamespaceEnabled { - if (old.FailoverVersionIncrement != 0 && old.FailoverVersionIncrement != cur.FailoverVersionIncrement) || - (old.InitialFailoverVersion != 0 && old.InitialFailoverVersion != cur.InitialFailoverVersion) { + if old.InitialFailoverVersion != 0 && old.InitialFailoverVersion != cur.InitialFailoverVersion { return true } } diff --git a/service/worker/fx.go b/service/worker/fx.go index e67b1520762..ce490ebcddc 100644 --- a/service/worker/fx.go +++ b/service/worker/fx.go @@ -54,6 +54,7 @@ import ( "go.temporal.io/server/service/worker/dlq" "go.temporal.io/server/service/worker/migration" "go.temporal.io/server/service/worker/scheduler" + "go.temporal.io/server/service/worker/systemconfig" "go.temporal.io/server/service/worker/workerdeployment" "go.uber.org/fx" ) @@ -69,6 +70,7 @@ var Module = fx.Options( workerdeployment.Module, dlq.Module, dynamicconfig.Module, + systemconfig.Module, fx.Provide( func(c resource.HistoryClient) dlq.HistoryClient { return c diff --git a/service/worker/systemconfig/activities.go b/service/worker/systemconfig/activities.go new file mode 100644 index 00000000000..ac7a90d19b2 --- /dev/null +++ b/service/worker/systemconfig/activities.go @@ -0,0 +1,64 @@ +package systemconfig + +import ( + "context" + "fmt" + + "go.temporal.io/server/common/persistence" +) + +type ( + UpdateFailoverVersionIncrementInput struct { + CurrentFVI int64 + NewFVI int64 + } + + UpdateFailoverVersionIncrementOuput struct { + CurrentFVI int64 + NewFVI int64 + } + + activities struct { + currentClusterName string + clusterMetadataManager persistence.ClusterMetadataManager + } +) + +func (a *activities) UpdateFVI( + ctx context.Context, + input UpdateFailoverVersionIncrementInput, +) (UpdateFailoverVersionIncrementOuput, error) { + request := &persistence.GetClusterMetadataRequest{ + ClusterName: a.currentClusterName, + } + resp, err := a.clusterMetadataManager.GetClusterMetadata(ctx, request) + if err != nil { + return UpdateFailoverVersionIncrementOuput{}, err + } + metadata := resp.ClusterMetadata + if metadata.FailoverVersionIncrement != input.CurrentFVI { + return UpdateFailoverVersionIncrementOuput{}, + fmt.Errorf("failover version increment %v does not match input version %v", metadata.FailoverVersionIncrement, input.CurrentFVI) + } + if metadata.InitialFailoverVersion >= input.NewFVI { + return UpdateFailoverVersionIncrementOuput{}, + fmt.Errorf("failover version increment %v is less than initial failover version %v", input.NewFVI, metadata.InitialFailoverVersion) + } + if !metadata.IsGlobalNamespaceEnabled { + return UpdateFailoverVersionIncrementOuput{}, + fmt.Errorf("please update failover version increment from application yaml file") + } + + metadata.FailoverVersionIncrement = input.NewFVI + applied, err := a.clusterMetadataManager.SaveClusterMetadata(ctx, &persistence.SaveClusterMetadataRequest{ + ClusterMetadata: metadata, + Version: resp.Version, + }) + if err != nil { + return UpdateFailoverVersionIncrementOuput{}, err + } + if !applied { + return UpdateFailoverVersionIncrementOuput{}, fmt.Errorf("new failover version increment did not apply") + } + return UpdateFailoverVersionIncrementOuput{}, nil +} diff --git a/service/worker/systemconfig/fx.go b/service/worker/systemconfig/fx.go new file mode 100644 index 00000000000..c9aab0b187c --- /dev/null +++ b/service/worker/systemconfig/fx.go @@ -0,0 +1,64 @@ +package systemconfig + +import ( + sdkworker "go.temporal.io/sdk/worker" + "go.temporal.io/sdk/workflow" + "go.temporal.io/server/common/cluster" + "go.temporal.io/server/common/persistence" + workercommon "go.temporal.io/server/service/worker/common" + "go.uber.org/fx" +) + +type ( + initParams struct { + fx.In + ClusterMetadataManager persistence.ClusterMetadataManager + ClusterMetadata cluster.Metadata + } + + fxResult struct { + fx.Out + Component workercommon.WorkerComponent `group:"workerComponent"` + } + + clusterWorkerComponent struct { + initParams + } +) + +var Module = fx.Options( + fx.Provide(NewResult), +) + +func NewResult(params initParams) fxResult { + component := &clusterWorkerComponent{ + initParams: params, + } + return fxResult{ + Component: component, + } +} + +func (wc *clusterWorkerComponent) RegisterWorkflow(registry sdkworker.Registry) { + registry.RegisterWorkflowWithOptions(UpdateFailoverVersionIncrementWorkflow, workflow.RegisterOptions{Name: updateFailoverVersionIncrementWorkflowName}) +} + +func (wc *clusterWorkerComponent) DedicatedWorkflowWorkerOptions() *workercommon.DedicatedWorkerOptions { + // Use default worker + return nil +} + +func (wc *clusterWorkerComponent) RegisterActivities(registry sdkworker.Registry) { + registry.RegisterActivity(wc.activities()) +} + +func (wc *clusterWorkerComponent) DedicatedActivityWorkerOptions() *workercommon.DedicatedWorkerOptions { + return nil +} + +func (wc *clusterWorkerComponent) activities() *activities { + return &activities{ + currentClusterName: wc.ClusterMetadata.GetCurrentClusterName(), + clusterMetadataManager: wc.ClusterMetadataManager, + } +} diff --git a/service/worker/systemconfig/update_failover_version.go b/service/worker/systemconfig/update_failover_version.go new file mode 100644 index 00000000000..3334a6a9145 --- /dev/null +++ b/service/worker/systemconfig/update_failover_version.go @@ -0,0 +1,32 @@ +package systemconfig + +import ( + "time" + + "go.temporal.io/sdk/workflow" +) + +const ( + updateFailoverVersionIncrementWorkflowName = "update-failover-version-increment" +) + +type ( + UpdateFailoverVersionIncrementParams struct { + CurrentFVI int64 + NewFVI int64 + } +) + +func UpdateFailoverVersionIncrementWorkflow( + ctx workflow.Context, + params UpdateFailoverVersionIncrementParams, +) error { + activityOptions := workflow.ActivityOptions{ + StartToCloseTimeout: 1 * time.Minute, + } + future := workflow.ExecuteActivity(workflow.WithActivityOptions(ctx, activityOptions), "UpdateFVI", UpdateFailoverVersionIncrementInput{ + CurrentFVI: params.CurrentFVI, + NewFVI: params.NewFVI, + }) + return future.Get(ctx, nil) +} From 58b40711f7e74fcd5c874d684a3a34a2f99c62c2 Mon Sep 17 00:00:00 2001 From: yux0 Date: Mon, 7 Apr 2025 14:12:36 -0700 Subject: [PATCH 2/3] update code --- service/worker/systemconfig/activities.go | 36 +++++++++++++---------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/service/worker/systemconfig/activities.go b/service/worker/systemconfig/activities.go index ac7a90d19b2..ac6ae318b06 100644 --- a/service/worker/systemconfig/activities.go +++ b/service/worker/systemconfig/activities.go @@ -2,6 +2,7 @@ package systemconfig import ( "context" + "errors" "fmt" "go.temporal.io/server/common/persistence" @@ -13,9 +14,7 @@ type ( NewFVI int64 } - UpdateFailoverVersionIncrementOuput struct { - CurrentFVI int64 - NewFVI int64 + UpdateFailoverVersionIncrementOutput struct { } activities struct { @@ -27,26 +26,33 @@ type ( func (a *activities) UpdateFVI( ctx context.Context, input UpdateFailoverVersionIncrementInput, -) (UpdateFailoverVersionIncrementOuput, error) { +) (UpdateFailoverVersionIncrementOutput, error) { request := &persistence.GetClusterMetadataRequest{ ClusterName: a.currentClusterName, } + if input.NewFVI <= 0 { + return UpdateFailoverVersionIncrementOutput{}, + errors.New("invalid failover version increment") + } resp, err := a.clusterMetadataManager.GetClusterMetadata(ctx, request) if err != nil { - return UpdateFailoverVersionIncrementOuput{}, err + return UpdateFailoverVersionIncrementOutput{}, err } metadata := resp.ClusterMetadata if metadata.FailoverVersionIncrement != input.CurrentFVI { - return UpdateFailoverVersionIncrementOuput{}, - fmt.Errorf("failover version increment %v does not match input version %v", metadata.FailoverVersionIncrement, input.CurrentFVI) + return UpdateFailoverVersionIncrementOutput{}, + errors.New(fmt.Sprintf("failover version increment %v does not match input version %v", metadata.FailoverVersionIncrement, input.CurrentFVI)) + } + if metadata.InitialFailoverVersion == input.NewFVI { + return UpdateFailoverVersionIncrementOutput{}, nil } - if metadata.InitialFailoverVersion >= input.NewFVI { - return UpdateFailoverVersionIncrementOuput{}, - fmt.Errorf("failover version increment %v is less than initial failover version %v", input.NewFVI, metadata.InitialFailoverVersion) + if metadata.InitialFailoverVersion > input.NewFVI { + return UpdateFailoverVersionIncrementOutput{}, + errors.New(fmt.Sprintf("failover version increment %v is less than initial failover version %v", input.NewFVI, metadata.InitialFailoverVersion)) } if !metadata.IsGlobalNamespaceEnabled { - return UpdateFailoverVersionIncrementOuput{}, - fmt.Errorf("please update failover version increment from application yaml file") + return UpdateFailoverVersionIncrementOutput{}, + errors.New("please update failover version increment from application yaml file") } metadata.FailoverVersionIncrement = input.NewFVI @@ -55,10 +61,10 @@ func (a *activities) UpdateFVI( Version: resp.Version, }) if err != nil { - return UpdateFailoverVersionIncrementOuput{}, err + return UpdateFailoverVersionIncrementOutput{}, err } if !applied { - return UpdateFailoverVersionIncrementOuput{}, fmt.Errorf("new failover version increment did not apply") + return UpdateFailoverVersionIncrementOutput{}, errors.New("new failover version increment did not apply") } - return UpdateFailoverVersionIncrementOuput{}, nil + return UpdateFailoverVersionIncrementOutput{}, nil } From 7a783ff52e585f9110cfbfbe96b40cd8bc7165a7 Mon Sep 17 00:00:00 2001 From: yux0 Date: Mon, 7 Apr 2025 15:19:18 -0700 Subject: [PATCH 3/3] add header --- service/worker/systemconfig/activities.go | 24 +++++++++++++++++++ service/worker/systemconfig/fx.go | 24 +++++++++++++++++++ .../systemconfig/update_failover_version.go | 24 +++++++++++++++++++ 3 files changed, 72 insertions(+) diff --git a/service/worker/systemconfig/activities.go b/service/worker/systemconfig/activities.go index ac6ae318b06..29dd4f365d7 100644 --- a/service/worker/systemconfig/activities.go +++ b/service/worker/systemconfig/activities.go @@ -1,3 +1,27 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package systemconfig import ( diff --git a/service/worker/systemconfig/fx.go b/service/worker/systemconfig/fx.go index c9aab0b187c..ee9f8980964 100644 --- a/service/worker/systemconfig/fx.go +++ b/service/worker/systemconfig/fx.go @@ -1,3 +1,27 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package systemconfig import ( diff --git a/service/worker/systemconfig/update_failover_version.go b/service/worker/systemconfig/update_failover_version.go index 3334a6a9145..fbb202e7eeb 100644 --- a/service/worker/systemconfig/update_failover_version.go +++ b/service/worker/systemconfig/update_failover_version.go @@ -1,3 +1,27 @@ +// The MIT License +// +// Copyright (c) 2020 Temporal Technologies Inc. All rights reserved. +// +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package systemconfig import (