Skip to content

Commit c17f5be

Browse files
mherwegematgroe
authored andcommitted
Persistence alias support (openhab#18286)
Signed-off-by: Mark Herwege <[email protected]>
1 parent 0264ea2 commit c17f5be

File tree

20 files changed

+218
-103
lines changed

20 files changed

+218
-103
lines changed

bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/AbstractDynamoDBItem.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ public static DynamoDBItem<?> fromStateNew(Item item, ZonedDateTime time, @Nulla
449449
if (deserializedState == null) {
450450
return null;
451451
}
452-
return new DynamoDBHistoricItem(getName(), deserializedState, getTime().toInstant());
452+
return new DynamoDBHistoricItem(item.getName(), deserializedState, getTime().toInstant());
453453
} catch (Exception e) {
454454
logger.trace("Failed to convert state '{}' to item {} {}: {} {}. Data persisted with incompatible item.",
455455
this.state, item.getClass().getSimpleName(), item.getName(), e.getClass().getSimpleName(),

bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBPersistenceService.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,11 @@ public Set<PersistenceItemInfo> getItemInfo() {
350350

351351
@Override
352352
public Iterable<HistoricItem> query(FilterCriteria filter) {
353+
return query(filter, null);
354+
}
355+
356+
@Override
357+
public Iterable<HistoricItem> query(FilterCriteria filter, @Nullable String alias) {
353358
logIfManyQueuedTasks();
354359
Instant start = Instant.now();
355360
String filterDescription = filterToString(filter);
@@ -419,7 +424,7 @@ public Iterable<HistoricItem> query(FilterCriteria filter) {
419424
item.getClass().getSimpleName(), dtoClass.getSimpleName(), tableName);
420425

421426
QueryEnhancedRequest queryExpression = DynamoDBQueryUtils.createQueryExpression(dtoClass,
422-
localTableNameResolver.getTableSchema(), item, filter, unitProvider);
427+
localTableNameResolver.getTableSchema(), item, alias, filter, unitProvider);
423428

424429
CompletableFuture<List<DynamoDBItem<?>>> itemsFuture = new CompletableFuture<>();
425430
final SdkPublisher<? extends DynamoDBItem<?>> itemPublisher = table.query(queryExpression).items();

bundles/org.openhab.persistence.dynamodb/src/main/java/org/openhab/persistence/dynamodb/internal/DynamoDBQueryUtils.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -44,19 +44,21 @@ public class DynamoDBQueryUtils {
4444
* @param dtoClass dto class
4545
* @param expectedTableSchema table schema to query against
4646
* @param item item corresponding to filter
47+
* @param alias corresponding to item
4748
* @param filter filter for the query
4849
* @return DynamoDBQueryExpression corresponding to the given FilterCriteria
4950
* @param unitProvider the unit provider for number with dimension
5051
* @throws IllegalArgumentException when schema is not fully resolved
5152
*/
5253
public static QueryEnhancedRequest createQueryExpression(Class<? extends DynamoDBItem<?>> dtoClass,
53-
ExpectedTableSchema expectedTableSchema, Item item, FilterCriteria filter, UnitProvider unitProvider) {
54+
ExpectedTableSchema expectedTableSchema, Item item, @Nullable String alias, FilterCriteria filter,
55+
UnitProvider unitProvider) {
5456
if (!expectedTableSchema.isFullyResolved()) {
5557
throw new IllegalArgumentException("Schema not resolved");
5658
}
5759
QueryEnhancedRequest.Builder queryBuilder = QueryEnhancedRequest.builder()
5860
.scanIndexForward(filter.getOrdering() == Ordering.ASCENDING);
59-
String itemName = filter.getItemName();
61+
String itemName = alias != null ? alias : filter.getItemName();
6062
if (itemName == null) {
6163
throw new IllegalArgumentException("Item name not set");
6264
}

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/InfluxDBPersistenceService.java

+20-14
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ public void store(Item item, ZonedDateTime date, State state, @Nullable String a
204204
logger.warn("InfluxDB service not ready. Storing {} rejected.", item);
205205
return;
206206
}
207-
convert(item, state, date.toInstant(), null).thenAccept(point -> {
207+
convert(item, state, date.toInstant(), alias).thenAccept(point -> {
208208
if (point == null) {
209209
logger.trace("Ignoring item {}, conversion to an InfluxDB point failed.", item.getName());
210210
return;
@@ -233,27 +233,33 @@ public boolean remove(FilterCriteria filter) throws IllegalArgumentException {
233233

234234
@Override
235235
public Iterable<HistoricItem> query(FilterCriteria filter) {
236+
return query(filter, null);
237+
}
238+
239+
@Override
240+
public Iterable<HistoricItem> query(FilterCriteria filter, @Nullable String alias) {
241+
String itemName = filter.getItemName();
242+
if (itemName == null) {
243+
logger.warn("Item name is missing in filter {} when querying data.", filter);
244+
return List.of();
245+
}
236246
if (serviceActivated && checkConnection()) {
237247
logger.trace(
238248
"Query-Filter: itemname: {}, ordering: {}, state: {}, operator: {}, getBeginDate: {}, getEndDate: {}, getPageSize: {}, getPageNumber: {}",
239-
filter.getItemName(), filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
249+
itemName, filter.getOrdering().toString(), filter.getState(), filter.getOperator(),
240250
filter.getBeginDate(), filter.getEndDate(), filter.getPageSize(), filter.getPageNumber());
241-
if (filter.getItemName() == null) {
242-
logger.warn("Item name is missing in filter {} when querying data.", filter);
243-
return List.of();
244-
}
245251

246252
List<InfluxDBRepository.InfluxRow> results = influxDBRepository.query(filter,
247-
configuration.getRetentionPolicy());
248-
return results.stream().map(this::mapRowToHistoricItem).collect(Collectors.toList());
253+
configuration.getRetentionPolicy(), alias);
254+
return results.stream().map(r -> mapRowToHistoricItem(r, itemName)).collect(Collectors.toList());
249255
} else {
250256
logger.debug("Query for persisted data ignored, InfluxDB is not connected");
251257
return List.of();
252258
}
253259
}
254260

255-
private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row) {
256-
State state = InfluxDBStateConvertUtils.objectToState(row.value(), row.itemName(), itemRegistry);
261+
private HistoricItem mapRowToHistoricItem(InfluxDBRepository.InfluxRow row, String itemName) {
262+
State state = InfluxDBStateConvertUtils.objectToState(row.value(), itemName, itemRegistry);
257263
return new InfluxDBHistoricItem(row.itemName(), state, row.time());
258264
}
259265

@@ -314,8 +320,8 @@ private void commit() {
314320
}
315321

316322
return CompletableFuture.supplyAsync(() -> {
317-
String measurementName = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
318-
measurementName = influxDBMetadataService.getMeasurementNameOrDefault(itemName, measurementName);
323+
String alias = storeAlias != null && !storeAlias.isBlank() ? storeAlias : itemName;
324+
String measurementName = influxDBMetadataService.getMeasurementNameOrDefault(alias);
319325

320326
if (configuration.isReplaceUnderscore()) {
321327
measurementName = measurementName.replace('_', '.');
@@ -326,7 +332,7 @@ private void commit() {
326332
Object value = InfluxDBStateConvertUtils.stateToObject(storeState);
327333

328334
InfluxPoint.Builder pointBuilder = InfluxPoint.newBuilder(measurementName).withTime(timeStamp)
329-
.withValue(value).withTag(TAG_ITEM_NAME, itemName);
335+
.withValue(value).withTag(TAG_ITEM_NAME, alias);
330336

331337
if (configuration.isAddCategoryTag()) {
332338
String categoryName = Objects.requireNonNullElse(category, "n/a");
@@ -342,7 +348,7 @@ private void commit() {
342348
pointBuilder.withTag(TAG_LABEL_NAME, labelName);
343349
}
344350

345-
influxDBMetadataService.getMetaData(itemName)
351+
influxDBMetadataService.getMetaData(alias)
346352
.ifPresent(metadata -> metadata.getConfiguration().forEach(pointBuilder::withTag));
347353

348354
return pointBuilder.build();

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/FilterCriteriaQueryCreator.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
package org.openhab.persistence.influxdb.internal;
1414

1515
import org.eclipse.jdt.annotation.NonNullByDefault;
16+
import org.eclipse.jdt.annotation.Nullable;
1617
import org.openhab.core.persistence.FilterCriteria;
1718

1819
/**
@@ -24,12 +25,13 @@
2425
public interface FilterCriteriaQueryCreator {
2526
/**
2627
* Create query from {@link FilterCriteria}
27-
*
28+
*
2829
* @param criteria Criteria to create query from
2930
* @param retentionPolicy Name of the retentionPolicy/bucket to use in query
31+
* @param alias
3032
* @return Created query as a String
3133
*/
32-
String createQuery(FilterCriteria criteria, String retentionPolicy);
34+
String createQuery(FilterCriteria criteria, String retentionPolicy, @Nullable String alias);
3335

3436
default String getOperationSymbol(FilterCriteria.Operator operator, InfluxDBVersion version) {
3537
return switch (operator) {

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBMetadataService.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,9 @@ public InfluxDBMetadataService(@Reference MetadataRegistry metadataRegistry) {
4242
* get the measurement name from the item metadata or return the provided default
4343
*
4444
* @param itemName the item name
45-
* @param defaultName the default measurement name (
4645
* @return the metadata measurement name if present, defaultName otherwise
4746
*/
48-
public String getMeasurementNameOrDefault(String itemName, String defaultName) {
47+
public String getMeasurementNameOrDefault(String itemName) {
4948
Optional<Metadata> metadata = getMetaData(itemName);
5049
if (metadata.isPresent()) {
5150
String metaName = metadata.get().getValue();
@@ -54,7 +53,7 @@ public String getMeasurementNameOrDefault(String itemName, String defaultName) {
5453
}
5554
}
5655

57-
return defaultName;
56+
return itemName;
5857
}
5958

6059
/**

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/InfluxDBRepository.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import java.util.Map;
1818

1919
import org.eclipse.jdt.annotation.NonNullByDefault;
20+
import org.eclipse.jdt.annotation.Nullable;
2021
import org.openhab.core.persistence.FilterCriteria;
2122

2223
/**
@@ -63,10 +64,11 @@ public interface InfluxDBRepository {
6364
* Executes Flux query
6465
*
6566
* @param filter the query filter
67+
* @param alias
6668
* @return Query results
67-
*
69+
*
6870
*/
69-
List<InfluxRow> query(FilterCriteria filter, String retentionPolicy);
71+
List<InfluxRow> query(FilterCriteria filter, String retentionPolicy, @Nullable String alias);
7072

7173
/**
7274
* Write points to database

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1FilterCriteriaQueryCreatorImpl.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,10 @@ public InfluxDB1FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configurati
5050
}
5151

5252
@Override
53-
public String createQuery(FilterCriteria criteria, String retentionPolicy) {
53+
public String createQuery(FilterCriteria criteria, String retentionPolicy, @Nullable String alias) {
5454
final String itemName = Objects.requireNonNull(criteria.getItemName()); // we checked non-null before
55-
final String tableName = getTableName(itemName);
55+
final String localAlias = alias != null ? alias : itemName;
56+
final String tableName = getTableName(localAlias);
5657
final boolean hasCriteriaName = itemName != null;
5758

5859
Select select = select().column("\"" + COLUMN_VALUE_NAME_V1 + "\"::field")
@@ -61,8 +62,8 @@ public String createQuery(FilterCriteria criteria, String retentionPolicy) {
6162

6263
Where where = select.where();
6364

64-
if (itemName != null && !tableName.equals(itemName)) {
65-
where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, itemName));
65+
if (localAlias != null && !tableName.equals(localAlias)) {
66+
where.and(BuiltQuery.QueryBuilder.eq(TAG_ITEM_NAME, localAlias));
6667
}
6768
if (criteria.getBeginDate() != null) {
6869
where.and(BuiltQuery.QueryBuilder.gte(COLUMN_TIME_NAME_V1, criteria.getBeginDate().toInstant().toString()));
@@ -99,7 +100,7 @@ private String getTableName(@Nullable String itemName) {
99100
return "/.*/";
100101
}
101102

102-
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
103+
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName);
103104

104105
if (configuration.isReplaceUnderscore()) {
105106
name = name.replace('_', '.');

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx1/InfluxDB1RepositoryImpl.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,7 @@
1212
*/
1313
package org.openhab.persistence.influxdb.internal.influx1;
1414

15-
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_TIME_NAME_V1;
16-
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.COLUMN_VALUE_NAME_V1;
17-
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.FIELD_VALUE_NAME;
18-
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.TAG_ITEM_NAME;
15+
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
1916

2017
import java.time.Instant;
2118
import java.util.ArrayList;
@@ -168,11 +165,11 @@ private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
168165
}
169166

170167
@Override
171-
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
168+
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy, @Nullable String alias) {
172169
try {
173170
final InfluxDB currentClient = client;
174171
if (currentClient != null) {
175-
String query = queryCreator.createQuery(filter, retentionPolicy);
172+
String query = queryCreator.createQuery(filter, retentionPolicy, alias);
176173
logger.trace("Query {}", query);
177174
Query parsedQuery = new Query(query, configuration.getDatabaseName());
178175
List<QueryResult.Result> results = currentClient.query(parsedQuery, TimeUnit.MILLISECONDS).getResults();

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2FilterCriteriaQueryCreatorImpl.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@
1212
*/
1313
package org.openhab.persistence.influxdb.internal.influx2;
1414

15-
import static com.influxdb.query.dsl.functions.restriction.Restrictions.measurement;
16-
import static com.influxdb.query.dsl.functions.restriction.Restrictions.tag;
15+
import static com.influxdb.query.dsl.functions.restriction.Restrictions.*;
1716
import static org.openhab.persistence.influxdb.internal.InfluxDBConstants.*;
1817
import static org.openhab.persistence.influxdb.internal.InfluxDBStateConvertUtils.stateToObject;
1918

2019
import java.time.temporal.ChronoUnit;
2120
import java.util.Objects;
2221

2322
import org.eclipse.jdt.annotation.NonNullByDefault;
23+
import org.eclipse.jdt.annotation.Nullable;
2424
import org.openhab.core.persistence.FilterCriteria;
2525
import org.openhab.core.types.State;
2626
import org.openhab.persistence.influxdb.internal.FilterCriteriaQueryCreator;
@@ -49,7 +49,7 @@ public InfluxDB2FilterCriteriaQueryCreatorImpl(InfluxDBConfiguration configurati
4949
}
5050

5151
@Override
52-
public String createQuery(FilterCriteria criteria, String retentionPolicy) {
52+
public String createQuery(FilterCriteria criteria, String retentionPolicy, @Nullable String alias) {
5353
Flux flux = Flux.from(retentionPolicy);
5454

5555
RangeFlux range = flux.range();
@@ -66,7 +66,8 @@ public String createQuery(FilterCriteria criteria, String retentionPolicy) {
6666
flux = range;
6767

6868
String itemName = Objects.requireNonNull(criteria.getItemName()); // we checked non-null before
69-
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
69+
final String localAlias = alias != null ? alias : itemName;
70+
String name = influxDBMetadataService.getMeasurementNameOrDefault(localAlias);
7071
String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
7172
flux = flux.filter(measurement().equal(measurementName));
7273
if (!measurementName.equals(itemName)) {

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2RepositoryImpl.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public boolean remove(FilterCriteria filter) {
176176
String predicate = "";
177177
String itemName = filter.getItemName();
178178
if (itemName != null) {
179-
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName, itemName);
179+
String name = influxDBMetadataService.getMeasurementNameOrDefault(itemName);
180180
String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
181181
predicate = "(_measurement=\"" + measurementName + "\")";
182182
}
@@ -213,11 +213,11 @@ private Optional<Point> convertPointToClientFormat(InfluxPoint point) {
213213
}
214214

215215
@Override
216-
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy) {
216+
public List<InfluxRow> query(FilterCriteria filter, String retentionPolicy, @Nullable String alias) {
217217
try {
218218
final QueryApi currentQueryAPI = queryAPI;
219219
if (currentQueryAPI != null) {
220-
String query = queryCreator.createQuery(filter, retentionPolicy);
220+
String query = queryCreator.createQuery(filter, retentionPolicy, alias);
221221
logger.trace("Query {}", query);
222222
List<FluxTable> clientResult = currentQueryAPI.query(query);
223223
return clientResult.stream().flatMap(this::mapRawResultToHistoric).toList();

0 commit comments

Comments
 (0)