-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtimezone_manager.py
317 lines (273 loc) · 12.1 KB
/
timezone_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import os
import json
import subprocess
from log import MeticulousLogger
from config import (
MeticulousConfig,
CONFIG_USER,
TIME_ZONE,
TIMEZONE_SYNC,
DEFAULT_TIME_ZONE,
)
import asyncio
from datetime import datetime
# Module-level logger, named after this module.
logger = MeticulousLogger.getLogger(__name__)
# Path of the JSON file holding the timezones offered to the UI. It can be
# overridden with the TIMEZONE_JSON_FILE_PATH environment variable.
TIMEZONE_JSON_FILE_PATH: str = os.environ.get(
    "TIMEZONE_JSON_FILE_PATH",
    "/usr/share/zoneinfo/UI_timezones.json",
)
class TimezoneManagerError(Exception):
    """Error raised when a timezone operation (e.g. setting the system timezone) fails."""
class TimezoneManager:
    """Keep the system timezone, the user configuration and the UI timezone list in sync.

    Every member is static: the class is used as a namespace holding two
    pieces of process-wide state (the last known system timezone and whether
    the IP based lookup already gave an answer).
    """

    # Timezone reported by the system when init() ran; get_system_timezone()
    # may return None, in which case this is None as well.
    __system_timezone: str = ""
    # True once the IP based lookup produced a definitive answer, so that
    # tz_background_update() does not contact the server again.
    __system_synced: bool = False

    @staticmethod
    def init():
        """Reconcile the system timezone with the one stored in the user config.

        Makes sure the UI timezone JSON exists, reads the system timezone and,
        when it differs from the configured one, tries to apply the configured
        timezone to the system ("Etc/UTC" when nothing was ever configured).
        If applying fails, the user config is synced to the system timezone
        instead. The config is saved whenever an update was attempted.
        """
        TimezoneManager.validate_timezones_json()
        TimezoneManager.__system_timezone = TimezoneManager.get_system_timezone()

        target_timezone = MeticulousConfig[CONFIG_USER][TIME_ZONE]
        if target_timezone == TimezoneManager.__system_timezone:
            return

        if target_timezone == DEFAULT_TIME_ZONE:
            logger.info("No timezone was ever configured")
            if TimezoneManager.__system_timezone == "Etc/UTC":
                return
            logger.info("Setting system timezone to Etc/UTC as a default")
            target_timezone = "Etc/UTC"
        else:
            logger.warning(
                f"user config and system timezones confilct, updating system config to {MeticulousConfig[CONFIG_USER][TIME_ZONE]}"
            )

        # Try to set the system timezone to the user specified one
        try:
            TimezoneManager.set_system_timezone(target_timezone)
        except TimezoneManagerError as e:
            system_timezone = TimezoneManager.__system_timezone
            if system_timezone:
                # Fall back to the system timezone and report the error
                logger.error(
                    f"failed to set system TZ, syncing user TZ with system to {TimezoneManager.__system_timezone}. Error: {e}"
                )
                MeticulousConfig[CONFIG_USER][TIME_ZONE] = system_timezone
            else:
                # The system timezone could not be read either: overwriting the
                # user's choice with None / "" would only lose information.
                logger.error(
                    f"failed to set system TZ and the system timezone is unknown, keeping user TZ. Error: {e}"
                )
        finally:
            MeticulousConfig.save()

    @staticmethod
    def update_timezone(new_timezone: str) -> None:
        """Apply *new_timezone* to the system if it differs from the configured one.

        Surrounding double quotes are stripped from *new_timezone* first.
        The user config itself is not modified here.

        Raises:
            TimezoneManagerError: if the system timezone could not be set.
        """
        stripped_new_tz = new_timezone.strip('"')
        if MeticulousConfig[CONFIG_USER][TIME_ZONE] == stripped_new_tz:
            return
        try:
            TimezoneManager.set_system_timezone(stripped_new_tz)
        except TimezoneManagerError as e:
            logger.error(f"Error updating timezone: {e}")
            raise TimezoneManagerError(f"Error updating timezone: {e}") from e
        logger.debug("update timezone status: Success")

    @staticmethod
    def set_system_timezone(new_timezone: str) -> None:
        """Set the system timezone through ``timedatectl set-timezone``.

        Any output of the command (stdout or stderr) is treated as a failure,
        as is a non-zero exit status.

        Raises:
            TimezoneManagerError: if the command fails for any reason.
        """
        logger.debug("setting system timezone")
        try:
            # Pass an argument list and no shell: new_timezone may originate
            # from a remote server or an API request and must never be
            # interpreted as shell syntax.
            cmd_result = subprocess.run(
                ["timedatectl", "set-timezone", new_timezone],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
            )
            if cmd_result.stderr or cmd_result.stdout:
                error = f"[ Out:{cmd_result.stdout} | Err: {cmd_result.stderr} ]"
                raise Exception(error)
            logger.info(
                f"new system time zone: {TimezoneManager.get_system_timezone()}"
            )
        except Exception as e:
            message = f"Error setting system time zone: {e}"
            logger.error(message)
            raise TimezoneManagerError(message) from e

    @staticmethod
    def get_system_timezone():
        """Return the timezone reported by ``timedatectl status``.

        Returns:
            The timezone name with surrounding whitespace removed (possibly an
            empty string when the pipeline printed nothing), or None when the
            command could not be executed.
        """
        # Constant pipeline without interpolated input, so running it through
        # the shell is safe. awk prints the text between "Time zone:" and "(".
        command = "timedatectl status | grep 'Time zone' | awk -F'[:()]' '{print $2}'"
        try:
            cmd_result = subprocess.run(
                command,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                check=True,
                shell=True,
            )
        except Exception:
            logger.error("error getting system time zone, using last saved by user")
            return None
        if cmd_result.stderr:
            logger.error("error getting system time zone, using last saved by user")
        return cmd_result.stdout.strip()

    @staticmethod
    def validate_timezones_json():
        """Generate the UI timezone JSON file if it does not exist yet.

        The content is computed before the file is opened and nothing is
        written when no timezone information is available, so a failed
        generation never leaves behind an empty file that would be mistaken
        for a valid one on the next call. Errors are logged, not raised.
        """
        if os.path.isfile(TIMEZONE_JSON_FILE_PATH):
            return
        logger.warning("Json file for UI timezones does not exist, generating")
        try:
            organized = TimezoneManager.get_organized_timezones()
            if not organized:
                logger.error(
                    "Error generating json file: no timezone information available"
                )
                return
            payload = json.dumps(organized)
            with open(TIMEZONE_JSON_FILE_PATH, "w") as json_file:
                json_file.write(payload)
        except Exception as e:
            logger.error(f"Error generating json file: {e}")

    @staticmethod
    def _read_country_timezones() -> dict:
        """Return {country name: [timezone, ...]} built from the zoneinfo tab files."""
        # map country code to country name
        country_code_to_name = {}
        with open("/usr/share/zoneinfo/iso3166.tab") as f:
            for line in f:
                # Skip comments and empty lines
                if line.startswith("#") or not line.strip():
                    continue
                parts = line.split("\t")
                country_code_to_name.setdefault(parts[0], parts[1].rstrip("\n"))

        # map country name to its timezones
        country_to_timezones = {}
        with open("/usr/share/zoneinfo/zone.tab") as f:
            for line in f:
                # Skip comments and empty lines
                if line.startswith("#") or not line.strip():
                    continue
                parts = line.split("\t")
                country_name = country_code_to_name.get(parts[0], "Unknown Country")
                country_to_timezones.setdefault(country_name, []).append(
                    parts[2].rstrip()
                )
        return country_to_timezones

    @staticmethod
    def _group_timezones(all_timezones, country_to_timezones) -> dict:
        """Group timezone names by country, keyed by a human readable sub-region.

        "Etc/..." zones go to the "ETC" group and zones that belong to no
        country go to the "LINKS" group.
        """
        # Reverse lookup built once; setdefault keeps the first country that
        # lists a timezone.
        tz_to_country = {}
        for country, timezones in country_to_timezones.items():
            for tz in timezones:
                tz_to_country.setdefault(tz, country)

        country_timezones = {}
        for tz in all_timezones:
            region, *segments = tz.split("/")
            # separate words linked by "_"
            zone = [segment.replace("_", " ") for segment in segments]
            # "state - city" format; without a sub-region the region itself is used
            sub_region = " - ".join(zone) if zone else region

            if tz.startswith("Etc/"):
                group = "ETC"
            elif tz in tz_to_country:
                group = tz_to_country[tz]
                # if the first sub-region is the same as the country
                # (e.g. America/Argentina/...) don't keep it as sub-region
                if len(zone) > 1 and zone[0] == group:
                    sub_region = " - ".join(zone[1:])
            else:
                group = "LINKS"
            country_timezones.setdefault(group, {})[sub_region] = tz
        return country_timezones

    @staticmethod
    def get_organized_timezones() -> dict:
        """
        make use of linux embedded timezone information to compile and provide
        the user information to make the timezone selection easier
        return: dict containing timezones grouped by country and identified by zone
        i.e
        {
            ...
            "Mexico":{
                "Bahia Banderas": "America/Bahia_Banderas",
                "Cancun": "America/Cancun",
                "Chihuahua": "America/Chihuahua",
                "Ciudad Juarez": "America/Ciudad_Juarez",
                ...
            }
        }
        An empty dict is returned when the timezones cannot be listed with
        timedatectl; errors reading the zoneinfo tab files propagate.
        """
        country_to_timezones = TimezoneManager._read_country_timezones()
        try:
            result = subprocess.run(
                ["timedatectl", "list-timezones"],
                stdout=subprocess.PIPE,  # Capture the standard output
                stderr=subprocess.PIPE,  # Capture the standard error
                text=True,  # Decode output as a string
                check=True,  # Raise an exception on non-zero exit codes
            )
            return TimezoneManager._group_timezones(
                result.stdout.splitlines(), country_to_timezones
            )
        except Exception:
            logger.error("Could not fetch timezones from system")
            return {}

    @staticmethod
    def get_UI_timezones() -> dict:
        """Return the content of the UI timezone JSON file, generating it if needed.

        Returns an empty dict when the file cannot be created or read.
        """
        try:
            TimezoneManager.validate_timezones_json()
            with open(TIMEZONE_JSON_FILE_PATH, "r") as json_file:
                return json.load(json_file)
        except Exception:
            logger.error(
                "Json file for UI timezones does not exist and could not be created"
            )
            return {}

    @staticmethod
    def tz_background_update():
        """Fetch and apply the timezone when timezone sync is "automatic".

        Does nothing once a lookup already succeeded. Blocks until the lookup
        finishes; exceptions raised by request_and_sync_tz() propagate.
        """
        tz_config = MeticulousConfig[CONFIG_USER][TIMEZONE_SYNC]
        if tz_config == "automatic" and not TimezoneManager.__system_synced:
            logger.info(
                "Timezone is set to automatic, fetching timezone in the background"
            )
            loop = asyncio.get_event_loop()
            loop.run_until_complete(TimezoneManager.request_and_sync_tz())

    @staticmethod
    async def request_and_sync_tz() -> "str | None":
        """Ask the timezone server for the timezone of this IP and apply it.

        The request is retried once per second while the server answers with
        a status other than 200; the whole operation is limited to 20 seconds.

        Returns:
            The timezone that was applied, or None when the server knows no
            timezone for this IP.

        Raises:
            Exception: on timeout or when the received timezone cannot be set.
                Other errors (e.g. connection errors) propagate unchanged.
        """

        async def request_tz_task() -> "str | None":
            import aiohttp

            TZ_GETTER_URL = "https://analytics.meticulousespresso.com/timezone_ip"
            async with aiohttp.ClientSession() as session:
                while True:  # Retry loop
                    await asyncio.sleep(1)
                    async with session.get(TZ_GETTER_URL) as response:
                        if response.status != 200:
                            logger.warning(
                                f"timezone fetch failed with status code: {response.status}, re-fetching"
                            )
                            continue
                        str_content = await response.text()
                        tz = json.loads(str_content).get("tz")
                        if tz is None:
                            logger.warning("No timezone known for this IP")
                            TimezoneManager.__system_synced = True
                            return None
                        # raises TimezoneManagerError if fails
                        TimezoneManager.update_timezone(tz)
                        TimezoneManager.__system_synced = True
                        return tz

        try:
            return await asyncio.wait_for(request_tz_task(), timeout=20)
        except asyncio.TimeoutError as e:
            logger.error(
                "time out error, server could not be contacted or took too long to answer"
            )
            raise Exception(
                "time out error, server could not be contacted or took too long to answer"
            ) from e
        except TimezoneManagerError as e:
            logger.error(f"failed to set the provided timezone\n\t{e}")
            raise Exception("failed to set the provided timezone") from e

    @staticmethod
    def set_system_datetime(newDate: datetime):
        """Set the system clock to *newDate* using the ``date`` command.

        This will raise an exception if the date command fails.
        """
        # Format the datetime in a form acceptable for the date command.
        formatted_date = newDate.strftime("%Y-%m-%d %H:%M:%S")
        logger.info("Setting system date to %s", formatted_date)
        cmd = ["date", "--set", formatted_date]
        subprocess.check_call(cmd)