Handling Rollback and Database transactions in Pentaho Data Integration.

When loading data into a database table, you often want the whole load to be rolled back if the job/transformation fails partway through.

In general, the Table Output step in PDI will insert and commit any number of records until there is an error. Consider an example where the ETL process loads 5,000 records and the 4,000th record has a string where the DDL defines the field as Integer: the job/transformation comes to a sudden halt.

The problem now is that we have 3,999 records in the DB table from this ETL run. In some situations this might not be an issue, e.g. if you have a CDC process set up which can deal with this (it can just start off from the highest id or timestamp), but in other scenarios this situation is a problem.

Pentaho Data Integration provides a transactional feature which is quite easy to use. First, make sure your DB engine supports transactions! MySQL's MyISAM storage engine, for example, does not support transactions, so the transformation will keep running and inserting records even though there was an error.

Transactions in transformations: simply enable the “Make the transformation database transactional” option in the “Miscellaneous” tab of the transformation settings dialog.


Note the following caveats:

  • Enabling this option disables the “Use batch update for insert” option in the Table Output step and ignores the Commit size setting, so it will effectively slow down the insert operation.
  • Database transactions will not work if you use database partitioning and multiple connections per step copy need to be created. Performance-wise it doesn’t make any sense in that case anyway.

How to avoid duplication of values in columns in PDI if the column names are the same?


There can be requirements where we want the same column names in the output of a PDI transformation. In such cases the value of one field gets overwritten by another field.

For example: suppose for a company we want 3 pairs of Loan Id and Loan Amount columns. If we give the columns the same name, as the requirement demands, the data gets duplicated unless we handle this scenario properly.

Suppose we have 3 loan amounts:

Loan Amount 1: $10

Loan Amount 2: $20

Loan Amount 3: $30

The column names are all the same, i.e. Loan Amount, Loan Amount, Loan Amount. If we pass these names directly to the Text file output step, the values get replicated and all three amounts come out as $10.

In this case, renaming the fields with a Select values step will not solve the replication issue.

To get around this, we can create a separate header transformation, and in the next transformation check the Append option in the Text file output step.

This way the header can repeat the column name “Loan Amount” n number of times, while in the next transformation the fields can be named Loan Amount 1, Loan Amount 2 and Loan Amount 3 for differentiation.

Note: to check Append in the Text file output step, follow these steps:

  • Double-click on the Text file output step
  • Go to the Content tab
  • Check Append and uncheck Header

Thanks

Scheduling Jobs in Pentaho Data Integration


The following are the steps to schedule a kettle job:

In the main kettle job (.kjb) there is a START icon. Double-click on it and a job scheduling window pops up, as follows:

By default, the type is No Scheduling and the time options are disabled.


If we select the type as Interval, the interval-related time options are enabled, as shown in the image below:


If the type is Daily, then the respective Time of day option is enabled, as shown in the image:


If the type is Weekly, then Time of day and Day of week are enabled.


If the type is Monthly, then both the Time of day and Day of month options are enabled, as shown in the image below:


If you want the schedule to repeat, you can enable the Repeat option in the job scheduling window.

Thank you

Lalitha

Command line execution and Redirection Process in Pentaho (Kettle)



Command line execution in Pentaho (Kettle):


Pentaho provides two command-line utilities to execute your jobs/transformations outside of Spoon.

1. Pan:

Pan is the PDI command-line tool for executing transformations. Pan runs transformations either from a PDI repository (database or enterprise) or from a local file.

Go to the PDI installation directory and execute the transformation using the following Pan command:

./pan.sh /file="/home/pdi/command_line_tr.ktr" /param:"name=pentaho" /param:"org_id=1" /param:"instance_id=1" /param:"batch_size=1000" /level:"Detailed" >> "/home/log/command_line_test_job.log" 2>&1

2. Kitchen:

Kitchen is the PDI command-line tool for executing jobs. Kitchen runs jobs either from a PDI repository (database or enterprise) or from a local file.

Go to the PDI installation directory and execute the job using the following Kitchen command:

./kitchen.sh /file="/home/pdi/command_line_test.kjb" /param:"name=pentaho" /param:"org_id=1" /param:"instance_id=1" /param:"batch_size=1000" /level:"Detailed" >> "/home/log/command_line_test_job.log" 2>&1


Capture and redirect job execution logs to a file (Linux):


We can also capture the execution logs in a file; the standard output and standard error are redirected with the constructs used in the commands above.

What does 2>&1 mean in the script?

2 refers to the second file descriptor of the process, i.e. stderr.

> means redirection.

&1 means the target of the redirection should be the same location as the first file descriptor, i.e. stdout.

So this construct first redirects stdout to /home/log/command_line_test_job.log and then redirects stderr there as well.

What does redirection mean?

Redirection simply means capturing output from a file, command, program, script, or even code block within a script and sending it as input to another file, command, program, or script.

What does the ‘>>’ operator mean?

Instead of overwriting the file, two consecutive greater-than signs append the output to an existing file.

Thanks,
Sayagoud

Issues while implementing loops in Pentaho Data Integration


Generally, for implementing batch processing we use the looping concept provided by Pentaho in ETL jobs. Loops in PDI are supported only in jobs (.kjb); they are not supported in transformations (.ktr).

While implementing loops in PDI, we have come across many blogs suggesting the use of a “Wait for” step whose output hop is joined back to the previous step. Look at the screenshot below for more clarification.

However, the limitation of this kind of looping is that it causes recursive stack allocation in the JVM during job execution, and the system may run out of memory after a high number of iterations (depending on the available memory), crashing the program. So it is not advisable to run a high number of iterations when implementing loops this way in PDI.

Possible Solutions:

1. The first thing to do is minimize the number of iterations. This kind of looping works properly for up to about 500 iterations, so try to keep it below 500.

2. Never use loops for scheduling. If we use the looping concept for scheduling purposes, it becomes an infinite loop which eventually crashes the whole program.

3. Increase your batch size so that the number of iterations is smaller. Take this into consideration while implementing external batch processing.

4. For incrementing the counter value, it is advisable to use a separate transformation instead of a JavaScript step, because the JavaScript step consumes more memory. Create a new transformation, use the Formula step to increment the values and then set those variables.

5. Suggested approach for infinite looping: one possible way is to use the settings of the ‘Start’ step. Set the ‘Repeat’ flag and add an interval configuration. This causes the job to be completely re-initialized as a new instance and does not cause any memory issues.

 

Thanks,

Nitish Kumar Mishra

Batch process implementation in kettle (Pentaho Data Integration):


In order to implement a batch process we need looping logic, but I did not find any component or suitable built-in method in Kettle to create such loops.

So I came up with a way to solve this problem in one of my projects and have documented it here.

Below is the step by step process:

1. Get Batch Details:

 Create a new transformation:

 

get_batch_details

Create a separate transformation and name it as get_batch_details.

Fetch the min, max and batch size details from the source system and calculate the total number of batches using a JavaScript step.

Calculate the total batch count:

generate_batch_details
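As a rough sketch, the JavaScript step could contain something like the following, assuming the previous step supplies fields named max_value and batch_size (the field names are illustrative; use whatever your source query returns):

// Modified Java Script Value step (sketch only)
// max_value and batch_size arrive as fields on the incoming row.
var nr_of_batches = Math.ceil(max_value / batch_size); // round up so a partial last batch is still processed

// Declare nr_of_batches as a new output field of this step so that the
// Set Variables step can expose it to the root job.

This mirrors the worked example further down, where ceil(12000 / 4000) gives 3 batches.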

Set each variable’s scope to be valid throughout the root job using a Set Variables step, like below.

 

2. Create a main job:

Create a main job with the following steps.

        a. Configure the get_batch_details transformation.
        b. Configure the for-each batch evaluation step.
        c. Configure your actual transformation with a Transformation Executor step (this is your main logic).
        d. Configure the batch number iteration transformation.

The main job will look like below.

 

 

 

main_job

3. Evaluate and iterate through each batch number in the main job:

Evaluation:

In this step we have to compare each batch number with the total number of batches, like below.

This process continues until the condition evaluates to true.

For each step configuration:

 

for_each_batch

 

 

In the main job, create a variable named batch_no and assign a default value to it.

Use the same variable in the evaluation step as shown above, and make sure the success condition is If value is greater than with the value set to nr_of_batches (this value comes from the get_batch_details transformation).

So the first time batch_no = 1, and it is compared with nr_of_batches on each iteration. If the condition is false your actual job executes, otherwise the loop exits.

Now the next step is to increment batch_no, so we have to create a batch_iteration transformation like below and configure the Get Variables and Set Variables steps accordingly.

batch_increment

 

Increment batch number step:

Use the Formula step to implement this logic and configure the values as shown below.

Increment_batch_nr_using_formula_step
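As a rough sketch, assuming the Get Variables step exposes the variable as a field named batch_no, the formula for the new value would be something like [batch_no]+1, written to a new field that the Set Variables step then writes back to the batch_no variable.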

We have configured everything properly; the main remaining thing is how to make use of the batch info and create start and end flags in our actual transformation.

 

get_batch_info_actual_job

Calculate the batch flags using a JavaScript step like below.

calculate_batch_flags
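As a minimal sketch, assuming the batch_no and batch_size variables were set earlier in the job under those names, the JavaScript step could look roughly like this:

// Modified Java Script Value step (sketch only)
// Read the job variables set earlier; getVariable() returns strings, so parse them.
var batch_no   = parseInt(getVariable("batch_no", "1"), 10);
var batch_size = parseInt(getVariable("batch_size", "4000"), 10);

// First and last row positions covered by the current batch.
var start_flag = (batch_no - 1) * batch_size + 1;
var end_flag   = batch_no * batch_size;

// Declare start_flag and end_flag as output fields so that the following
// steps (e.g. a Table Input query) can use them.

The worked example in step 3 below plugs concrete numbers into the same formulas.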

Use the start flag and end flag values wherever required in the job.

For example:

step 1: calculate total number of batches

      max value = 12000,
       
      batch_size = 4000,

      nr_of_batches = ceil(12000 / 4000) = 3

So this process creates 3 different batches with a batch size of 4000 each.

step 2: evaluate batch number and total number of batches in main job.

      nr_of_batches = 3;

      batch_no = 1 (default);

     If (batch_no > nr_of_batches)

     then exit

     else go to actual job. 

So here 1 > 3 is false, so it goes to the actual job.

step 3: calculate batch flags in actual transformation.

    var start_flag=0;

    var end_flag=0;

    start_flag=(batch_no - 1)*batch_size + 1;

    end_flag= (batch_no*batch_size);

    start_flag = (1-1)*4000+1;

    end_flag = (1*4000);

    Now start_flag=1 and end_flag = 4000;

So we can use the above flag values in our query to fetch data from the source system using a Table Input step.

Step 4: Increment batch number

      batch_no = batch_no + 1; 

So now batch_no = 2, and the same process continues until the condition evaluates to true.

 

Handling Null values in Talend and Pentaho


Handling Null values in Talend:

Make all the columns Nullable in tMap.

Now, for the columns which can contain null values, you can assign default values by writing a ternary expression in tMap (for example, something like row1.loan_amount == null ? 0 : row1.loan_amount, with an illustrative column name), as shown in the screenshot below.

talend_tmap

Handling Null values in Pentaho:

In Pentaho, to handle null values there is a dedicated step that provides a default value for nulls.

The step is called ‘If field value is null’.

You can assign default values for null fields either per individual field or per data type.

You can configure the component as shown below:

pentaho_null

Thank you.

Lalitha

Slowly Changing Dimension in Pentaho Data Integration(Kettle)


A slowly changing dimension (SCD) is a common mechanism in data warehousing. An SCD is a dimension that changes slowly over time rather than on a regular schedule. In a data warehouse there is a need to track changes in dimension attributes in order to report historical data; in other words, implementing one of the SCD types should enable users to determine the correct dimension attribute value for a given date. There are various approaches to dealing with data using SCD. The most commonly used are:

1. SCD Type I: update the existing record (overwrite).

2. SCD Type II: create a new record version and set a flag on the new record, keeping the old one as history.

3. SCD Type III: add a new column which keeps the previous value, so the history kept is limited.

In Kettle there are components through which we can implement SCD on a dimension. One such component is the Dimension lookup/update step.

SCD

The Dimension lookup/update step allows you to perform the Type I and Type II approaches of SCD.

SCD_1

Keys: the key fields are used to look up values in the destination (dimension) table.

Technical key: this is the surrogate key, which is generated when a new record is inserted.

SCD_2

Fields: the Fields columns list the fields present in the dimension table on which you want to perform operations. A number of optional fields (in the “Fields” tab) are automatically managed by the step; you can specify the table field name in the “Dimension Field” column.

Type of Dimension update:

Insert: this is the SCD Type II mechanism, where a new row is inserted when changes are detected by the lookup. If the record coming from the source table is not found, it is inserted; if it is found but the looked-up values have changed, a new record version is inserted.

Update: this is the conventional SCD Type I approach; the attributes in the last version of the dimension record are updated. If we set all the fields to Update, the step performs a pure Type I update. If some fields are set to Insert and some to Update, it applies Type II for the Insert fields, while the Update fields only update the data in the last version.

Punch through: this mechanism is applied to fields where the data changes very rarely, and when it does change it is just a correction, for example product names or null values in price columns. It is also used for Type I updates, but the change is punched through to all record versions, which makes it slower than the Update mechanism.

Note: if you mix Insert, Update and Punch through in one dimension, it works like a hybrid slowly changing dimension, which is sometimes called Type 6.

Date of last insert or update (without stream field as source): adds and manages a Date field.

Date of last insert (without stream field as source): adds and manages a Date field.

Date of last update (without stream field as source): adds and manages a Date field.

Last version (without stream field as source): adds and manages a Boolean field (converted into Char(1) or a boolean database data type, depending on your database connection settings and the availability of such a data type). This acts as a current/valid indicator for the last version: when a Type II attribute changes and a new version is created (to keep track of the history), the ‘Last version’ attribute in the previous version is set to ‘False/N’ and the new record with the latest version is set to ‘True/Y’.

Note: an empty dimension row (the ‘unknown’ entry) is added automatically to the dimension table when the step is first run. If you have “NOT NULL” fields in your table, adding this empty row will fail and the entire step will fail with it! So make sure that you already have a record with the technical key field = 0 or 1 in your table if you don’t want PDI to insert a potentially invalid empty record.

This step works faster if you apply the caching mechanism. A cache size of 0 caches as many rows as possible, until your JVM runs out of memory; use this option wisely, and only with dimensions that can’t grow too large. A cache size of -1 means that caching is disabled.

Note: there are various other components which can perform SCD in Kettle. The Insert/Update step can also be used: if a new record is found it inserts it, otherwise it updates the existing dimension record (an overwrite, so effectively Type I behaviour). Similarly, the Update step performs a Type I update. However, these steps are very slow compared to Dimension lookup/update. You can also use Merge rows (diff) followed by Synchronize after merge, which likewise implements an SCD mechanism; this is the fastest of all the above, but both the dimension and staging inputs must be sorted on the key fields, otherwise it won’t work correctly.

Thanks,

Nitish

Audit Logs in Pentaho Data Integration (PDI)


Audit logs at the job level and transformation level are very useful in ETL projects to track details such as job name, start date, end date, transformation name, errors, number of lines read, number of lines written, number of lines input, number of lines output, and so on.

I have created a simple PDI job to track audit logs at the job level, job entry level and transformation level in a MySQL database. The process is as follows:

I created a Kettle job FileToDB.kjb which contains two job entries (Start and Transformation) and one database connection (PSQL_Log). I created a separate database ‘PDI_Audit_Logs’ in the MySQL DB and configured its details in this database connection.

FileToDB_KJB

Right-click on the work area and select Job settings as shown in the screenshot below.

jobselect1

This pops up the Job properties window. Click on the Log tab and select the ‘Job entry log table’ option; it will display the fields available under this section. You can enable all of the fields, or the default selection is also fine. Now configure the log table connection as shown in the screenshot below:

Jobproperties

Log Connection –> the connection we have already created (in this case the MySQL DB).

Log Schema –> the existing database name.

**Log Table –> the table name in which you want to store the tracked details. It can be an existing table name; we will see the process for a new table in the later steps.

Log line timeout –> the number of days for which you want to keep the log details.

**If you are creating the log table for the first time, click the SQL button at the bottom; otherwise just click OK. The SQL button pops up the SQL statement that creates this table with the column names shown above. I named the log table “FileToDB_JobEntryLog”, after my job name and the level of logging.

sql1

Click the Execute button; it will create that table in the given schema/database and prompt with the window below. Click OK and then close the SQL editor.

ex1

The job entry level log configuration is now done. Next, select the ‘Job log table’ section to configure the log connection at the job level. Configure the values as shown in the screenshot below.

For the log table name, follow the same steps as explained above.

job1

Now the job audit log settings are done. Open the transformation; it has Get System Info, file input and DB output steps connected with hops. The DB output schema and the audit logs schema are different, so we have created two connections, PSQL and PSQL_Log.

Trans_img

Right click on the work area and select Transformation Settings.

trans_set

This pops up the Transformation properties window. Select the Logging tab and then click on the Transformation option. It will display the fields that can be tracked from this option.

trans2

For the fields LINES_READ, LINES_WRITTEN, LINES_UPDATED, LINES_INPUT, LINES_OUTPUT and LINES_REJECTED, you should specify the step name from which each metric should be tracked.

As with the job settings, here also you have to configure the log connection, and click the SQL button if you are creating the log table for the first time; otherwise click OK.

Now audit logs are configured for the transformation as well. Execute the job and check the database for the log tables. Three tables are created (job entry level, job level and transformation level) under the given schema.

Job Entry Level Table:

As there are two job entries, it generated details for both of them:

job_entry_log_table

Job Level Log Table:

joblogtable

Transformation Level Log Table:

translogtable

Hope this topic will be useful.

Thank you.

Define JNDI for Pentaho BA server 5.0


Note: for illustration I’m showing an Oracle 11g configuration. Replace the Resource name, username, password, driverClassName and url parameters, and any other relevant connection settings, with your own values.

Add Driver

Place the appropriate driver for Oracle 11g, which is ojdbc6-11.2.0.2.jar, in this directory for the BA Server: /pentaho/server/biserver-ee/tomcat/lib/.

Note: There should be only one driver for one database in this directory. Ensure that there are no other versions of the same vendor’s driver in this directory. If there are, back up the old driver files and remove them to avoid version conflicts.

Specify JNDI connections

  1. Stop Tomcat and the BA Server.
  2. Edit the /tomcat/webapps/pentaho/WEB-INF/web.xml file.
  3. At the end of the <web-app> element, in the same part of the file where you see <!-- insert additional resource-refs -->, add this XML snippet:

     <resource-ref>
         <description>myDatasource</description>
         <res-ref-name>jdbc/myDatasource</res-ref-name>
         <res-type>javax.sql.DataSource</res-type>
         <res-auth>Container</res-auth>
     </resource-ref>

  4. Save & close web.xml.
  5. Open /tomcat/webapps/pentaho/META-INF/context.xml and add a <Resource> element for the datasource anywhere inside the <Context> element (a sample element is shown after this list).
  6. Save & close context.xml.
  7. Open & modify biserver-ce/pentaho-solutions/system/simple-jndi/jdbc.properties and add these lines:

     myDatasource/type=javax.sql.DataSource
     myDatasource/driver=oracle.jdbc.OracleDriver
     myDatasource/url=jdbc:oracle:thin:@//localhost:1521/sysdba
     myDatasource/user=dbuser
     myDatasource/password=password

  8. Save & close jdbc.properties.
  9. Delete the pentaho.xml file located in the /tomcat/conf/catalina/ directory.
  10. Start Tomcat and the BA Server.
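A possible <Resource> element for step 5, mirroring the Oracle settings used in jdbc.properties above, is sketched below; the factory and pool-related attributes shown are typical Tomcat/DBCP defaults and should be adjusted to your environment:

     <Resource name="jdbc/myDatasource" auth="Container" type="javax.sql.DataSource"
               factory="org.apache.commons.dbcp.BasicDataSourceFactory"
               driverClassName="oracle.jdbc.OracleDriver"
               url="jdbc:oracle:thin:@//localhost:1521/sysdba"
               username="dbuser" password="password"
               maxActive="20" maxIdle="5" maxWait="10000"/>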