17
17
18
18
package org .elasticsearch .action ;
19
19
20
+ import static org .compuscene .metrics .prometheus .PrometheusMetricsCollector .PROMETHEUS_INDICES ;
21
+
20
22
import org .elasticsearch .ElasticsearchException ;
21
23
import org .elasticsearch .action .admin .cluster .health .ClusterHealthRequest ;
22
24
import org .elasticsearch .action .admin .cluster .health .ClusterHealthResponse ;
23
25
import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsRequest ;
24
26
import org .elasticsearch .action .admin .cluster .node .stats .NodesStatsResponse ;
27
+ import org .elasticsearch .action .admin .indices .stats .IndicesStatsRequest ;
28
+ import org .elasticsearch .action .admin .indices .stats .IndicesStatsResponse ;
25
29
import org .elasticsearch .action .support .ActionFilters ;
26
30
import org .elasticsearch .action .support .HandledTransportAction ;
27
31
import org .elasticsearch .client .Client ;
32
+ import org .elasticsearch .client .Requests ;
28
33
import org .elasticsearch .cluster .metadata .IndexNameExpressionResolver ;
34
+ import org .elasticsearch .common .Nullable ;
29
35
import org .elasticsearch .common .inject .Inject ;
30
36
import org .elasticsearch .common .settings .Settings ;
31
37
import org .elasticsearch .threadpool .ThreadPool ;
@@ -56,12 +62,48 @@ private class AsyncAction {
56
62
private final ActionListener <NodePrometheusMetricsResponse > listener ;
57
63
private final ClusterHealthRequest healthRequest ;
58
64
private final NodesStatsRequest nodesStatsRequest ;
65
+ private final IndicesStatsRequest indicesStatsRequest ;
59
66
private ClusterHealthResponse clusterHealthResponse ;
67
+ private NodesStatsResponse nodesStatsResponse ;
68
+
69
+ private AsyncAction (ActionListener <NodePrometheusMetricsResponse > listener ) {
70
+ this .listener = listener ;
71
+
72
+ // Note: when using ClusterHealthRequest in Java, it pulls data at the shards level, according to ES source
73
+ // code comment this is "so it is backward compatible with the transport client behaviour".
74
+ // hence we are explicit about ClusterHealthRequest level and do not rely on defaults.
75
+ // https://www.elastic.co/guide/en/elasticsearch/reference/6.4/cluster-health.html#request-params
76
+ this .healthRequest = Requests .clusterHealthRequest ().local (true );
77
+ this .healthRequest .level (ClusterHealthRequest .Level .SHARDS );
78
+
79
+ this .nodesStatsRequest = Requests .nodesStatsRequest ("_local" ).clear ().all ();
80
+ // Note: this request is not "node-specific", it does not support any "_local" notion
81
+ // it is broad-casted to all cluster nodes.
82
+ this .indicesStatsRequest = new IndicesStatsRequest ();
83
+ }
84
+
85
+ private ActionListener <IndicesStatsResponse > indicesStatsResponseActionListener =
86
+ new ActionListener <IndicesStatsResponse >() {
87
+ @ Override
88
+ public void onResponse (IndicesStatsResponse indicesStatsResponse ) {
89
+ listener .onResponse (buildResponse (clusterHealthResponse , nodesStatsResponse , indicesStatsResponse ));
90
+ }
91
+
92
+ @ Override
93
+ public void onFailure (Exception e ) {
94
+ listener .onFailure (new ElasticsearchException ("Indices stats request failed" , e ));
95
+ }
96
+ };
60
97
61
98
private ActionListener <NodesStatsResponse > nodesStatsResponseActionListener = new ActionListener <NodesStatsResponse >() {
62
99
@ Override
63
100
public void onResponse (NodesStatsResponse nodeStats ) {
64
- listener .onResponse (buildResponse (clusterHealthResponse , nodeStats ));
101
+ if (PROMETHEUS_INDICES .get (settings )) {
102
+ nodesStatsResponse = nodeStats ;
103
+ client .admin ().indices ().stats (indicesStatsRequest , indicesStatsResponseActionListener );
104
+ } else {
105
+ listener .onResponse (buildResponse (clusterHealthResponse , nodeStats , null ));
106
+ }
65
107
}
66
108
67
109
@ Override
@@ -84,20 +126,15 @@ public void onFailure(Exception e) {
84
126
}
85
127
};
86
128
87
- private AsyncAction (ActionListener <NodePrometheusMetricsResponse > listener ) {
88
- this .listener = listener ;
89
- this .healthRequest = new ClusterHealthRequest ();
90
- this .nodesStatsRequest = new NodesStatsRequest ("_local" ).all ();
91
- }
92
-
93
129
private void start () {
94
130
client .admin ().cluster ().health (healthRequest , clusterHealthResponseActionListener );
95
131
}
96
132
97
133
protected NodePrometheusMetricsResponse buildResponse (ClusterHealthResponse clusterHealth ,
98
- NodesStatsResponse nodesStats ) {
134
+ NodesStatsResponse nodesStats ,
135
+ @ Nullable IndicesStatsResponse indicesStats ) {
99
136
NodePrometheusMetricsResponse response = new NodePrometheusMetricsResponse (clusterHealth ,
100
- nodesStats .getNodes ().get (0 ));
137
+ nodesStats .getNodes ().get (0 ), indicesStats );
101
138
if (logger .isTraceEnabled ()) {
102
139
logger .trace ("Return response: [{}]" , response );
103
140
}
0 commit comments