What Are Sensors in Apache Airflow | Airflow Sensors

Posted by admin, in Apache Airflow

Sensors are a special type of operator designed to do exactly one thing: wait for something to occur. The trigger can be time-based, the arrival of a file, or an external event. A sensor simply keeps checking until its condition is met, then succeeds so that the next task can run.
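To make the waiting behaviour concrete, here is a rough sketch in plain Python of the check-and-sleep loop a sensor performs. It is only an illustration, not Airflow's actual implementation: the function name wait_for and its sleep and clock arguments are hypothetical, while poke_interval and timeout are named after the sensor arguments of the same name.

```python
import time


def wait_for(condition, poke_interval=5.0, timeout=20.0,
             sleep=time.sleep, clock=time.monotonic):
    """Call condition() repeatedly until it returns True.

    Hypothetical helper that mimics a sensor: it waits poke_interval
    seconds between checks and raises TimeoutError once timeout seconds
    have passed without success.
    """
    started = clock()
    while True:
        if condition():
            return True  # condition met, the "sensor" succeeds
        if clock() - started >= timeout:
            raise TimeoutError("condition not met within the timeout")
        sleep(poke_interval)  # wait before checking again


# Usage: the condition becomes true on the third check.
checks = []


def third_time_lucky():
    checks.append(1)
    return len(checks) >= 3


result = wait_for(third_time_lucky, poke_interval=0, timeout=5)
```

The sleep and clock arguments exist only so the loop can be exercised without really waiting; a real sensor leaves this timing to Airflow.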

Let’s understand this with a simple example:

from airflow import DAG
# HttpSensor ships in the apache-airflow-providers-http package
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

# Default arguments applied to every task in the DAG
default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@airflow.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "forex_data_pipeline",
    start_date=datetime(2023, 11, 14),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    # Wait until the forex API returns a response that contains "rates"
    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        http_conn_id="forex_api",
        endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
        response_check=lambda response: "rates" in response.text,
        poke_interval=5,
        timeout=20,
    )

    # Wait until forex_currencies.csv exists under the path stored
    # in the forex_path connection
    is_forex_file_available = FileSensor(
        task_id="is_forex_file_available",
        fs_conn_id="forex_path",
        filepath="forex_currencies.csv",
        poke_interval=5,
        timeout=20,
    )

Now let’s understand each parameter of the file sensor:

  • First, we need to import the sensor: from airflow.sensors.filesystem import FileSensor
  • Then we provide the task_id, which must be unique within the DAG.
  • For the file sensor, there are two parameters that we have to specify.
  • The first is fs_conn_id, the ID of the Airflow connection that stores the path where the file lives.
  • The file sensor uses the path in this connection to check whether the file or folder exists.
  • The second parameter is filepath, the name of the file (relative to the connection's path) that we are looking for.
  • In our case, that is forex_currencies.csv.
  • Another argument is poke_interval, which is available for all sensors. It sets how often the sensor checks whether its condition is true. In our case, every five seconds the sensor verifies whether forex_currencies.csv is available.
  • timeout=20 means that if the condition is still not met after 20 seconds, the task is marked as failed.
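The way fs_conn_id and filepath work together can be sketched in plain Python. This is a simplified illustration and not the FileSensor source code: the helper file_is_available is hypothetical, and a temporary directory stands in for the path that the forex_path connection would hold.

```python
import os
import tempfile


def file_is_available(base_path: str, filepath: str) -> bool:
    """Return True if filepath exists under base_path (hypothetical helper)."""
    # The base path plays the role of the connection's path;
    # filepath is the file name given to the sensor.
    full_path = os.path.join(base_path, filepath)
    return os.path.exists(full_path)


# Usage: check before and after the file is created.
with tempfile.TemporaryDirectory() as base_path:
    before = file_is_available(base_path, "forex_currencies.csv")
    # Create an empty placeholder file so the check can succeed.
    with open(os.path.join(base_path, "forex_currencies.csv"), "w"):
        pass
    after = file_is_available(base_path, "forex_currencies.csv")

print(before, after)  # prints: False True
```

Each poke of the sensor amounts to one such check; the task succeeds on the first check that finds the file.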

Now we are ready to test our file sensor. The command is given below:

airflow tasks test forex_data_pipeline is_forex_file_available 2023-11-01


After running the command, we should see a success message, which indicates that our task has executed successfully.

We at Helical have more than 10 years of experience in providing solutions and services in the data domain and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance, or a POC, and to hear about our past experience with Airflow. You can contact us at nikhilesh@Helicaltech.com
