From b7896bd7976f1fbe91e9028cc38f0ffbf6eda1b8 Mon Sep 17 00:00:00 2001 From: snaumenko-st Date: Tue, 19 Dec 2023 10:30:41 +0400 Subject: [PATCH 1/3] Introduce IAsyncEnumerable on DelayedQuery to utilize async Session query while asynchronously prefetching data --- .../Prefetch/PrefetchTest.cs | 109 ++++++++++++++++-- Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs | 14 ++- 2 files changed, 114 insertions(+), 9 deletions(-) diff --git a/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs b/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs index 21d0d2977f..17dfcb9f88 100644 --- a/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs +++ b/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs @@ -5,11 +5,14 @@ // Created: 2009.12.24 using System; +using System.Collections.Generic; using NUnit.Framework; using System.Linq; +using System.Reflection; +using System.Threading; using System.Threading.Tasks; -using Xtensive.Core; -using Xtensive.Orm.Internals.Prefetch; +using Xtensive.Orm.Internals; +using Xtensive.Orm.Providers; using Xtensive.Orm.Services; using Xtensive.Orm.Tests; @@ -44,7 +47,7 @@ public class Person : Entity public Person(Session session) : base(session) - {} + { } } #endregion @@ -63,10 +66,10 @@ protected override Configuration.DomainConfiguration BuildConfiguration() [TearDown] public void ClearContent() { - using(var session = Domain.OpenSession()) - using(var tx = session.OpenTransaction()) { + using (var session = Domain.OpenSession()) + using (var tx = session.OpenTransaction()) { var people = session.Query.All().ToList(); - foreach(var person in people) { + foreach (var person in people) { person.Manager = null; } session.SaveChanges(); @@ -247,7 +250,7 @@ public async Task MultipleBatchesAsyncTest() var count = 1000; await using (var session = await Domain.OpenSessionAsync()) - using (var transactionScope = session.OpenTransaction()){ + using (var transactionScope = session.OpenTransaction()) { var people = new Person[count]; for (var i = 0; i < count; i++) { people[i] = new Person(session) { Name = i.ToString(), Photo = new[] { (byte) (i % 256) } }; @@ -376,6 +379,98 @@ public async Task DelayedQueryAsyncTest() } } + private class QueryCounterSessionHandlerMoq : ChainingSessionHandler + { + private volatile int syncCounter; + private volatile int asyncCounter; + + public int GetSyncCounter() => syncCounter; + + public int GetAsyncCounter() => asyncCounter; + + public QueryCounterSessionHandlerMoq(SessionHandler chainedHandler) : base(chainedHandler) + { + } + + public override void ExecuteQueryTasks(IEnumerable queryTasks, bool allowPartialExecution) + { + _ = Interlocked.Increment(ref syncCounter); + base.ExecuteQueryTasks(queryTasks, allowPartialExecution); + } + + public override Task ExecuteQueryTasksAsync(IEnumerable queryTasks, bool allowPartialExecution, CancellationToken token) + { + _ = Interlocked.Increment(ref asyncCounter); + return base.ExecuteQueryTasksAsync(queryTasks, allowPartialExecution, token); + } + } + + [Test] + public async Task DelayedQueryAsyncShouldMaterializeAsyncTest() + { + await using (var session = await Domain.OpenSessionAsync()) { + using (var transactionScope = session.OpenTransaction()) { + var employee = new Person(session) { Name = "Employee", Photo = new byte[] { 8, 0 } }; + var manager = new Person(session) { Name = "Manager", Photo = new byte[] { 8, 0 } }; + _ = manager.Employees.Add(employee); + transactionScope.Complete(); + } + } + + await using (var session = await Domain.OpenSessionAsync()) // no session activation! + { + var sessionAccessor = new DirectSessionAccessor(session); + var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty); + var handler = prop.GetValue(session) as SessionHandler; + var moq = new QueryCounterSessionHandlerMoq(handler); + using (sessionAccessor.ChangeSessionHandler(moq)) + + using (var transactionScope = session.OpenTransaction()) { + var people = session.Query.CreateDelayedQuery(q => q.All()) + .Prefetch(p => p.Photo) // Lazy load field + .Prefetch(p => p.Employees // EntitySet Employees + .Prefetch(e => e.Photo)) // and lazy load field of each of its items + .Prefetch(p => p.Manager); // Referenced entity + foreach (var person in people) { + // some code here... + } + Assert.That(moq.GetSyncCounter(), Is.GreaterThan(0)); + Assert.That(moq.GetAsyncCounter(), Is.EqualTo(0)); + transactionScope.Complete(); + } + } + + await using (var session = await Domain.OpenSessionAsync())// no session activation! + { + var sessionAccessor = new DirectSessionAccessor(session); + var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty); + var handler = prop.GetValue(session) as SessionHandler; + var moq = new QueryCounterSessionHandlerMoq(handler); + using (sessionAccessor.ChangeSessionHandler(moq)) + + using (var transactionScope = session.OpenTransaction()) { + var people = session.Query.CreateDelayedQuery(q => q.All()) + .Prefetch(p => p.Photo) // Lazy load field + .Prefetch(p => p.Employees.Prefetch(e => e.Photo)) // EntitySet Employees and lazy load field of each of its items with the limit on number of items to be loaded + .Prefetch(p => p.Manager.Photo); // Referenced entity and lazy load field for each of them + await foreach (var person in people.AsAsyncEnumerable()) { + var accessor = DirectStateAccessor.Get(person); + Assert.That(accessor.GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded)); + Assert.That(accessor.GetFieldState("Manager"), Is.EqualTo(PersistentFieldState.Loaded)); + if (person.ManagerKey != null) { + Assert.IsNotNull(DirectStateAccessor.Get(session)[person.ManagerKey]); + Assert.That(DirectStateAccessor.Get(person.Manager).GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded)); + } + // some code here... + } + Assert.That(moq.GetSyncCounter(), Is.EqualTo(0)); + Assert.That(moq.GetAsyncCounter(), Is.GreaterThan(0)); + transactionScope.Complete(); + } + + } + } + [Test] public void CachedQueryTest() { diff --git a/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs b/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs index f800ba3f73..012b2300fc 100644 --- a/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs +++ b/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs @@ -20,7 +20,7 @@ namespace Xtensive.Orm /// /// The type of the element in a resulting sequence. [Serializable] - public sealed class DelayedQuery : DelayedQuery, IEnumerable + public sealed class DelayedQuery : DelayedQuery, IEnumerable, IAsyncEnumerable { /// public IEnumerator GetEnumerator() => Materialize().GetEnumerator(); @@ -28,6 +28,15 @@ public sealed class DelayedQuery : DelayedQuery, IEnumerable /// IEnumerator IEnumerable.GetEnumerator() => GetEnumerator(); + /// + async IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken token) + { + var elements = await ExecuteAsync(token).ConfigureAwaitFalse(); + foreach (var element in elements) { + yield return element; + } + } + /// /// Asynchronously executes delayed query. /// @@ -43,6 +52,7 @@ public ValueTask> ExecuteAsync(CancellationToken token = d internal DelayedQuery(Session session, TranslatedQuery translatedQuery, ParameterContext parameterContext) : base(session, translatedQuery, parameterContext) - {} + { } + } } \ No newline at end of file From 20c2baf86d07c94acf862cd960c784f54be25ad6 Mon Sep 17 00:00:00 2001 From: snaumenko-st Date: Wed, 20 Dec 2023 15:01:08 +0400 Subject: [PATCH 2/3] ConfigureAwait --- Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs b/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs index 012b2300fc..24822a5c23 100644 --- a/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs +++ b/Orm/Xtensive.Orm/Orm/DelayedQuery{T}.cs @@ -31,7 +31,7 @@ public sealed class DelayedQuery : DelayedQuery, IEnumerable /// async IAsyncEnumerator IAsyncEnumerable.GetAsyncEnumerator(CancellationToken token) { - var elements = await ExecuteAsync(token).ConfigureAwaitFalse(); + var elements = await ExecuteAsync(token).ConfigureAwait(false); foreach (var element in elements) { yield return element; } From 6e0c966176375f995483287e91bbba2ac110d672 Mon Sep 17 00:00:00 2001 From: snaumenko-st Date: Mon, 25 Dec 2023 11:45:15 +0400 Subject: [PATCH 3/3] PrefetchOnDelayedQuery tests refactoring --- .../Prefetch/PrefetchTest.cs | 97 ------------------- .../Storage/Prefetch/PrefetchTest.cs | 41 +++++++- .../QueryCounterSessionHandlerMock.cs | 34 +++++++ 3 files changed, 74 insertions(+), 98 deletions(-) create mode 100644 Orm/Xtensive.Orm.Tests/Storage/Prefetch/QueryCounterSessionHandlerMock.cs diff --git a/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs b/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs index 17dfcb9f88..d3648e940c 100644 --- a/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs +++ b/Orm/Xtensive.Orm.Manual/Prefetch/PrefetchTest.cs @@ -5,14 +5,9 @@ // Created: 2009.12.24 using System; -using System.Collections.Generic; using NUnit.Framework; using System.Linq; -using System.Reflection; -using System.Threading; using System.Threading.Tasks; -using Xtensive.Orm.Internals; -using Xtensive.Orm.Providers; using Xtensive.Orm.Services; using Xtensive.Orm.Tests; @@ -379,98 +374,6 @@ public async Task DelayedQueryAsyncTest() } } - private class QueryCounterSessionHandlerMoq : ChainingSessionHandler - { - private volatile int syncCounter; - private volatile int asyncCounter; - - public int GetSyncCounter() => syncCounter; - - public int GetAsyncCounter() => asyncCounter; - - public QueryCounterSessionHandlerMoq(SessionHandler chainedHandler) : base(chainedHandler) - { - } - - public override void ExecuteQueryTasks(IEnumerable queryTasks, bool allowPartialExecution) - { - _ = Interlocked.Increment(ref syncCounter); - base.ExecuteQueryTasks(queryTasks, allowPartialExecution); - } - - public override Task ExecuteQueryTasksAsync(IEnumerable queryTasks, bool allowPartialExecution, CancellationToken token) - { - _ = Interlocked.Increment(ref asyncCounter); - return base.ExecuteQueryTasksAsync(queryTasks, allowPartialExecution, token); - } - } - - [Test] - public async Task DelayedQueryAsyncShouldMaterializeAsyncTest() - { - await using (var session = await Domain.OpenSessionAsync()) { - using (var transactionScope = session.OpenTransaction()) { - var employee = new Person(session) { Name = "Employee", Photo = new byte[] { 8, 0 } }; - var manager = new Person(session) { Name = "Manager", Photo = new byte[] { 8, 0 } }; - _ = manager.Employees.Add(employee); - transactionScope.Complete(); - } - } - - await using (var session = await Domain.OpenSessionAsync()) // no session activation! - { - var sessionAccessor = new DirectSessionAccessor(session); - var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty); - var handler = prop.GetValue(session) as SessionHandler; - var moq = new QueryCounterSessionHandlerMoq(handler); - using (sessionAccessor.ChangeSessionHandler(moq)) - - using (var transactionScope = session.OpenTransaction()) { - var people = session.Query.CreateDelayedQuery(q => q.All()) - .Prefetch(p => p.Photo) // Lazy load field - .Prefetch(p => p.Employees // EntitySet Employees - .Prefetch(e => e.Photo)) // and lazy load field of each of its items - .Prefetch(p => p.Manager); // Referenced entity - foreach (var person in people) { - // some code here... - } - Assert.That(moq.GetSyncCounter(), Is.GreaterThan(0)); - Assert.That(moq.GetAsyncCounter(), Is.EqualTo(0)); - transactionScope.Complete(); - } - } - - await using (var session = await Domain.OpenSessionAsync())// no session activation! - { - var sessionAccessor = new DirectSessionAccessor(session); - var prop = typeof(Session).GetProperty("Handler", BindingFlags.Instance | BindingFlags.NonPublic | BindingFlags.GetProperty); - var handler = prop.GetValue(session) as SessionHandler; - var moq = new QueryCounterSessionHandlerMoq(handler); - using (sessionAccessor.ChangeSessionHandler(moq)) - - using (var transactionScope = session.OpenTransaction()) { - var people = session.Query.CreateDelayedQuery(q => q.All()) - .Prefetch(p => p.Photo) // Lazy load field - .Prefetch(p => p.Employees.Prefetch(e => e.Photo)) // EntitySet Employees and lazy load field of each of its items with the limit on number of items to be loaded - .Prefetch(p => p.Manager.Photo); // Referenced entity and lazy load field for each of them - await foreach (var person in people.AsAsyncEnumerable()) { - var accessor = DirectStateAccessor.Get(person); - Assert.That(accessor.GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded)); - Assert.That(accessor.GetFieldState("Manager"), Is.EqualTo(PersistentFieldState.Loaded)); - if (person.ManagerKey != null) { - Assert.IsNotNull(DirectStateAccessor.Get(session)[person.ManagerKey]); - Assert.That(DirectStateAccessor.Get(person.Manager).GetFieldState("Photo"), Is.EqualTo(PersistentFieldState.Loaded)); - } - // some code here... - } - Assert.That(moq.GetSyncCounter(), Is.EqualTo(0)); - Assert.That(moq.GetAsyncCounter(), Is.GreaterThan(0)); - transactionScope.Complete(); - } - - } - } - [Test] public void CachedQueryTest() { diff --git a/Orm/Xtensive.Orm.Tests/Storage/Prefetch/PrefetchTest.cs b/Orm/Xtensive.Orm.Tests/Storage/Prefetch/PrefetchTest.cs index ccb73d9415..0282e51ac7 100644 --- a/Orm/Xtensive.Orm.Tests/Storage/Prefetch/PrefetchTest.cs +++ b/Orm/Xtensive.Orm.Tests/Storage/Prefetch/PrefetchTest.cs @@ -13,9 +13,10 @@ using Xtensive.Orm.Configuration; using Xtensive.Orm.Internals; using Xtensive.Orm.Internals.Prefetch; -using Xtensive.Orm.Model; +using Xtensive.Orm.Services; using Xtensive.Orm.Tests.ObjectModel; using Xtensive.Orm.Tests.ObjectModel.ChinookDO; +using FieldInfo = Xtensive.Orm.Model.FieldInfo; namespace Xtensive.Orm.Tests.Storage.Prefetch { @@ -923,6 +924,44 @@ public async Task StructurePrefetchAsyncTest() } } + [Test] + public void PrefetchOnDelayedQueryTest() + { + using var session = Domain.OpenSession(); + var sessionAccessor = session.Services.Get(); + using var moq = new QueryCounterSessionHandlerMock(session.Handler); + using (sessionAccessor.ChangeSessionHandler(moq)) + using (var transactionScope = session.OpenTransaction()) { + var prefetcher = session.Query.CreateDelayedQuery(q => q.All()) + .Prefetch(o => o.ProcessingTime); + foreach (var invoice in prefetcher.AsEnumerable()) { + // some code here... + } + transactionScope.Complete(); + } + Assert.That(moq.GetSyncCounter(), Is.GreaterThan(0)); + Assert.That(moq.GetAsyncCounter(), Is.EqualTo(0)); + } + + [Test] + public async Task PrefetchOnDelayedQueryAsyncTest() + { + await using var session = await Domain.OpenSessionAsync(); + var sessionAccessor = session.Services.Get(); + await using var moq = new QueryCounterSessionHandlerMock(session.Handler); + using (sessionAccessor.ChangeSessionHandler(moq)) + await using (var transactionScope = await session.OpenTransactionAsync()) { + var prefetcher = session.Query.CreateDelayedQuery(q => q.All()) + .Prefetch(o => o.ProcessingTime); + await foreach (var invoice in prefetcher.AsAsyncEnumerable()) { + // some code here... + } + transactionScope.Complete(); + } + Assert.That(moq.GetSyncCounter(), Is.EqualTo(0)); + Assert.That(moq.GetAsyncCounter(), Is.GreaterThan(0)); + } + private void RemoveAllBooks() { using (var session = Domain.OpenSession()) diff --git a/Orm/Xtensive.Orm.Tests/Storage/Prefetch/QueryCounterSessionHandlerMock.cs b/Orm/Xtensive.Orm.Tests/Storage/Prefetch/QueryCounterSessionHandlerMock.cs new file mode 100644 index 0000000000..bd1df105df --- /dev/null +++ b/Orm/Xtensive.Orm.Tests/Storage/Prefetch/QueryCounterSessionHandlerMock.cs @@ -0,0 +1,34 @@ +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Xtensive.Orm.Internals; +using Xtensive.Orm.Providers; + +namespace Xtensive.Orm.Tests.Storage.Prefetch +{ + internal class QueryCounterSessionHandlerMock : ChainingSessionHandler + { + private volatile int syncCounter; + private volatile int asyncCounter; + + public int GetSyncCounter() => syncCounter; + + public int GetAsyncCounter() => asyncCounter; + + public QueryCounterSessionHandlerMock(SessionHandler chainedHandler) : base(chainedHandler) + { + } + + public override void ExecuteQueryTasks(IEnumerable queryTasks, bool allowPartialExecution) + { + _ = Interlocked.Increment(ref syncCounter); + base.ExecuteQueryTasks(queryTasks, allowPartialExecution); + } + + public override Task ExecuteQueryTasksAsync(IEnumerable queryTasks, bool allowPartialExecution, CancellationToken token) + { + _ = Interlocked.Increment(ref asyncCounter); + return base.ExecuteQueryTasksAsync(queryTasks, allowPartialExecution, token); + } + } +}