Skip to content

Commit e50c11f

Browse files
committed
feat: add support for FluxRecord streaming
1 parent d292ca7 commit e50c11f

File tree

5 files changed

+97
-39
lines changed

5 files changed

+97
-39
lines changed

CHANGELOG.md

+3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@
44

55
⚠️ Important Notice: Starting from this release, we won’t be listing every dependency change in our changelog. This helps us maintain the project faster and focus on important features for our InfluxDB client.
66

7+
### Features:
8+
1. [#705](https://github.com/influxdata/influxdb-client-csharp/pull/705): `QueryAsyncEnumerable()` now supports `FluxRecord` streaming.
9+
710
### CI
811
1. [#681](https://github.com/influxdata/influxdb-client-csharp/pull/681): Add build for `dotnet8`
912

Client.Core/Internal/AbstractQueryClient.cs

+4-3
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,11 @@ protected async IAsyncEnumerable<T> QueryEnumerable<T>(
203203
)
204204
};
205205

206-
var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
206+
using var stream = await RestClient.DownloadStreamAsync(query, cancellationToken).ConfigureAwait(false);
207+
using var sr = new StreamReader(stream);
208+
207209
await foreach (var (_, record) in _csvParser
208-
.ParseFluxResponseAsync(new StreamReader(stream), cancellationToken)
209-
.ConfigureAwait(false))
210+
.ParseFluxResponseAsync(sr, cancellationToken).ConfigureAwait(false))
210211
if (!(record is null))
211212
{
212213
yield return convert.Invoke(record);

Client.Test/QueryApiTest.cs

+25
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,31 @@ public async Task QueryAsyncEnumerable()
9898
await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item);
9999

100100
Assert.AreEqual(2, list.Count);
101+
Assert.AreEqual("mem", list[0].Measurement);
102+
Assert.AreEqual("mem", list[1].Measurement);
103+
Assert.AreEqual(12.25, list[0].Value);
104+
Assert.AreEqual(13.00, list[1].Value);
105+
}
106+
107+
[Test]
108+
public async Task QueryAsyncEnumerableOfFluxRecords()
109+
{
110+
MockServer
111+
.Given(Request.Create().WithPath("/api/v2/query").UsingPost())
112+
.RespondWith(CreateResponse(Data));
113+
114+
var measurements = _queryApi.QueryAsyncEnumerable(
115+
new Query(null, "from(...)"),
116+
"my-org");
117+
118+
var list = new List<FluxRecord>();
119+
await foreach (var item in measurements.ConfigureAwait(false)) list.Add(item);
120+
121+
Assert.AreEqual(2, list.Count);
122+
Assert.AreEqual("mem", list[0].GetMeasurement());
123+
Assert.AreEqual("mem", list[1].GetMeasurement());
124+
Assert.AreEqual(12.25, list[0].GetValue());
125+
Assert.AreEqual(13.00, list[1].GetValue());
101126
}
102127

103128
[Test]

Client/InvokableScriptsApi.cs

+6-12
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
using System;
22
using System.Collections.Generic;
3-
using System.Net.Http;
4-
using System.Runtime.CompilerServices;
53
using System.Threading;
64
using System.Threading.Tasks;
75
using InfluxDB.Client.Api.Domain;
@@ -248,15 +246,13 @@ await QueryRaw(CreateRequest(scriptId, bindParams), Consumer, ErrorConsumer, Emp
248246
/// <param name="bindParams">Represent key/value pairs parameters to be injected into script</param>
249247
/// <param name="cancellationToken">Cancellation token</param>
250248
/// <returns>stream of FluxRecord</returns>
251-
public async IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scriptId,
249+
public IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scriptId,
252250
Dictionary<string, object> bindParams = default,
253-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
251+
CancellationToken cancellationToken = default)
254252
{
255253
var requestMessage = CreateRequest(scriptId, bindParams);
256254

257-
await foreach (var record in QueryEnumerable(requestMessage, it => it, cancellationToken)
258-
.ConfigureAwait(false))
259-
yield return record;
255+
return QueryEnumerable(requestMessage, r => r, cancellationToken);
260256
}
261257

262258
/// <summary>
@@ -266,15 +262,13 @@ public async IAsyncEnumerable<FluxRecord> InvokeScriptEnumerableAsync(string scr
266262
/// <param name="bindParams">Represent key/value pairs parameters to be injected into script</param>
267263
/// <param name="cancellationToken">Cancellation token</param>
268264
/// <returns>stream of Measurement</returns>
269-
public async IAsyncEnumerable<T> InvokeScriptMeasurementsEnumerableAsync<T>(string scriptId,
265+
public IAsyncEnumerable<T> InvokeScriptMeasurementsEnumerableAsync<T>(string scriptId,
270266
Dictionary<string, object> bindParams = default,
271-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
267+
CancellationToken cancellationToken = default)
272268
{
273269
var requestMessage = CreateRequest(scriptId, bindParams);
274270

275-
await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
276-
cancellationToken).ConfigureAwait(false))
277-
yield return record;
271+
return QueryEnumerable(requestMessage, r => Mapper.ConvertToEntity<T>(r), cancellationToken);
278272
}
279273

280274
protected override void BeforeIntercept(RestRequest request)

Client/QueryApi.cs

+59-24
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
using System;
22
using System.Collections.Generic;
33
using System.Net.Http;
4-
using System.Runtime.CompilerServices;
54
using System.Threading;
65
using System.Threading.Tasks;
76
using InfluxDB.Client.Api.Domain;
@@ -236,6 +235,28 @@ IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
236235
IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
237236
CancellationToken cancellationToken = default);
238237

238+
/// <summary>
239+
/// Executes the Flux query against the InfluxDB 2.x and returns
240+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
241+
/// </summary>
242+
/// <param name="query">the flux query to execute</param>
243+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
244+
/// <param name="cancellationToken">cancellation token</param>
245+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
246+
IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(string query, string org = null,
247+
CancellationToken cancellationToken = default);
248+
249+
/// <summary>
250+
/// Executes the Flux query against the InfluxDB 2.x and returns
251+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
252+
/// </summary>
253+
/// <param name="query">the flux query to execute</param>
254+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
255+
/// <param name="cancellationToken">cancellation token</param>
256+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
257+
IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(Query query, string org = null,
258+
CancellationToken cancellationToken = default);
259+
239260
/// <summary>
240261
/// Executes the Flux query against the InfluxDB and synchronously map whole response to <see cref="string"/> result.
241262
///
@@ -442,16 +463,10 @@ await QueryAsync(query, consumer, ErrorConsumer, EmptyAction, org, cancellationT
442463
/// <param name="cancellationToken">cancellation token</param>
443464
/// <typeparam name="T">the type of measurement</typeparam>
444465
/// <returns>Measurements which are matched the query</returns>
445-
public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
446-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
466+
public IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string org = null,
467+
CancellationToken cancellationToken = default)
447468
{
448-
Arguments.CheckNonEmptyString(query, nameof(query));
449-
450-
var requestMessage = CreateRequest(CreateQuery(query), org);
451-
452-
await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
453-
cancellationToken).ConfigureAwait(false))
454-
yield return record;
469+
return QueryAsyncEnumerable<T>(CreateQuery(query), org, cancellationToken);
455470
}
456471

457472
/// <summary>
@@ -463,16 +478,40 @@ public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(string query, string or
463478
/// <param name="cancellationToken">cancellation token</param>
464479
/// <typeparam name="T">the type of measurement</typeparam>
465480
/// <returns>Measurements which are matched the query</returns>
466-
public async IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
467-
[EnumeratorCancellation] CancellationToken cancellationToken = default)
481+
public IAsyncEnumerable<T> QueryAsyncEnumerable<T>(Query query, string org = null,
482+
CancellationToken cancellationToken = default)
468483
{
469-
Arguments.CheckNotNull(query, nameof(query));
484+
var request = CreateRequest(query, org);
485+
return QueryEnumerable(request, r => Mapper.ConvertToEntity<T>(r), cancellationToken);
486+
}
470487

471-
var requestMessage = CreateRequest(query, org);
488+
/// <summary>
489+
/// Executes the Flux query against the InfluxDB 2.x and returns
490+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
491+
/// </summary>
492+
/// <param name="query">the flux query to execute</param>
493+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
494+
/// <param name="cancellationToken">cancellation token</param>
495+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
496+
public IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(string query, string org = null,
497+
CancellationToken cancellationToken = default)
498+
{
499+
return QueryAsyncEnumerable(CreateQuery(query), org, cancellationToken);
500+
}
472501

473-
await foreach (var record in QueryEnumerable(requestMessage, it => Mapper.ConvertToEntity<T>(it),
474-
cancellationToken).ConfigureAwait(false))
475-
yield return record;
502+
/// <summary>
503+
/// Executes the Flux query against the InfluxDB 2.x and returns
504+
/// an asynchronous enumerable of <see cref="FluxRecord"/>.
505+
/// </summary>
506+
/// <param name="query">the flux query to execute</param>
507+
/// <param name="org">specifies the source organization. If the org is not specified then is used config from <see cref="InfluxDBClientOptions.Org" />.</param>
508+
/// <param name="cancellationToken">cancellation token</param>
509+
/// <returns>An asynchronous enumerable of <see cref="FluxRecord"/>.</returns>
510+
public IAsyncEnumerable<FluxRecord> QueryAsyncEnumerable(Query query, string org = null,
511+
CancellationToken cancellationToken = default)
512+
{
513+
var request = CreateRequest(query, org);
514+
return QueryEnumerable(request, r => r, cancellationToken);
476515
}
477516

478517
/// <summary>
@@ -494,8 +533,7 @@ public Task QueryAsync(string query, Action<FluxRecord> onNext, Action<Exception
494533

495534
var consumer = new FluxResponseConsumerRecord(onNext);
496535

497-
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org,
498-
cancellationToken);
536+
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken);
499537
}
500538

501539
/// <summary>
@@ -540,8 +578,7 @@ public Task QueryAsync<T>(string query, Action<T> onNext, Action<Exception> onEr
540578

541579
var consumer = new FluxResponseConsumerPoco<T>(onNext, Mapper);
542580

543-
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org,
544-
cancellationToken);
581+
return QueryAsync(CreateQuery(query, Dialect), consumer, onError, onComplete, org, cancellationToken);
545582
}
546583

547584
/// <summary>
@@ -801,9 +838,7 @@ internal static Query CreateQuery(string query, Dialect dialect = null)
801838
{
802839
Arguments.CheckNonEmptyString(query, nameof(query));
803840

804-
var created = new Query(query: query, dialect: dialect ?? Dialect);
805-
806-
return created;
841+
return new Query(query: query, dialect: dialect ?? Dialect);
807842
}
808843
}
809844
}

0 commit comments

Comments
 (0)