Skip to content

Commit 44b0c52

Browse files
authored
Make sure that energy meter packets are not queued up. (openhab#16841)
Hide `open()` socket call beneath PacketListener, so caller do not need to care about that. Signed-off-by: Łukasz Dywicki <[email protected]>
1 parent 4401de5 commit 44b0c52

File tree

7 files changed

+138
-40
lines changed

7 files changed

+138
-40
lines changed

bundles/org.openhab.binding.smaenergymeter/README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@ No binding configuration required.
2020
Usually no manual configuration is required, as the multicast IP address and the port remain on their factory set values.
2121
Optionally, a refresh interval (in seconds) can be defined.
2222

23-
| Parameter | Name | Description | Required | Default |
24-
|------------------|-----------------|---------------------------------------|----------|-----------------|
25-
| `serialNumber` | Serial number | Serial number of a meter. | yes | |
26-
| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
27-
| `port` | Port | Port number used by meter. | no | 9522 |
28-
| `pollingPeriod` | Polling Period | Polling period used to readout meter. | no | 30 |
23+
| Parameter | Name | Description | Required | Default |
24+
|------------------|-----------------|------------------------------------------------------------|----------|-----------------|
25+
| `serialNumber` | Serial number | Serial number of a meter. | yes | |
26+
| `mcastGroup` | Multicast Group | Multicast group used by meter. | yes | 239.12.255.254 |
27+
| `port` | Port | Port number used by meter. | no | 9522 |
28+
| `pollingPeriod` | Polling Period | Polling period used to publish meter reading (in seconds). | no | 30 |
2929

3030
The polling period parameter is used to trigger readout of meter. In case if two consecutive readout attempts fail thing will report offline status.
3131

bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/discovery/SMAEnergyMeterDiscoveryService.java

-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ protected void startBackgroundDiscovery() {
7474
try {
7575
packetListener = listenerRegistry.getListener(PacketListener.DEFAULT_MCAST_GRP,
7676
PacketListener.DEFAULT_MCAST_PORT);
77-
packetListener.open(30);
7877
} catch (IOException e) {
7978
logger.warn("Could not start background discovery", e);
8079
return;

bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/handler/SMAEnergyMeterHandler.java

+15-10
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,15 @@
1515
import static org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants.*;
1616

1717
import java.io.IOException;
18+
import java.util.concurrent.TimeUnit;
1819

1920
import org.eclipse.jdt.annotation.Nullable;
2021
import org.openhab.binding.smaenergymeter.internal.configuration.EnergyMeterConfig;
22+
import org.openhab.binding.smaenergymeter.internal.packet.FilteringPayloadHandler;
2123
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener;
2224
import org.openhab.binding.smaenergymeter.internal.packet.PacketListenerRegistry;
2325
import org.openhab.binding.smaenergymeter.internal.packet.PayloadHandler;
26+
import org.openhab.binding.smaenergymeter.internal.packet.ThrottlingPayloadHandler;
2427
import org.openhab.core.thing.ChannelUID;
2528
import org.openhab.core.thing.Thing;
2629
import org.openhab.core.thing.ThingStatus;
@@ -42,7 +45,8 @@ public class SMAEnergyMeterHandler extends BaseThingHandler implements PayloadHa
4245
private final Logger logger = LoggerFactory.getLogger(SMAEnergyMeterHandler.class);
4346
private final PacketListenerRegistry listenerRegistry;
4447
private @Nullable PacketListener listener;
45-
private @Nullable String serialNumber;
48+
private @Nullable PayloadHandler handler;
49+
private String serialNumber;
4650

4751
public SMAEnergyMeterHandler(Thing thing, PacketListenerRegistry listenerRegistry) {
4852
super(thing);
@@ -84,9 +88,13 @@ public void initialize() {
8488
updateStatus(ThingStatus.UNKNOWN);
8589
logger.debug("Activated handler for SMA Energy Meter with S/N '{}'", serialNumber);
8690

87-
listener.addPayloadHandler(this);
88-
89-
listener.open(config.getPollingPeriod());
91+
if (config.getPollingPeriod() <= 1) {
92+
listener.addPayloadHandler(handler = new FilteringPayloadHandler(this, serialNumber));
93+
} else {
94+
listener.addPayloadHandler(handler = new FilteringPayloadHandler(
95+
new ThrottlingPayloadHandler(this, TimeUnit.SECONDS.toMillis(config.getPollingPeriod())),
96+
serialNumber));
97+
}
9098
this.listener = listener;
9199
logger.debug("Polling job scheduled to run every {} sec. for '{}'", config.getPollingPeriod(),
92100
getThing().getUID());
@@ -100,18 +108,15 @@ public void initialize() {
100108
public void dispose() {
101109
logger.debug("Disposing SMAEnergyMeter handler '{}'", getThing().getUID());
102110
PacketListener listener = this.listener;
103-
if (listener != null) {
104-
listener.removePayloadHandler(this);
111+
PayloadHandler handler = this.handler;
112+
if (listener != null && handler != null) {
113+
listener.removePayloadHandler(handler);
105114
this.listener = null;
106115
}
107116
}
108117

109118
@Override
110119
public void handle(EnergyMeter energyMeter) {
111-
String serialNumber = this.serialNumber;
112-
if (serialNumber == null || !serialNumber.equals(energyMeter.getSerialNumber())) {
113-
return;
114-
}
115120
updateStatus(ThingStatus.ONLINE);
116121

117122
logger.debug("Update SMAEnergyMeter {} data '{}'", serialNumber, getThing().getUID());

bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/DefaultPacketListenerRegistry.java

+5-6
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@
1616
import java.util.Map;
1717
import java.util.Map.Entry;
1818
import java.util.concurrent.ConcurrentHashMap;
19-
import java.util.concurrent.Executors;
2019
import java.util.concurrent.ScheduledExecutorService;
2120
import java.util.concurrent.ScheduledFuture;
2221
import java.util.concurrent.TimeUnit;
2322

2423
import org.eclipse.jdt.annotation.NonNullByDefault;
2524
import org.openhab.binding.smaenergymeter.internal.SMAEnergyMeterBindingConstants;
2625
import org.openhab.binding.smaenergymeter.internal.packet.PacketListener.ReceivingTask;
26+
import org.openhab.core.common.ThreadPoolManager;
2727
import org.osgi.service.component.annotations.Component;
2828
import org.osgi.service.component.annotations.Deactivate;
2929
import org.slf4j.Logger;
@@ -40,9 +40,8 @@
4040
public class DefaultPacketListenerRegistry implements PacketListenerRegistry {
4141

4242
private final Logger logger = LoggerFactory.getLogger(DefaultPacketListenerRegistry.class);
43-
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1,
44-
(runnable) -> new Thread(runnable,
45-
"OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener"));
43+
private final ScheduledExecutorService scheduler = ThreadPoolManager
44+
.getScheduledPool("OH-binding-" + SMAEnergyMeterBindingConstants.BINDING_ID + "-listener");
4645
private final Map<String, PacketListener> listeners = new ConcurrentHashMap<>();
4746

4847
@Override
@@ -68,8 +67,8 @@ protected void shutdown() throws IOException {
6867
scheduler.shutdownNow();
6968
}
7069

71-
public ScheduledFuture<?> addTask(Runnable runnable, int intervalSec) {
72-
return scheduler.scheduleWithFixedDelay(runnable, 0, intervalSec, TimeUnit.SECONDS);
70+
public ScheduledFuture<?> addTask(ReceivingTask runnable) {
71+
return scheduler.scheduleWithFixedDelay(runnable, 0, 1000, TimeUnit.MILLISECONDS);
7372
}
7473

7574
public void execute(ReceivingTask receivingTask) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/**
2+
* Copyright (c) 2010-2024 Contributors to the openHAB project
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.openhab.binding.smaenergymeter.internal.packet;
14+
15+
import java.io.IOException;
16+
17+
import org.eclipse.jdt.annotation.NonNullByDefault;
18+
import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
19+
20+
/**
21+
* Payload handler which define acceptance criteria for received meter data.
22+
*
23+
* @author Łukasz Dywicki - Initial contribution
24+
*/
25+
@NonNullByDefault
26+
public class FilteringPayloadHandler implements PayloadHandler {
27+
28+
private final PayloadHandler delegate;
29+
private final String serialNumber;
30+
31+
public FilteringPayloadHandler(PayloadHandler delegate, String serialNumber) {
32+
this.delegate = delegate;
33+
this.serialNumber = serialNumber;
34+
}
35+
36+
@Override
37+
public void handle(EnergyMeter energyMeter) throws IOException {
38+
if (this.serialNumber.equals(energyMeter.getSerialNumber())) {
39+
delegate.handle(energyMeter);
40+
}
41+
}
42+
}

bundles/org.openhab.binding.smaenergymeter/src/main/java/org/openhab/binding/smaenergymeter/internal/packet/PacketListener.java

+25-17
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public PacketListener(DefaultPacketListenerRegistry registry, String multicastGr
5656
}
5757

5858
public void addPayloadHandler(PayloadHandler handler) {
59+
if (handlers.isEmpty()) {
60+
open();
61+
}
5962
handlers.add(handler);
6063
}
6164

@@ -72,18 +75,22 @@ public boolean isOpen() {
7275
return socket != null && socket.isConnected();
7376
}
7477

75-
public void open(int intervalSec) throws IOException {
78+
private void open() {
7679
if (isOpen()) {
7780
// no need to bind socket second time
7881
return;
7982
}
80-
MulticastSocket socket = new MulticastSocket(port);
81-
socket.setSoTimeout(5000);
82-
InetAddress address = InetAddress.getByName(multicastGroup);
83-
socket.joinGroup(address);
83+
try {
84+
MulticastSocket socket = new MulticastSocket(port);
85+
socket.setSoTimeout(5000);
86+
InetAddress address = InetAddress.getByName(multicastGroup);
87+
socket.joinGroup(address);
8488

85-
future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers), intervalSec);
86-
this.socket = socket;
89+
future = registry.addTask(new ReceivingTask(socket, multicastGroup + ":" + port, handlers));
90+
this.socket = socket;
91+
} catch (IOException e) {
92+
throw new RuntimeException("Could not open socket", e);
93+
}
8794
}
8895

8996
void close() throws IOException {
@@ -122,24 +129,25 @@ static class ReceivingTask implements Runnable {
122129
}
123130

124131
public void run() {
125-
try {
126-
byte[] bytes = new byte[608];
127-
DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
128-
DatagramSocket socket = this.socket;
129-
socket.receive(msgPacket);
132+
byte[] bytes = new byte[608];
133+
DatagramPacket msgPacket = new DatagramPacket(bytes, bytes.length);
134+
DatagramSocket socket = this.socket;
130135

131-
try {
136+
try {
137+
do {
138+
// this loop is intended to receive all packets queued on the socket,
139+
// having a receive() call without loop causes packets to get queued over time,
140+
// if more than one meter present because we consume one packet per second
141+
socket.receive(msgPacket);
132142
EnergyMeter meter = new EnergyMeter();
133143
meter.parse(bytes);
134144

135145
for (PayloadHandler handler : handlers) {
136146
handler.handle(meter);
137147
}
138-
} catch (IOException e) {
139-
logger.debug("Unexpected payload received for group {}", group, e);
140-
}
148+
} while (msgPacket.getLength() == 608);
141149
} catch (IOException e) {
142-
logger.warn("Failed to receive data for multicast group {}", group, e);
150+
logger.debug("Unexpected payload received for group {}", group, e);
143151
}
144152
}
145153
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/**
2+
* Copyright (c) 2010-2024 Contributors to the openHAB project
3+
*
4+
* See the NOTICE file(s) distributed with this work for additional
5+
* information.
6+
*
7+
* This program and the accompanying materials are made available under the
8+
* terms of the Eclipse Public License 2.0 which is available at
9+
* http://www.eclipse.org/legal/epl-2.0
10+
*
11+
* SPDX-License-Identifier: EPL-2.0
12+
*/
13+
package org.openhab.binding.smaenergymeter.internal.packet;
14+
15+
import java.io.IOException;
16+
17+
import org.eclipse.jdt.annotation.NonNullByDefault;
18+
import org.openhab.binding.smaenergymeter.internal.handler.EnergyMeter;
19+
20+
/**
21+
* Payload handler which defer publishing of meter data by given amount of time.
22+
*
23+
* @author Łukasz Dywicki - Initial contribution
24+
*/
25+
@NonNullByDefault
26+
public class ThrottlingPayloadHandler implements PayloadHandler {
27+
28+
private final PayloadHandler delegate;
29+
private final long pollingPeriodMs;
30+
private long publishTime = 0;
31+
32+
public ThrottlingPayloadHandler(PayloadHandler delegate, long pollingPeriodMs) {
33+
this.delegate = delegate;
34+
this.pollingPeriodMs = pollingPeriodMs;
35+
}
36+
37+
@Override
38+
public void handle(EnergyMeter energyMeter) throws IOException {
39+
long ts = System.currentTimeMillis();
40+
if (publishTime <= ts) {
41+
delegate.handle(energyMeter);
42+
publishTime = ts + pollingPeriodMs;
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)