An alternative to the Insert/Update step for UPSERT in PDI

What is an UPSERT?

UPSERT combines two operations on a table, Update and Insert, based on a unique key (unique ID). A relational database typically uses a MERGE statement to perform an UPSERT: it updates an existing row when the incoming data has changed and inserts a new row when no matching row exists, with the match identified by the unique ID.
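To make the rule concrete, here is a minimal sketch of the upsert logic in Python, with the target table modelled as a dict keyed on the unique ID (the field names are illustrative only, and this shows the rule itself, not how a database engine implements MERGE):

    def upsert(target, source_rows, key="id"):
        # Update when the key already exists, insert when it does not.
        for row in source_rows:
            if row[key] in target:
                target[row[key]].update(row)   # UPDATE path
            else:
                target[row[key]] = dict(row)   # INSERT path
        return target

    # id 1 already exists and gets updated; id 3 is new and gets inserted
    target = {1: {"id": 1, "amount": 100}, 2: {"id": 2, "amount": 200}}
    upsert(target, [{"id": 1, "amount": 150}, {"id": 3, "amount": 300}])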

How UPSERT works in an ETL?

While loading data from source to target, two approaches are commonly used:

  • Truncate and load: flush out the old data and reload the new data.
  • Insert/Update (Upsert): update existing records and insert new ones. The step matches on a key or a set of keys; if the keys do not match, the row is inserted, otherwise it is updated. Slowly changing dimensions work the same way (SCD Type I behaves like an UPSERT, whereas SCD Type II always inserts a new row).

The truncate-and-load approach is not recommended when the source holds only current data and the target also has to retain historical information; in that case we use the UPSERT mechanism.

UPSERT in Pentaho Data Integration (PDI):

PDI provides several steps that can implement UPSERT. They use the unique key to look up rows in the target table and then perform the appropriate operation. The following steps are commonly used for UPSERT:

  1. Insert/Update (inserts when there is no match and updates when there is one)
  2. Dimension Lookup/Update (used for SCD operations)
  3. Synchronize after Merge (needs a flag field; based on the flag it inserts or updates)

The basic flow of how UPSERT works in PDI is as follows:

[Image: Capture1]

Disadvantages of the above steps:

The built-in PDI steps and the algorithms they use to implement UPSERT become very slow when handling large amounts of data.

  1. The Insert/Update step looks up values in the target table and compares rows one by one to decide whether to insert or update. With a large data set this lookup becomes very slow and the whole process slows down.
  2. The same happens with Dimension Lookup/Update and Synchronize after Merge: because each incoming row is compared against the whole target table, the process slows down.

Alternative approach for the UPSERT technique:

There are scenarios where the ETL runs on a regular schedule, say every 12 hours. The standard UPSERT steps do not help there because they are slow, and with large data volumes performance only gets worse, so the process needs to be optimized. Optimizing an ETL process is one of the most important tasks we undertake, since it leads to faster processing. The following diagram illustrates the alternative approach to the UPSERT mechanism:

[Image: Capture2]

The flow in the diagram behaves like the Insert/Update step but with a few tweaks. The steps are described below:

  1. Table Input: reads the source data that needs to be loaded into the target.
  2. Add Sequence: adds a sequence number used as a business key for the table (this can change according to your requirement).
  3. Update: performs only updates. If a row is new, this step fails for that row, since it can only update existing data.
  4. Table Output: PDI supports error handling on every step. The error-handling hop of the Update step routes the failed (new) rows to a Table Output step, which inserts them into the target table (a sketch of the equivalent logic appears after the diagram below).

See the diagram below for a better understanding:

[Image: Capture3]
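Outside PDI, the same flow can be sketched in a few lines of plain Python with sqlite3 (the table and column names here are invented for illustration). It mimics what the Update step plus its error-handling hop achieve: try an UPDATE first and fall back to an INSERT only for the rows that did not match:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target (business_key INTEGER PRIMARY KEY, amount REAL)")
    conn.execute("INSERT INTO target VALUES (1, 100.0)")

    source_rows = [(1, 150.0), (2, 300.0)]   # key 1 exists (update), key 2 is new (insert)

    for key, amount in source_rows:
        cur = conn.execute("UPDATE target SET amount = ? WHERE business_key = ?", (amount, key))
        if cur.rowcount == 0:                # nothing matched: the "failed update" / insert path
            conn.execute("INSERT INTO target (business_key, amount) VALUES (?, ?)", (key, amount))

    conn.commit()
    print(conn.execute("SELECT * FROM target ORDER BY business_key").fetchall())
    # [(1, 150.0), (2, 300.0)]

Because no lookup of the whole target table is needed, a new row only costs a failed update plus an insert.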

Conclusion:

This approach is faster than the usual Insert/Update step because it does not need a lookup to detect new data: when the Update step fails for a row, that row is inserted directly.

Try once and share your feedback!! 🙂

 

Regards,

Nitish Mishra

Different ways of implementing loop in PDI



We come across scenarios where running a job/transformation in a loop becomes necessary in Pentaho. Below are different ways of implementing a loop for a job in PDI.

 

1. Using Execute for every input row

Build a job and let the first transformation retrieve all the rows via a Table Input step, then use a “Copy rows to result” step to save the result. Create a second transformation and configure its job entry for single-row execution (refer to the advanced settings shown in the image). In that transformation, use “Get rows from result” to start single-row processing. The equivalent control flow is sketched after the image.

[Image: blog_3]
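Stripped of the PDI specifics, the control flow of this first method looks roughly like this (a Python sketch; the row values and function names are made up):

    def get_rows():
        # First transformation: Table Input followed by "Copy rows to result"
        return [{"id": 1}, {"id": 2}, {"id": 3}]

    def process_row(row):
        # Second transformation: starts with "Get rows from result"
        print(f"processing row {row}")

    # The job entry for the second transformation is set to
    # "Execute for every input row", i.e. it runs once per row.
    for row in get_rows():
        process_row(row)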

 

 

2. Using the Simple Evaluation step

[Image: blog_1]

  • Get the row count for the loop.

  • Assign the count to a variable.

  • Check the variable in the Simple Evaluation step (as shown in the figure below).

    [Image: blog_2]

  • Give the appropriate condition according to the requirement.

  • Connect the hop to the job that has to run in a loop. The equivalent control flow is sketched below.
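In plain code this second method reduces to the loop below (a rough Python sketch; the count and the inner job are placeholders):

    def run_inner_job(iteration):
        print(f"running the inner job, iteration {iteration}")

    loop_count = 5      # the count fetched and stored in a variable
    iteration = 1       # counter variable checked by the Simple Evaluation step

    # The Simple Evaluation step keeps routing the flow back to the inner job
    # until the condition is met.
    while iteration <= loop_count:
        run_inner_job(iteration)
        iteration += 1  # increment done by a separate transformation in PDI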

Handling Rollback and Database transactions in Pentaho Data Integration.



 

 

While inserting data into a database table, a job/transformation sometimes fails partway through the load, and in that case we usually want the whole transaction to be rolled back rather than being left with a partial load.

By default, the Table Output step in PDI inserts and commits records until an error occurs. Consider an example where the ETL processes 5,000 records and record 3,999 contains a string where the DDL defines the field as an integer: the job/transformation comes to a sudden halt.

The problem is that the records loaded before the failure (3,998 in this example) are already sitting in the target table from this ETL run. In some situations this is not an issue, e.g. if you have a CDC process that can cope with it (it can simply start from the highest ID or timestamp), but in other scenarios it is a real problem.

Pentaho Data Integration provides a transactional feature that is quite easy to use. Make sure your database engine supports transactions: MySQL's MyISAM storage engine, for example, does not, so the transformation would keep inserting records even though an error occurred.

Transactions in transformations are enabled simply by ticking the “Make the transformation database transactional” option in the “Miscellaneous” tab of the transformation settings dialog. A small sketch of the resulting all-or-nothing behaviour follows the notes below.

[Image: transformation settings dialog, Miscellaneous tab]

Note: keep the following in mind when making a transformation database transactional:

  • Enabling this option disables the “Use batch update for inserts” option in the Table Output step and ignores the Commit size setting, so effectively it slows down the insert operation.
  • Database transactions do not work if you use database partitioning, because multiple connections per step copy have to be created; performance-wise it does not make sense in that case anyway.
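To picture the all-or-nothing behaviour this option gives you, here is a small Python/sqlite3 sketch (table and data are invented): all inserts share one transaction, and one bad row rolls the whole batch back, so the target stays empty instead of keeping a partial load:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE target (id INTEGER NOT NULL, value INTEGER NOT NULL)")

    rows = [(1, 10), (2, 20), (3, None), (4, 40)]   # the third row violates NOT NULL

    try:
        with conn:                                   # one transaction for the whole load
            for rid, value in rows:
                conn.execute("INSERT INTO target VALUES (?, ?)", (rid, value))
    except sqlite3.Error:
        pass                                         # the failed transaction was rolled back

    print(conn.execute("SELECT COUNT(*) FROM target").fetchone()[0])   # 0 - nothing committed

Without the single transaction (one commit per batch, which is what the Table Output step does by default), the first two rows would have stayed in the table.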

How to avoid duplication of values in the columns in PDI if column names are same?


There can be requirements where we want the same column name to appear more than once in a PDI transformation. In such cases the value of one field gets overwritten by another.

For example, suppose that for a company we want three pairs of Loan ID and Loan Amount columns. If we give the columns the same name, as the requirement demands, the data will be duplicated if the scenario is not handled properly.

Suppose: We have 3 Loan Amounts.

Loan Amount 1: 10$

Loan Amount 2: 20$

Loan Amount 3: 30$

The column names are the same, i.e. Loan Amount, Loan Amount, Loan Amount. If we pass these names directly to the Text file output step, the values get replicated and all three amounts come out as 10$.

Using the Select Values step to rename the fields will not solve the replication issue in this case.

To get around this, we can create a separate header transformation and then enable Append in the Text file output step of the next transformation.

This way the header can repeat the column name “Loan Amount” as many times as needed, while in the next step the fields are distinguished internally as Loan Amount 1, Loan Amount 2 and Loan Amount 3.

Note: to enable append in the Text file output step, follow these steps:

  • Double-click the Text file output step.
  • Go to the Content tab.
  • Check Append and uncheck Header.

Thanks

Scheduling Jobs in Pentaho Data Integration


The following are the steps to schedule a kettle job:

In the main Kettle job (.kjb) there is a START entry. Double-click its icon and a job scheduling window pops up, as shown below.

By default, the type is No Scheduling and the time options are disabled.

[Screenshot 1]

If we select the type Interval, the interval-related time options are enabled, as shown in the image below:

[Screenshot 2]

If the type is Daily, then the respective Time of Day option is enabled, as shown in the image:

[Screenshot 3]

If the type is Weekly, then Time of Day and Day of Week are enabled:

[Screenshot 4]

If the type is Monthly, then both Time of Day and Day of Month are enabled, as shown in the image below:

[Screenshot 5]

If you want the schedule to repeat, enable the Repeat option in the job scheduling window.

Thank you,

Lalitha

Command line execution and Redirection Process in Pentaho (Kettle)



Command line execution in Pentaho (Kettle):


Pentaho provides two command-line utilities to execute your jobs/transformations outside of Spoon.

1. Pan:

Pan is the PDI command-line tool for executing transformations. It runs transformations either from a PDI repository (database or enterprise) or from a local file.

Go to the PDI installation directory and execute the transformation with Pan using a command like the following:

./pan.sh /file="/home/pdi/command_line_tr.ktr" /param:"name=pentaho" /param:"org_id=1" /param:"instance_id=1" /param:"batch_size=1000" /level:"Detailed" >> "/home/log/command_line_test_job.log" 2>&1

2. Kitchen:

Kitchen is the PDI command-line tool for executing jobs. It runs jobs either from a PDI repository (database or enterprise) or from a local file.

Go to the PDI installation directory and execute the job with Kitchen using a command like the following:

./kitchen.sh /file="/home/pdi/command_line_test.kjb" /param:"name=pentaho" /param:"org_id=1" /param:"instance_id=1" /param:"batch_size=1000" /level:"Detailed" >> "/home/log/command_line_test_job.log" 2>&1


Capture and redirect job execution logs to a file (Linux):


We can also capture the execution logs in a file: the commands above redirect both standard output and standard error to a log file.

What does 2>&1 mean in the script?

2 refers to the second file descriptor of the process, i.e. stderr.

> means redirection.

&1 means the target of the redirection should be the same location as the first file descriptor, i.e. stdout.

So this command first redirects stdout to /home/log/command_line_test_job.log and then redirects stderr there as well.

What does redirection mean?

Redirection simply means capturing output from a file, command, program, script, or even code block within a script and sending it as input to another file, command, program, or script.

What does the ‘>>’ operator mean?

Instead of overwriting the file, two consecutive greater-than signs append text to an existing file (a single ‘>’ would overwrite it).

Thanks,
Sayagoud

Issues while implementing loops in Pentaho Data Integration


Generally, batch processing is implemented using the looping concept provided by Pentaho in ETL jobs. Loops in PDI are supported only in jobs (.kjb), not in transformations (.ktr).

While implementing loops in PDI, many blogs suggest using the “Wait for” step and joining its output hop back to the previous step. Look at the screenshot below for clarification.

However, the limitation of this kind of looping is that it causes recursive stack allocation in the JVM during job execution, so the system may run out of memory after a large number of iterations (depending on the available memory), and the program crashes. It is therefore not advisable to run a high number of iterations when implementing loops this way in PDI.

Possible Solutions:

1. The first thing to do is to minimize the number of iterations. This kind of looping works properly only up to about 500 iterations, so try to keep it below that.

2. Never use loops for scheduling. If the looping concept is used for scheduling, it turns into an infinite loop that eventually crashes the whole program.

3. Increase your batch size so that the number of iterations is smaller. Keep this in mind when implementing external batch processing.

4. For incrementing the counter, it is advisable to use a separate transformation instead of a JavaScript step, because the JavaScript step consumes more memory. Create a new transformation, use the Formula step to increment the value and then set the variable.

5. Suggested approach for infinite looping: one possible way is to use the settings of the ‘Start’ step. Set the ‘Repeat’ flag and add an interval configuration. This causes the job to be completely re-initialized as a new instance and does not cause any memory issues.

 

Thanks,

Nitish Kumar Mishra

Batch process implementation in kettle (Pentaho Data Integration):


In order to implement batch processing we need looping logic, but I did not find any component or suitable built-in method in Kettle to create those loops.

So I worked out a way to solve the problem in one of my projects and have documented it here.

Below is the step by step process:

1. Get Batch Details:

Create a new transformation:

[Image: get_batch_details]

Create a separate transformation and name it get_batch_details.

Fetch the min, max and batch size details from the source system and calculate the total number of batches using a JavaScript step.

Calculate the total number of batches:

[Image: generate_batch_details]

Set the scope of each variable to be valid throughout the root job using the Set Variables step, as shown below.

 

2. Create a main job:

Create a main job with the following steps.

    a. Configure the get_batch_details transformation.
    b. Configure the for-each batch evaluation step.
    c. Configure your actual transformation using a transformation executor step (this is your main logic).
    d. Configure the batch number iteration transformation.

The main job will look like the one below.

[Image: main_job]

3. Evaluate and iterate through each batch number in the main job:

Evaluation:

In this step we compare each batch number with the total number of batches, as shown below.

This process continues until the exit condition evaluates to true.

For-each step configuration:

[Image: for_each_batch]

In the main job, create a variable named batch_no and assign it a default value.

Use the same variable in the evaluation step shown above, and set the success condition to “If value is greater than” with nr_of_batches as the comparison value (this value comes from the get_batch_details transformation).

The first time through, batch_no = 1. It is compared with nr_of_batches on every iteration; if the condition is false, your actual job executes, otherwise the loop exits.

The next step is to increment batch_no, so we create a batch_iteration transformation like the one below and configure the Get Variables and Set Variables steps accordingly.

[Image: batch_increment]

 

Increment batch number step:

Use the Formula step to implement this logic and configure the values as shown below.

[Image: Increment_batch_nr_using_formula_step]

Everything is now configured; the main remaining question is how to use the batch information to create start and end flags in the actual transformation.

[Image: get_batch_info_actual_job]

Calculate the batch flags using a JavaScript step, as shown below.

[Image: calculate_batch_flags]

Use the start and end flag values wherever required in the job.

For example:

Step 1: calculate the total number of batches.

      max_value = 12000
      batch_size = 4000
      nr_of_batches = ceil(12000 / 4000) = 3

So this process creates 3 batches with a batch size of 4000 each.

Step 2: evaluate the batch number against the total number of batches in the main job.

      nr_of_batches = 3
      batch_no = 1 (default)

      if (batch_no > nr_of_batches) then exit
      else run the actual job

Here 1 > 3 is false, so the flow proceeds to the actual job.

Step 3: calculate the batch flags in the actual transformation.

    var start_flag = (batch_no - 1) * batch_size + 1;   // (1 - 1) * 4000 + 1 = 1
    var end_flag = batch_no * batch_size;               // 1 * 4000 = 4000

So start_flag = 1 and end_flag = 4000, and we can use these values in the Table Input query to fetch the corresponding data from the source system.

Step 4: increment the batch number.

      batch_no = batch_no + 1;

Now batch_no = 2, and the same process repeats until the exit condition becomes true.
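Putting the four steps together, the whole batch loop boils down to the control flow below (a Python sketch only; the variable names mirror the ones above and fetch_batch stands in for the Table Input query):

    import math

    def fetch_batch(start_flag, end_flag):
        # Placeholder for the actual transformation, e.g. a Table Input query
        # of the form: SELECT ... WHERE row_nr BETWEEN start_flag AND end_flag
        print(f"loading rows {start_flag} .. {end_flag}")

    max_value = 12000
    batch_size = 4000
    nr_of_batches = math.ceil(max_value / batch_size)   # 3

    batch_no = 1
    while batch_no <= nr_of_batches:                     # the evaluation step
        start_flag = (batch_no - 1) * batch_size + 1     # 1, 4001, 8001
        end_flag = batch_no * batch_size                 # 4000, 8000, 12000
        fetch_batch(start_flag, end_flag)                # the actual transformation
        batch_no += 1                                    # the batch_iteration transformation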

 

Handling Null values in Talend and Pentaho


Handling Null values in Talend:

Make all the columns nullable in tMap.

For the columns that can contain null values, you can assign a default value by writing a ternary operator in the output expression, as shown in the screenshot below (for example, an expression of the form row1.amount == null ? 0 : row1.amount, where the names are illustrative).

[Image: talend_tmap]

Handling Null values in Pentaho:

In Pentaho, to handle null values there is a step that provides a default value for nulls: it is called ‘If field value is null’.

You can assign a default value for null fields individually and also per data type.

You can configure the step as shown below:

[Image: pentaho_null]

Thank you.

Lalitha

Slowly Changing Dimension in Pentaho Data Integration(Kettle)


A slowly changing dimension (SCD) is a common concept in data warehousing: a dimension that changes slowly over time rather than on a regular schedule. In a data warehouse there is a need to track changes in dimension attributes in order to report historical data; implementing one of the SCD types lets users determine the correct attribute value of a dimension for any given date. There are several approaches to handling such data. The most commonly used are:

1. SCD Type I: update the existing record (overwrite).

2. SCD Type II: create a new record for the change and flag it as the current version, keeping the previous record as history.

3. SCD Type III: add a new column that keeps the previous value, so the history is limited.

A small sketch contrasting the first two types follows.
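As a quick illustration of the difference between Type I and Type II, here is a small Python sketch (the dimension is modelled as a list of row dictionaries and the field names are invented; it only shows the idea, not how the PDI step implements it):

    dimension = [
        {"key": 1, "customer_id": "C100", "city": "Pune", "version": 1, "last_version": True}
    ]

    def scd_type1(dim, customer_id, new_city):
        # Type I: overwrite the attribute in place, no history is kept
        for row in dim:
            if row["customer_id"] == customer_id and row["last_version"]:
                row["city"] = new_city

    def scd_type2(dim, customer_id, new_city):
        # Type II: expire the current version and insert a new flagged version
        current = next(r for r in dim if r["customer_id"] == customer_id and r["last_version"])
        current["last_version"] = False
        dim.append({
            "key": max(r["key"] for r in dim) + 1,   # new surrogate (technical) key
            "customer_id": customer_id,
            "city": new_city,
            "version": current["version"] + 1,
            "last_version": True,
        })

    # scd_type1(dimension, "C100", "Mumbai") would simply overwrite "Pune" in place
    scd_type2(dimension, "C100", "Mumbai")   # keeps history: two rows for C100 now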

In Kettle there are steps through which we can implement SCD on a dimension. One such step is Dimension Lookup/Update.

[Image: SCD]

The Dimension Lookup/Update step lets you perform the Type I and Type II approaches of SCD.

[Image: SCD_1]

Keys: the keys are used to look up values in the destination table (the dimension).

Technical key: this is the surrogate key that is created when a new record is found.

[Image: SCD_2]

Fields: these are the fields of the dimension table on which you want to perform operations. A number of optional fields (in the “Fields” tab) are automatically managed by the step; you can specify the table field name in the “Dimension field” column.

Type of Dimension update:

Insert: this is the SCD Type II mechanism. If the incoming record from the source is not found in the dimension, it is inserted; if it is found but the looked-up values have changed, a new record (a new version) is inserted as well.

Update: this is the conventional SCD Type I approach; the attributes in the last version of the dimension record are updated. If all fields are set to Update, the step behaves as pure SCD Type I. If some fields are set to Insert and others to Update, the step applies SCD Type II overall, and the fields set to Update are only updated in the latest version.

Punch through: this mechanism is applied to fields whose data changes very rarely, and when it does change it is only a correction, for example product names or null values in price columns. It is also a form of SCD Type I, but it is slower than the Update mechanism.

Note: if you mix Insert, Update and Punch through in one dimension, it works like a hybrid slowly changing dimension, sometimes called Type 6.

Date of last insert or update (without stream field as source): adds and manages a Date field.

Date of last insert (without stream field as source): adds and manages a Date field.

Date of last update (without stream field as source): adds and manages a Date field.

Last version (without stream field as source): adds and manages a Boolean field (converted to Char(1) or a boolean database data type depending on your database connection settings and the availability of such a type). This acts as the current-valid indicator for the last version: when a Type II attribute changes and a new version is created (to keep track of history), the ‘Last version’ attribute of the previous version is set to ‘False/N’ and the new record with the latest version is set to ‘True/Y’.

Note: an empty dimension entry (for unmatched lookups) is added automatically to the dimension table when the step first runs. If you have NOT NULL fields in your table, adding this empty row will fail and the entire step will fail with it. So make sure you already have a record with the ID field = 0 or 1 in your table if you do not want PDI to insert a potentially invalid empty record.

The step works faster if you apply the caching mechanism. A cache size of 0 caches as many rows as possible, until your JVM runs out of memory, so use this option only with dimensions that cannot grow too large. A cache size of -1 means caching is disabled.

Note: there are other steps that can perform SCD in Kettle. The Insert/Update step can also be used: if a record is new it is inserted, otherwise the existing dimension row is updated, which matches the SCD Type I behaviour described earlier. Likewise, the Update step on its own implements the SCD Type I mechanism. These steps are, however, very slow compared to Dimension Lookup/Update. You can also use Merge Rows (diff) followed by Synchronize after Merge, which implements the SCD mechanism as well. This is the fastest of all the options above, but both the dimension and the staging input must be sorted on the same keys so the rows can be compared, otherwise it will not work.

Thanks,

Nitish