Skip to content

Commit 7b26209

Browse files
authored
feat(java): add possibility to specify WriteConsistency parameter (#329)
1 parent 854f21f commit 7b26209

File tree

32 files changed

+1500
-218
lines changed

32 files changed

+1500
-218
lines changed

CHANGELOG.md

+24
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,30 @@ This release also uses new version of InfluxDB OSS API definitions - [oss.yml](h
6767
1. [#315](https://github.com/influxdata/influxdb-client-java/pull/315): Add support for timezones [FluxDSL]
6868
1. [#317](https://github.com/influxdata/influxdb-client-java/pull/317): Gets HTTP headers from the unsuccessful HTTP request
6969
1. [#334](https://github.com/influxdata/influxdb-client-java/pull/334): Supports not operator [FluxDSL]
70+
1. [#329](https://github.com/influxdata/influxdb-client-java/pull/329): Add support for write `consistency` parameter [InfluxDB Enterprise]
71+
72+
Configure `consistency` via `Write API`:
73+
```diff
74+
- writeApi.writeRecord(WritePrecision.NS, "cpu_load_short,host=server02 value=0.67");
75+
+ WriteParameters parameters = new WriteParameters(WritePrecision.NS, WriteConsistency.ALL);
76+
+
77+
+ writeApi.writeRecord("cpu_load_short,host=server02 value=0.67", parameters);
78+
```
79+
80+
Configure `consistency` via client options:
81+
```diff
82+
- InfluxDBClient client = InfluxDBClientFactory.createV1("http://influxdb_enterpriser:8086",
83+
- "my-username",
84+
- "my-password".toCharArray(),
85+
- "my-db",
86+
- "autogen");
87+
+ InfluxDBClient client = InfluxDBClientFactory.createV1("http://influxdb_enterpriser:8086",
88+
+ "my-username",
89+
+ "my-password".toCharArray(),
90+
+ "my-db",
91+
+ "autogen",
92+
+ WriteConsistency.ALL);
93+
```
7094

7195
### Bug Fixes
7296
1. [#313](https://github.com/influxdata/influxdb-client-java/pull/313): Do not deliver `exception` when the consumer is already disposed [influxdb-client-reactive]

client-kotlin/README.md

+21-19
Original file line numberDiff line numberDiff line change
@@ -179,16 +179,17 @@ A client can be configured via configuration file. The configuration file has to
179179

180180
The following options are supported:
181181

182-
| Property name | default | description |
183-
| --------------------------|-----------|-------------|
184-
| influx2.url | - | the url to connect to InfluxDB |
185-
| influx2.org | - | default destination organization for writes and queries |
186-
| influx2.bucket | - | default destination bucket for writes |
187-
| influx2.token | - | the token to use for the authorization |
188-
| influx2.logLevel | NONE | rest client verbosity level |
189-
| influx2.readTimeout | 10000 ms | read timeout |
190-
| influx2.writeTimeout | 10000 ms | write timeout |
191-
| influx2.connectTimeout | 10000 ms | socket timeout |
182+
| Property name | default | description |
183+
|--------------------------|------------|------------------------------------------------------------|
184+
| influx2.url | - | the url to connect to InfluxDB |
185+
| influx2.org | - | default destination organization for writes and queries |
186+
| influx2.bucket | - | default destination bucket for writes |
187+
| influx2.token | - | the token to use for the authorization |
188+
| influx2.logLevel | NONE | rest client verbosity level |
189+
| influx2.readTimeout | 10000 ms | read timeout |
190+
| influx2.writeTimeout | 10000 ms | write timeout |
191+
| influx2.connectTimeout | 10000 ms | socket timeout |
192+
| influx2.precision | NS | default precision for unix timestamps in the line protocol |
192193

193194
The `influx2.readTimeout`, `influx2.writeTimeout` and `influx2.connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.
194195

@@ -222,15 +223,16 @@ val influxDBClient = InfluxDBClientKotlinFactory
222223
```
223224
The following options are supported:
224225

225-
| Property name | default | description |
226-
| ------------------|-----------|-------------|
227-
| org | - | default destination organization for writes and queries |
228-
| bucket | - | default destination bucket for writes |
229-
| token | - | the token to use for the authorization |
230-
| logLevel | NONE | rest client verbosity level |
231-
| readTimeout | 10000 ms | read timeout |
232-
| writeTimeout | 10000 ms | write timeout |
233-
| connectTimeout | 10000 ms | socket timeout |
226+
| Property name | default | description |
227+
|------------------|------------|------------------------------------------------------------|
228+
| org | - | default destination organization for writes and queries |
229+
| bucket | - | default destination bucket for writes |
230+
| token | - | the token to use for the authorization |
231+
| logLevel | NONE | rest client verbosity level |
232+
| readTimeout | 10000 ms | read timeout |
233+
| writeTimeout | 10000 ms | write timeout |
234+
| connectTimeout | 10000 ms | socket timeout |
235+
| precision | NS | default precision for unix timestamps in the line protocol |
234236

235237
The `readTimeout`, `writeTimeout` and `connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.
236238

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/WriteKotlinApi.kt

+32
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package com.influxdb.client.kotlin
2323

2424
import com.influxdb.client.domain.WritePrecision
25+
import com.influxdb.client.write.WriteParameters
2526
import com.influxdb.client.write.Point
2627
import kotlinx.coroutines.flow.Flow
2728

@@ -82,6 +83,17 @@ interface WriteKotlinApi {
8283
*/
8384
suspend fun writeRecords(records: Flow<String>, precision: WritePrecision, bucket: String? = null, org: String? = null)
8485

86+
/**
87+
* Write Line Protocol records into InfluxDB.
88+
*
89+
* If any exception occurs during write, this exception is rethrown from this method.
90+
*
91+
* @param records specified in [LineProtocol](http://bit.ly/line-protocol).
92+
* The `records` are considered as one batch unit.
93+
* @param parameters specify InfluxDB Write endpoint parameters
94+
*/
95+
suspend fun writeRecords(records: Flow<String>, parameters: WriteParameters)
96+
8597
/**
8698
* Write Data Point into InfluxDB.
8799
*
@@ -127,6 +139,16 @@ interface WriteKotlinApi {
127139
*/
128140
suspend fun writePoints(points: Flow<Point>, bucket: String? = null, org: String? = null)
129141

142+
/**
143+
* Write Data Points into InfluxDB.
144+
*
145+
* If any exception occurs during write, this exception is rethrown from this method.
146+
*
147+
* @param points specified data points. The `points` are considered as one batch unit.
148+
* @param parameters specify InfluxDB Write endpoint parameters
149+
*/
150+
suspend fun writePoints(points: Flow<Point>, parameters: WriteParameters)
151+
130152
/**
131153
* Write Measurement into InfluxDB.
132154
*
@@ -177,4 +199,14 @@ interface WriteKotlinApi {
177199
* @param <M> measurement type
178200
*/
179201
suspend fun <M> writeMeasurements(measurements: Flow<M>, precision: WritePrecision, bucket: String? = null, org: String? = null)
202+
203+
/**
204+
* Write Measurements into InfluxDB.
205+
*
206+
* If any exception occurs during write, this exception is rethrown from this method.
207+
*
208+
* @param measurements specified Measurements. The `measurements` are considered as one batch unit.
209+
* @param parameters specify InfluxDB Write endpoint parameters
210+
*/
211+
suspend fun <M> writeMeasurements(measurements: Flow<M>, parameters: WriteParameters)
180212
}

client-kotlin/src/main/kotlin/com/influxdb/client/kotlin/internal/WriteKotlinApiImpl.kt

+23-5
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,11 @@ import com.influxdb.client.internal.AbstractWriteClient
2828
import com.influxdb.client.kotlin.WriteKotlinApi
2929
import com.influxdb.client.service.WriteService
3030
import com.influxdb.client.write.Point
31+
import com.influxdb.client.write.WriteParameters
3132
import kotlinx.coroutines.flow.Flow
3233
import kotlinx.coroutines.flow.asFlow
3334
import kotlinx.coroutines.flow.map
3435
import kotlinx.coroutines.flow.toList
35-
import java.util.*
3636

3737
/**
3838
* @author Jakub Bednar (20/04/2021 9:27)
@@ -58,6 +58,10 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
5858
write(records.map { AbstractWriteClient.BatchWriteDataRecord(it) }, precision, bucket, org)
5959
}
6060

61+
override suspend fun writeRecords(records: Flow<String>, parameters: WriteParameters) {
62+
write(records.map { AbstractWriteClient.BatchWriteDataRecord(it) }, parameters)
63+
}
64+
6165
override suspend fun writePoint(point: Point, bucket: String?, org: String?) {
6266
writePoints(listOf(point), bucket, org)
6367
}
@@ -67,15 +71,17 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
6771
}
6872

6973
override suspend fun writePoints(points: Flow<Point>, bucket: String?, org: String?) {
74+
writePoints(points, WriteParameters(bucket, org, options.precision, options.consistency))
75+
}
76+
77+
override suspend fun writePoints(points: Flow<Point>, parameters: WriteParameters) {
7078
points
7179
.toList()
7280
.groupByTo(LinkedHashMap(), { it.precision }, { it })
7381
.forEach { group ->
7482
write(
7583
group.value.asFlow().map { AbstractWriteClient.BatchWriteDataPoint(it, options) },
76-
group.key,
77-
bucket,
78-
org
84+
parameters.copy(group.key, options)
7985
)
8086
}
8187
}
@@ -107,6 +113,10 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
107113
write(measurements.map { toMeasurementBatch(it, precision) }, precision, bucket, org)
108114
}
109115

116+
override suspend fun <M> writeMeasurements(measurements: Flow<M>, parameters: WriteParameters) {
117+
write(measurements.map { toMeasurementBatch(it, parameters.precisionSafe(options)) }, parameters)
118+
}
119+
110120
private suspend fun write(
111121
records: Flow<AbstractWriteClient.BatchWriteData>,
112122
precision: WritePrecision,
@@ -117,7 +127,15 @@ internal class WriteKotlinApiImpl(service: WriteService, options: InfluxDBClient
117127
val bucketOrOption = bucket ?: options.bucket.orEmpty()
118128
val orgOrOption = org ?: options.org.orEmpty()
119129

120-
write(bucketOrOption, orgOrOption, precision, records.toList().stream())
130+
write(records, WriteParameters(bucketOrOption, orgOrOption, precision))
131+
}
132+
133+
private suspend fun write(
134+
records: Flow<AbstractWriteClient.BatchWriteData>,
135+
parameters: WriteParameters
136+
) {
137+
138+
write(parameters, records.toList().stream())
121139
}
122140
}
123141

client-kotlin/src/test/kotlin/com/influxdb/client/kotlin/WriteKotlinApiTest.kt

+55
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121
*/
2222
package com.influxdb.client.kotlin
2323

24+
import com.influxdb.client.domain.WriteConsistency
2425
import com.influxdb.client.domain.WritePrecision
2526
import com.influxdb.client.write.Point
27+
import com.influxdb.client.write.WriteParameters
2628
import com.influxdb.exceptions.UnauthorizedException
2729
import com.influxdb.test.AbstractMockServerTest
2830
import kotlinx.coroutines.flow.Flow
31+
import kotlinx.coroutines.flow.asFlow
2932
import kotlinx.coroutines.flow.collect
3033
import kotlinx.coroutines.flow.flow
3134
import kotlinx.coroutines.runBlocking
35+
import okhttp3.mockwebserver.RecordedRequest
3236
import org.assertj.core.api.Assertions
3337
import org.junit.jupiter.api.AfterEach
3438
import org.junit.jupiter.api.BeforeEach
@@ -230,6 +234,57 @@ class WriteKotlinApiTest : AbstractMockServerTest() {
230234
Assertions.assertThat(mockServer.requestCount).isEqualTo(5)
231235
}
232236

237+
@Test
238+
fun writeParameters(): Unit = runBlocking {
239+
val assertParameters = Runnable {
240+
val request: RecordedRequest = try {
241+
takeRequest()
242+
} catch (e: InterruptedException) {
243+
throw RuntimeException(e)
244+
}
245+
Assertions.assertThat(request.requestUrl).isNotNull
246+
Assertions.assertThat("s")
247+
.isEqualTo(request.requestUrl!!.queryParameter("precision"))
248+
Assertions.assertThat("c").isEqualTo(request.requestUrl!!.queryParameter("bucket"))
249+
Assertions.assertThat("d").isEqualTo(request.requestUrl!!.queryParameter("org"))
250+
Assertions.assertThat("quorum")
251+
.isEqualTo(request.requestUrl!!.queryParameter("consistency"))
252+
}
253+
254+
val parameters = WriteParameters("c", "d", WritePrecision.S, WriteConsistency.QUORUM)
255+
256+
// records
257+
val lineProtocols = flow {
258+
for (i in 1..49) {
259+
emit("h2o,location=coyote_creek level=${i}.0 $i")
260+
}
261+
}
262+
enqueuedResponse()
263+
writeApi.writeRecords(lineProtocols, parameters)
264+
assertParameters.run()
265+
266+
// points
267+
val point = Point
268+
.measurement("h2o")
269+
.addField("level", 1)
270+
.time(1, WritePrecision.S)
271+
272+
enqueuedResponse()
273+
writeApi.writePoints(listOf(point, point).asFlow(), parameters)
274+
assertParameters.run()
275+
276+
// measurements
277+
val mem = ITQueryKotlinApi.Mem()
278+
mem.host = "192.168.1.100"
279+
mem.region = "europe"
280+
mem.free = 40
281+
mem.time = Instant.ofEpochSecond(10)
282+
283+
enqueuedResponse()
284+
writeApi.writeMeasurements(listOf(mem, mem).asFlow(), parameters)
285+
assertParameters.run()
286+
}
287+
233288
private suspend fun <T> Flow<T>.chunks(size: Int): Flow<List<T>> = flow {
234289
val chunk = ArrayList<T>(size)
235290
collect {

client-legacy/README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -43,12 +43,12 @@ FluxClient fluxClient = FluxClientFactory.create("http://localhost:8086?readTime
4343
```
4444
The following options are supported:
4545

46-
| Property name | default | description |
47-
| --------------|-------------|-------------|
48-
| readTimeout | 10000 ms| read timeout |
49-
| writeTimeout | 10000 ms| write timeout |
50-
| connectTimeout | 10000 ms| socket timeout |
51-
| logLevel | NONE | rest client verbosity level |
46+
| Property name | default | description |
47+
|----------------|----------|-----------------------------|
48+
| readTimeout | 10000 ms | read timeout |
49+
| writeTimeout | 10000 ms | write timeout |
50+
| connectTimeout | 10000 ms | socket timeout |
51+
| logLevel | NONE | rest client verbosity level |
5252

5353
## Query using the Flux language
5454

client-reactive/README.md

+21-19
Original file line numberDiff line numberDiff line change
@@ -302,16 +302,17 @@ A client can be configured via configuration file. The configuration file has to
302302

303303
The following options are supported:
304304

305-
| Property name | default | description |
306-
| --------------------------|-----------|-------------|
307-
| influx2.url | - | the url to connect to InfluxDB |
308-
| influx2.org | - | default destination organization for writes and queries |
309-
| influx2.bucket | - | default destination bucket for writes |
310-
| influx2.token | - | the token to use for the authorization |
311-
| influx2.logLevel | NONE | rest client verbosity level |
312-
| influx2.readTimeout | 10000 ms | read timeout |
313-
| influx2.writeTimeout | 10000 ms | write timeout |
314-
| influx2.connectTimeout | 10000 ms | socket timeout |
305+
| Property name | default | description |
306+
|--------------------------|------------|------------------------------------------------------------|
307+
| influx2.url | - | the url to connect to InfluxDB |
308+
| influx2.org | - | default destination organization for writes and queries |
309+
| influx2.bucket | - | default destination bucket for writes |
310+
| influx2.token | - | the token to use for the authorization |
311+
| influx2.logLevel | NONE | rest client verbosity level |
312+
| influx2.readTimeout | 10000 ms | read timeout |
313+
| influx2.writeTimeout | 10000 ms | write timeout |
314+
| influx2.connectTimeout | 10000 ms | socket timeout |
315+
| influx2.precision | NS | default precision for unix timestamps in the line protocol |
315316

316317
The `influx2.readTimeout`, `influx2.writeTimeout` and `influx2.connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.
317318

@@ -345,15 +346,16 @@ InfluxDBClientReactive influxDBClient = InfluxDBClientReactiveFactory
345346
```
346347
The following options are supported:
347348

348-
| Property name | default | description |
349-
| ------------------|-----------|-------------|
350-
| org | - | default destination organization for writes and queries |
351-
| bucket | - | default destination bucket for writes |
352-
| token | - | the token to use for the authorization |
353-
| logLevel | NONE | rest client verbosity level |
354-
| readTimeout | 10000 ms | read timeout |
355-
| writeTimeout | 10000 ms | write timeout |
356-
| connectTimeout | 10000 ms | socket timeout |
349+
| Property name | default | description |
350+
|------------------|------------|------------------------------------------------------------|
351+
| org | - | default destination organization for writes and queries |
352+
| bucket | - | default destination bucket for writes |
353+
| token | - | the token to use for the authorization |
354+
| logLevel | NONE | rest client verbosity level |
355+
| readTimeout | 10000 ms | read timeout |
356+
| writeTimeout | 10000 ms | write timeout |
357+
| connectTimeout | 10000 ms | socket timeout |
358+
| precision | NS | default precision for unix timestamps in the line protocol |
357359

358360
The `readTimeout`, `writeTimeout` and `connectTimeout` supports `ms`, `s` and `m` as unit. Default is milliseconds.
359361

0 commit comments

Comments
 (0)