Skip to content

Commit 79a51f1

Browse files
author
Ruben L. Mendoza
authored
Monitoring replication files (#297)
* Add python script that monitors replication files * Add docstring for replication monitoring script * Create empty files * Add job config * Add env var for monitoring * Update command to start monitoring * Add python header * Change permission for monitoring script * Update start point for monitoring
1 parent acee14e commit 79a51f1

File tree

4 files changed

+299
-0
lines changed

4 files changed

+299
-0
lines changed

images/replication-job/Dockerfile

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
FROM developmentseed/osmseed-osm-processor:v2

# boto3 is required by the replication-monitoring script (monitoring.py)
RUN pip3 install boto3
COPY ./start.sh /
COPY monitoring.py /
# Replication data is mounted/processed under /mnt/data
WORKDIR /mnt/data
CMD /start.sh

images/replication-job/monitoring.py

+193
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
#!/usr/bin/env python3
2+
3+
import os
4+
import boto3
5+
from datetime import datetime
6+
from botocore.exceptions import ClientError
7+
import gzip
8+
9+
s3 = boto3.client("s3")
10+
11+
12+
def write_monitoring_status(bucket_name, s3_file_name, list_):
    """Persist monitoring results locally and push them to S3.

    Joins *list_* into newline-terminated text, writes it to
    *s3_file_name* on the local disk (creating parent directories as
    needed), then uploads that file under the ``replication_monitoring/``
    prefix of *bucket_name*.

    Args:
        bucket_name (str): bucket name
        s3_file_name (str): file name
        list_ (list): list of string to write in the file
    """
    parent = os.path.dirname(s3_file_name)
    if parent:
        os.makedirs(parent, exist_ok=True)
    content = "\n".join(str(item) for item in list_) + "\n"
    with open(s3_file_name, "w") as handle:
        handle.write(content)
    s3.upload_file(s3_file_name, bucket_name, f"replication_monitoring/{s3_file_name}")
27+
28+
29+
def get_value_from_state(bucket, s3_key, file_value):
    """Retrieve a named value from an Osmosis state file stored in S3.

    Downloads ``s3://<bucket>/<s3_key>`` to a temporary local file and
    scans it for a ``<file_value>=<value>`` line.

    Args:
        bucket (str): bucket name
        s3_key (str): key of the state file in the bucket
        file_value (str): field to read, e.g. "sequenceNumber" or "timestamp"

    Returns:
        str: the value found for *file_value*

    Raises:
        botocore.exceptions.ClientError: if the object cannot be downloaded.
        ValueError: if *file_value* is not present in the state file.
    """
    local_file = "/tmp/state.txt"
    s3.download_file(bucket, s3_key, local_file)
    with open(local_file, "r") as file:
        for line in file:
            if line.startswith(f"{file_value}="):
                # Split only on the first "=" so values containing "=" survive.
                return line.split("=", 1)[1].strip()
    # Previously this fell through returning None, which made callers crash
    # with an opaque TypeError on int(None); fail loudly instead.
    raise ValueError(f"{file_value} not found in s3://{bucket}/{s3_key}")
46+
47+
48+
def process_sequence(n):
    """Split a replication sequence number into its three path groups.

    The sequence is zero-padded to nine digits and cut into three
    three-digit groups, matching the A/B/C folder layout of Osmosis
    replication directories.

    Args:
        n (number): sequence number

    Returns:
        tuple: the three groups as ints
    """
    padded = str(n).zfill(9)
    return tuple(int(padded[i:i + 3]) for i in (0, 3, 6))
59+
60+
61+
def check_missing_files(bucket_name, key, end_seq):
    """Verify the absence of certain files in the bucket's folder *key*.

    For every sequence in 1..end_seq-1 the folder should contain both a
    ``NNN.state.txt`` and a ``NNN.osc.gz`` object; any absent ones are
    reported.

    Args:
        bucket_name (str): bucket name
        key (str): folder name (e.g. "replication/minute/000/109")
        end_seq (number): last replication sequence (exclusive)

    Returns:
        list: keys of the missing files
    """
    # Log message previously claimed range(0, ...) while the loop runs from 1.
    print(f"Checking files...s3://{bucket_name}/{key}...range(1, {end_seq})")
    bucket = boto3.resource("s3").Bucket(bucket_name)
    # Single listing per folder; a set makes the membership tests below O(1)
    # instead of a linear scan per sequence number (was O(n^2) overall).
    existing_files = {obj.key for obj in bucket.objects.filter(Prefix=key)}
    missing_files = []
    # NOTE(review): sequence 000 is never checked here — confirm whether the
    # 000 file of each folder is intentionally excluded.
    for i in range(1, end_seq):
        sequence_file = str(i).zfill(3)
        for candidate in (
            os.path.join(key, f"{sequence_file}.state.txt"),
            os.path.join(key, f"{sequence_file}.osc.gz"),
        ):
            if candidate not in existing_files:
                missing_files.append(candidate)
    return missing_files
86+
87+
88+
def get_missing_files(
    bucket_name,
    replication_folder,
    last_replication_sequence,
    start_monitoring_sequence,
):
    """Walk the bucket's replication folders to collect the missing files.

    Args:
        bucket_name (str): bucket name
        replication_folder (str): replication folder
        last_replication_sequence (number): last replication sequence generated by Osmosis.
        start_monitoring_sequence (number): start point of monitoring from previous tasks

    Returns:
        list: List of missing files
    """
    start_a, start_b, _start_c = process_sequence(start_monitoring_sequence)
    end_a, end_b, end_c = process_sequence(last_replication_sequence)

    missing_files = []

    # Treat the (a, b) folder pair as one linear index so the walk also works
    # when the range crosses an "a" boundary. The previous nested loops reused
    # range(start_b, end_b + 1) for every "a", which visited the wrong folders
    # (or none at all when start_b > end_b).
    first_folder = start_a * 1000 + start_b
    last_folder = end_a * 1000 + end_b
    for folder_index in range(first_folder, last_folder + 1):
        a, b = divmod(folder_index, 1000)
        # Only the final folder is cut short at the last sequence; every
        # earlier folder holds the full 001..999 range.
        fixed_end_c = end_c + 1 if folder_index == last_folder else 1000
        key = f"{replication_folder}/{str(a).zfill(3)}/{str(b).zfill(3)}"
        missing_files += check_missing_files(bucket_name, key, fixed_end_c)

    return missing_files
120+
121+
122+
def create_state_file(bucket, filename):
    """Create and upload a placeholder state file for a missing sequence.

    The file is rebuilt from the previous sequence's state file: its
    timestamp is copied over and the sequenceNumber is derived from the
    folder path plus the file's own three-digit suffix.

    Args:
        bucket (str): bucket name
        filename (str): key of the missing ``NNN.state.txt`` file
    """
    folder = os.path.dirname(filename)
    current_sequence = int(os.path.splitext(os.path.basename(filename))[0].replace(".state", ""))
    # NOTE(review): if current_sequence is 000 this yields "-01" — confirm
    # that the first file of a folder can never appear in the missing list.
    previous_sequence = current_sequence - 1
    previous_file = folder + "/" + str(previous_sequence).zfill(3) + ".state.txt"
    date_str = get_value_from_state(bucket, previous_file, "timestamp")
    # assumes filename looks like "replication/minute/<a>/<b>/NNN.state.txt",
    # so path parts [2] and [3] are the zero-padded a/b groups — TODO confirm
    current_sequence_number = int(folder.split("/")[2] + folder.split("/")[3] + str(current_sequence).zfill(3))
    content = f"sequenceNumber={current_sequence_number} \ntxnMaxQueried=6182454 \ntxnActiveList= \ntxnReadyList= \ntxnMax=6182454 \ntimestamp={date_str}"
    os.makedirs(folder, exist_ok=True)
    with open(filename, "w") as f:
        f.write(content)
    # Bug fix: this previously read the module-global ``bucket_name`` (only
    # defined when run as a script) instead of the ``bucket`` parameter, and
    # the upload key was garbled; the missing object is re-created in place
    # under its own key.
    print(f"Updating missing file {filename} to s3://{bucket}/{filename}")
    s3.upload_file(filename, bucket, filename)
135+
136+
def create_osc_file(bucket_name, filename):
    """Create and upload an empty (no-op) change file for a missing sequence.

    Writes a minimal valid ``<osmChange>`` document, gzip-compresses it to
    *filename*, and uploads it to the same key in the bucket.

    Args:
        bucket_name (str): bucket name
        filename (str): key of the missing ``NNN.osc.gz`` file
    """
    content = """<?xml version='1.0' encoding='UTF-8'?>\n\t<osmChange version="0.6" generator="Osmosis 0.48.0-SNAPSHOT">\n</osmChange>"""
    os.makedirs(os.path.dirname(filename), exist_ok=True)
    # Write the uncompressed .osc next to it, then gzip it to <filename>
    # (filename always carries the ".gz" suffix when this is called).
    filename_osc = filename.replace(".gz", "")
    with open(filename_osc, "w") as f:
        f.write(content)
    with open(filename_osc, "rb") as f_in, gzip.open(filename_osc + ".gz", "wb") as f_out:
        f_out.writelines(f_in)
    # Bug fix: the upload key was garbled; the missing object is re-created
    # in place under its own key.
    print(f"Updating missing file {filename} to s3://{bucket_name}/{filename}")
    s3.upload_file(filename, bucket_name, filename)
147+
148+
149+
if __name__ == "__main__":
    # Bucket may be configured with or without the s3:// scheme prefix.
    bucket_name = os.environ["AWS_S3_BUCKET"]
    bucket_name = bucket_name.replace("s3://", "")
    replication_folder = os.environ["REPLICATION_FOLDER"]
    # "empty" enables creation of empty placeholder files for detected gaps.
    create_missing_files = os.getenv("CREATE_MISSING_FILES", "empty")

    ## Get last sequence file from replication/minute
    STATE_FILE = f"{replication_folder}/state.txt"
    last_replication_sequence = int(get_value_from_state(bucket_name, STATE_FILE, "sequenceNumber"))

    ## Get last monitoring sequence number
    try:
        # The monitoring state file records where the previous run stopped.
        STATE_MISSING_FILE = "replication_monitoring/state.txt"
        start_monitoring_sequence = int(get_value_from_state(bucket_name, STATE_MISSING_FILE, "sequenceNumber"))
    except ClientError as e:
        # First run: no monitoring state exists yet, so start from the
        # sequence configured in the environment.
        start_monitoring_sequence = int(os.environ["REPLICATION_SEQUENCE_NUMBER"])

    missing_files = get_missing_files(
        bucket_name,
        replication_folder,
        last_replication_sequence,
        start_monitoring_sequence,
    )

    ## Print error to monitoring (new relic)
    if len(missing_files) > 0:
        for f in missing_files:
            print(f"Error, {f} is missing")
        # Timestamped report of this run's missing files.
        now = datetime.now()
        date_str = now.strftime("%Y_%m_%d-%H-%M")
        write_monitoring_status(bucket_name, f"missing/{date_str}.txt", missing_files)

    # Write missing files in s3
    if create_missing_files == "empty":
        for file in missing_files:
            file_extension = os.path.splitext(file)[1]
            if file_extension == ".txt":
                create_state_file(bucket_name, file)
            if file_extension == ".gz":
                create_osc_file(bucket_name, file)

    ## Write state file
    # Record the last checked sequence so the next run resumes from here.
    write_monitoring_status(
        bucket_name, "state.txt", [f"sequenceNumber={last_replication_sequence}"]
    )
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
{{- if .Values.monitoringReplication.enabled -}}
# CronJob that periodically runs monitoring.py to detect (and optionally
# recreate) missing replication files in the configured object store.
apiVersion: batch/v1
kind: CronJob
metadata:
  name: {{ .Release.Name }}-replication-monitoring-job
  labels:
    app: {{ template "osm-seed.name" . }}
    component: replication-monitoring-job
    environment: {{ .Values.environment }}
    release: {{ .Release.Name }}
spec:
  schedule: {{ quote .Values.monitoringReplication.schedule }}
  startingDeadlineSeconds: 100
  successfulJobsHistoryLimit: 2
  failedJobsHistoryLimit: 2
  # Forbid overlapping runs so two monitors never re-create the same files.
  concurrencyPolicy: Forbid
  jobTemplate:
    spec:
      template:
        spec:
          {{- if .Values.monitoringReplication.nodeSelector.enabled }}
          nodeSelector:
            {{ .Values.monitoringReplication.nodeSelector.label_key }} : {{ .Values.monitoringReplication.nodeSelector.label_value }}
          {{- end }}
          containers:
            - name: {{ .Release.Name }}-replication-monitoring-job
              image: {{ .Values.monitoringReplication.image.name }}:{{ .Values.monitoringReplication.image.tag }}
              # Runs the monitoring script directly instead of the image's start.sh.
              command: ['/monitoring.py']
              {{- if .Values.monitoringReplication.resources.enabled }}
              resources:
                requests:
                  memory: {{ .Values.monitoringReplication.resources.requests.memory }}
                  cpu: {{ .Values.monitoringReplication.resources.requests.cpu }}
                limits:
                  memory: {{ .Values.monitoringReplication.resources.limits.memory }}
                  cpu: {{ .Values.monitoringReplication.resources.limits.cpu }}
              {{- end }}
              env:
                - name: POSTGRES_HOST
                  value: {{ .Release.Name }}-db
                - name: POSTGRES_DB
                  value: {{ .Values.db.env.POSTGRES_DB }}
                - name: POSTGRES_PASSWORD
                  value: {{ quote .Values.db.env.POSTGRES_PASSWORD }}
                - name: POSTGRES_USER
                  value: {{ .Values.db.env.POSTGRES_USER }}
                - name: CLOUDPROVIDER
                  value: {{ .Values.cloudProvider }}
                - name: REPLICATION_FOLDER
                  value: replication/minute
                - name: CREATE_MISSING_FILES
                  value: {{ .Values.monitoringReplication.env.CREATE_MISSING_FILES }}
                # Fallback start point used when no monitoring state file exists yet.
                - name: REPLICATION_SEQUENCE_NUMBER
                  value: {{ .Values.monitoringReplication.env.REPLICATION_SEQUENCE_NUMBER | quote}}
                # In case cloudProvider=aws
                {{- if eq .Values.cloudProvider "aws" }}
                - name: AWS_S3_BUCKET
                  value: {{ .Values.AWS_S3_BUCKET }}
                {{- end }}
                # In case cloudProvider=gcp
                {{- if eq .Values.cloudProvider "gcp" }}
                - name: GCP_STORAGE_BUCKET
                  value: {{ .Values.GCP_STORAGE_BUCKET }}
                {{- end }}
                # In case cloudProvider=azure
                {{- if eq .Values.cloudProvider "azure" }}
                - name: AZURE_STORAGE_ACCOUNT
                  value: {{ .Values.AZURE_STORAGE_ACCOUNT }}
                - name: AZURE_CONTAINER_NAME
                  value: {{ .Values.AZURE_CONTAINER_NAME }}
                - name: AZURE_STORAGE_CONNECTION_STRING
                  value: {{ .Values.AZURE_STORAGE_CONNECTION_STRING }}
                {{- end }}
                # Memory optimization for osmosis
                {{- if .Values.monitoringReplication.resources.enabled }}
                - name: MEMORY_JAVACMD_OPTIONS
                  value: {{ .Values.monitoringReplication.resources.requests.memory | default "2Gi" | quote}}
                {{- end }}
          restartPolicy: OnFailure
      backoffLimit: 3
{{- end }}

osm-seed/values.yaml

+23
Original file line numberDiff line numberDiff line change
@@ -665,3 +665,26 @@ osmSimpleMetrics:
665665
cpu: '2'
666666
nodeSelector:
667667
enabled: false
668+
669+
# ====================================================================================================
# Variables for the replication-monitoring CronJob (monitoringReplication)
# ====================================================================================================
monitoringReplication:
  enabled: false
  image:
    name: 'developmentseed/osmseed-replication-job'
    tag: '0.1.0-n770.hd15c9f2'
  # NOTE(review): '* */1 * * *' fires every minute of every hour; if an
  # hourly run was intended this should be '0 */1 * * *' — confirm.
  schedule: '* */1 * * *'
  env:
    # "empty" makes the job upload empty placeholder files for detected gaps.
    CREATE_MISSING_FILES: empty
    # Fallback start sequence when no monitoring state file exists yet.
    REPLICATION_SEQUENCE_NUMBER: "109789"
  resources:
    enabled: false
    requests:
      memory: '1Gi'
      cpu: '2'
    limits:
      memory: '2Gi'
      cpu: '2'
  nodeSelector:
    enabled: false

0 commit comments

Comments
 (0)