diff --git a/Jellyfin.Plugin.Webhook/Notifiers/ItemAddedNotifier/ItemAddedManager.cs b/Jellyfin.Plugin.Webhook/Notifiers/ItemAddedNotifier/ItemAddedManager.cs index c38135f..9d689e3 100644 --- a/Jellyfin.Plugin.Webhook/Notifiers/ItemAddedNotifier/ItemAddedManager.cs +++ b/Jellyfin.Plugin.Webhook/Notifiers/ItemAddedNotifier/ItemAddedManager.cs @@ -1,6 +1,10 @@ using System; using System.Collections.Concurrent; +using System.Linq; +using System.Text.Json; +using System.Threading; using System.Threading.Tasks; +using Jellyfin.Extensions; using Jellyfin.Plugin.Webhook.Destinations; using Jellyfin.Plugin.Webhook.Helpers; using Jellyfin.Plugin.Webhook.Models; @@ -20,6 +24,7 @@ public class ItemAddedManager : IItemAddedManager private readonly ILibraryManager _libraryManager; private readonly IServerApplicationHost _applicationHost; private readonly ConcurrentDictionary _itemProcessQueue; + private static Semaphore _lock = new(initialCount: 0, maximumCount: 1); /// /// Initializes a new instance of the class. @@ -41,9 +46,22 @@ public ItemAddedManager( /// public async Task ProcessItemsAsync() { + // Wait until a semaphore task is available + // meaning our queue isn't waiting for a task. + _lock.WaitOne(); + _logger.LogDebug("ProcessItemsAsync"); // Attempt to process all items in queue. var currentItems = _itemProcessQueue.ToArray(); + + var itemsIds = currentItems. + Select((item) => _libraryManager.GetItemById(item.Key)). + Where((item) => item is not null). + Select((item) => item!.Id). + ToList(); + + _logger.LogDebug("Table of parent element IDs: {ArrayJson}", JsonSerializer.Serialize(itemsIds)); + if (currentItems.Length != 0) { var scope = _applicationHost.ServiceProvider!.CreateAsyncScope(); @@ -60,6 +78,32 @@ public async Task ProcessItemsAsync() return; } + var parentInterect = false; + var parent = _libraryManager.GetItemById(item.ParentId); + while (parent is not null) + { + if (itemsIds.Contains(parent.Id)) + { + _logger.LogDebug("While walking for {ItemName}, a common parent was found: {ParentId}", item.Name, parent.Id); + parentInterect = true; + break; + } + + if (parent!.ParentId.IsEmpty()) + { + _logger.LogDebug("Walking up parent because ParentId is not null : {ParentId}", parent!.ParentId); + parent = _libraryManager.GetItemById(parent.ParentId); + } + } + + // Remove the item if the parent is in the queue + if (parentInterect) + { + _logger.LogDebug("Item {ItemName} ignored because a common parent was found", item.Name); + _itemProcessQueue.TryRemove(key, out _); + continue; + } + _logger.LogDebug("Item {ItemName}", item.Name); // Metadata not refreshed yet and under retry limit. @@ -87,11 +131,29 @@ await webhookSender.SendNotification(NotificationType.ItemAdded, dataObject, ite } } } + + _lock.Release(); } /// public void AddItem(BaseItem item) { + // Once 2s is elapsed, we release a lock from the semaphore. + _ = Task.Run(static () => + { + try + { + _lock.WaitOne(500); + } + catch (Exception) + { + return; + } + + Thread.Sleep(500); + _lock.Release(); + }); + _itemProcessQueue.TryAdd(item.Id, new QueuedItemContainer(item.Id)); _logger.LogDebug("Queued {ItemName} for notification", item.Name); }