Let’s create a simple data pipeline using Apache Airflow. Our goal here is to upload local data into Google BigQuery and then validate it.
Let’s understand this using a simple example:
```python
from datetime import datetime

from airflow.decorators import dag, task
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyDatasetOperator

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata
from astro.constants import FileType


@dag(
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
    tags=['retail'],
)
def retail():
    # Upload the local CSV file into a GCS bucket
    upload_csv_to_gcs = LocalFilesystemToGCSOperator(
        task_id='upload_csv_to_gcs',
        src='include/dataset/online_retail.csv',
        dst='raw/online_retail.csv',
        bucket='abhishek_mishra_online_retail',
        gcp_conn_id='gcp',
        mime_type='text/csv',
    )

    # Create an empty 'retail' dataset (schema) in BigQuery
    create_retail_dataset = BigQueryCreateEmptyDatasetOperator(
        task_id='create_retail_dataset',
        dataset_id='retail',
        gcp_conn_id='gcp',
    )

    # Load the CSV from GCS into the BigQuery table retail.raw_invoices
    gcs_to_raw = aql.load_file(
        task_id='gcs_to_raw',
        input_file=File(
            'gs://abhishek_mishra_online_retail/raw/online_retail.csv',
            conn_id='gcp',
            filetype=FileType.CSV,
        ),
        output_table=Table(
            name='raw_invoices',
            conn_id='gcp',
            metadata=Metadata(schema='retail'),
        ),
        use_native_support=False,
    )

    # Validate the loaded data with Soda, running in its own virtualenv
    @task.external_python(python='/usr/local/airflow/soda_venv/bin/python')
    def check_load(scan_name='check_load', checks_subpath='sources'):
        from include.soda.check_function import check

        return check(scan_name, checks_subpath)

    # Run the tasks in order: upload -> create dataset -> load -> validate
    chain(upload_csv_to_gcs, create_retail_dataset, gcs_to_raw, check_load())


retail()
```
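A note on setup: the DAG above assumes a few extra packages and an Airflow connection named gcp. As a rough sketch (these are the published PyPI package names; pin versions to match your Airflow image, and install the Astro SDK with the appropriate Google/GCP extra for BigQuery support):

```text
# requirements.txt (assumed) -- main Airflow environment
apache-airflow-providers-google   # LocalFilesystemToGCSOperator, BigQuery operators
astro-sdk-python                  # aql.load_file, File, Table, Metadata
```

Soda itself (soda-core-bigquery) is installed into the separate /usr/local/airflow/soda_venv virtual environment rather than the main one, which is why the validation task uses @task.external_python; this keeps Soda’s dependencies from conflicting with Airflow’s. The gcp connection referenced by gcp_conn_id/conn_id can be created in the Airflow UI as a Google Cloud connection pointing to a service-account key file.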
- First, we use LocalFilesystemToGCSOperator, which, as the name suggests, takes the CSV file from the local filesystem and uploads it to a Google Cloud Storage (GCS) bucket.
- Next, BigQueryCreateEmptyDatasetOperator creates an empty dataset (essentially a schema) named retail in BigQuery.
- Then gcs_to_raw uses the Astro SDK’s aql.load_file to move the data from GCS into BigQuery. Its key arguments are input_file (where to read the data from, i.e. the CSV file in GCS) and output_table (where to write it, i.e. the raw_invoices table in the retail schema).
- Finally, check_load uses Soda to validate the data loaded into BigQuery. It runs in a dedicated Python virtual environment via @task.external_python and delegates to the check function in include/soda/check_function.py; a sketch of that function is shown below.
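The include/soda/check_function.py module referenced by the DAG is not shown in this post. As a minimal sketch (the file paths, data-source name, and defaults are assumptions based on the project layout above), it could wrap the Soda Core scan API like this:

```python
# include/soda/check_function.py -- hypothetical sketch; paths and names are assumptions
def check(scan_name, checks_subpath=None, data_source='retail', project_root='include'):
    from soda.scan import Scan

    print('Running Soda scan ...')
    config_file = f'{project_root}/soda/configuration.yml'  # Soda data-source config (assumed path)
    checks_path = f'{project_root}/soda/checks'             # folder holding SodaCL check files

    if checks_subpath:
        checks_path += f'/{checks_subpath}'

    scan = Scan()
    scan.set_verbose()
    scan.add_configuration_yaml_file(config_file)
    scan.set_data_source_name(data_source)
    scan.add_sodacl_yaml_files(checks_path)
    scan.set_scan_definition_name(scan_name)

    result = scan.execute()
    print(scan.get_logs_text())

    # Fail the Airflow task if any Soda check fails
    if result != 0:
        raise ValueError('Soda scan failed')

    return result
```

The checks themselves live in SodaCL YAML files. Since the task passes checks_subpath='sources', a hypothetical include/soda/checks/sources/raw_invoices.yml (the column name is illustrative; adjust it to your CSV) might contain:

```yaml
checks for raw_invoices:
  - row_count > 0                  # the table must not be empty
  - missing_count(InvoiceNo) = 0   # hypothetical column; no missing invoice numbers
```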
We at Helical have more than 10 years of experience providing solutions and services in the data domain and have served more than 85 clients. Please reach out to us for assistance, consulting, services, maintenance, or a POC, and to hear about our past experience with Airflow. You can reach us at nikhilesh@Helicaltech.com.
Thank You
Abhishek Mishra
Helical IT Solutions