32
32
import java .lang .management .ManagementFactory ;
33
33
import java .nio .ByteBuffer ;
34
34
import java .nio .charset .Charset ;
35
+ import java .time .Duration ;
35
36
import java .util .ArrayList ;
36
37
import java .util .Collection ;
37
38
import java .util .Collections ;
47
48
import javax .measure .Unit ;
48
49
49
50
import com .google .common .collect .Maps ;
51
+ import com .google .common .util .concurrent .Monitor ;
50
52
import io .pcp .parfait .dxm .PcpString .PcpStringStore ;
51
53
import io .pcp .parfait .dxm .semantics .Semantics ;
52
54
import io .pcp .parfait .dxm .types .AbstractTypeHandler ;
@@ -115,6 +117,12 @@ public int getBitmask() {
115
117
return bitmask ;
116
118
}
117
119
}
120
+
121
+ private enum State {
122
+ STOPPED ,
123
+ STARTING ,
124
+ STARTED
125
+ }
118
126
119
127
private static final Set <MmvFlag > DEFAULT_FLAGS = Collections .unmodifiableSet (EnumSet .of (
120
128
MmvFlag .MMV_FLAG_NOPREFIX , MmvFlag .MMV_FLAG_PROCESS ));
@@ -156,7 +164,9 @@ public void putBytes(ByteBuffer buffer, String value) {
156
164
private final Map <Class <?>, TypeHandler <?>> typeHandlers = new ConcurrentHashMap <Class <?>, TypeHandler <?>>(
157
165
DefaultTypeHandlers .getDefaultMappings ());
158
166
private final PcpStringStore stringStore = new PcpStringStore ();
159
- private volatile boolean started = false ;
167
+ private volatile State state = State .STOPPED ;
168
+ private final Monitor stateMonitor = new Monitor ();
169
+ private final Monitor .Guard isStarted = stateMonitor .newGuard (() -> state == State .STARTED );
160
170
private volatile boolean usePerMetricLock = true ;
161
171
private final Map <PcpValueInfo ,ByteBuffer > perMetricByteBuffers = newConcurrentMap ();
162
172
private final Object globalLock = new Object ();
@@ -295,7 +305,7 @@ public final <T> void addMetric(MetricName name, Semantics semantics, Unit<?> un
295
305
* io.pcp.parfait.pcp.types.TypeHandler)
296
306
*/
297
307
public final <T > void registerType (Class <T > runtimeClass , TypeHandler <T > handler ) {
298
- if (started ) {
308
+ if (state != State . STOPPED ) {
299
309
// Can't add any more metrics anyway; harmless
300
310
return ;
301
311
}
@@ -307,9 +317,22 @@ public final <T> void registerType(Class<T> runtimeClass, TypeHandler<T> handler
307
317
* @see io.pcp.parfait.pcp.PcpWriter#updateMetric(java.lang.String, java.lang.Object)
308
318
*/
309
319
public final void updateMetric (MetricName name , Object value ) {
310
- if (!started ) {
311
- return ;
320
+ // If another thread has called start() we need to wait until the writer has completely started before
321
+ // proceeding to update the metric value. This is to avoid a race condition where start() has already written
322
+ // the old metric value, but has not yet finished writing all of the values, when the metric is updated. The
323
+ // implementation here is a little complicated to avoid taking a lock on the happy paths.
324
+ if (state == State .STARTED ) {
325
+ doUpdateMetric (name , value );
326
+ } else if (state == State .STARTING ) {
327
+ if (stateMonitor .enterWhenUninterruptibly (isStarted , Duration .ofSeconds (10 ))) {
328
+ // Leave the monitor immediately because we only care about being notified about the state change
329
+ stateMonitor .leave ();
330
+ doUpdateMetric (name , value );
331
+ }
312
332
}
333
+ }
334
+
335
+ private void doUpdateMetric (MetricName name , Object value ) {
313
336
PcpValueInfo info = metricData .get (name );
314
337
if (info == null ) {
315
338
throw new IllegalArgumentException ("Metric " + name
@@ -323,6 +346,8 @@ public final void updateMetric(MetricName name, Object value) {
323
346
* @see io.pcp.parfait.pcp.PcpWriter#start()
324
347
*/
325
348
public final void start () throws IOException {
349
+ updateState (State .STARTING );
350
+
326
351
initialiseOffsets ();
327
352
328
353
dataFileBuffer = byteBufferFactory .build (getBufferLength ());
@@ -331,12 +356,12 @@ public final void start() throws IOException {
331
356
preparePerMetricBufferSlices ();
332
357
}
333
358
334
- started = true ;
359
+ updateState ( State . STARTED ) ;
335
360
}
336
361
337
362
@ Override
338
363
public void reset () {
339
- started = false ;
364
+ updateState ( State . STOPPED ) ;
340
365
metricData .clear ();
341
366
perMetricByteBuffers .clear ();
342
367
instanceDomainStore .clear ();
@@ -384,7 +409,7 @@ public void setProcessIdentifier(int pid) {
384
409
}
385
410
386
411
public void setPerMetricLock (boolean usePerMetricLock ) {
387
- Preconditions .checkState (! started , "Cannot change use of perMetricLock when started" );
412
+ Preconditions .checkState (state == State . STOPPED , "Cannot change use of perMetricLock when started" );
388
413
this .usePerMetricLock = usePerMetricLock ;
389
414
}
390
415
@@ -464,11 +489,12 @@ private void updateValue(PcpValueInfo info, Object value) {
464
489
465
490
private void writeValueWithLockPerMetric (PcpValueInfo info , Object value , TypeHandler rawHandler ) {
466
491
ByteBuffer perMetricByteBuffer = perMetricByteBuffers .get (info );
467
- synchronized (perMetricByteBuffer ) {
468
- perMetricByteBuffer .position (0 );
469
- rawHandler .putBytes (perMetricByteBuffer , value );
492
+ if (perMetricByteBuffer != null ) {
493
+ synchronized (perMetricByteBuffer ) {
494
+ perMetricByteBuffer .position (0 );
495
+ rawHandler .putBytes (perMetricByteBuffer , value );
496
+ }
470
497
}
471
-
472
498
}
473
499
474
500
private void writeValueWithGlobalLock (PcpValueInfo info , Object value , TypeHandler rawHandler ) {
@@ -655,6 +681,15 @@ private int getProcessIdentifier() {
655
681
return processIdentifier ;
656
682
}
657
683
684
+ private void updateState (State newState ) {
685
+ stateMonitor .enter ();
686
+ try {
687
+ state = newState ;
688
+ } finally {
689
+ stateMonitor .leave ();
690
+ }
691
+ }
692
+
658
693
public static void main (String [] args ) throws IOException {
659
694
PcpMmvWriter bridge ;
660
695
0 commit comments