Wednesday, 10 December 2014

Running Multiple pollEnrich Tags on XML Message with simple AggregationStrategy using org.w3c.dom Classes



I am attempting to learn how to work with Apache Camel 2.14 so I can complete a project for a client. The client wants a webservice that will



  1. Receive a request

  2. Use data from the request to query an existing webservice

  3. Use data returned from the existing service to query a second existing webservice, but also keep the data from the first service.

  4. Merge the data from both services into the webservice response


In order to learn how the necessary Camel EIPs work, I've created a simple route to mimic the behavior with file endpoints. I think the best way to do this is with multiple <pollEnrich> elements in series, since I haven't found a way to send the data from the first service to the second with an <aggregator> element.


The Spring XML defining the Camel Route:



<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://ift.tt/GArMu6"
xmlns:util="http://ift.tt/OGfeTW" xmlns:xsi="http://ift.tt/ra1lAU"
xsi:schemaLocation="
http://ift.tt/GArMu6 http://ift.tt/1jdM0fG
http://ift.tt/TZ5qsO http://ift.tt/1apCv6i
http://ift.tt/OGfeTW http://ift.tt/1bFKsJT">

<camelContext xmlns="http://ift.tt/TZ5qsO"
useMDCLogging="true" id="CamelPatternTest_camelContext">

<route id="AggregatorPOC_Route">
<from
uri="file:../webapps/camel-pattern-test-0/WEB-INF/classes/aggregatorPOC/input/aggregation_requests" />
<log message="File picked up" />

<!-- Apparently, pollEnrich can only open one file at a time. -->
<log message="pollEnriching CompanyProfile" />
<pollEnrich
uri="file:../webapps/camel-pattern-test-0/WEB-INF/classes/aggregatorPOC/input/aggregation_responses?noop=true&amp;fileName=CompanyProfile.xml&amp;idempotent=false"
strategyRef="testAggregationStrategy" />
<log message="pollEnriching CorporationProfile" />
<pollEnrich
uri="file:../webapps/camel-pattern-test-0/WEB-INF/classes/aggregatorPOC/input/aggregation_responses?noop=true&amp;fileName=CorporationProfile.xml&amp;idempotent=false"
strategyRef="testAggregationStrategy" />
<setHeader headerName="CamelFileName">
<simple>${file:onlyname.noext}AggregatedResponse.xml</simple>
</setHeader>
<log message="Enrichment complete" />

<to
uri="file:../webapps/camel-pattern-test-0/WEB-INF/classes/aggregatorPOC/output/aggregation?autoCreate=true" />
<log message="Sent file to output" />
<log message="Route complete." />
</route>
</camelContext>

<bean
class="org.springframework.beans.factory.annotation.AutowiredAnnotationBeanPostProcessor" />

<bean id="testAggregationStrategy" class="com.amex.gbt.psd.util.TestAggregationStrategy" />

</beans>


The TestAggregationStrategy class:



import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.NodeList;

public class TestAggregationStrategy implements AggregationStrategy {

private static final Logger LOG = LoggerFactory
.getLogger(TestAggregationStrategy.class);

@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// The oldExchange will never be null in a pollEnrich EIP

LOG.debug("The oldExchange's In holds:\n" + oldExchange.getIn().getBody(String.class));
LOG.debug("The newExchange's In holds:\n" + newExchange.getIn().getBody(String.class));

Document oldBody = oldExchange.getIn().getBody(Document.class);
Document newBody = newExchange.getIn().getBody(Document.class);

LOG.debug("The root node is \"" + oldBody.getFirstChild().getLocalName() + "\"");

// If the oldBody is the request, replace it with an empty aggregatedXML document.
if (oldBody.getFirstChild().getLocalName().equals("Request")){
oldBody = aggregatedXML();
LOG.debug("oldBody is \"" + oldBody.getFirstChild() + "\"");
}

// It seems NodeList doesn't implement iterable, so we can't use a for-each loop
NodeList newBodyContent = newBody.getChildNodes();
for (int index=0;index<newBodyContent.getLength();index++)
{
LOG.debug("Currently copying node \"" + newBodyContent.item(index).getNodeName() + "\"");
oldBody.getFirstChild().appendChild(oldBody.adoptNode(newBodyContent.item(index)));
}

oldExchange.getIn().setBody(oldBody);
LOG.debug("oldExchange's In now holds:\n" + oldExchange.getIn().getBody(String.class));

return oldExchange;
}

/**
* Returns an empty XML document of the type that will contain an aggregated
* response
*
* @return
*/
private Document aggregatedXML() {
DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
DocumentBuilder db = null;
try {
db = dbf.newDocumentBuilder();
} catch (ParserConfigurationException e) {
LOG.error("You need to configure the DocumentBuilderFactory", e);
}
Document emptyAggregate = db.newDocument();

emptyAggregate.appendChild(emptyAggregate.createElement("aggregateData"));

LOG.debug("emptyAggregate is \"" + emptyAggregate.getFirstChild() + "\"");

return emptyAggregate;
}

}


The first <pollEnrich> executes without stopping for any errors. However, when the second <pollEnrich> tag sends the exchanges to the aggregate() method, I get a NullPointerException on the first line where I access oldBody. When I look at the logs where it prints out oldExchange's In now holds:..., it prints out an XML document that does not have the header ?<?xml version="1.0" encoding="UTF-8"?>. All of the handmade XML documents that I've sent to this method have that header. So, what I think is happening is that, when the second <pollEnrich> gets the exchanges and sends them to aggregate(), the method oldExchange.getIn().getBody(Document.class) sees that the incoming message body doesn't have the XML header it expects, and thus returns an empty XML document instead of a document containing the XML fragment it receives.


My questions:



  1. Have I correctly diagnosed the problem?

  2. Is there a way to make this route work? If I have indeed diagnosed correctly, I would need to make oldExchange.getIn().setBody(oldBody) include the ?<?xml version="1.0" encoding="UTF-8"?> header, or find some way to make Document oldBody = oldExchange.getIn().getBody(Document.class); accept XML documents that do not have that header.


Thank you for your time.


No comments:

Post a Comment