17
17
package io .pcp .parfait .dxm ;
18
18
19
19
import io .pcp .parfait .dxm .semantics .Semantics ;
20
+ import io .pcp .parfait .dxm .types .AbstractTypeHandler ;
21
+ import io .pcp .parfait .dxm .types .MmvMetricType ;
20
22
import org .hamcrest .Matcher ;
21
23
import org .junit .BeforeClass ;
22
24
import org .junit .Test ;
23
25
24
26
import java .lang .reflect .Field ;
27
+ import java .nio .ByteBuffer ;
28
+ import java .util .ArrayList ;
29
+ import java .util .List ;
30
+ import java .util .concurrent .CountDownLatch ;
31
+ import java .util .concurrent .Phaser ;
25
32
26
33
import static io .pcp .parfait .dxm .IdentifierSourceSet .DEFAULT_SET ;
27
34
import static io .pcp .parfait .dxm .MmvVersion .MMV_VERSION1 ;
@@ -109,6 +116,93 @@ public void resetShouldClearStrings() throws Exception {
109
116
assertStringsCount (pcpMmvWriterV2 , 0 );
110
117
}
111
118
119
+ @ Test
120
+ public void metricUpdatesWhileResettingWriterShouldNotBeLost () throws Exception {
121
+ // The order the metrics are written is non-deterministic because they're pulled out of a hash map, so
122
+ // we must dynamically record their order.
123
+ List <String > order = new ArrayList <>();
124
+
125
+ pcpMmvWriterV1 .reset ();
126
+ pcpMmvWriterV1 .addMetric (MetricName .parse ("value1" ), Semantics .COUNTER , ONE , 1 ,
127
+ new AbstractTypeHandler <Number >(MmvMetricType .I32 , 4 ) {
128
+ public void putBytes (ByteBuffer buffer , Number value ) {
129
+ order .add ("value1" );
130
+ buffer .putInt (value == null ? 0 : value .intValue ());
131
+ }
132
+ });
133
+ pcpMmvWriterV1 .addMetric (MetricName .parse ("value2" ), Semantics .COUNTER , ONE , 2 ,
134
+ new AbstractTypeHandler <Number >(MmvMetricType .I32 , 4 ) {
135
+ public void putBytes (ByteBuffer buffer , Number value ) {
136
+ order .add ("value2" );
137
+ buffer .putInt (value == null ? 0 : value .intValue ());
138
+ }
139
+ });
140
+
141
+ pcpMmvWriterV1 .start ();
142
+
143
+ waitForReload ();
144
+
145
+ assertMetric ("mmv.value1" , is ("1.000" ));
146
+ assertMetric ("mmv.value2" , is ("2.000" ));
147
+
148
+ pcpMmvWriterV1 .reset ();
149
+
150
+ // The idea here is that the 1st metric will be written immediately, but the 2nd will wait on the phaser to
151
+ // write. This gives us time to update the 1st metric value. The sleep is needed to ensure the start() method
152
+ // doesn't exit before updateMetric() is executed.
153
+ Phaser phaser = new Phaser (2 );
154
+
155
+ pcpMmvWriterV1 .addMetric (MetricName .parse ("value1" ), Semantics .COUNTER , ONE , 1 ,
156
+ new AbstractTypeHandler <Number >(MmvMetricType .I32 , 4 ) {
157
+ public void putBytes (ByteBuffer buffer , Number value ) {
158
+ boolean isNotFirst = !"value1" .equals (order .get (0 ));
159
+ if (isNotFirst ) {
160
+ phaser .arriveAndAwaitAdvance ();
161
+ }
162
+ buffer .putInt (value == null ? 0 : value .intValue ());
163
+ if (isNotFirst ) {
164
+ sleep (1_000 );
165
+ }
166
+ }
167
+ });
168
+ pcpMmvWriterV1 .addMetric (MetricName .parse ("value2" ), Semantics .COUNTER , ONE , 2 ,
169
+ new AbstractTypeHandler <Number >(MmvMetricType .I32 , 4 ) {
170
+ public void putBytes (ByteBuffer buffer , Number value ) {
171
+ boolean isNotFirst = !"value2" .equals (order .get (0 ));
172
+ if (isNotFirst ) {
173
+ phaser .arriveAndAwaitAdvance ();
174
+ }
175
+ buffer .putInt (value == null ? 0 : value .intValue ());
176
+ if (isNotFirst ) {
177
+ sleep (1_000 );
178
+ }
179
+ }
180
+ });
181
+
182
+ CountDownLatch startDone = new CountDownLatch (1 );
183
+
184
+ new Thread (() -> {
185
+ try {
186
+ pcpMmvWriterV1 .start ();
187
+ } catch (Exception e ) {
188
+ e .printStackTrace ();
189
+ } finally {
190
+ startDone .countDown ();
191
+ }
192
+ }).start ();
193
+
194
+ // Will not continue till after the 1st metric has been written
195
+ phaser .arriveAndAwaitAdvance ();
196
+
197
+ pcpMmvWriterV1 .updateMetric (MetricName .parse (order .get (0 )), 10 );
198
+
199
+ startDone .await ();
200
+
201
+ waitForReload ();
202
+
203
+ assertMetric ("mmv." + order .get (0 ), is ("10.000" ));
204
+ }
205
+
112
206
private void assertMetric (String metricName , Matcher <String > expectedValue ) throws Exception {
113
207
String actual = pcpClient .getMetric (metricName );
114
208
assertThat (actual , expectedValue );
@@ -125,4 +219,12 @@ private void waitForReload() throws InterruptedException {
125
219
Thread .sleep (1000 );
126
220
}
127
221
222
+ private void sleep (long millis ) {
223
+ try {
224
+ Thread .sleep (millis );
225
+ } catch (InterruptedException e ) {
226
+ throw new RuntimeException (e );
227
+ }
228
+ }
229
+
128
230
}
0 commit comments