Skip to content

Commit c9f42db

Browse files
committed
fix PcpMmvWriter race condition
Fixes a race condition where metric updates can be lost if they happen while `PcpMmvWriter` is being reset. Previously, updates were dropped if the writer was not in a started state. If this happened while the writer was executing `start()`, the updated value could be lost if the old metric value had already been written. The issue is addressed here by blocking the metric update when the writer is in a `STARTING` state, until the writer has been fully started. If the writer is in a `STARTED` state, the update is applied immediately. If the writer is in a `STOPPED` state, the update is dropped immediately. Resolves #132
1 parent d955079 commit c9f42db

File tree

1 file changed

+46
-11
lines changed

1 file changed

+46
-11
lines changed

dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java

+46-11
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.lang.management.ManagementFactory;
3333
import java.nio.ByteBuffer;
3434
import java.nio.charset.Charset;
35+
import java.time.Duration;
3536
import java.util.ArrayList;
3637
import java.util.Collection;
3738
import java.util.Collections;
@@ -47,6 +48,7 @@
4748
import javax.measure.Unit;
4849

4950
import com.google.common.collect.Maps;
51+
import com.google.common.util.concurrent.Monitor;
5052
import io.pcp.parfait.dxm.PcpString.PcpStringStore;
5153
import io.pcp.parfait.dxm.semantics.Semantics;
5254
import io.pcp.parfait.dxm.types.AbstractTypeHandler;
@@ -115,6 +117,12 @@ public int getBitmask() {
115117
return bitmask;
116118
}
117119
}
120+
121+
private enum State {
122+
STOPPED,
123+
STARTING,
124+
STARTED
125+
}
118126

119127
private static final Set<MmvFlag> DEFAULT_FLAGS = Collections.unmodifiableSet(EnumSet.of(
120128
MmvFlag.MMV_FLAG_NOPREFIX, MmvFlag.MMV_FLAG_PROCESS));
@@ -156,7 +164,9 @@ public void putBytes(ByteBuffer buffer, String value) {
156164
private final Map<Class<?>, TypeHandler<?>> typeHandlers = new ConcurrentHashMap<Class<?>, TypeHandler<?>>(
157165
DefaultTypeHandlers.getDefaultMappings());
158166
private final PcpStringStore stringStore = new PcpStringStore();
159-
private volatile boolean started = false;
167+
private volatile State state = State.STOPPED;
168+
private final Monitor stateMonitor = new Monitor();
169+
private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED);
160170
private volatile boolean usePerMetricLock = true;
161171
private final Map<PcpValueInfo,ByteBuffer> perMetricByteBuffers = newConcurrentMap();
162172
private final Object globalLock = new Object();
@@ -295,7 +305,7 @@ public final <T> void addMetric(MetricName name, Semantics semantics, Unit<?> un
295305
* io.pcp.parfait.pcp.types.TypeHandler)
296306
*/
297307
public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler) {
298-
if (started) {
308+
if (state != State.STOPPED) {
299309
// Can't add any more metrics anyway; harmless
300310
return;
301311
}
@@ -307,9 +317,22 @@ public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler
307317
* @see io.pcp.parfait.pcp.PcpWriter#updateMetric(java.lang.String, java.lang.Object)
308318
*/
309319
public final void updateMetric(MetricName name, Object value) {
310-
if (!started) {
311-
return;
320+
// If another thread has called start() we need to wait until the writer has completely started before
321+
// proceeding to update the metric value. This is to avoid a race condition where start() has already written
322+
// the old metric value, but has not yet finished writing all of the values, when the metric is updated. The
323+
// implementation here is a little complicated to avoid taking a lock on the happy paths.
324+
if (state == State.STARTED) {
325+
doUpdateMetric(name, value);
326+
} else if (state == State.STARTING) {
327+
if (stateMonitor.enterWhenUninterruptibly(isStarted, Duration.ofSeconds(10))) {
328+
// Leave the monitor immediately because we only care about being notified about the state change
329+
stateMonitor.leave();
330+
doUpdateMetric(name, value);
331+
}
312332
}
333+
}
334+
335+
private void doUpdateMetric(MetricName name, Object value) {
313336
PcpValueInfo info = metricData.get(name);
314337
if (info == null) {
315338
throw new IllegalArgumentException("Metric " + name
@@ -323,6 +346,8 @@ public final void updateMetric(MetricName name, Object value) {
323346
* @see io.pcp.parfait.pcp.PcpWriter#start()
324347
*/
325348
public final void start() throws IOException {
349+
updateState(State.STARTING);
350+
326351
initialiseOffsets();
327352

328353
dataFileBuffer = byteBufferFactory.build(getBufferLength());
@@ -331,12 +356,12 @@ public final void start() throws IOException {
331356
preparePerMetricBufferSlices();
332357
}
333358

334-
started = true;
359+
updateState(State.STARTED);
335360
}
336361

337362
@Override
338363
public void reset() {
339-
started = false;
364+
updateState(State.STOPPED);
340365
metricData.clear();
341366
perMetricByteBuffers.clear();
342367
instanceDomainStore.clear();
@@ -384,7 +409,7 @@ public void setProcessIdentifier(int pid) {
384409
}
385410

386411
public void setPerMetricLock(boolean usePerMetricLock) {
387-
Preconditions.checkState(!started, "Cannot change use of perMetricLock when started");
412+
Preconditions.checkState(state == State.STOPPED, "Cannot change use of perMetricLock when started");
388413
this.usePerMetricLock = usePerMetricLock;
389414
}
390415

@@ -464,11 +489,12 @@ private void updateValue(PcpValueInfo info, Object value) {
464489

465490
private void writeValueWithLockPerMetric(PcpValueInfo info, Object value, TypeHandler rawHandler) {
466491
ByteBuffer perMetricByteBuffer = perMetricByteBuffers.get(info);
467-
synchronized (perMetricByteBuffer) {
468-
perMetricByteBuffer.position(0);
469-
rawHandler.putBytes(perMetricByteBuffer, value);
492+
if (perMetricByteBuffer != null) {
493+
synchronized (perMetricByteBuffer) {
494+
perMetricByteBuffer.position(0);
495+
rawHandler.putBytes(perMetricByteBuffer, value);
496+
}
470497
}
471-
472498
}
473499

474500
private void writeValueWithGlobalLock(PcpValueInfo info, Object value, TypeHandler rawHandler) {
@@ -655,6 +681,15 @@ private int getProcessIdentifier() {
655681
return processIdentifier;
656682
}
657683

684+
private void updateState(State newState) {
685+
stateMonitor.enter();
686+
try {
687+
state = newState;
688+
} finally {
689+
stateMonitor.leave();
690+
}
691+
}
692+
658693
public static void main(String[] args) throws IOException {
659694
PcpMmvWriter bridge;
660695

0 commit comments

Comments
 (0)