Airflow lets you create your own operators to suit the requirements of you or your team. You can build any operator you need by extending the airflow.models.baseoperator.BaseOperator class.
Here is a basic custom operator script that, as its name suggests, prints a Hello message:
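# include/custom_operator/hello_operator.py
from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        # Pass standard arguments such as task_id on to BaseOperator
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # Runs when the task executes; the return value is pushed to XCom
        message = f"Hello {self.name}"
        print(message)
        return message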
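A custom operator has two main parts:
Constructor (__init__) – Defines the parameters the operator requires. Standard arguments such as task_id are passed on to BaseOperator through **kwargs.
Execute (execute) – The code that runs when Airflow executes the task. By default, its return value is pushed to XCom.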
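To illustrate how the constructor and execute() split the work, here is a minimal sketch that instantiates the operator outside any DAG and calls execute() directly with an empty context, the way a simple unit test might. It assumes Airflow 2.x provides the real BaseOperator; when Airflow is not installed, a small hypothetical stand-in class (not part of Airflow) is used so the snippet still runs.

```python
# Prefer the real Airflow base class; fall back to a tiny stand-in so the
# sketch also runs where Airflow is not installed.
try:
    from airflow.models.baseoperator import BaseOperator
except ImportError:
    class BaseOperator:  # hypothetical stand-in, NOT Airflow's class
        def __init__(self, task_id: str, **kwargs) -> None:
            # Only mimics task_id handling; other kwargs are ignored
            self.task_id = task_id


class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs) -> None:
        # Constructor: keep operator-specific parameters, hand the rest
        # (task_id, retries, ...) to BaseOperator
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # Execute: the work done when the task runs
        message = f"Hello {self.name}"
        print(message)
        return message


# Instantiate outside a DAG and call execute() directly with an empty context
op = HelloOperator(task_id="sample-task", name="Abhishek")
result = op.execute(context={})
```

Calling execute() directly skips Airflow's templating and XCom handling, so it checks only the operator's own logic; the CLI test runs the task through Airflow itself.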
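Next, import the operator into the main DAG file. Because the operator lives in include/custom_operator/hello_operator.py, it is imported from include.custom_operator.hello_operator and then instantiated inside the DAG like any built-in operator: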
from airflow.decorators import dag, task
from datetime import datetime
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata
from astro.constants import FileType
from include.dbt.cosmos_config import DBT_PROJECT_CONFIG, DBT_CONFIG
from cosmos.airflow.task_group import DbtTaskGroup
from cosmos.constants import LoadMode
from cosmos.config import ProjectConfig, RenderConfig
from airflow.models.baseoperator import chain
# Import the custom operator from the include folder
from include.custom_operator.hello_operator import HelloOperator

@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail():

    @task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
    def check_transform(scan_name='check_transform', checks_subpath='transform'):
        from include.soda.check_function import check
        return check(scan_name, checks_subpath)

    report = DbtTaskGroup(
        group_id='report',
        project_config=DBT_PROJECT_CONFIG,
        profile_config=DBT_CONFIG,
        render_config=RenderConfig(
            load_method=LoadMode.DBT_LS,
            select=['path:models/report']
        )
    )

    @task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
    def check_report(scan_name='check_report', checks_subpath='report'):
        from include.soda.check_function import check
        return check(scan_name, checks_subpath)

    # Use the custom operator like any built-in operator
    hello_task = HelloOperator(task_id="sample-task", name="Abhishek")

# Calling the decorated function registers the DAG with Airflow
retail()
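- Now we can test the operator with the test command of the Airflow CLI.
- Run "airflow tasks test retail sample-task 2023-01-01". The command takes the operator's task_id (sample-task), not the Python variable name (hello_task).
- This runs the task without checking dependencies or recording its state in the database. The output should include the printed "Hello Abhishek" message and report that the task succeeded.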
We at Helical have more than 10 years of experience in providing solutions and services in the data domain and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance, or a POC, and to hear about our past experience with Airflow. You can contact us at nikhilesh@Helicaltech.com
Thank You
Abhishek Mishra
Helical IT Solutions