What is an operator in Apache Airflow – Airflow operators



An Operator is conceptually a template for a predefined task that you can declare inside your DAG. Airflow ships with a very extensive set of operators. Some popular operators from core include:

  • BashOperator – As the name suggests, executes a bash command
  • PythonOperator – As the name suggests, calls an arbitrary Python function
  • EmailOperator – As the name suggests, sends an email (a minimal sketch follows this list)
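
For illustration, a minimal EmailOperator task might look like the sketch below. This is only a sketch: it assumes an SMTP connection has been configured for your Airflow instance, and the recipient, subject, and body are placeholders.

from airflow.operators.email import EmailOperator

notify = EmailOperator(
    task_id="notify",  # must be unique within the DAG
    to="admin@airflow.com",  # placeholder recipient
    subject="forex_data_pipeline finished",  # placeholder subject
    html_content="The forex rates were downloaded successfully."  # placeholder body
)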

Let’s understand the PythonOperator with a simple example

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import requests
import csv
import json



default_args = {
    "owner":"airflow",
    "email_on_failure":False,
    "email_on_retry":False,
    "email":"admin@airflow.com",
    "retries":1,
    "retry_delay":timedelta(minutes=5)
}
# Download forex rates according to the currencies we want to watch
# described in the file forex_currencies.csv
def download_rates():
    BASE_URL = "https://gist.githubusercontent.com/marclamberti/f45f872dea4dfd3eaa015a4a1af4b39b/raw/"
    ENDPOINTS = {
        'USD': 'api_forex_exchange_usd.json',
        'EUR': 'api_forex_exchange_eur.json'
    }
    with open('/opt/airflow/dags/files/forex_currencies.csv') as forex_currencies:
        reader = csv.DictReader(forex_currencies, delimiter=';')
        for idx, row in enumerate(reader):
            base = row['base']
            with_pairs = row['with_pairs'].split(' ')
            indata = requests.get(f"{BASE_URL}{ENDPOINTS[base]}").json()
            outdata = {'base': base, 'rates': {}, 'last_update': indata['date']}
            for pair in with_pairs:
                outdata['rates'][pair] = indata['rates'][pair]
            with open('/opt/airflow/dags/files/forex_rates.json', 'a') as outfile:
                json.dump(outdata, outfile)
                outfile.write('\n')


with DAG("forex_data_pipeline",start_date=datetime(2023,11,14),schedule_interval="@daily",
default_args=default_args,catchup=False) as dag:

download_rates = PythonOperator(
task_id="download_rates",
python_callable=download_rates)

save_forex_rates = BashOperator(
task_id="save_forex_rates",
bash_command="""
hdfsdfs -mkdir -p /forex && \
hdfsdfs -put -f $AIRFLOW_HOME/dags/files/forex_rates.json /forex
        """
    )
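
Note that the snippet above does not set a dependency between the two tasks. If save_forex_rates should run only after download_rates has succeeded, you would add the following line inside the DAG block, using the task variables defined above:

    download_rates >> save_forex_rates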

Now let’s understand each parameter:

  • First, we need to import the operator: from airflow.operators.python import PythonOperator
  • Then we provide the task_id, which must be unique within the DAG
  • For the PythonOperator, we have to specify the python_callable parameter
  • python_callable is the Python function to execute – in our case, the download_rates function we created above
  • The download_rates() function reads a CSV file containing information about currency pairs, downloads the exchange rates for each currency pair, and stores the data in a JSON file
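
For reference, based on how the function parses the file (a ';' delimiter with 'base' and 'with_pairs' columns, where the base currency must match one of the USD/EUR endpoints), forex_currencies.csv would contain something like this illustrative sample:

base;with_pairs
USD;EUR NZD JPY
EUR;USD NZD JPY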

Now we are ready to test our task. The command is given below:

airflow tasks test forex_data_pipeline download_rates 2023-11-01

After running the command we should get a success message, which indicates our task executed successfully. Note that airflow tasks test runs a single task instance without checking dependencies or recording its state in the metadata database, which makes it well suited for this kind of debugging.

We at Helical have more than 10 years of experience in providing solutions and services in the domain of data and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance as well as POCs, and to hear about our past experience with Airflow. Please do reach out at nikhilesh@Helicaltech.com
