A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs and then have upstream and downstream dependencies set between them in order to express the order in which they should run.
A task is created by instantiating an operator. Broadly, there are three types of operators (see the sketch after this list):
- Action
- Transfer
- Sensor
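As a minimal sketch of the first two types (the DAG name, task IDs, and file path here are hypothetical and not part of the pipeline below), an action operator such as BashOperator runs something, while a sensor such as FileSensor waits for a condition to become true; transfer operators (for example the S3-to-Redshift transfer operator in the Amazon provider) move data from one system to another:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator    # action operator: runs a bash command
from airflow.sensors.filesystem import FileSensor  # sensor operator: waits until a condition is met

with DAG("operator_types_demo",                    # hypothetical DAG, for illustration only
         start_date=datetime(2023, 11, 14),
         schedule_interval="@daily",
         catchup=False) as dag:

    wait_for_file = FileSensor(                    # sensor: pokes until the file exists
        task_id="wait_for_file",
        filepath="/tmp/input.csv",                 # hypothetical path
        poke_interval=30,
    )

    process_file = BashOperator(                   # action: runs a shell command
        task_id="process_file",
        bash_command="echo 'processing /tmp/input.csv'",
    )

    # upstream >> downstream: the sensor must succeed before the bash task runs
    wait_for_file >> process_file
```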
Now let’s try to understand tasks with a simple example.
```python
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "airflowadmin@airflow.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG("forex_data_pipeline",
         start_date=datetime(2023, 11, 14),
         schedule_interval="@daily",
         default_args=default_args,
         catchup=False) as dag:

    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        http_conn_id="forex_api",
        endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
        response_check=lambda response: "rates" in response.text,
        poke_interval=5,
        timeout=20,
    )

    # save_forex_rates is a downstream task (not shown here) that runs after the sensor succeeds
    is_forex_rates_available >> save_forex_rates
```
- We have the is_forex_rates_available variable, which is an instance of HttpSensor.
- The first argument we always have to specify, whatever operator is used, is the task ID. The task ID must be unique across all of the operators in the same DAG.
- http_conn_id="forex_api" specifies the connection ID to be used for making the HTTP request. Connections in Airflow store connection information, such as the URL, that can be referenced by tasks (see the example after this list).
- After that we have the endpoint; basically, the endpoint is everything that comes after the host in the URL.
- The next argument is response_check, where we need to specify a Python function; in our case we use a lambda that checks whether "rates" appears in response.text.
- Another argument is poke_interval, which is available for all sensors; it defines how often the sensor checks whether its condition is true or false. In our case, every five seconds the sensor verifies whether the Forex API is available or not.
- The last argument is timeout. As we can guess, it means that if the sensor has been running for 20 seconds, it times out and the task ends up in failure. It is important to specify a timeout because you don’t want to keep your sensor running forever.
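The forex_api connection itself is created outside the DAG file, for example in the Airflow UI or with the Airflow CLI. Here is a rough sketch using the CLI; the base URL is an assumption for illustration, it only has to be the host that the endpoint above is appended to:

```bash
# Assumed base URL; the sensor's endpoint "marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b"
# is appended to this host when the HTTP request is made.
airflow connections add forex_api \
    --conn-type http \
    --conn-host https://gist.githubusercontent.com/
```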
Now let’s test whether this task runs successfully. To do that, run the command below:
```bash
airflow tasks test forex_data_pipeline is_forex_rates_available 2023-11-01
```
Here forex_data_pipeline is the DAG ID, is_forex_rates_available is the task ID, and 2023-11-01 is an execution date in the past.
After running the command, you will see output indicating whether the task succeeded.
We are Airflow experts with extensive experience in Airflow as well as many other modern data stack components, and we have multiple resources available.