A Comprehensive Guide to Apache Airflow

Yuvraj Singh · Published in The Cloudside View · Feb 7, 2024 · 7 min read
Apache Airflow is a powerful open-source platform that allows you to schedule and orchestrate complex workflows. In this guide, we’ll explore the creation of distinct Directed Acyclic Graphs (DAGs) using Airflow. Each DAG is designed for different purposes and demonstrates various operators and functionalities.

Before delving into the intricacies of creating Apache Airflow Directed Acyclic Graphs (DAGs), let’s ensure you have Airflow installed on your system. If not, follow this Airflow Installation Guide to set up Airflow on your machine.

Now, let’s explore essential DAGs using Apache Airflow.

Key Aspects of Airflow Tasks:

  1. Operators: Tasks are implemented using operators. An operator defines the logic of a task, and there are various built-in operators for different types of tasks (e.g., BashOperator for running shell commands, PythonOperator for executing Python functions, etc.). Users can also create custom operators for specific use cases.
Example: a minimal PythonOperator sketch is shown after this list.

2. Task Instances: When a DAG is executed, it creates instances for each workflow task. Task instances represent the actual occurrence of a task within a specific run of the DAG. Each task instance has a unique identifier and can be in one of several states, such as queued, running, success, or failure.

3. Dependencies: Tasks in a DAG can have dependencies on other tasks. These dependencies determine the order of execution. A task can only run if all its upstream tasks (tasks it depends on) have been completed. Dependencies are defined explicitly when constructing the DAG.

(Figure: a Directed Acyclic Graph)

4. Logging and Monitoring: Airflow provides logging and monitoring capabilities for tasks. You can view the logs of each task execution, and Airflow also integrates with external tools and platforms for monitoring and alerting.
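
As referenced in point 1, here is a minimal PythonOperator sketch. The DAG ID python_operator_demo and the callable print_hello are illustrative assumptions, not taken from the article's examples.

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical callable used only for illustration
def print_hello():
    print("Hello from a PythonOperator task")

with DAG("python_operator_demo",           # assumed DAG ID for this sketch
         start_date=datetime(2024, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:

    hello_task = PythonOperator(
        task_id="say_hello",               # unique ID of the task within the DAG
        python_callable=print_hello,       # the function executed when the task runs
    )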

Understanding the Basic Process of Writing a DAG in Airflow:

Writing a Directed Acyclic Graph (DAG) in Apache Airflow involves several steps. Below is a step-by-step guide on how to create a basic DAG in Airflow:

  1. Import Dependencies: Start by importing the necessary modules and classes from the Airflow library. Commonly used classes include DAG, operators, and timedelta.
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

2. Define Default Arguments: Set default arguments for your DAG. These arguments include metadata like the owner, start date, and any other parameters you want to apply globally to all tasks in the DAG.

default_args = {
    'owner': 'your_name',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

3. Instantiate the DAG: Create an instance of the DAG class, passing the DAG ID and default arguments.

dag = DAG('my_dag', default_args=default_args, schedule_interval=timedelta(days=1))

The schedule_interval parameter determines how often the DAG should run. In this example, the DAG is scheduled to run daily.
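
Besides a timedelta, schedule_interval also accepts Airflow preset strings and cron expressions; the alternatives below are illustrative and equivalent to a daily schedule.

# Alternative, equivalent ways to express a daily schedule (illustrative)
dag = DAG('my_dag', default_args=default_args, schedule_interval='@daily')      # preset string
dag = DAG('my_dag', default_args=default_args, schedule_interval='0 0 * * *')   # cron expression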

4. Define Tasks: Define the tasks that make up your DAG. Tasks are instantiated using operators. Airflow provides various built-in operators (e.g., BashOperator, PythonOperator, DummyOperator) that you can use based on the type of task you want to perform.

task1 = DummyOperator(task_id='task_1', dag=dag)
task2 = PythonOperator(task_id='task_2', python_callable=my_python_function, dag=dag)

In this example, DummyOperator represents a task that does nothing, and PythonOperator represents a task that executes a Python function (my_python_function).
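
my_python_function is not defined in the snippet above; a minimal placeholder, assuming it only needs to print a message, could look like this:

# Hypothetical placeholder for my_python_function referenced by task_2
def my_python_function():
    print("Running the Python logic for task_2")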

5. Set Up Dependencies: Define the dependencies between tasks. Use the set_downstream or >> operator to establish the order of task execution.

task1 >> task2

In this example, task2 depends on the successful completion of task1.

6. Define Additional Tasks (Optional): Add more tasks to your DAG as needed. Repeat the process of defining tasks, setting dependencies, and adding them to the DAG.
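
For example, a BashOperator task could be appended after task2; the task ID and command below are illustrative.

from airflow.operators.bash import BashOperator

# Hypothetical extra task appended to the end of the chain
task3 = BashOperator(
    task_id='task_3',
    bash_command='echo "workflow finished"',
    dag=dag,
)

task2 >> task3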

7. Save the DAG File: Save your DAG file with a .py extension in the Airflow DAGs directory. By default, Airflow looks for DAG files in the dags/ folder under your Airflow home directory ($AIRFLOW_HOME/dags).

8. Run Airflow Scheduler: Start the Airflow scheduler to begin executing your DAG. The scheduler will use the specified schedule_interval to determine when to trigger DAG runs.

airflow scheduler

9. Monitor DAG Execution: Use the Airflow web interface or command-line tools to monitor the progress of your DAG runs. You can view logs, check task status, and troubleshoot any issues that may arise.
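
A few CLI commands that come in handy for monitoring (my_dag is the DAG ID from the example above):

airflow dags list            # DAGs currently registered with the scheduler
airflow tasks list my_dag    # tasks defined in my_dag
airflow dags trigger my_dag  # manually start a new DAG run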

DAG: Basic Dummy Operator DAG (ETL)

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime as dt

def_args = {
    "owner": "airflow",
    "start_date": dt(2024, 1, 1)
}

with DAG("ETL",
         catchup=False,
         default_args=def_args) as dag:

    start = DummyOperator(task_id="START")
    e = DummyOperator(task_id="EXTRACTION")
    t = DummyOperator(task_id="TRANSFORMATION")
    l = DummyOperator(task_id="LOADING")
    end = DummyOperator(task_id="END")

    start >> e >> t >> l >> end

DAG Configuration:

  • DAG Name: ETL
  • Catchup: False (This means that the scheduler should only consider the start_date for scheduling and should not backfill or “catch up” with missed DAG runs.)

  • Default Arguments (def_args): the owner is set to "airflow" and the start_date to January 1, 2024.

Workflow Explanation:

Tasks:

  1. START: Task ID: “START”
    Description: Represents the beginning of the workflow.
    Dependency: None (It is the initial task).
  2. EXTRACTION: Task ID: “EXTRACTION”
    Description: Represents the task for extracting data.
    Dependency: Depends on “START” (It will run after the “START” task completes).
  3. TRANSFORMATION: Task ID: “TRANSFORMATION”
    Description: Represents the task of transforming data.
    Dependency: Depends on “EXTRACTION” (It will run after the “EXTRACTION” task completes).
  4. LOADING: Task ID: “LOADING”
    Description: Represents the task for loading data.
    Dependency: Depends on “TRANSFORMATION” (It will run after the “TRANSFORMATION” task is completed).
  5. END: Task ID: “END”
    Description: Represents the end of the workflow.
    Dependency: Depends on “LOADING” (It will run after the “LOADING” task completes).

start >> e >> t >> l >> end

This section establishes the order of task execution by defining dependencies. The >> operator indicates that a task on the left should be executed before the task on the right. In this case, the workflow follows the sequence: start >> e >> t >> l >> end.
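
The >> operator is shorthand for the set_downstream method, so the same chain can also be written as:

# Equivalent to start >> e >> t >> l >> end
start.set_downstream(e)
e.set_downstream(t)
t.set_downstream(l)
l.set_downstream(end)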

XCOM

XCom (Cross-Communication) is a mechanism that lets tasks exchange messages, small amounts of data, or metadata during the execution of a Directed Acyclic Graph (DAG).

How XCom Works:

  1. EXTRACT Produces Data: The EXTRACT task, during its execution, produces some data that it wants to share with other tasks.
2. XCom Push: EXTRACT uses the xcom_push method (or simply returns a value) to push the data to the XCom system. This data can be a string, a numeric value, a dictionary, or any picklable object.
3. TRANSFORM and LOAD Pull Data: The downstream tasks, TRANSFORM and LOAD, pull the data from the XCom system using the xcom_pull method.
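
Taken in isolation, the push and pull calls look like the sketch below; the key name my_key and the task ID PUSH_TASK are illustrative, not part of the full DAG that follows.

# Explicit XCom push/pull (illustrative key and task IDs)
def push_fn(ti):
    ti.xcom_push(key="my_key", value="some small payload")

def pull_fn(ti):
    value = ti.xcom_pull(task_ids="PUSH_TASK", key="my_key")
    print("Pulled from XCom:", value)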

Example:

Let’s consider a simple example with three tasks (EXTRACT, TRANSFORM, LOAD), where EXTRACT produces a value that TRANSFORM and LOAD need to use.

from datetime import datetime as dtime
from datetime import timedelta
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator

import pandas as pd


GV = "Data Engineering"

def extract_fn():
    print("Logic to Extract data")
    print("Value of Global Variable is: ", GV)
    rtn_val = "Analytics Training"
    return rtn_val

# XCom: Cross-Communication
# ti stands for task instance
def transform_fn(a1, ti):
    xcom_pull_obj = ti.xcom_pull(task_ids=["EXTRACT"])
    print("type of xcom pull object is {}".format(type(xcom_pull_obj)))
    extract_rtn_obj = xcom_pull_obj[0]
    print(f"the value of xcom pull object is {extract_rtn_obj}")
    print("The value of a1 is ", a1)
    print("Logic to Transform Data")
    return 10

def load_fn(p1, p2, ti):
    xcom_pull_obj = ti.xcom_pull(task_ids=["EXTRACT"])
    print('type of xcom pull object is {}'.format(type(xcom_pull_obj)))
    print("the value of xcom pull object is {}".format(xcom_pull_obj[0]))
    print("The value of p1 is {}".format(p1))
    print("The value of p2 is {}".format(p2))
    print("Logic to load Data")

default_args = {
    "owner": "Airflow",
    "retries": 0,
    "retry_delay": timedelta(minutes=1),
    "start_date": dtime(2024, 1, 30)
}

with DAG("ex_xcom_push_pull", default_args=default_args, catchup=False) as dag:

    start = DummyOperator(task_id="START")

    e = PythonOperator(
        task_id="EXTRACT",
        python_callable=extract_fn,
        do_xcom_push=True  # By default this parameter is set to True
    )

    # op_args -> positional arguments passed to the callable
    t = PythonOperator(
        task_id="TRANSFORM",
        python_callable=transform_fn,
        op_args=["Learning Data Engineering with airflow"],
        do_xcom_push=True
    )

    # op_kwargs -> keyword arguments passed to the callable
    # (a dict of named parameters goes through op_kwargs, not op_args)
    l = PythonOperator(
        task_id="LOAD",
        python_callable=load_fn,
        op_kwargs={"p2": "Engineering", "p1": "Data"}
    )

    end = DummyOperator(task_id="END")

    start >> e >> t >> l >> end

This DAG represents a simple workflow where tasks share information through XCom. The EXTRACT task automatically pushes its return value, and subsequent tasks (TRANSFORM and LOAD) pull this value from XCom to perform their logic.

Database Connection using Airflow Hooks:

  • Airflow Hooks provide a reusable and consistent way to connect to external systems.
  • Hooks are typically used by Operators to interact with external systems.
  • They abstract the connection details and provide methods for common operations.

## Airflow Concepts
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_pg_hook_conn():
    con = PostgresHook(postgres_conn_id='postgres_conn_demo')
    return con
  • PostgresHook is a hook designed for PostgreSQL databases.
  • It is initialized with the connection ID (postgres_conn_demo in this case).
  • Returns the hook object (con), an instance of PostgresHook, which is used in the code below as con_obj.

def etl():
    # con_obj = get_db_conn()  ## syntax with SQLAlchemy
    con_obj = get_pg_hook_conn()
    e_df = extract_fn(con_obj)
    t_df = transform_fn(e_df)
    l_rtn_obj = load_fn(t_df, con_obj)
    # con_obj.close()
    return None

Once we have the connection object, we can use it in the DAG Tasks:

def extract_fn(con_obj):
    select_qry = "select * from public.company"
    df = con_obj.get_pandas_df(select_qry)  ## syntax with pg hook
    print(df)
    return df
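
The transform_fn and load_fn used by etl() are not shown here; a minimal load_fn sketch, assuming a hypothetical target table public.company_copy with matching columns, could use the hook's insert_rows method:

# Hypothetical load step: writes the transformed DataFrame back to Postgres.
# The table name public.company_copy is an assumption made for illustration.
def load_fn(t_df, con_obj):
    rows = list(t_df.itertuples(index=False, name=None))
    con_obj.insert_rows(table="public.company_copy",
                        rows=rows,
                        target_fields=list(t_df.columns))
    return len(rows)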

CONCLUSION

This guide has covered the essential aspects of Apache Airflow Directed Acyclic Graphs (DAGs). We looked at the significance of tasks in Airflow, exploring key concepts like operators, task instances, dependencies, and logging, as well as XComs for cross-task communication and hooks for database connections. The step-by-step process of writing a DAG was outlined, demonstrating how to import dependencies, define default arguments, instantiate the DAG, define tasks, set up dependencies, and save the DAG file.
