Basic ETL with Spark (pySpark)

Posted by admin, in ETL

Moving on from traditional ETL tools like Pentaho and Talend, which I have also been using, I came across Spark (PySpark).

Make data easy with Helical Insight.
Helical Insight is the world’s best open source business intelligence tool.

Get your 30 Days Trial Version

What is Spark?

Spark is a distributed, in-memory cluster computing framework. PySpark, on the other hand, is the Python API for writing Spark applications in Python style.
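
As a quick taste (a minimal sketch of my own, not from the original post), a tiny PySpark program looks like this; the column names and values are made up for the example:

from pyspark.sql import SparkSession

# Start (or reuse) a Spark session, the entry point for the DataFrame API
spark = SparkSession.builder.appName("basic-etl-example").getOrCreate()

# A small in-memory DataFrame; the columns here are illustrative only
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["user_id", "username"])
df.show()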

Now I asked myself: how do I learn something new, and where do I start?

So I decided to do in Spark the things I would commonly do in Pentaho/Talend.

One basic thing I do is load data from a source database to a destination database.

The next question is: how do I start?

I found a notebook called Apache Zeppelin: a completely open, web-based notebook that enables interactive data analytics. It is a multi-purpose notebook that brings data ingestion, data exploration, visualization, sharing and collaboration features to Hadoop and Spark.

With this and the right interpreter, I was able to write my code conveniently.

Note: If you have trouble installing Apache Zeppelin and Spark on your local Windows machine, reach out to me or drop a comment; I would love to help.

Moving on, the idea here is to load a table from source to destination, with emphasis on “a table”.

So, with the help of a Spark JDBC read, I was able to read my source table into a DataFrame:

tablename = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", tablename) \
    .load()

Now, the following was a beginner's assumption.

OK, now I have a DataFrame that holds my source data; next I need to load that data into a destination.

If you are familiar with Pentaho, you know that we first check whether the destination table exists; if it does, we load into it, else we create the table at the destination and then load.

Now, to check this with a Spark JDBC read, I looked into the information_schema.tables view of the destination database and filtered on the name of the table I'm about to load into:

infoschema = infoschema.filter(infoschema.table_name == tablename)

If the filter returned a row, the table exists; otherwise it doesn't.
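
Putting that together, a small sketch of the check looks like this (variable names other than infoschema and tablename are my own; the full version appears in the complete listing further down):

# Read the destination database's information_schema.tables view over JDBC
infoschema = spark.read \
    .format("jdbc") \
    .option("url", destination_url) \
    .option("user", destination_username) \
    .option("password", destination_password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "information_schema.tables") \
    .load()

# Keep only the row (if any) for the table we are about to load into
infoschema = infoschema.filter(infoschema.table_name == tablename)
table_exists = infoschema.count() > 0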

Next, if the table exists, I load the data; if not, I told myself:

OK, I need to create a table.

So I looked for options to create a table via Spark, but that wasn't fruitful. I then decided to use a traditional method: I wrote a CREATE TABLE SQL statement for the table I wanted at the destination, saved it as a SQL file, and used Python code to execute that statement. But to call Python code in my Spark job, I needed to import the .py file with the Spark context, so I did this:

sc.addPyFile('E:/spark-application/pyfiles/CreateTable.py')
import CreateTable as ct
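
The post does not show CreateTable.py itself; based on the call used later (ct.CreateTable.excutescript(...)), a minimal sketch of such a helper, assuming psycopg2 as the Postgres driver and that the SQL file is named after the table, could look like this:

# CreateTable.py -- illustrative sketch only; the real helper is not shown in the post
import psycopg2

class CreateTable:
    @staticmethod
    def excutescript(hostname, username, password, portno, database, sqlpath, sqltablename):
        # Read the saved CREATE TABLE statement (assumed file name: <tablename>.sql)
        with open(sqlpath + sqltablename + ".sql") as sqlfile:
            create_sql = sqlfile.read()
        # Execute it against the destination Postgres database
        conn = psycopg2.connect(host=hostname, user=username, password=password,
                                port=portno, dbname=database)
        try:
            with conn.cursor() as cur:
                cur.execute(create_sql)
            conn.commit()
        finally:
            conn.close()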

Once that was done, I could load the source data to the destination with the help of a Spark JDBC write:

get_src_tbl.write.jdbc(destination_url, tablename,
    properties={"user": destination_username, "password": destination_password},
    mode=mode)

Now I asked myself: what if I want to transform the source data a little before loading it into the destination? So I derived 3 additional columns from other columns in the source table.

Loading the data now required me to update the SQL file for creating the table to accommodate the new columns. And here I found something cool which changed my assumption about Spark: no matter what columns I had created at the destination, when writing in overwrite mode Spark dropped the already created table and created a new one based on the schema of the transformed/source DataFrame.


Now the step above might look useless. Sorry, but nothing should be considered useless: the step can still be used to add or define constraints on the created table.
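
As a side note (my own addition, not from the original post): depending on your Spark version (2.1 and later), the JDBC writer also offers a truncate option which, combined with overwrite mode, empties the existing table instead of dropping it, so a pre-created table and its constraints can be kept:

# Illustrative sketch: truncate the pre-created destination table on overwrite
# instead of dropping it, preserving its definition and constraints
get_src_tbl.write \
    .format("jdbc") \
    .option("url", destination_url) \
    .option("user", destination_username) \
    .option("password", destination_password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", tablename) \
    .option("truncate", "true") \
    .mode("overwrite") \
    .save()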

To make this process much simpler, I created functions to handle multiple tables, transformations, reading of data, and so on.

I created a dictionary of table parameters that holds, for each table, the values and the functions to run specifically for that table. Below is the code to load data from source to destination for more than “a table” or “1 table”:

%pyspark
import copy
import time
import re
import datetime
from functools import reduce

from pyspark.sql import DataFrame
from pyspark.sql.functions import *
import pyspark.sql.functions as f
from pyspark.sql.window import Window
from pyspark.sql.types import *

# Make the table-creation helper available to the Spark job
sc.addPyFile('E:/spark-application/pyfiles/CreateTable.py')
import CreateTable as ct



sqlfilepath = "E:/spark-application/sql/"
source_hostname = "localhost"
source_port = "5433"
source_database = "LearningWorkspace"
source_url = "jdbc:postgresql://"+source_hostname+":"+source_port+"/"+source_database
source_username ="postgres"
source_password ="postgres"

destination_hostname = "localhost"
destination_port = "5433"
destination_database = "Informations"
destination_url = "jdbc:postgresql://"+destination_hostname+":"+destination_port+"/"+destination_database
destination_username ="postgres"
destination_password ="postgres"

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

#Table Name is the major parameter
def getTable(url,username,password,tablename):
    tablename = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", tablename) \
    .load()
    return tablename

def ifTableExist(url,username,password,tablename):
    #Look the table up in the destination's information_schema.tables view
    infoschema = getTable(url,username,password,"information_schema.tables")
    infoschema = infoschema.filter(infoschema.table_name == tablename)
    table = infoschema.count()
    if table > 0:
        tableexist = "Yes Table: " + str(tablename) + " Exists"
    else:
        tableexist = "No Table: " + str(tablename) + " Does Not Exist"
    return tableexist


def notransform(dataframe):
    return dataframe

def customsalary(dataframe):
    #dataframe = dataframe.filter(dataframe.salary > '85000')
    #Derive three new columns: paid_year, paid_month and the previous salary per user
    my_window = Window.partitionBy("username").orderBy("user_id")
    dataframe = dataframe \
        .withColumn('paid_year',year("paid_on")) \
        .withColumn('paid_month',month("paid_on")) \
        .withColumn('prev_salary',round(coalesce(f.lag(dataframe.salary).over(my_window),lit(0.0)),2))
    dataframe = dataframe.select(['user_id','username','paid_on','paid_month','paid_year','prev_salary',round('salary',2).alias('salary'),'created_on'])
    dataframe.show()
    #print('Filter out Salary Greater than 85000')
    print('Added Three New Columns')
    return dataframe
    
loadTableParams = {
    'company': {
        'tablename':'company',
        'mode': "append",
        'excutefunction' : notransform
    },
    'salary': {
        'tablename':'salary',
        'mode': "overwrite",
        'excutefunction' : customsalary
    }
}
#Above is a dictionary of the table parameters I need to load data for from the source. Each entry holds a table name and the function
#to run for that specific table: company uses notransform, meaning there is no need to transform that table, while
#salary uses customsalary, which adds 3 derived columns.
#mode is the write mode we want: append or overwrite.
#Remember to define the functions before loadTableParams.

def LoadDataToTables(tablename,currentJob,**loadTableParams):
    #Get parameters
    mode = currentJob['mode']
    print(mode)

    #Get the source table and apply the table-specific transformation function
    get_src = getTable(source_url,source_username,source_password,tablename)
    get_src_tbl = currentJob['excutefunction'](get_src)
    dest_tbl_exist = ifTableExist(destination_url,destination_username,destination_password,tablename)
    print(dest_tbl_exist)
    #Check for the destination table: the first word of the message is "Yes" or "No"
    istable = re.split("( )",dest_tbl_exist)[0]
    print(istable)
    if istable == "Yes":
        print("Table Already Exists")
        print("Creating a Table is Not Required, Hence Writing to Table " + tablename)
        get_src_tbl.write.jdbc(destination_url, tablename,properties={"user": destination_username, "password": destination_password}, mode = mode)
        get_des_tbl = getTable(destination_url,destination_username,destination_password,tablename)
        print("Loaded Data Complete for Table "+ tablename +", Table Count is: "+ str(get_des_tbl.count()))
    else:
        #print("Table Does Not Exist")
        #print("Creating Table " + tablename)
        #ct.CreateTable.excutescript(hostname=destination_hostname,username=destination_username,password=destination_password,portno=int(destination_port),database=destination_database,sqlpath=sqlfilepath,sqltablename=tablename)
        print("Table Does Not Exist; Spark Will Create It While Writing")
        get_src_tbl.write.jdbc(destination_url, tablename,properties={"user": destination_username, "password": destination_password}, mode = mode)
        get_des_tbl = getTable(destination_url,destination_username,destination_password,tablename)
        print("Loaded Data Complete for Table "+ tablename +", Table Count is: "+ str(get_des_tbl.count()))



for i in loadTableParams:
    currentJob = loadTableParams[i]
    tablename = currentJob['tablename']
    LoadDataToTables(tablename,currentJob,**loadTableParams)

Now, this code was tailored to run on small-scale data and with a traditional database (Postgres). With the right changes, it could be adapted to cater for medium-sized tables as well, for example by partitioning the JDBC read as sketched below.
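
For instance (a sketch of my own, not part of the original job), Spark's JDBC source can split a large table read across several partitions using a numeric column; the column name and bounds below are purely illustrative:

# Illustrative: parallelize the source read by partitioning on a numeric column
big_table = spark.read \
    .format("jdbc") \
    .option("url", source_url) \
    .option("user", source_username) \
    .option("password", source_password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "salary") \
    .option("partitionColumn", "user_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "100000") \
    .option("numPartitions", "8") \
    .load()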

Please do leave a comment if you require any help with other scenarios.


In case you have any queries, please reach us at support@helicaltech.com

Thank You
Sohail Izebhijie
Helical IT Solutions Pvt Ltd
