What is an Operator in Apache Airflow?
An Operator is conceptually a template for a predefined task that you can declare inside your DAG. Airflow has a very extensive set of operators available. Some popular operators from the core include:
- BashOperator – As the name suggests, executes a bash command
- PythonOperator – As the name suggests, calls an arbitrary Python function
- EmailOperator – As the name suggests, sends an email (see the sketch right after this list)
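To see how little code an operator declaration needs, here is a minimal sketch of an EmailOperator task. It assumes an SMTP connection (smtp_default) is already configured in Airflow; the DAG name, recipient, and message are illustrative, not part of the pipeline in this post.

```python
from airflow import DAG
from airflow.operators.email import EmailOperator
from datetime import datetime

# Illustrative DAG: one EmailOperator task, assuming SMTP is configured
with DAG("email_example", start_date=datetime(2023, 11, 14),
         schedule_interval="@daily", catchup=False) as dag:
    notify = EmailOperator(
        task_id="notify",                          # must be unique within the DAG
        to="admin@airflow.com",                    # illustrative recipient
        subject="Forex pipeline finished",
        html_content="All forex rates were downloaded.",
    )
```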
Let’s understand the PythonOperator with a simple example:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import requests
import csv
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "admin@airflow.com",
    "retries": 1,
    "retry_delay": timedelta(minutes=5)
}

# Download forex rates according to the currencies we want to watch
# described in the file forex_currencies.csv
def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')

with DAG("forex_data_pipeline", start_date=datetime(2023, 11, 14),
         schedule_interval="@daily", default_args=default_args,
         catchup=False) as dag:

    # Task variable named differently from the callable so it does not
    # shadow the download_rates function defined above
    download_rates_task = PythonOperator(
        task_id="download_rates",
        python_callable=download_rates
    )

    save_forex_rates = BashOperator(
        task_id="save_forex_rates",
        bash_command="""
            hdfs dfs -mkdir -p /forex && \
            hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
        """
    )
```
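One point worth noting: as written, the two tasks are independent, so Airflow may run save_forex_rates before download_rates has produced the JSON file. If the upload should wait for the download, you would chain them with Airflow's bitshift dependency syntax:

```python
# Run save_forex_rates only after download_rates_task succeeds
download_rates_task >> save_forex_rates
```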
Now let’s understand each parameter:
- First, we need to import the operator: from airflow.operators.python import PythonOperator
- Then we can provide the task_id, which must be unique within that DAG
- For the PythonOperator, we have to specify the python_callable parameter
- python_callable takes the Python function we have created, in our case the download_rates function (a variation that passes arguments to the callable is sketched after this list)

The download_rates() function reads a CSV file containing information about currency pairs, downloads the exchange rates for each currency pair, and stores the data in a JSON file.
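Where the callable needs arguments, PythonOperator accepts op_kwargs (and op_args) and forwards them to the function at runtime. The following is a minimal sketch under the assumption that we parameterize the CSV path; the currencies_path parameter is hypothetical and not part of the DAG above.

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def download_rates(currencies_path):
    # Hypothetical parameterized version: same logic as above,
    # but reading from currencies_path instead of a hard-coded path
    print(f"Reading currency pairs from {currencies_path}")

with DAG("forex_data_pipeline_v2", start_date=datetime(2023, 11, 14),
         schedule_interval="@daily", catchup=False) as dag:
    download_rates_task = PythonOperator(
        task_id="download_rates",
        python_callable=download_rates,
        # op_kwargs is passed to the callable as keyword arguments at runtime
        op_kwargs={"currencies_path": "/opt/airflow/dags/files/forex_currencies.csv"},
    )
```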
Now we are ready to test our task. The command is given below:

```bash
airflow tasks test forex_data_pipeline download_rates 2023-11-01
```

After running the command, we should get a success message indicating that our task has been executed successfully.
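The same command works for the BashOperator task. Note that airflow tasks test runs a single task instance in isolation, without checking upstream dependencies and without recording state in the metadata database, which makes it handy for debugging:

```bash
# Test each task of the DAG in isolation for a given logical date
airflow tasks test forex_data_pipeline download_rates 2023-11-01
airflow tasks test forex_data_pipeline save_forex_rates 2023-11-01
```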
We at Helical have more than 10 years of experience in providing solutions and services in the domain of data, and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance, as well as POCs, and to hear about our past experience with Airflow. Please do reach out at nikhilesh@Helicaltech.com