-
Notifications
You must be signed in to change notification settings - Fork 95
/
Copy pathCustomDomainMapping.cs
169 lines (149 loc) · 5.25 KB
/
CustomDomainMapping.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
using System;
using System.Threading.Tasks;
using InfluxDB.Client;
using InfluxDB.Client.Api.Domain;
using InfluxDB.Client.Core.Flux.Domain;
using InfluxDB.Client.Writes;
namespace Examples
{
public static class CustomDomainMapping
{
/// <summary>
/// Domain object used by this example: a single reading reported by a sensor.
/// </summary>
private class Sensor
{
    /// <summary>
    /// Kind of sensor that produced the reading.
    /// </summary>
    public string Type { get; set; }

    /// <summary>
    /// Version label of the sensor.
    /// </summary>
    public string Version { get; set; }

    /// <summary>
    /// The value that was measured.
    /// </summary>
    public double Value { get; set; }

    /// <summary>
    /// Point in time the reading belongs to.
    /// </summary>
    public DateTimeOffset Timestamp { get; set; }

    /// <summary>
    /// Renders the reading as "timestamp type, version value: value"; the timestamp
    /// uses a 12-hour clock with milliseconds and an AM/PM designator.
    /// </summary>
    public override string ToString() =>
        $"{Timestamp:MM/dd/yyyy hh:mm:ss.fff tt} {Type}, {Version} value: {Value}";
}
/// <summary>
/// Custom mapper that translates between <see cref="Sensor"/> objects and the
/// "sensor" measurement: tags "type" and "version", field "data", plus the record time.
/// Only <see cref="Sensor"/> is supported; any other type is rejected.
/// </summary>
private class DomainEntityConverter : IDomainObjectMapper
{
    /// <summary>
    /// Converts a query result record to <typeparamref name="T"/>.
    /// </summary>
    /// <param name="fluxRecord">Record returned by the query.</param>
    /// <returns>The record mapped to a <see cref="Sensor"/>, cast to <typeparamref name="T"/>.</returns>
    /// <exception cref="NotSupportedException"><typeparamref name="T"/> is not <see cref="Sensor"/>.</exception>
    public T ConvertToEntity<T>(FluxRecord fluxRecord)
    {
        return (T)ConvertToEntity(fluxRecord, typeof(T));
    }

    /// <summary>
    /// Converts a query result record to an instance of <paramref name="type"/>.
    /// </summary>
    /// <param name="fluxRecord">Record returned by the query; read by the keys "type", "version" and "data".</param>
    /// <param name="type">Requested target type; must be <see cref="Sensor"/>.</param>
    /// <returns>A new <see cref="Sensor"/> populated from the record.</returns>
    /// <exception cref="NotSupportedException"><paramref name="type"/> is not <see cref="Sensor"/>.</exception>
    /// <exception cref="ArgumentNullException"><paramref name="fluxRecord"/> is null.</exception>
    public object ConvertToEntity(FluxRecord fluxRecord, Type type)
    {
        // The type check stays first so an unsupported type is still reported as NotSupportedException.
        if (type != typeof(Sensor))
        {
            throw new NotSupportedException($"This converter doesn't support: {type}");
        }

        if (fluxRecord == null)
        {
            throw new ArgumentNullException(nameof(fluxRecord));
        }

        // Record values are machine-readable data, so convert them independently of the current culture.
        var invariant = System.Globalization.CultureInfo.InvariantCulture;

        // The type was verified above, so the Sensor can be returned as-is (no Convert.ChangeType needed).
        return new Sensor
        {
            Type = Convert.ToString(fluxRecord.GetValueByKey("type"), invariant),
            Version = Convert.ToString(fluxRecord.GetValueByKey("version"), invariant),
            Value = Convert.ToDouble(fluxRecord.GetValueByKey("data"), invariant),
            // A record without a time falls back to the default value of the underlying time type.
            Timestamp = fluxRecord.GetTime().GetValueOrDefault().ToDateTimeUtc()
        };
    }

    /// <summary>
    /// Converts a <see cref="Sensor"/> to a point of the "sensor" measurement.
    /// </summary>
    /// <param name="entity">Object to write; must be a <see cref="Sensor"/>.</param>
    /// <param name="precision">Precision applied to the point's timestamp.</param>
    /// <returns>A point with tags "type" and "version", field "data" and the sensor's timestamp.</returns>
    /// <exception cref="NotSupportedException"><paramref name="entity"/> is not a <see cref="Sensor"/> (including null).</exception>
    public PointData ConvertToPointData<T>(T entity, WritePrecision precision)
    {
        if (!(entity is Sensor sensor))
        {
            // Report the offending type (not the entity's ToString), matching ConvertToEntity's message.
            var unsupported = entity == null ? typeof(T) : entity.GetType();
            throw new NotSupportedException($"This converter doesn't support: {unsupported}");
        }

        return PointData
            .Measurement("sensor")
            .Tag("type", sensor.Type)
            .Tag("version", sensor.Version)
            .Field("data", sensor.Value)
            .Timestamp(sensor.Timestamp, precision);
    }
}
/// <summary>
/// Example entry point: writes four <see cref="Sensor"/> readings through the custom
/// converter, queries every "sensor" row back as <see cref="Sensor"/> objects and prints them.
/// </summary>
public static async Task Main()
{
    const string host = "http://localhost:9999";
    const string token = "my-token";
    const string bucket = "my-bucket";
    const string organization = "my-org";

    // The same mapper instance is handed to both the write API and the query API.
    var mapper = new DomainEntityConverter();

    var clientOptions = new InfluxDBClientOptions(host)
    {
        Token = token,
        Org = organization,
        Bucket = bucket
    };

    using var influx = new InfluxDBClient(clientOptions);

    // Sample readings: two temperature and two humidity values, one hour apart.
    var firstReadingAt = new DateTimeOffset(2020, 11, 15, 8, 20, 15,
        new TimeSpan(3, 0, 0));

    Sensor[] readings =
    {
        new Sensor { Timestamp = firstReadingAt, Type = "temperature", Version = "v0.0.2", Value = 15 },
        new Sensor { Timestamp = firstReadingAt.AddHours(1), Type = "temperature", Version = "v0.0.2", Value = 15 },
        new Sensor { Timestamp = firstReadingAt.AddHours(2), Type = "humidity", Version = "v0.13", Value = 74 },
        new Sensor { Timestamp = firstReadingAt.AddHours(3), Type = "humidity", Version = "v0.13", Value = 82 }
    };

    // Write the readings with second precision.
    var writeApi = influx.GetWriteApiAsync(mapper);
    await writeApi.WriteMeasurementsAsync(readings, WritePrecision.S);

    // Select every "sensor" row and pivot the fields into columns so that each
    // record carries the "data" field alongside its tags.
    var flux = string.Concat(
        $"from(bucket:\"{bucket}\") ",
        "|> range(start: 0) ",
        "|> filter(fn: (r) => r[\"_measurement\"] == \"sensor\")",
        "|> pivot(rowKey:[\"_time\"], columnKey: [\"_field\"], valueColumn: \"_value\")");

    var rows = influx.GetQueryApiSync(mapper).QuerySync<Sensor>(flux);

    // Print each mapped reading.
    foreach (var row in rows)
    {
        Console.WriteLine(row.ToString());
    }
}
}
}