-
Notifications
You must be signed in to change notification settings - Fork 523
/
Copy pathimpala_client.py
executable file
·1521 lines (1346 loc) · 65.6 KB
/
impala_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import print_function, unicode_literals
from compatibility import _xrange as xrange
from bitarray import bitarray
import base64
import operator
import re
import sasl
import socket
import ssl
import sys
import time
import traceback
from datetime import datetime
import uuid
from impala_thrift_gen.beeswax import BeeswaxService
from impala_thrift_gen.beeswax.BeeswaxService import QueryState
from impala_thrift_gen.ImpalaService import ImpalaService, ImpalaHiveServer2Service
from impala_thrift_gen.ImpalaService.ImpalaHiveServer2Service import (
TGetRuntimeProfileReq, TGetExecSummaryReq, TPingImpalaHS2ServiceReq,
TCloseImpalaOperationReq)
from impala_thrift_gen.ErrorCodes.ttypes import TErrorCode
from impala_thrift_gen.Status.ttypes import TStatus
from impala_thrift_gen.TCLIService.TCLIService import (TExecuteStatementReq,
TOpenSessionReq, TCloseSessionReq, TProtocolVersion, TStatusCode,
TGetOperationStatusReq, TOperationState, TFetchResultsReq, TFetchOrientation,
TGetLogReq, TGetResultSetMetadataReq, TTypeId, TCancelOperationReq,
TCloseOperationReq)
from ImpalaHttpClient import ImpalaHttpClient
from exec_summary import build_exec_summary_table
from kerberos_util import get_kerb_host_from_kerberos_host_fqdn
from thrift.protocol import TBinaryProtocol
from thrift_sasl import TSaslClientTransport
from thrift.transport.TSocket import TSocket
from thrift.transport.TTransport import TBufferedTransport, TTransportException
from thrift.Thrift import TApplicationException, TException
from shell_exceptions import (RPCException, QueryStateException, DisconnectedException,
QueryCancelledByShellException, MissingThriftMethodException, HttpError)
from value_converter import HS2ValueConverter
from thrift_printer import ThriftPrettyPrinter
# Getters that extract HS2's representation of values so they can be converted to the
# display version. Every supported TTypeId needs an entry in this map. HS2's TColumn
# has many differently-typed members, each of which has a 'values' and a 'nulls'
# field; the getter registered for a TTypeId selects the TColumn member holding values
# of that type. Note that FLOAT shares the double member, and TIMESTAMP, DECIMAL,
# the char/string types, the complex types, NULL and DATE are read from 'stringVal'.
HS2_VALUE_GETTERS = dict(
    (type_id, operator.attrgetter(column_member))
    for type_id, column_member in (
        (TTypeId.BOOLEAN_TYPE, 'boolVal'),
        (TTypeId.TINYINT_TYPE, 'byteVal'),
        (TTypeId.SMALLINT_TYPE, 'i16Val'),
        (TTypeId.INT_TYPE, 'i32Val'),
        (TTypeId.BIGINT_TYPE, 'i64Val'),
        (TTypeId.TIMESTAMP_TYPE, 'stringVal'),
        (TTypeId.FLOAT_TYPE, 'doubleVal'),
        (TTypeId.DOUBLE_TYPE, 'doubleVal'),
        (TTypeId.STRING_TYPE, 'stringVal'),
        (TTypeId.DECIMAL_TYPE, 'stringVal'),
        (TTypeId.BINARY_TYPE, 'binaryVal'),
        (TTypeId.VARCHAR_TYPE, 'stringVal'),
        (TTypeId.CHAR_TYPE, 'stringVal'),
        (TTypeId.MAP_TYPE, 'stringVal'),
        (TTypeId.ARRAY_TYPE, 'stringVal'),
        (TTypeId.STRUCT_TYPE, 'stringVal'),
        (TTypeId.UNION_TYPE, 'stringVal'),
        (TTypeId.NULL_TYPE, 'stringVal'),
        (TTypeId.DATE_TYPE, 'stringVal'),
    ))
# Python 2 only: turn a UTF-8 encoded str into unicode. A no-op on Python 3.
def utf8_decode_if_needed(val):
  """Return 'val' decoded from UTF-8 (undecodable bytes replaced) when running on
  Python 2 and 'val' is a str; otherwise return 'val' unchanged."""
  if sys.version_info.major >= 3 or not isinstance(val, str):
    return val
  return val.decode('utf-8', errors='replace')
# Python 2 only: turn a unicode object into a UTF-8 encoded str. A no-op on Python 3.
def utf8_encode_if_needed(val):
  """Return 'val' encoded as UTF-8 (unencodable characters replaced) when running on
  Python 2 and 'val' is unicode; otherwise return 'val' unchanged."""
  if sys.version_info.major >= 3:
    return val
  # The 'unicode' builtin only exists on Python 2, which the check above guarantees.
  if isinstance(val, unicode):
    return val.encode('utf-8', errors='replace')
  return val
# Matches the progress line that the Impala server adds to HS2 logs: a line of the
# form "Query...Complete (N out of M)" terminated by a newline.
HS2_LOG_PROGRESS_REGEX = re.compile(r"Query.*Complete \([0-9]* out of [0-9]*\)\n")

# Values stored in RPCException.exception_type so callers can tell RPC failures apart:
#  - RPC_EXCEPTION_TAPPLICATION: a TApplicationException was caught.
#  - RPC_EXCEPTION_SERVER: the impala server replied with TStatusCode.ERROR_STATUS.
RPC_EXCEPTION_TAPPLICATION = "TAPPLICATION_EXCEPTION"
RPC_EXCEPTION_SERVER = "SERVER_ERROR"
class QueryOptionLevels:
  """These are the levels used when displaying query options.
  The values correspond to the ones in TQueryOptionLevel"""
  REGULAR = 0
  ADVANCED = 1
  DEVELOPMENT = 2
  DEPRECATED = 3
  REMOVED = 4

  # Map from level name to the level's numeric value. Built from the constants above
  # so that the names and numbers cannot drift apart.
  NAME_TO_VALUES = {'REGULAR': REGULAR, 'ADVANCED': ADVANCED,
                    'DEVELOPMENT': DEVELOPMENT, 'DEPRECATED': DEPRECATED,
                    'REMOVED': REMOVED}

  @classmethod
  def from_string(cls, string):
    """Return the integral value for the level name 'string' (case-insensitive).
    Unknown names, and a missing (None) name, default to DEVELOPMENT."""
    if string is None:
      return cls.DEVELOPMENT
    return cls.NAME_TO_VALUES.get(string.upper(), cls.DEVELOPMENT)
class ImpalaClient(object):
  """Base class for shared functionality between HS2 and Beeswax. Includes stub methods
  for methods that are expected to be implemented in the subclasses.
  TODO: when beeswax support is removed, merge this with ImpalaHS2Client."""

  def __init__(self, impalad, fetch_size, kerberos_host_fqdn, use_kerberos=False,
      kerberos_service_name="impala", use_ssl=False, ca_cert=None, user=None,
      ldap_password=None, use_ldap=False, client_connect_timeout_ms=60000,
      verbose=True, use_http_base_transport=False, http_path=None,
      http_cookie_names=None, http_socket_timeout_s=None, value_converter=None,
      connect_max_tries=4, rpc_stdout=False, rpc_file=None, http_tracing=True,
      jwt=None, oauth=None, hs2_x_forward=None):
    """Store the connection settings; no connection is made until connect().
    'impalad' is indexed as (host, port); the port is converted with int()."""
    self.connected = False
    self.impalad_host = impalad[0]
    self.impalad_port = int(impalad[1])
    self.kerberos_host_fqdn = kerberos_host_fqdn
    self.imp_service = None
    self.transport = None
    self.use_kerberos = use_kerberos
    self.kerberos_service_name = kerberos_service_name
    self.use_ssl = use_ssl
    self.ca_cert = ca_cert
    self.user, self.ldap_password = user, ldap_password
    self.use_ldap = use_ldap
    self.client_connect_timeout_ms = int(client_connect_timeout_ms)
    self.http_socket_timeout_s = http_socket_timeout_s
    self.connect_max_tries = connect_max_tries
    self.default_query_options = {}
    self.query_option_levels = {}
    self.fetch_size = fetch_size
    self.use_http_base_transport = use_http_base_transport
    self.http_path = http_path
    self.http_cookie_names = http_cookie_names
    self.http_tracing = http_tracing
    self.jwt = jwt
    self.oauth = oauth
    # This is set from ImpalaShell's signal handler when a query is cancelled
    # from command line via CTRL+C. It is used to suppress error messages of
    # query cancellation.
    self.is_query_cancelled = False
    self.verbose = verbose
    # This is set in connect(). It's used in constructing the retried query link after
    # we parse the retried query id.
    self.webserver_address = None
    self.value_converter = value_converter
    self.rpc_stdout = rpc_stdout
    self.rpc_file = rpc_file
    # In hs2-http clients only, the value of the X-Forwarded-For http header.
    self.hs2_x_forward = hs2_x_forward

  def connect(self):
    """Creates a connection to an Impalad instance. Returns a tuple with the impala
    version string and the webserver address, otherwise raises an exception. If the client
    was already connected, closes the previous connection."""
    self.close_connection()

    if self.use_http_base_transport:
      self.transport = self._get_http_transport(self.client_connect_timeout_ms)
    else:
      self.transport = self._get_transport(self.client_connect_timeout_ms)
    assert self.transport and self.transport.isOpen()

    if self.verbose:
      msg = 'Opened TCP connection to %s:%s' % (self.impalad_host, self.impalad_port)
      print(msg, file=sys.stderr)
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(self.transport)
    self.imp_service = self._get_thrift_client(protocol)
    self.connected = True
    try:
      self._open_session()
      return self._ping_impala_service()
    except Exception:
      # Ensure we are in a disconnected state if we failed above.
      self.close_connection()
      raise

  def _get_thrift_client(self, protocol):
    """Instantiate a thrift client with the provided protocol."""
    raise NotImplementedError()

  def _open_session(self):
    """Does any work to open a session for a new connection.
    Also sets up self.default_query_options and self.query_option_levels
    to include metadata about the options supported by the server."""
    raise NotImplementedError()

  def is_connected(self):
    """Returns True if the current Impala connection is alive and False otherwise.
    If the liveness ping fails with a communication error, the connection is closed."""
    if not self.connected:
      return False
    try:
      self._ping_impala_service()
      return True
    # Catch exceptions that are associated with communication errors.
    except (TException, RPCException, DisconnectedException):
      self.close_connection()
      return False

  def close_connection(self):
    """Close any open sessions and close the connection if still open."""
    raise NotImplementedError()

  def _close_transport(self):
    """Closes transport if not closed and set self.connected to False. This is the last
    step of close_connection()."""
    if self.transport and self.transport.isOpen():
      self.transport.close()
    self.connected = False

  def _ping_impala_service(self):
    """Pings the Impala service to ensure it can receive RPCs. Returns a tuple with
    the impala version string and the webserver address. Raise TException, RPCException,
    DisconnectedException or MissingThriftMethodException if it cannot successfully
    communicate with the Impala daemon."""
    raise NotImplementedError()

  def execute_query(self, query_str, set_query_options):
    """Execute the query 'query_str' asynchronously on the server with options dictionary
    'set_query_options' and return a query handle that can be used for subsequent
    ImpalaClient method calls for the query. The handle returned is
    implementation-dependent but is guaranteed to have an 'is_closed' member
    that reflects whether the query was closed with close_query() or close_dml()"""
    raise NotImplementedError()

  def get_query_id_str(self, last_query_handle):
    """Return the standard string representation of an Impala query ID, e.g.
    'd74d8ce632c9d4d0:75c5a51100000000'"""
    raise NotImplementedError()

  def get_query_link(self, query_id):
    """Return the URL link to the debug page of the query"""
    return "%s/query_plan?query_id=%s" % (self.webserver_address, query_id)

  def wait_to_finish(self, last_query_handle, periodic_callback=None):
    """Wait until the results can be fetched for 'last_query_handle' or until the
    query encounters an error or is cancelled. Raises an exception if the query
    encounters an error or is cancelled or if we lose connection to the impalad.
    If 'periodic_callback' is provided, it is called periodically with no arguments."""
    loop_start = time.time()
    while True:
      start_rpc_time = time.time()
      query_state = self.get_query_state(last_query_handle)
      rpc_time = time.time() - start_rpc_time
      if query_state == self.FINISHED_STATE:
        break
      elif query_state in (self.ERROR_STATE, self.CANCELED_STATE):
        if self.connected:
          # TODO: does this do the right thing for a cancelled query?
          raise QueryStateException(self.get_error_log(last_query_handle))
        else:
          raise DisconnectedException("Not connected to impalad.")

      if periodic_callback is not None:
        periodic_callback()
      # The time spent in the state RPC counts towards the polling interval.
      sleep_time = self._get_sleep_interval(loop_start)
      if rpc_time < sleep_time:
        time.sleep(sleep_time - rpc_time)

  def get_query_state(self, last_query_handle):
    """Return the query state string for 'last_query_handle'. Returns self.ERROR_STATE
    if there is an error communicating with the server or the client is disconnected.
    """
    raise NotImplementedError()

  def get_column_names(self, last_query_handle):
    """Get a list of column names for the query. The query must have a result set."""
    raise NotImplementedError()

  def expect_result_metadata(self, query_str, query_handle):
    """Given a query string and handle, return True if impalad expects result metadata."""
    raise NotImplementedError()

  def fetch(self, query_handle):
    """Returns an iterable of batches of result rows. Each batch is an iterable of rows.
    Each row is an iterable of strings in the format in which they should be displayed.
    Tries to ensure that the batches have a granularity of self.fetch_size but
    does not guarantee it.
    """
    raise NotImplementedError()

  # TODO: when we remove Beeswax, we could merge close_dml() and close_query()
  # because the CloseImpalaOperation() response contains enough information to
  # differentiate between DML and non-DML.
  def close_dml(self, last_query_handle):
    """Fetches the results of a DML query. Returns a tuple containing the
    number of rows modified, the number of rows deleted, and the number of row errors,
    in that order. If the DML operation doesn't return 'rows_deleted' or
    'num_row_errors', then the respective element in the tuple is None.
    Returns None if the query was not closed successfully. Not idempotent."""
    raise NotImplementedError()

  def close_query(self, last_query_handle):
    """Close the query handle. Idempotent - after the first attempt, closing the same
    query handle is a no-op. Returns True if the query was closed
    successfully or False otherwise."""
    raise NotImplementedError()

  def cancel_query(self, last_query_handle):
    """Cancel a query on a keyboard interrupt from the shell. Return True if the
    query was previously cancelled or if the cancel operation succeeded. Return
    False otherwise."""
    raise NotImplementedError()

  def get_runtime_profile(self, last_query_handle):
    """Get the runtime profile string from the server. Returns None if
    an error was encountered. If the query was retried, returns the profile of the failed
    attempt as well; the tuple (profile, failed_profile) is returned where 'profile' is
    the profile of the most recent query attempt and 'failed_profile' is the profile of
    the original query attempt that failed. Currently, only the HS2 protocol supports
    returning the failed profile."""
    raise NotImplementedError()

  def get_summary(self, last_query_handle):
    """Get the thrift TExecSummary from the server. Returns None if
    an error was encountered. If the query was retried, returns TExecSummary of the failed
    attempt as well; the tuple (summary, failed_summary) is returned where 'summary' is
    the TExecSummary of the most recent query attempt and 'failed_summary' is the
    TExecSummary of the original query attempt that failed. Currently, only the HS2
    protocol supports returning the failed summary"""
    raise NotImplementedError()

  def _get_warn_or_error_log(self, last_query_handle, warn):
    """Returns all messages from the error log prepended with 'WARNINGS:' or 'ERROR:' for
    last_query_handle, depending on whether warn is True or False. Note that the error
    log may contain messages that are not errors (e.g. warnings)."""
    raise NotImplementedError()

  def get_warning_log(self, last_query_handle):
    """Returns all messages from the error log prepended with 'WARNINGS:' for
    last_query_handle. Note that the error log may contain messages that are not errors
    (e.g. warnings)."""
    return self._get_warn_or_error_log(last_query_handle, True)

  def get_error_log(self, last_query_handle):
    """Returns all messages from the error log prepended with 'ERROR:' for
    last_query_handle. Note that the error log may contain messages that are not errors
    (e.g. warnings)."""
    return self._get_warn_or_error_log(last_query_handle, False)

  def _append_retried_query_link(self, get_log_result):
    """Append the retried query link if the original query has been retried.
    Only done when the webserver address is known."""
    if self.webserver_address:
      query_id_search = re.search("Query has been retried using query id: (.*)\n",
                                  get_log_result)
      # The pattern has exactly one group, so a match always carries the query id.
      if query_id_search:
        retried_query_id = query_id_search.group(1)
        get_log_result += "Retried query link: %s" % \
            self.get_query_link(retried_query_id)
    return get_log_result

  def _get_http_transport(self, connect_timeout_ms):
    """Creates and opens a buffered transport with HTTP(S) as the base. Authentication
    is chosen in this order: LDAP (basic auth), JWT, OAuth, Kerberos, none."""
    # Older python versions do not support SSLContext needed by ImpalaHttpClient. More
    # context in IMPALA-8864. CentOs 6 ships such an incompatible python version
    # out of the box.
    if not hasattr(ssl, "create_default_context"):
      print("Python version too old. SSLContext not supported.", file=sys.stderr)
      raise NotImplementedError()
    # Current implementation of ImpalaHttpClient does a close() and open() of the
    # underlying http connection on every flush() (THRIFT-4600). Due to this, setting a
    # connect timeout does not achieve the desirable result as the subsequent open()
    # could block similarly in case of problematic remote end points.
    # TODO: Investigate connection reuse in ImpalaHttpClient and revisit this.
    if connect_timeout_ms > 0 and self.verbose:
      print("Warning: --connect_timeout_ms is currently ignored with HTTP transport.",
            file=sys.stderr)

    # Notes on http socket timeout:
    # https://docs.python.org/3/library/socket.html#socket-timeouts
    # Having a default timeout of 'None' (blocking mode) could result in hang like
    # symptoms in case of a problematic remote endpoint. It's better to have a finite
    # timeout so that in case of any connection errors, the client retries have a better
    # chance of succeeding.
    host_and_port = "{0}:{1}".format(self.impalad_host, self.impalad_port)
    assert self.http_path
    # ImpalaHttpClient relies on the URI scheme (http vs https) to open an appropriate
    # connection to the server.
    if self.use_ssl:
      ssl_ctx = ssl.create_default_context(cafile=self.ca_cert)
      if self.ca_cert:
        ssl_ctx.verify_mode = ssl.CERT_REQUIRED
      else:
        ssl_ctx.check_hostname = False  # Mandated by the SSL lib for CERT_NONE mode.
        ssl_ctx.verify_mode = ssl.CERT_NONE
      url = "https://{0}/{1}".format(host_and_port, self.http_path)
      transport = ImpalaHttpClient(url, ssl_context=ssl_ctx,
                                   http_cookie_names=self.http_cookie_names,
                                   socket_timeout_s=self.http_socket_timeout_s,
                                   verbose=self.verbose)
    else:
      url = "http://{0}/{1}".format(host_and_port, self.http_path)
      transport = ImpalaHttpClient(url, http_cookie_names=self.http_cookie_names,
                                   socket_timeout_s=self.http_socket_timeout_s,
                                   verbose=self.verbose)

    if self.use_ldap:
      # Set the BASIC authorization
      user_passwd = "{0}:{1}".format(self.user, self.ldap_password)
      # Produce RFC 2617-compliant basic credentials:
      # - RFC 2045 encoding of username:password without limitations to 76 chars
      #   per line (and without trailing newline)
      # - No translation of characters (+,/) for URL-safety
      # Encode explicitly as UTF-8: a bare encode() uses the ASCII codec on Python 2
      # and fails for non-ASCII credentials. The result is unchanged on Python 3.
      auth = base64.b64encode(user_passwd.encode('utf-8')).decode()
      transport.setLdapAuth(auth)
    elif self.jwt is not None:
      transport.setJwtAuth(self.jwt)
    elif self.oauth is not None:
      transport.setOAuthAuth(self.oauth)
    elif self.use_kerberos or self.kerberos_host_fqdn:
      # Set the Kerberos service
      if self.kerberos_host_fqdn is not None:
        kerb_host = get_kerb_host_from_kerberos_host_fqdn(self.kerberos_host_fqdn)
      else:
        kerb_host = self.impalad_host
      kerb_service = "{0}@{1}".format(self.kerberos_service_name, kerb_host)
      transport.setKerberosAuth(kerb_service)
    else:
      transport.setNoneAuth()

    transport.addCustomHeaderFunc(self.get_custom_http_headers)

    # Without buffering Thrift would call socket.recv() each time it deserializes
    # something (e.g. a member in a struct).
    transport = TBufferedTransport(transport)
    transport.open()
    return transport

  def _get_transport(self, connect_timeout_ms):
    """Create and open a Transport.
    A non-kerberized, non-LDAP impalad just needs a simple buffered transport. For
    the kerberized (GSSAPI) and LDAP (PLAIN) versions, a sasl transport is created.
    If SSL is enabled, a TSSLSocketWithWildcardSAN underlies the transport stack;
    otherwise a TSocket is used. 'connect_timeout_ms' (when > 0) only applies while
    the transport is being opened.
    Returns the opened transport object; the socket is closed if opening fails.
    """
    if self.use_ssl:
      # TSSLSocket needs the ssl module, which may not be standard on all Operating
      # Systems. Only attempt to import TSSLSocket if the user wants an SSL connection.
      from TSSLSocketWithWildcardSAN import TSSLSocketWithWildcardSAN

    # The kerberos_host_fqdn option exposes the SASL client's hostname attribute to
    # the user. impala-shell checks to ensure this host matches the host in the kerberos
    # principal. So when a load balancer is configured to be used, its hostname is
    # expected by impala-shell. Setting this option to the load balancer hostname allows
    # impala-shell to connect directly to an impalad.
    if self.kerberos_host_fqdn is not None:
      sasl_host = get_kerb_host_from_kerberos_host_fqdn(self.kerberos_host_fqdn)
    else:
      sasl_host = self.impalad_host

    # Always use the hostname and port passed in to -i / --impalad as the host for the
    # purpose of creating the actual socket.
    sock_host = self.impalad_host
    sock_port = self.impalad_port
    if self.use_ssl:
      if self.ca_cert is None:
        # No CA cert means don't try to verify the certificate
        sock = TSSLSocketWithWildcardSAN(sock_host, sock_port, validate=False)
      else:
        sock = TSSLSocketWithWildcardSAN(
            sock_host, sock_port, validate=True, ca_certs=self.ca_cert)
    else:
      sock = TSocket(sock_host, sock_port)
    if connect_timeout_ms > 0:
      sock.setTimeout(connect_timeout_ms)

    # Helper to initialize a sasl client
    def sasl_factory():
      sasl_client = sasl.Client()
      sasl_client.setAttr("host", sasl_host)
      if self.use_ldap:
        sasl_client.setAttr("username", self.user)
        sasl_client.setAttr("password", self.ldap_password)
      else:
        sasl_client.setAttr("service", self.kerberos_service_name)
      sasl_client.init()
      return sasl_client

    transport = None
    if not (self.use_ldap or self.use_kerberos):
      transport = TBufferedTransport(sock)
    # GSSAPI is the underlying mechanism used by kerberos to authenticate.
    elif self.use_kerberos:
      transport = TSaslClientTransport(sasl_factory, "GSSAPI", sock)
    else:
      transport = TSaslClientTransport(sasl_factory, "PLAIN", sock)

    # Open the transport and reset the timeout so that it does not apply to the
    # subsequent RPCs on the same socket.
    try:
      transport.open()
    except Exception:
      # Don't leak the socket if the connect or the SASL handshake fails: the caller
      # never receives 'transport' and so has no way to close it.
      sock.close()
      raise
    sock.setTimeout(None)
    return transport

  def build_summary_table(self, summary, output):
    """Format the exec 'summary' into 'output' via build_exec_summary_table, in
    pretty-print mode and without a separate prefix column."""
    build_exec_summary_table(summary, 0, 0, False, output, is_prettyprint=True,
                             separate_prefix_column=False)

  def _get_sleep_interval(self, start_time):
    """Returns a step function of time to sleep in seconds before polling
    again. Maximum sleep is 1s, minimum is 0.1s"""
    elapsed = time.time() - start_time
    if elapsed < 10.0:
      return 0.1
    elif elapsed < 60.0:
      return 0.5
    return 1.0

  def _check_connected(self):
    """Raise DisconnectedException if the client is not connected."""
    if not self.connected:
      raise DisconnectedException("Not connected (use CONNECT to establish a connection)")

  def get_custom_http_headers(self):
    """Extra HTTP headers for the http transport; None here. Subclasses can
    override this function to add arbitrary http headers."""
    return None

  def _process_dml_result(self, dml_result):
    """Return (rows modified, rows deleted or None, dml_result.num_row_errors).
    The per-partition counts in 'rows_modified' / 'rows_deleted' are summed after int()
    conversion; rows deleted is None when 'rows_deleted' is empty or unset."""
    num_rows = sum(int(count) for count in dml_result.rows_modified.values())
    num_deleted_rows = None
    if dml_result.rows_deleted:
      num_deleted_rows = sum(int(count) for count in dml_result.rows_deleted.values())
    return (num_rows, num_deleted_rows, dml_result.num_row_errors)
class ImpalaHS2Client(ImpalaClient):
"""Impala client. Uses the HS2 protocol plus Impala-specific extensions."""
def __init__(self, *args, **kwargs):
  """Accepts the same arguments as ImpalaClient.__init__. Additionally sets up the HS2
  operation-state constants, the RPC retry policy, a default value converter and the
  state used for RPC logging and request ids."""
  super(ImpalaHS2Client, self).__init__(*args, **kwargs)
  self.FINISHED_STATE = TOperationState._NAMES_TO_VALUES["FINISHED_STATE"]
  self.ERROR_STATE = TOperationState._NAMES_TO_VALUES["ERROR_STATE"]
  self.CANCELED_STATE = TOperationState._NAMES_TO_VALUES["CANCELED_STATE"]
  self._clear_current_query_handle()
  # If connected, this is the handle returned by the OpenSession RPC that needs
  # to be passed into most HS2 RPCs.
  self.session_handle = None
  # Enable retries only for hs2-http protocol.
  if self.use_http_base_transport:
    # Maximum number of tries for idempotent rpcs.
    self.max_tries = self.connect_max_tries
  else:
    self.max_tries = 1
  # Step of the linear back-off between retry attempts: the sleep after failed attempt
  # n is min_sleep_interval * (n - 1) seconds (see _get_sleep_interval_for_retries).
  self.min_sleep_interval = 1
  # In case of direct instantiation of the client where the converter is
  # not set, there should be a default value converter assigned
  if self.value_converter is None:
    self.value_converter = HS2ValueConverter()
  # The pretty printer is only needed when RPCs are logged, either to stdout or to a
  # file. (Comparing rpc_stdout against None here was always true for the default
  # rpc_stdout=False and never looked at rpc_file.)
  if self.rpc_stdout or self.rpc_file is not None:
    self.thrift_printer = ThriftPrettyPrinter()
  self._base_request_id = str(uuid.uuid1())
  self._request_num = 0
def _get_thrift_client(self, protocol):
  """Build the ImpalaHiveServer2Service thrift client that speaks over 'protocol'."""
  hs2_client = ImpalaHiveServer2Service.Client(protocol)
  return hs2_client
def _get_sleep_interval_for_retries(self, num_tries):
"""Returns the sleep interval in seconds for the 'num_tries' retry attempt."""
assert num_tries > 0 and num_tries < self.max_tries
return self.min_sleep_interval * (num_tries - 1)
def _open_session(self):
  """Open an HS2 session (protocol V6) for self.user, keep its handle in
  self.session_handle and then load the server's query options."""
  # The helper is named after the RPC it wraps; it is handed to _do_hs2_rpc as is.
  def OpenSession(req):
    return self.imp_service.OpenSession(req)

  wanted_version = TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6
  # OpenSession rpcs are idempotent and so ok to retry. If the client gets disconnected
  # and the server successfully opened a session, the client will retry and rely on
  # server to clean up the session.
  open_req = TOpenSessionReq(wanted_version, username=self.user)
  open_resp = self._do_hs2_rpc(OpenSession, open_req, retry_on_error=True)
  self._check_hs2_rpc_status(open_resp.status)
  assert open_resp.serverProtocolVersion == wanted_version, \
      open_resp.serverProtocolVersion
  # TODO: ensure it's closed if needed
  self.session_handle = open_resp.sessionHandle
  self._populate_query_options()
def get_custom_http_headers(self):
  """Headers added to each HTTP request on the http transport.

  With http_tracing enabled, these are the session id and the current query id (each
  only when not None) plus the request id, which must already be set. X-Forwarded-For
  is added whenever hs2_x_forward is set."""
  headers = {}
  if self.http_tracing:
    optional_ids = (
        ("X-Impala-Session-Id", self.get_session_id()),
        ("X-Impala-Query-Id", self.get_query_id_str(self._current_query_handle)),
    )
    for header_name, header_value in optional_ids:
      if header_value is not None:
        headers[header_name] = header_value
    assert getattr(self, "_current_request_id", None) is not None, \
        "request id was not set"
    headers["X-Request-Id"] = self._current_request_id
  if self.hs2_x_forward:
    headers["X-Forwarded-For"] = self.hs2_x_forward
  return headers
def close_connection(self):
  """Close the HS2 session if one is open, then close the transport.

  Closing the session is best effort: any failure is logged as a warning and
  otherwise ignored, since the transport must still be closed and the server
  can clean up the session."""
  if self.session_handle is not None:
    def CloseSession(req):
      return self.imp_service.CloseSession(req)

    # CloseSession rpcs don't need retries since we catch all exceptions and close
    # transport.
    try:
      close_resp = self._do_hs2_rpc(CloseSession, TCloseSessionReq(self.session_handle))
      self._check_hs2_rpc_status(close_resp.status)
    except Exception as e:
      log_exception_with_timestamp(e, "Warning",
          "close session RPC failed: {0}".format(type(e)), stderr_flag=False)
    self.session_handle = None
  self._close_transport()
def _populate_query_options(self):
  """Run "set all" and rebuild self.default_query_options and
  self.query_option_levels from its rows, keyed by upper-cased option name.

  Retrying "set all" should be idempotent, so it is attempted up to self.max_tries
  times. These errors are re-raised immediately:
    - QueryCancelledByShellException
    - MissingThriftMethodException
    - QueryStateException
    - RPCExceptions of type RPC_EXCEPTION_TAPPLICATION / RPC_EXCEPTION_SERVER
  Any other error is logged and retried; on the last attempt it is re-raised."""
  num_tries = 1
  while num_tries <= self.max_tries:
    raise_error = (num_tries == self.max_tries)
    set_all_handle = None
    if self.max_tries > 1:
      retry_msg = 'Num remaining tries: {0}'.format(self.max_tries - num_tries)
    else:
      retry_msg = ''
    try:
      set_all_handle = self.execute_query("set all", {})
      self.default_query_options = {}
      self.query_option_levels = {}
      for rows in self.fetch(set_all_handle):
        for name, value, level in rows:
          option_key = name.upper()
          self.default_query_options[option_key] = value
          self.query_option_levels[option_key] = QueryOptionLevels.from_string(level)
      break
    except (QueryCancelledByShellException, MissingThriftMethodException,
            QueryStateException):
      raise
    except Exception as e:
      if (isinstance(e, RPCException) and e.exception_type in
          (RPC_EXCEPTION_TAPPLICATION, RPC_EXCEPTION_SERVER)):
        raise
      log_exception_with_timestamp(e, "Exception",
          "type={0} when listing query options. {1}".format(type(e), retry_msg))
      if raise_error:
        raise
    finally:
      if set_all_handle is not None:
        self.close_query(set_all_handle)
    # Only reached after a failed attempt that will be retried: success breaks out
    # above and the last attempt re-raises. This matters because
    # _get_sleep_interval_for_retries asserts num_tries < self.max_tries, so it must
    # not run inside the 'finally' clause.
    time.sleep(self._get_sleep_interval_for_retries(num_tries))
    num_tries += 1
def _ping_impala_service(self):
  """Ping impalad over HS2 and remember the webserver address it reports.

  Returns a (version, webserver_address) tuple taken from the ping response.
  """
  ping_req = TPingImpalaHS2ServiceReq(self.session_handle)

  def PingImpalaHS2Service(req):
    return self.imp_service.PingImpalaHS2Service(req)

  # Pinging is idempotent, so transient failures may be retried.
  ping_resp = self._do_hs2_rpc(PingImpalaHS2Service, ping_req, retry_on_error=True)
  self._check_hs2_rpc_status(ping_resp.status)
  address = ping_resp.webserver_address
  self.webserver_address = address
  return ping_resp.version, address
def _create_query_req(self, query_str, set_query_options):
  """Build an asynchronous TExecuteStatementReq for 'query_str' on this session.

  Every key and value of 'set_query_options' is passed through
  utf8_encode_if_needed and sent as the request's confOverlay.
  """
  if sys.version_info.major >= 3:
    option_pairs = set_query_options.items()
  else:
    option_pairs = set_query_options.iteritems()
  overlay = {utf8_encode_if_needed(key): utf8_encode_if_needed(val)
             for key, val in option_pairs}
  return TExecuteStatementReq(sessionHandle=self.session_handle,
                              statement=utf8_encode_if_needed(query_str),
                              confOverlay=overlay, runAsync=True)
def execute_query(self, query_str, set_query_options):
  """Execute the query 'query_str' asynchronously on the server with options dictionary
  'set_query_options' and return a query handle that can be used for subsequent
  ImpalaClient method calls for the query.

  Raises QueryStateException if the server does not return SUCCESS_STATUS. If the
  query has a result set, its schema is attached to the returned handle as
  'schema'. The handle's 'is_closed' attribute is initialised to False.
  """
  self._clear_current_query_handle()
  self.is_query_cancelled = False

  def ExecuteStatement(req):
    return self.imp_service.ExecuteStatement(req)

  # Not retried: most DML is not idempotent, and re-running a query from the client
  # is expensive unless the server is aware of the retry.
  exec_resp = self._do_hs2_rpc(ExecuteStatement,
                               self._create_query_req(query_str, set_query_options))
  exec_status = exec_resp.status
  if exec_status.statusCode != TStatusCode.SUCCESS_STATUS:
    raise QueryStateException(
        "ERROR: {0}".format(utf8_decode_if_needed(exec_status.errorMessage)))

  handle = exec_resp.operationHandle
  try:
    self._set_current_query_handle(handle)
    if handle.hasResultSet:
      def GetResultSetMetadata(req):
        return self.imp_service.GetResultSetMetadata(req)
      # Reading result-set metadata is idempotent, so it may be retried.
      meta_resp = self._do_hs2_rpc(GetResultSetMetadata,
                                   TGetResultSetMetadataReq(handle),
                                   retry_on_error=True)
      self._check_hs2_rpc_status(meta_resp.status)
      assert meta_resp.schema is not None, meta_resp
      # Keep the schema on the handle; fetch() and get_column_names() read it there.
      handle.schema = meta_resp.schema
    handle.is_closed = False
    return handle
  finally:
    self._clear_current_query_handle()
def get_query_id_str(self, last_query_handle):
  """Return the printable id of the query behind 'last_query_handle'.

  Returns None when no handle is given.
  """
  if last_query_handle is not None:
    return self._convert_id_to_str(last_query_handle.operationId.guid)
  return None
def _set_current_query_handle(self, query_handle):
  """Record 'query_handle' as the operation this client is currently working on.

  Query methods such as execute_query() and fetch() call this on entry and call
  _clear_current_query_handle() in a finally block.
  """
  self._current_query_handle = query_handle
def _clear_current_query_handle(self):
  """Forget the currently tracked operation handle by resetting it to None."""
  self._current_query_handle = None
def get_session_id(self):
  """Return the printable id of the HS2 session, or None if there is no session handle."""
  handle = self.session_handle
  return None if handle is None else self._convert_id_to_str(handle.sessionId.guid)
def _convert_id_to_str(self, id_bytes):
# The binary representation is present in the query handle but we need to
# massage it into the expected string representation. C++ and Java code
# treats the low and high half as two 64-bit little-endian integers and
# as a result prints the hex representation in the reverse order to how
# bytes are laid out in guid.
low_bytes_reversed = id_bytes[7::-1]
high_bytes_reversed = id_bytes[16:7:-1]
if sys.version_info.major < 3:
low_hex = low_bytes_reversed.encode('hex_codec')
high_hex = high_bytes_reversed.encode('hex_codec')
else:
low_hex = low_bytes_reversed.hex()
high_hex = high_bytes_reversed.hex()
return "{low}:{high}".format(low=low_hex, high=high_hex)
def fetch(self, query_handle):
  """Generator yielding batches of display rows for 'query_handle'.

  Each FetchResults response is converted by _transpose() into a list of rows,
  and each such list is yielded. Iteration stops after the first batch for
  which _hasMoreRows() returns a falsy value.
  """
  try:
    self._set_current_query_handle(query_handle)
    assert query_handle.hasResultSet
    col_types = [col.typeDesc.types[0].primitiveEntry.type
                 for col in query_handle.schema.columns]
    getters = [HS2_VALUE_GETTERS[col_type] for col_type in col_types]
    converters = [self.value_converter.get_converter(col_type)
                  for col_type in col_types]

    def FetchResults(req):
      return self.imp_service.FetchResults(req)

    more_rows = True
    while more_rows:
      # Not retried: FetchResults is not idempotent unless the client and server
      # coordinate and results are kept around for the retry to succeed.
      fetch_req = TFetchResultsReq(query_handle,
                                   TFetchOrientation.FETCH_NEXT,
                                   self.fetch_size)
      fetch_resp = self._do_hs2_rpc(FetchResults, fetch_req)
      self._check_hs2_rpc_status(fetch_resp.status)
      # Row-oriented output suits the display code better. The transpose is somewhat
      # inefficient, but performance is comparable to the old Beeswax code.
      yield self._transpose(getters, converters, fetch_resp.results.columns)
      more_rows = self._hasMoreRows(fetch_resp, getters)
  finally:
    self._clear_current_query_handle()
def _hasMoreRows(self, resp, column_value_getters):
  """Return resp.hasMoreRows, i.e. whether fetch() should request another batch.

  'column_value_getters' is ignored here. It is presumably accepted so that an
  overriding implementation can inspect the fetched columns
  (NOTE(review): confirm against subclasses).
  """
  return resp.hasMoreRows
def _transpose(self, column_value_getters, column_value_converters, columns):
  """Convert the column-major 'columns' of a TFetchResultsResp into a list of rows.

  column_value_getters[i] extracts the typed column from columns[i]. If
  column_value_converters[i] is not None, it turns each value of column i into
  its display form. Values whose null bit is set become the string 'NULL'.
  """
  typed_cols = [column_value_getters[idx](col) for idx, col in enumerate(columns)]
  num_cols = len(typed_cols)
  num_rows = len(typed_cols[0].values)
  # Allocate every row up front, then fill it in column by column.
  rows = [[None] * num_cols for _ in xrange(num_rows)]
  for col_idx, typed_col in enumerate(typed_cols):
    null_bits = bitarray(endian='little')
    null_bits.frombytes(typed_col.nulls)
    values = typed_col.values
    convert = column_value_converters[col_idx]
    # The null bitmap may cover fewer rows than the column; rows beyond it are
    # copied without a null check.
    checked = min(len(null_bits), num_rows)
    if convert is None:
      # Skipping stringification when it is not needed makes large extracts of
      # tpch.orders ~8% faster according to benchmarks.
      for row_idx in xrange(checked):
        rows[row_idx][col_idx] = 'NULL' if null_bits[row_idx] else values[row_idx]
      for row_idx in xrange(checked, num_rows):
        rows[row_idx][col_idx] = values[row_idx]
    else:
      for row_idx in xrange(checked):
        rows[row_idx][col_idx] = ('NULL' if null_bits[row_idx]
                                  else convert(values[row_idx]))
      for row_idx in xrange(checked, num_rows):
        rows[row_idx][col_idx] = convert(values[row_idx])
  return rows
def close_dml(self, last_query_handle):
  """Close the DML operation 'last_query_handle' and return its DML statistics.

  The statistics in the close response are passed through _process_dml_result()
  and returned. The RPC is not retried because closing a DML operation is not
  idempotent.

  Raises RPCException if the server accepts the close but returns no DML result.
  The handle is marked closed as soon as the server's status has been checked, so
  a later close_query() on it returns early even when the statistics are missing.
  """
  try:
    self._set_current_query_handle(last_query_handle)
    def CloseImpalaOperation(req):
      return self.imp_service.CloseImpalaOperation(req)
    # CloseImpalaOperation rpc is not idempotent for dmls.
    req = TCloseImpalaOperationReq(last_query_handle)
    resp = self._do_hs2_rpc(CloseImpalaOperation, req)
    self._check_hs2_rpc_status(resp.status)
    # The close succeeded server-side. Record that before validating the payload,
    # so the handle does not look open if we raise below.
    last_query_handle.is_closed = True
    if not resp.dml_result:
      raise RPCException("Impala DML operation did not return DML statistics.")
    return self._process_dml_result(resp.dml_result)
  finally:
    self._clear_current_query_handle()
def close_query(self, last_query_handle):
  """Close the non-DML operation 'last_query_handle'.

  Returns True immediately if the handle is already marked closed. Otherwise it
  sends the close RPC and returns _is_hs2_nonerror_status() of the response's
  status code. After the RPC the handle is marked closed whatever the status,
  which makes repeated calls cheap.
  """
  try:
    self._set_current_query_handle(last_query_handle)
    if not last_query_handle.is_closed:
      def CloseImpalaOperation(req):
        return self.imp_service.CloseImpalaOperation(req)
      # Closing a non-DML operation is idempotent, so it is safe to retry.
      close_resp = self._do_hs2_rpc(CloseImpalaOperation,
                                    TCloseImpalaOperationReq(last_query_handle),
                                    retry_on_error=True)
      last_query_handle.is_closed = True
      return self._is_hs2_nonerror_status(close_resp.status.statusCode)
    return True
  finally:
    self._clear_current_query_handle()
def cancel_query(self, last_query_handle):
  """Ask the server to cancel 'last_query_handle'.

  Returns True immediately if the handle is already marked closed. Otherwise it
  returns _is_hs2_nonerror_status() of the CancelOperation response's status code.
  Cancel sets query_state to ERROR_STATE before calling cancel() in the
  co-ordinator, so there is no need to wait here.
  """
  try:
    self._set_current_query_handle(last_query_handle)
    if not last_query_handle.is_closed:
      def CancelOperation(req):
        return self.imp_service.CancelOperation(req)
      # Cancelling is idempotent, so it is safe to retry.
      cancel_resp = self._do_hs2_rpc(CancelOperation,
                                     TCancelOperationReq(last_query_handle),
                                     retry_on_error=True)
      return self._is_hs2_nonerror_status(cancel_resp.status.statusCode)
    return True
  finally:
    self._clear_current_query_handle()
def get_query_state(self, last_query_handle):
  """Return the operationState the server reports for 'last_query_handle'.

  The status RPC is idempotent and therefore retried on error; a non-OK RPC
  status is reported through _check_hs2_rpc_status().
  """
  try:
    self._set_current_query_handle(last_query_handle)
    def GetOperationStatus(req):
      return self.imp_service.GetOperationStatus(req)
    status_resp = self._do_hs2_rpc(GetOperationStatus,
                                   TGetOperationStatusReq(last_query_handle),
                                   retry_on_error=True)
    self._check_hs2_rpc_status(status_resp.status)
    return status_resp.operationState
  finally:
    self._clear_current_query_handle()
def get_runtime_profile(self, last_query_handle):
  """Fetch the runtime profile of 'last_query_handle'.

  The request sets include_query_attempts=True. Returns a (profile, failed_profile)
  tuple, where failed_profile is the first entry of the response's failed_profiles
  or None if that list is empty or unset. (Presumably this is the profile of a
  failed earlier attempt of a retried query; confirm against the server.)
  """
  try:
    self._set_current_query_handle(last_query_handle)
    def GetRuntimeProfile(req):
      return self.imp_service.GetRuntimeProfile(req)
    # Reading a profile is idempotent, so it is safe to retry.
    profile_req = TGetRuntimeProfileReq(last_query_handle,
                                        self.session_handle,
                                        include_query_attempts=True)
    profile_resp = self._do_hs2_rpc(GetRuntimeProfile, profile_req, retry_on_error=True)
    self._check_hs2_rpc_status(profile_resp.status)
    failed = profile_resp.failed_profiles
    return profile_resp.profile, (failed[0] if failed else None)
  finally:
    self._clear_current_query_handle()
def get_summary(self, last_query_handle):
  """Fetch the execution summary of 'last_query_handle'.

  The request sets include_query_attempts=True. Returns a (summary, failed_summary)
  tuple, where failed_summary is the first entry of the response's
  failed_summaries or None if that list is empty or unset.
  """
  try:
    self._set_current_query_handle(last_query_handle)
    def GetExecSummary(req):
      return self.imp_service.GetExecSummary(req)
    # Reading the summary is idempotent, so it is safe to retry.
    summary_req = TGetExecSummaryReq(last_query_handle,
                                     self.session_handle,
                                     include_query_attempts=True)
    summary_resp = self._do_hs2_rpc(GetExecSummary, summary_req, retry_on_error=True)
    self._check_hs2_rpc_status(summary_resp.status)
    failed = summary_resp.failed_summaries
    return summary_resp.summary, (failed[0] if failed else None)
  finally:
    self._clear_current_query_handle()
def get_column_names(self, last_query_handle):
  """Return the result column names from the schema stored on the handle."""
  assert last_query_handle.hasResultSet
  schema_columns = last_query_handle.schema.columns
  names = [col.columnName for col in schema_columns]
  return names
def expect_result_metadata(self, query_str, query_handle):
  """ Given a query string, return True if impalad expects result metadata.

  'query_str' is not consulted; the answer is the handle's hasResultSet flag.
  """
  has_results = query_handle.hasResultSet
  return has_results
def _get_warn_or_error_log(self, last_query_handle, warn):
"""Returns all messages from the error log prepended with 'WARNINGS:' or 'ERROR:' for
last_query_handle, depending on whether warn is True or False. Note that the error
log may contain messages that are not errors (e.g. warnings)."""
try:
self._set_current_query_handle(last_query_handle)
if last_query_handle is None:
return "Query could not be executed"
def GetLog(req):
return self.imp_service.GetLog(req)
# GetLog rpc is idempotent and so safe to retry.
req = TGetLogReq(last_query_handle)
resp = self._do_hs2_rpc(GetLog, req, retry_on_error=True)
self._check_hs2_rpc_status(resp.status)
log = utf8_decode_if_needed(resp.log)
# Strip progress message out of HS2 log.
log = HS2_LOG_PROGRESS_REGEX.sub("", log)
if log and log.strip():
log = self._append_retried_query_link(log)