Notification in Apache Airflow | Airflow Notifications

Posted on by By admin, in Apache Airflow | 0

it is pretty important for us to be notified when our data pipeline is done or where thereis a failure.For example. that’s something we want to do and we can either do it through an email notification or through aslack notification. In the below example we will check the email notification.


from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@localhost.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
with_pairs = row['with_pairs'].split(' ')
indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
json.dump(outdata, outfile)
outfile.write('\n')

with DAG("forex_data_pipeline", start_date=datetime(2021, 1 ,1), 
schedule_interval="@daily", default_args=default_args, catchup=False) as dag:

is_forex_rates_available = HttpSensor(
task_id="is_forex_rates_available",
http_conn_id="forex_api",
        endpoint="marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b",
response_check=lambda response: "rates" in response.text,
poke_interval=5,
        timeout=20
    )

is_forex_currencies_file_available = FileSensor(
task_id="is_forex_currencies_file_available",
fs_conn_id="forex_path",
filepath="forex_currencies.csv",
poke_interval=5,
        timeout=20
    )

downloading_rates = PythonOperator(
task_id="downloading_rates",
python_callable=download_rates
    )

saving_rates = BashOperator(
task_id="saving_rates",
bash_command="""
hdfsdfs -mkdir -p /forex && \
hdfsdfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
        """
    )


send_email_notification = EmailOperator(
task_id="send_email_notification",
        to="abhishekmishra776@gmail..com",
        subject="forex_data_pipeline",
html_content="<h3>forex_data_pipeline</h3>"
    )

  • In order to send the email we have to configure SMTP detail inairflow.cfg which is config file below is given example
  • Notification in Apache Airflow

  • After modifiying the settings we have to restart the airflow server

Now we need add the EmailOperator for this, we will use from airflow.operators.email import EmailOperator

  • After that we can provide the task id of this task in our case send_email_notification
  • To=emailaddress which you want to send the email
  • Subject is the subject of the mail
  • html_content this is basically the content of the email which we will send

Now we need to test this if this is working or not we can run the below command in order to test

airflow tasks test forex_data_pipelinesend_email_notification 2023-11-01

And we will be able to see the success message and also we can check our inbox in order to verify

We at Helical have more than 10 years of experience in providing solutions and services in the domain of data and have served more than 85+ clients. Please reach out to us for assistance, consulting, services, maintenance as well as POC and to hear about our past experience on Airflow. Please do reach out on nikhilesh@Helicaltech.com

0 0 votes
Article Rating
Subscribe
Notify of
0 Comments
Inline Feedbacks
View all comments