Apache Airflow Monitoring


It is pretty important for us to be notified when our data pipeline has finished or when there is a failure, for example. We can monitor our tasks through the Airflow user interface or through its logs.

Let’s understand this using a simple example:

from airflow import DAG
from airflow.operators.python import PythonOperator  # modern import path (Airflow 2.x)
from datetime import datetime, timedelta
import requests
import pymongo

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
}

dag = DAG(
    'openlibrary_to_mongodb',
    default_args=default_args,
    description='Fetch data from Open Library API and upload to MongoDB',
    schedule_interval=timedelta(days=1),
)

def fetch_openlibrary_data(**kwargs):
    """Fetch a page of search results from the Open Library API."""
    popular_books_url = 'https://openlibrary.org/search.json?q=popular&limit=20'
    response = requests.get(popular_books_url)

    if response.status_code == 200:
        # The return value is pushed to XCom automatically by PythonOperator,
        # so the downstream task can pull it
        return response.json()
    else:
        raise ValueError(f"Failed to fetch data. Status code: {response.status_code}")


def insert_into_mongodb(**kwargs):
    """Pull the fetched data from XCom and insert it into MongoDB."""
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='fetch_openlibrary_data')

    # Replace username/password with real credentials; this placeholder URI
    # is why the task fails when the DAG is run as-is
    client = pymongo.MongoClient("mongodb+srv://username:password@cluster0.h1pmwtk.mongodb.net/?retryWrites=true&w=majority")
    db = client['books_database']
    collection = db['books_collection']

    result = collection.insert_many(data['docs'])
    print(f"Inserted {len(result.inserted_ids)} documents into MongoDB")

fetch_task = PythonOperator(
    task_id='fetch_openlibrary_data',
    python_callable=fetch_openlibrary_data,
    dag=dag,
)


insert_task = PythonOperator(
    task_id='insert_into_mongodb',
    python_callable=insert_into_mongodb,
    # provide_context is no longer needed; Airflow 2 passes the context automatically
    dag=dag,
)

fetch_task >> insert_task
  • Here we have two tasks: the first fetches data from Open Library, and the second uploads that data into MongoDB.
  • If we test the first task separately, it executes successfully.
  • The second task, however, will show an error (the MongoDB credentials above are placeholders), so we can go to the Airflow user interface to investigate.
  • We can check each task's logs on the user interface.
  • Airflow also keeps a logs folder on disk (by default under $AIRFLOW_HOME/logs), with separate log files for each DAG, task, and run.
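
Checking the user interface after every run gets tedious, so Airflow can also notify us automatically through task callbacks. Below is a minimal sketch: notify_on_failure is an illustrative name of our own, and the print is a stand-in for a real alert (an email, a Slack webhook, and so on). If SMTP is configured, the built-in email and email_on_failure keys in default_args achieve the same thing.

def notify_on_failure(context):
    # Airflow invokes this with the task context whenever a task fails
    ti = context['task_instance']
    print(f"Task {ti.task_id} in DAG {ti.dag_id} failed: {context.get('exception')}")
    # replace the print with a real alerting call

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
    'on_failure_callback': notify_on_failure,  # fires for every task failure in this DAG
}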

Now, to check whether a given task works, we can test it in isolation with the command below:

airflow tasks test openlibrary_to_mongodb fetch_openlibrary_data 2023-11-01
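
Once the individual tasks pass, we can exercise the whole DAG end to end for a given date with the Airflow 2 CLI:

airflow dags test openlibrary_to_mongodb 2023-11-01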

We at Helical have more than 10 years of experience in providing solutions and services in the domain of data and have served 85+ clients. Please reach out to us for assistance, consulting, services, maintenance, as well as POCs, and to hear about our past experience with Airflow. Please do reach out on nikhilesh@Helicaltech.com
