diff --git a/control_plane.go b/control_plane.go index 73c31b7..850f1d8 100644 --- a/control_plane.go +++ b/control_plane.go @@ -15,6 +15,9 @@ package opensergo import ( + "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" "log" "os" "sync" @@ -38,7 +41,7 @@ type ControlPlane struct { func NewControlPlane() (*ControlPlane, error) { cp := &ControlPlane{} - operator, err := controller.NewKubernetesOperator(cp.sendMessage) + operator, err := controller.NewKubernetesOperator(cp.sendMessage, cp.NotifyPluginHandler) if err != nil { return nil, err } @@ -71,6 +74,32 @@ func (c *ControlPlane) Start() error { return nil } +func (c *ControlPlane) NotifyPluginHandler(pluginName string, e any) error { + client, err := c.server.PluginServer.GetPluginClient(pluginName) + if err != nil { + log.Printf("Error:%s\n", err.Error()) + } + switch pluginName { + case builtin.RateLimitServicePluginName: + raw, ok := client.(ratelimit_plugin.RateLimit) + if !ok { + return errors.New("can't convert ratelimit plugin to normal wrapper") + } + l, ok := e.(*v1alpha1.RateLimitStrategy) + if !ok { + log.Printf("Error: %s\n", "can't convert event to ratelimit strategy") + } + err = builtin.NotifyPluginRateLimit(raw, l) + if err != nil { + return err + } + default: + log.Printf("unknown plugin name: %s\n", pluginName) + } + return nil + +} + func (c *ControlPlane) sendMessage(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error { connections, exists := c.server.ConnectionManager().Get(namespace, app, kind) if !exists || connections == nil { @@ -94,6 +123,7 @@ func (c *ControlPlane) sendMessageToStream(stream model.OpenSergoTransportStream if stream == nil { return nil } + return stream.SendMsg(&trpb.SubscribeResponse{ Status: status, Ack: "", diff --git a/go.mod b/go.mod index 61ecf71..90d8c01 100644 --- a/go.mod +++ b/go.mod @@ -1,24 +1,80 @@ module github.com/opensergo/opensergo-control-plane -go 1.14 +go 1.18 require ( github.com/alibaba/sentinel-golang v1.0.3 - github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f github.com/envoyproxy/protoc-gen-validate v0.6.7 github.com/go-logr/logr v0.4.0 github.com/golang/protobuf v1.5.2 - github.com/json-iterator/go v1.1.12 // indirect - github.com/kr/pretty v0.3.0 // indirect + github.com/hashicorp/go-hclog v1.1.0 + github.com/hashicorp/go-plugin v1.4.10 + github.com/hashicorp/go-secure-stdlib/base62 v0.1.2 + github.com/hashicorp/go-secure-stdlib/pluginutil/v2 v2.0.4 github.com/pkg/errors v0.9.1 - github.com/rogpeppe/go-internal v1.8.0 // indirect go.uber.org/atomic v1.7.0 google.golang.org/genproto v0.0.0-20220329172620-7be39ac1afc7 google.golang.org/grpc v1.51.0 google.golang.org/protobuf v1.28.1 - gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 k8s.io/apimachinery v0.21.4 k8s.io/client-go v0.21.4 sigs.k8s.io/controller-runtime v0.9.7 ) + +require ( + cloud.google.com/go v0.65.0 // indirect + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.1.1 // indirect + github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/evanphx/json-patch v4.11.0+incompatible // indirect + github.com/fatih/color v1.7.0 // indirect + github.com/fsnotify/fsnotify v1.4.9 // indirect + github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect + github.com/google/go-cmp v0.5.7 // indirect + github.com/google/gofuzz v1.1.0 // indirect + github.com/google/uuid v1.1.2 // indirect + github.com/googleapis/gnostic v0.5.5 // indirect + github.com/hashicorp/go-uuid v1.0.2 // indirect + github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb // indirect + github.com/imdario/mergo v0.3.12 // indirect + github.com/json-iterator/go v1.1.12 // indirect + github.com/kr/pretty v0.3.0 // indirect + github.com/mattn/go-colorable v0.1.6 // indirect + github.com/mattn/go-isatty v0.0.12 // indirect + github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect + github.com/mitchellh/go-testing-interface v1.0.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/oklog/run v1.0.0 // indirect + github.com/prometheus/client_golang v1.11.0 // indirect + github.com/prometheus/client_model v0.3.0 // indirect + github.com/prometheus/common v0.26.0 // indirect + github.com/prometheus/procfs v0.6.0 // indirect + github.com/rogpeppe/go-internal v1.8.0 // indirect + github.com/spf13/pflag v1.0.5 // indirect + golang.org/x/crypto v0.8.0 // indirect + golang.org/x/net v0.9.0 // indirect + golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect + golang.org/x/sys v0.7.0 // indirect + golang.org/x/term v0.7.0 // indirect + golang.org/x/text v0.9.0 // indirect + golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect + gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect + google.golang.org/appengine v1.6.7 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.4.0 // indirect + k8s.io/api v0.21.4 // indirect + k8s.io/apiextensions-apiserver v0.21.4 // indirect + k8s.io/component-base v0.21.4 // indirect + k8s.io/klog/v2 v2.8.0 // indirect + k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 // indirect + k8s.io/utils v0.0.0-20210802155522-efc7438f0176 // indirect + sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect + sigs.k8s.io/yaml v1.2.0 // indirect +) diff --git a/go.sum b/go.sum index 736c4bc..93d3cba 100644 --- a/go.sum +++ b/go.sum @@ -72,7 +72,6 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQwij/eHl5CU= github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -85,7 +84,6 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= @@ -100,7 +98,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc h1:PYXxkRUBGUMa5xgMVMDl62vEklZvKpVaxQeN9ie7Hfk= github.com/cncf/xds/go v0.0.0-20220314180256-7f1daf1720fc/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= @@ -142,7 +139,6 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= -github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f h1:WM6jD/5NGnwG5ZiZIZtYldAt0j+Q7xOvEEEMQtbuk5M= github.com/envoyproxy/go-control-plane v0.10.3-0.20221109183938-2935a23e638f/go.mod h1:ufpOdMVWU+v42FYQiIBUhSWglFcK3S1Ml8bbzLwkdcE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= @@ -152,6 +148,7 @@ github.com/evanphx/json-patch v0.5.2/go.mod h1:ZWS5hhDbVDyob71nXKNL0+PWn6ToqBHMi github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.11.0+incompatible h1:glyUF9yIYtMHzn8xaKw5rMhdWcwsYV8dZHIq5567/xs= github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= +github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= @@ -283,14 +280,24 @@ github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyN github.com/hashicorp/consul/sdk v0.3.0/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= +github.com/hashicorp/go-hclog v1.1.0 h1:QsGcniKx5/LuX2eYoeL+Np3UKYPNaN7YKpTh29h8rbw= +github.com/hashicorp/go-hclog v1.1.0/go.mod h1:whpDNt7SSdeAju8AWKIWsul05p54N/39EeqMAyrmvFQ= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk= +github.com/hashicorp/go-plugin v1.4.10 h1:xUbmA4jC6Dq163/fWcp8P3JuHilrHHMLNRxzGQJ9hNk= +github.com/hashicorp/go-plugin v1.4.10/go.mod h1:6/1TEzT0eQznvI/gV2CM29DLSkAK/e58mUWKVsPaph0= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= +github.com/hashicorp/go-secure-stdlib/base62 v0.1.2 h1:ET4pqyjiGmY09R5y+rSd70J2w45CtbWDNvGqWp/R3Ng= +github.com/hashicorp/go-secure-stdlib/base62 v0.1.2/go.mod h1:EdWO6czbmthiwZ3/PUsDV+UD1D5IRU4ActiaWGwt0Yw= +github.com/hashicorp/go-secure-stdlib/pluginutil/v2 v2.0.4 h1:IU2iGcvthrJ53rPbQU6B4+iVs/cLPO89ti5dd2XEj3k= +github.com/hashicorp/go-secure-stdlib/pluginutil/v2 v2.0.4/go.mod h1:TDJ8YE+fJNEW7OQUqL9RjB6SPieLUxvKzrt8u19qOkg= github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU= github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -302,6 +309,8 @@ github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ= github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I= github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc= +github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb h1:b5rjCoWHc7eqmAS4/qyk21ZsHyb6Mxv/jykxvNTkU4M= +github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKeTrX5uv1uIXGdwYDTeHna2qgaIlx54MXqjAM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg= github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= @@ -312,6 +321,7 @@ github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= +github.com/jhump/protoreflect v1.6.0 h1:h5jfMVslIg6l29nsMs0D8Wj17RDVdNYti0vDN/PZZoE= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= @@ -353,8 +363,15 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs= github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84= +github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= @@ -363,6 +380,7 @@ github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3N github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-testing-interface v1.0.0 h1:fzU/JVNcaqHQEcVFAKeR41fkiLdIPrefOvVG1VZ96U0= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg= github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY= @@ -394,6 +412,7 @@ github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= +github.com/oklog/run v1.0.0 h1:Ru7dDtJNOyC66gQ5dQmaCa0qIsAUFY3sFpK1Xk8igrw= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -524,8 +543,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tklauser/go-sysconf v0.3.6/go.mod h1:MkWzOF4RMCshBAMXuhXJs64Rte09mITnppBXY/rYEFI= github.com/tklauser/numcpus v0.2.2/go.mod h1:x3qojaO3uyYt0i56EW/VUYs7uBvdl2fkfZFu0T9wgjM= @@ -539,7 +558,6 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= @@ -584,7 +602,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.8.0 h1:pd9TJtTueMTVQXzk8E2XESSMQDj/U7OUu0PqJqPXQjQ= +golang.org/x/crypto v0.8.0/go.mod h1:mRqEX+O9/h5TFCrQhkgjo2yKi0yYA+9ecGkdQoHrywE= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -620,8 +639,6 @@ golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.1-0.20200828183125-ce943fd02449/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 h1:6zppjxzCulZykYSLyVDYbneBfbaBIQPYMevg0bEwv2s= -golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -660,12 +677,11 @@ golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81R golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210224082022-3d97a244fca7/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.9.0 h1:aWJ/m6xSmxWBx+V0XRHTlrYrPG56jKsLdTFmsSsCzOM= +golang.org/x/net v0.9.0/go.mod h1:d48xBJpPfHeWQsugry2m+kC02ZBRGRgulfHnEXEuWns= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -684,7 +700,6 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -694,6 +709,7 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -707,6 +723,7 @@ golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191008105621-543471e840be/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -714,6 +731,7 @@ golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200212091648-12a6c2dcc1e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -741,17 +759,15 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210816183151-1e6c022a8912/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.7.0 h1:3jlCCIQZPdOYu1h8BkNvLz8Kgwtae2cagcG/VamtZRU= +golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY= -golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.7.0 h1:BEvjmm5fURWqcfbSKTdpkDXYBrUS1c0m8agp14W48vQ= +golang.org/x/term v0.7.0/go.mod h1:P32HKFT3hSsZrRxla30E9HqToFYAQPCMs/zFMBUFqPY= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -761,8 +777,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg= -golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= @@ -826,8 +842,7 @@ golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.1.12 h1:VveCTK38A2rkS8ZqFY25HIDFscX5X9OoEhJd3quQmXU= -golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -934,7 +949,6 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= @@ -970,8 +984,9 @@ gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/controller/crd_watcher.go b/pkg/controller/crd_watcher.go index bcc42ad..ae3a330 100644 --- a/pkg/controller/crd_watcher.go +++ b/pkg/controller/crd_watcher.go @@ -16,6 +16,7 @@ package controller import ( "context" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" "log" "net/http" "strconv" @@ -53,8 +54,9 @@ type CRDWatcher struct { subscribedNamespaces map[string]bool subscribedApps map[model.NamespacedApp]bool - crdGenerator func() client.Object - sendDataHandler model.DataEntirePushHandler + crdGenerator func() client.Object + sendDataHandler model.DataEntirePushHandler + notifyPluginHandler model.NotifyPluginHandler updateMux sync.RWMutex } @@ -247,6 +249,10 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro case RateLimitStrategyKind: rls := object.(*crdv1alpha1.RateLimitStrategy) + err = r.notifyPluginHandler(builtin.RateLimitServicePluginName, rls) + if err != nil { + log.Println("notify plugin error", err) + } mType, _ := strconv.ParseInt(rls.Spec.MetricType, 10, 32) limitMode, _ := strconv.ParseInt(rls.Spec.LimitMode, 10, 32) rule = &pb.RateLimitStrategy{ @@ -328,7 +334,7 @@ func (r *CRDWatcher) translateCrdToProto(object client.Object) (*anypb.Any, erro } -func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler) *CRDWatcher { +func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerator func() client.Object, sendDataHandler model.DataEntirePushHandler, notifyPluginHandler model.NotifyPluginHandler) *CRDWatcher { return &CRDWatcher{ kind: kind, Client: crdManager.GetClient(), @@ -340,5 +346,6 @@ func NewCRDWatcher(crdManager ctrl.Manager, kind model.SubscribeKind, crdGenerat crdGenerator: crdGenerator, crdCache: NewCRDCache(kind), sendDataHandler: sendDataHandler, + notifyPluginHandler: notifyPluginHandler, } } diff --git a/pkg/controller/k8s_operator.go b/pkg/controller/k8s_operator.go index 6c72356..538d77c 100644 --- a/pkg/controller/k8s_operator.go +++ b/pkg/controller/k8s_operator.go @@ -72,13 +72,14 @@ type KubernetesOperator struct { ctxCancel context.CancelFunc started atomic.Value - sendDataHandler model.DataEntirePushHandler + sendDataHandler model.DataEntirePushHandler + notifyPluginHandler model.NotifyPluginHandler controllerMux sync.RWMutex } // NewKubernetesOperator creates a OpenSergo Kubernetes operator. -func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*KubernetesOperator, error) { +func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler, notifyPluginHandler model.NotifyPluginHandler) (*KubernetesOperator, error) { ctrl.SetLogger(&k8SLogger{ l: logging.GetGlobalLogger(), level: logging.GetGlobalLoggerLevel(), @@ -102,11 +103,12 @@ func NewKubernetesOperator(sendDataHandler model.DataEntirePushHandler) (*Kubern } ctx, cancel := context.WithCancel(context.Background()) k := &KubernetesOperator{ - crdManager: mgr, - controllers: make(map[string]*CRDWatcher), - ctx: ctx, - ctxCancel: cancel, - sendDataHandler: sendDataHandler, + crdManager: mgr, + controllers: make(map[string]*CRDWatcher), + ctx: ctx, + ctxCancel: cancel, + sendDataHandler: sendDataHandler, + notifyPluginHandler: notifyPluginHandler, } return k, nil } @@ -145,7 +147,7 @@ func (k *KubernetesOperator) RegisterWatcher(target model.SubscribeTarget) (*CRD return nil, errors.New("CRD not supported: " + target.Kind) } // This kind of CRD has never been watched. - crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler) + crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.notifyPluginHandler) err = crdWatcher.AddSubscribeTarget(target) if err != nil { return nil, err @@ -178,7 +180,7 @@ func (k *KubernetesOperator) AddWatcher(target model.SubscribeTarget) error { if !crdSupports { return errors.New("CRD not supported: " + target.Kind) } - crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler) + crdWatcher := NewCRDWatcher(k.crdManager, target.Kind, crdMetadata.Generator(), k.sendDataHandler, k.notifyPluginHandler) err = crdWatcher.AddSubscribeTarget(target) if err != nil { return err diff --git a/pkg/model/model.go b/pkg/model/model.go index 29140e4..19a3332 100644 --- a/pkg/model/model.go +++ b/pkg/model/model.go @@ -31,3 +31,5 @@ type OpenSergoTransportStream = trpb.OpenSergoUniversalTransportService_Subscrib type SubscribeRequestHandler func(ClientIdentifier, *trpb.SubscribeRequest, OpenSergoTransportStream) error type DataEntirePushHandler func(namespace, app, kind string, dataWithVersion *trpb.DataWithVersion, status *trpb.Status, respId string) error + +type NotifyPluginHandler func(pluginName string, e any) error diff --git a/pkg/plugin/.golangci.yml b/pkg/plugin/.golangci.yml new file mode 100644 index 0000000..f62e0e9 --- /dev/null +++ b/pkg/plugin/.golangci.yml @@ -0,0 +1,361 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +run: + timeout: 10m + tests: false + skip-files: + - "^zz_generated.*" +linters: + disable-all: true + enable: + # The base lints + - errcheck + - gosimple + - govet + - ineffassign + - staticcheck + - typecheck + - unused + - bodyclose + - cyclop + - nilerr + - goimports + - asciicheck + - prealloc + - stylecheck + - exportloopref + - rowserrcheck + - makezero + - durationcheck + - gosec + - predeclared + + # The advanced lints + - dupl + - exhaustive + - godot + - misspell + - varnamelen + - gocritic + - gocognit + - nestif + - maligned + #- wsl +# Refers: https://gist.github.com/maratori/47a4d00457a92aa426dbd48a18776322 +linters-settings: + wsl: + # See https://github.com/bombsimon/wsl/blob/master/doc/configuration.md for documentation of available settings. + # These are the defaults for `golangci-lint`. + + # Do strict checking when assigning from append (x = append(x, y)). If + # this is set to true - the append call must append either a variable + # assigned, called or used on the line above. + strict-append: true + # Allows assignments to be cuddled with variables used in calls on + # line above and calls to be cuddled with assignments of variables + # used in call on line above. + allow-assign-and-call: true + # Allows assignments to be cuddled with anything. + allow-assign-and-anything: false + # Allows cuddling to assignments even if they span over multiple lines. + allow-multiline-assign: true + # If the number of lines in a case block is equal to or lager than this + # number, the case *must* end white a newline. + force-case-trailing-whitespace: 0 + # Allow blocks to end with comments. + allow-trailing-comment: false + # Allow multiple comments in the beginning of a block separated with newline. + allow-separated-leading-comment: false + # Allow multiple var/declaration statements to be cuddled. + allow-cuddle-declarations: false + # A list of call idents that everything can be cuddled with. + # Defaults to calls looking like locks. + allow-cuddle-with-calls: ["Lock", "RLock"] + # AllowCuddleWithRHS is a list of right hand side variables that is allowed + # to be cuddled with anything. Defaults to assignments or calls looking + # like unlocks. + allow-cuddle-with-rhs: ["Unlock", "RUnlock"] + # Causes an error when an If statement that checks an error variable doesn't + # cuddle with the assignment of that variable. + force-err-cuddling: false + # When force-err-cuddling is enabled this is a list of names + # used for error variables to check for in the conditional. + error-variable-names: ["err"] + # Causes an error if a short declaration (:=) cuddles with anything other than + # another short declaration. + # This logic overrides force-err-cuddling among others. + force-short-decl-cuddling: false + varnamelen: + # The longest distance, in source lines, that is being considered a "small scope". + # Variables used in at most this many lines will be ignored. + # Default: 5 + max-distance: 6 + # The minimum length of a variable's name that is considered "long". + # Variable names that are at least this long will be ignored. + # Default: 3 + min-name-length: 1 + # Check method receivers. + # Default: false + check-receiver: false + # Check named return values. + # Default: false + check-return: true + # Check type parameters. + # Default: false + check-type-param: true + # Ignore "ok" variables that hold the bool return value of a type assertion. + # Default: false + ignore-type-assert-ok: true + # Ignore "ok" variables that hold the bool return value of a map index. + # Default: false + ignore-map-index-ok: true + # Ignore "ok" variables that hold the bool return value of a channel receive. + # Default: false + ignore-chan-recv-ok: true + # Optional list of variable names that should be ignored completely. + # Default: [] + ignore-names: + - err + # Optional list of variable declarations that should be ignored completely. + # Entries must be in one of the following forms (see below for examples): + # - for variables, parameters, named return values, method receivers, or type parameters: + # ( can also be a pointer/slice/map/chan/...) + # - for constants: const + # + # Default: [] + ignore-decls: + - c echo.Context + - t testing.T + - f *foo.Bar + - e error + - i int + - const C + - T any + - m map[string]int + prealloc: + # IMPORTANT: we don't recommend using this linter before doing performance profiling. + # For most programs usage of prealloc will be a premature optimization. + + # Report pre-allocation suggestions only on simple loops that have no returns/breaks/continues/gotos in them. + # Default: true + simple: false + # Report pre-allocation suggestions on range loops. + # Default: true + range-loops: false + # Report pre-allocation suggestions on for loops. + # Default: false + for-loops: true + nestif: + # Minimal complexity of if statements to report. + # Default: 5 + min-complexity: 6 + misspell: + # Correct spellings using locale preferences for US or UK. + # Setting locale to US will correct the British spelling of 'colour' to 'color'. + # Default is to use a neutral variety of English. + locale: US + # Default: [] + ignore-words: + - someword + godot: + # Comments to be checked: `declarations`, `toplevel`, or `all`. + # Default: declarations + scope: toplevel + # List of regexps for excluding particular comment lines from check. + # Default: [] + exclude: + # Exclude todo and fixme comments. + - "^fixme:" + - "^todo:" + # Check that each sentence ends with a period. + # Default: true + period: false + # Check that each sentence starts with a capital letter. + # Default: false + capital: false + dupl: + # Tokens count to trigger issue. + # Default: 150 + threshold: 100 + cyclop: + # The maximal code complexity to report. + # Default: 10 + max-complexity: 30 + # The maximal average package complexity. + # If it's higher than 0.0 (float) the check is enabled + # Default: 0.0 + package-average: 10.0 + errcheck: + # Report about not checking of errors in type assertions: `a := b.(MyStruct)`. + # Such cases aren't reported by default. + # Default: false + check-type-assertions: true + exhaustive: + # Program elements to check for exhaustiveness. + # Default: [ switch ] + check: + - switch + - map + funlen: + # Checks the number of lines in a function. + # If lower than 0, disable the check. + # Default: 60 + lines: 100 + # Checks the number of statements in a function. + # If lower than 0, disable the check. + # Default: 40 + statements: 50 + gocognit: + # Minimal code complexity to report. + # Default: 30 (but we recommend 10-20) + min-complexity: 20 + gocritic: + # Which checks should be enabled; can't be combined with 'disabled-checks'. + # See https://go-critic.github.io/overview#checks-overview. + # To check which checks are enabled run `GL_DEBUG=gocritic golangci-lint run`. + # By default, list of stable checks is used. + enabled-checks: + - elseif + - nestingReduce + - unnamedResult + # - ruleguard + - truncateCmp + - hugeparam + - rangevalcopy + - captlocal + - underef + - toomanyresultschecker + - rangeexprcopy + # Which checks should be disabled; can't be combined with 'enabled-checks'. + # Default: [] + disabled-checks: + - regexpMust + # Enable multiple checks by tags, run `GL_DEBUG=gocritic golangci-lint run` to see all tags and checks. + # See https://github.com/go-critic/go-critic#usage -> section "Tags". + # Default: [] + enabled-tags: + - diagnostic + - style + - performance + - experimental + - opinionated + disabled-tags: + - diagnostic + - style + - performance + - experimental + - opinionated + # Settings passed to gocritic. + # The settings key is the name of a supported gocritic checker. + # The list of supported checkers can be find in https://go-critic.github.io/overview. + settings: + # Must be valid enabled check name. + captLocal: + # Whether to restrict checker to params only. + # Default: true + paramsOnly: false + elseif: + # Whether to skip balanced if-else pairs. + # Default: true + skipBalanced: false + hugeParam: + # Size in bytes that makes the warning trigger. + # Default: 80 + sizeThreshold: 70 + nestingReduce: + # Min number of statements inside a branch to trigger a warning. + # Default: 5 + bodyWidth: 4 + rangeExprCopy: + # Size in bytes that makes the warning trigger. + # Default: 512 + sizeThreshold: 516 + # Whether to check test functions + # Default: true + skipTestFuncs: false + rangeValCopy: + # Size in bytes that makes the warning trigger. + # Default: 128 + sizeThreshold: 32 + # Whether to check test functions. + # Default: true + skipTestFuncs: false + ruleguard: + # Enable debug to identify which 'Where' condition was rejected. + # The value of the parameter is the name of a function in a ruleguard file. + # + # When a rule is evaluated: + # If: + # The Match() clause is accepted; and + # One of the conditions in the Where() clause is rejected, + # Then: + # ruleguard prints the specific Where() condition that was rejected. + # + # The flag is passed to the ruleguard 'debug-group' argument. + # Default: "" + debug: 'emptyDecl' + # Deprecated, use 'failOn' param. + # If set to true, identical to failOn='all', otherwise failOn='' + failOnError: false + # Determines the behavior when an error occurs while parsing ruleguard files. + # If flag is not set, log error and skip rule files that contain an error. + # If flag is set, the value must be a comma-separated list of error conditions. + # - 'all': fail on all errors. + # - 'import': ruleguard rule imports a package that cannot be found. + # - 'dsl': gorule file does not comply with the ruleguard DSL. + # Default: "" + failOn: dsl + # Comma-separated list of file paths containing ruleguard rules. + # If a path is relative, it is relative to the directory where the golangci-lint command is executed. + # The special '${configDir}' variable is substituted with the absolute directory containing the golangci config file. + # Glob patterns such as 'rules-*.go' may be specified. + # Default: "" + rules: '${configDir}/ruleguard/rules-*.go,${configDir}/myrule1.go' + # Comma-separated list of enabled groups or skip empty to enable everything. + # Tags can be defined with # character prefix. + # Default: "" + enable: "myGroupName,#myTagName" + # Comma-separated list of disabled groups or skip empty to enable everything. + # Tags can be defined with # character prefix. + # Default: "" + disable: "myGroupName,#myTagName" + tooManyResultsChecker: + # Maximum number of results. + # Default: 5 + maxResults: 10 + truncateCmp: + # Whether to skip int/uint/uintptr types. + # Default: true + skipArchDependent: false + underef: + # Whether to skip (*x).method() calls where x is a pointer receiver. + # Default: true + skipRecvDeref: false + unnamedResult: + # Whether to check exported functions. + # Default: false + checkExported: true +issues: + exclude-rules: + - path: _test\.go + linters: + - dupl + - errcheck + - gosec + - rowserrcheck + - makezero \ No newline at end of file diff --git a/pkg/plugin/Makefile b/pkg/plugin/Makefile new file mode 100644 index 0000000..fb6386f --- /dev/null +++ b/pkg/plugin/Makefile @@ -0,0 +1,40 @@ +.PHONY: all +all: build-plugins + +.PHONY: build-plugins +build-plugins: + CGO_ENABLED=$(CGO_ENABLED) sh -c "'$(CURDIR)/scripts/plugin.sh'" + +.PHONY: clean +clean: + -rm ./bin/manager + +.PHONY: fmt +fmt: import ## Run go fmt against code. + go fmt ./... + +## Location to install dependencies to +LOCALBIN ?= $(shell pwd)/bin +$(LOCALBIN): + mkdir -p $(LOCALBIN) + +CHECK_LINT?= $(LOCALBIN)/golangci-lint +CHECK_IMPORTS?= $(LOCALBIN)/goimports + +.PHONY: lint +lint: check-lint + GOBIN=$(LOCALBIN) CGO_ENABLED=$(CGO_ENABLED) golangci-lint run -v --timeout=5m ./... + +.PHONY: check-lint +check-lint: $(CHECK_LINT) ## Download golangci-lint-setup locally if necessary. +$(CHECK_LINT): $(LOCALBIN) + GOBIN=$(LOCALBIN) CGO_ENABLED=$(CGO_ENABLED) go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest + +.PHONY: import +import: check-goimports + GOBIN=$(LOCALBIN) CGO_ENABLED=$(CGO_ENABLED) ./bin/goimports -w -e -l . + +.PHONY: check-goimports +check-goimports: $(CHECK_IMPORTS) ## Download goimports-setup locally if necessary. +$(CHECK_IMPORTS): $(LOCALBIN) + GOBIN=$(LOCALBIN) CGO_ENABLED=$(CGO_ENABLED) go install golang.org/x/tools/cmd/goimports@latest diff --git a/pkg/plugin/assets.go b/pkg/plugin/assets.go new file mode 100644 index 0000000..bb4612e --- /dev/null +++ b/pkg/plugin/assets.go @@ -0,0 +1,22 @@ +package plugin + +import ( + "embed" + "io/fs" +) + +const contentDir = "assets" + +// content is our static web server content. +// +//go:embed assets +var content embed.FS + +func FileSystem() fs.FS { + // Remove the root + f, err := fs.Sub(content, contentDir) + if err != nil { + panic(err) + } + return f +} diff --git a/pkg/plugin/assets/README.md b/pkg/plugin/assets/README.md new file mode 100644 index 0000000..12a0b4d --- /dev/null +++ b/pkg/plugin/assets/README.md @@ -0,0 +1 @@ +This directory is used to store the built plug-in resources. \ No newline at end of file diff --git a/pkg/plugin/client/client.go b/pkg/plugin/client/client.go new file mode 100644 index 0000000..cc2817f --- /dev/null +++ b/pkg/plugin/client/client.go @@ -0,0 +1,60 @@ +package client + +import ( + "fmt" + "strings" + "sync" +) + +type PluginClientRegistry struct { + client sync.Map +} + +func (c *PluginClientRegistry) RegisterClient(id string, client interface{}) error { + if client == nil { + return fmt.Errorf("register client fail: %s", "client is nil") + } + _, loaded := c.client.LoadOrStore(id, client) + if loaded { + return fmt.Errorf("client %s already exists", id) + } + return nil +} + +func (c *PluginClientRegistry) GetPluginClient(id string) any { + value, _ := c.client.Load(id) + if value == nil { + return nil + } + return value +} + +func (c *PluginClientRegistry) DeletePluginClient(id string) { + c.client.Delete(id) +} + +func (c *PluginClientRegistry) RangePluginClientByName(name string) interface{} { + var client interface{} + c.client.Range(func(key, value interface{}) bool { + parts := strings.SplitN(key.(string), "-", 2) + prefix := parts[0] + if prefix == name { + client = value + return false + } + return true + }) + return client +} + +func (c *PluginClientRegistry) RangePluginClientByPublicID(publicID string) interface{} { + var client interface{} + c.client.Range(func(key, value interface{}) bool { + if key.(string) == publicID { + client = value + return false + } + return true + }) + return client +} diff --git a/pkg/plugin/config/config.go b/pkg/plugin/config/config.go new file mode 100644 index 0000000..f65f953 --- /dev/null +++ b/pkg/plugin/config/config.go @@ -0,0 +1,41 @@ +package config + +import ( + "fmt" + "log" + "os" + "path" + "runtime" + + "gopkg.in/yaml.v3" +) + +type Config struct { + ExternalPlugin []*Plugin `yaml:"plugin"` +} +type Plugin struct { + Type string `yaml:"type"` + ExecutionDir string `yaml:"execution_dir"` +} + +func ReadConfig(path string) (*Config, error) { + file, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("error discovering plugin information: %w", err) + } + config := &Config{} + err = yaml.Unmarshal(file, config) + if err != nil { + log.Fatalf("error parsing config file: %v", err) + } + return config, nil +} + +func GetCurrentAbPathByCaller() string { + var abPath string + _, filename, _, ok := runtime.Caller(0) + if ok { + abPath = path.Dir(filename) + } + return abPath +} diff --git a/pkg/plugin/config/config.yaml b/pkg/plugin/config/config.yaml new file mode 100644 index 0000000..caed039 --- /dev/null +++ b/pkg/plugin/config/config.yaml @@ -0,0 +1,3 @@ +plugin: + - type: streamtest + execution_dir: C:/TEMP/test diff --git a/pkg/plugin/main/main.go b/pkg/plugin/main/main.go new file mode 100644 index 0000000..6768fd3 --- /dev/null +++ b/pkg/plugin/main/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "log" + + stream_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/stream" + + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" +) + +//nolint:gosimple +func main() { + pluginServer := pl.NewPluginServer() + err := pluginServer.InitPlugin() + if err != nil { + log.Fatalln("Error:", err.Error()) + } + + client, err := pluginServer.GetPluginClient(builtin.StreamServicePluginName) + if err != nil { + log.Fatalln("Error:", err.Error()) + } + raw, ok := client.(stream_plugin.Stream) + if !ok { + log.Fatalln("Error: can't convert rpc plugin to normal wrapper") + } + + sa := &say{} + greet, err := raw.Greeter("这是一个前缀", sa) + if err != nil { + log.Printf("Error: %s\n", err.Error()) + } + log.Println("Greeter:", greet) + + for { + select { + case <-pluginServer.ShutdownCh: + pluginServer.ContextCancel() + if err := pluginServer.RunShutdownFuncs(); err != nil { + log.Fatalln("Error:", err.Error()) + } + log.Println("Server shutdown") + return + } + } + +} + +type say struct { +} + +func (s *say) Say(ss string) string { + return ss + "这是一个后缀v2" +} diff --git a/pkg/plugin/pl/builtin/const.go b/pkg/plugin/pl/builtin/const.go new file mode 100644 index 0000000..f4399f5 --- /dev/null +++ b/pkg/plugin/pl/builtin/const.go @@ -0,0 +1,8 @@ +package builtin + +const ( + StreamServicePluginSetName = "stream-plugin" + StreamServicePluginName = "stream" + RateLimitServicePluginSetName = "ratelimit-plugin" + RateLimitServicePluginName = "ratelimit" +) diff --git a/pkg/plugin/pl/builtin/notifyplugin.go b/pkg/plugin/pl/builtin/notifyplugin.go new file mode 100644 index 0000000..a5b7e36 --- /dev/null +++ b/pkg/plugin/pl/builtin/notifyplugin.go @@ -0,0 +1,15 @@ +package builtin + +import ( + "github.com/opensergo/opensergo-control-plane/pkg/api/v1alpha1" + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" +) + +func NotifyPluginRateLimit(r ratelimit_plugin.RateLimit, l *v1alpha1.RateLimitStrategy) error { + limit, err := r.RateLimit(l.Spec.Threshold) + if err != nil { + return err + } + l.Spec.Threshold = limit + return nil +} diff --git a/pkg/plugin/pl/builtin/ratelimit/grpc.go b/pkg/plugin/pl/builtin/ratelimit/grpc.go new file mode 100644 index 0000000..3b40ae2 --- /dev/null +++ b/pkg/plugin/pl/builtin/ratelimit/grpc.go @@ -0,0 +1,36 @@ +package ratelimit_plugin + +import ( + "context" + + v1 "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/rate_limit/v1" +) + +type GRPCClient struct { + client v1.RateLimitServiceClient +} + +func (g *GRPCClient) RateLimit(t int64) (int64, error) { + resp, err := g.client.RateLimit(context.Background(), &v1.RateLimitRequest{ + Threshold: t, + }) + if err != nil { + return 0, err + } + return resp.Threshold, nil +} + +type GRPCServer struct { + v1.UnimplementedRateLimitServiceServer + Impl RateLimit +} + +func (g *GRPCServer) RateLimit(ctx context.Context, req *v1.RateLimitRequest) (*v1.RateLimitResponse, error) { + resp, err := g.Impl.RateLimit(req.Threshold) + if err != nil { + return nil, err + } + return &v1.RateLimitResponse{ + Threshold: resp, + }, nil +} diff --git a/pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go b/pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go new file mode 100644 index 0000000..5032e3b --- /dev/null +++ b/pkg/plugin/pl/builtin/ratelimit/ratelimit_plugin.go @@ -0,0 +1,51 @@ +package ratelimit_plugin + +import ( + "context" + "fmt" + + "github.com/hashicorp/go-plugin" + pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/rate_limit/v1" + "google.golang.org/grpc" +) + +type RateLimitPluginServer struct { +} + +var _ RateLimit = (*RateLimitPluginServer)(nil) + +func (s RateLimitPluginServer) RateLimit(t int64) (int64, error) { + return t + 1, nil +} + +type RateLimit interface { + RateLimit(t int64) (int64, error) +} + +type RateLimitPlugin struct { + plugin.Plugin + + impl RateLimit +} + +func NewRateLimitPluginServiceServer(impl RateLimit) (*RateLimitPlugin, error) { + if impl == nil { + return nil, fmt.Errorf("empty underlying ratelimit plugin passed in") + } + return &RateLimitPlugin{ + impl: impl, + }, nil +} + +func (h *RateLimitPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + pb.RegisterRateLimitServiceServer(s, &GRPCServer{ + Impl: h.impl, + }) + return nil +} + +func (h *RateLimitPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (any, error) { + return &GRPCClient{ + client: pb.NewRateLimitServiceClient(c), + }, nil +} diff --git a/pkg/plugin/pl/builtin/stream/grpc.go b/pkg/plugin/pl/builtin/stream/grpc.go new file mode 100644 index 0000000..018b7cd --- /dev/null +++ b/pkg/plugin/pl/builtin/stream/grpc.go @@ -0,0 +1,91 @@ +package stream_plugin + +import ( + "context" + "fmt" + + v1 "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/stream/v1" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" +) + +type GRPCClient struct { + client v1.RateLimitServiceClient + broker *plugin.GRPCBroker +} + +func (g *GRPCClient) Greeter(name string, h Hello) (string, error) { + addHelperServer := &GRPCHelloServer{Impl: h} + + var s *grpc.Server + serverFunc := func(opts []grpc.ServerOption) *grpc.Server { + s = grpc.NewServer(opts...) + v1.RegisterHelloServer(s, addHelperServer) + + return s + } + + brokerID := g.broker.NextId() + go g.broker.AcceptAndServe(brokerID, serverFunc) + + resp, err := g.client.Greet(context.Background(), &v1.StreamReq{ + Id: brokerID, + Name: name, + }) + if err != nil { + return "", err + } + + s.Stop() + return resp.Greet, nil +} + +type GRPCServer struct { + v1.UnimplementedStreamGreeterServer + Impl Stream + broker *plugin.GRPCBroker +} + +func (g *GRPCServer) Greet(ctx context.Context, req *v1.StreamReq) (*v1.StreamResp, error) { + conn, err := g.broker.Dial(req.Id) + if err != nil { + return nil, err + } + defer conn.Close() + + a := &GRPCHelloClient{v1.NewHelloClient(conn)} + resp, err := g.Impl.Greeter(req.Name, a) + if err != nil { + return nil, err + } + return &v1.StreamResp{ + Greet: resp, + }, nil +} + +type GRPCHelloClient struct { + client v1.HelloClient +} + +func (g *GRPCHelloClient) Say(s string) string { + resp, err := g.client.Say(context.Background(), &v1.HelloReq{ + Pre: s, + }) + if err != nil { + return "" + } + return resp.Resp +} + +type GRPCHelloServer struct { + v1.UnimplementedHelloServer + Impl Hello +} + +func (g *GRPCHelloServer) Say(ctx context.Context, req *v1.HelloReq) (*v1.HelloResp, error) { + resp := g.Impl.Say(fmt.Sprint(req.Pre, " GRPCHelloServer")) + return &v1.HelloResp{ + Resp: resp, + }, nil +} diff --git a/pkg/plugin/pl/builtin/stream/stream_plugin.go b/pkg/plugin/pl/builtin/stream/stream_plugin.go new file mode 100644 index 0000000..ab00f0a --- /dev/null +++ b/pkg/plugin/pl/builtin/stream/stream_plugin.go @@ -0,0 +1,59 @@ +package stream_plugin + +import ( + "context" + "fmt" + + pb "github.com/opensergo/opensergo-control-plane/pkg/plugin/proto/stream/v1" + + "github.com/hashicorp/go-plugin" + "google.golang.org/grpc" +) + +type StreamPluginServer struct { +} + +var _ Stream = (*StreamPluginServer)(nil) + +func (s StreamPluginServer) Greeter(name string, h Hello) (string, error) { + sp := fmt.Sprintf("pre:%s, h:%s\n", name, h.Say("test")) + return sp, nil +} + +type Hello interface { + Say(s string) string +} + +type Stream interface { + Greeter(name string, h Hello) (string, error) +} + +type StreamPlugin struct { + plugin.Plugin + + impl Stream +} + +func NewStreamPluginServiceServer(impl Stream) (*StreamPlugin, error) { + if impl == nil { + return nil, fmt.Errorf("empty underlying stream plugin passed in") + } + return &StreamPlugin{ + impl: impl, + }, nil +} + +func (h *StreamPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error { + pb.RegisterStreamGreeterServer(s, &GRPCServer{ + Impl: h.impl, + broker: broker, + }) + return nil +} + +func (h *StreamPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (any, error) { + return &GRPCClient{ + client: pb.NewStreamGreeterClient(c), + broker: broker, + }, nil +} diff --git a/pkg/plugin/pl/plugin.go b/pkg/plugin/pl/plugin.go new file mode 100644 index 0000000..3a3b999 --- /dev/null +++ b/pkg/plugin/pl/plugin.go @@ -0,0 +1,193 @@ +package pl + +import ( + "context" + "fmt" + "os" + "os/signal" + "path/filepath" + "strings" + "syscall" + + "github.com/opensergo/opensergo-control-plane/pkg/plugin" + pluginclient "github.com/opensergo/opensergo-control-plane/pkg/plugin/client" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/config" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" + + plugin2 "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/plugin" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/plugin/store" + + "github.com/hashicorp/go-secure-stdlib/pluginutil/v2" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/util" +) + +type PluginServer struct { + Context context.Context + ContextCancel context.CancelFunc + Config *config.Config + EnabledPlugin []*EnabledPlugin + Client *pluginclient.PluginClientRegistry + ShutdownCh chan struct{} +} + +type EnabledPlugin struct { + *store.Plugin + PluginName string + ShutdownFuncs []func() error +} + +func NewPluginServer() *PluginServer { + ctx, cancel := context.WithCancel(context.Background()) + ps := &PluginServer{ + Context: ctx, + ContextCancel: cancel, + EnabledPlugin: make([]*EnabledPlugin, 0), + Client: &pluginclient.PluginClientRegistry{}, + ShutdownCh: MakeShutdownCh(), + } + //go func() { + // <-ps.ShutdownCh + // cancel() + // if err := ps.RunShutdownFuncs(); err != nil { + // log.Fatalln("Error:", err.Error()) + // } + // log.Println("Server shutdown") + //}() + return ps +} + +func MakeShutdownCh() chan struct{} { + resultCh := make(chan struct{}) + shutdownCh := make(chan os.Signal, 4) + signal.Notify(shutdownCh, syscall.SIGTERM, syscall.SIGINT) + go func() { + for { + <-shutdownCh + resultCh <- struct{}{} + } + }() + return resultCh +} + +func (p *PluginServer) RunShutdownFuncs() error { + for i := range p.EnabledPlugin { + for _, f := range p.EnabledPlugin[i].ShutdownFuncs { + if err := f(); err != nil { + return fmt.Errorf("error shutting down plugin %s: %w", p.EnabledPlugin[i].PluginName, err) + } + p.Client.DeletePluginClient(p.EnabledPlugin[i].PublicID) + } + } + return nil +} + +func (p *PluginServer) InitPlugin() error { + path := config.GetCurrentAbPathByCaller() + c, err := config.ReadConfig(filepath.Join(filepath.Dir(path), "config/config.yaml")) + if err != nil { + return fmt.Errorf("error reading config: %w", err) + } + + p.Config = c + //for i, _ := range c.ExternalPlugin { + // p.EnabledPlugin = append(p.EnabledPlugin, EnabledPlugin{ + // PluginName: c.ExternalPlugin[i].Type, + // }) + //} + + // enadle builtin plugin + p.EnabledPlugin = append(p.EnabledPlugin, &EnabledPlugin{ + PluginName: builtin.StreamServicePluginName, + }, &EnabledPlugin{ + PluginName: builtin.RateLimitServicePluginName, + }) + err = p.CreatePlugin() + if err != nil { + return fmt.Errorf("error creating plugin: %w", err) + } + + return nil +} + +type creatOption struct { + pluginSetName string + pluginType store.PluginType + executionDir string +} + +func (p *PluginServer) CreatePlugin() error { + for _, enabledPlugin := range p.EnabledPlugin { + switch enabledPlugin.PluginName { + case builtin.StreamServicePluginName: + co := &creatOption{ + pluginSetName: builtin.StreamServicePluginSetName, + pluginType: store.PluginTypeCompute, + executionDir: "", + } + err := p.createplugin(enabledPlugin, co) + if err != nil { + return fmt.Errorf("error CreatePlugin: %w", err) + } + case builtin.RateLimitServicePluginName: + co := &creatOption{ + pluginSetName: builtin.RateLimitServicePluginSetName, + pluginType: store.PluginTypeCompute, + executionDir: "", + } + err := p.createplugin(enabledPlugin, co) + if err != nil { + return fmt.Errorf("error CreatePlugin: %w", err) + } + default: + fmt.Printf("unknow plugin: %s\n", enabledPlugin.PluginName) + } + } + return nil +} + +func (p *PluginServer) createplugin(e *EnabledPlugin, c *creatOption) error { + logger := plugin2.NewLogger(c.pluginSetName) + pluginName := strings.ToLower(e.PluginName) + client, cleanup, err := plugin2.CreatePlugin( + p.Context, + pluginName, + c.pluginSetName, + plugin2.WithPluginOptions( + pluginutil.WithPluginExecutionDirectory(c.executionDir), + pluginutil.WithPluginsFilesystem(util.PluginPrefix, plugin.FileSystem()), + ), + plugin2.WithLogger(logger), + ) + e.ShutdownFuncs = append(e.ShutdownFuncs, cleanup) + if err != nil { + return fmt.Errorf("error creating %s plugin: %w", pluginName, err) + } + + plg, err := p.registerPlugin(p.Context, pluginName, client, c.pluginType, store.WithDescription(fmt.Sprintf("Built-in %s plugin", pluginName))) + if err != nil { + return fmt.Errorf("error registering %s plugin: %w", pluginName, err) + } + e.Plugin = plg + return nil +} + +func (p *PluginServer) registerPlugin(ctx context.Context, name string, client interface{}, flag store.PluginType, opt ...store.Option) (*store.Plugin, error) { + opt = append(opt, store.WithName(name)) + plg := store.NewPlugin(opt...) + err := plg.CreatePlugin(ctx, flag, opt...) + if err != nil { + return nil, err + } + if err = p.Client.RegisterClient(plg.PublicID, client); err != nil { + return nil, err + } + return plg, nil +} + +func (p *PluginServer) GetPluginClient(name string) (interface{}, error) { + client := p.Client.RangePluginClientByName(name) + if client == nil { + return nil, fmt.Errorf("plugin %s not found", name) + } + return client, nil +} diff --git a/pkg/plugin/pl/plugin/load.go b/pkg/plugin/pl/plugin/load.go new file mode 100644 index 0000000..4a8198d --- /dev/null +++ b/pkg/plugin/pl/plugin/load.go @@ -0,0 +1,80 @@ +package plugin + +import ( + "context" + "fmt" + "strings" + + "github.com/hashicorp/go-plugin" + "github.com/hashicorp/go-secure-stdlib/pluginutil/v2" +) + +func CreatePlugin(ctx context.Context, pluginName, pluginSetName string, opt ...Option) (raw any, cleanup func() error, err error) { + raw, cleanup, err = createPlugin(ctx, pluginName, pluginSetName, opt...) + if err != nil { + return nil, cleanup, err + } + + //var ok bool + //hp, ok := raw.(pb.StreamGreeterClient) + //if !ok { + // return nil, cleanup, fmt.Errorf("error converting rpc plugin of type %T to normal wrapper", raw) + //} + + return raw, cleanup, nil +} + +func createPlugin( + ctx context.Context, + pluginName string, + pluginSetName string, + opt ...Option, +) ( + raw any, + cleanup func() error, + retErr error, +) { + defer func() { + if retErr != nil && cleanup != nil { + _ = cleanup() + } + }() + + pluginName = strings.ToLower(pluginName) + + opts, err := getOpts(opt...) + if err != nil { + return nil, nil, fmt.Errorf("error parsing plugin options: %w", err) + } + + // First, scan available plugins, then find the right one to use + pluginMap, err := pluginutil.BuildPluginMap( + append( + opts.withPluginOptions, + pluginutil.WithPluginClientCreationFunc( + func(pluginPath string, _ ...pluginutil.Option) (*plugin.Client, error) { + return NewPluginClient(pluginPath, pluginSetName, WithLogger(opts.withLogger)) + }), + )...) + if err != nil { + return nil, nil, fmt.Errorf("error building plugin map: %w", err) + } + + // Create the plugin and cleanup func + plugClient, cleanup, err := pluginutil.CreatePlugin(pluginMap[pluginName], opts.withPluginOptions...) + if err != nil { + return nil, cleanup, err + } + + switch client := plugClient.(type) { + case plugin.ClientProtocol: + raw, err = client.Dispense(pluginSetName) + if err != nil { + return nil, cleanup, fmt.Errorf("error dispensing %q plugin: %w", pluginSetName, err) + } + default: + return nil, cleanup, fmt.Errorf("unable to understand type %T of raw plugin", raw) + } + + return raw, cleanup, nil +} diff --git a/pkg/plugin/pl/plugin/options.go b/pkg/plugin/pl/plugin/options.go new file mode 100644 index 0000000..1ad1ee0 --- /dev/null +++ b/pkg/plugin/pl/plugin/options.go @@ -0,0 +1,63 @@ +package plugin + +import ( + "fmt" + "os" + + "github.com/hashicorp/go-secure-stdlib/pluginutil/v2" + + "github.com/hashicorp/go-hclog" +) + +// getOpts iterates the inbound Options and returns a struct +func getOpts(opt ...Option) (*options, error) { + opts := getDefaultOptions() + for _, o := range opt { + if o == nil { + continue + } + if err := o(opts); err != nil { + return nil, fmt.Errorf("error running option function: %w", err) + } + } + return opts, nil +} + +// Option - a type that wraps an interface for compile-time safety but can +// contain an option for this package or for wrappers implementing this +// interface. +type Option func(*options) error + +type options struct { + withPluginOptions []pluginutil.Option + withLogger hclog.Logger +} + +func getDefaultOptions() *options { + return &options{} +} + +// WithPluginOptions allows providing plugin-related (as opposed to +// configutil-related) options +func WithPluginOptions(opts ...pluginutil.Option) Option { + return func(o *options) error { + o.withPluginOptions = append(o.withPluginOptions, opts...) + return nil + } +} + +// WithLogger allows passing a logger to the plugin library for debugging +func WithLogger(logger hclog.Logger) Option { + return func(o *options) error { + o.withLogger = logger + return nil + } +} +func NewLogger(name string) hclog.Logger { + logger := hclog.New(&hclog.LoggerOptions{ + Name: name, + Output: os.Stdout, + Level: hclog.Debug, + }) + return logger +} diff --git a/pkg/plugin/pl/plugin/plugin.go b/pkg/plugin/pl/plugin/plugin.go new file mode 100644 index 0000000..01b84bc --- /dev/null +++ b/pkg/plugin/pl/plugin/plugin.go @@ -0,0 +1,86 @@ +package plugin + +import ( + "errors" + "os/exec" + + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin" + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" + stream_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/stream" + + "github.com/hashicorp/go-plugin" +) + +// HandshakeConfig is a shared config that can be used regardless of plugin, to +// avoid having to know type-specific things about each plugin +var HandshakeConfig = plugin.HandshakeConfig{ + MagicCookieKey: "OPENSERGO_STREAM_PLUGIN", + MagicCookieValue: "opensergo-plugin", +} + +// ServePlugin starts a plugin server +func ServePlugin(svc any, opt ...Option) error { + opts, err := getOpts(opt...) + if err != nil { + return err + } + + plugins := make(map[string]plugin.Plugin) + if streamSvc, ok := svc.(stream_plugin.Stream); ok { + streamServiceServer, err := stream_plugin.NewStreamPluginServiceServer(streamSvc) + if err != nil { + return err + } + plugins[builtin.StreamServicePluginSetName] = streamServiceServer + } + if ratelimitSvc, ok := svc.(ratelimit_plugin.RateLimit); ok { + ratelimitServiceServer, err := ratelimit_plugin.NewRateLimitPluginServiceServer(ratelimitSvc) + if err != nil { + return err + } + plugins[builtin.RateLimitServicePluginSetName] = ratelimitServiceServer + } + + if len(plugins) == 0 { + return errors.New("no valid plugin server provided") + } + + plugin.Serve(&plugin.ServeConfig{ + HandshakeConfig: HandshakeConfig, + VersionedPlugins: map[int]plugin.PluginSet{ + 1: plugins, + }, + Logger: opts.withLogger, + GRPCServer: plugin.DefaultGRPCServer, + }) + return nil +} + +func NewPluginClient(pluginPath string, setName string, opt ...Option) (*plugin.Client, error) { + opts, err := getOpts(opt...) + if err != nil { + return nil, err + } + + var set plugin.PluginSet + + switch setName { + case builtin.StreamServicePluginSetName: + set = plugin.PluginSet{builtin.StreamServicePluginSetName: &stream_plugin.StreamPlugin{}} + case builtin.RateLimitServicePluginSetName: + set = plugin.PluginSet{builtin.RateLimitServicePluginSetName: &ratelimit_plugin.RateLimitPlugin{}} + } + + return plugin.NewClient(&plugin.ClientConfig{ + HandshakeConfig: HandshakeConfig, + VersionedPlugins: map[int]plugin.PluginSet{ + 1: set, + }, + Cmd: exec.Command(pluginPath), + AllowedProtocols: []plugin.Protocol{ + plugin.ProtocolGRPC, + }, + Logger: opts.withLogger, + AutoMTLS: true, + }), nil +} diff --git a/pkg/plugin/pl/plugin/store/options.go b/pkg/plugin/pl/plugin/store/options.go new file mode 100644 index 0000000..b219a2c --- /dev/null +++ b/pkg/plugin/pl/plugin/store/options.go @@ -0,0 +1,48 @@ +package store + +// GetOpts - iterate the inbound Options and return a struct +func GetOpts(opt ...Option) options { + opts := getDefaultOptions() + for _, o := range opt { + o(&opts) + } + return opts +} + +// Option - how Options are passed as arguments. +type Option func(*options) + +// options = how options are represented +type options struct { + withName string + withDescription string + withPublicID string +} + +func getDefaultOptions() options { + return options{ + withDescription: "", + withName: "", + } +} + +// WithDescription provides an optional description. +func WithDescription(desc string) Option { + return func(o *options) { + o.withDescription = desc + } +} + +// WithName provides an optional name. +func WithName(name string) Option { + return func(o *options) { + o.withName = name + } +} + +// WithPublicID provides an optional specific public ID +func WithPublicID(with string) Option { + return func(o *options) { + o.withPublicID = with + } +} diff --git a/pkg/plugin/pl/plugin/store/plugin.go b/pkg/plugin/pl/plugin/store/plugin.go new file mode 100644 index 0000000..19aa8a9 --- /dev/null +++ b/pkg/plugin/pl/plugin/store/plugin.go @@ -0,0 +1,65 @@ +package store + +import ( + "context" + "fmt" + + "github.com/hashicorp/go-secure-stdlib/base62" + "github.com/pkg/errors" +) + +type PluginType int + +const ( + PluginTypeUnknown PluginType = 0 + PluginTypeCompute PluginType = 1 + PluginTypeComputeOneWay PluginType = 2 + PluginTypeComputeTwoWay PluginType = 3 +) + +type Plugin struct { + PublicID string + PluginType PluginType + ScopeID string + Name string //name of plugin + Description string + Version uint32 + //CreateTime *timestamp.Timestamp + //UpdateTime *timestamp.Timestamp +} + +func NewPlugin(opt ...Option) *Plugin { + opts := GetOpts(opt...) + p := &Plugin{ + ScopeID: Global.String(), + Name: opts.withName, + Description: opts.withDescription, + } + return p +} + +func (p *Plugin) CreatePlugin(ctx context.Context, plugintype PluginType, opt ...Option) error { + p.PluginType = plugintype + + opts := GetOpts(opt...) + + p.PublicID = opts.withPublicID + if p.PublicID == "" { + var err error + p.PublicID, err = newPublicID(p.Name) + if err != nil { + return err + } + } + + return nil +} + +// newPublicID Create a globally unique publicId for a plugin +func newPublicID(name string) (string, error) { + publicID, err := base62.Random(10) + if err != nil { + return "", errors.Wrap(err, "unable to generate id") + } + return fmt.Sprintf("%s-%s", name, publicID), nil +} diff --git a/pkg/plugin/pl/plugin/store/scope.go b/pkg/plugin/pl/plugin/store/scope.go new file mode 100644 index 0000000..92488d1 --- /dev/null +++ b/pkg/plugin/pl/plugin/store/scope.go @@ -0,0 +1,23 @@ +package store + +// Type defines the possible types for Scopes +type Type uint + +const ( + Unknown Type = 0 + Global Type = 1 + Part Type = 2 +) + +func (s Type) String() string { + return [...]string{ + "unknown", + "global", + "part", + }[s] +} + +var Map = map[string]Type{ + Global.String(): Global, + Part.String(): Part, +} diff --git a/pkg/plugin/proto/rate_limit/v1/rate_limit.pb.go b/pkg/plugin/proto/rate_limit/v1/rate_limit.pb.go new file mode 100644 index 0000000..bd14489 --- /dev/null +++ b/pkg/plugin/proto/rate_limit/v1/rate_limit.pb.go @@ -0,0 +1,222 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: rate_limit.proto + +package __ + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + _ "google.golang.org/protobuf/types/known/anypb" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type RateLimitResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Threshold int64 `protobuf:"varint,1,opt,name=threshold,proto3" json:"threshold,omitempty"` +} + +func (x *RateLimitResponse) Reset() { + *x = RateLimitResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_rate_limit_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RateLimitResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RateLimitResponse) ProtoMessage() {} + +func (x *RateLimitResponse) ProtoReflect() protoreflect.Message { + mi := &file_rate_limit_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RateLimitResponse.ProtoReflect.Descriptor instead. +func (*RateLimitResponse) Descriptor() ([]byte, []int) { + return file_rate_limit_proto_rawDescGZIP(), []int{0} +} + +func (x *RateLimitResponse) GetThreshold() int64 { + if x != nil { + return x.Threshold + } + return 0 +} + +type RateLimitRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Threshold int64 `protobuf:"varint,1,opt,name=threshold,proto3" json:"threshold,omitempty"` +} + +func (x *RateLimitRequest) Reset() { + *x = RateLimitRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_rate_limit_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RateLimitRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RateLimitRequest) ProtoMessage() {} + +func (x *RateLimitRequest) ProtoReflect() protoreflect.Message { + mi := &file_rate_limit_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RateLimitRequest.ProtoReflect.Descriptor instead. +func (*RateLimitRequest) Descriptor() ([]byte, []int) { + return file_rate_limit_proto_rawDescGZIP(), []int{1} +} + +func (x *RateLimitRequest) GetThreshold() int64 { + if x != nil { + return x.Threshold + } + return 0 +} + +var File_rate_limit_proto protoreflect.FileDescriptor + +var file_rate_limit_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x24, 0x69, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x73, 0x65, 0x72, 0x67, 0x6f, + 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x72, 0x61, + 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x22, 0x31, 0x0a, 0x11, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x68, 0x72, 0x65, + 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x68, 0x72, + 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x22, 0x30, 0x0a, 0x10, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, + 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x68, + 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, + 0x68, 0x72, 0x65, 0x73, 0x68, 0x6f, 0x6c, 0x64, 0x32, 0x90, 0x01, 0x0a, 0x10, 0x52, 0x61, 0x74, + 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7c, 0x0a, + 0x09, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x12, 0x36, 0x2e, 0x69, 0x6f, 0x2e, + 0x6f, 0x70, 0x65, 0x6e, 0x73, 0x65, 0x72, 0x67, 0x6f, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, + 0x74, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x37, 0x2e, 0x69, 0x6f, 0x2e, 0x6f, 0x70, 0x65, 0x6e, 0x73, 0x65, 0x72, 0x67, + 0x6f, 0x2e, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x72, + 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x2e, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69, + 0x6d, 0x69, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x04, 0x5a, 0x02, 0x2e, + 0x2f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_rate_limit_proto_rawDescOnce sync.Once + file_rate_limit_proto_rawDescData = file_rate_limit_proto_rawDesc +) + +func file_rate_limit_proto_rawDescGZIP() []byte { + file_rate_limit_proto_rawDescOnce.Do(func() { + file_rate_limit_proto_rawDescData = protoimpl.X.CompressGZIP(file_rate_limit_proto_rawDescData) + }) + return file_rate_limit_proto_rawDescData +} + +var file_rate_limit_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_rate_limit_proto_goTypes = []interface{}{ + (*RateLimitResponse)(nil), // 0: io.opensergo.plugin.proto.rate_limit.RateLimitResponse + (*RateLimitRequest)(nil), // 1: io.opensergo.plugin.proto.rate_limit.RateLimitRequest +} +var file_rate_limit_proto_depIdxs = []int32{ + 1, // 0: io.opensergo.plugin.proto.rate_limit.RateLimitService.RateLimit:input_type -> io.opensergo.plugin.proto.rate_limit.RateLimitRequest + 0, // 1: io.opensergo.plugin.proto.rate_limit.RateLimitService.RateLimit:output_type -> io.opensergo.plugin.proto.rate_limit.RateLimitResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_rate_limit_proto_init() } +func file_rate_limit_proto_init() { + if File_rate_limit_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_rate_limit_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RateLimitResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_rate_limit_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RateLimitRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_rate_limit_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_rate_limit_proto_goTypes, + DependencyIndexes: file_rate_limit_proto_depIdxs, + MessageInfos: file_rate_limit_proto_msgTypes, + }.Build() + File_rate_limit_proto = out.File + file_rate_limit_proto_rawDesc = nil + file_rate_limit_proto_goTypes = nil + file_rate_limit_proto_depIdxs = nil +} diff --git a/pkg/plugin/proto/rate_limit/v1/rate_limit.proto b/pkg/plugin/proto/rate_limit/v1/rate_limit.proto new file mode 100644 index 0000000..ae3f908 --- /dev/null +++ b/pkg/plugin/proto/rate_limit/v1/rate_limit.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package io.opensergo.plugin.proto.rate_limit; + +option go_package = "./"; +import "google/protobuf/any.proto"; + +message RateLimitResponse{ + int64 threshold = 1; +} + +message RateLimitRequest { + int64 threshold = 1; +} +service RateLimitService { + rpc RateLimit(RateLimitRequest) returns (RateLimitResponse); +} diff --git a/pkg/plugin/proto/rate_limit/v1/rate_limit_grpc.pb.go b/pkg/plugin/proto/rate_limit/v1/rate_limit_grpc.pb.go new file mode 100644 index 0000000..94cc33e --- /dev/null +++ b/pkg/plugin/proto/rate_limit/v1/rate_limit_grpc.pb.go @@ -0,0 +1,106 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: rate_limit.proto + +package __ + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// RateLimitServiceClient is the client API for RateLimitService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RateLimitServiceClient interface { + RateLimit(ctx context.Context, in *RateLimitRequest, opts ...grpc.CallOption) (*RateLimitResponse, error) +} + +type rateLimitServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewRateLimitServiceClient(cc grpc.ClientConnInterface) RateLimitServiceClient { + return &rateLimitServiceClient{cc} +} + +func (c *rateLimitServiceClient) RateLimit(ctx context.Context, in *RateLimitRequest, opts ...grpc.CallOption) (*RateLimitResponse, error) { + out := new(RateLimitResponse) + err := c.cc.Invoke(ctx, "/io.opensergo.plugin.proto.rate_limit.RateLimitService/RateLimit", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// RateLimitServiceServer is the server API for RateLimitService service. +// All implementations must embed UnimplementedRateLimitServiceServer +// for forward compatibility +type RateLimitServiceServer interface { + RateLimit(context.Context, *RateLimitRequest) (*RateLimitResponse, error) + mustEmbedUnimplementedRateLimitServiceServer() +} + +// UnimplementedRateLimitServiceServer must be embedded to have forward compatible implementations. +type UnimplementedRateLimitServiceServer struct { +} + +func (UnimplementedRateLimitServiceServer) RateLimit(context.Context, *RateLimitRequest) (*RateLimitResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RateLimit not implemented") +} +func (UnimplementedRateLimitServiceServer) mustEmbedUnimplementedRateLimitServiceServer() {} + +// UnsafeRateLimitServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to RateLimitServiceServer will +// result in compilation errors. +type UnsafeRateLimitServiceServer interface { + mustEmbedUnimplementedRateLimitServiceServer() +} + +func RegisterRateLimitServiceServer(s grpc.ServiceRegistrar, srv RateLimitServiceServer) { + s.RegisterService(&RateLimitService_ServiceDesc, srv) +} + +func _RateLimitService_RateLimit_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RateLimitRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(RateLimitServiceServer).RateLimit(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/io.opensergo.plugin.proto.rate_limit.RateLimitService/RateLimit", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(RateLimitServiceServer).RateLimit(ctx, req.(*RateLimitRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// RateLimitService_ServiceDesc is the grpc.ServiceDesc for RateLimitService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var RateLimitService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "io.opensergo.plugin.proto.rate_limit.RateLimitService", + HandlerType: (*RateLimitServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "RateLimit", + Handler: _RateLimitService_RateLimit_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "rate_limit.proto", +} diff --git a/pkg/plugin/proto/stream/v1/stream.pb.go b/pkg/plugin/proto/stream/v1/stream.pb.go new file mode 100644 index 0000000..d5db1af --- /dev/null +++ b/pkg/plugin/proto/stream/v1/stream.pb.go @@ -0,0 +1,349 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.19.4 +// source: stream.proto + +package v1 + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type StreamReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Id uint32 `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"` +} + +func (x *StreamReq) Reset() { + *x = StreamReq{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamReq) ProtoMessage() {} + +func (x *StreamReq) ProtoReflect() protoreflect.Message { + mi := &file_stream_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamReq.ProtoReflect.Descriptor instead. +func (*StreamReq) Descriptor() ([]byte, []int) { + return file_stream_proto_rawDescGZIP(), []int{0} +} + +func (x *StreamReq) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *StreamReq) GetId() uint32 { + if x != nil { + return x.Id + } + return 0 +} + +type StreamResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Greet string `protobuf:"bytes,1,opt,name=greet,proto3" json:"greet,omitempty"` +} + +func (x *StreamResp) Reset() { + *x = StreamResp{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamResp) ProtoMessage() {} + +func (x *StreamResp) ProtoReflect() protoreflect.Message { + mi := &file_stream_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamResp.ProtoReflect.Descriptor instead. +func (*StreamResp) Descriptor() ([]byte, []int) { + return file_stream_proto_rawDescGZIP(), []int{1} +} + +func (x *StreamResp) GetGreet() string { + if x != nil { + return x.Greet + } + return "" +} + +type HelloReq struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Pre string `protobuf:"bytes,1,opt,name=pre,proto3" json:"pre,omitempty"` +} + +func (x *HelloReq) Reset() { + *x = HelloReq{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HelloReq) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HelloReq) ProtoMessage() {} + +func (x *HelloReq) ProtoReflect() protoreflect.Message { + mi := &file_stream_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HelloReq.ProtoReflect.Descriptor instead. +func (*HelloReq) Descriptor() ([]byte, []int) { + return file_stream_proto_rawDescGZIP(), []int{2} +} + +func (x *HelloReq) GetPre() string { + if x != nil { + return x.Pre + } + return "" +} + +type HelloResp struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Resp string `protobuf:"bytes,1,opt,name=resp,proto3" json:"resp,omitempty"` +} + +func (x *HelloResp) Reset() { + *x = HelloResp{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *HelloResp) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*HelloResp) ProtoMessage() {} + +func (x *HelloResp) ProtoReflect() protoreflect.Message { + mi := &file_stream_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use HelloResp.ProtoReflect.Descriptor instead. +func (*HelloResp) Descriptor() ([]byte, []int) { + return file_stream_proto_rawDescGZIP(), []int{3} +} + +func (x *HelloResp) GetResp() string { + if x != nil { + return x.Resp + } + return "" +} + +var File_stream_proto protoreflect.FileDescriptor + +var file_stream_proto_rawDesc = []byte{ + 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x22, 0x2f, 0x0a, 0x09, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x52, 0x65, 0x71, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x02, 0x69, 0x64, 0x22, 0x22, 0x0a, 0x0a, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x52, 0x65, 0x73, 0x70, 0x12, 0x14, 0x0a, 0x05, 0x67, 0x72, 0x65, 0x65, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x67, 0x72, 0x65, 0x65, 0x74, 0x22, 0x1c, 0x0a, 0x08, 0x48, + 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, 0x12, 0x10, 0x0a, 0x03, 0x70, 0x72, 0x65, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x70, 0x72, 0x65, 0x22, 0x1f, 0x0a, 0x09, 0x48, 0x65, 0x6c, + 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, 0x12, 0x12, 0x0a, 0x04, 0x72, 0x65, 0x73, 0x70, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x72, 0x65, 0x73, 0x70, 0x32, 0x3f, 0x0a, 0x0d, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x47, 0x72, 0x65, 0x65, 0x74, 0x65, 0x72, 0x12, 0x2e, 0x0a, 0x05, 0x67, + 0x72, 0x65, 0x65, 0x74, 0x12, 0x11, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x71, 0x1a, 0x12, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, 0x32, 0x33, 0x0a, 0x05, 0x48, + 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x2a, 0x0a, 0x03, 0x73, 0x61, 0x79, 0x12, 0x10, 0x2e, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x71, 0x1a, 0x11, 0x2e, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x52, 0x65, 0x73, 0x70, + 0x42, 0x0a, 0x5a, 0x08, 0x2e, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_stream_proto_rawDescOnce sync.Once + file_stream_proto_rawDescData = file_stream_proto_rawDesc +) + +func file_stream_proto_rawDescGZIP() []byte { + file_stream_proto_rawDescOnce.Do(func() { + file_stream_proto_rawDescData = protoimpl.X.CompressGZIP(file_stream_proto_rawDescData) + }) + return file_stream_proto_rawDescData +} + +var file_stream_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_stream_proto_goTypes = []interface{}{ + (*StreamReq)(nil), // 0: stream.StreamReq + (*StreamResp)(nil), // 1: stream.StreamResp + (*HelloReq)(nil), // 2: stream.HelloReq + (*HelloResp)(nil), // 3: stream.HelloResp +} +var file_stream_proto_depIdxs = []int32{ + 0, // 0: stream.StreamGreeter.greet:input_type -> stream.StreamReq + 2, // 1: stream.Hello.say:input_type -> stream.HelloReq + 1, // 2: stream.StreamGreeter.greet:output_type -> stream.StreamResp + 3, // 3: stream.Hello.say:output_type -> stream.HelloResp + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_stream_proto_init() } +func file_stream_proto_init() { + if File_stream_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_stream_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HelloReq); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*HelloResp); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_stream_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 2, + }, + GoTypes: file_stream_proto_goTypes, + DependencyIndexes: file_stream_proto_depIdxs, + MessageInfos: file_stream_proto_msgTypes, + }.Build() + File_stream_proto = out.File + file_stream_proto_rawDesc = nil + file_stream_proto_goTypes = nil + file_stream_proto_depIdxs = nil +} diff --git a/pkg/plugin/proto/stream/v1/stream.proto b/pkg/plugin/proto/stream/v1/stream.proto new file mode 100644 index 0000000..ad9045c --- /dev/null +++ b/pkg/plugin/proto/stream/v1/stream.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package stream; +option go_package = "./"; + +//protoc --go_out=. --go-grpc_out=. stream.proto + +message StreamReq { + string name = 1; + uint32 id = 2; +} + +message StreamResp { + string greet = 1; +} +message HelloReq { + string pre = 1; +} +message HelloResp { + string resp = 1; +} + +service StreamGreeter { + rpc greet(StreamReq) returns (StreamResp); +} + +service Hello { + rpc say(HelloReq) returns (HelloResp); +} diff --git a/pkg/plugin/proto/stream/v1/stream_grpc.pb.go b/pkg/plugin/proto/stream/v1/stream_grpc.pb.go new file mode 100644 index 0000000..71e435f --- /dev/null +++ b/pkg/plugin/proto/stream/v1/stream_grpc.pb.go @@ -0,0 +1,192 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.19.4 +// source: stream.proto + +package v1 + +import ( + context "context" + + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// RateLimitServiceClient is the client API for StreamGreeter service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type RateLimitServiceClient interface { + Greet(ctx context.Context, in *StreamReq, opts ...grpc.CallOption) (*StreamResp, error) +} + +type streamGreeterClient struct { + cc grpc.ClientConnInterface +} + +func NewStreamGreeterClient(cc grpc.ClientConnInterface) RateLimitServiceClient { + return &streamGreeterClient{cc} +} + +func (c *streamGreeterClient) Greet(ctx context.Context, in *StreamReq, opts ...grpc.CallOption) (*StreamResp, error) { + out := new(StreamResp) + err := c.cc.Invoke(ctx, "/stream.StreamGreeter/greet", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// StreamGreeterServer is the server API for StreamGreeter service. +// All implementations must embed UnimplementedStreamGreeterServer +// for forward compatibility +type StreamGreeterServer interface { + Greet(context.Context, *StreamReq) (*StreamResp, error) + mustEmbedUnimplementedStreamGreeterServer() +} + +// UnimplementedStreamGreeterServer must be embedded to have forward compatible implementations. +type UnimplementedStreamGreeterServer struct { +} + +func (UnimplementedStreamGreeterServer) Greet(context.Context, *StreamReq) (*StreamResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method RateLimit not implemented") +} +func (UnimplementedStreamGreeterServer) mustEmbedUnimplementedStreamGreeterServer() {} + +// UnsafeStreamGreeterServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to StreamGreeterServer will +// result in compilation errors. +type UnsafeStreamGreeterServer interface { + mustEmbedUnimplementedStreamGreeterServer() +} + +func RegisterStreamGreeterServer(s grpc.ServiceRegistrar, srv StreamGreeterServer) { + s.RegisterService(&StreamGreeter_ServiceDesc, srv) +} + +func _StreamGreeter_Greet_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StreamReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(StreamGreeterServer).Greet(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/stream.StreamGreeter/greet", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(StreamGreeterServer).Greet(ctx, req.(*StreamReq)) + } + return interceptor(ctx, in, info, handler) +} + +// StreamGreeter_ServiceDesc is the grpc.ServiceDesc for StreamGreeter service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var StreamGreeter_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "stream.StreamGreeter", + HandlerType: (*StreamGreeterServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "greet", + Handler: _StreamGreeter_Greet_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "stream.proto", +} + +// HelloClient is the client API for Hello service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HelloClient interface { + Say(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) +} + +type helloClient struct { + cc grpc.ClientConnInterface +} + +func NewHelloClient(cc grpc.ClientConnInterface) HelloClient { + return &helloClient{cc} +} + +func (c *helloClient) Say(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) { + out := new(HelloResp) + err := c.cc.Invoke(ctx, "/stream.Hello/say", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// HelloServer is the server API for Hello service. +// All implementations must embed UnimplementedHelloServer +// for forward compatibility +type HelloServer interface { + Say(context.Context, *HelloReq) (*HelloResp, error) + mustEmbedUnimplementedHelloServer() +} + +// UnimplementedHelloServer must be embedded to have forward compatible implementations. +type UnimplementedHelloServer struct { +} + +func (UnimplementedHelloServer) Say(context.Context, *HelloReq) (*HelloResp, error) { + return nil, status.Errorf(codes.Unimplemented, "method Say not implemented") +} +func (UnimplementedHelloServer) mustEmbedUnimplementedHelloServer() {} + +// UnsafeHelloServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HelloServer will +// result in compilation errors. +type UnsafeHelloServer interface { + mustEmbedUnimplementedHelloServer() +} + +func RegisterHelloServer(s grpc.ServiceRegistrar, srv HelloServer) { + s.RegisterService(&Hello_ServiceDesc, srv) +} + +func _Hello_Say_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(HelloReq) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(HelloServer).Say(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/stream.Hello/say", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(HelloServer).Say(ctx, req.(*HelloReq)) + } + return interceptor(ctx, in, info, handler) +} + +// Hello_ServiceDesc is the grpc.ServiceDesc for Hello service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Hello_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "stream.Hello", + HandlerType: (*HelloServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "say", + Handler: _Hello_Say_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "stream.proto", +} diff --git a/pkg/plugin/scripts/plugin.sh b/pkg/plugin/scripts/plugin.sh new file mode 100644 index 0000000..0adfee0 --- /dev/null +++ b/pkg/plugin/scripts/plugin.sh @@ -0,0 +1,33 @@ +#!/usr/bin/env bash +# Copyright (c) HashiCorp, Inc. +# SPDX-License-Identifier: MPL-2.0 + +# +# This script builds the required plugins. +set -e + +GOOS=$(go env GOOS) +echo "${GOOS}" +BINARY_SUFFIX="" +if [ "${GOOS}" = "windows" ]; then + BINARY_SUFFIX=".exe" +fi + +# Get the parent directory of where this script is. +SOURCE="${BASH_SOURCE[0]}" +while [ -h "$SOURCE" ] ; do SOURCE="$(readlink "$SOURCE")"; done +export DIR="$( cd -P "$( dirname "$SOURCE" )/.." && pwd )" +echo "DIR: $DIR" +# Create plugins +echo "==> Building opensergo plugins..." +rm -f $DIR/assets/opensergo-plugin-* +for CURR_PLUGIN in $(ls $DIR/server); do + cd $DIR/server/$CURR_PLUGIN; + go build -o $DIR/assets/opensergo-plugin-${CURR_PLUGIN}${BINARY_SUFFIX} .; + cd $DIR; +done; +#cd $DIR/assets; +#for CURR_PLUGIN in $(ls opensergo-plugin*); do +# gzip -f -9 $CURR_PLUGIN; +#done; +echo "==> Building opensergo plugins... DONE" \ No newline at end of file diff --git a/pkg/plugin/server/ratelimit/main.go b/pkg/plugin/server/ratelimit/main.go new file mode 100644 index 0000000..9d66e99 --- /dev/null +++ b/pkg/plugin/server/ratelimit/main.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "os" + + ratelimit_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/ratelimit" + + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/plugin" +) + +func main() { + //log := hclog.New(&hclog.LoggerOptions{ + // Output: os.Stderr, + // Level: hclog.Trace, + // JSONFormat: true, + //}), plugin.WithLogger(log) + b := NewBuiltinPlugin() + if err := plugin.ServePlugin(b); err != nil { + fmt.Println("Error serving plugin", err) + os.Exit(1) + } + os.Exit(0) +} + +var ( + _ ratelimit_plugin.RateLimit = (*ratelimit_plugin.RateLimitPluginServer)(nil) +) + +type BuiltinPlugin struct { + *ratelimit_plugin.RateLimitPluginServer +} + +func NewBuiltinPlugin() *BuiltinPlugin { + return &BuiltinPlugin{ + RateLimitPluginServer: &ratelimit_plugin.RateLimitPluginServer{}, + } +} diff --git a/pkg/plugin/server/stream/main.go b/pkg/plugin/server/stream/main.go new file mode 100644 index 0000000..be33d31 --- /dev/null +++ b/pkg/plugin/server/stream/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "fmt" + "os" + + stream_plugin "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/builtin/stream" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl/plugin" +) + +func main() { + //log := hclog.New(&hclog.LoggerOptions{ + // Output: os.Stderr, + // Level: hclog.Trace, + // JSONFormat: true, + //}), plugin.WithLogger(log) + b := NewStreamPlugin() + if err := plugin.ServePlugin(b); err != nil { + fmt.Println("Error serving plugin", err) + os.Exit(1) + } + os.Exit(0) +} + +var ( + _ stream_plugin.Stream = (*stream_plugin.StreamPluginServer)(nil) +) + +type BuiltinPlugin struct { + *stream_plugin.StreamPluginServer +} + +func NewStreamPlugin() *BuiltinPlugin { + return &BuiltinPlugin{ + StreamPluginServer: &stream_plugin.StreamPluginServer{}, + } +} diff --git a/pkg/plugin/util/const.go b/pkg/plugin/util/const.go new file mode 100644 index 0000000..9d67ed0 --- /dev/null +++ b/pkg/plugin/util/const.go @@ -0,0 +1,3 @@ +package util + +const PluginPrefix = "opensergo-plugin-" diff --git a/pkg/transport/grpc/server.go b/pkg/transport/grpc/server.go index ba94169..50f5e65 100644 --- a/pkg/transport/grpc/server.go +++ b/pkg/transport/grpc/server.go @@ -16,15 +16,15 @@ package grpc import ( "fmt" - "io" - "log" - "net" - "github.com/opensergo/opensergo-control-plane/pkg/model" + "github.com/opensergo/opensergo-control-plane/pkg/plugin/pl" trpb "github.com/opensergo/opensergo-control-plane/pkg/proto/transport/v1" "github.com/opensergo/opensergo-control-plane/pkg/util" "go.uber.org/atomic" "google.golang.org/grpc" + "io" + "log" + "net" ) const ( @@ -40,16 +40,20 @@ type Server struct { port uint32 started *atomic.Bool + + PluginServer *pl.PluginServer } func NewServer(port uint32, subscribeHandlers []model.SubscribeRequestHandler) *Server { connectionManager := NewConnectionManager() + pluginServer := pl.NewPluginServer() return &Server{ transportServer: newTransportServer(connectionManager, subscribeHandlers), port: port, grpcServer: grpc.NewServer(), started: atomic.NewBool(false), connectionManager: connectionManager, + PluginServer: pluginServer, } } @@ -62,6 +66,21 @@ func (s *Server) ComponentName() string { } func (s *Server) Run() error { + go func() { + for { + select { + case <-s.PluginServer.ShutdownCh: + s.PluginServer.ContextCancel() + if err := s.PluginServer.RunShutdownFuncs(); err != nil { + log.Printf("Error:%s\n", err.Error()) + } + s.grpcServer.GracefulStop() + log.Println("Server shutdown") + return + } + } + }() + if s.started.CAS(false, true) { listener, err := net.Listen("tcp", fmt.Sprintf(":%d", s.port)) if err != nil { @@ -69,6 +88,11 @@ func (s *Server) Run() error { } trpb.RegisterOpenSergoUniversalTransportServiceServer(s.grpcServer, s.transportServer) + + err = s.PluginServer.InitPlugin() + if err != nil { + return err + } err = s.grpcServer.Serve(listener) if err != nil { return err