Sensors are a special type of operator designed to do exactly one thing – wait for something to occur. The trigger can be time-based, a file arriving, or an external event; a sensor simply waits until its condition is met, and only once it succeeds do the downstream tasks run.
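Conceptually, a sensor in its default "poke" mode just re-checks a condition on a fixed schedule. The following is a rough pure-Python sketch of that wait loop (this is illustrative, not Airflow's actual implementation; the names `run_sensor` and `fake_condition` are made up for the example):

```python
import time

def run_sensor(poke, poke_interval, timeout):
    """Illustrative sketch of a sensor in 'poke' mode: call poke() every
    poke_interval seconds until it returns True, or fail once the total
    wait exceeds timeout."""
    start = time.monotonic()
    while True:
        if poke():
            return "success"  # condition met -> downstream tasks can run
        if time.monotonic() - start > timeout:
            raise TimeoutError("sensor timed out; the task is marked as failed")
        time.sleep(poke_interval)

# Example: "wait" for a condition that becomes true on the third check,
# as if the file we are waiting for appeared in the meantime.
attempts = {"n": 0}
def fake_condition():
    attempts["n"] += 1
    return attempts["n"] >= 3

print(run_sensor(fake_condition, poke_interval=0.01, timeout=5))  # prints "success"
```

This is exactly the trade-off the `poke_interval` and `timeout` parameters below control: how often to check, and how long to keep checking before giving up.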
Let’s understand this with a simple example:
```python
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@airflow.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "forex_data_pipeline",
    start_date=datetime(2023, 11, 14),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    is_forex_rates_available = HttpSensor(
        task_id="is_forex_rates_available",
        http_conn_id="forex_api",
        endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
        response_check=lambda response: "rates" in response.text,
        poke_interval=5,
        timeout=20,
    )

    is_forex_file_available = FileSensor(
        task_id="is_forex_file_available",
        fs_conn_id="forex_path",
        filepath="forex_currencies.csv",
        poke_interval=5,
        timeout=20,
    )
```
Now let’s understand each parameter:
- First, we need to import the sensor: `from airflow.sensors.filesystem import FileSensor`.
- Then we provide a `task_id`, which must be unique within the DAG.
- For the file sensor, there are two parameters that we have to specify:
- `fs_conn_id`: the connection ID. This connection holds the path in which the file sensor will check whether the file or folder exists.
- `filepath`: the name of the file we are looking for – in our case, `forex_currencies.csv`.
- Another argument is `poke_interval`, which is available for all sensors and defines how often the sensor checks whether its condition is true. In our case, every five seconds we verify whether the file is available.
- `timeout=20` means that if the condition is still not met after 20 seconds, the task is marked as failed.
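Note that `fs_conn_id` refers to an Airflow connection that must already exist (created via the UI or the CLI) before the sensor can use it. Assuming the Airflow CLI, a filesystem connection named `forex_path` could be created like this (the path shown is only an example – use the directory where your file actually lands):

```shell
# Create a filesystem connection named "forex_path";
# FileSensor resolves its filepath relative to the "path" in the extras.
airflow connections add forex_path \
    --conn-type fs \
    --conn-extra '{"path": "/opt/airflow/dags/files"}'
```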
Now we are ready to test our file sensor. The command is given below:

```shell
airflow tasks test forex_data_pipeline is_forex_file_available 2023-11-01
```
After running the command, we should get a success message indicating that our task executed successfully.
We at Helical have more than 10 years of experience providing solutions and services in the data domain and have served 85+ clients. Please reach out to us for assistance, consulting, services, maintenance, as well as POCs, and to hear about our past experience with Airflow. You can reach us at nikhilesh@Helicaltech.com.