What is an operator in Apache Airflow?

An operator is conceptually a template for a predefined task that you can define declaratively inside your DAG. Airflow ships with an extensive set of operators. Some popular core operators include:

  • BashOperator – executes a bash command
  • PythonOperator – calls an arbitrary Python function
  • EmailOperator – sends an email

Let’s understand the PythonOperator through a simple example:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import requests
import csv
import json

# NOTE: the original values of default_args are not shown in the source;
# the entries below are placeholder assumptions.
default_args = {
    "owner": "airflow",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# Download forex rates according to the currencies we want to watch
# described in the file forex_currencies.csv
def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    # Map each base currency to the file that holds its rates
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            # Keep only the rates for the pairs listed in the CSV row
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            # Append one JSON object per base currency, one object per line
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')

with DAG("forex_data_pipeline", start_date=datetime(2023, 11, 14), schedule_interval="@daily",
         default_args=default_args, catchup=False) as dag:

    download_rates = PythonOperator(
        task_id="download_rates",
        python_callable=download_rates
    )

    # The task_id below is an assumption; it is not shown in the source
    save_forex_rates = BashOperator(
        task_id="save_forex_rates",
        bash_command="""
            hdfs dfs -mkdir -p /forex && \
            hdfs dfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
        """
    )

Now let’s understand each parameter

  • First, import the operator: from airflow.operators.python import PythonOperator
  • Then provide a task_id, which must be unique within the DAG
  • The PythonOperator requires the python_callable parameter
  • python_callable is the Python function to run; in our case, the download_rates function
  • The download_rates() function reads a CSV file containing currency pairs, downloads the exchange rates for each pair, and stores the data in a JSON file
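To make the filtering step inside download_rates() easier to follow, here is a minimal sketch that runs without Airflow or network access. The column names base and with_pairs and the fields date and rates come from the DAG code; the sample CSV rows, the sample response values, and the helper name build_outdata are assumptions made up for illustration.

```python
import csv
import io

# Hypothetical sample of forex_currencies.csv; the column names come from the
# DAG code, the rows themselves are invented for illustration.
SAMPLE_CSV = "base;with_pairs\nEUR;USD NZD JPY\nUSD;EUR NZD JPY\n"

# Hypothetical API response for base EUR, shaped like the fields the task
# reads ('date' and 'rates'); the numbers are invented.
SAMPLE_RESPONSE = {
    "date": "2021-01-01",
    "rates": {"USD": 1.13, "NZD": 1.41, "JPY": 101.89, "GBP": 0.36},
}


def build_outdata(row, indata):
    """Keep only the rates for the pairs listed in one CSV row."""
    with_pairs = row["with_pairs"].split(" ")
    outdata = {"base": row["base"], "rates": {}, "last_update": indata["date"]}
    for pair in with_pairs:
        outdata["rates"][pair] = indata["rates"][pair]
    return outdata


# Parse the semicolon-delimited CSV exactly as the task does.
reader = csv.DictReader(io.StringIO(SAMPLE_CSV), delimiter=";")
first_row = next(reader)
result = build_outdata(first_row, SAMPLE_RESPONSE)
print(result)
```

Only the pairs named in the CSV row survive in the result, so any extra currency in the response (GBP in this sample) is dropped.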

Now we are ready to test our task. The command is given below:

airflow tasks test forex_data_pipeline download_rates 2023-11-01

After running the command, we should get a success message indicating that our task executed successfully.
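Beyond the success message, it can help to look at what the task actually wrote. Because download_rates() opens forex_rates.json in append mode, the file can hold several JSON objects one after another, which json.load cannot read in one call. The sketch below shows one possible way to read them back; the helper name read_json_objects is hypothetical, and the temporary file with invented values only stands in for the real output path.

```python
import json
import os
import tempfile


def read_json_objects(path):
    """Return every JSON object stored back to back in the file at `path`."""
    decoder = json.JSONDecoder()
    with open(path) as infile:
        text = infile.read()
    objects, pos = [], 0
    while pos < len(text):
        # Skip whitespace (including newlines) between objects.
        if text[pos].isspace():
            pos += 1
            continue
        # raw_decode parses one object and reports where it ended.
        obj, pos = decoder.raw_decode(text, pos)
        objects.append(obj)
    return objects


# Demo on a temporary file that mimics two appended json.dump calls.
with tempfile.TemporaryDirectory() as tmpdir:
    demo_path = os.path.join(tmpdir, "forex_rates.json")
    with open(demo_path, "a") as outfile:
        json.dump({"base": "EUR", "rates": {"USD": 1.13}}, outfile)
        json.dump({"base": "USD", "rates": {"EUR": 0.9}}, outfile)
    records = read_json_objects(demo_path)

print([record["base"] for record in records])
```

On a real run you would pass /opt/airflow/dags/files/forex_rates.json instead of the temporary path.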

