Moving on from traditional ETL tools like Pentaho and Talend, which I have been using, I came across Spark (PySpark).
What is Spark?
Spark is a distributed, in-memory cluster computing framework; PySpark, on the other hand, is a Python API for writing Spark applications in Python style.
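To give a feel for that Python style, here is a minimal, generic PySpark snippet (an illustration only, not part of the job described later) that creates a SparkSession and a small data frame:

from pyspark.sql import SparkSession

# Build (or reuse) a SparkSession, the entry point to the DataFrame API
spark = SparkSession.builder.appName("hello-pyspark").getOrCreate()

# Create a tiny DataFrame and inspect it
df = spark.createDataFrame([(1, "alice"), (2, "bob")], ["user_id", "username"])
df.show()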
Now I asked myself: how do I learn something new, and where do I start?
So I decided to do things in Spark that I would commonly do in Pentaho/Talend.
One basic thing I do is load data from a source DB to a destination DB.
The next question was: how do I start?
I found a notebook called Apache Zeppelin, a completely open, web-based notebook that enables interactive data analytics. It is a new and incubating multi-purpose web-based notebook which brings data ingestion, data exploration, visualization, sharing and collaboration features to Hadoop and Spark.
With this and the right interpreter, I was able to write my code conveniently.
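For example, a Zeppelin paragraph bound to the PySpark interpreter starts with %pyspark, and the interpreter already provides the spark session and sc context (a minimal illustration):

%pyspark
# spark (SparkSession) and sc (SparkContext) are pre-created by the Zeppelin Spark interpreter
df = spark.range(5)
df.show()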
Note: If you have trouble installing Apache Zeppelin and Spark on your local Windows machine, reach out to me or drop a comment; I would love to help.
Moving on, the idea here is to load a table from source to destination, with emphasis on "a table".
So with the help of a JDBC Spark read, I was able to read my source table into a data frame:
tablename = spark.read \
    .format("jdbc") \
    .option("url", url) \
    .option("user", username) \
    .option("password", password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", tablename) \
    .load()
Now, the following assumption was a beginner's assumption.
OK, I now have a data frame that holds my source data; next I need to load that data into a destination.
If you are familiar with Pentaho, you know that we first check whether the destination table exists; if it does, we load into it, else we create the table at the destination and then load.
To perform that check with a JDBC Spark read, I looked into the destination database's information_schema tables and filtered on the name of the table I was to load into:
infoschema = infoschema.filter(infoschema.table_name == tablename)
If it returned a value, the table exists; otherwise it does not.
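Put together, the existence check looks roughly like this (a sketch based on the same JDBC read pattern shown above, assuming destination_url, destination_username and destination_password hold the destination connection details and tablename is the table I am about to load):

# Read the destination's information_schema.tables over JDBC
infoschema = spark.read \
    .format("jdbc") \
    .option("url", destination_url) \
    .option("user", destination_username) \
    .option("password", destination_password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "information_schema.tables") \
    .load()

# Keep only the row (if any) for the table we are about to load
infoschema = infoschema.filter(infoschema.table_name == tablename)

# A non-zero count means the destination table already exists
table_exists = infoschema.count() > 0
print(table_exists)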
Next, if the table exists, I load the data; if not, I told myself, OK, I need to create the table.
So I looked for options to create a table via Spark, but that wasn't fruitful.
I then decided to use a traditional method: I wrote the CREATE TABLE statement for the table I wanted at the destination, saved it as a SQL file, and used Python code to execute that SQL statement.
But to call Python code from my Spark job, I needed to import the .py file via the Spark context, so I did this:
sc.addPyFile('E:/spark-application/pyfiles/CreateTable.py')
import CreateTable as ct
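For reference, below is a rough sketch of what such a CreateTable.py helper could look like. The excutescript signature mirrors the call used in the full job further down; the psycopg2-based body and the <tablename>.sql naming convention are my assumptions, not the exact file I used:

# CreateTable.py - hypothetical sketch of the helper imported via sc.addPyFile
import psycopg2

class CreateTable:
    @staticmethod
    def excutescript(hostname, username, password, portno, database, sqlpath, sqltablename):
        # Read the CREATE TABLE statement saved as <tablename>.sql (assumed naming convention)
        with open(sqlpath + sqltablename + ".sql", "r") as sqlfile:
            create_sql = sqlfile.read()
        # Execute the statement against the destination Postgres database
        conn = psycopg2.connect(host=hostname, port=portno, dbname=database,
                                user=username, password=password)
        try:
            cur = conn.cursor()
            cur.execute(create_sql)
            conn.commit()
        finally:
            conn.close()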
Once that was done, I could load the source data to the destination with the help of a JDBC Spark write:
get_src_tbl.write.jdbc(destination_url, tablename, properties={"user": destination_username, "password": destination_password}, mode=mode)
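Spelled out with keyword arguments, the same write looks like this; the mode parameter controls what happens when the destination table already exists (a minimal sketch, assuming get_src_tbl is the data frame read from the source):

get_src_tbl.write.jdbc(
    url=destination_url,
    table=tablename,
    mode="append",  # "append" adds rows to an existing table, "overwrite" replaces the table
    properties={"user": destination_username, "password": destination_password}
)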
Then I asked myself: what if I want to transform the source data a little before loading it into the destination? So I derived 3 additional columns from other columns in the source table.
Loading the data now required me to update the SQL file that creates the table so it would accommodate the new columns. Here I found something cool which changed my assumption about Spark: no matter how many columns I had created at the destination, when writing in overwrite mode Spark drops the already created table and creates a new one based on the schema of the transformed/source data frame.
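To illustrate that behaviour, here is a sketch along the lines of the salary transformation in the full job below (assuming dataframe is the source salary data frame): the derived columns come from withColumn, and writing the result with mode "overwrite" drops the existing destination table and recreates it from this data frame's schema:

from pyspark.sql.window import Window
from pyspark.sql.functions import year, month, lag, coalesce, round, lit

my_window = Window.partitionBy("username").orderBy("user_id")

# Derive three extra columns from existing ones
transformed = dataframe \
    .withColumn("paid_year", year("paid_on")) \
    .withColumn("paid_month", month("paid_on")) \
    .withColumn("prev_salary", round(coalesce(lag("salary").over(my_window), lit(0.0)), 2))

# With mode "overwrite", Spark drops and recreates the table from this schema,
# regardless of what the hand-written CREATE TABLE statement defined
transformed.write.jdbc(destination_url, tablename,
                       properties={"user": destination_username, "password": destination_password},
                       mode="overwrite")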
Now the table-creation step above may look useless, but nothing is really useless: it can still be used to add or define constraints on the created table.
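If you do want to keep a hand-created table (with its constraints) instead of letting Spark drop and recreate it, one option I am aware of is the JDBC truncate option, which together with overwrite mode empties the table but keeps its definition (a hedged sketch, not something used in the job above):

get_src_tbl.write \
    .format("jdbc") \
    .option("url", destination_url) \
    .option("user", destination_username) \
    .option("password", destination_password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", tablename) \
    .option("truncate", "true") \
    .mode("overwrite") \
    .save()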
To make this process much simpler, I created functions to handle multiple tables, transformations, reading data and so on.
I created a dictionary of table parameters that holds the values and the functions to run specifically for each table. Below is the code to load data from source to destination for more than "a table" or "1 table":
%pyspark
from pyspark.sql.functions import lit, unix_timestamp
from pyspark.sql.functions import *
from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as f
from pyspark.sql.functions import coalesce, round
from pyspark.sql.window import Window
from pyspark.sql.types import StructField
from pyspark.sql.types import *
from pyspark.sql.types import StructType
import copy
import time
import re
import datetime
from pyspark.sql.functions import year, month, dayofmonth

sc.addPyFile('E:/spark-application/pyfiles/CreateTable.py')
import CreateTable as ct

sqlfilepath = "E:/spark-application/sql/"

source_hostname = "localhost"
source_port = "5433"
source_database = "LearningWorkspace"
source_url = "jdbc:postgresql://" + source_hostname + ":" + source_port + "/" + source_database
source_username = "postgres"
source_password = "postgres"

destination_hostname = "localhost"
destination_port = "5433"
destination_database = "Informations"
destination_url = "jdbc:postgresql://" + destination_hostname + ":" + destination_port + "/" + destination_database
destination_username = "postgres"
destination_password = "postgres"

timestamp = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S')

# Table name is the major parameter
def getTable(url, username, password, tablename):
    tablename = spark.read \
        .format("jdbc") \
        .option("url", url) \
        .option("user", username) \
        .option("password", password) \
        .option("driver", "org.postgresql.Driver") \
        .option("dbtable", tablename) \
        .load()
    return tablename

def ifTableExist(url, username, password, tablename):
    infoschema = getTable(url, username, password, "information_schema.tables")
    infoschema = infoschema.filter(infoschema.table_name == tablename)
    table = infoschema.count()
    if table > 0:
        tableexist = "Yes Table: " + str(tablename) + " Exist"
    else:
        tableexist = "No Table: " + str(tablename) + " Does Not Exist"
    return tableexist

def notransform(dataframe):
    return dataframe

def customsalary(dataframe):
    #dataframe = dataframe.filter(dataframe.salary > '85000')
    my_window = Window.partitionBy("username").orderBy("user_id")
    dataframe = dataframe \
        .withColumn('paid_year', year("paid_on")) \
        .withColumn('paid_month', month("paid_on")) \
        .withColumn('prev_salary', round(coalesce(f.lag(dataframe.salary).over(my_window), lit(0.0)), 2))
    dataframe = dataframe.select(['user_id', 'username', 'paid_on', 'paid_month', 'paid_year', round('salary', 2).alias('salary'), 'created_on'])
    dataframe.show()
    #print('Filter out Salary Greater than 85000')
    print('Added Three New Columns')
    return dataframe

loadTableParams = {
    'company': {
        'tablename': 'company',
        'mode': "Append",
        'excutefunction': notransform
    },
    'salary': {
        'tablename': 'salary',
        'mode': "overwrite",
        'excutefunction': customsalary
    }
}
# Above is the list of table parameters I need to load data into from the source. Each dictionary value has a table name
# and the function to run for that specific table: company has the function notransform, meaning there is no need to
# transform that table, while salary has the customsalary function, which adds 3 derived columns.
# Mode is the write mode we want to use: Append or Overwrite.
# Remember to define the functions before loadTableParams.

def LoadDataToTables(tablename, currentJob, **loadTableParams):
    # Get parameters
    mode = currentJob['mode']
    print(mode)
    # Get the source table and apply the table-specific transformation
    get_src = getTable(source_url, source_username, source_password, tablename)
    get_src_tbl = currentJob['excutefunction'](get_src)
    # Check for the destination table
    dest_tbl_exist = ifTableExist(destination_url, destination_username, destination_password, tablename)
    print(dest_tbl_exist)
    istable = re.split("( )", dest_tbl_exist)[0]
    print(istable)
    if istable == "Yes":
        print("Table Already Exist")
        print("Creating a Table is Not Required Hence Writing to Table " + tablename)
        get_src_tbl.write.jdbc(destination_url, tablename, properties={"user": destination_username, "password": destination_password}, mode=mode)
        get_des_tbl = getTable(destination_url, destination_username, destination_password, tablename)
        print("Loaded Data Complete for Table " + tablename + ", Table Count is: " + str(get_des_tbl.count()))
    else:
        #print("Table Does Not Exist")
        #print("Creating Table " + tablename)
        #ct.CreateTable.excutescript(hostname=destination_hostname, username=destination_username, password=destination_password, portno=int(destination_port), database=destination_database, sqlpath=sqlfilepath, sqltablename=tablename)
        print("Table Created")
        get_src_tbl.write.jdbc(destination_url, tablename, properties={"user": destination_username, "password": destination_password}, mode=mode)
        get_des_tbl = getTable(destination_url, destination_username, destination_password, tablename)
        print("Loaded Data Complete for Table " + tablename + ", Table Count is: " + str(get_des_tbl.count()))

for i in loadTableParams:
    currentJob = loadTableParams[i]
    tablename = currentJob['tablename']
    LoadDataToTables(tablename, currentJob, **loadTableParams)
Now, this code was tailored to run on small-scale data and with a traditional database (Postgres); with the right changes, it could be adapted to handle medium-sized tables as well.
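One such change, for example, is to read the source table over several parallel JDBC partitions instead of a single connection; partitionColumn, lowerBound, upperBound and numPartitions are standard Spark JDBC options, though the column name and bounds below are just placeholders:

get_src = spark.read \
    .format("jdbc") \
    .option("url", source_url) \
    .option("user", source_username) \
    .option("password", source_password) \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", tablename) \
    .option("partitionColumn", "user_id") \
    .option("lowerBound", "1") \
    .option("upperBound", "1000000") \
    .option("numPartitions", "8") \
    .load()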
Please do leave a comment if you require any help with other scenarios.
Make data easy with Helical Insight.
Helical Insight is the world’s best open source business intelligence tool.
In case you have any queries, please reach us at support@helicaltech.com
Thank You
Sohail Izebhijie
Helical IT Solutions Pvt Ltd
Best Open Source Business Intelligence Software Helical Insight is Here
A Business Intelligence Framework