
Commit 1e21ac9

committed
final push
1 parent dfb3ec7 commit 1e21ac9

File tree

3 files changed: +251 -0 lines changed


configurations.json

+4
@@ -0,0 +1,4 @@
{"aws_secret_access_key": "XXX",
 "aws_access_key_id": "XXX",
 "service_name": "s3",
 "region_name": "us-east-2"}

dags/end_to_end_pipeline_airflow.py

+192
@@ -0,0 +1,192 @@
import os, json, boto3, pathlib, psycopg2, requests
import pandas as pd
from airflow import DAG
from pathlib import Path
import airflow.utils.dates
from airflow.sensors.python import PythonSensor
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# DAG 1: fetch the API data, load it into PostgreSQL, then trigger DAG 2.
dag1 = DAG(
    dag_id="get_api_send_psql",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval=None)

# DAG 2: export the PostgreSQL table to CSV and upload it to S3.
dag2 = DAG(
    dag_id="get_psql_send_s3",
    start_date=airflow.utils.dates.days_ago(1),
    schedule_interval=None)


def _fetch_and_save():
    # Fetch the raw district-wise COVID-19 data from the public API.
    response_API = requests.get('https://api.covid19india.org/state_district_wise.json')
    data = response_API.text

    data_json = json.loads(data)

    names = [i for i in data_json]

    # Note: these totals are recomputed downstream in _read_json_file;
    # only the raw JSON is persisted by this task.
    active_numbers = []
    for name in names:
        active_no = sum([data_json[name]["districtData"][i]["active"] for i in data_json[name]["districtData"] if i != "Unknown"])
        active_numbers.append(active_no)

    # Create the output directory if it does not already exist.
    main_path = "/tmp/json_file"
    pathlib.Path(main_path).mkdir(parents=True, exist_ok=True)

    json_object = json.dumps(data_json)

    with open(main_path + "/covid_19.json", "w") as outfile:
        outfile.write(json_object)


def _read_json_file(**context):
    main_path = "/tmp/json_file"
    with open(main_path + "/covid_19.json", "r") as openfile:
        json_object = json.load(openfile)

    names = list(json_object.keys())

    # Sum the active case counts across districts for each state,
    # skipping the "Unknown" district bucket.
    active_numbers = []
    for name in names:
        active_no = sum([json_object[name]["districtData"][i]["active"] for i in json_object[name]["districtData"] if i != "Unknown"])
        active_numbers.append(active_no)

    context["task_instance"].xcom_push(key="names", value=names)
    context["task_instance"].xcom_push(key="active_numbers", value=active_numbers)


def _send_to_postgresql(**context):
    names = context["task_instance"].xcom_pull(task_ids="read_json_file", key="names")
    active_numbers = context["task_instance"].xcom_pull(task_ids="read_json_file", key="active_numbers")

    conn = psycopg2.connect(
        database="airflow",
        user="airflow",
        password="airflow",
        host="end_to_end_project_postgres_1",
        port="5432")

    cursor = conn.cursor()

    cursor.execute("""CREATE TABLE IF NOT EXISTS covid_table(id SERIAL PRIMARY KEY NOT NULL, district_name TEXT, active_number INT)""")

    # Parameterized inserts let psycopg2 handle quoting and escaping of the names.
    for name, number in zip(names, active_numbers):
        cursor.execute(
            "INSERT INTO covid_table(district_name, active_number) VALUES(%s, %s)",
            (name, number))

    conn.commit()

    cursor.close()
    conn.close()

    os.remove("/tmp/json_file/covid_19.json")


def _wait_for_json(filepath):
    return Path(filepath).exists()


def _wait_for_csv(filepath):
    return Path(filepath).exists()


def _send_csv_s3():
    # The credentials file is mounted into the container at this path (see docker-compose.yml).
    with open("/tmp/conf_file/configurations.json", "r") as conf_file:
        configurations = json.load(conf_file)

    s3_client = boto3.client(
        service_name=configurations["service_name"],
        region_name=configurations["region_name"],
        aws_access_key_id=configurations["aws_access_key_id"],
        aws_secret_access_key=configurations["aws_secret_access_key"])

    s3_client.upload_file(
        "/tmp/csv/covid_19.csv",
        "my-apache-airflow-bucket",
        "covid_19.csv")

    os.remove("/tmp/csv/covid_19.csv")


def _fetch_psql_save_csv(file_path, csv_path):
    conn = psycopg2.connect(
        database="airflow",
        user="airflow",
        password="airflow",
        host="end_to_end_project_postgres_1",
        port="5432")

    cursor = conn.cursor()

    cursor.execute("""SELECT * FROM covid_table""")
    all_data = cursor.fetchall()

    # Build the dataframe in one pass (avoids the deprecated DataFrame.append).
    dataframe = pd.DataFrame(
        [{"district_name": data[1], "active_number": data[2]} for data in all_data],
        columns=["district_name", "active_number"])

    pathlib.Path(file_path).mkdir(parents=True, exist_ok=True)

    dataframe.to_csv(csv_path, index=False)
    print("Dataframe saved!")

    cursor.close()
    conn.close()


fetch_and_save = PythonOperator(
    task_id="fetch_and_save",
    python_callable=_fetch_and_save,
    dag=dag1)

# poke_interval: check for the file every 30 seconds.
# timeout: fail after 24 hours to prevent a sensor deadlock.
# mode="reschedule": free the sensor's worker slot between pokes.
wait_for_json = PythonSensor(
    task_id="wait_for_json",
    python_callable=_wait_for_json,
    poke_interval=30,
    timeout=24*60*60,
    mode="reschedule",
    op_kwargs={"filepath": "/tmp/json_file/covid_19.json"},
    dag=dag1)

read_json_file = PythonOperator(
    task_id="read_json_file",
    python_callable=_read_json_file,
    dag=dag1)

send_to_postgresql = PythonOperator(
    task_id="send_to_postgresql",
    python_callable=_send_to_postgresql,
    dag=dag1)

trigger_dag2 = TriggerDagRunOperator(
    task_id="trigger_dag2",
    trigger_dag_id="get_psql_send_s3",
    dag=dag1)

fetch_psql_save_csv = PythonOperator(
    task_id="fetch_psql_save_csv",
    python_callable=_fetch_psql_save_csv,
    op_kwargs={"file_path": "/tmp/csv",
               "csv_path": "/tmp/csv/covid_19.csv"},
    dag=dag2)

wait_for_csv = PythonSensor(
    task_id="wait_for_csv",
    python_callable=_wait_for_csv,
    poke_interval=30,
    timeout=24*60*60,
    mode="reschedule",
    op_kwargs={"filepath": "/tmp/csv/covid_19.csv"},
    dag=dag2)

send_csv_s3 = PythonOperator(
    task_id="send_csv_s3",
    python_callable=_send_csv_s3,
    dag=dag2)

fetch_and_save >> wait_for_json >> read_json_file >> send_to_postgresql >> trigger_dag2

fetch_psql_save_csv >> wait_for_csv >> send_csv_s3
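
For a quick local smoke test of the fetch step outside the scheduler, the task callable can be run directly. A minimal sketch, assuming Airflow and the other imports are installed locally, the API is reachable, /tmp is writable, and the module is importable under the file name end_to_end_pipeline_airflow:

from pathlib import Path
import end_to_end_pipeline_airflow as pipeline  # dags/end_to_end_pipeline_airflow.py

pipeline._fetch_and_save()  # writes /tmp/json_file/covid_19.json
print(Path("/tmp/json_file/covid_19.json").exists())  # expected: True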

docker-compose.yml

+55
@@ -0,0 +1,55 @@
version: '3.7'
# ====================================== AIRFLOW ENVIRONMENT VARIABLES =======================================
x-environment: &airflow_environment
  - AIRFLOW__CORE__EXECUTOR=LocalExecutor
  - AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS=False
  - AIRFLOW__CORE__LOAD_EXAMPLES=False
  - AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql://airflow:airflow@postgres:5432/airflow
  - AIRFLOW__CORE__STORE_DAG_CODE=True
  - AIRFLOW__CORE__STORE_SERIALIZED_DAGS=True
  - AIRFLOW__WEBSERVER__EXPOSE_CONFIG=True
  - AIRFLOW__WEBSERVER__RBAC=False
x-airflow-image: &airflow_image apache/airflow:2.0.0-python3.8
# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ======================================
services:
  postgres:
    image: postgres:12-alpine
    restart: always
    environment:
      - POSTGRES_USER=airflow
      - POSTGRES_PASSWORD=airflow
      - POSTGRES_DB=airflow
    ports:
      - "5432"
  init:
    image: *airflow_image
    depends_on:
      - postgres
    environment: *airflow_environment
    entrypoint: /bin/bash
    command: -c 'airflow db upgrade && sleep 5 && airflow users create --username admin --password admin --firstname Anonymous --lastname Admin --role Admin --email [email protected]'
  webserver:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    ports:
      - "8080:8080"
    volumes:
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: webserver
  scheduler:
    image: *airflow_image
    restart: always
    depends_on:
      - postgres
    volumes:
      - ./dags:/opt/airflow/dags
      - ./configurations.json:/tmp/conf_file/configurations.json
      - logs:/opt/airflow/logs
    environment: *airflow_environment
    command: scheduler

volumes:
  logs:
