BigQuery Data Pipeline in Apache Airflow


Let’s create a simple data pipeline using Apache Airflow. Our goal here is to upload local data into GCP BigQuery and then validate it.

Let’s understand this using a simple example:

from airflow.decorators import dag, task
from datetime import datetime
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator
from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata
from astro.constants import FileType

@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail():

    # Upload the local CSV file to the GCS bucket
    upload_csv_to_gcs = LocalFilesystemToGCSOperator(
        task_id='upload_csv_to_gcs',
        src='include/dataset/online_retail.csv',
        dst='raw/online_retail.csv',
        bucket='abhishek_mishra_online_retail',
        gcp_conn_id='gcp',
        mime_type='text/csv',
    )
    # Create the 'retail' dataset in BigQuery if it does not already exist
    create_retail_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create_retail_dataset',
        dataset_id='retail',
        gcp_conn_id='gcp',
    )

    # Load the CSV from GCS into the BigQuery table retail.raw_invoices (Astro SDK)
    gcs_to_raw = aql.load_file(
        task_id='gcs_to_raw',
        input_file=File(
            'gs://abhishek_mishra_online_retail/raw/online_retail.csv',
            conn_id='gcp',
            filetype=FileType.CSV,
        ),
        output_table=Table(
            name='raw_invoices',
            conn_id='gcp',
            metadata=Metadata(schema='retail')
        ),
        use_native_support=False,
    )

    # Run Soda data quality checks from a dedicated virtual environment
    @task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
    def check_load(scan_name='check_load', checks_subpath='sources'):
        from include.soda.check_function import check

        return check(scan_name, checks_subpath)

    check_load = check_load()

retail()  
  • First, LocalFilesystemToGCSOperator, as the name suggests, takes the CSV file from the local filesystem and uploads it to the GCS bucket.
  • Then BigQueryCreateEmptyDatasetOperator creates the retail dataset (schema) in BigQuery.
  • The gcs_to_raw task uses the Astro SDK's load_file, whose two main arguments are input_file (where the data is read from, here the CSV in GCS) and output_table (where it is loaded, here the raw_invoices table in the retail dataset).
  • Finally, check_load uses Soda to validate the data loaded into BigQuery; a sketch of the check helper it imports is shown right after this list.
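
The check_load task imports a check helper from include/soda/check_function.py, which is not shown in the post. Below is a minimal sketch of what such a helper might look like, assuming soda-core-bigquery is installed in the soda_venv virtual environment and that the Soda configuration and SodaCL check files live under include/soda/ (these paths and file names are assumptions for illustration, not taken from the original project):

def check(scan_name, checks_subpath=None, data_source='retail'):
    # Import inside the function so it resolves against the soda_venv interpreter
    from soda.scan import Scan

    config_file = 'include/soda/configuration.yml'  # assumed Soda data source configuration
    checks_path = 'include/soda/checks'             # assumed root folder for SodaCL check files
    if checks_subpath:
        checks_path += f'/{checks_subpath}'

    scan = Scan()
    scan.set_verbose()
    scan.set_scan_definition_name(scan_name)
    scan.set_data_source_name(data_source)
    scan.add_configuration_yaml_file(config_file)
    scan.add_sodacl_yaml_files(checks_path)

    # Run the scan and fail the task if any check fails
    result = scan.execute()
    print(scan.get_logs_text())
    if result != 0:
        raise ValueError('Soda Scan failed')
    return result

With checks_subpath='sources', the helper would pick up the check files under include/soda/checks/sources, so a failing check makes the Airflow task fail and a bad load is caught early.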
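
One more thing worth noting: the example defines the tasks but does not wire any dependencies between them, so Airflow would schedule them independently. If the load should only run after the upload and the dataset creation, the ordering could be expressed with Airflow's chain helper, for example (a sketch for illustration, not part of the original DAG):

from airflow.models.baseoperator import chain

# Inside the retail() DAG function, after the task definitions:
chain(
    upload_csv_to_gcs,
    create_retail_dataset,
    gcs_to_raw,
    check_load,
)

With the dependencies in place, the DAG can be triggered from the Airflow UI or exercised locally with airflow dags test retail 2023-01-01.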


We at Helical have more than 10 years of experience in providing solutions and services in the domain of data and have served 85+ clients. Please reach out to us for assistance, consulting, services, maintenance as well as POCs, and to hear about our past experience with Airflow. Please do reach out at nikhilesh@Helicaltech.com

Thank You
Abhishek Mishra
Helical IT Solutions
