What is a custom operator in Apache Airflow?

Airflow allows you to create new operators to suit your own or your team's requirements. You can create any operator you want by extending airflow.models.baseoperator.BaseOperator.

Here is a basic custom operator script that, as its name suggests, produces a hello message:

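from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        # Pass task_id and the other common arguments on to BaseOperator.
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # Build the greeting, print it to the task log, and return it.
        message = f"Hello {self.name}"
        print(message)
        return message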

Constructor – Define the parameters required for the operator.

Execute – The code to execute when the runner calls the operator.
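To make the two methods concrete, the sketch below instantiates the operator and calls execute directly, outside any DAG run. It is only an illustration: the name value "foo_bar" and the empty context dictionary are assumptions chosen for the example, and the small stand-in BaseOperator is a hypothetical fallback so the snippet can also run where Airflow is not installed.

```python
try:
    from airflow.models.baseoperator import BaseOperator
except ImportError:
    # Assumption: minimal stand-in so this sketch runs without Airflow.
    # The real BaseOperator accepts many more arguments.
    class BaseOperator:
        def __init__(self, task_id: str, **kwargs) -> None:
            self.task_id = task_id


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        # Forward task_id and other common arguments to the base class.
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message


# Hypothetical values, used only for illustration.
task = HelloOperator(task_id="sample-task", name="foo_bar")

# Airflow normally calls execute() itself and supplies the context;
# here an empty dict is passed because the operator does not read it.
result = task.execute(context={})
```

Calling execute by hand like this is a quick way to check the operator's logic before wiring it into a DAG. The constructor only stores parameters, while the actual work happens in execute.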

Now we need to import this operator into our main DAG file:

from airflow.decorators import dag, task
from datetime import datetime
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata
from astro.constants import FileType
from include.dbt.cosmos_config import DBT_PROJECT_CONFIG, DBT_CONFIG
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import LoadMode
from cosmos.config import ProjectConfig, RenderConfig
from airflow.models.baseoperator import chain
from include.custom_operator.hello_operator import HelloOperator

@dag(
    start_date=datetime(2023, 1, 1),
    # ... (remaining DAG arguments omitted)
)
def retail():

    # ... (earlier tasks omitted)

    def check_transform(scan_name='check_transform', checks_subpath='transform'):
        from include.soda.check_function import check

        return check(scan_name, checks_subpath)

    report = DbtTaskGroup(
        # ... (arguments omitted)
    )

    def check_report(scan_name='check_report', checks_subpath='report'):
        from include.soda.check_function import check

        return check(scan_name, checks_subpath)

    hello_task = HelloOperator(
        task_id="sample-task",
        # ... (name argument omitted)
    )

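  • Now we can test the operator with the tasks test command of the Airflow CLI.
  • The command takes the DAG ID, the task ID, and a logical date. The task ID is the task_id passed to the operator, not the Python variable name, so use “airflow tasks test retail sample-task 2023-01-01”.
  • If the task runs successfully, the command output includes the hello message and a success status for the task.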

