-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcaching_server.py
152 lines (110 loc) · 4.83 KB
/
caching_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
#!/usr/bin/env python3
import os, threading
from queue import Queue
from flask import Flask, send_file
import requests
TILE_DIR = 'tiles'
next_tile_server = 0
def get_osm_tile_server():
global next_tile_server
ts = ['a','b','c'][next_tile_server]
next_tile_server = (next_tile_server+1) % 3
return 'http://%s.tile.openstreetmap.org' % ts
class TileFetchThread(threading.Thread):
"""
Responsible for fetching a tile image and saving it to disk"""
def __init__(self, job_queue, result_queue):
threading.Thread.__init__(self)
self.job_queue = job_queue
self.result_queue = result_queue
def run(self):
while True:
tile_data = self.job_queue.get()
if tile_data is None:
# Time to quit
break
zoom, x, y, tilefile = tile_data
url = '%s/%d/%d/%d.png' % (get_osm_tile_server(), zoom, x, y)
# Make request to OSM
#print('Fetching %s' % url)
headers = {"User-Agent" :"osmcache 1.0"}
r = requests.get(url, headers=headers)
if r.status_code == 200:
# Save to disk
# XXX could use the expiry header value for better caching
with open(tilefile, 'wb') as f:
f.write(r.content)
self.result_queue.put(('result', (zoom, x, y), r.status_code, tilefile))
class Manager(threading.Thread):
NUM_THREADS = 2
def __init__(self):
threading.Thread.__init__(self)
self.incoming_requests = Queue() # Both tile requests, as well as fetch results
self.job_queue = Queue() # Jobs for fetch threads
self.blocked_requests = {}
self.fetch_threads = []
for i in range(self.NUM_THREADS):
th = TileFetchThread(self.job_queue, self.incoming_requests)
th.start()
self.fetch_threads.append(th)
def run(self):
while True:
request = self.incoming_requests.get()
if request[0] == 'tile':
# Tile request
zoom, x, y = request[1]
result_queue = request[2]
tilefile = os.path.join(TILE_DIR, str(zoom), str(x), str(y)+'.png')
if os.path.isfile(tilefile):
# Tile available, can return result immediately
print('%s in cache' % tilefile)
result_queue.put((200, tilefile))
else:
# Need to let thread fetch it
print('%s NOT in cache, fetching' % tilefile)
# Create parent dirs to tile file
dir = os.path.join(TILE_DIR, str(zoom), str(x))
if not os.path.isdir(dir):
os.makedirs(dir)
key = (zoom, x, y)
if key not in self.blocked_requests:
self.blocked_requests[key] = [ result_queue ]
else:
self.blocked_requests[key].append(result_queue)
# Add new job
self.job_queue.put((zoom, x, y, tilefile))
else:
assert request[0] == 'result'
zoom, x, y = request[1]
status_code = request[2]
tilefile = request[3]
print('z%d x%d y%d (%s) was fetched (%d)' % (zoom, x, y, tilefile, status_code))
if status_code == 200:
result = (200, tilefile)
else:
result = (status_code, None)
key = zoom, x, y
if key in self.blocked_requests:
for rqueue in self.blocked_requests[key]:
rqueue.put(result)
del self.blocked_requests[key]
# Signal fetch threads to quit
for i in range(self.NUM_THREADS):
self.job_queue.put(None)
manager = Manager()
manager.daemon = True
manager.start()
app = Flask(__name__)
@app.route("/")
def hello():
return "Hello World!"
@app.route("/tile/<int:zoom>/<int:x>/<int:y>")
def tile(zoom, x, y):
result_queue = Queue()
manager.incoming_requests.put(('tile', (zoom, x, y), result_queue))
# Blocks until file available
status_code, tile_file = result_queue.get()
if status_code == 200:
return send_file(tile_file, mimetype='image/png', as_attachment=False)
else:
return 'Doh!', status_code