Description
Is your feature request related to a problem?
I have been using the bulk indexing API and noticed that when a bulk request fails to reach OpenSearch, it is impossible to know which items were in that request. Even if a failure callback is registered on an item, it is not executed. Only the bulk indexer's OnError function is executed, and the items are silently flushed from the worker's buffer.
What solution would you like?
As a result, the sender does not know whether an item made it to OpenSearch, which can lead to data loss. On a bulk request failure, I would like each buffered item's failure callback, if one is registered, to be executed and passed the bulk indexer error. I can open a PR if needed.
Specifically this error check:
https://github.com/opensearch-project/opensearch-go/blob/main/opensearchutil/bulk_indexer.go#L505-L511
Would become:
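    func (w *worker) flush(ctx context.Context) error {
        // ...
        blk, err = w.bi.config.Client.Bulk(ctx, req)
        if err != nil {
            // The whole request failed, so every buffered item is affected.
            return w.handleBulkError(ctx, fmt.Errorf("flush: %w", err))
        }
        // ...
    }

    func (w *worker) handleBulkError(ctx context.Context, err error) error {
        // Count every buffered item as failed.
        atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
        // Keep the existing indexer-level notification.
        if w.bi.config.OnError != nil {
            w.bi.config.OnError(ctx, err)
        }
        // New: notify each item that registered a failure callback.
        // There is no per-item response, so an empty BulkRespItem is passed.
        for i := range w.items {
            if w.items[i].OnFailure != nil {
                w.items[i].OnFailure(ctx, w.items[i], opensearchapi.BulkRespItem{}, err)
            }
        }
        return err
    }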
What alternatives have you considered?
An external inbox pattern could be used, but that adds a lot of complexity when a simple code change would address the problem.
Do you have any additional context?
Implementing this feature would allow developers to handle item-level failures more effectively, especially in applications where data integrity is paramount. It ensures that each item's failure is acknowledged and can be retried or logged as needed.