Skip to content

Commit 9ed94cd

Browse files
committed
Added error handling and comments
1 parent 96af903 commit 9ed94cd

File tree

2 files changed

+27
-2
lines changed

2 files changed

+27
-2
lines changed

src/main/java/org/kairosdb/kafka/monitor/OffsetListenerService.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -218,14 +218,34 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
218218

219219
for (GroupStats groupStats : m_offsetsTracker.copyOfCurrentStats())
220220
{
221-
Map<Integer, Long> latestOffsets = getLatestTopicOffsets(groupStats.getTopic());
221+
Map<Integer, Long> latestOffsets = null;
222+
223+
try
224+
{
225+
latestOffsets = getLatestTopicOffsets(groupStats.getTopic());
226+
}
227+
catch (Exception e)
228+
{
229+
logger.error("Error reading kafka offsets for topic: "+groupStats.getTopic(), e);
230+
231+
//Report failure metric
232+
ImmutableSortedMap<String, String> offsetTags = new ImmutableSortedMap.Builder<String, String>(Ordering.natural())
233+
.putAll(m_monitorConfig.getAdditionalTags())
234+
.put("host", m_clientId).build();
235+
236+
DataPointEvent failureEvent = new DataPointEvent(m_monitorConfig.getGatherFailureMetric(),
237+
offsetTags,
238+
m_dataPointFactory.createDataPoint(now, 1));
239+
postEvent(failureEvent);
240+
}
222241

223242
ImmutableSortedMap<String, String> groupTags = new ImmutableSortedMap.Builder<String, String>(Ordering.natural())
224243
.putAll(m_monitorConfig.getAdditionalTags())
225244
.put("group", groupStats.getGroupName())
226245
.put("proxy_group", groupStats.getProxyGroup())
227246
.put("topic", groupStats.getTopic()).build();
228247

248+
//Report consumer rate
229249
DataPointEvent event = new DataPointEvent(m_monitorConfig.getConsumerRateMetric(),
230250
groupTags,
231251
m_dataPointFactory.createDataPoint(now, groupStats.getConsumeCount()));
@@ -244,30 +264,34 @@ public void execute(JobExecutionContext jobExecutionContext) throws JobExecution
244264
.put("topic", groupStats.getTopic())
245265
.put("partition", String.valueOf(offsetStat.getPartition())).build();
246266

267+
//Report offset age
247268
DataPointEvent offsetAge = new DataPointEvent(m_monitorConfig.getOffsetAgeMetric(),
248269
partitionTags,
249270
m_dataPointFactory.createDataPoint(now, (now - offsetStat.getCommitTime())));
250271
postEvent(offsetAge);
251272

252-
Long latestOffset = latestOffsets.get(offsetStat.getPartition());
273+
Long latestOffset = latestOffsets != null ? latestOffsets.get(offsetStat.getPartition()) : null;
253274
if (latestOffset != null) //in case something goes bananas
254275
{
255276
long partitionLag = calculateDiff(latestOffset, offsetStat.getOffset());
256277
groupLag += partitionLag;
257278

279+
//Report partition lag
258280
DataPointEvent partitionLagEvent = new DataPointEvent(m_monitorConfig.getPartitionLagMetric(),
259281
partitionTags,
260282
m_dataPointFactory.createDataPoint(now, partitionLag));
261283
postEvent(partitionLagEvent);
262284
}
263285
}
264286

287+
//Report group lag
265288
DataPointEvent groupLagEvent = new DataPointEvent(m_monitorConfig.getGroupLagMetric(),
266289
groupTags,
267290
m_dataPointFactory.createDataPoint(now, groupLag));
268291
postEvent(groupLagEvent);
269292

270293

294+
//Report time to process lag
271295
long secToProcess = 0;
272296
if (groupStats.getCurrentRate() != 0.0)
273297
secToProcess = (long)((double)groupLag / groupStats.getCurrentRate());

src/main/java/org/kairosdb/kafka/monitor/PartitionedOffsetReader.java

+1
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ protected void readTopic()
8686
ownedTopics.add(record.key());
8787
}
8888

89+
//todo: maybe not send this every time
8990
for (String ownedTopic : ownedTopics)
9091
{
9192
m_producer.send(new ProducerRecord<String, String>(

0 commit comments

Comments
 (0)