Skip to content

Commit 4f19183

Browse files
authored
fix: fixed mirroring not working (#479)
* refactor: remove unnecessary mediator subscribers registration * refactor: replace IServiceProvider with IKubernetes client in resource mirrors and watchers * refactor: change service registration from transient to singleton for IKubernetes and ReflectorOptions
1 parent f1e2183 commit 4f19183

File tree

8 files changed

+34
-76
lines changed

8 files changed

+34
-76
lines changed

src/ES.Kubernetes.Reflector/Core/ConfigMapMirror.cs

+8-23
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,18 @@
66

77
namespace ES.Kubernetes.Reflector.Core;
88

9-
public class ConfigMapMirror(ILogger<ConfigMapMirror> logger, IServiceProvider serviceProvider)
10-
: ResourceMirror<V1ConfigMap>(logger, serviceProvider)
9+
public class ConfigMapMirror(ILogger<ConfigMapMirror> logger, IKubernetes kubernetesClient)
10+
: ResourceMirror<V1ConfigMap>(logger, kubernetesClient)
1111
{
12-
private readonly IServiceProvider _serviceProvider = serviceProvider;
13-
1412
protected override async Task<V1ConfigMap[]> OnResourceWithNameList(string itemRefName)
1513
{
16-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
17-
return (await client.CoreV1.ListConfigMapForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
14+
return (await KubernetesClient.CoreV1.ListConfigMapForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
1815
.Items
1916
.ToArray();
2017
}
2118

22-
protected override async Task OnResourceApplyPatch(V1Patch patch, KubeRef refId)
23-
{
24-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
25-
await client.CoreV1.PatchNamespacedConfigMapAsync(patch, refId.Name, refId.Namespace);
26-
}
19+
protected override async Task OnResourceApplyPatch(V1Patch patch, KubeRef refId)
20+
=> await KubernetesClient.CoreV1.PatchNamespacedConfigMapAsync(patch, refId.Name, refId.Namespace);
2721

2822
protected override Task OnResourceConfigurePatch(V1ConfigMap source, JsonPatchDocument<V1ConfigMap> patchDoc)
2923
{
@@ -33,10 +27,7 @@ protected override Task OnResourceConfigurePatch(V1ConfigMap source, JsonPatchDo
3327
}
3428

3529
protected override async Task OnResourceCreate(V1ConfigMap item, string ns)
36-
{
37-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
38-
await client.CoreV1.CreateNamespacedConfigMapAsync(item, ns);
39-
}
30+
=> await KubernetesClient.CoreV1.CreateNamespacedConfigMapAsync(item, ns);
4031

4132
protected override Task<V1ConfigMap> OnResourceClone(V1ConfigMap sourceResource)
4233
{
@@ -50,14 +41,8 @@ protected override Task<V1ConfigMap> OnResourceClone(V1ConfigMap sourceResource)
5041
}
5142

5243
protected override async Task OnResourceDelete(KubeRef resourceId)
53-
{
54-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
55-
await client.CoreV1.DeleteNamespacedConfigMapAsync(resourceId.Name, resourceId.Namespace);
56-
}
44+
=> await KubernetesClient.CoreV1.DeleteNamespacedConfigMapAsync(resourceId.Name, resourceId.Namespace);
5745

5846
protected override async Task<V1ConfigMap> OnResourceGet(KubeRef refId)
59-
{
60-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
61-
return await client.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace);
62-
}
47+
=> await KubernetesClient.CoreV1.ReadNamespacedConfigMapAsync(refId.Name, refId.Namespace);
6348
}

src/ES.Kubernetes.Reflector/Core/ConfigMapWatcher.cs

+4-5
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ namespace ES.Kubernetes.Reflector.Core;
1111
public class ConfigMapWatcher(
1212
ILogger<ConfigMapWatcher> logger,
1313
IMediator mediator,
14-
IServiceProvider serviceProvider,
14+
IKubernetes kubernetesClient,
1515
IOptionsMonitor<ReflectorOptions> options)
16-
: WatcherBackgroundService<V1ConfigMap, V1ConfigMapList>(logger, mediator, serviceProvider, options)
16+
: WatcherBackgroundService<V1ConfigMap, V1ConfigMapList>(logger, mediator, options)
1717
{
18-
protected override Task<HttpOperationResponse<V1ConfigMapList>> OnGetWatcher(IKubernetes client,
19-
CancellationToken cancellationToken)
18+
protected override Task<HttpOperationResponse<V1ConfigMapList>> OnGetWatcher(CancellationToken cancellationToken)
2019
{
21-
return client.CoreV1.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true,
20+
return kubernetesClient.CoreV1.ListConfigMapForAllNamespacesWithHttpMessagesAsync(watch: true,
2221
timeoutSeconds: WatcherTimeout,
2322
cancellationToken: cancellationToken);
2423
}

src/ES.Kubernetes.Reflector/Core/Mirroring/ResourceMirror.cs

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
namespace ES.Kubernetes.Reflector.Core.Mirroring;
1818

19-
public abstract class ResourceMirror<TResource>(ILogger logger, IServiceProvider serviceProvider) :
19+
public abstract class ResourceMirror<TResource>(ILogger logger, IKubernetes kubernetesClient) :
2020
INotificationHandler<WatcherEvent>,
2121
INotificationHandler<WatcherClosed>
2222
where TResource : class, IKubernetesObject<V1ObjectMeta>
@@ -28,6 +28,7 @@ public abstract class ResourceMirror<TResource>(ILogger logger, IServiceProvider
2828
private readonly ConcurrentDictionary<KubeRef, bool> _notFoundCache = new();
2929
private readonly ConcurrentDictionary<KubeRef, ReflectorProperties> _propertiesCache = new();
3030
protected readonly ILogger Logger = logger;
31+
protected readonly IKubernetes KubernetesClient = kubernetesClient;
3132

3233

3334
/// <summary>
@@ -324,8 +325,7 @@ private async Task AutoReflectionForSource(KubeRef resourceRef, TResource? resou
324325
var autoReflectionList = _autoReflectionCache.GetOrAdd(resourceRef, _ => new List<KubeRef>());
325326

326327
var matches = await OnResourceWithNameList(resourceRef.Name);
327-
using var client = serviceProvider.GetRequiredService<IKubernetes>();
328-
var namespaces = (await client.CoreV1.ListNamespaceAsync(cancellationToken: cancellationToken)).Items;
328+
var namespaces = (await KubernetesClient.CoreV1.ListNamespaceAsync(cancellationToken: cancellationToken)).Items;
329329

330330
foreach (var match in matches)
331331
{

src/ES.Kubernetes.Reflector/Core/NamespaceWatcher.cs

+4-5
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ namespace ES.Kubernetes.Reflector.Core;
1111
public class NamespaceWatcher(
1212
ILogger<NamespaceWatcher> logger,
1313
IMediator mediator,
14-
IServiceProvider serviceProvider,
14+
IKubernetes kubernetesClient,
1515
IOptionsMonitor<ReflectorOptions> options)
16-
: WatcherBackgroundService<V1Namespace, V1NamespaceList>(logger, mediator, serviceProvider, options)
16+
: WatcherBackgroundService<V1Namespace, V1NamespaceList>(logger, mediator, options)
1717
{
18-
protected override Task<HttpOperationResponse<V1NamespaceList>> OnGetWatcher(IKubernetes client,
19-
CancellationToken cancellationToken)
18+
protected override Task<HttpOperationResponse<V1NamespaceList>> OnGetWatcher(CancellationToken cancellationToken)
2019
{
21-
return client.CoreV1.ListNamespaceWithHttpMessagesAsync(watch: true, timeoutSeconds: WatcherTimeout,
20+
return kubernetesClient.CoreV1.ListNamespaceWithHttpMessagesAsync(watch: true, timeoutSeconds: WatcherTimeout,
2221
cancellationToken: cancellationToken);
2322
}
2423
}

src/ES.Kubernetes.Reflector/Core/SecretMirror.cs

+7-22
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,18 @@
66

77
namespace ES.Kubernetes.Reflector.Core;
88

9-
public class SecretMirror(ILogger<SecretMirror> logger, IServiceProvider serviceProvider)
10-
: ResourceMirror<V1Secret>(logger, serviceProvider)
9+
public class SecretMirror(ILogger<SecretMirror> logger, IKubernetes kubernetesClient)
10+
: ResourceMirror<V1Secret>(logger, kubernetesClient)
1111
{
12-
private readonly IServiceProvider _serviceProvider = serviceProvider;
13-
1412
protected override async Task<V1Secret[]> OnResourceWithNameList(string itemRefName)
1513
{
16-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
17-
return (await client.CoreV1.ListSecretForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
14+
return (await KubernetesClient.CoreV1.ListSecretForAllNamespacesAsync(fieldSelector: $"metadata.name={itemRefName}"))
1815
.Items
1916
.ToArray();
2017
}
2118

2219
protected override async Task OnResourceApplyPatch(V1Patch patch, KubeRef refId)
23-
{
24-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
25-
await client.CoreV1.PatchNamespacedSecretWithHttpMessagesAsync(patch, refId.Name, refId.Namespace);
26-
}
20+
=> await KubernetesClient.CoreV1.PatchNamespacedSecretWithHttpMessagesAsync(patch, refId.Name, refId.Namespace);
2721

2822
protected override Task OnResourceConfigurePatch(V1Secret source, JsonPatchDocument<V1Secret> patchDoc)
2923
{
@@ -32,10 +26,7 @@ protected override Task OnResourceConfigurePatch(V1Secret source, JsonPatchDocum
3226
}
3327

3428
protected override async Task OnResourceCreate(V1Secret item, string ns)
35-
{
36-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
37-
await client.CoreV1.CreateNamespacedSecretAsync(item, ns);
38-
}
29+
=> await KubernetesClient.CoreV1.CreateNamespacedSecretAsync(item, ns);
3930

4031
protected override Task<V1Secret> OnResourceClone(V1Secret sourceResource)
4132
{
@@ -49,16 +40,10 @@ protected override Task<V1Secret> OnResourceClone(V1Secret sourceResource)
4940
}
5041

5142
protected override async Task OnResourceDelete(KubeRef resourceId)
52-
{
53-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
54-
await client.CoreV1.DeleteNamespacedSecretAsync(resourceId.Name, resourceId.Namespace);
55-
}
43+
=> await KubernetesClient.CoreV1.DeleteNamespacedSecretAsync(resourceId.Name, resourceId.Namespace);
5644

5745
protected override async Task<V1Secret> OnResourceGet(KubeRef refId)
58-
{
59-
using var client = _serviceProvider.GetRequiredService<IKubernetes>();
60-
return await client.CoreV1.ReadNamespacedSecretAsync(refId.Name, refId.Namespace);
61-
}
46+
=> await KubernetesClient.CoreV1.ReadNamespacedSecretAsync(refId.Name, refId.Namespace);
6247

6348
protected override Task<bool> OnResourceIgnoreCheck(V1Secret item)
6449
{

src/ES.Kubernetes.Reflector/Core/SecretWatcher.cs

+4-5
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,13 @@ namespace ES.Kubernetes.Reflector.Core;
1111
public class SecretWatcher(
1212
ILogger<SecretWatcher> logger,
1313
IMediator mediator,
14-
IServiceProvider serviceProvider,
14+
IKubernetes kubernetesClient,
1515
IOptionsMonitor<ReflectorOptions> options)
16-
: WatcherBackgroundService<V1Secret, V1SecretList>(logger, mediator, serviceProvider, options)
16+
: WatcherBackgroundService<V1Secret, V1SecretList>(logger, mediator, options)
1717
{
18-
protected override Task<HttpOperationResponse<V1SecretList>> OnGetWatcher(IKubernetes client,
19-
CancellationToken cancellationToken)
18+
protected override Task<HttpOperationResponse<V1SecretList>> OnGetWatcher(CancellationToken cancellationToken)
2019
{
21-
return client.CoreV1.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true,
20+
return kubernetesClient.CoreV1.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true,
2221
timeoutSeconds: WatcherTimeout,
2322
cancellationToken: cancellationToken);
2423
}

src/ES.Kubernetes.Reflector/Core/Watchers/WatcherBackgroundService.cs

+2-7
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ namespace ES.Kubernetes.Reflector.Core.Watchers;
1212
public abstract class WatcherBackgroundService<TResource, TResourceList>(
1313
ILogger logger,
1414
IMediator mediator,
15-
IServiceProvider serviceProvider,
1615
IOptionsMonitor<ReflectorOptions> options)
1716
: BackgroundService
1817
where TResource : IKubernetesObject<V1ObjectMeta>
@@ -27,8 +26,6 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
2726
var sessionStopwatch = new Stopwatch();
2827
while (!stoppingToken.IsCancellationRequested)
2928
{
30-
await using var scope = serviceProvider.CreateAsyncScope();
31-
3229
var sessionFaulted = false;
3330
sessionStopwatch.Restart();
3431

@@ -40,9 +37,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
4037

4138
using var absoluteTimeoutCts = new CancellationTokenSource(TimeSpan.FromSeconds(WatcherTimeout + 3));
4239
using var cancellationCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, absoluteTimeoutCts.Token);
43-
using var client = scope.ServiceProvider.GetRequiredService<IKubernetes>();
4440

45-
using var watcher = OnGetWatcher(client, stoppingToken);
41+
using var watcher = OnGetWatcher(stoppingToken);
4642
var watchList = watcher.WatchAsync<TResource, TResourceList>(cancellationToken: cancellationCts.Token);
4743

4844
await foreach (var (type, item) in watchList)
@@ -79,6 +75,5 @@ await Mediator.Publish(new WatcherClosed
7975
}
8076
}
8177

82-
protected abstract Task<HttpOperationResponse<TResourceList>> OnGetWatcher(IKubernetes client,
83-
CancellationToken cancellationToken);
78+
protected abstract Task<HttpOperationResponse<TResourceList>> OnGetWatcher(CancellationToken cancellationToken);
8479
}

src/ES.Kubernetes.Reflector/Program.cs

+2-6
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
builder.Services.Configure<ReflectorOptions>(builder.Configuration.GetSection(nameof(ES.Kubernetes.Reflector)));
2525

26-
builder.Services.AddTransient(s =>
26+
builder.Services.AddSingleton(s =>
2727
{
2828
var reflectorOptions = s.GetRequiredService<IOptions<ReflectorOptions>>();
2929

@@ -35,17 +35,13 @@
3535
});
3636

3737

38-
builder.Services.AddTransient<IKubernetes>(s =>
38+
builder.Services.AddSingleton<IKubernetes>(s =>
3939
new Kubernetes(s.GetRequiredService<KubernetesClientConfiguration>()));
4040

4141
builder.Services.AddHostedService<NamespaceWatcher>();
4242
builder.Services.AddHostedService<SecretWatcher>();
4343
builder.Services.AddHostedService<ConfigMapWatcher>();
4444

45-
builder.Services.AddSingleton<SecretMirror>();
46-
builder.Services.AddSingleton<ConfigMapMirror>();
47-
48-
4945
var app = builder.Build();
5046
app.Ignite();
5147
await app.RunAsync();

0 commit comments

Comments
 (0)