@@ -181,6 +181,56 @@ public void setUp() throws Exception {
181181 }).when (client ).execute (any (), any (MLExecuteTaskRequest .class ), any ());
182182 }
183183
184+ @ Test
185+ public void testSkipCorrelationWhenNoDetectorIdsInAnomalies () throws IOException {
186+ // An anomaly without detector_id
187+ String anomalyJson = TestHelpers
188+ .builder ()
189+ .startObject ()
190+ .field ("anomaly_grade" , 0.7 )
191+ .field ("anomaly_score" , 2.3 )
192+ .field ("data_start_time" , Instant .now ().minus (30 , ChronoUnit .MINUTES ).toEpochMilli ())
193+ .field ("data_end_time" , Instant .now ().minus (29 , ChronoUnit .MINUTES ).toEpochMilli ())
194+ .endObject ()
195+ .toString ();
196+
197+ SearchHit anomalyHit = new SearchHit (1 );
198+ anomalyHit .sourceRef (new BytesArray (anomalyJson ));
199+ anomalyHit .score (1.0f );
200+ anomalyHit .shard (new SearchShardTarget ("node" , new ShardId ("test" , "uuid" , 0 ), null , null ));
201+
202+ SearchHits anomalySearchHits = new SearchHits (new SearchHit [] { anomalyHit }, new TotalHits (1 , TotalHits .Relation .EQUAL_TO ), 1.0f );
203+
204+ // Return anomalies from results index; we won't reach detector config enrichment
205+ doAnswer (invocation -> {
206+ ActionListener <SearchResponse > listener = invocation .getArgument (1 );
207+ SearchResponse searchResponse = mock (SearchResponse .class );
208+ when (searchResponse .getHits ()).thenReturn (anomalySearchHits );
209+ listener .onResponse (searchResponse );
210+ return null ;
211+ }).when (client ).search (any (SearchRequest .class ), any ());
212+
213+ // Lock lifecycle
214+ doAnswer (invocation -> {
215+ ActionListener <LockModel > listener = invocation .getArgument (2 );
216+ listener .onResponse (lockModel );
217+ return null ;
218+ }).when (lockService ).acquireLock (any (), any (), any ());
219+ doAnswer (invocation -> {
220+ ActionListener <Boolean > listener = invocation .getArgument (1 );
221+ listener .onResponse (true );
222+ return null ;
223+ }).when (lockService ).release (any (), any ());
224+
225+ insightsJobProcessor .process (insightsJob , jobExecutionContext );
226+
227+ // One search for anomalies, then skip correlation and release lock
228+ verify (client , times (1 )).search (any (SearchRequest .class ), any ());
229+ verify (lockService , times (1 )).release (any (), any ());
230+ // No index write occurs
231+ verify (client , never ()).index (any (IndexRequest .class ), any ());
232+ }
233+
184234 @ Test
185235 public void testProcessWithIntervalSchedule () {
186236 // Mock lock acquisition
0 commit comments