|
35 | 35 |
|
36 | 36 | from beeswaxd import BeeswaxService
|
37 | 37 | from beeswaxd.BeeswaxService import QueryState
|
38 |
| -from ExecStats.ttypes import TExecStats |
39 | 38 | from ImpalaService import ImpalaService, ImpalaHiveServer2Service
|
40 | 39 | from ImpalaService.ImpalaHiveServer2Service import (TGetRuntimeProfileReq,
|
41 | 40 | TGetExecSummaryReq, TPingImpalaHS2ServiceReq, TCloseImpalaOperationReq)
|
|
46 | 45 | TOperationState, TFetchResultsReq, TFetchOrientation, TGetLogReq,
|
47 | 46 | TGetResultSetMetadataReq, TTypeId, TCancelOperationReq, TCloseOperationReq)
|
48 | 47 | from ImpalaHttpClient import ImpalaHttpClient
|
| 48 | +from exec_summary import build_exec_summary_table |
49 | 49 | from kerberos_util import get_kerb_host_from_kerberos_host_fqdn
|
50 | 50 | from thrift.protocol import TBinaryProtocol
|
51 | 51 | from thrift_sasl import TSaslClientTransport
|
@@ -110,157 +110,6 @@ def utf8_encode_if_needed(val):
|
110 | 110 | RPC_EXCEPTION_SERVER = "SERVER_ERROR"
|
111 | 111 |
|
112 | 112 |
|
113 |
| -def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output, |
114 |
| - is_prettyprint=True, separate_prefix_column=False): |
115 |
| - """Direct translation of Coordinator::PrintExecSummary() to recursively build a list |
116 |
| - of rows of summary statistics, one per exec node |
117 |
| -
|
118 |
| - summary: the TExecSummary object that contains all the summary data |
119 |
| -
|
120 |
| - idx: the index of the node to print |
121 |
| -
|
122 |
| - indent_level: the number of spaces to print before writing the node's label, to give |
123 |
| - the appearance of a tree. The 0th child of a node has the same indent_level as its |
124 |
| - parent. All other children have an indent_level of one greater than their parent. |
125 |
| -
|
126 |
| - new_indent_level: If true, this indent level is different from the previous row's. |
127 |
| -
|
128 |
| - output: the list of rows into which to append the rows produced for this node and its |
129 |
| - children. |
130 |
| -
|
131 |
| - is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty |
132 |
| - printed format. |
133 |
| -
|
134 |
| - separate_prefix_column: Optional. If True, the prefix and operator name will be |
135 |
| - returned as separate columns. Otherwise, the prefix and operator name will be concatenated |
136 |
| - into a single column. |
137 |
| -
|
138 |
| - Returns the index of the next exec node in summary.exec_nodes that should be |
139 |
| - processed, used internally to this method only. |
140 |
| - """ |
141 |
| - attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"] |
142 |
| - |
143 |
| - # Initialise aggregate and maximum stats |
144 |
| - agg_stats, max_stats = TExecStats(), TExecStats() |
145 |
| - for attr in attrs: |
146 |
| - setattr(agg_stats, attr, 0) |
147 |
| - setattr(max_stats, attr, 0) |
148 |
| - |
149 |
| - node = summary.nodes[idx] |
150 |
| - if node.exec_stats is not None: |
151 |
| - for stats in node.exec_stats: |
152 |
| - for attr in attrs: |
153 |
| - val = getattr(stats, attr) |
154 |
| - if val is not None: |
155 |
| - setattr(agg_stats, attr, getattr(agg_stats, attr) + val) |
156 |
| - setattr(max_stats, attr, max(getattr(max_stats, attr), val)) |
157 |
| - |
158 |
| - if node.exec_stats is not None and node.exec_stats: |
159 |
| - avg_time = agg_stats.latency_ns / len(node.exec_stats) |
160 |
| - else: |
161 |
| - avg_time = 0 |
162 |
| - |
163 |
| - is_sink = node.node_id == -1 |
164 |
| - # If the node is a broadcast-receiving exchange node, the cardinality of rows produced |
165 |
| - # is the max over all instances (which should all have received the same number of |
166 |
| - # rows). Otherwise, the cardinality is the sum over all instances which process |
167 |
| - # disjoint partitions. |
168 |
| - if is_sink: |
169 |
| - cardinality = -1 |
170 |
| - elif node.is_broadcast: |
171 |
| - cardinality = max_stats.cardinality |
172 |
| - else: |
173 |
| - cardinality = agg_stats.cardinality |
174 |
| - |
175 |
| - est_stats = node.estimated_stats |
176 |
| - label_prefix = "" |
177 |
| - if indent_level > 0: |
178 |
| - label_prefix = "|" |
179 |
| - label_prefix += " |" * (indent_level - 1) |
180 |
| - if new_indent_level: |
181 |
| - label_prefix += "--" |
182 |
| - else: |
183 |
| - label_prefix += " " |
184 |
| - |
185 |
| - def prettyprint(val, units, divisor): |
186 |
| - for unit in units: |
187 |
| - if val < divisor: |
188 |
| - if unit == units[0]: |
189 |
| - return "%d%s" % (val, unit) |
190 |
| - else: |
191 |
| - return "%3.2f%s" % (val, unit) |
192 |
| - val /= divisor |
193 |
| - |
194 |
| - def prettyprint_bytes(byte_val): |
195 |
| - return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0) |
196 |
| - |
197 |
| - def prettyprint_units(unit_val): |
198 |
| - return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0) |
199 |
| - |
200 |
| - def prettyprint_time(time_val): |
201 |
| - return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0) |
202 |
| - |
203 |
| - instances = 0 |
204 |
| - if node.exec_stats is not None: |
205 |
| - instances = len(node.exec_stats) |
206 |
| - latency = max_stats.latency_ns |
207 |
| - cardinality_est = est_stats.cardinality |
208 |
| - memory_used = max_stats.memory_used |
209 |
| - memory_est = est_stats.memory_used |
210 |
| - if (is_prettyprint): |
211 |
| - avg_time = prettyprint_time(avg_time) |
212 |
| - latency = prettyprint_time(latency) |
213 |
| - cardinality = "" if is_sink else prettyprint_units(cardinality) |
214 |
| - cardinality_est = "" if is_sink else prettyprint_units(cardinality_est) |
215 |
| - memory_used = prettyprint_bytes(memory_used) |
216 |
| - memory_est = prettyprint_bytes(memory_est) |
217 |
| - |
218 |
| - row = list() |
219 |
| - if separate_prefix_column: |
220 |
| - row.append(label_prefix) |
221 |
| - row.append(node.label) |
222 |
| - else: |
223 |
| - row.append(label_prefix + node.label) |
224 |
| - |
225 |
| - row.extend([ |
226 |
| - node.num_hosts, |
227 |
| - instances, |
228 |
| - avg_time, |
229 |
| - latency, |
230 |
| - cardinality, |
231 |
| - cardinality_est, |
232 |
| - memory_used, |
233 |
| - memory_est, |
234 |
| - node.label_detail]) |
235 |
| - |
236 |
| - output.append(row) |
237 |
| - try: |
238 |
| - sender_idx = summary.exch_to_sender_map[idx] |
239 |
| - # This is an exchange node or a join node with a separate builder, so the source |
240 |
| - # is a fragment root, and should be printed next. |
241 |
| - sender_indent_level = indent_level + node.num_children |
242 |
| - sender_new_indent_level = node.num_children > 0 |
243 |
| - build_exec_summary_table(summary, sender_idx, sender_indent_level, |
244 |
| - sender_new_indent_level, output, is_prettyprint, |
245 |
| - separate_prefix_column) |
246 |
| - except (KeyError, TypeError): |
247 |
| - # Fall through if idx not in map, or if exch_to_sender_map itself is not set |
248 |
| - pass |
249 |
| - |
250 |
| - idx += 1 |
251 |
| - if node.num_children > 0: |
252 |
| - first_child_output = [] |
253 |
| - idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output, |
254 |
| - is_prettyprint, separate_prefix_column) |
255 |
| - for child_idx in xrange(1, node.num_children): |
256 |
| - # All other children are indented (we only have 0, 1 or 2 children for every exec |
257 |
| - # node at the moment) |
258 |
| - idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output, |
259 |
| - is_prettyprint, separate_prefix_column) |
260 |
| - output += first_child_output |
261 |
| - return idx |
262 |
| - |
263 |
| - |
264 | 113 | class QueryOptionLevels:
|
265 | 114 | """These are the levels used when displaying query options.
|
266 | 115 | The values correspond to the ones in TQueryOptionLevel"""
|
|
0 commit comments