Skip to content

Commit 94c247d

Browse files
committed
support multi-zone
1 parent 3e51660 commit 94c247d

File tree

8 files changed

+171
-49
lines changed

8 files changed

+171
-49
lines changed

qiniu/__init__.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313

1414
from .auth import Auth
1515

16-
from .config import set_default, Zone
16+
from .config import set_default
17+
from .zone import Zone
1718

1819
from .services.storage.bucket import BucketManager, build_batch_copy, build_batch_rename, build_batch_move, build_batch_stat, build_batch_delete
1920
from .services.storage.uploader import put_data, put_file, put_stream

qiniu/auth.py

+3
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ def __init__(self, access_key, secret_key):
5858
self.__access_key = access_key
5959
self.__secret_key = b(secret_key)
6060

61+
def get_access_key(self):
62+
return self.__access_key
63+
6164
def __token(self, data):
6265
data = b(data)
6366
hashed = hmac.new(self.__secret_key, data, sha1)

qiniu/config.py

+8-29
Original file line numberDiff line numberDiff line change
@@ -1,35 +1,17 @@
11
# -*- coding: utf-8 -*-
22

3-
RS_HOST = 'rs.qbox.me' # 管理操作Host
4-
IO_HOST = 'iovip.qbox.me' # 七牛源站Host
5-
RSF_HOST = 'rsf.qbox.me' # 列举操作Host
6-
API_HOST = 'api.qiniu.com' # 数据处理操作Host
3+
from qiniu import zone
74

8-
_BLOCK_SIZE = 1024 * 1024 * 4 # 断点续上传分块大小,该参数为接口规格,暂不支持修改
9-
10-
11-
class Zone(object):
12-
"""七牛上传区域类
13-
14-
该类主要内容上传区域地址。
15-
16-
Attributes:
17-
up_host: 首选上传地址
18-
up_host_backup: 备用上传地址
19-
"""
20-
def __init__(self, up_host, up_host_backup):
21-
"""初始化Zone类"""
22-
self.up_host, self.up_host_backup = up_host, up_host_backup
5+
RS_HOST = 'http://rs.qbox.me' # 管理操作Host
6+
RSF_HOST = 'http://rsf.qbox.me' # 列举操作Host
7+
API_HOST = 'http://api.qiniu.com' # 数据处理操作Host
238

9+
_BLOCK_SIZE = 1024 * 1024 * 4 # 断点续上传分块大小,该参数为接口规格,暂不支持修改
2410

25-
zone0 = Zone('up.qiniu.com', 'upload.qiniu.com')
26-
zone1 = Zone('up-z1.qiniu.com', 'upload-z1.qiniu.com')
2711

2812
_config = {
29-
'default_up_host': zone0.up_host, # 设置为默认上传Host
30-
'default_up_host_backup': zone0.up_host_backup,
13+
'default_zone': zone.Zone(),
3114
'default_rs_host': RS_HOST,
32-
'default_io_host': IO_HOST,
3315
'default_rsf_host': RSF_HOST,
3416
'default_api_host': API_HOST,
3517
'connection_timeout': 30, # 链接超时为时间为30s
@@ -44,15 +26,12 @@ def get_default(key):
4426

4527
def set_default(
4628
default_zone=None, connection_retries=None, connection_pool=None,
47-
connection_timeout=None, default_rs_host=None, default_io_host=None,
29+
connection_timeout=None, default_rs_host=None,
4830
default_rsf_host=None, default_api_host=None):
4931
if default_zone:
50-
_config['default_up_host'] = default_zone.up_host
51-
_config['default_up_host_backup'] = default_zone.up_host_backup
32+
_config['default_zone'] = default_zone
5233
if default_rs_host:
5334
_config['default_rs_host'] = default_rs_host
54-
if default_io_host:
55-
_config['default_io_host'] = default_io_host
5635
if default_rsf_host:
5736
_config['default_rsf_host'] = default_rsf_host
5837
if default_api_host:

qiniu/http.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from requests.auth import AuthBase
66

77
from qiniu import config
8-
from .auth import RequestsAuth
8+
import qiniu.auth
99
from . import __version__
1010

1111

@@ -49,7 +49,7 @@ def _post(url, data, files, auth):
4949
def _get(url, params, auth):
5050
try:
5151
r = requests.get(
52-
url, params=params, auth=RequestsAuth(auth) if auth is not None else None,
52+
url, params=params, auth=qiniu.auth.RequestsAuth(auth) if auth is not None else None,
5353
timeout=config.get_default('connection_timeout'), headers=_headers)
5454
except Exception as e:
5555
return None, ResponseInfo(None, e)
@@ -74,7 +74,7 @@ def _post_file(url, data, files):
7474

7575

7676
def _post_with_auth(url, data, auth):
77-
return _post(url, data, None, RequestsAuth(auth))
77+
return _post(url, data, None, qiniu.auth.RequestsAuth(auth))
7878

7979

8080
class ResponseInfo(object):

qiniu/services/processing/pfop.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,5 +45,5 @@ def execute(self, key, fops, force=None):
4545
if force == 1:
4646
data['force'] = 1
4747

48-
url = 'http://{0}/pfop'.format(config.get_default('default_api_host'))
48+
url = '{0}/pfop'.format(config.get_default('default_api_host'))
4949
return http._post_with_auth(url, data, self.auth)

qiniu/services/storage/bucket.py

+15-9
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
# -*- coding: utf-8 -*-
22

33
from qiniu import config
4-
from qiniu.utils import urlsafe_base64_encode, entry
54
from qiniu import http
5+
from qiniu.utils import urlsafe_base64_encode, entry
66

77

88
class BucketManager(object):
@@ -15,8 +15,12 @@ class BucketManager(object):
1515
auth: 账号管理密钥对,Auth对象
1616
"""
1717

18-
def __init__(self, auth):
18+
def __init__(self, auth, zone=None):
1919
self.auth = auth
20+
if(zone is None):
21+
self.zone = config.get_default('default_zone')
22+
else:
23+
self.zone = zone
2024

2125
def list(self, bucket, prefix=None, marker=None, limit=None, delimiter=None):
2226
"""前缀查询:
@@ -51,7 +55,7 @@ def list(self, bucket, prefix=None, marker=None, limit=None, delimiter=None):
5155
if delimiter is not None:
5256
options['delimiter'] = delimiter
5357

54-
url = 'http://{0}/list'.format(config.get_default('default_rsf_host'))
58+
url = '{0}/list'.format(config.get_default('default_rsf_host'))
5559
ret, info = self.__get(url, options)
5660

5761
eof = False
@@ -172,7 +176,7 @@ def fetch(self, url, bucket, key=None):
172176
"""
173177
resource = urlsafe_base64_encode(url)
174178
to = entry(bucket, key)
175-
return self.__io_do('fetch', resource, 'to/{0}'.format(to))
179+
return self.__io_do(bucket, 'fetch', resource, 'to/{0}'.format(to))
176180

177181
def prefetch(self, bucket, key):
178182
"""镜像回源预取文件:
@@ -189,7 +193,7 @@ def prefetch(self, bucket, key):
189193
一个ResponseInfo对象
190194
"""
191195
resource = entry(bucket, key)
192-
return self.__io_do('prefetch', resource)
196+
return self.__io_do(bucket, 'prefetch', resource)
193197

194198
def change_mime(self, bucket, key, mime):
195199
"""修改文件mimeType:
@@ -227,7 +231,7 @@ def batch(self, operations):
227231
]
228232
一个ResponseInfo对象
229233
"""
230-
url = 'http://{0}/batch'.format(config.get_default('default_rs_host'))
234+
url = '{0}/batch'.format(config.get_default('default_rs_host'))
231235
return self.__post(url, dict(op=operations))
232236

233237
def buckets(self):
@@ -245,12 +249,14 @@ def buckets(self):
245249
def __rs_do(self, operation, *args):
246250
return self.__server_do(config.get_default('default_rs_host'), operation, *args)
247251

248-
def __io_do(self, operation, *args):
249-
return self.__server_do(config.get_default('default_io_host'), operation, *args)
252+
def __io_do(self, bucket, operation, *args):
253+
ak = self.auth.get_access_key()
254+
io_host = self.zone.get_io_host(ak, bucket)
255+
return self.__server_do(io_host, operation, *args)
250256

251257
def __server_do(self, host, operation, *args):
252258
cmd = _build_op(operation, *args)
253-
url = 'http://{0}/{1}'.format(host, cmd)
259+
url = '{0}/{1}'.format(host, cmd)
254260
return self.__post(url)
255261

256262
def __post(self, url, data=None):

qiniu/services/storage/uploader.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def _form_put(up_token, key, data, params, mime_type, crc, progress_handler=None
7777
fields['key'] = key
7878

7979
fields['token'] = up_token
80-
url = 'http://' + config.get_default('default_up_host') + '/'
80+
url = config.get_default('default_zone').get_up_host_by_token(up_token) + '/'
8181
# name = key if key else file_name
8282

8383
fname = file_name
@@ -87,7 +87,7 @@ def _form_put(up_token, key, data, params, mime_type, crc, progress_handler=None
8787
r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
8888
if r is None and info.need_retry():
8989
if info.connect_failed:
90-
url = 'http://' + config.get_default('default_up_host_backup') + '/'
90+
url = config.get_default('default_zone').get_up_host_backup_by_token(up_token) + '/'
9191
if hasattr(data, 'read') is False:
9292
pass
9393
elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
@@ -170,7 +170,7 @@ def recovery_from_record(self):
170170
def upload(self):
171171
"""上传操作"""
172172
self.blockStatus = []
173-
host = config.get_default('default_up_host')
173+
host = config.get_default('default_zone').get_up_host_by_token(self.up_token)
174174
offset = self.recovery_from_record()
175175
for block in _file_iter(self.input_stream, config._BLOCK_SIZE, offset):
176176
length = len(block)
@@ -179,7 +179,7 @@ def upload(self):
179179
if ret is None and not info.need_retry():
180180
return ret, info
181181
if info.connect_failed():
182-
host = config.get_default('default_up_host_backup')
182+
host = config.get_default('default_zone').get_up_host_backup_by_token(self.up_token)
183183
if info.need_retry() or crc != ret['crc32']:
184184
ret, info = self.make_block(block, length, host)
185185
if ret is None or crc != ret['crc32']:
@@ -197,10 +197,10 @@ def make_block(self, block, block_size, host):
197197
return self.post(url, block)
198198

199199
def block_url(self, host, size):
200-
return 'http://{0}/mkblk/{1}'.format(host, size)
200+
return '{0}/mkblk/{1}'.format(host, size)
201201

202202
def file_url(self, host):
203-
url = ['http://{0}/mkfile/{1}'.format(host, self.size)]
203+
url = ['{0}/mkfile/{1}'.format(host, self.size)]
204204

205205
if self.mime_type:
206206
url.append('mimeType/{0}'.format(urlsafe_base64_encode(self.mime_type)))

qiniu/zone.py

+133
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
# -*- coding: utf-8 -*-
2+
3+
import os
4+
import time
5+
6+
from qiniu import http
7+
from qiniu import compat, utils
8+
9+
UC_HOST = 'https://uc.qbox.me' # 获取空间信息Host
10+
11+
12+
class Zone(object):
13+
"""七牛上传区域类
14+
15+
该类主要内容上传区域地址。
16+
17+
Attributes:
18+
up_host: 首选上传地址
19+
up_host_backup: 备用上传地址
20+
"""
21+
def __init__(self, up_host=None, up_host_backup=None, io_host=None, host_cache={}, scheme="http"):
22+
"""初始化Zone类"""
23+
self.up_host, self.up_host_backup, self.io_host = up_host, up_host_backup, io_host
24+
self.host_cache = host_cache
25+
self.scheme = scheme
26+
27+
def get_up_host_by_token(self, up_token):
28+
ak, bucket = self.unmarshal_up_token(up_token)
29+
up_hosts = self.get_up_host(ak, bucket)
30+
return up_hosts[0]
31+
32+
def get_up_host_backup_by_token(self, up_token):
33+
ak, bucket = self.unmarshal_up_token(up_token)
34+
up_hosts = self.get_up_host(ak, bucket)
35+
36+
if (len(up_hosts) <= 1):
37+
up_host = up_hosts[0]
38+
else:
39+
up_host = up_hosts[1]
40+
return up_host
41+
42+
def get_io_host(self, ak, bucket):
43+
bucket_hosts = self.get_bucket_hosts(ak, bucket)
44+
io_hosts = bucket_hosts['ioHosts']
45+
return io_hosts[0]
46+
47+
def get_up_host(self, ak, bucket):
48+
bucket_hosts = self.get_bucket_hosts(ak, bucket)
49+
up_hosts = bucket_hosts['upHosts']
50+
return up_hosts
51+
52+
def unmarshal_up_token(self, up_token):
53+
token = up_token.split(':')
54+
if(len(token) != 3):
55+
raise ValueError('invalid up_token')
56+
57+
ak = token[0]
58+
policy = compat.json.loads(str(utils.urlsafe_base64_decode(token[2]), "utf-8"))
59+
60+
scope = policy["scope"]
61+
bucket = scope
62+
if(':' in scope):
63+
bucket = scope.split(':')[0]
64+
65+
return ak, bucket
66+
67+
def get_bucket_hosts(self, ak, bucket):
68+
key = self.scheme + ":" + ak + ":" + bucket
69+
70+
bucket_hosts = self.get_bucket_hosts_to_cache(key)
71+
if(len(bucket_hosts) > 0):
72+
return bucket_hosts
73+
74+
hosts = compat.json.loads(self.bucket_hosts(ak, bucket))
75+
76+
scheme_hosts = hosts[self.scheme]
77+
bucket_hosts = {
78+
'upHosts': scheme_hosts['up'],
79+
'ioHosts': scheme_hosts['io'],
80+
'deadline': int(time.time()) + hosts['ttl']
81+
}
82+
83+
self.set_bucket_hosts_to_cache(key, bucket_hosts)
84+
85+
# hosts = self.bucket_hosts(ak, bucket)
86+
# self.up_host = compat.json.loads(hosts)[self.scheme]["up"][0]
87+
# self.up_host_backup = compat.json.loads(hosts)[self.scheme]["up"][1]
88+
# self.io_host = compat.json.loads(hosts)[self.scheme]["io"][0]
89+
return bucket_hosts
90+
91+
def get_bucket_hosts_to_cache(self, key):
92+
ret = []
93+
if(len(self.host_cache) == 0):
94+
self.host_cache_from_file()
95+
96+
if(not (key in self.host_cache)):
97+
return ret
98+
99+
if(self.host_cache[key]['deadline'] > time.time()):
100+
ret = self.host_cache[key]
101+
102+
return ret
103+
104+
def set_bucket_hosts_to_cache(self, key, val):
105+
self.host_cache[key] = val
106+
self.host_cache_to_file()
107+
return
108+
109+
def host_cache_from_file(self):
110+
path = self.host_cache_file_path()
111+
if not os.path.isfile(path):
112+
return None
113+
with open(path, 'r') as f:
114+
bucket_hosts = compat.json.load(f)
115+
self.host_cache = bucket_hosts
116+
f.close()
117+
return
118+
119+
def host_cache_to_file(self):
120+
path = self.host_cache_file_path()
121+
with open(path, 'w') as f:
122+
compat.json.dump(self.host_cache, f)
123+
f.close()
124+
125+
def host_cache_file_path(self):
126+
home = os.getenv("HOME")
127+
return home + "/.qiniu_pythonsdk_hostscache2.json"
128+
129+
def bucket_hosts(self, ak, bucket):
130+
url = "{0}/v1/query?ak={1}&bucket={2}".format(UC_HOST, ak, bucket)
131+
ret, info = http._get(url, None, None)
132+
data = compat.json.dumps(ret, separators=(',', ':'))
133+
return data

0 commit comments

Comments
 (0)