-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustomer_churn_dag.py
71 lines (59 loc) · 2.26 KB
/
customer_churn_dag.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from airflow import DAG
from datetime import timedelta, datetime
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
import time
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
def glue_job_s3_redshift_transfer(job_name, **kwargs):
    """Start a run of the named AWS Glue job and return its run id.

    Builds a boto3 session from the Airflow connection ``aws_s3_conn``
    (region ap-southeast-1, matching where the Glue job lives) and calls
    ``glue.start_job_run``.

    :param job_name: name of the Glue job to start.
    :param kwargs: Airflow context (unused).
    :return: the ``JobRunId`` of the started run. PythonOperator pushes
        this return value to XCom, so downstream tasks can read the run id
        directly instead of re-querying Glue.
    """
    # Create the connection between Airflow and AWS.
    session = AwsGenericHook(aws_conn_id='aws_s3_conn')
    # Get a boto3 session in the same region as the Glue job.
    boto3_session = session.get_session(region_name='ap-southeast-1')
    client = boto3_session.client('glue')
    # start_job_run returns {'JobRunId': ...}; previously this response was
    # discarded, forcing a racy get_job_runs lookup in a later task.
    response = client.start_job_run(JobName=job_name)
    return response['JobRunId']
def get_run_id(job_name="s3_upload_to_redshift_gluejob"):
    """Return the id of the most recent run of a Glue job.

    :param job_name: Glue job to query. Defaults to the job this DAG
        triggers, so existing no-argument callers behave unchanged.
    :return: ``JobRunId`` string of the latest run (pushed to XCom by
        PythonOperator for the downstream GlueJobSensor).
    """
    # Give Glue a moment to register the run started by the upstream task.
    # NOTE(review): a fixed sleep is race-prone — if registration takes
    # longer than 10s this may return a previous run's id; prefer using the
    # run id returned by start_job_run instead. Verify before removing.
    time.sleep(10)
    session = AwsGenericHook(aws_conn_id='aws_s3_conn')
    boto3_session = session.get_session(region_name='ap-southeast-1')
    glue_client = boto3_session.client('glue')
    # get_job_runs lists runs newest-first, so index 0 is the latest run.
    response = glue_client.get_job_runs(JobName=job_name)
    return response["JobRuns"][0]["Id"]
# Operator defaults applied to every task in this DAG.
default_args = dict(
    owner='airflow',
    depends_on_past=False,                # each run is independent of prior runs
    start_date=datetime(2023, 8, 1),
    email=['[email protected]'],
    email_on_failure=False,               # alerting via email is disabled
    email_on_retry=False,
    retries=2,                            # retry a failed task twice...
    retry_delay=timedelta(seconds=15),    # ...waiting 15s between attempts
)
# Weekly pipeline: trigger the Glue S3->Redshift job, grab its run id,
# then block until the run finishes.
with DAG(
    'customer_churn_dag',
    default_args=default_args,
    schedule_interval='@weekly',
    catchup=False,                        # skip backfill of missed intervals
) as dag:

    # Kick off the Glue job that loads S3 data into Redshift.
    trigger_glue_job = PythonOperator(
        task_id='tsk_glue_job_trigger',
        python_callable=glue_job_s3_redshift_transfer,
        op_kwargs={'job_name': 's3_upload_to_redshift_gluejob'},
    )

    # Look up the id of the run that was just started (pushed to XCom).
    fetch_glue_run_id = PythonOperator(
        task_id='tsk_grab_glue_job_run_id',
        python_callable=get_run_id,
    )

    # Poll Glue every 60s (up to 1h) until that run reaches a final state.
    wait_for_glue_job = GlueJobSensor(
        task_id="tsk_is_glue_job_finish_running",
        job_name='s3_upload_to_redshift_gluejob',
        run_id='{{task_instance.xcom_pull("tsk_grab_glue_job_run_id")}}',
        verbose=True,                     # surface Glue job logs in Airflow logs
        aws_conn_id='aws_s3_conn',
        poke_interval=60,
        timeout=3600,
    )

    trigger_glue_job >> fetch_glue_run_id >> wait_for_glue_job