Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix PcpMmvWriter race condition #133

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 56 additions & 16 deletions dxm/src/main/java/io/pcp/parfait/dxm/PcpMmvWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -47,6 +48,7 @@
import javax.measure.Unit;

import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Monitor;
import io.pcp.parfait.dxm.PcpString.PcpStringStore;
import io.pcp.parfait.dxm.semantics.Semantics;
import io.pcp.parfait.dxm.types.AbstractTypeHandler;
Expand Down Expand Up @@ -115,6 +117,12 @@ public int getBitmask() {
return bitmask;
}
}

private enum State {
STOPPED,
STARTING,
STARTED
}

private static final Set<MmvFlag> DEFAULT_FLAGS = Collections.unmodifiableSet(EnumSet.of(
MmvFlag.MMV_FLAG_NOPREFIX, MmvFlag.MMV_FLAG_PROCESS));
Expand Down Expand Up @@ -156,7 +164,9 @@ public void putBytes(ByteBuffer buffer, String value) {
private final Map<Class<?>, TypeHandler<?>> typeHandlers = new ConcurrentHashMap<Class<?>, TypeHandler<?>>(
DefaultTypeHandlers.getDefaultMappings());
private final PcpStringStore stringStore = new PcpStringStore();
private volatile boolean started = false;
private volatile State state = State.STOPPED;
private final Monitor stateMonitor = new Monitor();
private final Monitor.Guard isStarted = stateMonitor.newGuard(() -> state == State.STARTED);
private volatile boolean usePerMetricLock = true;
private final Map<PcpValueInfo,ByteBuffer> perMetricByteBuffers = newConcurrentMap();
private final Object globalLock = new Object();
Expand Down Expand Up @@ -295,7 +305,7 @@ public final <T> void addMetric(MetricName name, Semantics semantics, Unit<?> un
* io.pcp.parfait.pcp.types.TypeHandler)
*/
public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler) {
if (started) {
if (state != State.STOPPED) {
// Can't add any more metrics anyway; harmless
return;
}
Expand All @@ -307,9 +317,22 @@ public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler
* @see io.pcp.parfait.pcp.PcpWriter#updateMetric(java.lang.String, java.lang.Object)
*/
public final void updateMetric(MetricName name, Object value) {
if (!started) {
return;
// If another thread has called start() we need to wait until the writer has completely started before
// proceeding to update the metric value. This is to avoid a race condition where start() has already written
// the old metric value, but has not yet finished writing all of the values, when the metric is updated. The
// implementation here is a little complicated to avoid taking a lock on the happy paths.
if (state == State.STARTED) {
doUpdateMetric(name, value);
} else if (state == State.STARTING) {
if (stateMonitor.enterWhenUninterruptibly(isStarted, Duration.ofSeconds(10))) {
// Leave the monitor immediately because we only care about being notified about the state change
stateMonitor.leave();
doUpdateMetric(name, value);
}
}
}

private void doUpdateMetric(MetricName name, Object value) {
PcpValueInfo info = metricData.get(name);
if (info == null) {
throw new IllegalArgumentException("Metric " + name
Expand All @@ -323,20 +346,27 @@ public final void updateMetric(MetricName name, Object value) {
* @see io.pcp.parfait.pcp.PcpWriter#start()
*/
public final void start() throws IOException {
initialiseOffsets();
updateState(State.STARTING);

dataFileBuffer = byteBufferFactory.build(getBufferLength());
synchronized (globalLock) {
populateDataBuffer(dataFileBuffer, metricData.values());
preparePerMetricBufferSlices();
try {
initialiseOffsets();

dataFileBuffer = byteBufferFactory.build(getBufferLength());
synchronized (globalLock) {
populateDataBuffer(dataFileBuffer, metricData.values());
preparePerMetricBufferSlices();
}
} catch (IOException | RuntimeException e) {
updateState(State.STOPPED);
throw e;
}

started = true;
updateState(State.STARTED);
}

@Override
public void reset() {
started = false;
updateState(State.STOPPED);
metricData.clear();
perMetricByteBuffers.clear();
instanceDomainStore.clear();
Expand Down Expand Up @@ -384,7 +414,7 @@ public void setProcessIdentifier(int pid) {
}

public void setPerMetricLock(boolean usePerMetricLock) {
Preconditions.checkState(!started, "Cannot change use of perMetricLock when started");
Preconditions.checkState(state == State.STOPPED, "Cannot change use of perMetricLock when started");
this.usePerMetricLock = usePerMetricLock;
}

Expand Down Expand Up @@ -464,11 +494,12 @@ private void updateValue(PcpValueInfo info, Object value) {

private void writeValueWithLockPerMetric(PcpValueInfo info, Object value, TypeHandler rawHandler) {
ByteBuffer perMetricByteBuffer = perMetricByteBuffers.get(info);
synchronized (perMetricByteBuffer) {
perMetricByteBuffer.position(0);
rawHandler.putBytes(perMetricByteBuffer, value);
if (perMetricByteBuffer != null) {
synchronized (perMetricByteBuffer) {
perMetricByteBuffer.position(0);
rawHandler.putBytes(perMetricByteBuffer, value);
}
}

}

private void writeValueWithGlobalLock(PcpValueInfo info, Object value, TypeHandler rawHandler) {
Expand Down Expand Up @@ -655,6 +686,15 @@ private int getProcessIdentifier() {
return processIdentifier;
}

private void updateState(State newState) {
stateMonitor.enter();
try {
state = newState;
} finally {
stateMonitor.leave();
}
}

public static void main(String[] args) throws IOException {
PcpMmvWriter bridge;

Expand Down
Loading