An alternative approach to using the Insert/Update step for UPSERT in PDI

What is an UPSERT?

UPSERT is a combination of two operations on a table, Update and Insert, based upon a unique key (unique ID). A relational database typically uses a MERGE statement to perform the UPSERT operation: it updates a row if there is any change to the existing data and inserts a row if the data is new, based upon certain conditions (identified by the unique ID).
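
As a point of reference, this is roughly what an UPSERT looks like in plain SQL. The table and column names (customer, staging_customer, customer_id, name) are purely illustrative, and the exact syntax depends on your database: standard SQL uses MERGE, while MySQL offers INSERT ... ON DUPLICATE KEY UPDATE.

    -- Standard SQL: update on key match, insert otherwise (illustrative names)
    MERGE INTO customer AS tgt
    USING staging_customer AS src
       ON tgt.customer_id = src.customer_id      -- the unique key
    WHEN MATCHED THEN
       UPDATE SET tgt.name = src.name
    WHEN NOT MATCHED THEN
       INSERT (customer_id, name) VALUES (src.customer_id, src.name);

    -- MySQL variant of the same idea
    INSERT INTO customer (customer_id, name)
    SELECT customer_id, name FROM staging_customer
    ON DUPLICATE KEY UPDATE name = VALUES(name);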

How UPSERT works in an ETL?

While loading data from the source to the target, there are two prominent approaches, namely:

  • Truncate and load: flush out the old data and reload the new data.
  • Insert/Update (UPSERT): update existing records and insert new ones. The step matches on a particular key (or set of keys) to decide the operation: if the keys don't match, the row is inserted; otherwise it is updated. SCD works the same way (SCD-I is an UPSERT, whereas SCD-II is an insert of a new version).

The truncate-and-load approach is not recommended in scenarios where the source only holds current data but historical information must be kept in the target. In such cases we use the UPSERT mechanism.
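
For contrast, a truncate-and-load run boils down to something like the following (table names are again only illustrative):

    -- flush the old data, then reload everything from the source
    TRUNCATE TABLE customer;

    INSERT INTO customer (customer_id, name)
    SELECT customer_id, name
    FROM   staging_customer;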

UPSERT in Pentaho Data Integration(PDI) :

There are several steps that can be used to implement UPSERT in PDI. These steps use the unique key as the lookup key against the target table and perform the appropriate operation depending on whether a match is found. The following steps can be used for UPSERT:

  1. Insert/Update (inserts when there is no match and updates when there is a match)
  2. Dimension Lookup/Update (used for SCD operations)
  3. Synchronize after merge (this step needs a flag to execute; based upon the flag it will insert or update)

The basic algorithm/flow of how the UPSERT functions in PDI is as follows,

Capture1

Disadvantages of the above steps:

The steps shipped with PDI, and the algorithms they use to implement UPSERT, become very slow when handling large amounts of data.

  1. The Insert/Update step does a lookup on all the values in the table and a one-to-one comparison to decide whether to insert or update. If the amount of data is huge, the step works very slowly and the whole process slows down.
  2. The same happens with the Dimension Lookup/Update and Synchronize after merge steps: since they look up against the whole table, i.e. compare with every row in the table, they slow the process down.

Alternative approach for the UPSERT technique:

There are scenarios where the ETL runs on a regular basis, say every 12 hours. In that case the standard UPSERT steps won't help, because on large volumes of data their performance is very poor, so there is a need for optimization. Optimizing an ETL process is one of the most important things we can do, since it leads to faster processing. The following diagram will help you understand the alternative approach to the UPSERT mechanism.

Capture2

The steps shown in the above diagram work exactly like the Insert/Update step, but with some minor tweaks. The following is a description of each step:

  1. Table Input: reads the source data that needs to be loaded into the target.
  2. Add Sequence: adds the sequence number that is used as a business key for the table (this can change according to your requirement).
  3. Update: this step only succeeds when there is an existing record to update. If a row is new, the step fails for that row, since it only works when there is existing data to update.
  4. Table Output: PDI supports error handling on each step. The error-handling hop of the Update step routes the failed rows to the Table Output step, which inserts the new data into the target table (a rough SQL equivalent of this pattern is sketched after this list).
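
In SQL terms, the Update step followed by error handling into a Table Output is roughly equivalent to the pattern below. The table and column names are illustrative, and in the real transformation the values come from the stream rather than literals:

    -- what the Update step attempts for each incoming row (illustrative names)
    UPDATE customer
    SET    name = 'New Name'
    WHERE  customer_id = 42;

    -- rows for which no match was found are routed along the error hop,
    -- where the Table Output step simply inserts them
    INSERT INTO customer (customer_id, name)
    VALUES (42, 'New Name');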

Please see the diagram below for a better understanding.

Capture3

Conclusion:

This approach is faster than the usual Insert/Update step since it does not need a lookup to detect new data: new rows are inserted directly when the Update step fails for them.

Try once and share your feedback!! 🙂

 

Regards,

Nitish Mishra

Passing parameters from parent job to sub job/transformation in Pentaho Data Integration (Kettle) -Part 1


Part 1: Using setter and getter methods :

1.Set variables in parent job/transformation.
2.Get variables in sub-job/transformation.

1.Set variables :

Identify the field names that you are going to set using the Set Variables step and assign each one a proper variable name.

Also define the scope of the variable with one of the following options.

Variable Scope type :

1.Valid in the virtual machine: the complete virtual machine will know about this variable.
Warning: this makes your transformation only fit to run in a stand-alone fashion.
Running on an application server like on the Pentaho framework can become a problem.
That is because other transformations running on the server will also see the changes this step makes.

2.Valid in the parent job: the variable is only valid in the parent job.
3.Valid in the grand-parent job: the variable is valid in the grand-parent job and all the child jobs and transformations.
4.Valid in the root job: the variable is valid in the root job and all the child jobs and transformations.

Configure set variable step like below.

set_variables_pdi

But please note: IT IS NOT POSSIBLE TO SET AND USE A VARIABLE IN THE SAME TRANSFORMATION. This is because all steps run in parallel.

2.Get variables :

Use get variables step in sub-job/transformation to get the same information from the parent job/transformation.

But you need to make sure that you specify the variable name in the correct format, like ${variable}
or %%variable%% (as described in Variables). That means you can also enter complete strings in the variable column, not just a variable.

Configure get variables step like below.

get_variables_pdi

So now you can utilize the same variables in your sub-job/transformation wherever required.

Apart from this, we can also pass all parameters down to the sub-job/transformation using the Job/Transformation Executor steps.

Please follow my next blog for part 2 :
Passing parameters from parent job to sub job/transformation in Pentaho Data Integration (Kettle) -Part 2,

Thanks,

Sayagoud

Passing parameters from parent job to sub job/transformation in Pentaho Data Integration (Kettle) -Part 2


In part 1, I described how to pass parameter values down to a sub-job/transformation using setter and getter methods.

Please refer to my previous post for part 1: Passing parameters from parent job to sub job/transformation in Pentaho Data Integration (Kettle) -Part 1

In this part we will use executor steps to do the same.

Part 2 : Using job/transformation executor steps :

In order to pass parameters from the main job to a sub-job/transformation, we use the Job Executor or Transformation Executor step, depending on the requirement.

Following are the steps :

1.Define variables in job properties section
2.Define variables in transformation properties section
3.Configure job/transformation executor step

1.Define variables in job properties section :

Right-click anywhere on your job, select Settings, and go to the Parameters section. Define all the variables and assign a default value to each.

job_properties_pdi

2.Define variables in transformation properties section :

Right-click anywhere on your sub-job/transformation, select Settings, and go to the Parameters section.
Use the same variables that you have defined in your parent job (i.e. step 1) and assign some default values to each.

transform_proeprties_pdi

3.Configure job/transformation executor step :

In this step, we have to configure the sub-job/transformation path details and pass the same parameters in the job entry details section, as shown below.

Double-click on the Job/Transformation Executor step and provide the transformation file path details.

transform_executor_config

Go to the Parameters section and make sure you have checked the ‘Pass all parameter values down to the sub-transformation’ check box.

transform_executor_params

So now you can utilize same variables in your sub-transformation.

 

Thanks,

Sayagoud

Command line execution and Redirection Process in Pentaho (Kettle)



Command line execution in Pentaho (Kettle) :


Pentaho provides two command-line utility tools to execute your jobs/transformations outside of Spoon.

1.Pan:

Pan is the PDI command line tool for executing transformations. Pan runs transformations, either from a PDI repository (database or enterprise), or from a local file.

Go to the PDI installation directory and execute the transformation using the following Pan command:

./pan.sh /file="/home/pdi/command_line_tr.ktr" /param:"name=pentaho" /param:"org_id=1" /param:"instance_id=1" /param:"batch_size=1000" /level:"Detailed" >> "/home/log/command_line_test_job.log" 2>&1

2.Kitchen:

Kitchen is the PDI command line tool for executing jobs.
Kitchen runs jobs, either from a PDI repository (database or enterprise), or from a local file.

Go to the PDI installation directory and execute the job using the following Kitchen command:

./kitchen.sh /file="/home/pdi/command_line_test.kjb" /param:"name=pentaho" /param:"org_id=1" /param:"instance_id=1" /param:"batch_size=1000" /level:"Detailed" >> "/home/log/command_line_test_job.log" 2>&1


Capture and redirect job execution logs to a file (Linux) :


We can also capture the execution logs in a file, and it is possible to redirect standard error and standard output with the following constructs.

What does 2>&1 mean in the script?

2 refers to the second file descriptor of the process, i.e. stderr.

> means redirection.

&1 means the target of the redirection should be the same location as the first file descriptor, i.e. stdout.

So this command first redirects stdout to /home/log/command_line_test_job.log and then redirects stderr there as well.

What does redirection mean?

Redirection simply means capturing output from a file, command, program, script, or even code block within a script and sending it as input to another file, command, program, or script.

What does the ‘>>’ operator mean?

Instead of overwriting file data, you can also append text to an existing file using two subsequent greater-than signs.

Thanks,
Sayagoud

Issues while implementing loops in Pentaho Data Integration


Generally, for implementing batch processing we use the looping concept provided by Pentaho in ETL jobs. Loops in PDI are supported only in jobs (.kjb); they are not supported in transformations (.ktr).

While implementing loops in PDI, we have come across many blogs suggesting the use of a “Wait For” step, joining its output hop back to the previous step. Look at the below screenshot for more clarity.

However, the limitation of this kind of looping is that it causes recursive stack allocation by the JVM during job execution, and the system may run out of memory after a high number of iterations (depending on the available system memory). When that happens, the JVM runs out of memory and the program crashes. So it is not advisable to run a high number of iterations when implementing loops this way in PDI.

Possible Solutions:

1. The first thing to do is to minimize the number of iterations. The looping works properly up to around 500 iterations, so try to keep it below 500.

2. Never use loops for scheduling. If we use the looping concept for scheduling purposes, it goes into an infinite loop, which crashes the whole program.

3. Increase your batch size so that the number of iterations is lower. Take this into consideration while implementing external batch processing.

4. For incrementing the value, it is advisable to use a separate transformation instead of a JavaScript step, because the JavaScript step consumes more memory than a separate transformation. Create a new transformation, use the Formula step to increment the value, and then set the variable.

5. Suggested approach for infinite looping: one possible way is to use the settings of the ‘Start’ job entry. Set the ‘Repeat’ flag and add an interval configuration. This causes the job to be re-initialized completely as a new instance and does not cause any memory issues.

 

Thanks,

Nitish Kumar Mishra

Batch process implementation in kettle (Pentaho Data Integration):


In order to implement a batch process we need looping logic, but I did not find any component or suitable method in Kettle to create such loops.

So I came up with a way to solve that problem in one of my projects and have documented it here.

Below is the step by step process:

1.Get Batch Details:

 Create a new transformation:

 

get_batch_details

Create a separate transformation and name it as get_batch_details.

Fetch the min, max and batch size details from the source system and calculate the total number of batches using a JavaScript step.

Calculate total batch number count:

generate_batch_details

Set each variable's scope to be valid throughout the root job using the Set Variables step, like below.

 

2.Create a main job:

Create a main job with the following steps.

    a. Configure the get_batch_details transformation.
    b. Configure the for-each batch evaluation step.
    c. Configure your actual transformation with a Transformation Executor step (this is your main logic).
    d. Configure the batch number iteration transformation.

The main job will look like the following.

main_job

3.Evaluate and iterate through each batch number in main job:

Evaluation:

In this step we have to evaluate each batch number against the total number of batches, like below.

This process will continue until the condition evaluates to true, at which point the loop exits.

For each step configuration:

 

for_each_batch

 

 

In the main job, create a variable named batch_no and assign a default value to it.

Use the same variable in the evaluation step as shown above, and make sure the success condition is ‘If value is greater than’ with the value set to nr_of_batches (this value comes from the get_batch_details transformation).

So the first time batch_no = 1, and it is compared with nr_of_batches on every iteration; if the condition is false your actual job executes, otherwise the loop exits.

Now the next step is to increment batch_no, so we create a batch_iteration transformation like below and configure the Get Variables and Set Variables steps accordingly.

batch_increment

 

Increment batch number step:

Use the Formula step to implement this logic and configure the values as shown below.

Increment_batch_nr_using_formula_step

We have configured everything properly; the main remaining question is how to make use of the batch info and create the start and end flags in our actual transformation.

 

get_batch_info_actual_job

Calculate batch flags using java script step like below.

calculate_batch_flags

So use start flag and end flag details wherever required in the job.

For example:

step 1: calculate the total number of batches

      max_value = 12000,

      batch_size = 4000,

      nr_of_batches = ceil(12000 / 4000) = 3

So this process will create 3 different batches with a batch size of 4000 each.

step 2: evaluate batch number and total number of batches in main job.

      nr_of_batches = 3;

      batch_no = 1 (default);

     If (batch_no > nr_of_batches)

     then exit

     else go to actual job. 

So here 1 > 3 is false, so it goes to the actual job.

step 3: calculate batch flags in actual transformation.

    var start_flag = 0;
    var end_flag = 0;

    // first and last row positions covered by the current batch
    start_flag = (batch_no - 1) * batch_size + 1;
    end_flag = batch_no * batch_size;

    // for batch_no = 1 and batch_size = 4000:
    start_flag = (1 - 1) * 4000 + 1;   // = 1
    end_flag = 1 * 4000;               // = 4000

    Now start_flag = 1 and end_flag = 4000.

So we can use the above flag values in our query to fetch data from the source system using any Table Input step.
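
For example, a Table Input step in the actual transformation can use the two flags in its query, assuming a numeric key column (here called id, a placeholder name) and variable substitution enabled in the step:

    -- source_table and id are placeholder names
    SELECT *
    FROM   source_table
    WHERE  id BETWEEN ${start_flag} AND ${end_flag};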

Step 4: Increment batch number

      batch_no = batch_no + 1; 

So now batch_no = 2, and the same process will continue until the condition evaluates to true.

 

Slowly Changing Dimension in Pentaho Data Integration(Kettle)


Slowly changing dimension (SCD) is a common mechanism in data warehousing. An SCD is a dimension that changes slowly over time rather than on a regular schedule. In a data warehouse there is a need to track changes in dimension attributes in order to report historical data. In other words, implementing one of the SCD types enables users to assign the proper dimension attribute value for a given date. There are various approaches for handling such data; the most commonly used are:

1. SCD Type I: update the existing record (overwrite).

2. SCD Type II: create a new record and flag the previous record as historical.

3. SCD Type III: add a new column which keeps only the previous value, so the history is limited.
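
In plain SQL, the first two types look roughly like this, assuming a hypothetical dim_customer table with a natural key customer_nk, a current_flag column marking the active version, and date_from/date_to validity columns:

    -- SCD Type I: overwrite the attribute in place (no history kept)
    UPDATE dim_customer
    SET    city = 'Bangalore'
    WHERE  customer_nk = 42;

    -- SCD Type II: close the current version, then insert a new one
    UPDATE dim_customer
    SET    current_flag = 'N', date_to = CURRENT_DATE
    WHERE  customer_nk = 42 AND current_flag = 'Y';

    INSERT INTO dim_customer (customer_nk, city, current_flag, date_from, date_to)
    VALUES (42, 'Bangalore', 'Y', CURRENT_DATE, NULL);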

In Kettle, there are steps through which we can implement SCD on a dimension. One such step is Dimension Lookup/Update.

SCD

The Dimension Lookup/Update step allows you to perform the Type I and Type II SCD approaches.

.SCD_1

Keys: the keys are used to look up values against the destination (dimension) table.

Technical key: this is basically the surrogate key, which is generated when a new record is created.

SCD_2

Fields: these are the columns of the dimension table on which you want to perform operations. A number of optional fields (in the “Fields” tab) are automatically managed by the step. You can specify the table field name in the “Dimension Field” column.

Type of Dimension update:

Insert: this is the SCD-II mechanism, where a new row is inserted when a change is detected by the lookup. If the record coming from the source table is not found in the dimension, it is inserted; if it is found but its values have changed, a new record (version) is inserted.

Update: this is the conventional SCD-I approach: the attributes of the last dimension record version are updated in place. If you set all fields to Update, the step performs pure SCD-I. If some fields are Insert and some are Update, it applies SCD-II, and the fields set to Update are only updated on the last version.

Punch Through: this mechanism is applied to fields whose data changes very rarely, and when it does change it is just a correction, for example product names or null values in price columns. It is also used for SCD-I, but it is slower than the Update mechanism.

Note: if you mix Insert, Update and Punch Through in one dimension, it works like a hybrid slowly changing dimension, which is sometimes referred to as Type 6.
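
The difference between Update and Punch Through can be pictured against the same hypothetical dim_customer table from the earlier sketch: Update corrects the attribute only on the latest version, while Punch Through rewrites it across every historical version.

    -- Update: only the current version is corrected (illustrative names)
    UPDATE dim_customer
    SET    city = 'Bengaluru'
    WHERE  customer_nk = 42 AND current_flag = 'Y';

    -- Punch Through: the correction is applied to all versions
    UPDATE dim_customer
    SET    city = 'Bengaluru'
    WHERE  customer_nk = 42;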

Date of last insert or update (without stream field as source): adds and manages a Date field.

Date of last insert (without stream field as source): adds and manages a Date field.

Date of last update (without stream field as source): adds and manages a Date field.

Last version (without stream field as source): adds and manages a Boolean field (converted into Char(1) or a boolean database data type depending on your database connection settings and the availability of such a data type). This acts as the current-valid-dimension-entry indicator for the last version: when a Type II attribute changes and a new version is created (to keep track of the history), the ‘Last version’ attribute of the previous version is set to ‘False/N’ and the new record with the latest version is set to ‘True/Y’.

Note: this dimension entry (an empty row) is added automatically to the dimension table when the update is first run. If you have “NOT NULL” fields in your table, adding this empty row will fail, and with it the entire step! So make sure that you have a record with the ID field = 0 or 1 in your table if you don't want PDI to insert a potentially invalid empty record.

This step works faster if you use the caching mechanism. A cache size of 0 caches as many rows as possible, until your JVM runs out of memory; use this option wisely with dimensions that can't grow too large. A cache size of -1 means that caching is disabled.

Note: there are various other steps which can perform SCD in Kettle. The Insert/Update step can also be used; it performs the SCD-I mechanism, since it inserts a record when it is new and otherwise updates (overwrites) the existing dimension row. Similarly, the Update step on its own performs the SCD-I mechanism. However, these steps are very slow compared to Dimension Lookup/Update. You can also use Merge Rows (diff) followed by Synchronize after merge, which also implements the SCD mechanism; this is the fastest of all the above, but the records on both the dimension and staging sides should match up, otherwise it won't work.

Thanks,

Nitish

Looping and batch processing in PDI

Implementing Loops and batch processing in Kettle

What is batch processing?
Batch processing is basically a series of executions of a job from source to destination, where the data is divided into smaller batches and transferred to the destination. This helps in faster execution and also in memory management. It is a popular technique in ETL processes, where the data is generally very large.

Creating loops in PDI:
Let's say you want to implement a for loop in PDI to send 10 lakh records in batches of 100. Looping is complicated in PDI because it can only be implemented in jobs, not in transformations, as Kettle doesn't allow loops in transformations.
So in order to implement loops in PDI, you need a main job which calls the sub-job where the loop is designed, and the transformation which contains the logic to send data from source to destination.

Below is the overall design of the sub-job which handles the batch processing and the looping.

Pic1

 

NrBatches: this is the transformation which prepares the batches: it counts the total data present in the table and, based on that count, divides it into batches. Kindly check the below screenshots for more details.

NrBatches Transformation design:

Pic2

 

The Table Input step is used to fetch the count and the minimum value of the primary key (ID) from the source table. The Modified JavaScript Value step is used to assign the batch size and to compute the number of batches from the batch count (the row count of the table) and the batch size. Kindly look at the screenshot below for a better understanding.

Pic3
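
The Table Input query for this is typically something along these lines (table and column names are placeholders):

    -- placeholder table/column names
    SELECT COUNT(*) AS batch_count,
           MIN(id)  AS min_id
    FROM   source_table;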

 

The Set Variables step in the NrBatches transformation is used to set the values of batch size, batch count and number of batches so that they can be used in other transformations.

Table Exists: this step checks whether the destination table is present. If it is not present, an SQL step creates the table and the flow then moves on to the next step; if it exists, the flow goes to the next step directly.
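
The SQL job entry attached to the "table missing" branch would then run a statement of this general shape (the table name and column list are purely illustrative):

    -- illustrative destination table definition
    CREATE TABLE destination_table (
        id   INT NOT NULL,
        name VARCHAR(100),
        PRIMARY KEY (id)
    );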

Set variables: the Set Variables step in the job is used to set the incrementer value (just like the i variable in a for loop). The initial value of the incrementer is set here. Kindly refer to the screenshot below for more clarity.

Pic4

 

Simple Evaluation:

This step is equivalent to a conditional statement where the iterator is checked against the loop condition (much like if (i < 50) or while (i < 50)). In this step, the variable set in the previous step is checked against the total number of batches (computed in the NrBatches transformation). Kindly check the screenshot below for more clarity.

Pic5

 Transformation Step:

Inside the transformation, set the flags, i.e. the start flag and the end flag, using a JavaScript step. Please see the screenshot below for how to set the flags. Alternatively, you can use the Formula step or the Calculator step, which is a better option if you have a large amount of data.

.javascript

Pic6

Increment Step:

Now, since we are using a loop, there has to be an increment. Here we used a JavaScript step to increment the batch_no (see the Set variables step), and thereby the looping mechanism is completed. Here also you can use another transformation instead of the JavaScript step, which processes faster and consumes less memory. Please see the screenshots below for more details.

-Javascript Step:

Pic7

-Using Another Transformation:

BatchProcessIncrement

The formula step is used to increment the batch no. See the below screenshot for more clarification.

Formula

So this is how looping works in Kettle. But there are some limitations: this looping technique consumes a lot of memory if the number of iterations is very high. Use this technique only for problems where the number of loop executions is low; that will also help in enhancing the performance.

Thanks,

Nitish

 

Audit Logs in Pentaho Data Integration (PDI)


Audit logs at the job level and transformation level are very useful in ETL projects to track details such as the job name, start date, end date, transformation name, errors, number of lines read, number of lines written, number of lines input, number of lines output, etc.

I have created a simple PDI job to track audit logs at the job level, job entry level and transformation level in a MySQL database. The process is as follows:

I created a Kettle job FileToDB.kjb which contains two job entries (Start and Transformation) and one database connection (PSQL_Log). I created a separate database ‘PDI_Audit_Logs’ in the MySQL DB and configured its details as the database connection.

FileToDB_KJB

Right click on the work area and select Job settings as shown in below screen shot.

jobselect1

It will pop up the Job properties window. Click on the Log tab and select the ‘Job entry log table’ option; it will display the variables available under this section. You can enable all the variables, or keep just the default ones. Now configure the log table connection as shown in the screenshot below:

Jobproperties

Log Connection–> the connection we have already created (in this case the MySQL DB).

Log Schema–> the existing database name.

**Log Table–> the table name in which you want to store the tracked details. It can be an existing table; we will see the process for a new table in the later steps.

Log line timeout–> the number of days you want to keep the log details.

**If you are creating the log table for the first time, you have to click the SQL button at the bottom; otherwise click the OK button. The SQL button pops up the SQL statement to create this table with the column names shown above. I gave the log table the name “FileToDB_JobEntryLog” (my job name and the level of logging).

sql1

Click on the Execute button; it will create that table in the given schema/database and prompt the window below. Click OK and then close the SQL editor.

ex1

So the job entry level log configuration is done. Now select the Job log table section to configure the log connection at the job level. Configure the values as shown in the screenshot below.

For the Log Table Name, the same steps have to be followed as explained above.

job1

Now the job audit log settings are done. Open the transformation: it has a Get System Info step, a file input step and a DB output step connected with hops. The DB output schema is different from the audit logs schema, so we have created two connections, PSQL and PSQL_Log.

Trans_img

Right click on the work area and select Transformation Settings.

trans_set

It will pop up the Transformation properties window. Select the Logging tab and then click on the Transformation option. It will display the variables that can be tracked from this option.

trans2

For the fields LINES_READ, LINES_WRITTEN, LINES_UPDATED, LINES_INPUT, LINES_OUTPUT and LINES_REJECTED, you should specify the step from which each metric should be tracked.

As with the job settings, here also you have to configure the log connection, and click the SQL button if this is the first time you are creating the log table; otherwise click OK.

Now the audit logs are configured for the transformation as well. Execute the job and check the database for the log tables: it will create three tables (job entry level, job level and transformation level) under the given schema.
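
Once the job has run a few times, these log tables can be queried like any other table. For example, assuming the job-level table was named FileToDB_JobLog and the default fields were enabled (the exact column names depend on which fields you ticked in the Log tab, so treat this as a sketch), failed runs can be listed like this:

    -- FileToDB_JobLog is a hypothetical table name; use the name you configured
    SELECT JOBNAME, STARTDATE, ENDDATE, ERRORS, STATUS
    FROM   FileToDB_JobLog
    WHERE  ERRORS > 0
    ORDER BY STARTDATE DESC;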

Job Entry Level Table:

As there are two Job entries it generated the details for both the job entries

job_entry_log_table

Job Level Log Table:

joblogtable

Transformation Level Log Table:

translogtable

Hope this topic will be useful.

Thank you.

Sorting Data In Mysql Based On Field Value Of a column

This blog will show you how to sort data in MySQL based on the field value of a column.

We can sort data based on the field value of a column by using ORDER BY FIELD() in MySQL.

Database : world

Table name : country

If I execute the below query:

“select distinct Continent from country”

then I will get the following output:

Capture

Let's say that we want to sort the data in the specific order “Asia, Africa, North America, South America, Antarctica, Oceania, Europe”.

In this case we cannot simply use ORDER BY, because neither ascending nor descending order gives the required output.

So let’s use the below query :

“select distinct Continent from country
order by field (Continent, ‘Asia’, ‘Africa’, ‘North America’, ‘South America’, ‘Antarctica’, ‘Oceania’, ‘Europe’)”

And we will get the output as required:

Capture1

Thanks,

Rupam Bhardwaj