Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Jellyfin.Extensions;
using Jellyfin.Plugin.Webhook.Destinations;
using Jellyfin.Plugin.Webhook.Helpers;
using Jellyfin.Plugin.Webhook.Models;
Expand All @@ -20,6 +24,7 @@ public class ItemAddedManager : IItemAddedManager
private readonly ILibraryManager _libraryManager;
private readonly IServerApplicationHost _applicationHost;
private readonly ConcurrentDictionary<Guid, QueuedItemContainer> _itemProcessQueue;
private static Semaphore _lock = new(initialCount: 0, maximumCount: 1);

/// <summary>
/// Initializes a new instance of the <see cref="ItemAddedManager"/> class.
Expand All @@ -41,9 +46,22 @@ public ItemAddedManager(
/// <inheritdoc />
public async Task ProcessItemsAsync()
{
// Wait until a semaphore task is available
// meaning our queue isn't waiting for a task.
_lock.WaitOne();

_logger.LogDebug("ProcessItemsAsync");
// Attempt to process all items in queue.
var currentItems = _itemProcessQueue.ToArray();

var itemsIds = currentItems.
Select((item) => _libraryManager.GetItemById(item.Key)).
Where((item) => item is not null).
Select((item) => item!.Id).
ToList();

_logger.LogDebug("Table of parent element IDs: {ArrayJson}", JsonSerializer.Serialize(itemsIds));

if (currentItems.Length != 0)
{
var scope = _applicationHost.ServiceProvider!.CreateAsyncScope();
Expand All @@ -60,6 +78,32 @@ public async Task ProcessItemsAsync()
return;
}

var parentInterect = false;
var parent = _libraryManager.GetItemById(item.ParentId);
while (parent is not null)
{
if (itemsIds.Contains(parent.Id))
{
_logger.LogDebug("While walking for {ItemName}, a common parent was found: {ParentId}", item.Name, parent.Id);
parentInterect = true;
break;
}

if (parent!.ParentId.IsEmpty())
{
_logger.LogDebug("Walking up parent because ParentId is not null : {ParentId}", parent!.ParentId);
parent = _libraryManager.GetItemById(parent.ParentId);
}
}

// Remove the item if the parent is in the queue
if (parentInterect)
{
_logger.LogDebug("Item {ItemName} ignored because a common parent was found", item.Name);
_itemProcessQueue.TryRemove(key, out _);
continue;
}

_logger.LogDebug("Item {ItemName}", item.Name);

// Metadata not refreshed yet and under retry limit.
Expand Down Expand Up @@ -87,11 +131,29 @@ await webhookSender.SendNotification(NotificationType.ItemAdded, dataObject, ite
}
}
}

_lock.Release();
}

/// <inheritdoc />
public void AddItem(BaseItem item)
{
// Once 2s is elapsed, we release a lock from the semaphore.
_ = Task.Run(static () =>
{
try
{
_lock.WaitOne(500);
}
catch (Exception)
{
return;
}

Thread.Sleep(500);
_lock.Release();
});

_itemProcessQueue.TryAdd(item.Id, new QueuedItemContainer(item.Id));
_logger.LogDebug("Queued {ItemName} for notification", item.Name);
}
Expand Down