diff --git a/.configs/sqlc.yaml b/.configs/sqlc.yaml index 4b5e2ccfa..340fbe85e 100644 --- a/.configs/sqlc.yaml +++ b/.configs/sqlc.yaml @@ -174,3 +174,12 @@ sql: <<: *default_go package: "sessionsql" out: "../internal/session/sessionsql" + + - <<: *default_domain + name: "Search SQL" + queries: "../internal/search/queries" + gen: + go: + <<: *default_go + package: "searchsql" + out: "../internal/search/searchsql" diff --git a/go.mod b/go.mod index c73e5b362..36bd892d2 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,8 @@ require ( cloud.google.com/go/pubsub v1.47.0 github.com/99designs/gqlgen v0.17.66 github.com/GoogleCloudPlatform/k8s-config-connector v1.128.0 + github.com/blevesearch/bleve/v2 v2.4.4 + github.com/blevesearch/bleve_index_api v1.1.12 github.com/bombsimon/logrusr/v4 v4.1.0 github.com/btcsuite/btcutil v1.0.2 github.com/coreos/go-oidc/v3 v3.12.0 @@ -97,10 +99,28 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.26.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.50.0 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect + github.com/RoaringBitmap/roaring v1.9.3 // indirect github.com/agnivade/levenshtein v1.2.1 // indirect github.com/antlr4-go/antlr/v4 v4.13.1 // indirect github.com/apache/arrow/go/v15 v15.0.2 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.12.0 // indirect + github.com/blevesearch/geo v0.1.20 // indirect + github.com/blevesearch/go-faiss v1.0.24 // indirect + github.com/blevesearch/go-porterstemmer v1.0.3 // indirect + github.com/blevesearch/gtreap v0.1.1 // indirect + github.com/blevesearch/mmap-go v1.0.4 // indirect + github.com/blevesearch/scorch_segment_api/v2 v2.2.16 // indirect + github.com/blevesearch/segment v0.9.1 // indirect + github.com/blevesearch/snowballstem v0.9.0 // indirect + github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect + github.com/blevesearch/vellum v1.0.10 // indirect + github.com/blevesearch/zapx/v11 v11.3.10 // indirect + github.com/blevesearch/zapx/v12 v12.3.10 // indirect + github.com/blevesearch/zapx/v13 v13.3.10 // indirect + github.com/blevesearch/zapx/v14 v14.3.10 // indirect + github.com/blevesearch/zapx/v15 v15.3.16 // indirect + github.com/blevesearch/zapx/v16 v16.1.9-0.20241217210638-a0519e7caf3b // indirect github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/ccojocar/zxcvbn-go v1.0.2 // indirect @@ -147,8 +167,10 @@ require ( github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/goccy/go-json v0.10.5 // indirect github.com/gogo/protobuf v1.3.2 // indirect + github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/cel-go v0.22.1 // indirect github.com/google/flatbuffers v25.2.10+incompatible // indirect github.com/google/generative-ai-go v0.19.0 // indirect @@ -201,6 +223,7 @@ require ( github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/morikuni/aec v1.0.0 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v0.1.9 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect @@ -252,6 +275,7 @@ require ( github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect + go.etcd.io/bbolt v1.3.9 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib v1.34.0 // indirect @@ -261,7 +285,7 @@ require ( go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/arch v0.8.0 // indirect - golang.org/x/crypto v0.33.0 // indirect + golang.org/x/crypto v0.35.0 // indirect golang.org/x/exp/typeparams v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.23.0 // indirect golang.org/x/net v0.35.0 // indirect diff --git a/go.sum b/go.sum index 189a1a211..6f8881dd1 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEV github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/PuerkitoBio/goquery v1.9.3 h1:mpJr/ikUA9/GNJB/DBZcGeFDXUtosHRyRrwh7KGdTG0= github.com/PuerkitoBio/goquery v1.9.3/go.mod h1:1ndLHPdTz+DyQPICCWYlYQMPl0oXZj0G6D4LCYA6u4U= +github.com/RoaringBitmap/roaring v1.9.3 h1:t4EbC5qQwnisr5PrP9nt0IRhRTb9gMUgQF4t4S2OByM= +github.com/RoaringBitmap/roaring v1.9.3/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/agnivade/levenshtein v1.2.1 h1:EHBY3UOn1gwdy/VbFwgo4cxecRznFk7fKWN1KOX7eoM= github.com/agnivade/levenshtein v1.2.1/go.mod h1:QVVI16kDrtSuwcpd0p1+xMC6Z/VfhtCyDIjcwga4/DU= @@ -86,6 +88,44 @@ github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjk github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/bits-and-blooms/bitset v1.12.0 h1:U/q1fAF7xXRhFCrhROzIfffYnu+dlS38vCZtmFVPHmA= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/blevesearch/bleve/v2 v2.4.4 h1:RwwLGjUm54SwyyykbrZs4vc1qjzYic4ZnAnY9TwNl60= +github.com/blevesearch/bleve/v2 v2.4.4/go.mod h1:fa2Eo6DP7JR+dMFpQe+WiZXINKSunh7WBtlDGbolKXk= +github.com/blevesearch/bleve_index_api v1.1.12 h1:P4bw9/G/5rulOF7SJ9l4FsDoo7UFJ+5kexNy1RXfegY= +github.com/blevesearch/bleve_index_api v1.1.12/go.mod h1:PbcwjIcRmjhGbkS/lJCpfgVSMROV6TRubGGAODaK1W8= +github.com/blevesearch/geo v0.1.20 h1:paaSpu2Ewh/tn5DKn/FB5SzvH0EWupxHEIwbCk/QPqM= +github.com/blevesearch/geo v0.1.20/go.mod h1:DVG2QjwHNMFmjo+ZgzrIq2sfCh6rIHzy9d9d0B59I6w= +github.com/blevesearch/go-faiss v1.0.24 h1:K79IvKjoKHdi7FdiXEsAhxpMuns0x4fM0BO93bW5jLI= +github.com/blevesearch/go-faiss v1.0.24/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk= +github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo= +github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M= +github.com/blevesearch/gtreap v0.1.1 h1:2JWigFrzDMR+42WGIN/V2p0cUvn4UP3C4Q5nmaZGW8Y= +github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgYICSZ3w0tYk= +github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= +github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= +github.com/blevesearch/scorch_segment_api/v2 v2.2.16 h1:uGvKVvG7zvSxCwcm4/ehBa9cCEuZVE+/zvrSl57QUVY= +github.com/blevesearch/scorch_segment_api/v2 v2.2.16/go.mod h1:VF5oHVbIFTu+znY1v30GjSpT5+9YFs9dV2hjvuh34F0= +github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU= +github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw= +github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s= +github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs= +github.com/blevesearch/upsidedown_store_api v1.0.2 h1:U53Q6YoWEARVLd1OYNc9kvhBMGZzVrdmaozG2MfoB+A= +github.com/blevesearch/upsidedown_store_api v1.0.2/go.mod h1:M01mh3Gpfy56Ps/UXHjEO/knbqyQ1Oamg8If49gRwrQ= +github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxyMIPI= +github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k= +github.com/blevesearch/zapx/v11 v11.3.10 h1:hvjgj9tZ9DeIqBCxKhi70TtSZYMdcFn7gDb71Xo/fvk= +github.com/blevesearch/zapx/v11 v11.3.10/go.mod h1:0+gW+FaE48fNxoVtMY5ugtNHHof/PxCqh7CnhYdnMzQ= +github.com/blevesearch/zapx/v12 v12.3.10 h1:yHfj3vXLSYmmsBleJFROXuO08mS3L1qDCdDK81jDl8s= +github.com/blevesearch/zapx/v12 v12.3.10/go.mod h1:0yeZg6JhaGxITlsS5co73aqPtM04+ycnI6D1v0mhbCs= +github.com/blevesearch/zapx/v13 v13.3.10 h1:0KY9tuxg06rXxOZHg3DwPJBjniSlqEgVpxIqMGahDE8= +github.com/blevesearch/zapx/v13 v13.3.10/go.mod h1:w2wjSDQ/WBVeEIvP0fvMJZAzDwqwIEzVPnCPrz93yAk= +github.com/blevesearch/zapx/v14 v14.3.10 h1:SG6xlsL+W6YjhX5N3aEiL/2tcWh3DO75Bnz77pSwwKU= +github.com/blevesearch/zapx/v14 v14.3.10/go.mod h1:qqyuR0u230jN1yMmE4FIAuCxmahRQEOehF78m6oTgns= +github.com/blevesearch/zapx/v15 v15.3.16 h1:Ct3rv7FUJPfPk99TI/OofdC+Kpb4IdyfdMH48sb+FmE= +github.com/blevesearch/zapx/v15 v15.3.16/go.mod h1:Turk/TNRKj9es7ZpKK95PS7f6D44Y7fAFy8F4LXQtGg= +github.com/blevesearch/zapx/v16 v16.1.9-0.20241217210638-a0519e7caf3b h1:ju9Az5YgrzCeK3M1QwvZIpxYhChkXp7/L0RhDYsxXoE= +github.com/blevesearch/zapx/v16 v16.1.9-0.20241217210638-a0519e7caf3b/go.mod h1:BlrYNpOu4BvVRslmIG+rLtKhmjIaRhIbG8sb9scGTwI= github.com/bombsimon/logrusr/v4 v4.1.0 h1:uZNPbwusB0eUXlO8hIUwStE6Lr5bLN6IgYgG+75kuh4= github.com/bombsimon/logrusr/v4 v4.1.0/go.mod h1:pjfHC5e59CvjTBIU3V3sGhFWFAnsnhOR03TRc6im0l8= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= @@ -236,6 +276,8 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang/geo v0.0.0-20210211234256-740aa86cb551 h1:gtexQ/VGyN+VVFRXSFiguSNcXmS6rkKT+X7FdIrTtfo= +github.com/golang/geo v0.0.0-20210211234256-740aa86cb551/go.mod h1:QZ0nwyI2jOfgRAoBvP+ab5aRr7c9x7lhGEJrKvBwjWI= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= @@ -422,6 +464,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f h1:KUppIJq7/+SVif2QVs3tOP0zanoHgBEVAwHxUSIzRqU= @@ -633,6 +677,8 @@ github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.einride.tech/aip v0.68.1 h1:16/AfSxcQISGN5z9C5lM+0mLYXihrHbQ1onvYTr93aQ= go.einride.tech/aip v0.68.1/go.mod h1:XaFtaj4HuA3Zwk9xoBtTWgNubZ0ZZXv9BZJCkuKuWbg= +go.etcd.io/bbolt v1.3.9 h1:8x7aARPEXiXbHmtUwAIv7eV2fQFHrLLavdiJ3uzJXoI= +go.etcd.io/bbolt v1.3.9/go.mod h1:zaO32+Ti0PK1ivdPtgMESzuzL2VPoIG1PCQNvOdo/dE= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -691,8 +737,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200115085410-6d4e4cb37c7d/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= -golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= +golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= +golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa h1:t2QcU6V556bFjYgu4L6C+6VrCPyJZ+eyRsABUPs1mz4= golang.org/x/exp v0.0.0-20250218142911-aa4b98e5adaa/go.mod h1:BHOTPb3L19zxehTsLoJXVaTktb06DFgmdW6Wb9s8jqk= diff --git a/internal/cmd/api/api.go b/internal/cmd/api/api.go index 2550fc863..2b98b2c69 100644 --- a/internal/cmd/api/api.go +++ b/internal/cmd/api/api.go @@ -13,6 +13,7 @@ import ( "github.com/joho/godotenv" "github.com/nais/api/internal/auth/authn" "github.com/nais/api/internal/database" + "github.com/nais/api/internal/database/notify" "github.com/nais/api/internal/deployment" "github.com/nais/api/internal/graph" "github.com/nais/api/internal/graph/gengql" @@ -184,6 +185,10 @@ func run(ctx context.Context, cfg *Config, log logrus.FieldLogger) error { wg, ctx := errgroup.WithContext(ctx) + // Notifier to use only one connection to the database for LISTEN/NOTIFY pattern + notifier := notify.New(pool, log) + go notifier.Run(ctx) + // HTTP server wg.Go(func() error { return runHttpServer( @@ -202,6 +207,7 @@ func run(ctx context.Context, cfg *Config, log logrus.FieldLogger) error { hookdClient, cfg.Unleash.BifrostApiUrl, cfg.Logging.DefaultLogDestinations(), + notifier, log, ) }) diff --git a/internal/cmd/api/http.go b/internal/cmd/api/http.go index 6065fb03b..966905d4d 100644 --- a/internal/cmd/api/http.go +++ b/internal/cmd/api/http.go @@ -17,6 +17,7 @@ import ( "github.com/nais/api/internal/auth/middleware" "github.com/nais/api/internal/cost" "github.com/nais/api/internal/database" + "github.com/nais/api/internal/database/notify" "github.com/nais/api/internal/deployment" "github.com/nais/api/internal/feature" "github.com/nais/api/internal/github/repository" @@ -31,6 +32,7 @@ import ( "github.com/nais/api/internal/persistence/sqlinstance" "github.com/nais/api/internal/persistence/valkey" "github.com/nais/api/internal/reconciler" + "github.com/nais/api/internal/search" "github.com/nais/api/internal/serviceaccount" "github.com/nais/api/internal/session" "github.com/nais/api/internal/team" @@ -72,6 +74,7 @@ func runHttpServer( hookdClient hookd.Client, bifrostAPIURL string, defaultLogDestinations []logging.SupportedLogDestination, + notifier *notify.Notifier, log logrus.FieldLogger, ) error { router := chi.NewRouter() @@ -92,6 +95,7 @@ func runHttpServer( hookdClient, bifrostAPIURL, defaultLogDestinations, + notifier, log, ) if err != nil { @@ -180,6 +184,7 @@ func ConfigureGraph( hookdClient hookd.Client, bifrostAPIURL string, defaultLogDestinations []logging.SupportedLogDestination, + notifier *notify.Notifier, log logrus.FieldLogger, ) (func(http.Handler) http.Handler, error) { appWatcher := application.NewWatcher(ctx, watcherMgr) @@ -198,6 +203,28 @@ func ConfigureGraph( namespaceWatcher := team.NewNamespaceWatcher(ctx, watcherMgr) unleashWatcher := unleash.NewWatcher(ctx, mgmtWatcherMgr) + searcher, err := search.New(ctx, pool, log.WithField("subsystem", "search_bleve")) + if err != nil { + return nil, fmt.Errorf("init bleve: %w", err) + } + + // Searchers searchers + application.AddSearch(searcher, appWatcher) + job.AddSearch(searcher, jobWatcher) + bigquery.AddSearch(searcher, bqWatcher) + bucket.AddSearch(searcher, bucketWatcher) + kafkatopic.AddSearch(searcher, kafkaTopicWatcher) + opensearch.AddSearch(searcher, openSearchWatcher) + redis.AddSearch(searcher, redisWatcher) + sqlinstance.AddSearch(searcher, sqlInstanceWatcher) + valkey.AddSearch(searcher, valkeyWatcher) + team.AddSearch(searcher, pool, notifier, log.WithField("subsystem", "team_search")) + + // Re-index all to initialize the search index + if err := searcher.ReIndex(ctx); err != nil { + return nil, fmt.Errorf("reindex all: %w", err) + } + sqlAdminService, err := sqlinstance.NewClient(ctx, log, sqlinstance.WithFakeClients(fakeClients), sqlinstance.WithInstanceWatcher(sqlInstanceWatcher)) if err != nil { return nil, fmt.Errorf("create SQL Admin service: %w", err) @@ -234,7 +261,7 @@ func ConfigureGraph( return nil, errors.New("timed out waiting for watchers to be ready") } - return loader.Middleware(func(ctx context.Context) context.Context { + setupContext := func(ctx context.Context) context.Context { ctx = podlog.NewLoaderContext(ctx, podLogStreamer) ctx = application.NewLoaderContext(ctx, appWatcher, ingressWatcher) ctx = bigquery.NewLoaderContext(ctx, bqWatcher) @@ -261,6 +288,7 @@ func ConfigureGraph( ctx = deployment.NewLoaderContext(ctx, pool, hookdClient) ctx = serviceaccount.NewLoaderContext(ctx, pool) ctx = session.NewLoaderContext(ctx, pool) + ctx = search.NewLoaderContext(ctx, pool, searcher) ctx = unleash.NewLoaderContext(ctx, tenantName, unleashWatcher, bifrostAPIURL, log) ctx = logging.NewPackageContext(ctx, tenantName, defaultLogDestinations) ctx = feature.NewLoaderContext( @@ -272,5 +300,7 @@ func ConfigureGraph( openSearchWatcher.Enabled(), ) return ctx - }), nil + } + + return loader.Middleware(setupContext), nil } diff --git a/internal/database/migrations/0032_team_notification.sql b/internal/database/migrations/0032_team_notification.sql new file mode 100644 index 000000000..52990bcba --- /dev/null +++ b/internal/database/migrations/0032_team_notification.sql @@ -0,0 +1,50 @@ +-- +goose Up +-- +goose StatementBegin +CREATE OR REPLACE FUNCTION api_notify () RETURNS trigger AS $$ +BEGIN + -- We accept a number of keys as arguments, and will read the values using NEW if it is set, or OLD if it is not. + -- We will then send a notification to api_notifiy with a JSON object containing the keys and values, as well as + -- the table name and operation. + DECLARE + values text[]; + i integer := 0; + key text; + BEGIN + IF TG_NARGS > 0 AND TG_OP IN ('CREATE', 'UPDATE', 'DELETE') THEN + FOREACH key IN ARRAY TG_ARGV LOOP + IF TG_OP != 'DELETE' THEN + values := array_append(values, row_to_json(NEW)->>key); + ELSE + values := array_append(values, row_to_json(OLD)->>key); + END IF; + i := i + 1; + END LOOP; + END IF; + + -- Construct the JSON object and send the notification. The JSON object will be of the form: + -- { + -- "table": "table_name", + -- "op": "operation", + -- "data": { + -- "key1": "value1", + -- "key2": "value2", + -- ... + -- } + -- } + PERFORM pg_notify('api_notify', jsonb_build_object('table', TG_TABLE_NAME, 'op', TG_OP, 'data', jsonb_object(TG_ARGV, values))::text); + RETURN NULL; + END; +RETURN NULL; +END; +$$ LANGUAGE plpgsql +; + +-- +goose StatementEnd +CREATE +OR REPLACE TRIGGER teams_notify +AFTER INSERT +OR +UPDATE +OR DELETE ON teams FOR EACH ROW +EXECUTE PROCEDURE api_notify ("slug", "purpose") +; diff --git a/internal/database/notify/notifier.go b/internal/database/notify/notifier.go new file mode 100644 index 000000000..3b78f5155 --- /dev/null +++ b/internal/database/notify/notifier.go @@ -0,0 +1,177 @@ +package notify + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/sirupsen/logrus" +) + +type Operation string + +const ( + Insert Operation = "INSERT" + Update Operation = "UPDATE" + Delete Operation = "DELETE" +) + +// Payload is the payload of a notification +// You should not change this struct, as it's used by all listeners +type Payload struct { + Table string `json:"table"` + Op Operation `json:"op"` + Data map[string]any `json:"data"` +} + +type listener struct { + ch chan Payload +} + +type Option func(n *Notifier) + +func WithRetries(num int) Option { + return func(n *Notifier) { + n.maxRetries = num + } +} + +type Notifier struct { + db *pgxpool.Pool + log logrus.FieldLogger + channel string + maxRetries int + + lock sync.RWMutex + listeners map[string][]listener +} + +func New(db *pgxpool.Pool, log logrus.FieldLogger, opts ...Option) *Notifier { + n := &Notifier{ + db: db, + channel: "api_notify", + log: log, + listeners: map[string][]listener{}, + maxRetries: 100, + } + + for _, opt := range opts { + opt(n) + } + + return n +} + +func (n *Notifier) Run(ctx context.Context) { + retries := 0 + lastError := time.Now() + + for { + select { + case <-ctx.Done(): + return + default: + if err := n.run(ctx); err != nil { + n.log.WithError(err).Error("error running notifier") + if n.maxRetries == 0 || retries >= n.maxRetries { + n.log.Errorf("max retries reached, shutting down notifier") + return + } + if time.Since(lastError) > time.Minute { + retries = 0 + } + + retries++ + lastError = time.Now() + + time.Sleep(time.Duration(retries) * time.Second) + } + } + } +} + +// SetChannel sets the channel to listen on +// Will not take effect after Run has been called +func (n *Notifier) SetChannel(channel string) { + n.channel = channel +} + +// Listen returns a channel that will receive notifications for the given table +// It will receive all notifications for the table unless one or more filters are provided +func (n *Notifier) Listen(table string) <-chan Payload { + n.lock.Lock() + defer n.lock.Unlock() + + n.log.WithField("table", table).Debug("registering listener") + + ch := make(chan Payload, 20) + n.listeners[table] = append(n.listeners[table], listener{ + ch: ch, + }) + + return ch +} + +func (n *Notifier) run(ctx context.Context) error { + conn, err := n.db.Acquire(ctx) + if err != nil { + return fmt.Errorf("acquire connection: %w", err) + } + defer conn.Release() + + if _, err := conn.Exec(ctx, "LISTEN "+n.channel); err != nil { + return fmt.Errorf("listen: %w", err) + } + + for { + not, err := conn.Conn().WaitForNotification(ctx) + if err != nil { + switch { + case errors.Is(err, pgx.ErrTxClosed): + return nil + case errors.Is(err, io.ErrUnexpectedEOF): + n.log.WithError(err).Infof("listener got unexpected EOF, retry") + continue + } + + return fmt.Errorf("wait for notification: %w", err) + } + + payload := Payload{} + if err := json.Unmarshal([]byte(not.Payload), &payload); err != nil { + return fmt.Errorf("unmarshal payload: %w", err) + } + + go n.distibute(payload) + } +} + +func (n *Notifier) distibute(payload Payload) { + n.lock.RLock() + defer n.lock.RUnlock() + + n.log.WithFields(logrus.Fields{ + "table": payload.Table, + "op": payload.Op, + "data": payload.Data, + }).Debug("received notification") + + listeners, ok := n.listeners[payload.Table] + if !ok { + return + } + + for _, listener := range listeners { + select { + case listener.ch <- payload: + default: + n.log.WithField("table", payload.Table).Warn("listener channel full, dropping notification") + } + } +} diff --git a/internal/graph/applications.resolvers.go b/internal/graph/applications.resolvers.go index 84bb5cb09..749335642 100644 --- a/internal/graph/applications.resolvers.go +++ b/internal/graph/applications.resolvers.go @@ -2,6 +2,7 @@ package graph import ( "context" + "fmt" "github.com/nais/api/internal/auth/authz" "github.com/nais/api/internal/graph/gengql" @@ -14,7 +15,12 @@ import ( ) func (r *applicationResolver) Team(ctx context.Context, obj *application.Application) (*team.Team, error) { - return team.Get(ctx, obj.TeamSlug) + team, err := team.Get(ctx, obj.TeamSlug) + if err != nil { + fmt.Println("Error getting team: ", obj.TeamSlug, err) + } + + return team, err } func (r *applicationResolver) Environment(ctx context.Context, obj *application.Application) (*team.TeamEnvironment, error) { diff --git a/internal/graph/ident/ident.go b/internal/graph/ident/ident.go index a92360737..ebfa0368d 100644 --- a/internal/graph/ident/ident.go +++ b/internal/graph/ident/ident.go @@ -125,3 +125,15 @@ func (i *Ident) UnmarshalGQLContext(_ context.Context, v interface{}) error { return nil } + +func FromString(s string) Ident { + typ, id, ok := strings.Cut(s, "_") + if !ok { + return Ident{} + } + + return Ident{ + ID: string(base58.Decode(id)), + Type: typ, + } +} diff --git a/internal/graph/pagination/models.go b/internal/graph/pagination/models.go index 75ae11c5a..a05c70ab9 100644 --- a/internal/graph/pagination/models.go +++ b/internal/graph/pagination/models.go @@ -101,5 +101,5 @@ func NewConvertConnectionWithError[T any, F any, I Integer](nodes []T, page *Pag } type Integer interface { - int64 | int32 | int + int64 | int32 | int | uint64 } diff --git a/internal/grpc/grpcteam/server_test.go b/internal/grpc/grpcteam/server_test.go index bdb5a18a4..33c69128d 100644 --- a/internal/grpc/grpcteam/server_test.go +++ b/internal/grpc/grpcteam/server_test.go @@ -53,7 +53,7 @@ func TestTeamsServer_Get(t *testing.T) { garRepository := "gar-repository" stmt := ` - INSERT INTO teams (slug, purpose, slack_channel, entra_id_group_id, github_team_slug, google_group_email, gar_repository) VALUES + INSERT INTO teams (slug, purpose, slack_channel, entra_id_group_id, github_team_slug, google_group_email, gar_repository) VALUES ($1, $2, $3, $4, $5, $6, $7)` if _, err = pool.Exec(ctx, stmt, teamSlug, purpose, slackChannel, entraIDgroupID, gitHubTeamSlug, googleGroupEmail, garRepository); err != nil { t.Fatalf("failed to insert team: %v", err) diff --git a/internal/integration/manager.go b/internal/integration/manager.go index 33d76b2b5..63c04bc60 100644 --- a/internal/integration/manager.go +++ b/internal/integration/manager.go @@ -17,6 +17,7 @@ import ( "github.com/nais/api/internal/auth/middleware" "github.com/nais/api/internal/cmd/api" "github.com/nais/api/internal/database" + "github.com/nais/api/internal/database/notify" "github.com/nais/api/internal/environment" "github.com/nais/api/internal/graph" "github.com/nais/api/internal/graph/gengql" @@ -98,12 +99,14 @@ func newManager(ctx context.Context, skipSetup bool) testmanager.SetupFunc { k8sRunner := apiRunner.NewK8sRunner(scheme, dir, clusters()) topic := newPubsubRunner() - gqlRunner, err := newGQLRunner(ctx, config, pool, topic, k8sRunner) + gqlRunner, gqlCleanup, err := newGQLRunner(ctx, config, pool, topic, k8sRunner) if err != nil { done() return ctx, nil, nil, err } + cleanups = append([]func(){gqlCleanup}, cleanups...) + runners := []spec.Runner{ gqlRunner, runner.NewSQLRunner(pool), @@ -120,18 +123,18 @@ func newManager(ctx context.Context, skipSetup bool) testmanager.SetupFunc { } } -func newGQLRunner(ctx context.Context, config *Config, pool *pgxpool.Pool, topic graph.PubsubTopic, k8sRunner *apiRunner.K8s) (spec.Runner, error) { +func newGQLRunner(ctx context.Context, config *Config, pool *pgxpool.Pool, topic graph.PubsubTopic, k8sRunner *apiRunner.K8s) (spec.Runner, func(), error) { log := logrus.New() log.Out = io.Discard clusterConfig, err := kubernetes.CreateClusterConfigMap("dev-nais", clusters(), nil) if err != nil { - return nil, fmt.Errorf("creating cluster config map: %w", err) + return nil, nil, fmt.Errorf("creating cluster config map: %w", err) } watcherMgr, err := watcher.NewManager(k8sRunner.Scheme, clusterConfig, log.WithField("subsystem", "k8s_watcher"), watcher.WithClientCreator(k8sRunner.ClientCreator)) if err != nil { - return nil, fmt.Errorf("failed to create watcher manager: %w", err) + return nil, nil, fmt.Errorf("failed to create watcher manager: %w", err) } managementWatcherMgr, err := watcher.NewManager( @@ -141,11 +144,15 @@ func newGQLRunner(ctx context.Context, config *Config, pool *pgxpool.Pool, topic watcher.WithClientCreator(k8sRunner.ClientCreator), ) if err != nil { - return nil, fmt.Errorf("failed to create management watcher manager: %w", err) + return nil, nil, fmt.Errorf("failed to create management watcher manager: %w", err) } vulnerabilityClient := vulnerability.NewDependencyTrackClient(vulnerability.DependencyTrackConfig{EnableFakes: true}, log) + notifierCtx, notifyCancel := context.WithCancel(ctx) + notifier := notify.New(pool, log, notify.WithRetries(0)) + go notifier.Run(notifierCtx) + graphMiddleware, err := api.ConfigureGraph( ctx, true, @@ -159,10 +166,12 @@ func newGQLRunner(ctx context.Context, config *Config, pool *pgxpool.Pool, topic fakeHookd.New(), unleash.FakeBifrostURL, []logging.SupportedLogDestination{logging.Loki}, + notifier, log, ) if err != nil { - return nil, fmt.Errorf("failed to configure graph: %w", err) + notifyCancel() + return nil, nil, fmt.Errorf("failed to configure graph: %w", err) } resolver := graph.NewResolver(topic) @@ -206,7 +215,7 @@ func newGQLRunner(ctx context.Context, config *Config, pool *pgxpool.Pool, topic middleware.ApiKeyAuthentication()(middleware.RequireAuthenticatedUser()(srv)).ServeHTTP(w, r) }) - return runner.NewGQLRunner(graphMiddleware(authProxy)), nil + return runner.NewGQLRunner(graphMiddleware(authProxy)), notifyCancel, nil } func startPostgresql(ctx context.Context) (*postgres.PostgresContainer, string, error) { diff --git a/internal/kubernetes/watcher/watcher.go b/internal/kubernetes/watcher/watcher.go index 2bc4fae08..311fe98d9 100644 --- a/internal/kubernetes/watcher/watcher.go +++ b/internal/kubernetes/watcher/watcher.go @@ -50,11 +50,16 @@ type watcherSettings struct { filterLabelSelector string } +type WatcherHook[T Object] func(cluster string, obj T) + type Watcher[T Object] struct { watchers []*clusterWatcher[T] log logrus.FieldLogger resourceCounter metric.Int64UpDownCounter watchedType string + onAdd WatcherHook[T] + onUpdate WatcherHook[T] + onRemove WatcherHook[T] } func newWatcher[T Object](mgr *Manager, obj T, settings *watcherSettings, log logrus.FieldLogger) *Watcher[T] { @@ -95,6 +100,10 @@ func (w *Watcher[T]) Enabled() bool { } func (w *Watcher[T]) add(cluster string, obj T) { + if w.onAdd != nil { + w.onAdd(cluster, obj) + } + w.resourceCounter.Add(context.TODO(), 1, metric.WithAttributes(attribute.String("type", w.watchedType), attribute.String("action", "add"))) w.log.WithFields(logrus.Fields{ "cluster": cluster, @@ -104,6 +113,10 @@ func (w *Watcher[T]) add(cluster string, obj T) { } func (w *Watcher[T]) remove(cluster string, obj T) { + if w.onRemove != nil { + w.onRemove(cluster, obj) + } + w.resourceCounter.Add(context.TODO(), 1, metric.WithAttributes(attribute.String("type", w.watchedType), attribute.String("action", "remove"))) w.log.WithFields(logrus.Fields{ "cluster": cluster, @@ -113,6 +126,9 @@ func (w *Watcher[T]) remove(cluster string, obj T) { } func (w *Watcher[T]) update(cluster string, obj T) { + if w.onUpdate != nil { + w.onUpdate(cluster, obj) + } w.resourceCounter.Add(context.TODO(), 1, metric.WithAttributes(attribute.String("type", w.watchedType), attribute.String("action", "update"))) w.log.WithFields(logrus.Fields{ "cluster": cluster, @@ -285,6 +301,18 @@ func (w *Watcher[T]) SystemAuthenticatedClient(ctx context.Context, cluster stri return nil, fmt.Errorf("no watcher for cluster %s", cluster) } +func (w *Watcher[T]) OnRemove(fn WatcherHook[T]) { + w.onRemove = fn +} + +func (w *Watcher[T]) OnUpdate(fn WatcherHook[T]) { + w.onUpdate = fn +} + +func (w *Watcher[T]) OnAdd(fn WatcherHook[T]) { + w.onAdd = fn +} + func Objects[T Object](list []*EnvironmentWrapper[T]) []T { ret := make([]T, len(list)) for i, obj := range list { diff --git a/internal/persistence/bigquery/queries.go b/internal/persistence/bigquery/queries.go index 91faa689f..6dc733701 100644 --- a/internal/persistence/bigquery/queries.go +++ b/internal/persistence/bigquery/queries.go @@ -18,7 +18,7 @@ func GetByIdent(ctx context.Context, id ident.Ident) (*BigQueryDataset, error) { return nil, err } - return Get(ctx, teamSlug, environment, name) + return fromContext(ctx).watcher.Get(environment, teamSlug.String(), name) } func Get(ctx context.Context, teamSlug slug.Slug, environment, name string) (*BigQueryDataset, error) { diff --git a/internal/persistence/bigquery/search.go b/internal/persistence/bigquery/search.go index 101457b29..66ac1a5a6 100644 --- a/internal/persistence/bigquery/search.go +++ b/internal/persistence/bigquery/search.go @@ -3,15 +3,19 @@ package bigquery import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" ) -func init() { - search.Register("BIGQUERY_DATASET", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*BigQueryDataset]) { + createIdent := func(env string, obj *BigQueryDataset) ident.Ident { + return obj.ID() + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("BIGQUERY_DATASET", search.NewK8sSearch("BIGQUERY_DATASET", watcher, gbi, createIdent)) } diff --git a/internal/persistence/bucket/search.go b/internal/persistence/bucket/search.go index 74ee123d4..d62b7d1dd 100644 --- a/internal/persistence/bucket/search.go +++ b/internal/persistence/bucket/search.go @@ -3,15 +3,20 @@ package bucket import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" ) -func init() { - search.Register("BUCKET", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*Bucket]) { + createIdent := func(env string, obj *Bucket) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("BUCKET", search.NewK8sSearch("BUCKET", watcher, gbi, createIdent)) } diff --git a/internal/persistence/kafkatopic/search.go b/internal/persistence/kafkatopic/search.go index 65c768403..6f2f1cce7 100644 --- a/internal/persistence/kafkatopic/search.go +++ b/internal/persistence/kafkatopic/search.go @@ -3,15 +3,20 @@ package kafkatopic import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" ) -func init() { - search.Register("KAFKA_TOPIC", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*KafkaTopic]) { + createIdent := func(env string, obj *KafkaTopic) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("KAFKA_TOPIC", search.NewK8sSearch("KAFKA_TOPIC", watcher, gbi, createIdent)) } diff --git a/internal/persistence/opensearch/search.go b/internal/persistence/opensearch/search.go index b956efa3d..7bffb21c6 100644 --- a/internal/persistence/opensearch/search.go +++ b/internal/persistence/opensearch/search.go @@ -3,15 +3,20 @@ package opensearch import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" ) -func init() { - search.Register("OPENSEARCH", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*OpenSearch]) { + createIdent := func(env string, obj *OpenSearch) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("OPENSEARCH", search.NewK8sSearch("OPENSEARCH", watcher, gbi, createIdent)) } diff --git a/internal/persistence/redis/search.go b/internal/persistence/redis/search.go index 82b623ce5..15c56deac 100644 --- a/internal/persistence/redis/search.go +++ b/internal/persistence/redis/search.go @@ -3,15 +3,20 @@ package redis import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" ) -func init() { - search.Register("REDIS_INSTANCE", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*RedisInstance]) { + createIdent := func(env string, app *RedisInstance) ident.Ident { + return newIdent(slug.Slug(app.GetNamespace()), env, app.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("REDIS_INSTANCE", search.NewK8sSearch("REDIS_INSTANCE", watcher, gbi, createIdent)) } diff --git a/internal/persistence/sqlinstance/search.go b/internal/persistence/sqlinstance/search.go index 35cf86624..1a7b4dbc4 100644 --- a/internal/persistence/sqlinstance/search.go +++ b/internal/persistence/sqlinstance/search.go @@ -3,15 +3,20 @@ package sqlinstance import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" ) -func init() { - search.Register("SQL_INSTANCE", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*SQLInstance]) { + createIdent := func(env string, obj *SQLInstance) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("SQL_INSTANCE", search.NewK8sSearch("SQL_INSTANCE", watcher, gbi, createIdent)) } diff --git a/internal/persistence/valkey/search.go b/internal/persistence/valkey/search.go index df66b9041..24da4cd04 100644 --- a/internal/persistence/valkey/search.go +++ b/internal/persistence/valkey/search.go @@ -3,15 +3,20 @@ package valkey import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" ) -func init() { - search.Register("VALKEY_INSTANCE", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*ValkeyInstance]) { + createIdent := func(env string, obj *ValkeyInstance) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("VALKEY_INSTANCE", search.NewK8sSearch("VALKEY_INSTANCE", watcher, gbi, createIdent)) } diff --git a/internal/search/bleve.go b/internal/search/bleve.go new file mode 100644 index 000000000..b8ee1f88c --- /dev/null +++ b/internal/search/bleve.go @@ -0,0 +1,256 @@ +package search + +import ( + "context" + "fmt" + "slices" + "sync" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/analysis/analyzer/custom" + "github.com/blevesearch/bleve/v2/mapping" + "github.com/blevesearch/bleve/v2/search/query" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/nais/api/internal/auth/authz" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/graph/pagination" + "github.com/nais/api/internal/search/bleveext" + "github.com/nais/api/internal/search/searchsql" + "github.com/sirupsen/logrus" +) + +type Document struct { + ID string `json:"id"` + Name string `json:"name"` + Team string `json:"team,omitempty"` + Kind string `json:"kind"` + Fields map[string]string `json:"fields,omitempty"` +} + +func (m Document) Type() string { + return "doc" +} + +type Indexer interface { + Upsert(doc Document) + Remove(id ident.Ident) +} + +type Searchable interface { + Convert(ctx context.Context, ids ...ident.Ident) ([]SearchNode, error) + ReIndex(ctx context.Context) []Document + Watch(ctx context.Context, indexer Indexer) error +} + +type Client interface { + Search(ctx context.Context, page *pagination.Pagination, filter SearchFilter) (*SearchNodeConnection, error) + AddClient(kind SearchType, client Searchable) + ReIndex(ctx context.Context) error +} + +func buildIndexMapping() (mapping.IndexMapping, error) { + indexMapping := bleve.NewIndexMapping() + + docMapping := bleve.NewDocumentMapping() + docMapping.AddFieldMappingsAt("kind", bleve.NewKeywordFieldMapping()) + indexMapping.AddDocumentMapping("doc", docMapping) + + err := indexMapping.AddCustomAnalyzer(custom.Name, + map[string]any{ + "type": "custom", + "tokenizer": `unicode`, + }) + if err != nil { + return nil, err + } + + return indexMapping, nil +} + +func New(ctx context.Context, pool *pgxpool.Pool, log logrus.FieldLogger) (Client, error) { + im, err := buildIndexMapping() + if err != nil { + return nil, err + } + // im.CustomAnalysis = custom.AnalyzerConstructor(config map[string]interface{}, cache *registry.Cache) + bleveIndex, err := bleve.NewMemOnly(im) + if err != nil { + return nil, err + } + + bleveSearch := &bleveSearcher{ + Client: bleveIndex, + Clients: make(map[SearchType]Searchable), + log: log, + db: searchsql.New(pool), + } + + return bleveSearch, nil +} + +type bleveSearcher struct { + Client bleve.Index + Clients map[SearchType]Searchable + + log logrus.FieldLogger + db searchsql.Querier +} + +func (b *bleveSearcher) AddClient(kind SearchType, client Searchable) { + b.Clients[kind] = client +} + +func (b *bleveSearcher) ReIndex(ctx context.Context) error { + b.reindexAll(ctx) + for kind, search := range b.Clients { + if err := search.Watch(ctx, b); err != nil { + return fmt.Errorf("failed to watch %q: %w", kind, err) + } + } + return nil +} + +func (b *bleveSearcher) Upsert(doc Document) { + b.log.WithField("id", doc).Debug("indexing document") + + if err := b.Client.Index(doc.ID, doc); err != nil { + b.log.WithError(err).WithField("id", doc).Error("failed to index document") + } +} + +func (b *bleveSearcher) Remove(id ident.Ident) { + b.log.WithField("id", id).Debug("removing document") + + if err := b.Client.Delete(id.String()); err != nil { + b.log.WithError(err).WithField("id", id).Error("failed to remove document") + } +} + +func (b *bleveSearcher) reindexAll(ctx context.Context) { + wg := &sync.WaitGroup{} + for typ, search := range b.Clients { + wg.Add(1) + go func(search Searchable) { + defer wg.Done() + if err := b.index(search.ReIndex(ctx)); err != nil { + b.log.WithField("search_type", typ).WithError(err).Error("failed to reindex") + } + }(search) + } + wg.Wait() +} + +func (b *bleveSearcher) index(docs []Document) error { + batch := b.Client.NewBatch() + for _, doc := range docs { + if err := batch.Index(doc.ID, doc); err != nil { + return err + } + } + + return b.Client.Batch(batch) +} + +func (b *bleveSearcher) Search(ctx context.Context, page *pagination.Pagination, filter SearchFilter) (*SearchNodeConnection, error) { + slugs, err := b.db.TeamSlugsFromUserID(ctx, authz.ActorFromContext(ctx).User.GetID()) + if err != nil { + b.log.WithError(err).Error("failed to list teams") + return nil, err + } + + queries := []query.Query{} + + if filter.Query != "" { + qq := bleve.NewFuzzyQuery(filter.Query) + qq.SetFuzziness(2) + qq.SetBoost(0.5) + + prefix := bleve.NewPrefixQuery(filter.Query) + prefix.SetField("name") + prefix.SetBoost(1.5) + + // We add the query with both a match, prefix, and a fuzzy query to get both exact and fuzzy matches + queries = append(queries, bleve.NewDisjunctionQuery( + prefix, + bleve.NewMatchQuery(filter.Query), + qq, + )) + } + + if filter.Type != nil { + kind := bleve.NewTermQuery(filter.Type.String()) + kind.FieldVal = "kind" + queries = append(queries, kind) + } + var q query.Query = bleve.NewConjunctionQuery(queries...) + + if len(slugs) > 0 && (filter.Type == nil || (filter.Type != nil && *filter.Type != "TEAM")) { + teamSlugs := make([]string, 0, len(slugs)) + for _, slug := range slugs { + teamSlugs = append(teamSlugs, slug.String()) + } + + q = bleveext.NewBoostingQuery(q, []string{"team"}, func(field string, term []byte, isPartOfMatch bool) *query.Boost { + v := 1 + if slices.Contains(teamSlugs, string(term)) { + v *= 1000 + } + b := query.Boost(v) + return &b + }) + } + + search := bleve.NewSearchRequest(q) + search.Size = int(page.Limit()) + search.From = int(page.Offset()) + search.Fields = []string{"kind"} + + results, err := b.Client.Search(search) + if err != nil { + b.log.WithError(err).Error("bleve search failed") + return nil, err + } + + b.log.WithFields(logrus.Fields{ + "duration": results.Took, + "total": results.Total, + "hits": len(results.Hits), + }).Debug("search results") + + kinds := map[string][]ident.Ident{} + for _, hit := range results.Hits { + kind := hit.Fields["kind"].(string) + kinds[kind] = append(kinds[kind], ident.FromString(hit.ID)) + } + + convertedResults := make(map[string]SearchNode) + for kind, ids := range kinds { + client, ok := b.Clients[SearchType(kind)] + if !ok { + b.log.WithField("kind", kind).Error("missing search client") + continue + } + sn, err := client.Convert(ctx, ids...) + if err != nil { + b.log.WithError(err).Error("failed to convert search results") + continue + } + + for _, n := range sn { + convertedResults[n.ID().String()] = n + } + } + + ret := make([]SearchNode, 0, len(results.Hits)) + for _, hit := range results.Hits { + n, ok := convertedResults[hit.ID] + if !ok { + b.log.WithField("id", hit.ID).Error("missing search result") + continue + } + + ret = append(ret, n) + } + + return pagination.NewConnection(ret, page, results.Total), nil +} diff --git a/internal/search/bleveext/boost.go b/internal/search/bleveext/boost.go new file mode 100644 index 000000000..880de0b53 --- /dev/null +++ b/internal/search/bleveext/boost.go @@ -0,0 +1,91 @@ +package bleveext + +import ( + "context" + + "github.com/blevesearch/bleve/v2/mapping" + "github.com/blevesearch/bleve/v2/search" + "github.com/blevesearch/bleve/v2/search/query" + "github.com/blevesearch/bleve/v2/search/searcher" + index "github.com/blevesearch/bleve_index_api" +) + +// Inspired by https://gist.github.com/rgalanakis/75dbea70f720d8393a0b83393a010836 + +// BoostingQueryPredicate is called with a search field, term, +// and whether the field was part of the match, +// or was included just for the purposes of the BoostingQuery. +// +// For example, maybe if any of 3 fields match, +// the document should be boosted. +// The predicate would be called with all three fields, +// but only those with isPartOfMatch should return a non-nil query.Boost. +// +// Additionally, perhaps some documents should be boosted +// merely because of their value ("sale" items). +// In this case, you would likely ignore isPartOfMatch +// and instead compare against term. +type BoostingQueryPredicate func(field string, term []byte, isPartOfMatch bool) *query.Boost + +type BoostingQuery struct { + // BoostVal represents the ratio of recency to preexisitng score. The + // default, 1.0, assigns equal importance to recency score and match score. + // A value of 2 would relatively rank recency twice as important as match score. + BoostVal *query.Boost + // These fields are loaded, and used in the boost processing. + // Predicate will be called with every field. + Fields []string + // Return true if BoostVal should be applied to this hit. + Predicate BoostingQueryPredicate + base query.Query +} + +func NewBoostingQuery(base query.Query, fields []string, predicate BoostingQueryPredicate) *BoostingQuery { + return &BoostingQuery{Fields: fields, Predicate: predicate, base: base} +} + +// SetBoost sets the boost value. +// Usually you should leave this, and return differing values from Predicate. +// Changing this scales the result of Predicate. +func (q *BoostingQuery) SetBoost(b float64) { + boost := query.Boost(b) + q.BoostVal = &boost +} + +func (q *BoostingQuery) Boost() float64 { + return q.BoostVal.Value() +} + +func (q *BoostingQuery) Searcher(ctx context.Context, i index.IndexReader, m mapping.IndexMapping, options search.SearcherOptions) (search.Searcher, error) { + bs, err := q.base.Searcher(ctx, i, m, options) + if err != nil { + return nil, err + } + dvReader, err := i.DocValueReader(q.Fields) + if err != nil { + return nil, err + } + return searcher.NewFilteringSearcher(ctx, bs, q.makeFilter(dvReader)), nil +} + +func (q *BoostingQuery) makeFilter(dvReader index.DocValueReader) searcher.FilterFunc { + boost := q.Boost() + return func(d *search.DocumentMatch) bool { + isPartOfMatch := make(map[string]bool, len(d.FieldTermLocations)) + for _, ftloc := range d.FieldTermLocations { + isPartOfMatch[ftloc.Field] = true + } + seenFields := make(map[string]struct{}, len(d.Fields)) + _ = dvReader.VisitDocValues(d.IndexInternalID, func(field string, term []byte) { + if _, seen := seenFields[field]; seen { + return + } + seenFields[field] = struct{}{} + b := q.Predicate(field, term, isPartOfMatch[field]) + if b != nil { + d.Score *= boost * b.Value() + } + }) + return true + } +} diff --git a/internal/search/dataloader.go b/internal/search/dataloader.go new file mode 100644 index 000000000..9b575cf72 --- /dev/null +++ b/internal/search/dataloader.go @@ -0,0 +1,34 @@ +package search + +import ( + "context" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/nais/api/internal/search/searchsql" +) + +type ctxKey int + +const loadersKey ctxKey = iota + +func NewLoaderContext(ctx context.Context, dbConn *pgxpool.Pool, searcher Client) context.Context { + return context.WithValue(ctx, loadersKey, newLoaders(dbConn, searcher)) +} + +func fromContext(ctx context.Context) *loaders { + return ctx.Value(loadersKey).(*loaders) +} + +type loaders struct { + internalQuerier *searchsql.Queries + searcher Client +} + +func newLoaders(dbConn *pgxpool.Pool, searcher Client) *loaders { + db := searchsql.New(dbConn) + + return &loaders{ + internalQuerier: db, + searcher: searcher, + } +} diff --git a/internal/search/k8s_searcher.go b/internal/search/k8s_searcher.go new file mode 100644 index 000000000..d9bccde0c --- /dev/null +++ b/internal/search/k8s_searcher.go @@ -0,0 +1,86 @@ +package search + +import ( + "context" + + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" + "github.com/nais/api/internal/slug" +) + +type K8sSearch[T watcher.Object] struct { + kind SearchType + watcher *watcher.Watcher[T] + getByIdent func(ctx context.Context, id ident.Ident) (SearchNode, error) + newIdent func(env string, o T) ident.Ident +} + +func NewK8sSearch[T watcher.Object]( + kind SearchType, + watcher *watcher.Watcher[T], + getByIdent func(ctx context.Context, id ident.Ident) (SearchNode, error), + newIdent func(env string, o T) ident.Ident, +) *K8sSearch[T] { + return &K8sSearch[T]{ + kind: kind, + watcher: watcher, + getByIdent: getByIdent, + newIdent: newIdent, + } +} + +func (k K8sSearch[T]) Convert(ctx context.Context, ids ...ident.Ident) ([]SearchNode, error) { + ret := make([]SearchNode, 0, len(ids)) + for _, id := range ids { + o, err := k.getByIdent(ctx, id) + if err != nil { + return nil, err + } + + ret = append(ret, o) + } + return ret, nil +} + +func (k K8sSearch[T]) ReIndex(ctx context.Context) []Document { + objs := k.watcher.All() + docs := make([]Document, 0, len(objs)) + for _, obj := range objs { + team := slug.Slug(obj.GetNamespace()) + + docs = append(docs, Document{ + ID: k.newIdent(obj.Cluster, obj.Obj).String(), + Kind: k.kind.String(), + Name: obj.GetName(), + Team: team.String(), + }) + } + + return docs +} + +func (k K8sSearch[T]) Watch(ctx context.Context, indexer Indexer) error { + k.watcher.OnAdd(k.upsert(indexer)) + k.watcher.OnUpdate(k.upsert(indexer)) + k.watcher.OnRemove(k.onRemove(indexer)) + + return nil +} + +func (k K8sSearch[T]) upsert(indexer Indexer) func(string, T) { + return func(env string, obj T) { + team := slug.Slug(obj.GetNamespace()) + indexer.Upsert(Document{ + ID: k.newIdent(env, obj).String(), + Kind: k.kind.String(), + Name: obj.GetName(), + Team: team.String(), + }) + } +} + +func (k K8sSearch[T]) onRemove(indexer Indexer) func(string, T) { + return func(env string, obj T) { + indexer.Remove(k.newIdent(env, obj)) + } +} diff --git a/internal/search/models.go b/internal/search/models.go index 883d98703..bc9ed5dcb 100644 --- a/internal/search/models.go +++ b/internal/search/models.go @@ -6,6 +6,7 @@ import ( "strconv" "github.com/lithammer/fuzzysearch/fuzzy" + "github.com/nais/api/internal/graph/ident" "github.com/nais/api/internal/graph/pagination" ) @@ -15,6 +16,7 @@ type ( ) type SearchNode interface { + ID() ident.Ident IsSearchNode() } diff --git a/internal/search/queries.go b/internal/search/queries.go index 95eaef446..f51930d0e 100644 --- a/internal/search/queries.go +++ b/internal/search/queries.go @@ -2,46 +2,10 @@ package search import ( "context" - "slices" - "sort" - "strings" "github.com/nais/api/internal/graph/pagination" - "github.com/sourcegraph/conc/pool" ) func Search(ctx context.Context, page *pagination.Pagination, filter SearchFilter) (*SearchNodeConnection, error) { - q := strings.TrimSpace(filter.Query) - if q == "" && filter.Type == nil { - return pagination.EmptyConnection[SearchNode](), nil - } - - wg := pool.NewWithResults[[]*Result]().WithMaxGoroutines(5) - for _, searchable := range searchables { - if filter.Type != nil && searchable.Type != *filter.Type { - continue - } - - wg.Go(func() []*Result { - return searchable.Search(ctx, q) - }) - } - - wgRet := wg.Wait() - ret := make([]*Result, 0) - for _, r := range wgRet { - ret = append(ret, r...) - } - - ret = slices.DeleteFunc(ret, func(e *Result) bool { - return !Include(e.Rank) - }) - - sort.SliceStable(ret, func(i, j int) bool { - return ret[i].Rank < ret[j].Rank - }) - - return pagination.NewConvertConnection(pagination.Slice(ret, page), page, len(ret), func(from *Result) SearchNode { - return from.Node - }), nil + return fromContext(ctx).searcher.Search(ctx, page, filter) } diff --git a/internal/search/queries/user.sql b/internal/search/queries/user.sql new file mode 100644 index 000000000..d666ab6a9 --- /dev/null +++ b/internal/search/queries/user.sql @@ -0,0 +1,28 @@ +-- SELECT +-- sqlc.embed(users), +-- sqlc.embed(user_roles) +-- FROM +-- user_roles +-- JOIN teams ON teams.slug = user_roles.target_team_slug +-- JOIN users ON users.id = user_roles.user_id +-- WHERE +-- user_roles.user_id = @user_id +-- ORDER BY +-- CASE +-- WHEN @order_by::TEXT = 'slug:asc' THEN teams.slug +-- END ASC, +-- CASE +-- WHEN @order_by::TEXT = 'slug:desc' THEN teams.slug +-- END DESC, +-- teams.slug ASC +-- name: TeamSlugsFromUserID :many +SELECT + teams.slug +FROM + user_roles + JOIN teams ON teams.slug = user_roles.target_team_slug +WHERE + user_roles.user_id = @user_id +ORDER BY + teams.slug ASC +; diff --git a/internal/search/search.go b/internal/search/search.go deleted file mode 100644 index 86a98d030..000000000 --- a/internal/search/search.go +++ /dev/null @@ -1,14 +0,0 @@ -package search - -import "context" - -var searchables []Searchers - -type Searchers struct { - Search func(ctx context.Context, q string) []*Result - Type SearchType -} - -func Register(searchType SearchType, search func(ctx context.Context, q string) []*Result) { - searchables = append(searchables, Searchers{Search: search, Type: searchType}) -} diff --git a/internal/search/searchsql/db.go b/internal/search/searchsql/db.go new file mode 100644 index 000000000..926b92aa5 --- /dev/null +++ b/internal/search/searchsql/db.go @@ -0,0 +1,30 @@ +// Code generated by sqlc. DO NOT EDIT. + +package searchsql + +import ( + "context" + + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/search/searchsql/models.go b/internal/search/searchsql/models.go new file mode 100644 index 000000000..55e7875ec --- /dev/null +++ b/internal/search/searchsql/models.go @@ -0,0 +1,3 @@ +// Code generated by sqlc. DO NOT EDIT. + +package searchsql diff --git a/internal/search/searchsql/querier.go b/internal/search/searchsql/querier.go new file mode 100644 index 000000000..62e83ef5c --- /dev/null +++ b/internal/search/searchsql/querier.go @@ -0,0 +1,33 @@ +// Code generated by sqlc. DO NOT EDIT. + +package searchsql + +import ( + "context" + + "github.com/google/uuid" + "github.com/nais/api/internal/slug" +) + +type Querier interface { + // SELECT + // sqlc.embed(users), + // sqlc.embed(user_roles) + // FROM + // user_roles + // JOIN teams ON teams.slug = user_roles.target_team_slug + // JOIN users ON users.id = user_roles.user_id + // WHERE + // user_roles.user_id = @user_id + // ORDER BY + // CASE + // WHEN @order_by::TEXT = 'slug:asc' THEN teams.slug + // END ASC, + // CASE + // WHEN @order_by::TEXT = 'slug:desc' THEN teams.slug + // END DESC, + // teams.slug ASC + TeamSlugsFromUserID(ctx context.Context, userID uuid.UUID) ([]slug.Slug, error) +} + +var _ Querier = (*Queries)(nil) diff --git a/internal/search/searchsql/user.sql.go b/internal/search/searchsql/user.sql.go new file mode 100644 index 000000000..df423bbf8 --- /dev/null +++ b/internal/search/searchsql/user.sql.go @@ -0,0 +1,67 @@ +// Code generated by sqlc. DO NOT EDIT. +// source: user.sql + +package searchsql + +import ( + "context" + + "github.com/google/uuid" + "github.com/nais/api/internal/slug" +) + +const teamSlugsFromUserID = `-- name: TeamSlugsFromUserID :many +SELECT + teams.slug +FROM + user_roles + JOIN teams ON teams.slug = user_roles.target_team_slug +WHERE + user_roles.user_id = $1 +ORDER BY + teams.slug ASC +` + +// SELECT +// +// sqlc.embed(users), +// sqlc.embed(user_roles) +// +// FROM +// +// user_roles +// JOIN teams ON teams.slug = user_roles.target_team_slug +// JOIN users ON users.id = user_roles.user_id +// +// WHERE +// +// user_roles.user_id = @user_id +// +// ORDER BY +// +// CASE +// WHEN @order_by::TEXT = 'slug:asc' THEN teams.slug +// END ASC, +// CASE +// WHEN @order_by::TEXT = 'slug:desc' THEN teams.slug +// END DESC, +// teams.slug ASC +func (q *Queries) TeamSlugsFromUserID(ctx context.Context, userID uuid.UUID) ([]slug.Slug, error) { + rows, err := q.db.Query(ctx, teamSlugsFromUserID, userID) + if err != nil { + return nil, err + } + defer rows.Close() + items := []slug.Slug{} + for rows.Next() { + var slug slug.Slug + if err := rows.Scan(&slug); err != nil { + return nil, err + } + items = append(items, slug) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} diff --git a/internal/team/queries/teams.sql b/internal/team/queries/teams.sql index 91ffe20c5..3dcba8887 100644 --- a/internal/team/queries/teams.sql +++ b/internal/team/queries/teams.sql @@ -243,3 +243,13 @@ SELECT FROM teams ; + +-- name: ListAllForSearch :many +SELECT + slug, + purpose +FROM + teams +ORDER BY + slug ASC +; diff --git a/internal/team/search.go b/internal/team/search.go index 614daeb1f..53c2199ab 100644 --- a/internal/team/search.go +++ b/internal/team/search.go @@ -3,15 +3,128 @@ package team import ( "context" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/nais/api/internal/database/notify" + "github.com/nais/api/internal/graph/ident" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" + "github.com/nais/api/internal/team/teamsql" + "github.com/sirupsen/logrus" ) -func init() { - search.Register("TEAM", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) +func AddSearch(client search.Client, pool *pgxpool.Pool, notifier *notify.Notifier, log logrus.FieldLogger) { + client.AddClient("TEAM", &teamSearch{ + db: teamsql.New(pool), + notifier: notifier, + log: log, + }) +} + +type teamSearch struct { + log logrus.FieldLogger + notifier *notify.Notifier + db teamsql.Querier +} + +func (t *teamSearch) Convert(ctx context.Context, ids ...ident.Ident) ([]search.SearchNode, error) { + slugs := make([]slug.Slug, 0, len(ids)) + for _, id := range ids { + slug, err := parseTeamIdent(id) if err != nil { - return nil + return nil, err } - return ret - }) + slugs = append(slugs, slug) + } + + all, err := t.db.ListBySlugs(ctx, slugs) + if err != nil { + return nil, err + } + + ret := make([]search.SearchNode, 0, len(all)) + for _, team := range all { + ret = append(ret, toGraphTeam(team)) + } + + return ret, nil +} + +func (t *teamSearch) ReIndex(ctx context.Context) []search.Document { + all, err := t.db.ListAllForSearch(ctx) + if err != nil { + return nil + } + + ret := make([]search.Document, 0, len(all)) + for _, team := range all { + ret = append(ret, newSearchDocument(team.Slug, team.Purpose)) + } + + return ret +} + +func (t *teamSearch) Watch(ctx context.Context, indexer search.Indexer) error { + go t.listen(ctx, indexer) + return nil +} + +func (t *teamSearch) listen(ctx context.Context, indexer search.Indexer) { + ch := t.notifier.Listen("teams") + + for { + select { + case <-ctx.Done(): + return + case payload := <-ch: + data := dataFromNotification(payload) + if data.Slug == "" { + continue + } + + switch payload.Op { + case notify.Insert, notify.Update: + indexer.Upsert(newSearchDocument(data.Slug, data.Purpose)) + case notify.Delete: + indexer.Remove(newTeamIdent(data.Slug)) + default: + t.log.WithField("op", payload.Op).Warn("unknown operation") + } + } + } +} + +type notificationData struct { + Slug slug.Slug `json:"slug"` + Purpose string `json:"purpose"` +} + +func dataFromNotification(payload notify.Payload) notificationData { + var slg slug.Slug + var purpose string + + if sslug, ok := payload.Data["slug"].(string); ok { + slg = slug.Slug(sslug) + } + + if spurpose, ok := payload.Data["purpose"].(string); ok { + purpose = spurpose + } + + return notificationData{ + Slug: slg, + Purpose: purpose, + } +} + +func newSearchDocument(teamSlug slug.Slug, purpose string) search.Document { + sslug := teamSlug.String() + return search.Document{ + ID: newTeamIdent(teamSlug).String(), + Name: sslug, + Team: sslug, + Kind: "TEAM", + Fields: map[string]string{ + "purpose": purpose, + }, + } } diff --git a/internal/team/teamsql/querier.go b/internal/team/teamsql/querier.go index 480fa00e9..89140d4b9 100644 --- a/internal/team/teamsql/querier.go +++ b/internal/team/teamsql/querier.go @@ -24,6 +24,7 @@ type Querier interface { GetMember(ctx context.Context, arg GetMemberParams) (*GetMemberRow, error) GetMemberByEmail(ctx context.Context, arg GetMemberByEmailParams) (*GetMemberByEmailRow, error) List(ctx context.Context, arg ListParams) ([]*Team, error) + ListAllForSearch(ctx context.Context) ([]*ListAllForSearchRow, error) ListAllSlugs(ctx context.Context) ([]slug.Slug, error) ListBySlugs(ctx context.Context, slugs []slug.Slug) ([]*Team, error) ListEnvironmentsBySlug(ctx context.Context, argSlug slug.Slug) ([]*TeamAllEnvironment, error) diff --git a/internal/team/teamsql/teams.sql.go b/internal/team/teamsql/teams.sql.go index 9006d213d..b7191a807 100644 --- a/internal/team/teamsql/teams.sql.go +++ b/internal/team/teamsql/teams.sql.go @@ -256,6 +256,41 @@ func (q *Queries) List(ctx context.Context, arg ListParams) ([]*Team, error) { return items, nil } +const listAllForSearch = `-- name: ListAllForSearch :many +SELECT + slug, + purpose +FROM + teams +ORDER BY + slug ASC +` + +type ListAllForSearchRow struct { + Slug slug.Slug + Purpose string +} + +func (q *Queries) ListAllForSearch(ctx context.Context) ([]*ListAllForSearchRow, error) { + rows, err := q.db.Query(ctx, listAllForSearch) + if err != nil { + return nil, err + } + defer rows.Close() + items := []*ListAllForSearchRow{} + for rows.Next() { + var i ListAllForSearchRow + if err := rows.Scan(&i.Slug, &i.Purpose); err != nil { + return nil, err + } + items = append(items, &i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const listAllSlugs = `-- name: ListAllSlugs :one SELECT ARRAY_AGG(slug)::slug[] AS slugs diff --git a/internal/workload/application/search.go b/internal/workload/application/search.go index 333e3b3b9..5fd0e489b 100644 --- a/internal/workload/application/search.go +++ b/internal/workload/application/search.go @@ -3,15 +3,21 @@ package application import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" + nais_io_v1alpha1 "github.com/nais/liberator/pkg/apis/nais.io/v1alpha1" ) -func init() { - search.Register("APPLICATION", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*nais_io_v1alpha1.Application]) { + createIdent := func(env string, obj *nais_io_v1alpha1.Application) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("APPLICATION", search.NewK8sSearch("APPLICATION", watcher, gbi, createIdent)) } diff --git a/internal/workload/job/search.go b/internal/workload/job/search.go index d14c7bc00..c57720086 100644 --- a/internal/workload/job/search.go +++ b/internal/workload/job/search.go @@ -3,15 +3,21 @@ package job import ( "context" + "github.com/nais/api/internal/graph/ident" + "github.com/nais/api/internal/kubernetes/watcher" "github.com/nais/api/internal/search" + "github.com/nais/api/internal/slug" + nais_io_v1 "github.com/nais/liberator/pkg/apis/nais.io/v1" ) -func init() { - search.Register("JOB", func(ctx context.Context, q string) []*search.Result { - ret, err := Search(ctx, q) - if err != nil { - return nil - } - return ret - }) +func AddSearch(client search.Client, watcher *watcher.Watcher[*nais_io_v1.Naisjob]) { + createIdent := func(env string, obj *nais_io_v1.Naisjob) ident.Ident { + return newIdent(slug.Slug(obj.GetNamespace()), env, obj.GetName()) + } + + gbi := func(ctx context.Context, id ident.Ident) (search.SearchNode, error) { + return GetByIdent(ctx, id) + } + + client.AddClient("JOB", search.NewK8sSearch("JOB", watcher, gbi, createIdent)) }