10
10
11
11
import java .io .IOException ;
12
12
import java .io .InputStream ;
13
+ import java .io .UncheckedIOException ;
13
14
import java .util .List ;
14
15
import java .util .Map ;
15
16
import java .util .Map .Entry ;
16
17
import java .util .concurrent .CompletableFuture ;
18
+ import java .util .function .Predicate ;
17
19
import java .util .stream .Collectors ;
18
20
import javax .annotation .Nullable ;
19
21
import org .opensearch .client .ApiClient ;
24
26
* Client for the generic HTTP requests.
25
27
*/
26
28
public class OpenSearchGenericClient extends ApiClient <OpenSearchTransport , OpenSearchGenericClient > {
29
+ /**
30
+ * Generic client options
31
+ */
32
+ public static final class ClientOptions {
33
+ private static final ClientOptions DEFAULT = new ClientOptions ();
34
+
35
+ private final Predicate <Integer > error ;
36
+
37
+ private ClientOptions () {
38
+ this (statusCode -> false );
39
+ }
40
+
41
+ private ClientOptions (final Predicate <Integer > error ) {
42
+ this .error = error ;
43
+ }
44
+
45
+ public static ClientOptions throwOnHttpErrors () {
46
+ return new ClientOptions (statusCode -> statusCode >= 400 );
47
+ }
48
+ }
49
+
27
50
/**
28
51
* Generic endpoint instance
29
52
*/
30
53
private static final class GenericEndpoint implements org .opensearch .client .transport .GenericEndpoint <Request , Response > {
31
54
private final Request request ;
55
+ private final Predicate <Integer > error ;
32
56
33
- public GenericEndpoint (Request request ) {
57
+ public GenericEndpoint (Request request , Predicate < Integer > error ) {
34
58
this .request = request ;
59
+ this .error = error ;
35
60
}
36
61
37
62
@ Override
@@ -67,24 +92,70 @@ public GenericResponse responseDeserializer(
67
92
int status ,
68
93
String reason ,
69
94
List <Entry <String , String >> headers ,
70
- String contentType ,
71
- InputStream body
95
+ @ Nullable String contentType ,
96
+ @ Nullable InputStream body
72
97
) {
73
- return new GenericResponse (uri , protocol , method , status , reason , headers , Body .from (body , contentType ));
98
+ if (isError (status )) {
99
+ // Fully consume the response body since the it will be propagated as an exception with possible no chance to be closed
100
+ try (Body b = Body .from (body , contentType )) {
101
+ if (b != null ) {
102
+ return new GenericResponse (
103
+ uri ,
104
+ protocol ,
105
+ method ,
106
+ status ,
107
+ reason ,
108
+ headers ,
109
+ Body .from (b .bodyAsBytes (), b .contentType ())
110
+ );
111
+ } else {
112
+ return new GenericResponse (uri , protocol , method , status , reason , headers );
113
+ }
114
+ } catch (final IOException ex ) {
115
+ throw new UncheckedIOException (ex );
116
+ }
117
+ } else {
118
+ return new GenericResponse (uri , protocol , method , status , reason , headers , Body .from (body , contentType ));
119
+ }
120
+ }
121
+
122
+ @ Override
123
+ public boolean isError (int statusCode ) {
124
+ return error .test (statusCode );
125
+ }
126
+
127
+ @ Override
128
+ public <T extends RuntimeException > T exceptionConverter (int statusCode , @ Nullable Response error ) {
129
+ throw new OpenSearchClientException (error );
74
130
}
75
131
}
76
132
133
+ private final ClientOptions clientOptions ;
134
+
77
135
public OpenSearchGenericClient (OpenSearchTransport transport ) {
78
- super (transport , null );
136
+ this (transport , null , ClientOptions . DEFAULT );
79
137
}
80
138
81
139
public OpenSearchGenericClient (OpenSearchTransport transport , @ Nullable TransportOptions transportOptions ) {
140
+ this (transport , transportOptions , ClientOptions .DEFAULT );
141
+ }
142
+
143
+ public OpenSearchGenericClient (
144
+ OpenSearchTransport transport ,
145
+ @ Nullable TransportOptions transportOptions ,
146
+ ClientOptions clientOptions
147
+ ) {
82
148
super (transport , transportOptions );
149
+ this .clientOptions = clientOptions ;
150
+ }
151
+
152
+ public OpenSearchGenericClient withClientOptions (ClientOptions clientOptions ) {
153
+ return new OpenSearchGenericClient (this .transport , this .transportOptions , clientOptions );
83
154
}
84
155
85
156
@ Override
86
157
public OpenSearchGenericClient withTransportOptions (@ Nullable TransportOptions transportOptions ) {
87
- return new OpenSearchGenericClient (this .transport , transportOptions );
158
+ return new OpenSearchGenericClient (this .transport , transportOptions , this . clientOptions );
88
159
}
89
160
90
161
/**
@@ -94,7 +165,7 @@ public OpenSearchGenericClient withTransportOptions(@Nullable TransportOptions t
94
165
* @throws IOException I/O exception
95
166
*/
96
167
public Response execute (Request request ) throws IOException {
97
- return transport .performRequest (request , new GenericEndpoint (request ), this .transportOptions );
168
+ return transport .performRequest (request , new GenericEndpoint (request , clientOptions . error ), this .transportOptions );
98
169
}
99
170
100
171
/**
@@ -103,6 +174,6 @@ public Response execute(Request request) throws IOException {
103
174
* @return generic HTTP response future
104
175
*/
105
176
public CompletableFuture <Response > executeAsync (Request request ) {
106
- return transport .performRequestAsync (request , new GenericEndpoint (request ), this .transportOptions );
177
+ return transport .performRequestAsync (request , new GenericEndpoint (request , clientOptions . error ), this .transportOptions );
107
178
}
108
179
}
0 commit comments