Featured Post

Applying Email Validation to a JavaFX TextField Using Binding

This example uses the same controller as in a previous post but adds a use case to support email validation.  A Commons Validator object is ...

Wednesday, June 22, 2011

Dynamic Schemas in Talend Open Studio

When working with input data from a source like a spreadsheet, which is subject to editing by users, you can't always count on a strict format.  For instance, the columns may be reordered.  But if the spreadsheet contains column headers that are still accurate, you can map it to a schema using Talend Open Studio.

In Talend Open Studio (TOS), you define schemas to describe the purpose and data types for a source or target.  With flat files or spreadsheets, TOS is strict about associating the column position with a particular field definition.  If the first field is designated as "First Name" in the File Excel wizard, the input is expected to contain "First Name" in its first field despite what the column header says.


To build a schema on-the-fly, take two passes on the input file.  First, record the header information in a data structure to be used in later processing.  For each input record, consult the data structure.

  The Talend Exchange has a good example of this in "Example job for the dynamic schema routine" at http://www.talendforge.org/exchange/tos/extension_view.php?eid=177 (rbaldwin).  The job uses Java Reflection wrapped up in a Talend Routine.

The following job is an alternative example that is identical in its algorithm, but varies in the Java and the set of Talend Components.

A Talend Job Reading a Header Prior to Processing

Algorithm

The file is processed in three stages

  1. Initialize a data structure
  2. Read in and record the header
  3. For each input record, map the fields to the correct schema using the header data structure
The header data structure is a java.util.Map.  This is created and associated with the globalMap in a tSetGlobalVar.  There are also Java imports in the Advanced settings for java.util.* and java.lang.reflect.*.

The tSetGlobalVar creates the java.util.Map.  In the "Basic settings" of the tSetGlobalVar, add an entry with key "fieldMap".  This will reference the following object.

  new HashMap<String, String>()

"fieldMap" should be quoted.  Don't quote the new command.

Record the Header

The header is read in using a tFileInputDelimited which is configured to stop after the first record (Limit=1).  The tFileInputDelimited is based on a loose schema where each field is a type and there is one field for each possible input column.  The names of the fields are left generic so that they aren't mislabed if a different column order is used.

A tJavaRow calls the put() method on the header Map.  The key of the map is the true field name, taken from the first line of the input file.  The value of the map is the corresponding column name.  For example, if the first column header is "Field_B" and the first field of the loose tFileInputDelimited schema is "Column0", then Field_B/Column0 is recorded.

Here is the Java Reflection code used in the tJavaRow_3 component.  For each field in the input row, record the first line's text (it's only reading in one line) and the column name of the loose schema.

Map<String, String> fieldMap = (Map<String, String>)globalMap.get("fieldMap");

for( Field f : row1.getClass().getFields() ) {
   fieldMap.put((String)f.get(row1), f.getName());
}


Input Processing

The file is processed in a second pass.  The same tFileDelimited component is used (same file, same schema).  However, the second pass will not use Limit=1, but will use Header=1.  This skips the header and reads in the rest of the file.

This job uses a tMap to establish the target schema.  The tMap controls the eventual ordering of the columns.
Map Using Dynamic Column Values

 Vars are used to reduce the amount of Java code in the target mapping.  Each Var reads in the mapped values of the fieldMap using the target field name as a key.  The mapped values are used in the OUT schema.

Video

This is a video walkthrough of the dynamic schema job.


From this example, and rbaldwin's on Talend Exchange, you can see that it's feasible to accept input files where the column order is rearranged.  However, you will need to work with Java (or Perl) to extend the standard File Delimited or File Excel wizards.

Update for 5.x

I think there may have been a change in the imports available to Java embedded in Talend components since I wrote this post.  I don't have a 4.x around to verify.  But to fix this, you have two options.

  1. Fully qualify the Java classes.  Replace each "HashMap" with "java.util.HashMap" and each "Map" with "java.util.Map".
  2. Use a tJava to add the imports.  See the screenshot below for an job that starts with a tJava leading into the tSetGlobalVar.
Adding Imports to the Job
Troubleshooting

A few comments have mentioned an NPE on the searchFields() method.  If you get an error like this

Exception in thread "main" java.lang.NullPointerException
at java.lang.Class.searchFields(Class.java:2956)
at java.lang.Class.getField0(Class.java:2975)
at java.lang.Class.getField(Class.java:1701)

at com.bekwam.resignator.ReflTest.main(ReflTest.java:13)

It's from a null being passed into the getField() call.  For example,

row1.getClass().getField(null).get(row1)

Another potential problem is "fieldMap".  If that String is different than the Map added in the preceding step, then the code that I have in the Var section will throw an NPE.


63 comments:

  1. Thank you very much for this post, it really helped me a lot. I have subscribed to your RSS feed, interesting stuff...

    ReplyDelete
  2. in tjavaRow component
    for(Field f: row1.getClass().getFields())
    what is the code of Field class here, can you please type full code as i am not a java guy so its difficult for me to write it
    mail me at buddyanchit@gmail.com

    ReplyDelete
    Replies
    1. This is the full Java code for the example. Make sure that you follow the import step described in "Algorithm".

      1. On the tJavaRow, select the Advanced settings tab.
      2. Add the following lines to the Import box

      import java.util.*;
      import java.lang.reflect.*;

      Delete
  3. thanks carl for ur help..some errors are removed..
    now these errors are upfronting...
    Exception in component tJavaRow_1
    java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
    at anchit.anchit_0_1.anchit.tFileInputDelimited_1Process(anchit.java:679)
    at anchit.anchit_0_1.anchit.tSetGlobalVar_1Process(anchit.java:364)
    at anchit.anchit_0_1.anchit.runJobInTOS(anchit.java:1779)
    at anchit.anchit_0_1.anchit.main(anchit.java:1643)
    JOB NAME IS "anchit"

    and can u send me your e-mail id so we can be in contact...i have mentioned mine above....
    thanks...

    ReplyDelete
    Replies
    1. Hi,

      Take a look at the screenshot "Map Using Dynamic Column Values" and make sure that the entries in the Var panel match what you're using. Pay particular attention to the parenthesis.

      My email is bekwam@ gmail.com.

      Delete
    2. I just checked all my entries but I am still getting the java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map error. Could you please help me on this?

      Delete
  4. Error is not in tmap
    it's in tjavarow component...and these are run time error...No compile time errors are there.

    ReplyDelete
    Replies
    1. Double-check the code in the tSetGlobalVar. See the video at :28 seconds in.

      The problem may be with this line

      (Map)globalMap.get("fieldMap")

      which is storing a String under "fieldMap" rather than a Map.

      I'm adding the tSetGlobalVar code in the video to the post under the "Algorithm" section.

      Delete
  5. ya thanks carl, that job is done.

    Regards,
    Anchit

    ReplyDelete
  6. Hi Carl,

    Can Dynamic Schema be applied on addition of columns also in TOS, as if the source file is updated by adding a new column then the same job should take that additional column by itself.

    Thanks in advance
    Anchit

    ReplyDelete
    Replies
    1. Hi Anchit,

      I haven't seen this in Open Studio. Enterprise has a feature that may be what you're looking for.

      Delete
  7. Hi Carl,

    It's ok about additional columns.
    How can we edit automap funtionality of tmap component in TOS and make that automapping as direct mapping without matching column names.

    Thanks in advance
    Anchit

    ReplyDelete
  8. Hi,

    I used a "Match By Position" setting in Pervasive Data Integrator as an alternative to "Match By Name", but haven't found a similar feature in Talend. Not only are matching names important in Talend, but order is also. I found that two schemas -- say from a tMap into a tOracleOutput -- will produce an error if the order isn't matched manually or Sync Columns is pressed.

    ReplyDelete
  9. Hi Carl,

    Thanks for the information, but anyhow can we edit tmap in talend to use "automap" option as match by position.
    By writing some java code or removing any code from tmap.

    Regards
    Anchit

    ReplyDelete
  10. Hi Carl,

    I have a similar query as above. Can we map the columns dynamically?
    I mean in tMap we have to map columns manually. But lets say we have a file with input and out column names which needs to be mapped. Can we somehow read this file and then map the columns accordingly? Any workaround?

    Thanks,
    Kaushal

    ReplyDelete
    Replies
    1. Hi,

      It sounds like you don't want to use tMap at all and rely on the positions of your source/target mapping file to serve as the map.

      If this is the case, you can expand on this blog post by including two other data structures. The blog post defines a fieldMap which is a mapping of the input fields to the column-oriented position (for example, LastName/Column0, FirstName/Column1). You need the same thing for the output fields, but in reverse (for example, Column0/LN, Column1/FN). Finally, you'll need a map for the source / target mapping (LastName/LN, FirstName/FN).

      Use a tJavaRow to process the input coming from a component like a tMySqlInput. The tMySqlInput will be based on a general schema with sufficient colums for the mappings and with basic datatypes. Such as schema could be Column0:String, Column1: String, Column2: String. In the tJavaRow, lookup the input field name based on position. Take this field name, and lookup the output field name in the source / target mapping. Write the output field by looking up the position (Column0) from the output field mapping based on what you've gathered from the source / target mapping.

      The basic structure of the job will include 3 flows that take 3 passes on the source / target mapping field to produce fieldmap, outputfieldmap, and sourcetargetmap. An input component -- say a tMySqlInput -- is based on a loosely defined schema with columns for each of the fields left with their positional names (Column1, Column2). This components is run into a tJavaRow that applies the maps. Finally, write the output with an output component with a positional-based schema (Column1, Column2).

      Good luck

      Delete
    2. Correction: The output field map doesn't need to be in reverse (Column0/LN) but rather structured so that the field name from the mapping file is the key (LN / Column0).

      Delete
  11. Exception in component tMap_1
    java.lang.NullPointerException
    at java.lang.Class.searchFields(Unknown Source)
    at java.lang.Class.getField0(Unknown Source)
    at java.lang.Class.getField(Unknown Source)
    at projectname.internet_scenario_0_1.INTERNET_SCENARIO.tFileInputDelimited_2Process(INTERNET_SCENARIO.java:1481)
    at projectname.internet_scenario_0_1.INTERNET_SCENARIO.tFileInputDelimited_1Process(INTERNET_SCENARIO.java:840)
    at projectname.internet_scenario_0_1.INTERNET_SCENARIO.tSetGlobalVar_1Process(INTERNET_SCENARIO.java:441)
    at projectname.internet_scenario_0_1.INTERNET_SCENARIO.runJobInTOS(INTERNET_SCENARIO.java:1890)
    at projectname.internet_scenario_0_1.INTERNET_SCENARIO.main(INTERNET_SCENARIO.java:1751)

    i am getting this error.can you please help me ..

    ReplyDelete
    Replies
    1. Is this related to Dynamic Schemas or some other error?

      Delete
  12. Hi Carl,

    I tried this exercise, but got an error that HashMap can not be resolved as type.

    I am using TOS 5.4 version.

    Could you please help me.

    Thanks

    ReplyDelete
    Replies
    1. Hi. I put a screenshot up in the post that should help. It's an old post and I think there were some implied imports in previous versions of Talend.

      Delete
  13. here in this scenario if the incoming file column names change,then how to fetch them dynamically in talend open studio ?

    ReplyDelete
    Replies
    1. You can look at the first line of the file (the header) and parse the results into a Java structure called a Map. As you step through the tokens, you can build up this data structure using a LinkedHashMap which will retain the positions for later mapping using this technique. For example, chopping up the string "first_name,last_name" would result in two map records first_name/0 and last_name/1.

      Take a look at the File Positional definition in the metadata section. Maybe that can implement your requirements off-the-shelf.

      Delete
    2. Thanks Carl, any additional tuto?

      Delete
  14. Hello,
    I used your solution to load an excel file with dynamic schema. It was working until the total number of columns exceed 53 columns.
    I want to extract 53 columns in my excel file but when an extra column is added between theses 53 columns and not after the last one, I get this error :
    Exception in component tMap_4
    java.lang.NullPointerException
    at java.lang.Class.searchFields(Unknown Source)
    at java.lang.Class.getField0(Unknown Source)
    at java.lang.Class.getField(Unknown Source)
    E.tFileInputExcel_5Process(E.java:5747)
    E.tFileInputExcel_3Process(E.java:2885)
    E.tFileList_1Process(E.java:1091)
    E.tFileDelete_1Process(E.java:835)
    E.runJobInTOS(E.java:15239)
    E.main(E.java:14961).
    I tried to fix the initial capacity of hashmap in the tsetGlobalVar without success (new HashMap(100000) ).
    Do you know why? Thank you.

    ReplyDelete
    Replies
    1. It seems like something is missing rather than a problem with the capacity. Try adding pair of System.out sbefore the get() call in the for loop.

      > System.out.println("f=" + f);
      > System.out.println("f.row1=" + f.get(row1));
      fieldMap.put((String)f.get(row1), f.getName());

      Delete
  15. Dear Carl,
    Can you please share your actual job so that we can just import as item and can run in our talend. May be upload in your Google drive and share link.
    Thanks!

    ReplyDelete
    Replies
    1. Hi,

      Unfortunately, I lost some of the examples in this blog to a hard drive crash back in 2011.

      Delete
  16. Hi Carl,

    I have tried to follow the tutorial but I am getting the below error in the t_Map component:

    Exception in component tMap_1
    java.lang.NullPointerException
    at firstproject.dynamicschema_0_1.DynamicSchema.tFileInputDelimited_2Process(DynamicSchema.java:1688)
    at firstproject.dynamicschema_0_1.DynamicSchema.tFileInputDelimited_1Process(DynamicSchema.java:1001)
    at firstproject.dynamicschema_0_1.DynamicSchema.tSetGlobalVar_1Process(DynamicSchema.java:575)
    at firstproject.dynamicschema_0_1.DynamicSchema.tJava_1Process(DynamicSchema.java:460)
    at firstproject.dynamicschema_0_1.DynamicSchema.runJobInTOS(DynamicSchema.java:2111)
    at firstproject.dynamicschema_0_1.DynamicSchema.main(DynamicSchema.java:1968)

    ReplyDelete
    Replies
    1. Hi,

      Have you followed my troubleshooting suggestion from January 29th? It's a few messages up.

      Delete
    2. Hi Carl,

      Thanks for your reply. I have tried your suggestion, and the output is:

      f=public java.lang.String firstproject.dynamicschema_0_1.DynamicSchema$row1Struct.Column0
      f.row1=Field_A
      f=public java.lang.String firstproject.dynamicschema_0_1.DynamicSchema$row1Struct.Column1
      f.row1=Field_B
      f=public java.lang.String firstproject.dynamicschema_0_1.DynamicSchema$row1Struct.Column2
      f.row1=Field_C

      And I am still getting the same error.

      Delete
    3. Can you verify the settings in the tMap screenshot with the tMap? Specifically, make sure that there is an object in globalMap called "fieldMap" and that each of the field lookup values is of the property case.

      Delete
    4. Hi Carl,

      Thanks for your reply again. I have identified the problem. A novice mistake (which I am).

      The problem is, the HashMap I have created is named "fieldMap", but while using it in GlobalMap I was referring to it as "FieldMap". It was a problem of one uppercase letter.

      Really sorry for wasting your time, I am feeling really dumb now.

      Delete
  17. No problem. You're anonymous. We'll keep this exchange up for the next person.

    ReplyDelete
  18. Hello Carl,

    I followed your approcach but it is giving nullpointerException whenever there is NULL in the source column. How can we handle this? Although all the columns are selected as NULLable in the mapping.

    ReplyDelete
    Replies
    1. I posted an update that might help solve your NPE.

      Delete
  19. Nice tutorial, I was searching for something like this for quite a while. Thanks a lot

    ReplyDelete
  20. I'm not sure if this post is still active. But, I'm getting two errors on the tJavaRow component, "row1 cannot be resolved", "row1 cannot be resolved to a variable".

    I added tJava component as you recommended, because I'm using 5.5.1.

    ReplyDelete
  21. Hi Carl
    i m getting error on this tJavaRow
    Field cannot be resolved to a type
    and Type mismatch: can not convert from element type java.lang.Reflect.Field to Field

    ReplyDelete
    Replies
    1. Have you followed the "import" instructions that are in italics under the Algorithm section?

      Delete
    2. Thank you very much Carl it works for me after adding in import if tJava

      import java.util.*;
      import java.lang.reflect.*;

      Delete
  22. Hi Carl,
    I have created the dynamic schema for the tFileInputDelimited as it has the option limit for selecting first row we can get column names.But i m trying this with tHDFSInput but it dont have the option limit option to get first row any idea that how can we get first row from the tHDFSInput component for the column names.
    Please suggest any other way to find.

    Thanks

    ReplyDelete
  23. Hi Carl i am getting below error is there any solution
    Exception in component tMap_1
    java.lang.NoSuchFieldException: Col8
    at java.lang.Class.getField(Class.java:1584)

    ReplyDelete
  24. Hi Carl,

    It does not work on TOS 6.1 with Java 1.8. Got error:
    Field cannot be resolved to a type
    Type mismatch: cannot convert from element type java.lang.reflect.Field to Field

    ReplyDelete
    Replies
    1. Hi,

      I haven't downloaded 6.1 yet. This sounds like a ClassLoader problem. The Field in the tJavaRow (left hand of loop) is from a different ClassLoader than the Field coming in as the row (right hand of loop).

      Thanks for pointing this out. I'll update the post when I start with TOS 6.

      Delete
    2. HI,

      You just need to put the additional import statement for java.lang.reflect.Field and it will run.

      Best,
      Aditya

      Delete
  25. Hi, I used the same approach as yours but null pointer exception is coming .
    Exception in component tMap_1
    java.lang.NullPointerException
    at java.lang.Class.searchFields(Unknown Source)
    at java.lang.Class.getField0(Unknown Source)
    at java.lang.Class.getField(Unknown Source)
    at local_project.dynamicschema_0_1.dynamicSchema.tFileInputDelimited_2Process(dynamicSchema.java:1628)
    at local_project.dynamicschema_0_1.dynamicSchema.tFileInputDelimited_1Process(dynamicSchema.java:1044)
    at local_project.dynamicschema_0_1.dynamicSchema.tSetGlobalVar_1Process(dynamicSchema.java:703)
    at local_project.dynamicschema_0_1.dynamicSchema.tJava_1Process(dynamicSchema.java:589)
    at local_project.dynamicschema_0_1.dynamicSchema.tLibraryLoad_1Process(dynamicSchema.java:476)
    at local_project.dynamicschema_0_1.dynamicSchema.runJobInTOS(dynamicSchema.java:2041)
    at local_project.dynamicschema_0_1.dynamicSchema.main(dynamicSchema.java:1898)

    Read above comments as well but no solution found

    ReplyDelete
  26. Check out the Troubleshooting section at the end of the post. This looks like the Reflection expression is getting a null value when it's expecting a field name. So, comment out the getField() call and replace it with a println. Print the name you expect to pass to getField(), looking for nulls.

    Good luck

    ReplyDelete
  27. Hi Carl, The number of columns may vary in the source each time. I need to pull them to target. How to handle such scenario

    ReplyDelete
  28. Hello,
    I am new on talend can anyone please tell me that how can we migrate table from one database to another without updating the schema for each table.?

    ReplyDelete
  29. Hi , I am using talend 6.2.1 ver and getting below error(run time) in tJavaRow Component :

    [FATAL]: crs.colordermixed_0_1.ColOrderMixed - tJavaRow_1 java.lang.String cannot be cast to java.util.Map
    Exception in component tJavaRow_1
    java.lang.ClassCastException: java.lang.String cannot be cast to java.util.Map
    at crs.colordermixed_0_1.ColOrderMixed.tFileInputExcel_1Process(ColOrderMixed.java:1299)
    at crs.colordermixed_0_1.ColOrderMixed.tSetGlobalVar_1Process(ColOrderMixed.java:576)
    at crs.colordermixed_0_1.ColOrderMixed.tJava_1Process(ColOrderMixed.java:441)
    at crs.colordermixed_0_1.ColOrderMixed.runJobInTOS(ColOrderMixed.java:3618)
    at crs.colordermixed_0_1.ColOrderMixed.main(ColOrderMixed.java:3452)
    [statistics] disconnected


    I have set the value in tSetGlobalVar :
    Key:"fieldMap"
    Value: "new java.util.HashMap()"

    in the tJavaRow :

    java.util.Map fieldMap = (java.util.Map)globalMap.get("fieldMap");


    for( Field f : row1.getClass().getFields() ) {
    fieldMap.put((String)f.get(row1), f.getName());
    }


    Added import in tJavaRow :
    import java.util.*;
    import java.lang.reflect.*;

    tMap: below is the mapping

    ((Map)globalMap.get("fieldMap")).get("field_1") --> (String)row2.getClass().getField(Var.fn_field1).get(row2)

    dont understand where I am doing wrong?

    Would you please assist me?

    ReplyDelete
  30. Hi Carl,
    How to load columns of tMysqlInput and tfilexmloutput from XML File and map them at runtime.
    Pls help me.

    ReplyDelete
  31. Hi Carl, great post... I have an issue of a similar nature although it involves the number of columns changing. Essentially I am using a generic schema to which the number of rows/columns may vary... the requirement here is that the job can run regardless of how the schema looks (using the salesforce components).

    I had just wondered if you have achieved something of this nature in the past? any help is much appreciated

    ReplyDelete
    Replies
    1. After playing around a bit i managed to get it sorted

      for(Field f : output_row.getClass().getFields()) {
      f.set(row2, input_row.getClass().getField(f.getName()).get(row1));
      }

      Thanks

      Delete
    2. Good. I'm not sure what's available in the Palette or Exchange, but this is plain Java that should hold up pretty well.

      Delete
  32. we can read multiple csv file with different schema into database by using dynamic schema option with unknown column name and dynamic type in talend but when we create dynamic type it will read all column data type as Character varying(100) in output database. but data types are integer, string, date like this. Please can you provide the solution for this problem.

    ReplyDelete
  33. Hi Carl,
    The solution you have provided is suitable when we know the number of columns in the input file. How can we program it so that it can read 'n'columns that are defined in another file or database table?

    ReplyDelete
  34. You are so amazing! I really don't see that I really overlook something like that at the moment. It is very rare to find someone with unique reviews on this issue. Really ... grateful I took it. safe tech buzz is what the site needs, a site with fast growth!

    ReplyDelete
  35. I have following all instruction but a get this error

    Une erreur est survenue (Erreurs de compilation du Job
    Au moins le Job "Test" contient une erreur de compilation. Corrigez-la et réexportez.
    Ligne en erreur: 1122
    Message détaillé: Type mismatch: cannot convert from element type java.lang.reflect.Field to Field

    ReplyDelete