It is important for us to be notified when our data pipeline finishes, or when there is a failure. We can monitor our tasks through the Airflow user interface or through its logs.
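One common way to get notified is Airflow's `on_failure_callback` hook, which runs a function of your choice whenever a task fails. Below is a minimal sketch; the alerting body is a placeholder (in practice you would send an email or a Slack message there), and `notify_on_failure` is our own illustrative name, not an Airflow builtin:

```python
# Sketch: failure notification via Airflow's on_failure_callback hook.
# Airflow calls the function with a context dict describing the failed run.
def notify_on_failure(context):
    task_id = context["task_instance"].task_id
    # Placeholder: plug in real alerting here (email, Slack, PagerDuty, ...)
    return f"ALERT: task {task_id} failed"

# Registering it in default_args applies it to every task in the DAG:
default_args = {
    "owner": "airflow",
    "on_failure_callback": notify_on_failure,
}
```

With this in place, a failing task triggers the callback automatically; no manual polling of the UI is needed to learn about the failure.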
Let’s understand this using a simple example.
```python
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import requests
import pymongo

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=15),
}

dag = DAG(
    'openlibrary_to_mongodb',
    default_args=default_args,
    description='Fetch data from Open Library API and upload to MongoDB',
    schedule_interval=timedelta(days=1),
)

def fetch_openlibrary_data(**kwargs):
    # Task 1: fetch a page of search results from the Open Library API.
    popular_books_url = 'http://openlibrary.org/search.json?q=popular&limit=20'
    response = requests.get(popular_books_url)
    if response.status_code == 200:
        data = response.json()
        return data  # returned value is stored as an XCom for downstream tasks
    else:
        raise ValueError(f"Failed to fetch data. Status code: {response.status_code}")

def insert_into_mongodb(**kwargs):
    # Task 2: pull the fetched payload from XCom and write it to MongoDB.
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='fetch_openlibrary_data')
    client = pymongo.MongoClient("mongodb+srv://username:password@cluster0.h1pmwtk.mongodb.net/?retryWrites=true&w=majority")
    db = client['books_database']
    collection = db['books_collection']
    result = collection.insert_many(data['docs'])
    print(f"Inserted {len(result.inserted_ids)} documents into MongoDB")

fetch_task = PythonOperator(
    task_id='fetch_openlibrary_data',
    python_callable=fetch_openlibrary_data,
    dag=dag,
)

insert_task = PythonOperator(
    task_id='insert_into_mongodb',
    python_callable=insert_into_mongodb,
    provide_context=True,
    dag=dag,
)

fetch_task >> insert_task
```
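The two callables hand data to each other through XCom: the fetch task's return value is stored by Airflow, and the insert task pulls it back out by task id. The sketch below shows that data flow without a running scheduler; `FakeTaskInstance` is a hypothetical stand-in for Airflow's `TaskInstance`, not part of its API:

```python
# Illustrative sketch of the XCom handoff between the two tasks.
def fetch(**kwargs):
    # The real task calls the Open Library API; a stub payload stands in here.
    return {"docs": [{"title": "A Book"}, {"title": "Another Book"}]}

class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance."""
    def __init__(self, upstream_result):
        self._result = upstream_result

    def xcom_pull(self, task_ids):
        # Real Airflow looks the value up in its metadata DB by task id.
        return self._result

def insert(**kwargs):
    data = kwargs["ti"].xcom_pull(task_ids="fetch_openlibrary_data")
    return len(data["docs"])  # number of documents that would go to MongoDB

print(insert(ti=FakeTaskInstance(fetch())))  # → 2
```

This is also why the first task can succeed while the second fails independently: each runs in its own process, and the only coupling between them is the XCom value.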
- Here we have two tasks: the first fetches data from Open Library, and the second uploads that data into MongoDB.
- If we test the first task separately, it executes successfully.
- The second task, however, will show an error; at that point we can go to the Airflow user interface.
- We can check the logs of each task on the user interface.
- Airflow also has a logs folder on disk, which contains the logs for the different tasks.
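On disk, each task attempt gets its own log file. Assuming the default `AIRFLOW_HOME` of `~/airflow` and the Airflow 2.x log layout (older versions use a slightly different directory scheme), the logs for our DAG would look roughly like this:

```shell
# Assumed layout; exact paths vary with AIRFLOW_HOME and the Airflow version.
ls ~/airflow/logs/dag_id=openlibrary_to_mongodb/
# run_id=scheduled__2023-11-01T00:00:00+00:00/
#   task_id=fetch_openlibrary_data/attempt=1.log
#   task_id=insert_into_mongodb/attempt=1.log
```

Tailing these files is often the quickest way to see a task's full traceback when the UI truncates it.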
Now, to test whether a task is working, we can run the command below:

```shell
airflow tasks test openlibrary_to_mongodb fetch_openlibrary_data 2023-11-01
```
We at Helical have more than 10 years of experience providing solutions and services in the data domain and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance, as well as POCs, and to hear about our past experience with Airflow. You can reach us at nikhilesh@Helicaltech.com.