Camel development series – part 4

Welcome to the fourth installment of the Camel development series. It is almost 1am, but I am determined to write one post a week so as not to fall behind my original goal. This post will not cover a lot of functionality, but it touches on two things that I think are essential to developing production-ready code. These are:

  1. Ensuring a single copy of the deployment. What I mean is that we don’t want to develop one copy for our test environment, one for QA and one for production. We want a single copy that can be deployed to all of our environments, so that what runs in production is exactly what was tested and verified in test and QA. To achieve this, we need a way to keep endpoint parameters out of the code, which is where property files come in.
  2. Producing log files. To track our integrations we need to know what they have done and, of course, whether something has gone wrong. The simplest way is to write log statements at the important steps, and that is what we cover here. In a later post I will go through MDC logging when you deploy to a Karaf environment, so that you can produce log files per route or context id, which gives you better granularity.

Ok, let’s start with the actual code and return to our standard project. You will find the source here https://github.com/SoucianceEqdamRashti/Integration/tree/master/MapCsvsToJson

Blueprint code

 

Here is our blueprint.xml code:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="        http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd        http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

  <bean id="CsvToJsonRouteBuilder" class="org.souciance.integration.csvstojson.CsvsToJsonRouteBuilder">
  </bean>
  <bean id="IdentityToJson" class="org.souciance.integration.csvstojson.IdentityToJson">
  </bean>
  <bean id="properties"	class="org.apache.camel.component.properties.PropertiesComponent">
		<property name="location" value="classpath:MapCsvsToJson.properties" />
</bean>

  <camelContext xmlns="http://camel.apache.org/schema/blueprint" useMDCLogging="true" id="CsvToJson">
    <routeBuilder ref="CsvToJsonRouteBuilder" />    

  </camelContext>

</blueprint>

There are two additions here. One is the useMDCLogging="true" attribute on the camelContext tag. We will use this in a later post. More importantly, we have added the bean for reading properties. You can see a bean declared with id="properties"; the properties file is named MapCsvsToJson.properties and must be present somewhere on the classpath. Of course, in real life you would follow some naming convention, but this is just to show how it works. The beauty of this approach is that it lets us inject property values into Java class member fields. This is in essence how we achieve our goal of a single copy.

MapCsvsToJson.properties

We have also created a properties file called MapCsvsToJson.properties and it is in our src/resources folder. The properties defined are:
input.folder =C:/test
output.folder =C:/test
input.filename =input.csv
output.filename =output.json

As you can see, we have written down the file paths and file names for our input and output. This extends easily to any endpoint, be it HTTP, FTP, WMQ, Rabbit, TCP etc. The integration code stays the same; you only change the endpoint values when you move your package from test to QA, and again when you go to production. You can also add other system parameters here that need to change from one environment to another, or that make your code more flexible.
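
To make the single-copy idea concrete, here is a minimal stand-alone sketch in plain Java (no Camel involved). It assumes two property sets: one for test, taken from the file above, and one for production whose folder /data/prod/in is purely hypothetical. The same code then produces a different endpoint URI for each. The class and method names (EndpointConfigSketch, load, fileUri) are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class EndpointConfigSketch {

    /** Parses properties text; a real deployment would read a file from the classpath instead. */
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    /** Builds a file endpoint URI of the form file:<folder>/?fileName=<name>. */
    static String fileUri(Properties props, String folderKey, String fileNameKey) {
        String folder = props.getProperty(folderKey);
        String fileName = props.getProperty(fileNameKey);
        if (folder == null || fileName == null) {
            throw new IllegalArgumentException("Missing " + folderKey + " or " + fileNameKey);
        }
        return "file:" + folder + "/?fileName=" + fileName;
    }

    public static void main(String[] args) {
        // Test values come from the post; the production folder is a hypothetical example.
        Properties test = load("input.folder =C:/test\ninput.filename =input.csv\n");
        Properties prod = load("input.folder =/data/prod/in\ninput.filename =input.csv\n");

        // Same code, different configuration, different endpoint.
        System.out.println(fileUri(test, "input.folder", "input.filename"));
        System.out.println(fileUri(prod, "input.folder", "input.filename"));
    }
}
```

Only the properties differ between the two calls, which is the whole point: the deployable code is identical in every environment.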

CsvsToJsonRouteBuilder.java

In our route builder class we have made some additions. Here is how it looks now:

package org.souciance.integration.csvstojson;

import org.apache.camel.LoggingLevel;
import org.apache.camel.PropertyInject;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

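/**
 * Route builder that reads a CSV file, maps it to JSON and writes the result to a file.
 * The folder and file names are injected from MapCsvsToJson.properties.
 */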
public class CsvsToJsonRouteBuilder extends RouteBuilder {
	@PropertyInject("{{input.folder}}")
	private String inputFolder;
	@PropertyInject("{{output.folder}}")
	private String outputFolder;
	@PropertyInject("{{input.filename}}")
	private String inputFileName;
	@PropertyInject("{{output.filename}}")
	private String outputFileName;

	@Override
	public void configure() throws Exception {
		// Bindy data format that maps each CSV row to an Identity object
		BindyCsvDataFormat bindy = new BindyCsvDataFormat(org.souciance.integration.csvstojson.Identity.class);		

		from("file:" + inputFolder+"/?fileName="+inputFileName).routeId("CsvsToJson")
		.log(LoggingLevel.INFO, "CsvToJson", "Processing csv message ${id} with body ${body}")
		.unmarshal(bindy)
		.log(LoggingLevel.INFO, "CsvToJson", "Mapped CSV to Pojo ${body}")
		.to("IdentityToJson")
		.log(LoggingLevel.INFO, "CsvToJson", "Mapped Pojo to json: ${body}")
		.to("file:"+outputFolder+"/?fileName="+outputFileName)
		.log(LoggingLevel.INFO, "CsvToJson", "Json file written to output folder found in headers ${in.headers}")
		.end();

	}
}

As you can see we have now written the following when declaring our class member fields:

@PropertyInject("{{input.folder}}")
	private String inputFolder;

What this does is inject the value of the property input.folder, which is C:/test, into the field inputFolder. The same goes for the other fields.

Further down you can see this:

from("file:" + inputFolder+"/?fileName="+inputFileName).routeId("CsvsToJson")

This is now different from our previous code. The input folder and the input file name are no longer hardcoded. In a sense we are creating a dynamic endpoint, because the actual endpoint URI is assembled from our property file. So again, imagine having a properties file each for test, QA and prod and deploying a single copy with the right property file, rather than a separate copy of your integration for each environment. The same applies to the "to" endpoint at the bottom. This demonstrates the main use of property injection; you can of course also inject into normal POJO classes or even into the input parameters of a method. There is a richness here not found in the commercial tools.
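
As a side note, Camel's properties component can also resolve {{key}} placeholders written directly inside an endpoint URI string, which avoids the string concatenation. The sketch below is not Camel code and does not reflect Camel's actual implementation. It is a small stand-alone resolver (the class PlaceholderSketch and its resolve method are hypothetical) that shows what such a substitution amounts to.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class PlaceholderSketch {

    // Matches {{some.key}} and captures the key between the braces.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{\\{([^{}]+)\\}\\}");

    /** Replaces every {{key}} in the template with the matching property value. */
    static String resolve(String template, Properties props) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (matcher.find()) {
            String key = matcher.group(1).trim();
            String value = props.getProperty(key);
            if (value == null) {
                // Fail fast so a missing key is noticed at startup, not at runtime.
                throw new IllegalArgumentException("No value for property: " + key);
            }
            matcher.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("input.folder", "C:/test");
        props.setProperty("input.filename", "input.csv");
        System.out.println(resolve("file:{{input.folder}}/?fileName={{input.filename}}", props));
    }
}
```

Failing on a missing key is a design choice in this sketch: a typo in a property name then surfaces immediately instead of producing a half-built URI.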

Ok, let us also look at the code we added for logging. You will have noticed several statements of the form:

.log(LoggingLevel.INFO, "CsvToJson", "Processing csv message ${id} with body ${body}")

Here we are using the log DSL to do a few things. First, we set the logging level to INFO, since we are only logging steps. Second, we give the logger a name, CsvToJson, which shows up as the logger category (%c) in the log output. Note that this is the logger name, not the appender name: the CsvsToJson appender in the log4j properties file below is attached to the root logger, so it receives these statements regardless of the logger name. Finally, we have the log text. Here we log the Camel id of the message and the body. The language is called the Simple language, a Camel expression language that gives you access to the body, headers and exchange properties using the ${...} syntax. As you can see, we log when we receive the file, after mapping to a POJO, after mapping to JSON and after writing the output file.

The log4j properties file looks like this:

log4j.rootLogger=INFO, out, CsvsToJson
#log4j.logger.org.apache.camel=DEBUG, file

# CONSOLE appender
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
# Earlier pattern, kept for reference; the line below is the one in effect
#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n

log4j.appender.CsvsToJson=org.apache.log4j.RollingFileAppender
log4j.appender.CsvsToJson.layout=org.apache.log4j.PatternLayout
log4j.appender.CsvsToJson.layout.ConversionPattern=%d{ISO8601} | %-5.5p | %-16.16t | %-32.32c{1} | %X{camel.routeId} | %m%n
log4j.appender.CsvsToJson.file=C:/test/logs/camel.log
log4j.appender.CsvsToJson.MaxFileSize=50MB
log4j.appender.CsvsToJson.MaxBackupIndex=10
log4j.appender.CsvsToJson.append=true

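You can see that the appender name CsvsToJson on the rootLogger line matches the name used in the appender definition. Because the appender is attached to the root logger, statements from every logger, including our CsvToJson logger, end up in the file. There are several other options and ways to configure this; this is just one example.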
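
The relationship between logger names and appenders can be tried out without log4j. The sketch below uses java.util.logging from the JDK as a stand-in (an assumption made to keep it dependency free; handlers play the role that appenders play in log4j). A handler attached to the root logger receives records from every named logger, which is the same reason an appender listed on log4j.rootLogger sees the CsvToJson statements. The class RootHandlerSketch and the logger name SomeOtherRoute are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

final class RootHandlerSketch {

    /** Keeps formatted lines in memory; stands in for a file appender. */
    static final class MemoryLines extends Handler {
        final List<String> lines = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            if (isLoggable(record)) {
                lines.add(record.getLevel() + " " + record.getLoggerName() + " - " + record.getMessage());
            }
        }

        @Override
        public void flush() {
        }

        @Override
        public void close() {
        }
    }

    /** Logs through two differently named loggers and returns what the root handler saw. */
    static List<String> demo() {
        Logger root = Logger.getLogger("");
        MemoryLines handler = new MemoryLines();
        root.addHandler(handler);
        try {
            Logger.getLogger("CsvToJson").info("Processing csv message");
            Logger.getLogger("SomeOtherRoute").info("Another step");
        } finally {
            root.removeHandler(handler);
        }
        return handler.lines;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Both loggers reach the single handler, and the logger name only shows up as a field in each line.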

Running our integration

So when you run this, you will see in your console the following log statements to begin with.

[INFO] Using org.apache.camel.test.blueprint.Main to initiate a CamelContext
[INFO] Starting Camel ...
2016-02-14 20:35:39,223 [int.Main.main()] INFO  Activator                      - Camel activator starting
2016-02-14 20:35:39,236 [int.Main.main()] INFO  Activator                      - Camel activator started
2016-02-14 20:35:39,367 [int Extender: 1] INFO  BlueprintContainerImpl         - Bundle csvstojson/0.0.1.SNAPSHOT is waiting for namespace handlers [http://camel.apache.org/schema/blueprint]
2016-02-14 20:35:40,374 [int Extender: 1] INFO  BlueprintCamelContext          - Apache Camel 2.16.1 (CamelContext: CsvToJson) is starting
2016-02-14 20:35:40,374 [int Extender: 1] INFO  BlueprintCamelContext          - MDC logging is enabled on CamelContext: CsvToJson
2016-02-14 20:35:40,375 [int Extender: 1] INFO  ManagedManagementStrategy      - JMX is enabled
2016-02-14 20:35:40,623 [int Extender: 1] INFO  DefaultRuntimeEndpointRegistry - Runtime endpoint registry is in extended mode gathering usage statistics of all incoming and outgoing endpoints (cache limit: 1000)
2016-02-14 20:35:40,623 [int Extender: 1] INFO  BlueprintCamelContext          - AllowUseOriginalMessage is enabled. If access to the original message is not needed, then its recommended to turn this option off as it may improve performance.
2016-02-14 20:35:40,623 [int Extender: 1] INFO  BlueprintCamelContext          - StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2016-02-14 20:35:40,623 [int Extender: 1] INFO  BlueprintCamelContext          - Total 0 routes, of which 0 is started.
2016-02-14 20:35:40,624 [int Extender: 1] INFO  BlueprintCamelContext          - Apache Camel 2.16.1 (CamelContext: CsvToJson) started in 0.250 seconds
2016-02-14 20:35:40,798 [int Extender: 1] INFO  BlueprintCamelContext          - Route: CsvsToJson started and consuming from: Endpoint[file://C:/test/?fileName=input.csv]
2016-02-14 20:35:55,426 [file://C:/test/] INFO  CsvToJson                      - Processing csv message ID-moeed-Dator-63752-1455478540192-0-1 with body 12345,souciance,rashti,012458478,Sweden
12346,souciance,eqdam rashti,012458478,Sweden
12347,souciance,eqdam,012458478,Sweden
12348,Moeed,eqdam rashti,012458478,Sweden
2016-02-14 20:35:55,441 [file://C:/test/] INFO  CsvToJson                      - Mapped CSV to Pojo [org.souciance.integration.csvstojson.Identity@14718c6, org.souciance.integration.csvstojson.Identity@1b2d615, org.souciance.integration.csvstojson.Identity@1857aaa, org.souciance.integration.csvstojson.Identity@1dfdcc0]
2016-02-14 20:35:55,615 [file://C:/test/] INFO  CsvToJson                      - Mapped Pojo to json: {"ListOfRows":[{"firstname":"souciance","lastname":"rashti","phone":12458478,"country":"Sweden"},{"firstname":"souciance","lastname":"eqdam rashti","phone":12458478,"country":"Sweden"},{"firstname":"souciance","lastname":"eqdam","phone":12458478,"country":"Sweden"},{"firstname":"Moeed","lastname":"eqdam rashti","phone":12458478,"country":"Sweden"}]}
2016-02-14 20:35:55,674 [file://C:/test/] INFO  CsvToJson                      - Json file written to output folder found in headers {breadcrumbId=ID-moeed-Dator-63752-1455478540192-0-1, CamelFileAbsolute=true, CamelFileAbsolutePath=C:\test\input.csv, CamelFileContentType=application/vnd.ms-excel, CamelFileLastModified=1454862757199, CamelFileLength=169, CamelFileName=input.csv, CamelFileNameConsumed=input.csv, CamelFileNameOnly=input.csv, CamelFileNameProduced=C:\test\output.json, CamelFileParent=C:\test, CamelFilePath=C:\test\input.csv, CamelFileRelativePath=input.csv}

As you can see, the log starts with the Camel startup steps, and near the end the log statements that we added ourselves are visible. The log file produced contains the same statements.

If you use Eclipse and just want to test things out, you can right-click your project -> Run As -> Run Configurations -> JRE and add

-Dlog4j.configuration=file:<path to your log4j properties file>

and then run with camel:run as your Maven goal. This should create the log file for you.

Summary

So in this post we have covered how to inject properties into your Java class fields, which is valuable when you want to keep endpoints out of your code and make them configurable, so that your integrations are more consistent and easier to maintain. We have also seen how log files can be created and how you can write custom log statements to log the headers and the message payload. Thanks for tuning in.

Camel development series – Part 3

Hello again and welcome to the third part of the Camel development series.

In part 2 we touched upon a very basic mapping scenario, namely mapping a single row of CSV data to a simple JSON structure. It covered some basic Camel ideas such as combining blueprint and the Java DSL, injecting beans, using processors and the Jackson JSON API.

In this part we will create a new project, handle multiple rows of CSV data and map them to a JSON array structure. You can find all the source code and test data here https://github.com/SoucianceEqdamRashti/Integration

Input and Output

12345,souciance,rashti,012458478,Sweden
12346,souciance,eqdam rashti,012458478,Sweden
12347,souciance,eqdam,012458478,Sweden
12348,Moeed,eqdam rashti,012458478,Sweden

The above is our input. As you can see, these are simple fields: each row is terminated by a line feed and each field is delimited by a comma. Our desired output is:

{
	"ListOfRows": [{
		"firstname": "souciance",
		"lastname": "rashti",
		"phone": 12458478,
		"country": "Sweden"
	},
	{
		"firstname": "souciance",
		"lastname": "eqdam rashti",
		"phone": 12458478,
		"country": "Sweden"
	},
	{
		"firstname": "souciance",
		"lastname": "eqdam",
		"phone": 12458478,
		"country": "Sweden"
	},
	{
		"firstname": "Moeed",
		"lastname": "eqdam rashti",
		"phone": 12458478,
		"country": "Sweden"
	}]
}

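Our output contains a JSON array called ListOfRows, which holds the data from every row in the CSV file. Note that the identity column is not carried over, and the phone number loses its leading zero because the model class stores it as an int. Let’s go through the code to see how we can achieve this.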

Blueprint code

The blueprint.xml file is a copy of the one found in part 2. It looks like this:

<?xml version="1.0" encoding="UTF-8"?>
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"        xsi:schemaLocation="        http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd        http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd">

  <bean id="CsvToJsonRouteBuilder" class="org.souciance.integration.csvstojson.CsvToJsonRouteBuilder">
  </bean>
  <bean id="IdentityToJson" class="org.souciance.integration.csvstojson.IdentityToJson">
  </bean>

  <camelContext xmlns="http://camel.apache.org/schema/blueprint">
    <routeBuilder ref="CsvToJsonRouteBuilder" />    

  </camelContext>

</blueprint>

The only thing that has changed is the package names. I should point out one thing: I am probably the worst person at coming up with good names for classes, variables and packages, and I constantly have to refactor. So don’t pay too much attention to the naming 😉

RouteBuilder

Again the RouteBuilder class is pretty much a copy of the one in part 2. It looks like this:

package org.souciance.integration.csvstojson;

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.dataformat.bindy.csv.BindyCsvDataFormat;

/**
 * Route builder that reads input.csv, maps the rows to JSON and writes output.json.
 */
public class CsvToJsonRouteBuilder extends RouteBuilder {

	@Override
	public void configure() throws Exception {
		// Bindy data format that maps each CSV row to an Identity object
		BindyCsvDataFormat bindy = new BindyCsvDataFormat(org.souciance.integration.csvstojson.Identity.class);		

		from("file:C:/test/?fileName=input.csv")
		.unmarshal(bindy)
		.log("${body}")
		.to("IdentityToJson")
		.to("file:C:/test/?fileName=output.json")
		.log("done!")
		.end();

	}
}

What we are doing here is defining our Bindy data format as CSV and telling it to use the Identity POJO as the model class for parsing the CSV rows. You can see that we unmarshal directly after the "from" statement. Since the file contains multiple rows, the important thing to understand is that the exchange body is now a list: we are dealing with a list of Identity objects rather than a single object. We can approach this in different ways.

  1. We can use the Splitter EIP to split the file and produce a file for each row. But this is not what we want.
  2. We can split and aggregate at the end – possible but a bit too complicated for our scenario.
  3. We send the entire list to our mapping class and deal with the list there. This is our approach.
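
To picture what the third approach receives, here is a minimal plain-Java sketch of turning the CSV rows into a list of objects. It only illustrates the shape of the data and is not how Bindy works internally: the Row class and the parse method are hypothetical stand-ins for the Identity model and the unmarshal step, and the parsing is deliberately naive (no quoting or escaping).

```java
import java.util.ArrayList;
import java.util.List;

final class CsvRowsSketch {

    /** Hypothetical stand-in for the Identity model class. */
    static final class Row {
        final int identity;
        final String firstname;
        final String lastname;
        final int phone;
        final String country;

        Row(int identity, String firstname, String lastname, int phone, String country) {
            this.identity = identity;
            this.firstname = firstname;
            this.lastname = lastname;
            this.phone = phone;
            this.country = country;
        }
    }

    /** Splits the text into lines and each line into five comma-separated fields. */
    static List<Row> parse(String csv) {
        List<Row> rows = new ArrayList<>();
        for (String line : csv.split("\\r?\\n")) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines such as a trailing newline
            }
            String[] fields = line.split(",", -1);
            if (fields.length != 5) {
                throw new IllegalArgumentException("Expected 5 fields but got: " + line);
            }
            // Parsing the phone as an int drops the leading zero, as seen in the output.
            rows.add(new Row(Integer.parseInt(fields[0].trim()), fields[1], fields[2],
                    Integer.parseInt(fields[3].trim()), fields[4]));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Row> rows = parse("12345,souciance,rashti,012458478,Sweden\n"
                + "12346,souciance,eqdam rashti,012458478,Sweden\n");
        System.out.println(rows.size() + " rows, first phone = " + rows.get(0).phone);
    }
}
```

The mapping class then gets the whole list in one go and can decide itself how to lay out the output.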

Identity class – our model object

Just for reference our model class for the bindy dataformat looks like this:

package org.souciance.integration.csvstojson;

import org.apache.camel.dataformat.bindy.annotation.CsvRecord;
import org.apache.camel.dataformat.bindy.annotation.DataField;

@CsvRecord(separator = ",")
public class Identity {

	@DataField(pos=1)
	private int identity;
	@DataField(pos=2)
	private String firstname;
	@DataField(pos=3)
	private String lastname;
	@DataField(pos=4)
	private int phone;
	@DataField(pos=5)
	private String country;
	public int getIdentity() {
		return identity;
	}
	public void setIdentity(int identity) {
		this.identity = identity;
	}
	public String getFirstname() {
		return firstname;
	}
	public void setFirstname(String firstname) {
		this.firstname = firstname;
	}
	public String getLastname() {
		return lastname;
	}
	public void setLastname(String lastname) {
		this.lastname = lastname;
	}
	public int getPhone() {
		return phone;
	}
	public void setPhone(int phone) {
		this.phone = phone;
	}
	public String getCountry() {
		return country;
	}
	public void setCountry(String country) {
		this.country = country;
	}

}

As you can see we have not changed anything there.

IdentityToJson class – Where we do all the json magic

In this class we do all the json conversion. It looks as follows:

package org.souciance.integration.csvstojson;

import java.io.ByteArrayOutputStream;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class IdentityToJson implements Processor {

	@Override
	public void process(Exchange exchange) throws Exception {
		/*initialize Jackson
		 *we need to create an outer object, an inner array and node objects for individual array elements
		 */
		JsonNodeFactory factory = new JsonNodeFactory(false);
		JsonFactory jsonFactory = new JsonFactory();
		ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
		JsonGenerator generator = jsonFactory.createGenerator(outputStream);
		ObjectMapper mapper = new ObjectMapper();
		ObjectNode listOfPersons = mapper.createObjectNode();
		ArrayNode persons = factory.arrayNode();

		/*get the current row and add it to the json array item by
		*traversing the incoming List of Identity objects
		*person is the json object containing each row
		*persons is the array that contains multiple person items
		*/
		if (exchange.getIn().getBody() instanceof java.util.List) {
			@SuppressWarnings("unchecked")
			List<Identity> listOfIdentities = ((List<Identity>)exchange.getIn().getBody());
			for (Identity identity : listOfIdentities ) {
				ObjectNode person = factory.objectNode();
				person.put("firstname", identity.getFirstname());
				person.put("lastname", identity.getLastname());
				person.put("phone", identity.getPhone());
				person.put("country", identity.getCountry());
				persons.add(person);
			}
		}
		//finally we create the entire json structure by adding the array to the root object ListOfRows
		listOfPersons.putArray("ListOfRows").addAll(persons);

		//write the json string to the exchange
		mapper.writeTree(generator,  listOfPersons);
		// the generator wrote UTF-8 bytes, so decode them with the same charset
		String json = outputStream.toString("UTF-8");
		exchange.getIn().setBody(json);

	}

}

There are several changes made here.

  1. In the JSON initialization block you can see that we create not only ordinary JSON object nodes but also an array node. Essentially, listOfPersons contains the persons array, which in turn contains multiple person objects.
  2. The crucial part is the second block, where we check whether the body of the exchange is an instance of List. If it is, we iterate over it, extract the values and add them to a person node. At the end of each iteration the person node is added to the persons array.
  3. Once the iteration is done, we add the array to the listOfPersons object using the Jackson putArray and addAll methods.
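
The three steps above can be sketched without Jackson to make the resulting structure explicit. This is an illustration under stated assumptions, not a replacement for the processor: the Person class and the toJson and quote helpers are hypothetical, the string escaping is minimal, and for real code the Jackson API used in the post is the better choice. Like the processor, the sketch produces an empty ListOfRows array when the body is not a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ListOfRowsSketch {

    /** Hypothetical stand-in for the Identity fields that end up in the JSON. */
    static final class Person {
        final String firstname;
        final String lastname;
        final int phone;
        final String country;

        Person(String firstname, String lastname, int phone, String country) {
            this.firstname = firstname;
            this.lastname = lastname;
            this.phone = phone;
            this.country = country;
        }
    }

    /** Wraps a string in double quotes, escaping quotes, backslashes and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Builds the outer object with a ListOfRows array holding one object per list item. */
    static String toJson(Object body) {
        List<String> persons = new ArrayList<>();
        if (body instanceof List) {
            for (Object item : (List<?>) body) {
                Person p = (Person) item;
                persons.add("{\"firstname\":" + quote(p.firstname)
                        + ",\"lastname\":" + quote(p.lastname)
                        + ",\"phone\":" + p.phone
                        + ",\"country\":" + quote(p.country) + "}");
            }
        }
        return "{\"ListOfRows\":[" + String.join(",", persons) + "]}";
    }

    public static void main(String[] args) {
        List<Person> rows = Arrays.asList(
                new Person("souciance", "rashti", 12458478, "Sweden"),
                new Person("Moeed", "eqdam rashti", 12458478, "Sweden"));
        System.out.println(toJson(rows));
    }
}
```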

Summary

In essence, this is one approach to mapping CSV to JSON while customising your JSON structure. I am sure there are better ways or other approaches. If you know of any, please do post them 😉 Stay tuned for the next part of the series.