1 change: 1 addition & 0 deletions .gitignore
@@ -1,5 +1,6 @@
dags/
plugins/
logs/
config/
project_data/

104 changes: 69 additions & 35 deletions docker-compose.yaml
@@ -24,7 +24,7 @@
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.6.0
# Default: apache/airflow:3.0.4
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volume-mounted.
@@ -44,27 +44,24 @@
#
# Feel free to modify this file to suit your needs.
---
version: '3.8'
x-airflow-common:
&airflow-common
# In order to add custom dependencies or upgrade provider packages you can use your extended image.
# In order to add custom dependencies or upgrade provider distributions you can use your extended image.
# Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
# and uncomment the "build" line below, then run `docker-compose build` to build the images.
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.6.0}
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:3.0.4}
# build: .
environment:
&airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__AUTH_MANAGER: airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
# For backward compatibility, with Airflow <2.3
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 300
AIRFLOW__CORE__EXECUTION_API_SERVER_URL: 'http://airflow-apiserver:8080/execution/'
# yamllint disable rule:line-length
# Use simple http server on scheduler for health checks
# See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
@@ -73,9 +70,12 @@ x-airflow-common:
# WARNING: Use the _PIP_ADDITIONAL_REQUIREMENTS option ONLY for quick checks.
# For any other purpose (development, test and especially production usage) build/extend the Airflow image.
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
# The following line can be used to set a custom config file, stored in the local config folder
AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
volumes:
- ./project_data:/opt/airflow/dags
- ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
- ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
- ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:0"
depends_on:
@@ -102,7 +102,9 @@ services:
restart: always

redis:
image: redis:latest
# Redis is limited to 7.2-bookworm due to licensing change
# https://redis.io/blog/redis-adopts-dual-source-available-licensing/
image: redis:7.2-bookworm
expose:
- 6379
healthcheck:
@@ -113,13 +115,13 @@
start_period: 30s
restart: always

airflow-webserver:
airflow-apiserver:
<<: *airflow-common
command: webserver
command: api-server
ports:
- "8080:8080"
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"]
interval: 30s
timeout: 10s
retries: 5
@@ -145,13 +147,29 @@ services:
airflow-init:
condition: service_completed_successfully

airflow-dag-processor:
<<: *airflow-common
command: dag-processor
healthcheck:
test: ["CMD-SHELL", 'airflow jobs check --job-type DagProcessorJob --hostname "$${HOSTNAME}"']
interval: 30s
timeout: 10s
retries: 5
start_period: 30s
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully

airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
# yamllint disable rule:line-length
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
- 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 30s
timeout: 10s
retries: 5
@@ -164,6 +182,8 @@ services:
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-apiserver:
condition: service_healthy
airflow-init:
condition: service_completed_successfully

@@ -189,20 +209,6 @@ services:
command:
- -c
- |
function ver() {
printf "%04d%04d%04d%04d" $${1//./ }
}
airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
airflow_version_comparable=$$(ver $${airflow_version})
min_airflow_version=2.2.0
min_airflow_version_comparable=$$(ver $${min_airflow_version})
if (( airflow_version_comparable < min_airflow_version_comparable )); then
echo
echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
echo
exit 1
fi
if [[ -z "${AIRFLOW_UID}" ]]; then
echo
echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
@@ -211,6 +217,7 @@ services:
echo "For other operating systems you can get rid of the warning with manually created .env file:"
echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
echo
export AIRFLOW_UID=$$(id -u)
fi
one_meg=1048576
mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
@@ -245,20 +252,47 @@ services:
echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
echo
fi
mkdir -p /sources/logs /sources/dags /sources/plugins
chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
exec /entrypoint airflow version
echo
echo "Creating missing opt dirs if missing:"
echo
mkdir -v -p /opt/airflow/{logs,dags,plugins,config}
echo
echo "Airflow version:"
/entrypoint airflow version
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Running airflow config list to create default config file if missing."
echo
/entrypoint airflow config list >/dev/null
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}
echo
echo "Change ownership of files in /opt/airflow to ${AIRFLOW_UID}:0"
echo
chown -R "${AIRFLOW_UID}:0" /opt/airflow/
echo
echo "Change ownership of files in shared volumes to ${AIRFLOW_UID}:0"
echo
chown -v -R "${AIRFLOW_UID}:0" /opt/airflow/{logs,dags,plugins,config}
echo
echo "Files in shared volumes:"
echo
ls -la /opt/airflow/{logs,dags,plugins,config}

# yamllint enable rule:line-length
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_DB_MIGRATE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
_PIP_ADDITIONAL_REQUIREMENTS: ''
user: "0:0"
volumes:
- ${AIRFLOW_PROJ_DIR:-.}:/sources

airflow-cli:
<<: *airflow-common
@@ -272,6 +306,8 @@ services:
- bash
- -c
- airflow
depends_on:
<<: *airflow-common-depends-on

# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
@@ -306,7 +342,5 @@ services:
DIRECTORY_NAME: ${DIRECTORY_NAME:-project}
DESTINATION_PATH: ${DESTINATION_PATH:-/app/sync}
INTERVAL: ${INTERVAL:-10}

volumes:
postgres-db-volume:
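
A note on the service changes above: the old `airflow-webserver` service is replaced by `airflow-apiserver`, whose healthcheck now probes the versioned REST API (`/api/v2/version`) instead of `/health`, and a dedicated `airflow-dag-processor` service takes over DAG parsing. A minimal smoke test from the host, assuming the stack is up and mapped to `localhost:8080` as in the `ports` section above — the endpoint and port come from the healthcheck; the helper name is illustrative:

# smoke_test_apiserver.py - standard library only; hits the same endpoint
# the airflow-apiserver container healthcheck curls (no auth assumed,
# since the healthcheck plain-curls it).
import json
import urllib.request

def airflow_version(base_url: str = "http://localhost:8080") -> str:
    with urllib.request.urlopen(f"{base_url}/api/v2/version", timeout=10) as resp:
        return json.load(resp)["version"]

if __name__ == "__main__":
    print(airflow_version())  # expected: "3.0.4" with the default image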

31 changes: 14 additions & 17 deletions examples/dags/sample_bash_operator_dag.py
@@ -1,25 +1,22 @@
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator

from airflow import DAG
from airflow.providers.standard.operators.bash import BashOperator

default_args = {
'owner' : 'Mostafa Ghadimi',
'description' : 'Use of the DockerOperator',
'depend_on_past' : False,
'start_date' : datetime(2023, 4, 4),
'email_on_failure' : False,
'email_on_retry' : False,
'retries' : 1,
'retry_delay' : timedelta(minutes=5)
"owner": "Mostafa Ghadimi",
"description": "Use of the BashOperator",
"depend_on_past": False,
"start_date": datetime(2023, 4, 4),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG('docker_dag_sample', default_args=default_args, schedule_interval="5 10 * * *", catchup=False) as dag:
t1 = BashOperator(
task_id='print_hello',
bash_command='echo "hello world"'
)
with DAG(
"bash_dag_sample", default_args=default_args, schedule="5 10 * * *", catchup=False
) as dag:
t1 = BashOperator(task_id="print_hello", bash_command='echo "hello world"')

t1
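
The rewrite also swaps the removed `schedule_interval` argument for `schedule`, the Airflow 3 spelling. For a quick local check of the DAG without a running scheduler, something like the following works — a sketch assuming Airflow 3.x is importable in the current environment and the file above is importable as a module:

# Sketch: run one in-process debug run of the DAG defined above.
# Assumes sample_bash_operator_dag.py is on sys.path.
from sample_bash_operator_dag import dag  # the `with DAG(...) as dag:` object

if __name__ == "__main__":
    dag.test()  # in-process run, no scheduler needed; available since Airflow 2.5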

39 changes: 21 additions & 18 deletions examples/dags/sample_docker_operator_dag.py
@@ -1,30 +1,33 @@
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.bash_operator import BashOperator
from airflow.operators.docker_operator import DockerOperator

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

default_args = {
'owner' : 'Mostafa Ghadimi',
'description' : 'Use of the DockerOperator',
'depend_on_past' : False,
'start_date' : datetime(2023, 4, 23),
'email_on_failure' : False,
'email_on_retry' : False,
'retries' : 1,
'retry_delay' : timedelta(minutes=5)
"owner": "Mostafa Ghadimi",
"description": "Use of the DockerOperator",
"depend_on_past": False,
"start_date": datetime(2023, 4, 23),
"email_on_failure": False,
"email_on_retry": False,
"retries": 1,
"retry_delay": timedelta(minutes=5),
}

with DAG('docker_dag_v2', default_args=default_args, schedule_interval="5 10 * * *", catchup=False) as dag:
with DAG(
"docker_dag_v2",
default_args=default_args,
schedule="5 10 * * *",
catchup=False,
) as dag:
t1 = DockerOperator(
task_id='docker_command',
image='alpine:latest',
api_version='auto',
auto_remove=True,
task_id="docker_command",
image="alpine:latest",
api_version="auto",
auto_remove="success",
command="/bin/sleep 30",
docker_url="unix://var/run/docker.sock",
network_mode="bridge"
network_mode="bridge",
)

t1
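
Besides the import move to `airflow.providers.docker.operators.docker`, note `auto_remove` changing from `True` to `"success"`: recent versions of the Docker provider take a string here ("never", "success", or "force") rather than a boolean. Since the task reaches the daemon through `unix://var/run/docker.sock`, a quick way to confirm that socket is usable from wherever the worker runs — a sketch assuming the `docker` Python package (Docker SDK for Python) is installed there:

# Sketch: verify the Docker socket the DockerOperator above points at.
import docker

client = docker.DockerClient(base_url="unix://var/run/docker.sock")
print(client.version()["Version"])  # prints the daemon version on success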