It is important for us to be notified when our data pipeline finishes or when a failure occurs. We can do this through either an email notification or a Slack notification. In the example below we will look at the email notification.

    from airflow import DAG
    from airflow.providers.http.sensors.http import HttpSensor
    from airflow.sensors.filesystem import FileSensor
    from airflow.operators.python import PythonOperator
    from airflow.operators.bash import BashOperator
    from airflow.operators.email import EmailOperator

    from datetime import datetime, timedelta
    import csv
    import json
    import requests

    default_args = {
        "owner": "airflow",
        "email_on_failure": False,
        "email_on_retry": False,
        "email": "admin@localhost.com",
        "retries": 1,
        "retry_delay": timedelta(minutes=5)
    }

    def download_rates():
        """Download the rates for each base currency listed in the CSV file."""
        BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
        ENDPOINTS = {
            'USD': 'api_forex_exchange_usd.json',
            'EUR': 'api_forex_exchange_eur.json'
        }
        with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
            reader = csv.DictReader(forex_currencies, delimiter=';')
            for idx, row in enumerate(reader):
                base = row['base']
                with_pairs = row['with_pairs'].split(' ')
                indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
                outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
                for pair in with_pairs:
                    outdata['rates'][pair] = indata['rates'][pair]
                # Append one JSON document per line
                with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                    json.dump(outdata, outfile)
                    outfile.write('\n')

    with DAG("forex_data_pipeline", start_date=datetime(2021, 1, 1),
             schedule_interval="@daily", default_args=default_args, catchup=False) as dag:

        is_forex_rates_available = HttpSensor(
            task_id="is_forex_rates_available",
            http_conn_id="forex_api",
            endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
            response_check=lambda response: "rates" in response.text,
            poke_interval=5,
            timeout=20
        )

        is_forex_currencies_file_available = FileSensor(
            task_id="is_forex_currencies_file_available",
            fs_conn_id="forex_path",
            filepath="forex_currencies.csv",
            poke_interval=5,
            timeout=20
        )

        downloading_rates = PythonOperator(
            task_id="downloading_rates",
            python_callable=download_rates
        )

        saving_rates = BashOperator(
            task_id="saving_rates",
            bash_command="""
                hdfs dfs -mkdir -p /forex && \
                hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
            """
        )

        send_email_notification = EmailOperator(
            task_id="send_email_notification",
            to="abhishekmishra776@gmail.com",
            subject="forex_data_pipeline",
            html_content="<h3>forex_data_pipeline</h3>"
        )

        # Assumed ordering (not shown in the original listing): run the tasks
        # in sequence so that the email is sent once the pipeline is done.
        (is_forex_rates_available >> is_forex_currencies_file_available
            >> downloading_rates >> saving_rates >> send_email_notification)

- To send the email, we first have to configure the SMTP details in the [smtp] section of airflow.cfg, which is Airflow's configuration file.
- After modifying the settings, we have to restart the Airflow server.
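As a rough illustration of the SMTP settings mentioned above, the sketch below embeds a sample [smtp] section and checks it with Python's standard configparser. The key names are the ones Airflow reads from airflow.cfg; every value is a placeholder, and the helper missing_smtp_keys and its list of "required" keys are assumptions made for this example, not part of Airflow.

```python
import configparser

# Placeholder values only; replace them with your own SMTP provider's details.
SAMPLE_CFG = """
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = alerts@example.com
smtp_password = change-me
smtp_port = 587
smtp_mail_from = alerts@example.com
"""

# Keys this sketch insists on (an assumption for the example, not an Airflow rule).
REQUIRED_KEYS = ("smtp_host", "smtp_port", "smtp_mail_from")


def missing_smtp_keys(cfg_text):
    """Return the required [smtp] keys that are absent or empty in cfg_text."""
    # interpolation=None so that a '%' in a password is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    if not parser.has_section("smtp"):
        return list(REQUIRED_KEYS)
    return [
        key for key in REQUIRED_KEYS
        if not parser.get("smtp", key, fallback="").strip()
    ]


smtp_missing = missing_smtp_keys(SAMPLE_CFG)
print("Missing keys:", smtp_missing)
```

Running a check like this against the text of your own airflow.cfg before restarting the server can catch a missing host or sender address early.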
Now we need to add the EmailOperator. For this, we import it with from airflow.operators.email import EmailOperator.
- After that, we provide the task_id for this task; in our case it is send_email_notification.
- to is the email address you want to send the email to.
- subject is the subject of the email.
- html_content is the body of the email that will be sent, written as HTML.
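To see how the three fields above end up in an actual message, here is a minimal sketch that uses only Python's standard library. It is not how Airflow builds the email internally; the function build_preview, the sender address, and the recipient address are hypothetical and exist only for this illustration.

```python
from email.message import EmailMessage


def build_preview(to, subject, html_content, mail_from="airflow@example.com"):
    """Build a standalone email message from the same three fields the task uses."""
    msg = EmailMessage()
    msg["From"] = mail_from
    msg["To"] = to
    msg["Subject"] = subject
    # Mark the body as HTML so that mail clients render the markup.
    msg.set_content(html_content, subtype="html")
    return msg


preview = build_preview(
    to="recipient@example.com",
    subject="forex_data_pipeline",
    html_content="<h3>forex_data_pipeline</h3>",
)
print(preview["Subject"], preview.get_content_type())
```

Printing the whole message with print(preview) shows the headers and the HTML body, which is a quick way to check the markup before putting it into html_content.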
Now we need to test whether this works. We can run the command below, which takes the DAG id, the task id, and a date:
airflow tasks test forex_data_pipeline send_email_notification 2023-11-01
We should see the success message in the output, and we can also check our inbox to verify that the email arrived.
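If you want to run the same test from a script, the command can be assembled as a list of arguments. The sketch below only builds and prints the command; the helper build_task_test_command is a hypothetical name introduced for this example, and the date check is an extra safeguard that the original command does not perform.

```python
import shlex
from datetime import date


def build_task_test_command(dag_id, task_id, logical_date):
    """Return the argument list for `airflow tasks test <dag_id> <task_id> <date>`."""
    # Raises ValueError if the date is not a valid ISO date such as 2023-11-01.
    date.fromisoformat(logical_date)
    return ["airflow", "tasks", "test", dag_id, task_id, logical_date]


cmd = build_task_test_command(
    "forex_data_pipeline", "send_email_notification", "2023-11-01"
)
print(shlex.join(cmd))
# prints: airflow tasks test forex_data_pipeline send_email_notification 2023-11-01
```

On a machine where Airflow is installed, the list can be passed to subprocess.run(cmd, check=True) to execute the test.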
We at Helical have more than 10 years of experience in providing solutions and services in the data domain and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance, or a POC, and to hear about our past experience with Airflow. You can reach us at nikhilesh@Helicaltech.com