Skip to content

Commit d2d438c

Browse files
authored
Merge branch 'ThreeMammals:develop' into feat_enableHttp2
2 parents 5ba1f47 + 19a8e2f commit d2d438c

25 files changed

+1970
-1378
lines changed
+24-16
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
11
using KubeClient.Models;
2+
using Ocelot.Infrastructure.DesignPatterns;
23
using Ocelot.Logging;
34
using Ocelot.Provider.Kubernetes.Interfaces;
45
using Ocelot.Values;
56

67
namespace Ocelot.Provider.Kubernetes;
78

8-
/// <summary>
9-
/// Default Kubernetes service discovery provider.
10-
/// </summary>
9+
/// <summary>Default Kubernetes service discovery provider.</summary>
10+
/// <remarks>
11+
/// <list type="bullet">
12+
/// <item>NuGet: <see href="https://www.nuget.org/packages/KubeClient">KubeClient</see></item>
13+
/// <item>GitHub: <see href="https://github.com/tintoy/dotnet-kube-client">dotnet-kube-client</see></item>
14+
/// </list>
15+
/// </remarks>
1116
public class Kube : IServiceDiscoveryProvider
1217
{
1318
private readonly KubeRegistryConfiguration _configuration;
1419
private readonly IOcelotLogger _logger;
1520
private readonly IKubeApiClient _kubeApi;
1621
private readonly IKubeServiceBuilder _serviceBuilder;
17-
private readonly List<Service> _services;
1822

1923
public Kube(
2024
KubeRegistryConfiguration configuration,
@@ -26,28 +30,32 @@ public Kube(
2630
_logger = factory.CreateLogger<Kube>();
2731
_kubeApi = kubeApi;
2832
_serviceBuilder = serviceBuilder;
29-
_services = new();
3033
}
3134

3235
public virtual async Task<List<Service>> GetAsync()
3336
{
34-
var endpoint = await _kubeApi
35-
.ResourceClient(client => new EndPointClientV1(client))
36-
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);
37+
var endpoint = await Retry.OperationAsync(GetEndpoint, CheckErroneousState, logger: _logger);
3738

38-
_services.Clear();
39-
if (endpoint?.Subsets.Count != 0)
39+
if (CheckErroneousState(endpoint))
4040
{
41-
_services.AddRange(BuildServices(_configuration, endpoint));
42-
}
43-
else
44-
{
45-
_logger.LogWarning(() => $"K8s Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; Unable to use: it is invalid. Address must contain host only e.g. localhost and port must be greater than 0!");
41+
_logger.LogWarning(() => GetMessage($"Unable to use bad result returned by {nameof(Kube)} integration endpoint because the final result is invalid/unknown after multiple retries!"));
42+
return new(0);
4643
}
4744

48-
return _services;
45+
return BuildServices(_configuration, endpoint)
46+
.ToList();
4947
}
5048

49+
private Task<EndpointsV1> GetEndpoint() => _kubeApi
50+
.ResourceClient(client => new EndPointClientV1(client))
51+
.GetAsync(_configuration.KeyOfServiceInK8s, _configuration.KubeNamespace);
52+
53+
private bool CheckErroneousState(EndpointsV1 endpoint)
54+
=> (endpoint?.Subsets?.Count ?? 0) == 0; // null or count is zero
55+
56+
private string GetMessage(string message)
57+
=> $"{nameof(Kube)} provider. Namespace:{_configuration.KubeNamespace}, Service:{_configuration.KeyOfServiceInK8s}; {message}";
58+
5159
protected virtual IEnumerable<Service> BuildServices(KubeRegistryConfiguration configuration, EndpointsV1 endpoint)
5260
=> _serviceBuilder.BuildServices(configuration, endpoint);
5361
}

src/Ocelot.Provider.Kubernetes/KubeServiceCreator.cs

+4-4
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,10 @@ namespace Ocelot.Provider.Kubernetes;
77

88
public class KubeServiceCreator : IKubeServiceCreator
99
{
10-
private readonly IOcelotLogger _logger;
11-
1210
public KubeServiceCreator(IOcelotLoggerFactory factory)
1311
{
1412
ArgumentNullException.ThrowIfNull(factory);
15-
_logger = factory.CreateLogger<KubeServiceCreator>();
13+
Logger = factory.CreateLogger<KubeServiceCreator>();
1614
}
1715

1816
public virtual IEnumerable<Service> Create(KubeRegistryConfiguration configuration, EndpointsV1 endpoint, EndpointSubsetV1 subset)
@@ -34,6 +32,8 @@ public virtual IEnumerable<Service> CreateInstance(KubeRegistryConfiguration con
3432
return new Service[] { instance };
3533
}
3634

35+
protected IOcelotLogger Logger { get; }
36+
3737
protected virtual string GetServiceName(KubeRegistryConfiguration configuration, EndpointsV1 endpoint, EndpointSubsetV1 subset, EndpointAddressV1 address)
3838
=> endpoint.Metadata?.Name;
3939

@@ -46,7 +46,7 @@ protected virtual ServiceHostAndPort GetServiceHostAndPort(KubeRegistryConfigura
4646
: ports.FirstOrDefault(portNameToScheme);
4747
portV1 ??= new();
4848
portV1.Name ??= configuration.Scheme ?? string.Empty;
49-
_logger.LogDebug(() => $"K8s service with key '{configuration.KeyOfServiceInK8s}' and address {address.Ip}; Detected port is {portV1.Name}:{portV1.Port}. Total {ports.Count} ports of [{string.Join(',', ports.Select(p => p.Name))}].");
49+
Logger.LogDebug(() => $"K8s service with key '{configuration.KeyOfServiceInK8s}' and address {address.Ip}; Detected port is {portV1.Name}:{portV1.Port}. Total {ports.Count} ports of [{string.Join(',', ports.Select(p => p.Name))}].");
5050
return new ServiceHostAndPort(address.Ip, portV1.Port, portV1.Name);
5151
}
5252

src/Ocelot.Provider.Kubernetes/Ocelot.Provider.Kubernetes.csproj

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
<Compile Remove="KubeApiClientFactory.cs" />
3030
</ItemGroup>
3131
<ItemGroup>
32-
<PackageReference Include="KubeClient" Version="2.4.10" />
33-
<PackageReference Include="KubeClient.Extensions.DependencyInjection" Version="2.4.10" />
32+
<PackageReference Include="KubeClient" Version="2.5.8" />
33+
<PackageReference Include="KubeClient.Extensions.DependencyInjection" Version="2.5.8" />
3434
<PackageReference Include="StyleCop.Analyzers" Version="1.2.0-beta.507">
3535
<PrivateAssets>all</PrivateAssets>
3636
</PackageReference>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
using Ocelot.Logging;
2+
3+
namespace Ocelot.Infrastructure.DesignPatterns;
4+
5+
/// <summary>
6+
/// Basic <seealso href="https://www.bing.com/search?q=Retry+pattern">Retry pattern</seealso> for stabilizing integrated services.
7+
/// </summary>
8+
/// <remarks>Docs:
9+
/// <list type="bullet">
10+
/// <item><see href="https://learn.microsoft.com/en-us/azure/architecture/patterns/retry">Microsoft Learn | Retry pattern</see></item>
11+
/// </list>
12+
/// </remarks>
13+
public static class Retry
14+
{
15+
public const int DefaultRetryTimes = 3;
16+
public const int DefaultWaitTimeMilliseconds = 25;
17+
18+
private static string GetMessage<T>(T operation, int retryNo, string message)
19+
where T : Delegate
20+
=> $"Ocelot {nameof(Retry)} strategy for the operation of '{operation.GetType()}' type -> {nameof(Retry)} No {retryNo}: {message}";
21+
22+
/// <summary>
23+
/// Retry a synchronous operation when an exception occurs or predicate is true, then delay and retry again.
24+
/// </summary>
25+
/// <typeparam name="TResult">Type of the result of the sync operation.</typeparam>
26+
/// <param name="operation">Required Func-delegate of the operation.</param>
27+
/// <param name="predicate">Predicate to check, optionally.</param>
28+
/// <param name="retryTimes">Number of retries.</param>
29+
/// <param name="waitTime">Waiting time in milliseconds.</param>
30+
/// <param name="logger">Concrete logger from upper context.</param>
31+
/// <returns>A <typeparamref name="TResult"/> value as the result of the sync operation.</returns>
32+
public static TResult Operation<TResult>(
33+
Func<TResult> operation,
34+
Predicate<TResult> predicate = null,
35+
int retryTimes = DefaultRetryTimes, int waitTime = DefaultWaitTimeMilliseconds,
36+
IOcelotLogger logger = null)
37+
{
38+
for (int n = 1; n < retryTimes; n++)
39+
{
40+
TResult result;
41+
try
42+
{
43+
result = operation.Invoke();
44+
}
45+
catch (Exception e)
46+
{
47+
logger?.LogError(() => GetMessage(operation, n, $"Caught exception of the {e.GetType()} type -> Message: {e.Message}."), e);
48+
Thread.Sleep(waitTime);
49+
continue; // the result is unknown, so continue to retry
50+
}
51+
52+
// Apply predicate for known result
53+
if (predicate?.Invoke(result) == true)
54+
{
55+
logger?.LogWarning(() => GetMessage(operation, n, $"The predicate has identified erroneous state in the returned result. For further details, implement logging of the result's value or properties within the predicate method."));
56+
Thread.Sleep(waitTime);
57+
continue; // on erroneous state
58+
}
59+
60+
// Happy path
61+
return result;
62+
}
63+
64+
// Last retry should generate native exception or other erroneous state(s)
65+
logger?.LogDebug(() => GetMessage(operation, retryTimes, $"Retrying lastly..."));
66+
return operation.Invoke(); // also final result must be analyzed in the upper context
67+
}
68+
69+
/// <summary>
70+
/// Retry an asynchronous operation when an exception occurs or predicate is true, then delay and retry again.
71+
/// </summary>
72+
/// <typeparam name="TResult">Type of the result of the async operation.</typeparam>
73+
/// <param name="operation">Required Func-delegate of the operation.</param>
74+
/// <param name="predicate">Predicate to check, optionally.</param>
75+
/// <param name="retryTimes">Number of retries.</param>
76+
/// <param name="waitTime">Waiting time in milliseconds.</param>
77+
/// <param name="logger">Concrete logger from upper context.</param>
78+
/// <returns>A <typeparamref name="TResult"/> value as the result of the async operation.</returns>
79+
public static async Task<TResult> OperationAsync<TResult>(
80+
Func<Task<TResult>> operation, // required operation delegate
81+
Predicate<TResult> predicate = null, // optional retry predicate for the result
82+
int retryTimes = DefaultRetryTimes, int waitTime = DefaultWaitTimeMilliseconds, // retrying options
83+
IOcelotLogger logger = null) // static injections
84+
{
85+
for (int n = 1; n < retryTimes; n++)
86+
{
87+
TResult result;
88+
try
89+
{
90+
result = await operation?.Invoke();
91+
}
92+
catch (Exception e)
93+
{
94+
logger?.LogError(() => GetMessage(operation, n, $"Caught exception of the {e.GetType()} type -> Message: {e.Message}."), e);
95+
await Task.Delay(waitTime);
96+
continue; // the result is unknown, so continue to retry
97+
}
98+
99+
// Apply predicate for known result
100+
if (predicate?.Invoke(result) == true)
101+
{
102+
logger?.LogWarning(() => GetMessage(operation, n, $"The predicate has identified erroneous state in the returned result. For further details, implement logging of the result's value or properties within the predicate method."));
103+
await Task.Delay(waitTime);
104+
continue; // on erroneous state
105+
}
106+
107+
// Happy path
108+
return result;
109+
}
110+
111+
// Last retry should generate native exception or other erroneous state(s)
112+
logger?.LogDebug(() => GetMessage(operation, retryTimes, $"Retrying lastly..."));
113+
return await operation?.Invoke(); // also final result must be analyzed in the upper context
114+
}
115+
}
Original file line numberDiff line numberDiff line change
@@ -1,85 +1,83 @@
11
using Microsoft.AspNetCore.Http;
22
using Ocelot.Infrastructure;
3+
using Ocelot.Middleware;
34
using Ocelot.Responses;
45
using Ocelot.Values;
56

6-
namespace Ocelot.LoadBalancer.LoadBalancers
7+
namespace Ocelot.LoadBalancer.LoadBalancers;
8+
9+
public class CookieStickySessions : ILoadBalancer
710
{
8-
public class CookieStickySessions : ILoadBalancer
11+
private readonly int _keyExpiryInMs;
12+
private readonly string _cookieName;
13+
private readonly ILoadBalancer _loadBalancer;
14+
private readonly IBus<StickySession> _bus;
15+
16+
private static readonly object Locker = new();
17+
private static readonly Dictionary<string, StickySession> Stored = new(); // TODO Inject instead of static sharing
18+
19+
public CookieStickySessions(ILoadBalancer loadBalancer, string cookieName, int keyExpiryInMs, IBus<StickySession> bus)
920
{
10-
private readonly int _keyExpiryInMs;
11-
private readonly string _key;
12-
private readonly ILoadBalancer _loadBalancer;
13-
private readonly ConcurrentDictionary<string, StickySession> _stored;
14-
private readonly IBus<StickySession> _bus;
15-
private readonly object _lock = new();
21+
_bus = bus;
22+
_cookieName = cookieName;
23+
_keyExpiryInMs = keyExpiryInMs;
24+
_loadBalancer = loadBalancer;
25+
_bus.Subscribe(CheckExpiry);
26+
}
1627

17-
public CookieStickySessions(ILoadBalancer loadBalancer, string key, int keyExpiryInMs, IBus<StickySession> bus)
28+
private void CheckExpiry(StickySession sticky)
29+
{
30+
// TODO Get test coverage for this
31+
lock (Locker)
1832
{
19-
_bus = bus;
20-
_key = key;
21-
_keyExpiryInMs = keyExpiryInMs;
22-
_loadBalancer = loadBalancer;
23-
_stored = new ConcurrentDictionary<string, StickySession>();
24-
_bus.Subscribe(ss =>
33+
if (!Stored.TryGetValue(sticky.Key, out var session) || session.Expiry >= DateTime.UtcNow)
2534
{
26-
//todo - get test coverage for this.
27-
if (_stored.TryGetValue(ss.Key, out var stickySession))
28-
{
29-
lock (_lock)
30-
{
31-
if (stickySession.Expiry < DateTime.UtcNow)
32-
{
33-
_stored.TryRemove(stickySession.Key, out _);
34-
_loadBalancer.Release(stickySession.HostAndPort);
35-
}
36-
}
37-
}
38-
});
35+
return;
36+
}
37+
38+
Stored.Remove(session.Key);
39+
_loadBalancer.Release(session.HostAndPort);
3940
}
41+
}
4042

41-
public async Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
43+
public Task<Response<ServiceHostAndPort>> Lease(HttpContext httpContext)
44+
{
45+
var route = httpContext.Items.DownstreamRoute();
46+
var serviceName = route.LoadBalancerKey;
47+
var cookie = httpContext.Request.Cookies[_cookieName];
48+
var key = $"{serviceName}:{cookie}"; // strong key name because of static store
49+
lock (Locker)
4250
{
43-
var key = httpContext.Request.Cookies[_key];
44-
45-
lock (_lock)
51+
if (!string.IsNullOrEmpty(key) && Stored.TryGetValue(key, out StickySession cached))
4652
{
47-
if (!string.IsNullOrEmpty(key) && _stored.ContainsKey(key))
48-
{
49-
var cached = _stored[key];
50-
51-
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
52-
53-
_stored[key] = updated;
54-
55-
_bus.Publish(updated, _keyExpiryInMs);
56-
57-
return new OkResponse<ServiceHostAndPort>(updated.HostAndPort);
58-
}
53+
var updated = new StickySession(cached.HostAndPort, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
54+
Update(key, updated);
55+
return Task.FromResult<Response<ServiceHostAndPort>>(new OkResponse<ServiceHostAndPort>(updated.HostAndPort));
5956
}
6057

61-
var next = await _loadBalancer.Lease(httpContext);
62-
58+
// There is no value in the store, so lease it now!
59+
var next = _loadBalancer.Lease(httpContext).GetAwaiter().GetResult(); // unfortunately the operation must be synchronous
6360
if (next.IsError)
6461
{
65-
return new ErrorResponse<ServiceHostAndPort>(next.Errors);
66-
}
67-
68-
lock (_lock)
69-
{
70-
if (!string.IsNullOrEmpty(key) && !_stored.ContainsKey(key))
71-
{
72-
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
73-
_stored[key] = ss;
74-
_bus.Publish(ss, _keyExpiryInMs);
75-
}
62+
return Task.FromResult<Response<ServiceHostAndPort>>(new ErrorResponse<ServiceHostAndPort>(next.Errors));
7663
}
7764

78-
return new OkResponse<ServiceHostAndPort>(next.Data);
65+
var ss = new StickySession(next.Data, DateTime.UtcNow.AddMilliseconds(_keyExpiryInMs), key);
66+
Update(key, ss);
67+
return Task.FromResult<Response<ServiceHostAndPort>>(new OkResponse<ServiceHostAndPort>(next.Data));
7968
}
69+
}
8070

81-
public void Release(ServiceHostAndPort hostAndPort)
71+
protected void Update(string key, StickySession value)
72+
{
73+
lock (Locker)
8274
{
75+
Stored[key] = value;
76+
_bus.Publish(value, _keyExpiryInMs);
8377
}
8478
}
79+
80+
public void Release(ServiceHostAndPort hostAndPort)
81+
{
82+
}
8583
}

src/Ocelot/LoadBalancer/LoadBalancers/CookieStickySessionsCreator.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,11 @@ public class CookieStickySessionsCreator : ILoadBalancerCreator
99
{
1010
public Response<ILoadBalancer> Create(DownstreamRoute route, IServiceDiscoveryProvider serviceProvider)
1111
{
12-
var loadBalancer = new RoundRobin(async () => await serviceProvider.GetAsync());
12+
var options = route.LoadBalancerOptions;
13+
var loadBalancer = new RoundRobin(serviceProvider.GetAsync, route.LoadBalancerKey);
1314
var bus = new InMemoryBus<StickySession>();
14-
return new OkResponse<ILoadBalancer>(new CookieStickySessions(loadBalancer, route.LoadBalancerOptions.Key,
15-
route.LoadBalancerOptions.ExpiryInMs, bus));
15+
return new OkResponse<ILoadBalancer>(
16+
new CookieStickySessions(loadBalancer, options.Key, options.ExpiryInMs, bus));
1617
}
1718

1819
public string Type => nameof(CookieStickySessions);

0 commit comments

Comments
 (0)