16
16
import org .opensearch .knn .index .KNNSettings ;
17
17
import org .opensearch .knn .index .codec .nativeindex .NativeIndexBuildStrategy ;
18
18
import org .opensearch .knn .index .codec .nativeindex .model .BuildIndexParams ;
19
- import org .opensearch .knn .index .codec .util .KNNCodecUtil ;
20
19
import org .opensearch .knn .index .engine .KNNMethodContext ;
21
20
import org .opensearch .knn .index .remote .RemoteIndexWaiter ;
22
21
import org .opensearch .knn .index .remote .RemoteIndexWaiterFactory ;
43
42
import static org .opensearch .knn .index .KNNSettings .KNN_INDEX_REMOTE_VECTOR_BUILD_SETTING ;
44
43
import static org .opensearch .knn .index .KNNSettings .KNN_INDEX_REMOTE_VECTOR_BUILD_THRESHOLD_SETTING ;
45
44
import static org .opensearch .knn .index .KNNSettings .KNN_REMOTE_VECTOR_REPO_SETTING ;
45
+ import static org .opensearch .knn .index .codec .util .KNNCodecUtil .initializeVectorValues ;
46
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .BUILD_REQUEST_FAILURE_COUNT ;
47
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .BUILD_REQUEST_SUCCESS_COUNT ;
48
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .INDEX_BUILD_FAILURE_COUNT ;
49
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .INDEX_BUILD_SUCCESS_COUNT ;
50
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_FAILURE_COUNT ;
51
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_SUCCESS_COUNT ;
52
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .READ_TIME ;
53
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_CURRENT_OPERATIONS ;
54
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_CURRENT_SIZE ;
55
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .REMOTE_INDEX_BUILD_TIME ;
56
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WAITING_TIME ;
57
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_FAILURE_COUNT ;
58
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_SUCCESS_COUNT ;
59
+ import static org .opensearch .knn .plugin .stats .KNNRemoteIndexBuildValue .WRITE_TIME ;
46
60
47
61
/**
48
62
* This class orchestrates building vector indices. It handles uploading data to a repository, submitting a remote
@@ -120,63 +134,154 @@ public static boolean shouldBuildIndexRemotely(IndexSettings indexSettings, long
120
134
* 2. Triggers index build
121
135
* 3. Awaits on vector build to complete
122
136
* 4. Downloads index file and writes to indexOutput
123
- *
124
- * @param indexInfo
125
- * @throws IOException
126
137
*/
127
138
@ Override
128
139
public void buildAndWriteIndex (BuildIndexParams indexInfo ) throws IOException {
129
- StopWatch stopWatch ;
140
+ StopWatch remoteBuildStopWatch = new StopWatch ();
141
+ KNNVectorValues <?> knnVectorValues = indexInfo .getKnnVectorValuesSupplier ().get ();
142
+ initializeVectorValues (knnVectorValues );
143
+ startRemoteIndexBuildStats ((long ) indexInfo .getTotalLiveDocs () * knnVectorValues .bytesPerVector (), remoteBuildStopWatch );
144
+
145
+ try {
146
+ RepositoryContext repositoryContext = getRepositoryContext (indexInfo );
147
+
148
+ // 1. Write required data to repository
149
+ writeToRepository (repositoryContext , indexInfo );
150
+
151
+ // 2. Trigger remote index build
152
+ RemoteIndexClient client = RemoteIndexClientFactory .getRemoteIndexClient (KNNSettings .getRemoteBuildServiceEndpoint ());
153
+ RemoteBuildResponse remoteBuildResponse = submitBuild (repositoryContext , indexInfo , client );
154
+
155
+ // 3. Await vector build completion
156
+ RemoteBuildStatusResponse remoteBuildStatusResponse = awaitIndexBuild (remoteBuildResponse , indexInfo , client );
157
+
158
+ // 4. Download index file and write to indexOutput
159
+ readFromRepository (indexInfo , repositoryContext , remoteBuildStatusResponse );
160
+
161
+ endRemoteIndexBuildStats ((long ) indexInfo .getTotalLiveDocs () * knnVectorValues .bytesPerVector (), remoteBuildStopWatch );
162
+ INDEX_BUILD_SUCCESS_COUNT .increment ();
163
+ } catch (Exception e ) {
164
+ log .warn ("Failed to build index remotely" , e );
165
+ handleFailure (indexInfo , knnVectorValues .bytesPerVector (), remoteBuildStopWatch );
166
+ }
167
+ }
168
+
169
+ /**
170
+ * Writes the required vector and doc ID data to the repository
171
+ */
172
+ private void writeToRepository (RepositoryContext repositoryContext , BuildIndexParams indexInfo ) throws IOException ,
173
+ InterruptedException {
174
+ BlobStoreRepository repository = repositoryContext .blobStoreRepository ;
175
+ BlobPath blobPath = repositoryContext .blobPath ;
176
+ VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (
177
+ repository .blobStore ().blobContainer (blobPath )
178
+ );
179
+
180
+ StopWatch stopWatch = new StopWatch ().start ();
130
181
long time_in_millis ;
131
182
try {
132
- BlobStoreRepository repository = getRepository ();
133
- BlobPath blobPath = repository .basePath ().add (indexSettings .getUUID () + VECTORS_PATH );
134
- VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (
135
- repository .blobStore ().blobContainer (blobPath )
136
- );
137
- stopWatch = new StopWatch ().start ();
138
- // We create a new time based UUID per file in order to avoid conflicts across shards. It is also very difficult to get the
139
- // shard id in this context.
140
- String blobName = UUIDs .base64UUID () + "_" + indexInfo .getFieldName () + "_" + indexInfo .getSegmentWriteState ().segmentInfo .name ;
141
183
vectorRepositoryAccessor .writeToRepository (
142
- blobName ,
184
+ repositoryContext . blobName ,
143
185
indexInfo .getTotalLiveDocs (),
144
186
indexInfo .getVectorDataType (),
145
187
indexInfo .getKnnVectorValuesSupplier ()
146
188
);
147
189
time_in_millis = stopWatch .stop ().totalTime ().millis ();
190
+ WRITE_SUCCESS_COUNT .increment ();
191
+ WRITE_TIME .incrementBy (time_in_millis );
148
192
log .debug ("Repository write took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
193
+ } catch (Exception e ) {
194
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
195
+ WRITE_FAILURE_COUNT .increment ();
196
+ log .error ("Repository write failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
197
+ throw e ;
198
+ }
199
+ }
149
200
150
- final RemoteIndexClient client = RemoteIndexClientFactory .getRemoteIndexClient (KNNSettings .getRemoteBuildServiceEndpoint ());
201
+ /**
202
+ * Submits a remote build request to the remote index build service
203
+ * @return RemoteBuildResponse containing the response from the remote service
204
+ */
205
+ private RemoteBuildResponse submitBuild (RepositoryContext repositoryContext , BuildIndexParams indexInfo , RemoteIndexClient client )
206
+ throws IOException {
207
+ final RemoteBuildResponse remoteBuildResponse ;
208
+ StopWatch stopWatch = new StopWatch ().start ();
209
+ long time_in_millis ;
210
+ try {
151
211
final RemoteBuildRequest buildRequest = buildRemoteBuildRequest (
152
212
indexSettings ,
153
213
indexInfo ,
154
- repository .getMetadata (),
155
- blobPath .buildAsString () + blobName ,
214
+ repositoryContext . blobStoreRepository .getMetadata (),
215
+ repositoryContext . blobPath .buildAsString () + repositoryContext . blobName ,
156
216
knnMethodContext
157
217
);
158
- stopWatch = new StopWatch (). start ( );
159
- final RemoteBuildResponse remoteBuildResponse = client . submitVectorBuild ( buildRequest );
218
+ remoteBuildResponse = client . submitVectorBuild ( buildRequest );
219
+ BUILD_REQUEST_SUCCESS_COUNT . increment ( );
160
220
time_in_millis = stopWatch .stop ().totalTime ().millis ();
161
221
log .debug ("Submit vector build took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
222
+ return remoteBuildResponse ;
223
+ } catch (IOException e ) {
224
+ BUILD_REQUEST_FAILURE_COUNT .increment ();
225
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
226
+ log .error ("Submit vector build failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
227
+ throw e ;
228
+ }
229
+ }
162
230
231
+ /**
232
+ * Awaits the vector build to complete
233
+ * @return RemoteBuildStatusResponse containing the completed status response from the remote service.
234
+ * This will only be returned with a COMPLETED_INDEX_BUILD status, otherwise the method will throw an exception.
235
+ */
236
+ private RemoteBuildStatusResponse awaitIndexBuild (
237
+ RemoteBuildResponse remoteBuildResponse ,
238
+ BuildIndexParams indexInfo ,
239
+ RemoteIndexClient client
240
+ ) throws IOException , InterruptedException {
241
+ RemoteBuildStatusResponse remoteBuildStatusResponse ;
242
+ StopWatch stopWatch = new StopWatch ().start ();
243
+ long time_in_millis ;
244
+ try {
163
245
final RemoteBuildStatusRequest remoteBuildStatusRequest = RemoteBuildStatusRequest .builder ()
164
246
.jobId (remoteBuildResponse .getJobId ())
165
247
.build ();
166
248
RemoteIndexWaiter waiter = RemoteIndexWaiterFactory .getRemoteIndexWaiter (client );
167
- stopWatch = new StopWatch ().start ();
168
- RemoteBuildStatusResponse remoteBuildStatusResponse = waiter .awaitVectorBuild (remoteBuildStatusRequest );
249
+ remoteBuildStatusResponse = waiter .awaitVectorBuild (remoteBuildStatusRequest );
169
250
time_in_millis = stopWatch .stop ().totalTime ().millis ();
251
+ WAITING_TIME .incrementBy (time_in_millis );
170
252
log .debug ("Await vector build took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
253
+ return remoteBuildStatusResponse ;
254
+ } catch (InterruptedException | IOException e ) {
255
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
256
+ log .error ("Await vector build failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
257
+ throw e ;
258
+ }
259
+ }
171
260
172
- stopWatch = new StopWatch ().start ();
173
- vectorRepositoryAccessor .readFromRepository (remoteBuildStatusResponse .getFileName (), indexInfo .getIndexOutputWithBuffer ());
261
+ /**
262
+ * Downloads the index file from the repository and writes to the indexOutput
263
+ */
264
+ private void readFromRepository (
265
+ BuildIndexParams indexInfo ,
266
+ RepositoryContext repositoryContext ,
267
+ RemoteBuildStatusResponse remoteBuildStatusResponse
268
+ ) throws IOException {
269
+ StopWatch stopWatch = new StopWatch ().start ();
270
+ long time_in_millis ;
271
+ try {
272
+ repositoryContext .vectorRepositoryAccessor .readFromRepository (
273
+ remoteBuildStatusResponse .getFileName (),
274
+ indexInfo .getIndexOutputWithBuffer ()
275
+ );
174
276
time_in_millis = stopWatch .stop ().totalTime ().millis ();
277
+ READ_SUCCESS_COUNT .increment ();
278
+ READ_TIME .incrementBy (time_in_millis );
175
279
log .debug ("Repository read took {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName ());
176
280
} catch (Exception e ) {
177
- // TODO: This needs more robust failure handling
178
- log .warn ("Failed to build index remotely" , e );
179
- fallbackStrategy .buildAndWriteIndex (indexInfo );
281
+ time_in_millis = stopWatch .stop ().totalTime ().millis ();
282
+ READ_FAILURE_COUNT .increment ();
283
+ log .error ("Repository read failed after {} ms for vector field [{}]" , time_in_millis , indexInfo .getFieldName (), e );
284
+ throw e ;
180
285
}
181
286
}
182
287
@@ -223,7 +328,7 @@ static RemoteBuildRequest buildRemoteBuildRequest(
223
328
String vectorDataType = indexInfo .getVectorDataType ().getValue ();
224
329
225
330
KNNVectorValues <?> vectorValues = indexInfo .getKnnVectorValuesSupplier ().get ();
226
- KNNCodecUtil . initializeVectorValues (vectorValues );
331
+ initializeVectorValues (vectorValues );
227
332
assert (vectorValues .dimension () > 0 );
228
333
229
334
return RemoteBuildRequest .builder ()
@@ -240,4 +345,51 @@ static RemoteBuildRequest buildRemoteBuildRequest(
240
345
.build ();
241
346
}
242
347
348
+ /**
349
+ * Helper method to collect remote index build metrics on start
350
+ */
351
+ private void startRemoteIndexBuildStats (long size , StopWatch stopWatch ) {
352
+ stopWatch .start ();
353
+ REMOTE_INDEX_BUILD_CURRENT_OPERATIONS .increment ();
354
+ REMOTE_INDEX_BUILD_CURRENT_SIZE .incrementBy (size );
355
+ }
356
+
357
+ /**
358
+ * Helper method to collect remote index build metrics on success
359
+ */
360
+ private void endRemoteIndexBuildStats (long size , StopWatch stopWatch ) {
361
+ long time_in_millis = stopWatch .stop ().totalTime ().millis ();
362
+ REMOTE_INDEX_BUILD_CURRENT_OPERATIONS .decrement ();
363
+ REMOTE_INDEX_BUILD_CURRENT_SIZE .decrementBy (size );
364
+ REMOTE_INDEX_BUILD_TIME .incrementBy (time_in_millis );
365
+ }
366
+
367
+ /**
368
+ * Helper method to collect remote index build metrics on failure and invoke fallback strategy
369
+ */
370
+ private void handleFailure (BuildIndexParams indexParams , long bytesPerVector , StopWatch stopWatch ) throws IOException {
371
+ endRemoteIndexBuildStats (indexParams .getTotalLiveDocs () * bytesPerVector , stopWatch );
372
+ INDEX_BUILD_FAILURE_COUNT .increment ();
373
+ fallbackStrategy .buildAndWriteIndex (indexParams );
374
+ }
375
+
376
+ /**
377
+ * Record to hold various repository related objects
378
+ */
379
+ private record RepositoryContext (BlobStoreRepository blobStoreRepository , BlobPath blobPath ,
380
+ VectorRepositoryAccessor vectorRepositoryAccessor , String blobName ) {
381
+ }
382
+
383
+ /**
384
+ * Helper method to get repository context
385
+ */
386
+ private RepositoryContext getRepositoryContext (BuildIndexParams indexInfo ) {
387
+ BlobStoreRepository repository = getRepository ();
388
+ BlobPath blobPath = repository .basePath ().add (indexSettings .getUUID () + VECTORS_PATH );
389
+ String blobName = UUIDs .base64UUID () + "_" + indexInfo .getFieldName () + "_" + indexInfo .getSegmentWriteState ().segmentInfo .name ;
390
+ VectorRepositoryAccessor vectorRepositoryAccessor = new DefaultVectorRepositoryAccessor (
391
+ repository .blobStore ().blobContainer (blobPath )
392
+ );
393
+ return new RepositoryContext (repository , blobPath , vectorRepositoryAccessor , blobName );
394
+ }
243
395
}
0 commit comments