diff --git a/docs/features/kubernetes.rst b/docs/features/kubernetes.rst index eeeccd592..a35e03701 100644 --- a/docs/features/kubernetes.rst +++ b/docs/features/kubernetes.rst @@ -78,7 +78,7 @@ The example here shows a typical configuration: } } -Service deployment in **Namespace** ``Dev``, **ServiceDiscoveryProvider** type is ``Kube``, you also can set :ref:`k8s-pollkube-provider` type. +Service deployment in **Namespace** ``Dev``, **ServiceDiscoveryProvider** type is ``Kube``, you also can set :ref:`k8s-pollkube-provider` or :ref:`k8s-watchkube-provider` type. **Note 1**: ``Host``, ``Port`` and ``Token`` are no longer in use. @@ -109,6 +109,22 @@ This really depends on how volatile your services are. We doubt it will matter for most people and polling may give a tiny performance improvement over calling Kubernetes per request. There is no way for Ocelot to work these out for you. +.. _k8s-watchkube-provider: + +WatchKube provider +^^^^^^^^^^^^^^^^^^ + +This option utilizes Kubernetes API `watch requests `_ for fetching service configuration. +Essentially it means that there will be one streamed http connection with kube-api per downstream service. +Changes streamed by this connection will be used for updating available endpoints list. + +.. code-block:: json + + "ServiceDiscoveryProvider": { + "Namespace": "dev", + "Type": "WatchKube" + } + Global vs Route Levels ---------------------- diff --git a/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs b/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs index 83418957b..a83f00c11 100644 --- a/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs +++ b/src/Ocelot.Provider.Kubernetes/EndPointClientV1.cs @@ -7,21 +7,24 @@ namespace Ocelot.Provider.Kubernetes { public class EndPointClientV1 : KubeResourceClient, IEndPointClient { - private readonly HttpRequest _collection; + private readonly HttpRequest _byName; + private readonly HttpRequest _watchByName; public EndPointClientV1(IKubeApiClient client) : base(client) { - _collection = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}"); + _byName = KubeRequest.Create("api/v1/namespaces/{Namespace}/endpoints/{ServiceName}"); + _watchByName = KubeRequest.Create("api/v1/watch/namespaces/{Namespace}/endpoints/{ServiceName}"); } - public async Task GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default) + public async Task GetAsync(string serviceName, string kubeNamespace = null, + CancellationToken cancellationToken = default) { if (string.IsNullOrEmpty(serviceName)) { throw new ArgumentNullException(nameof(serviceName)); } - var request = _collection + var request = _byName .WithTemplateParameters(new { Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, @@ -34,5 +37,23 @@ public async Task GetAsync(string serviceName, string kubeNamespace ? await response.ReadContentAsAsync() : null; } + + public IObservable> Watch(string serviceName, string kubeNamespace, + CancellationToken cancellationToken = default) + { + if (string.IsNullOrEmpty(serviceName)) + { + throw new ArgumentNullException(nameof(serviceName)); + } + + return ObserveEvents( + _watchByName.WithTemplateParameters(new + { + ServiceName = serviceName, + Namespace = kubeNamespace ?? KubeClient.DefaultNamespace, + }), + "watch v1/Endpoints '" + serviceName + "' in namespace " + + (kubeNamespace ?? KubeClient.DefaultNamespace)); + } } } diff --git a/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs b/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs index 10f79f8af..8e85df73f 100644 --- a/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs +++ b/src/Ocelot.Provider.Kubernetes/Interfaces/IEndPointClient.cs @@ -6,4 +6,6 @@ namespace Ocelot.Provider.Kubernetes.Interfaces; public interface IEndPointClient : IKubeResourceClient { Task GetAsync(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default); + + IObservable> Watch(string serviceName, string kubeNamespace = null, CancellationToken cancellationToken = default); } diff --git a/src/Ocelot.Provider.Kubernetes/Kube.cs b/src/Ocelot.Provider.Kubernetes/Kube.cs index 6ae36bb71..e52c4c9e4 100644 --- a/src/Ocelot.Provider.Kubernetes/Kube.cs +++ b/src/Ocelot.Provider.Kubernetes/Kube.cs @@ -47,7 +47,7 @@ public virtual async Task> GetAsync() } private Task GetEndpoint() => _kubeApi - .ResourceClient(client => new EndPointClientV1(client)) + .EndpointsV1() .GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace); private bool CheckErroneousState(EndpointsV1 endpoint) diff --git a/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs new file mode 100644 index 000000000..6afa49927 --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/KubeApiClientExtensions.cs @@ -0,0 +1,9 @@ +using Ocelot.Provider.Kubernetes.Interfaces; + +namespace Ocelot.Provider.Kubernetes; + +public static class KubeApiClientExtensions +{ + public static IEndPointClient EndpointsV1(this IKubeApiClient client) + => client.ResourceClient(x => new EndPointClientV1(x)); +} diff --git a/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs b/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs index 9f681d7a0..c9f2aa777 100644 --- a/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs +++ b/src/Ocelot.Provider.Kubernetes/KubernetesProviderFactory.cs @@ -1,18 +1,21 @@ -using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection; using Ocelot.Configuration; -using Ocelot.Logging; +using Ocelot.Logging; using Ocelot.Provider.Kubernetes.Interfaces; +using System.Reactive.Concurrency; namespace Ocelot.Provider.Kubernetes { public static class KubernetesProviderFactory // TODO : IServiceDiscoveryProviderFactory - { - /// - /// String constant used for provider type definition. + { + /// + /// String constant used for provider type definition. /// public const string PollKube = nameof(Kubernetes.PollKube); - - public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider; + + public const string WatchKube = nameof(Kubernetes.WatchKube); + + public static ServiceDiscoveryFinderDelegate Get { get; } = CreateProvider; private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provider, ServiceProviderConfiguration config, DownstreamRoute route) { @@ -27,11 +30,16 @@ private static IServiceDiscoveryProvider CreateProvider(IServiceProvider provide Scheme = route.DownstreamScheme, }; + if (WatchKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase)) + { + return new WatchKube(configuration, factory, kubeClient, serviceBuilder, Scheduler.Default); + } + var defaultK8sProvider = new Kube(configuration, factory, kubeClient, serviceBuilder); - - return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase) - ? new PollKube(config.PollingInterval, factory, defaultK8sProvider) - : defaultK8sProvider; + + return PollKube.Equals(config.Type, StringComparison.OrdinalIgnoreCase) + ? new PollKube(config.PollingInterval, factory, defaultK8sProvider) + : defaultK8sProvider; } } } diff --git a/src/Ocelot.Provider.Kubernetes/ObservableExtensions.cs b/src/Ocelot.Provider.Kubernetes/ObservableExtensions.cs new file mode 100644 index 000000000..7d3db03a5 --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/ObservableExtensions.cs @@ -0,0 +1,27 @@ +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +namespace Ocelot.Provider.Kubernetes +{ + public static class ObservableExtensions + { + public static IObservable RetryAfter(this IObservable source, + TimeSpan dueTime, + IScheduler scheduler) + { + return RepeatInfinite(source, dueTime, scheduler).Catch(); + } + + private static IEnumerable> RepeatInfinite(IObservable source, + TimeSpan dueTime, + IScheduler scheduler) + { + yield return source; + + while (true) + { + yield return source.DelaySubscription(dueTime, scheduler); + } + } + } +} diff --git a/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj b/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj index ba6d0b0b4..d07ad7afa 100644 --- a/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj +++ b/src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj @@ -37,4 +37,7 @@ + + + diff --git a/src/Ocelot.Provider.Kubernetes/WatchKube.cs b/src/Ocelot.Provider.Kubernetes/WatchKube.cs new file mode 100644 index 000000000..3a1cdf17b --- /dev/null +++ b/src/Ocelot.Provider.Kubernetes/WatchKube.cs @@ -0,0 +1,88 @@ +using KubeClient.Models; +using Ocelot.Logging; +using Ocelot.Provider.Kubernetes.Interfaces; +using Ocelot.Values; +using System.Reactive.Concurrency; +using System.Reactive.Linq; + +namespace Ocelot.Provider.Kubernetes; + +public class WatchKube : IServiceDiscoveryProvider, IDisposable +{ + internal const int FailedSubscriptionRetrySeconds = 5; + internal const int FirstResultsFetchingTimeoutSeconds = 3; + + private readonly KubeRegistryConfiguration _configuration; + private readonly IOcelotLogger _logger; + private readonly IKubeApiClient _kubeApi; + private readonly IKubeServiceBuilder _serviceBuilder; + private readonly IScheduler _scheduler; + + private readonly IDisposable _subscription; + private readonly TaskCompletionSource _firstResultsCompletionSource; + + private List _services = new(); + + public WatchKube( + KubeRegistryConfiguration configuration, + IOcelotLoggerFactory factory, + IKubeApiClient kubeApi, + IKubeServiceBuilder serviceBuilder, + IScheduler scheduler) + { + _configuration = configuration; + _logger = factory.CreateLogger(); + _kubeApi = kubeApi; + _serviceBuilder = serviceBuilder; + _scheduler = scheduler; + + _firstResultsCompletionSource = new TaskCompletionSource(); + SetFirstResultsCompletedAfterDelay(); + _subscription = CreateSubscription(); + } + + public virtual async Task> GetAsync() + { + // wait for first results fetching + await _firstResultsCompletionSource.Task; + + if (_services is not { Count: > 0 }) + { + _logger.LogWarning(() => GetMessage("Subscription to service endpoints gave no results!")); + } + + return _services; + } + + private void SetFirstResultsCompletedAfterDelay() => Observable + .Timer(TimeSpan.FromSeconds(FirstResultsFetchingTimeoutSeconds), _scheduler) + .Subscribe(_ => _firstResultsCompletionSource.TrySetResult()); + + private IDisposable CreateSubscription() => + _kubeApi + .EndpointsV1() + .Watch(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace) + .Do(_ => { }, ex => _logger.LogError(() => GetMessage("Endpoints subscription error occured."), ex)) + .RetryAfter(TimeSpan.FromSeconds(FailedSubscriptionRetrySeconds), _scheduler) + .Subscribe( + onNext: endpointEvent => + { + _services = endpointEvent.EventType switch + { + ResourceEventType.Deleted or ResourceEventType.Error => new(), + _ when (endpointEvent.Resource?.Subsets?.Count ?? 0) == 0 => new(), + _ => _serviceBuilder.BuildServices(_configuration, endpointEvent.Resource).ToList(), + }; + _firstResultsCompletionSource.TrySetResult(); + }, + onCompleted: () => + { + // called only when subscription canceled in Dispose + _logger.LogInformation(() => GetMessage("Subscription to service endpoints completed")); + }); + + private string GetMessage(string message) + => $"{nameof(WatchKube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}"; + + public void Dispose() => _subscription.Dispose(); +} diff --git a/test/Ocelot.AcceptanceTests/ServiceDiscovery/KubernetesServiceDiscoveryTests.cs b/test/Ocelot.AcceptanceTests/ServiceDiscovery/KubernetesServiceDiscoveryTests.cs index 7350d5ba7..7110af769 100644 --- a/test/Ocelot.AcceptanceTests/ServiceDiscovery/KubernetesServiceDiscoveryTests.cs +++ b/test/Ocelot.AcceptanceTests/ServiceDiscovery/KubernetesServiceDiscoveryTests.cs @@ -44,8 +44,10 @@ public override void Dispose() base.Dispose(); } - [Fact] - public void ShouldReturnServicesFromK8s() + [Theory] + [InlineData(nameof(Kube))] + [InlineData(nameof(WatchKube))] + public void ShouldReturnServicesFromK8s(string discoveryType) { const string namespaces = nameof(KubernetesServiceDiscoveryTests); const string serviceName = nameof(ShouldReturnServicesFromK8s); @@ -55,7 +57,7 @@ public void ShouldReturnServicesFromK8s() var subsetV1 = GivenSubsetAddress(downstream); var endpoints = GivenEndpoints(subsetV1); var route = GivenRouteWithServiceName(namespaces); - var configuration = GivenKubeConfiguration(namespaces, route); + var configuration = GivenKubeConfiguration(namespaces, discoveryType, route); var downstreamResponse = serviceName; this.Given(x => GivenServiceInstanceIsRunning(downstreamUrl, downstreamResponse)) .And(x => x.GivenThereIsAFakeKubernetesProvider(endpoints, serviceName, namespaces)) @@ -95,7 +97,7 @@ public void ShouldReturnServicesByPortNameAsDownstreamScheme(string downstreamSc route.DownstreamScheme = downstreamScheme; // !!! Warning !!! Select port by name as scheme route.UpstreamPathTemplate = "/api/example/{url}"; route.ServiceName = serviceName; // "example-web" - var configuration = GivenKubeConfiguration(namespaces, route); + var configuration = GivenKubeConfiguration(namespaces, nameof(Kube), route); this.Given(x => GivenServiceInstanceIsRunning(downstreamUrl, nameof(ShouldReturnServicesByPortNameAsDownstreamScheme))) .And(x => x.GivenThereIsAFakeKubernetesProvider(endpoints, serviceName, namespaces)) @@ -173,7 +175,7 @@ public void ShouldHighlyLoadOnUnstableKubeProvider_WithRoundRobinLoadBalancing(i downstreams.ForEach(ds => GivenSubsetAddress(ds, subset)); var endpoints = GivenEndpoints(subset, serviceName); // totalServices service instances with different ports var route = GivenRouteWithServiceName(namespaces, serviceName, nameof(RoundRobinAnalyzer)); // !!! - var configuration = GivenKubeConfiguration(namespaces, route); + var configuration = GivenKubeConfiguration(namespaces, nameof(Kube), route); GivenMultipleServiceInstancesAreRunning(downstreamUrls, downstreamResponses); GivenThereIsAConfiguration(configuration); GivenOcelotIsRunningWithServices(WithKubernetesAndRoundRobin); @@ -245,7 +247,7 @@ private FileRoute GivenRouteWithServiceName(string serviceNamespace, LoadBalancerOptions = new() { Type = loadBalancerType }, }; - private FileConfiguration GivenKubeConfiguration(string serviceNamespace, params FileRoute[] routes) + private FileConfiguration GivenKubeConfiguration(string serviceNamespace, string type, params FileRoute[] routes) { var u = new Uri(_kubernetesUrl); var configuration = GivenConfiguration(routes); @@ -254,7 +256,7 @@ private FileConfiguration GivenKubeConfiguration(string serviceNamespace, params Scheme = u.Scheme, Host = u.Host, Port = u.Port, - Type = nameof(Kube), + Type = type, PollingInterval = 0, Namespace = serviceNamespace, }; @@ -305,15 +307,34 @@ private void GivenThereIsAFakeKubernetesProvider(EndpointsV1 endpoints, bool isS context.Response.Headers.Append("Content-Type", "application/json"); await context.Response.WriteAsync(json); } + + if (context.Request.Path.Value == $"/api/v1/watch/namespaces/{namespaces}/endpoints/{serviceName}") + { + var json = JsonConvert.SerializeObject(new ResourceEventV1() + { + EventType = ResourceEventType.Added, + Resource = endpoints, + }); + + if (context.Request.Headers.TryGetValue("Authorization", out var values)) + { + _receivedToken = values.First(); + } + + context.Response.StatusCode = 200; + context.Response.Headers.Append("Content-Type", "application/json"); + + await using var sw = new StreamWriter(context.Response.Body); + await sw.WriteLineAsync(json); + await sw.FlushAsync(); + + // keeping open connection like kube api will slow down tests + } }); } - private static ServiceDescriptor GetValidateScopesDescriptor() - => ServiceDescriptor.Singleton>( - new DefaultServiceProviderFactory(new() { ValidateScopes = true })); private IOcelotBuilder AddKubernetes(IServiceCollection services) => services .Configure(_kubeClientOptionsConfigure) - .Replace(GetValidateScopesDescriptor()) .AddOcelot().AddKubernetes(false); private void WithKubernetes(IServiceCollection services) => AddKubernetes(services); diff --git a/test/Ocelot.UnitTests/Kubernetes/WatchKubeTests.cs b/test/Ocelot.UnitTests/Kubernetes/WatchKubeTests.cs new file mode 100644 index 000000000..1bb509c7d --- /dev/null +++ b/test/Ocelot.UnitTests/Kubernetes/WatchKubeTests.cs @@ -0,0 +1,166 @@ +using KubeClient; +using KubeClient.Models; +using Microsoft.Reactive.Testing; +using Ocelot.Logging; +using Ocelot.Provider.Kubernetes; +using Ocelot.Provider.Kubernetes.Interfaces; +using Ocelot.Values; +using System.Reactive.Linq; + +namespace Ocelot.UnitTests.Kubernetes; + +public class WatchKubeTests +{ + private readonly Mock _loggerFactoryMock = new(); + private readonly Mock _logger = new(); + private readonly Mock _kubeApiClientMock = new(); + private readonly Mock _endpointClient = new(); + private readonly Mock _serviceBuilderMock = new(); + private readonly TestScheduler _testScheduler = new(); + + private readonly KubeRegistryConfiguration _config = new() + { + KubeNamespace = "dummy-namespace", KeyOfServiceInK8s = "dummy-service", + }; + + public WatchKubeTests() + { + _loggerFactoryMock + .Setup(x => x.CreateLogger()) + .Returns(_logger.Object); + + _kubeApiClientMock.Setup(x => + x.ResourceClient(It.IsAny>())) + .Returns(_endpointClient.Object); + + _serviceBuilderMock + .Setup(x => x.BuildServices(It.IsAny(), It.IsAny())) + .Returns((KubeRegistryConfiguration config, EndpointsV1 endpoints) => + { + return endpoints.Subsets.Select((x, i) => new Service( + config.KeyOfServiceInK8s, + new ServiceHostAndPort(x.Addresses[i].Hostname, x.Ports[i].Port!.Value), + i.ToString(), + endpoints.ApiVersion, + Enumerable.Empty())); + }); + } + + [Theory] + [InlineData(ResourceEventType.Added, 1)] + [InlineData(ResourceEventType.Modified, 1)] + [InlineData(ResourceEventType.Bookmark, 1)] + [InlineData(ResourceEventType.Error, 0)] + [InlineData(ResourceEventType.Deleted, 0)] + [Trait("Feat ", "2168")] + public async Task GetAsync_EndpointsEventObserved_ServicesReturned(ResourceEventType eventType, + int expectedServicesCount) + { + // Arrange + var eventDelay = TimeSpan.FromMilliseconds(Random.Shared.Next(1, (WatchKube.FirstResultsFetchingTimeoutSeconds * 1000) - 1)); + var endpointsObservable = CreateOneEvent(eventType).ToObservable().Delay(eventDelay, _testScheduler); + _endpointClient + .Setup(x => x.Watch( + It.Is(s => s == _config.KeyOfServiceInK8s), + It.IsAny(), + It.IsAny())) + .Returns(endpointsObservable); + + // Act + var watchKube = CreateWatchKube(); + _testScheduler.AdvanceBy(eventDelay.Ticks); + var services = await watchKube.GetAsync(); + + // Assert + services.Count.ShouldBe(expectedServicesCount); + } + + [Fact] + [Trait("Feat ", "2168")] + public async Task GetAsync_NoEventsAfterTimeout_EmptyServicesReturned() + { + // Arrange + _endpointClient + .Setup(x => x.Watch( + It.Is(s => s == _config.KeyOfServiceInK8s), + It.IsAny(), + It.IsAny())) + .Returns(Observable.Create>(_ => Mock.Of())); + + // Act + var watchKube = CreateWatchKube(); + _testScheduler.Start(); + var services = await watchKube.GetAsync(); + + // Assert + services.ShouldBeEmpty(); + _testScheduler.Clock.ShouldBe(TimeSpan.FromSeconds(WatchKube.FirstResultsFetchingTimeoutSeconds).Ticks); + _logger.Verify(x => x.LogWarning(It.IsAny>())); + } + + [Fact] + [Trait("Feat ", "2168")] + public async Task GetAsync_WatchFailed_RetriedAfterDelay() + { + // Arrange + var subscriptionAttempts = 0; + var observable = Observable.Create>(observer => + { + if (subscriptionAttempts == 0) + { + observer.OnError(new HttpRequestException("Error occured in first watch request")); + } + else + { + observer.OnNext(CreateOneEvent(ResourceEventType.Added).First()); + } + + subscriptionAttempts++; + return Mock.Of(); + }); + _endpointClient + .Setup(x => x.Watch( + It.Is(s => s == _config.KeyOfServiceInK8s), + It.IsAny(), + It.IsAny())) + .Returns(observable); + + // Act + var watchKube = CreateWatchKube(); + _testScheduler.Start(); + var services = await watchKube.GetAsync(); + + // Assert + services.Count.ShouldBe(1); + subscriptionAttempts.ShouldBe(2); + _testScheduler.Clock.ShouldBe(TimeSpan.FromSeconds(WatchKube.FailedSubscriptionRetrySeconds).Ticks); + _logger.Verify(x => x.LogError(It.IsAny>(), It.IsAny())); + } + + private WatchKube CreateWatchKube() => new(_config, + _loggerFactoryMock.Object, + _kubeApiClientMock.Object, + _serviceBuilderMock.Object, + _testScheduler); + + private IResourceEventV1[] CreateOneEvent(ResourceEventType eventType) + { + var resourceEvent = new ResourceEventV1() { EventType = eventType, Resource = CreateEndpoints(), }; + return new IResourceEventV1[] { resourceEvent }; + } + + private EndpointsV1 CreateEndpoints() + { + var endpoints = new EndpointsV1 + { + Kind = "endpoint", + ApiVersion = "1.0", + Metadata = new ObjectMetaV1 { Name = _config.KeyOfServiceInK8s, Namespace = _config.KubeNamespace, }, + }; + var subset = new EndpointSubsetV1(); + subset.Addresses.Add(new EndpointAddressV1 { Ip = "127.0.0.1", Hostname = "localhost" }); + subset.Ports.Add(new EndpointPortV1 { Port = 80 }); + endpoints.Subsets.Add(subset); + return endpoints; + } +} diff --git a/test/Ocelot.UnitTests/Ocelot.UnitTests.csproj b/test/Ocelot.UnitTests/Ocelot.UnitTests.csproj index 909c6081b..796fa9df9 100644 --- a/test/Ocelot.UnitTests/Ocelot.UnitTests.csproj +++ b/test/Ocelot.UnitTests/Ocelot.UnitTests.csproj @@ -50,6 +50,7 @@ +