Skip to content

Commit 01cb892

Browse files
authored
[mqtt.homeassistant] Only subscribe to topics for linked channels (openhab#18239)
* [mqtt.homeassistant] Only subscribe to topics for linked channels Signed-off-by: Cody Cutrer <[email protected]>
1 parent fd63e11 commit 01cb892

29 files changed

+312
-19
lines changed

bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/DiscoverComponents.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class DiscoverComponents implements MqttMessageSubscriber {
5252
private final ThingUID thingUID;
5353
private final ScheduledExecutorService scheduler;
5454
private final ChannelStateUpdateListener updateListener;
55+
private final HomeAssistantChannelLinkageChecker linkageChecker;
5556
private final AvailabilityTracker tracker;
5657

5758
protected final CompletableFuture<@Nullable Void> discoverFinishedFuture = new CompletableFuture<>();
@@ -82,11 +83,12 @@ public static interface ComponentDiscovered {
8283
* @param channelStateUpdateListener Channel update listener. Usually the handler.
8384
*/
8485
public DiscoverComponents(ThingUID thingUID, ScheduledExecutorService scheduler,
85-
ChannelStateUpdateListener channelStateUpdateListener, AvailabilityTracker tracker, Gson gson,
86-
Jinjava jinjava, UnitProvider unitProvider) {
86+
ChannelStateUpdateListener channelStateUpdateListener, HomeAssistantChannelLinkageChecker linkageChecker,
87+
AvailabilityTracker tracker, Gson gson, Jinjava jinjava, UnitProvider unitProvider) {
8788
this.thingUID = thingUID;
8889
this.scheduler = scheduler;
8990
this.updateListener = channelStateUpdateListener;
91+
this.linkageChecker = linkageChecker;
9092
this.gson = gson;
9193
this.jinjava = jinjava;
9294
this.unitProvider = unitProvider;
@@ -105,8 +107,8 @@ public void processMessage(String topic, byte[] payload) {
105107

106108
if (config.length() > 0) {
107109
try {
108-
component = ComponentFactory.createComponent(thingUID, haID, config, updateListener, tracker, scheduler,
109-
gson, jinjava, unitProvider);
110+
component = ComponentFactory.createComponent(thingUID, haID, config, updateListener, linkageChecker,
111+
tracker, scheduler, gson, jinjava, unitProvider);
110112
component.setConfigSeen();
111113

112114
logger.trace("Found HomeAssistant component {}", haID);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright (c) 2010-2025 Contributors to the openHAB project
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.openhab.binding.mqtt.homeassistant.internal;
14+
15+
import org.eclipse.jdt.annotation.NonNullByDefault;
16+
import org.openhab.core.thing.ChannelUID;
17+
18+
/**
19+
* Allows a Home Assistant component to check if a channel is linked (and thus worth subscribing to)
20+
*
21+
* @author Cody Cutrer - Initial contribution
22+
*/
23+
@NonNullByDefault
24+
public interface HomeAssistantChannelLinkageChecker {
25+
/**
26+
* Returns whether at least one item is linked for the given channel ID.
27+
*
28+
* @param channelId channel ID (must not be null)
29+
* @return true if at least one item is linked, false otherwise
30+
*/
31+
boolean isChannelLinked(ChannelUID channelUID);
32+
}

bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponent.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.openhab.core.thing.type.ChannelGroupType;
5656
import org.openhab.core.thing.type.ChannelGroupTypeBuilder;
5757
import org.openhab.core.thing.type.ChannelGroupTypeUID;
58+
import org.openhab.core.thing.type.ChannelKind;
5859
import org.openhab.core.types.CommandDescription;
5960
import org.openhab.core.types.StateDescription;
6061

@@ -232,8 +233,14 @@ public void setConfigSeen() {
232233
*/
233234
public CompletableFuture<@Nullable Void> start(MqttBrokerConnection connection, ScheduledExecutorService scheduler,
234235
int timeout) {
235-
return Stream.concat(channels.values().stream(), hiddenChannels.stream())
236-
.map(v -> v.start(connection, scheduler, timeout)) //
236+
// Hidden channels (used by a component to simulate other channels or non-channel behavior),
237+
// triggers channels (which can be used by rules without ever being linked),
238+
// and linked channels are started.
239+
// Therefore, unlinked channels are not started.
240+
return Stream.concat(channels.values().stream().filter(c -> {
241+
return c.getChannel().getKind().equals(ChannelKind.TRIGGER)
242+
|| componentConfiguration.getLinkageChecker().isChannelLinked(c.getChannel().getUID());
243+
}), hiddenChannels.stream()).map(v -> v.start(connection, scheduler, timeout)) //
237244
.reduce(CompletableFuture.completedFuture(null), (f, v) -> f.thenCompose(b -> v));
238245
}
239246

bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/component/ComponentFactory.java

+14-5
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import org.openhab.binding.mqtt.generic.AvailabilityTracker;
1919
import org.openhab.binding.mqtt.generic.ChannelStateUpdateListener;
2020
import org.openhab.binding.mqtt.homeassistant.internal.HaID;
21+
import org.openhab.binding.mqtt.homeassistant.internal.HomeAssistantChannelLinkageChecker;
2122
import org.openhab.binding.mqtt.homeassistant.internal.config.dto.AbstractChannelConfiguration;
2223
import org.openhab.binding.mqtt.homeassistant.internal.exception.ConfigurationException;
2324
import org.openhab.binding.mqtt.homeassistant.internal.exception.UnsupportedComponentException;
@@ -47,10 +48,12 @@ public class ComponentFactory {
4748
* @return A HA MQTT Component
4849
*/
4950
public static AbstractComponent<?> createComponent(ThingUID thingUID, HaID haID, String channelConfigurationJSON,
50-
ChannelStateUpdateListener updateListener, AvailabilityTracker tracker, ScheduledExecutorService scheduler,
51-
Gson gson, Jinjava jinjava, UnitProvider unitProvider) throws ConfigurationException {
51+
ChannelStateUpdateListener updateListener, HomeAssistantChannelLinkageChecker linkageChecker,
52+
AvailabilityTracker tracker, ScheduledExecutorService scheduler, Gson gson, Jinjava jinjava,
53+
UnitProvider unitProvider) throws ConfigurationException {
5254
ComponentConfiguration componentConfiguration = new ComponentConfiguration(thingUID, haID,
53-
channelConfigurationJSON, gson, jinjava, updateListener, tracker, scheduler, unitProvider);
55+
channelConfigurationJSON, gson, jinjava, updateListener, linkageChecker, tracker, scheduler,
56+
unitProvider);
5457
switch (haID.component) {
5558
case "alarm_control_panel":
5659
return new AlarmControlPanel(componentConfiguration);
@@ -110,6 +113,7 @@ protected static class ComponentConfiguration {
110113
private final HaID haID;
111114
private final String configJSON;
112115
private final ChannelStateUpdateListener updateListener;
116+
private final HomeAssistantChannelLinkageChecker linkageChecker;
113117
private final AvailabilityTracker tracker;
114118
private final Gson gson;
115119
private final Jinjava jinjava;
@@ -125,14 +129,15 @@ protected static class ComponentConfiguration {
125129
* @param gson A Gson instance
126130
*/
127131
protected ComponentConfiguration(ThingUID thingUID, HaID haID, String configJSON, Gson gson, Jinjava jinjava,
128-
ChannelStateUpdateListener updateListener, AvailabilityTracker tracker,
129-
ScheduledExecutorService scheduler, UnitProvider unitProvider) {
132+
ChannelStateUpdateListener updateListener, HomeAssistantChannelLinkageChecker linkageChecker,
133+
AvailabilityTracker tracker, ScheduledExecutorService scheduler, UnitProvider unitProvider) {
130134
this.thingUID = thingUID;
131135
this.haID = haID;
132136
this.configJSON = configJSON;
133137
this.gson = gson;
134138
this.jinjava = jinjava;
135139
this.updateListener = updateListener;
140+
this.linkageChecker = linkageChecker;
136141
this.tracker = tracker;
137142
this.scheduler = scheduler;
138143
this.unitProvider = unitProvider;
@@ -154,6 +159,10 @@ public ChannelStateUpdateListener getUpdateListener() {
154159
return updateListener;
155160
}
156161

162+
public HomeAssistantChannelLinkageChecker getLinkageChecker() {
163+
return linkageChecker;
164+
}
165+
157166
public Gson getGson() {
158167
return gson;
159168
}

bundles/org.openhab.binding.mqtt.homeassistant/src/main/java/org/openhab/binding/mqtt/homeassistant/internal/handler/HomeAssistantThingHandler.java

+36-3
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.openhab.binding.mqtt.homeassistant.internal.DiscoverComponents.ComponentDiscovered;
4040
import org.openhab.binding.mqtt.homeassistant.internal.HaID;
4141
import org.openhab.binding.mqtt.homeassistant.internal.HandlerConfiguration;
42+
import org.openhab.binding.mqtt.homeassistant.internal.HomeAssistantChannelLinkageChecker;
4243
import org.openhab.binding.mqtt.homeassistant.internal.component.AbstractComponent;
4344
import org.openhab.binding.mqtt.homeassistant.internal.component.ComponentFactory;
4445
import org.openhab.binding.mqtt.homeassistant.internal.component.DeviceTrigger;
@@ -83,7 +84,7 @@
8384
*/
8485
@NonNullByDefault
8586
public class HomeAssistantThingHandler extends AbstractMQTTThingHandler
86-
implements ComponentDiscovered, Consumer<List<Object>> {
87+
implements ComponentDiscovered, Consumer<List<Object>>, HomeAssistantChannelLinkageChecker {
8788
public static final String AVAILABILITY_CHANNEL = "availability";
8889
private static final Comparator<AbstractComponent<?>> COMPONENT_COMPARATOR = Comparator
8990
.comparing((AbstractComponent<?> component) -> component.hasGroup())
@@ -134,7 +135,7 @@ public HomeAssistantThingHandler(Thing thing, MqttChannelTypeProvider channelTyp
134135
this.unitProvider = unitProvider;
135136
this.attributeReceiveTimeout = attributeReceiveTimeout;
136137
this.delayedProcessing = new DelayedBatchProcessing<>(attributeReceiveTimeout, this, scheduler);
137-
this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, gson, jinjava,
138+
this.discoverComponents = new DiscoverComponents(thing.getUID(), scheduler, this, this, this, gson, jinjava,
138139
unitProvider);
139140
}
140141

@@ -183,7 +184,7 @@ public void initialize() {
183184
String channelConfigurationJSON = (String) channelConfig.get("config");
184185
try {
185186
AbstractComponent<?> component = ComponentFactory.createComponent(thingUID, haID,
186-
channelConfigurationJSON, this, this, scheduler, gson, jinjava, unitProvider);
187+
channelConfigurationJSON, this, this, this, scheduler, gson, jinjava, unitProvider);
187188
if (typeID.equals(MqttBindingConstants.HOMEASSISTANT_MQTT_THING)) {
188189
typeID = calculateThingTypeUID(component);
189190
}
@@ -558,4 +559,36 @@ private List<Configuration> flattenChannelConfiguration(Configuration multiCompo
558559
Map<@Nullable String, AbstractComponent<?>> getComponents() {
559560
return haComponents;
560561
}
562+
563+
// For components to check if a channel is linked before starting them
564+
@Override
565+
public boolean isChannelLinked(ChannelUID channelUID) {
566+
return isLinked(channelUID);
567+
}
568+
569+
// A channel is newly linked; make sure it is started
570+
@Override
571+
public void channelLinked(ChannelUID channelUID) {
572+
MqttBrokerConnection connection = this.connection;
573+
// We haven't started at all yet.
574+
if (connection == null) {
575+
return;
576+
}
577+
synchronized (haComponents) {
578+
haComponents.forEach((id, component) -> {
579+
if (component.getChannels().stream().anyMatch(channel -> channel.getUID().equals(channelUID))) {
580+
component.start(connection, scheduler, attributeReceiveTimeout).exceptionally(e -> {
581+
updateStatus(ThingStatus.OFFLINE, ThingStatusDetail.CONFIGURATION_ERROR, e.getMessage());
582+
return null;
583+
});
584+
}
585+
});
586+
}
587+
super.channelLinked(channelUID);
588+
}
589+
590+
// Don't bother unsubscribing on unlink; it's a relatively rare operation during normal usage,
591+
// and a decent amount of effort to make sure there aren't other links before stopping them, and
592+
// making sure not to stop other channels that are still linked.
593+
// A disable/re-enable of the thing will clear the subscriptions.
561594
}

bundles/org.openhab.binding.mqtt.homeassistant/src/test/java/org/openhab/binding/mqtt/homeassistant/internal/component/AbstractComponentTests.java

+40
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.*;
2020

2121
import java.nio.charset.StandardCharsets;
22+
import java.util.HashSet;
2223
import java.util.List;
2324
import java.util.Objects;
2425
import java.util.Set;
@@ -43,10 +44,12 @@
4344
import org.openhab.binding.mqtt.homeassistant.internal.handler.HomeAssistantThingHandler;
4445
import org.openhab.core.i18n.UnitProvider;
4546
import org.openhab.core.library.types.HSBType;
47+
import org.openhab.core.thing.ChannelUID;
4648
import org.openhab.core.thing.Thing;
4749
import org.openhab.core.thing.ThingStatusInfo;
4850
import org.openhab.core.thing.binding.ThingHandlerCallback;
4951
import org.openhab.core.thing.type.AutoUpdatePolicy;
52+
import org.openhab.core.thing.type.ChannelKind;
5053
import org.openhab.core.thing.type.ChannelTypeRegistry;
5154
import org.openhab.core.types.Command;
5255
import org.openhab.core.types.State;
@@ -137,6 +140,32 @@ public void disposeThingHandler() {
137140
return Objects.requireNonNull(thingHandler.getDiscoveredComponent());
138141
}
139142

143+
/**
144+
* Simulate linking an item to a channel, so that the handler knows it should subscribe to the relevant topics
145+
*
146+
* @param component component
147+
* @param channelId channel
148+
*/
149+
protected void linkChannel(AbstractComponent<@NonNull ? extends AbstractChannelConfiguration> component,
150+
String channelId) {
151+
var stateChannel = Objects.requireNonNull(component.getChannel(channelId));
152+
thingHandler.linkChannel(stateChannel.getChannel().getUID());
153+
}
154+
155+
/**
156+
* Simulate linking an item to a all channels of a component, so that the handler knows it should subscribe to the
157+
* relevant topics
158+
*
159+
* @param component component
160+
*/
161+
protected void linkAllChannels(AbstractComponent<@NonNull ? extends AbstractChannelConfiguration> component) {
162+
component.getChannels().forEach(c -> {
163+
if (c.getKind().equals(ChannelKind.STATE)) {
164+
thingHandler.linkChannel(c.getUID());
165+
}
166+
});
167+
}
168+
140169
/**
141170
* Assert channel topics, label and value class
142171
*
@@ -328,6 +357,7 @@ protected void sendCommand(AbstractComponent<@NonNull ? extends AbstractChannelC
328357
}
329358

330359
protected static class LatchThingHandler extends HomeAssistantThingHandler {
360+
private final Set<ChannelUID> linkedChannels = new HashSet<>();
331361
private @Nullable CountDownLatch latch;
332362
private @Nullable AbstractComponent<@NonNull ? extends AbstractChannelConfiguration> discoveredComponent;
333363

@@ -356,5 +386,15 @@ public CountDownLatch createWaitForComponentDiscoveredLatch(int count) {
356386
public @Nullable AbstractComponent<@NonNull ? extends AbstractChannelConfiguration> getDiscoveredComponent() {
357387
return discoveredComponent;
358388
}
389+
390+
public void linkChannel(ChannelUID channelUID) {
391+
linkedChannels.add(channelUID);
392+
channelLinked(channelUID);
393+
}
394+
395+
@Override
396+
protected boolean isLinked(ChannelUID channelUID) {
397+
return linkedChannels.contains(channelUID);
398+
}
359399
}
360400
}

bundles/org.openhab.binding.mqtt.homeassistant/src/test/java/org/openhab/binding/mqtt/homeassistant/internal/component/AlarmControlPanelTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public void testAlarmControlPanel() {
7171
assertChannel(component, AlarmControlPanel.STATE_CHANNEL_ID, "zigbee2mqtt/alarm/state",
7272
"zigbee2mqtt/alarm/set/state", "alarm", TextValue.class);
7373

74+
linkAllChannels(component);
75+
7476
publishMessage("zigbee2mqtt/alarm/state", "armed_home");
7577
assertState(component, AlarmControlPanel.STATE_CHANNEL_ID, new StringType("armed_home"));
7678
publishMessage("zigbee2mqtt/alarm/state", "armed_away");

bundles/org.openhab.binding.mqtt.homeassistant/src/test/java/org/openhab/binding/mqtt/homeassistant/internal/component/BinarySensorTests.java

+6
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public void test() throws InterruptedException {
7070
assertChannel(component, BinarySensor.SENSOR_CHANNEL_ID, "zigbee2mqtt/sensor/state", "", "onoffsensor",
7171
OnOffValue.class);
7272

73+
linkAllChannels(component);
74+
7375
publishMessage("zigbee2mqtt/sensor/state", "{ \"state\": \"ON_\" }");
7476
assertState(component, BinarySensor.SENSOR_CHANNEL_ID, OnOffType.ON);
7577
publishMessage("zigbee2mqtt/sensor/state", "{ \"state\": \"ON_\" }");
@@ -112,6 +114,8 @@ public void offDelayTest() {
112114
""");
113115
// @formatter:on
114116

117+
linkAllChannels(component);
118+
115119
publishMessage("zigbee2mqtt/sensor/state", "{ \"state\": \"ON_\" }");
116120
assertState(component, BinarySensor.SENSOR_CHANNEL_ID, OnOffType.ON);
117121

@@ -150,6 +154,8 @@ public void expireAfterTest() {
150154
""");
151155
// @formatter:on
152156

157+
linkAllChannels(component);
158+
153159
publishMessage("zigbee2mqtt/sensor/state", "{ \"state\": \"OFF_\" }");
154160
assertState(component, BinarySensor.SENSOR_CHANNEL_ID, OnOffType.OFF);
155161

bundles/org.openhab.binding.mqtt.homeassistant/src/test/java/org/openhab/binding/mqtt/homeassistant/internal/component/ButtonTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public void testButton() {
6262
assertThat(Objects.requireNonNull(component.getChannel(Button.BUTTON_CHANNEL_ID)).getChannel()
6363
.getAutoUpdatePolicy(), is(AutoUpdatePolicy.VETO));
6464

65+
linkAllChannels(component);
66+
6567
assertThrows(IllegalArgumentException.class,
6668
() -> component.getChannel(Button.BUTTON_CHANNEL_ID).getState().publishValue(new StringType("ON")));
6769
assertNothingPublished("esphome/single-car-gdo/button/restart/command");

bundles/org.openhab.binding.mqtt.homeassistant/src/test/java/org/openhab/binding/mqtt/homeassistant/internal/component/CameraTests.java

+2
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ public void test() throws InterruptedException {
6262

6363
assertChannel(component, Camera.CAMERA_CHANNEL_ID, "zigbee2mqtt/cam1/state", "", "cam1", ImageValue.class);
6464

65+
linkAllChannels(component);
66+
6567
var imageBytes = getResourceAsByteArray("component/image.png");
6668
publishMessage("zigbee2mqtt/cam1/state", imageBytes);
6769
assertState(component, Camera.CAMERA_CHANNEL_ID, new RawType(imageBytes, "image/png"));

0 commit comments

Comments
 (0)