What is Task in Apache Airflow

Posted by admin in Apache Airflow

A Task is the basic unit of execution in Airflow. Tasks are arranged into DAGs, and upstream and downstream dependencies are set between them in order to express the order in which they should run.

In practice, a task is an instance of an operator. Broadly, there are three types of operators:

  • Action
  • Transfer
  • Sensor
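To make the three categories concrete, here is a minimal sketch using plain-Python stand-ins (these are not real Airflow classes; real examples would be BashOperator for action, S3ToRedshiftOperator for transfer, and HttpSensor for sensor):

```python
# Illustrative stand-ins for the three operator flavors (not real Airflow classes).

class ActionOperator:
    """Performs an action, e.g. runs a command (like BashOperator)."""
    def execute(self):
        return "ran a command"

class TransferOperator:
    """Moves data from one system to another (like S3ToRedshiftOperator)."""
    def execute(self):
        return "moved data from source to destination"

class SensorOperator:
    """Waits until a condition is true (like HttpSensor); poke() is called repeatedly."""
    def poke(self):
        return True
```

The key distinction: action and transfer operators do their work once in `execute()`, while a sensor's `poke()` is invoked over and over until it returns true or the sensor times out.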

Now let's try to understand tasks with a simple example:

from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from datetime import datetime, timedelta

# Arguments applied to every task in the DAG by default
default_args = {
    "retries": 1,                         # illustrative defaults; adjust as needed
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "forex_data_pipeline",
    start_date=datetime(2023, 11, 14),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        http_conn_id="forex_api",
        endpoint="latest",  # hypothetical path after the host; use your API's endpoint
        response_check=lambda response: "rates" in response.text,
        poke_interval=5,
        timeout=20,
    )


  • We have the is_forex_rates_available variable, which is an instance of HttpSensor.
  • The first argument we always specify, whatever the operator, is the task ID. The task ID must be unique across all of the operators in the same DAG.
  • http_conn_id="forex_api" specifies the connection ID to use for the HTTP request. Connections in Airflow store connection information, such as the host URL, that tasks can reference.
  • Next is the endpoint; the endpoint is the part of the URL that comes after the host.
  • The response_check argument takes a Python callable; here we use a lambda that checks whether "rates" appears in response.text.
  • poke_interval, available on all sensors, controls how often the condition is checked. In our case, every five seconds the sensor verifies whether the forex API is available.
  • The last argument is timeout. As you can guess, it means that after the sensor has been running for 20 seconds it will time out and the task will end in failure. Specifying a timeout is important because you don't want your sensor running forever.
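The sensor semantics described above (response_check, poke_interval, timeout) can be simulated in plain Python. This is a simplified sketch of the poke loop, not Airflow's actual implementation, and FakeResponse is a made-up stand-in for the HTTP response object:

```python
import time

# Made-up stand-in for an HTTP response object with a .text attribute
class FakeResponse:
    def __init__(self, text):
        self.text = text

# The same check used in the DAG above
response_check = lambda response: "rates" in response.text

def run_sensor(fetch, check, poke_interval=5, timeout=20,
               clock=time.monotonic, sleep=time.sleep):
    """Simplified sketch of sensor semantics: re-check every poke_interval
    seconds until check() passes or timeout seconds elapse."""
    start = clock()
    while True:
        if check(fetch()):
            return "success"
        if clock() - start >= timeout:
            return "failed"   # sensor timed out before the condition became true
        sleep(poke_interval)

# The check itself can be exercised directly:
ok = response_check(FakeResponse('{"rates": {"USD": 1.08}}'))
bad = response_check(FakeResponse('{"error": "unavailable"}'))
```

The clock and sleep parameters are injected so the loop can be tested with a fake clock instead of actually waiting; Airflow's real sensor machinery also supports rescheduling between pokes, which this sketch omits.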

Now we can test whether this task runs successfully. To do that, run the command below:

airflow tasks test forex_data_pipeline is_forex_rates_available 2023-11-01

Here forex_data_pipeline is the DAG ID, is_forex_rates_available is the task ID, and 2023-11-01 is an execution date in the past. The general form is airflow tasks test <dag_id> <task_id> <date>.

After running the command, you will see the task logs; if the check passes, the task is marked as a success.

We are Airflow experts and have multiple team members with extensive experience in Airflow as well as many other modern data stack components.
