Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 26 additions & 17 deletions src/Stateless/StateMachine.Async.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,28 +135,37 @@ async Task InternalFireAsync(TTrigger trigger, params object[] args)
/// <param name="args"> A variable-length parameters list containing arguments. </param>
async Task InternalFireQueuedAsync(TTrigger trigger, params object[] args)
{
if (_firing)
if (!_firing)
{
_eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
return;
}

try
{
_firing = true;

await InternalFireOneAsync(trigger, args).ConfigureAwait(RetainSynchronizationContext);
await _lock.WaitAsync();

while (_eventQueue.Count != 0)
if(!_firing)
{
var queuedEvent = _eventQueue.Dequeue();
await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext);
try
{
_firing = true;
_lock.Release();
await InternalFireOneAsync(trigger, args).ConfigureAwait(RetainSynchronizationContext);

while (!_eventQueue.IsEmpty)
{
bool suuccess = _eventQueue.TryDequeue(out QueuedTrigger queuedEvent);
if (suuccess)
{
await InternalFireOneAsync(queuedEvent.Trigger, queuedEvent.Args).ConfigureAwait(RetainSynchronizationContext);
}
}
return;
}
finally
{
_firing = false;
}
}
_lock.Release();
}
finally
{
_firing = false;
}

_eventQueue.Enqueue(new QueuedTrigger { Trigger = trigger, Args = args });
}

async Task InternalFireOneAsync(TTrigger trigger, params object[] args)
Expand Down
14 changes: 11 additions & 3 deletions src/Stateless/StateMachine.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
using Stateless.Reflection;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace Stateless
{
Expand Down Expand Up @@ -32,14 +34,15 @@ public partial class StateMachine<TState, TTrigger>
private readonly OnTransitionedEvent _onTransitionCompletedEvent;
private readonly TState _initialState;
private readonly FiringMode _firingMode;
private readonly SemaphoreSlim _lock;

private class QueuedTrigger
{
public TTrigger Trigger { get; set; }
public object[] Args { get; set; }
}

private readonly Queue<QueuedTrigger> _eventQueue = new Queue<QueuedTrigger>();
private readonly ConcurrentQueue<QueuedTrigger> _eventQueue = new ConcurrentQueue<QueuedTrigger>();
private bool _firing;

/// <summary>
Expand Down Expand Up @@ -102,6 +105,7 @@ public StateMachine(TState initialState, FiringMode firingMode) : this()
_unhandledTriggerAction = new UnhandledTriggerAction.Sync(DefaultUnhandledTriggerAction);
_onTransitionedEvent = new OnTransitionedEvent();
_onTransitionCompletedEvent = new OnTransitionedEvent();
_lock = new SemaphoreSlim(1, 1);
}

/// <summary>
Expand Down Expand Up @@ -374,8 +378,12 @@ private void InternalFireQueued(TTrigger trigger, params object[] args)
// Empty queue for triggers
while (_eventQueue.Any())
{
var queuedEvent = _eventQueue.Dequeue();
InternalFireOne(queuedEvent.Trigger, queuedEvent.Args);
bool suuccess = _eventQueue.TryDequeue(out QueuedTrigger queuedEvent);
if(suuccess)
{
InternalFireOne(queuedEvent.Trigger, queuedEvent.Args);
}

}
}
finally
Expand Down
50 changes: 50 additions & 0 deletions test/Stateless.Tests/InitialTransitionFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -335,5 +335,55 @@ public async void AsyncTransitionEvents_OrderingWithInitialTransition()
Assert.Equal(expectedOrdering[i], actualOrdering[i]);
}
}

[Fact]
public async Task CheckMultipleFireAsync()
{
for (int i = 0; i < 200; i++)
{
await CheckAsyncMethod();
}
}

private async Task CheckAsyncMethod()
{
// Create a state machine
var sm = new StateMachine<State, Trigger>(State.A);

sm.Configure(State.A).Permit(Trigger.X, State.B);
sm.Configure(State.A).Permit(Trigger.Y, State.C);
sm.Configure(State.B).Permit(Trigger.Y, State.D);
sm.Configure(State.B).Permit(Trigger.X, State.A);
sm.Configure(State.C).Permit(Trigger.Y, State.D);
sm.Configure(State.C).Permit(Trigger.X, State.A);
sm.Configure(State.D).Permit(Trigger.Y, State.B);
sm.Configure(State.D).Permit(Trigger.X, State.C);


// Create some tasks
List<Task> tasks = new List<Task>();

for (int i = 0; i < 20; i++)
{
Task taskTriggerX = Task.Run(() => FireTrigger(sm, Trigger.X, false));
Task taskTriggerY = Task.Run(() => FireTrigger(sm, Trigger.Y, false));
Task taskTriggerDelay = Task.Run(() => FireTrigger(sm, Trigger.X, true));
tasks.Add(taskTriggerX);
tasks.Add(taskTriggerY);
tasks.Add(taskTriggerDelay);
}

// Wait for tasks to complete
await Task.WhenAll(tasks);
}

private async Task FireTrigger(StateMachine<State, Trigger> sm, Trigger trigger, bool addDelay)
{
if(addDelay)
{
await Task.Delay(20);
}
await sm.FireAsync(trigger);
}
}
}