# Copyright 2015 Lukas Lalinsky
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of the PROTOBUF-over-HTTP RPC protocol used by Avatica."""
import logging
import math
import pprint
import re
import time
from phoenixdb import errors
from phoenixdb.avatica.proto import common_pb2, requests_pb2, responses_pb2
import requests
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
try:
from HTMLParser import HTMLParser
except ImportError:
from html.parser import HTMLParser
__all__ = ['AvaticaClient']
logger = logging.getLogger(__name__)
class JettyErrorPageParser(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.path = []
self.title = []
self.message = []
def handle_starttag(self, tag, attrs):
self.path.append(tag)
def handle_endtag(self, tag):
self.path.pop()
def handle_data(self, data):
if len(self.path) > 2 and self.path[0] == 'html' and self.path[1] == 'body':
if len(self.path) == 3 and self.path[2] == 'h2':
self.title.append(data.strip())
elif len(self.path) == 4 and self.path[2] == 'p' and self.path[3] == 'pre':
self.message.append(data.strip())
def parse_url(url):
url = urlparse.urlparse(url)
if not url.scheme and not url.netloc and url.path:
netloc = url.path
if ':' not in netloc:
netloc = '{}:8765'.format(netloc)
return urlparse.ParseResult('http', netloc, '/', '', '', '')
return url
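# Illustrative sketch (not part of the original module): parse_url() accepts
# either a full URL or a bare host name; a bare host is normalized to the
# default Avatica endpoint on port 8765. The host names below are made up.
#
#   parse_url('localhost').geturl()           # -> 'http://localhost:8765/'
#   parse_url('https://example.org:8443/')    # returned unchanged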
# Defined in phoenix-core/src/main/java/org/apache/phoenix/exception/SQLExceptionCode.java
SQLSTATE_ERROR_CLASSES = [
('08', errors.OperationalError), # Connection Exception
('22018', errors.IntegrityError), # Constraint violation.
('22', errors.DataError), # Data Exception
('23', errors.IntegrityError), # Constraint Violation
('24', errors.InternalError), # Invalid Cursor State
('25', errors.InternalError), # Invalid Transaction State
('42', errors.ProgrammingError), # Syntax Error or Access Rule Violation
('XLC', errors.OperationalError), # Execution exceptions
('INT', errors.InternalError), # Phoenix internal error
]
def raise_sql_error(code, sqlstate, message):
for prefix, error_class in SQLSTATE_ERROR_CLASSES:
if sqlstate.startswith(prefix):
raise error_class(message, code, sqlstate)
raise errors.InternalError(message, code, sqlstate)
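# Illustrative sketch: the mapping above is matched by SQLSTATE prefix, so the
# (made up) codes below resolve to the first matching class.
#
#   raise_sql_error(102, '23001', 'duplicate key')   # raises errors.IntegrityError
#   raise_sql_error(999, 'ZZ999', 'unknown state')   # falls back to errors.InternalError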
def parse_and_raise_sql_error(message):
match = re.findall(r'(?:([^ ]+): )?ERROR (\d+) \(([0-9A-Z]{5})\): (.*?) ->', message)
if match:
exception, code, sqlstate, message = match[0]
raise_sql_error(int(code), sqlstate, message)
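# Illustrative sketch: the regular expression above expects Phoenix-style error
# strings of the form shown below (the concrete text is made up); anything that
# does not match simply falls through without raising.
#
#   parse_and_raise_sql_error(
#       'RuntimeException: ERROR 102 (23001): duplicate key -> ...')
#   # raises errors.IntegrityError (code 102, SQLSTATE '23001')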
def parse_error_page(html):
parser = JettyErrorPageParser()
parser.feed(html)
if parser.title == ['HTTP ERROR: 500']:
message = ' '.join(parser.message).strip()
parse_and_raise_sql_error(message)
raise errors.InternalError(message)
def parse_error_protobuf(text):
try:
message = common_pb2.WireMessage()
message.ParseFromString(text)
err = responses_pb2.ErrorResponse()
if not err.ParseFromString(message.wrapped_message):
raise Exception('No error message found')
except Exception:
# Not a protobuf error, fall through
return
parse_and_raise_sql_error(err.error_message)
raise_sql_error(err.error_code, err.sql_state, err.error_message)
# Not a protobuf error, fall through
class AvaticaClient(object):
"""Client for Avatica's RPC server.
This exposes all low-level functionality that the Avatica
server provides, using the native terminology. You most likely
do not want to use this class directly, but rather connect
to a server using :func:`phoenixdb.connect`.
"""
def __init__(self, url, max_retries=None, auth=None, verify=None):
"""Constructs a new client object.
:param url:
URL of an Avatica RPC server.
:param max_retries:
Maximum number of retries for failed HTTP requests (defaults to 3).
:param auth:
Optional authentication object assigned to the underlying ``requests`` session.
:param verify:
Optional ``verify`` argument (CA bundle path or boolean) passed through to ``requests``.
"""
self.url = parse_url(url)
self.max_retries = max_retries if max_retries is not None else 3
self.auth = auth
self.verify = verify
self.session = None
def __del__(self):
"""Finalizer. Calls close() to close any open sessions"""
self.close()
def connect(self):
"""Open the session on the the first request instead"""
pass
def close(self):
if self.session:
self.session.close()
self.session = None
def _post_request(self, body, headers):
# Create the session if we haven't before
if not self.session:
logger.debug("Creating a new Session")
self.session = requests.Session()
self.session.headers.update(headers)
self.session.stream = True
if self.auth is not None:
self.session.auth = self.auth
retry_count = self.max_retries
while True:
logger.debug("POST %s %r %r", self.url.geturl(), body, self.session.headers)
requestArgs = {'data': body}
# Setting verify on the Session is not the same as setting it
# as a request arg
if self.verify is not None:
requestArgs.update(verify=self.verify)
try:
response = self.session.post(self.url.geturl(), **requestArgs)
except requests.HTTPError as e:
if retry_count > 0:
delay = math.exp(-retry_count)
logger.debug("HTTP protocol error, will retry in %s seconds...", delay, exc_info=True)
time.sleep(delay)
retry_count -= 1
continue
raise errors.InterfaceError('RPC request failed', cause=e)
else:
if response.status_code == requests.codes.service_unavailable:
if retry_count > 0:
delay = math.exp(-retry_count)
logger.debug("Service unavailable, will retry in %s seconds...", delay, exc_info=True)
time.sleep(delay)
retry_count -= 1
continue
return response
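# Illustrative note: with the default max_retries of 3 the successive retry
# delays are math.exp(-3), math.exp(-2) and math.exp(-1), i.e. roughly 0.05,
# 0.14 and 0.37 seconds, so each retry waits a little longer than the last.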
def _apply(self, request_data, expected_response_type=None):
logger.debug("Sending request\n%s", pprint.pformat(request_data))
request_name = request_data.__class__.__name__
message = common_pb2.WireMessage()
message.name = 'org.apache.calcite.avatica.proto.Requests${}'.format(request_name)
message.wrapped_message = request_data.SerializeToString()
body = message.SerializeToString()
headers = {'content-type': 'application/x-google-protobuf'}
response = self._post_request(body, headers)
response_body = response.raw.read()
if response.status_code != requests.codes.ok:
logger.debug("Received response\n%s", response_body)
if b'<html>' in response_body:
parse_error_page(response_body.decode(response.encoding))
else:
# assume the response is in protobuf format
parse_error_protobuf(response_body)
raise errors.InterfaceError('RPC request returned invalid status code', response.status_code)
message = common_pb2.WireMessage()
message.ParseFromString(response_body)
logger.debug("Received response\n%s", message)
if expected_response_type is None:
expected_response_type = request_name.replace('Request', 'Response')
expected_response_type = 'org.apache.calcite.avatica.proto.Responses$' + expected_response_type
if message.name != expected_response_type:
raise errors.InterfaceError('unexpected response type "{}" expected "{}"'.format(message.name, expected_response_type))
return message.wrapped_message
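# Illustrative sketch: a requests_pb2.CatalogsRequest is wrapped under the name
# 'org.apache.calcite.avatica.proto.Requests$CatalogsRequest'; because
# get_catalogs() below passes 'ResultSetResponse' explicitly, the reply must be
# named 'org.apache.calcite.avatica.proto.Responses$ResultSetResponse'.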
def get_catalogs(self, connection_id):
request = requests_pb2.CatalogsRequest()
request.connection_id = connection_id
response_data = self._apply(request, 'ResultSetResponse')
response = responses_pb2.ResultSetResponse()
response.ParseFromString(response_data)
return response
def get_schemas(self, connection_id, catalog=None, schemaPattern=None):
request = requests_pb2.SchemasRequest()
request.connection_id = connection_id
if catalog is not None:
request.catalog = catalog
if catalog == '':
request.has_catalog = True
if schemaPattern is not None:
request.schema_pattern = schemaPattern
if schemaPattern == '':
request.has_schema_pattern = True
response_data = self._apply(request, 'ResultSetResponse')
response = responses_pb2.ResultSetResponse()
response.ParseFromString(response_data)
return response
def get_tables(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, typeList=None):
request = requests_pb2.TablesRequest()
request.connection_id = connection_id
if catalog is not None:
request.catalog = catalog
if catalog == '':
request.has_catalog = True
if schemaPattern is not None:
request.schema_pattern = schemaPattern
if schemaPattern == '':
request.has_schema_pattern = True
if tableNamePattern is not None:
request.table_name_pattern = tableNamePattern
if tableNamePattern == '':
request.has_table_name_pattern = True
if typeList is not None:
request.type_list.extend(typeList)
request.has_type_list = typeList is not None
response_data = self._apply(request, 'ResultSetResponse')
response = responses_pb2.ResultSetResponse()
response.ParseFromString(response_data)
return response
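# Illustrative sketch (connection id and patterns made up): the pattern
# arguments accept JDBC-style '%' wildcards.
#
#   client.get_tables('conn-1', schemaPattern='WEB', tableNamePattern='%STATS%',
#                     typeList=['TABLE', 'VIEW'])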
def get_columns(self, connection_id, catalog=None, schemaPattern=None, tableNamePattern=None, columnNamePattern=None):
request = requests_pb2.ColumnsRequest()
request.connection_id = connection_id
if catalog is not None:
request.catalog = catalog
if catalog == '':
request.has_catalog = True
if schemaPattern is not None:
request.schema_pattern = schemaPattern
if schemaPattern == '':
request.has_schema_pattern = True
if tableNamePattern is not None:
request.table_name_pattern = tableNamePattern
if tableNamePattern == '':
request.has_table_name_pattern = True
if columnNamePattern is not None:
request.column_name_pattern = columnNamePattern
if columnNamePattern == '':
request.has_column_name_pattern = True
response_data = self._apply(request, 'ResultSetResponse')
response = responses_pb2.ResultSetResponse()
response.ParseFromString(response_data)
return response
def get_table_types(self, connection_id):
request = requests_pb2.TableTypesRequest()
request.connection_id = connection_id
response_data = self._apply(request, 'ResultSetResponse')
response = responses_pb2.ResultSetResponse()
response.ParseFromString(response_data)
return response
def get_type_info(self, connection_id):
request = requests_pb2.TypeInfoRequest()
request.connection_id = connection_id
response_data = self._apply(request, 'ResultSetResponse')
response = responses_pb2.ResultSetResponse()
response.ParseFromString(response_data)
return response
def get_sync_results(self, connection_id, statement_id, state):
request = requests_pb2.SyncResultsRequest()
request.connection_id = connection_id
request.statement_id = statement_id
request.state.CopyFrom(state)
response_data = self._apply(request, 'SyncResultsResponse')
syncResultResponse = responses_pb2.SyncResultsResponse()
syncResultResponse.ParseFromString(response_data)
return syncResultResponse
def connection_sync_dict(self, connection_id, connProps=None):
conn_props = self.connection_sync(connection_id, connProps)
return {
'autoCommit': conn_props.auto_commit,
'readOnly': conn_props.read_only,
'transactionIsolation': conn_props.transaction_isolation,
'catalog': conn_props.catalog,
'schema': conn_props.schema}
def connection_sync(self, connection_id, connProps=None):
"""Synchronizes connection properties with the server.
:param connection_id:
ID of the current connection.
:param connProps:
Dictionary with the properties that should be changed.
:returns:
A ``common_pb2.ConnectionProperties`` object.
"""
if connProps:
props = connProps.copy()
else:
props = {}
request = requests_pb2.ConnectionSyncRequest()
request.connection_id = connection_id
request.conn_props.has_auto_commit = True
request.conn_props.has_read_only = True
if 'autoCommit' in props:
request.conn_props.auto_commit = props.pop('autoCommit')
if 'readOnly' in props:
request.conn_props.read_only = props.pop('readOnly')
if 'transactionIsolation' in props:
request.conn_props.transaction_isolation = props.pop('transactionIsolation', None)
if 'catalog' in props:
request.conn_props.catalog = props.pop('catalog', None)
if 'schema' in props:
request.conn_props.schema = props.pop('schema', None)
if props:
logger.warning("Unhandled connection property:" + props)
response_data = self._apply(request)
response = responses_pb2.ConnectionSyncResponse()
response.ParseFromString(response_data)
return response.conn_props
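# Illustrative sketch (connection id made up): pushing property changes to the
# server and reading back the acknowledged values.
#
#   props = client.connection_sync('conn-1', {'autoCommit': True, 'readOnly': False})
#   props.auto_commit   # -> True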
def open_connection(self, connection_id, info=None):
"""Opens a new connection.
:param connection_id:
ID of the connection to open.
"""
request = requests_pb2.OpenConnectionRequest()
request.connection_id = connection_id
if info is not None:
# Info is a list of repeated pairs, setting a dict directly fails
for k, v in info.items():
request.info[k] = v
response_data = self._apply(request)
response = responses_pb2.OpenConnectionResponse()
response.ParseFromString(response_data)
def close_connection(self, connection_id):
"""Closes a connection.
:param connection_id:
ID of the connection to close.
"""
request = requests_pb2.CloseConnectionRequest()
request.connection_id = connection_id
self._apply(request)
def create_statement(self, connection_id):
"""Creates a new statement.
:param connection_id:
ID of the current connection.
:returns:
New statement ID.
"""
request = requests_pb2.CreateStatementRequest()
request.connection_id = connection_id
response_data = self._apply(request)
response = responses_pb2.CreateStatementResponse()
response.ParseFromString(response_data)
return response.statement_id
def close_statement(self, connection_id, statement_id):
"""Closes a statement.
:param connection_id:
ID of the current connection.
:param statement_id:
ID of the statement to close.
"""
request = requests_pb2.CloseStatementRequest()
request.connection_id = connection_id
request.statement_id = statement_id
self._apply(request)
def prepare_and_execute(self, connection_id, statement_id, sql, max_rows_total=None, first_frame_max_size=None):
"""Prepares and immediately executes a statement.
:param connection_id:
ID of the current connection.
:param statement_id:
ID of the statement to prepare.
:param sql:
SQL query.
:param max_rows_total:
The maximum number of rows that will be allowed for this query.
:param first_frame_max_size:
The maximum number of rows that will be returned in the first Frame returned for this query.
:returns:
Result set with the signature of the prepared statement and the first frame data.
"""
request = requests_pb2.PrepareAndExecuteRequest()
request.connection_id = connection_id
request.statement_id = statement_id
request.sql = sql
if max_rows_total is not None:
request.max_rows_total = max_rows_total
if first_frame_max_size is not None:
request.first_frame_max_size = first_frame_max_size
response_data = self._apply(request, 'ExecuteResponse')
response = responses_pb2.ExecuteResponse()
response.ParseFromString(response_data)
return response.results
def prepare(self, connection_id, sql, max_rows_total=None):
"""Prepares a statement.
:param connection_id:
ID of the current connection.
:param sql:
SQL query.
:param max_rows_total:
The maximum number of rows that will be allowed for this query.
:returns:
Signature of the prepared statement.
"""
request = requests_pb2.PrepareRequest()
request.connection_id = connection_id
request.sql = sql
if max_rows_total is not None:
request.max_rows_total = max_rows_total
response_data = self._apply(request)
response = responses_pb2.PrepareResponse()
response.ParseFromString(response_data)
return response.statement
def execute(self, connection_id, statement_id, signature, parameter_values=None, first_frame_max_size=None):
"""Returns a frame of rows.
The frame describes whether there may be another frame. If there is not
another frame, the current iteration is done when we have finished the
rows in the this frame.
:param connection_id:
ID of the current connection.
:param statement_id:
ID of the statement to fetch rows from.
:param signature:
common_pb2.Signature object
:param parameter_values:
A list of parameter values, if statement is to be executed; otherwise ``None``.
:param first_frame_max_size:
The maximum number of rows that will be returned in the first Frame returned for this query.
:returns:
A list of ``ResultSetResponse`` results produced by the statement.
"""
request = requests_pb2.ExecuteRequest()
request.statementHandle.id = statement_id
request.statementHandle.connection_id = connection_id
request.statementHandle.signature.CopyFrom(signature)
if parameter_values is not None:
request.parameter_values.extend(parameter_values)
request.has_parameter_values = True
if first_frame_max_size is not None:
request.deprecated_first_frame_max_size = first_frame_max_size
request.first_frame_max_size = first_frame_max_size
response_data = self._apply(request)
response = responses_pb2.ExecuteResponse()
response.ParseFromString(response_data)
return response.results
def execute_batch(self, connection_id, statement_id, rows):
"""Returns an array of update counts corresponding to each row written.
:param connection_id:
ID of the current connection.
:param statement_id:
ID of the statement to fetch rows from.
:param rows:
A list of lists corresponding to the columns to bind to the statement
for many rows.
:returns:
Update counts for the writes.
"""
request = requests_pb2.ExecuteBatchRequest()
request.statement_id = statement_id
request.connection_id = connection_id
if rows is not None:
for row in rows:
batch = requests_pb2.UpdateBatch()
for col in row:
batch.parameter_values.append(col)
request.updates.append(batch)
response_data = self._apply(request)
response = responses_pb2.ExecuteBatchResponse()
response.ParseFromString(response_data)
if response.missing_statement:
raise errors.DatabaseError('ExecuteBatch reported missing statement', -1)
return response.update_counts
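# Illustrative sketch: each row is a list of common_pb2.TypedValue messages,
# normally produced by the cursor layer. The literal below is made up and
# assumes the STRING representation from Avatica's common.proto.
#
#   value = common_pb2.TypedValue(type=common_pb2.STRING, string_value='alice')
#   client.execute_batch('conn-1', statement_id, [[value]])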
def fetch(self, connection_id, statement_id, offset=0, frame_max_size=None):
"""Returns a frame of rows.
The frame describes whether there may be another frame. If there is not
another frame, the current iteration is done when we have finished the
rows in this frame.
:param connection_id:
ID of the current connection.
:param statement_id:
ID of the statement to fetch rows from.
:param offset:
Zero-based offset of first row in the requested frame.
:param frame_max_size:
Maximum number of rows to return; negative means no limit.
:returns:
Frame data, or ``None`` if there are no more.
"""
request = requests_pb2.FetchRequest()
request.connection_id = connection_id
request.statement_id = statement_id
request.offset = offset
if frame_max_size is not None:
request.frame_max_size = frame_max_size
response_data = self._apply(request)
response = responses_pb2.FetchResponse()
response.ParseFromString(response_data)
return response.frame
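# Illustrative sketch of the execute/fetch loop (identifiers made up); the
# cursor layer in phoenixdb normally drives this:
#
#   results = client.prepare_and_execute('conn-1', stmt_id, 'SELECT * FROM t')
#   frame = results[0].first_frame
#   while not frame.done:
#       frame = client.fetch('conn-1', stmt_id,
#                            offset=frame.offset + len(frame.rows),
#                            frame_max_size=1000)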
def commit(self, connection_id):
"""TODO Commits the transaction
:param connection_id:
ID of the current connection.
"""
request = requests_pb2.CommitRequest()
request.connection_id = connection_id
return self._apply(request)
def rollback(self, connection_id):
"""TODO Rolls back the transaction
:param connection_id:
ID of the current connection.
"""
request = requests_pb2.RollbackRequest()
request.connection_id = connection_id
return self._apply(request)
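
# Minimal end-to-end sketch, kept behind the __main__ guard so that importing
# this module is unaffected. The URL and query below are made up and assume a
# locally running Phoenix Query Server; phoenixdb.connect() remains the
# supported high-level API.
if __name__ == '__main__':
    import uuid

    client = AvaticaClient('http://localhost:8765/')
    connection_id = str(uuid.uuid4())
    client.open_connection(connection_id)
    try:
        statement_id = client.create_statement(connection_id)
        results = client.prepare_and_execute(
            connection_id, statement_id,
            'SELECT * FROM SYSTEM.CATALOG LIMIT 5', first_frame_max_size=10)
        print('received', len(results[0].first_frame.rows), 'row(s)')
        client.close_statement(connection_id, statement_id)
    finally:
        client.close_connection(connection_id)
        client.close()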