Pages

Wednesday, August 12, 2009

Apache Camel: Route Synchronization

I had a case to implement with Apache Camel, where the application reads XML files produced by an external system, imports the data into Oracle database, and in a while, it should process the data which has been reviewed by some business person.



Here's the simplified main code implementing the process:

package my.app;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;

public class MyRouteBuilder extends SpringRouteBuilder {

 protected void configureImportRoute() {
   String filesUri = "file:files/payments" + 
                     "?initialDelay=3000" + 
                     "&delay=1000" + 
                     "&useFixedDelay=true" +
                     "&include=.*[.]xml" +
                     "&move=backup/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml" +
                     "&moveFailed=files/${file:name.noext}-${date:now:yyyyMMddHHmmssSSS}.xml.error";

   from(filesUri).convertBodyTo(MyBean.class).transacted().to("importProcessor");

   String executionTriggerUri = "timer:executionTimer"
                              + "?fixedRate=true"
                              + "&daemon=true"
                              + "&delay=3000"
                              + "&period=3000";

   from(executionTriggerUri)
    .pipeline("bean:myDao?method=listItemsForExecution")
    .to("executioncProcessor");
}


What I wanted to do is to synchronize the routes. There could be several ways for doing this: using Camel BAM module, using a polling consumer and checking a flag. I came up with a solution that logically looks almost the same as Claus proposed, but instead of some explicit flag I used thread synchronization trick. Here's how the proof of concept code looks like.

The route builder:
package my.dummy.app;

import org.apache.camel.Processor;
import org.apache.camel.Exchange;
import org.apache.camel.spring.SpringRouteBuilder;

public class Route extends SpringRouteBuilder {
  public void configure() {
    from("timer:TriggerA?delay=100&period=1").to("A");
    from("timer:TriggerB?delay=100&period=1").to("B");
    from("timer:TriggerC?delay=100&period=1").to("C");
  }
}


Here we can see 3 concurrent routes being executed via timer component periodically.

I created 3 dummy processors that are emulating the business logic processors in the real application. Its function is just to create a delay, using Thread.sleep() with random period, and logs a debug message.

package my.dummy.app;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.log4j.Logger;
import java.util.Random;

public class A /*B*/ /*C*/ implements Processor {
  Random r = new Random();
  private static final Logger log = Logger.getLogger(Processor.class);
  public void process(Exchange exchange) throws Exception {
    Thread.sleep(r.nextInt(1000));
    log.info("processing " + exchange + " in " + getClass().getName());
  }
}


The Spring configuration for this dummy application is as follows:
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemalocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<camelcontext id="importing" xmlns="http://camel.apache.org/schema/spring">
  <packagescan>
     <package>my.dummy.app</package>
  </packagescan>
</camelcontext>

<bean class="my.dummy.app.A" id="A">
<bean class="my.dummy.app.B" id="B">
<bean class="my.dummy.app.C" id="C">
</beans>
I noticed that there's a DelegateProcessor in Camel that could be used to wrap the real processors. So I can use it to synchronize the routes like this:

package my.dummy.app;

import org.apache.log4j.Logger;

import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.processor.DelegateProcessor;

public class RouteSynchronizer extends DelegateProcessor {
private Logger log = Logger.getLogger(RouteSynchronizer.class);
private final static Object sync = new Object();

public void process(Exchange exchange) throws Exception {
  synchronized (sync) {
    log.debug("begin exchange processing by " + Thread.currentThread().getName());
    super.process(exchange);
    try {
      if (exchange.isFailed()) {
        throw new RuntimeCamelException(exchange.getException());
      }
    } finally {
      log.debug("end exchange processing by " + Thread.currentThread().getName());
    }
  }
}

}


Now I can wrap my original processor beans my the brand new delegate processor as follows:


<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://www.springframework.org/schema/beans" xsi:schemalocation="</p><p>http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd</p><p>http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

<camelcontext id="importing" xmlns="http://camel.apache.org/schema/spring">
<packagescan>
<package>my.dummy.app</package>
</packagescan>


<bean class="my.dummy.app.RouteSynchronizer" id="A"/>
<property name="processor"/>
</bean>

<bean class="my.dummy.app.A"/>
<bean class="my.dummy.app.RouteSynchronizer" id="B"/>
<bean class="my.dummy.app.B"/>
<bean class="my.dummy.app.RouteSynchronizer" id="C"/>
<bean class="my.dymmy.app.C"/>

<camelcontext>
</beans>

With this solution there's a performance penalty due to synchronization but this is not critical for my application at least and confirms to the requirements.

Here's how the dummy application log looks like:
12-08-2009 15:06:01,970 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,298 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:02,313 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:03,032 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:03,173 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:03,579 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:04,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:04,626 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:05,251 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.C
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerC?delay=100&period=1
12-08-2009 15:06:05,688 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.A
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerA?delay=100&period=1
12-08-2009 15:06:05,782 DEBUG RouteSynchronizer - begin exchange processing by timer://TriggerB?delay=100&period=1
12-08-2009 15:06:06,298 INFO  Processor - processing Exchange[Message: [Body is null]] in ee.hansa.markets.ktp.payments.tmp.B
12-08-2009 15:06:06,298 DEBUG RouteSynchronizer - end exchange processing by timer://TriggerB?delay=100&period=1


From the log we can see that this concept actually works - no crossing between the routes appeared.

What could be nice to have is the same support in the DSL, like:
from("...").synchronize().to("processorA");
from("...").synchronize().to("processorA");

1 comment:

Claus Ibsen said...

Nice blog post.

The .threads(1) should do this as well as the JDK executor will only have 1 thread in its poll.

But since it also have the affect of using a JDK task to route (eg = using another Thread) the processor approach you do can ensure using same current thread if needed.

I think you can use a Barrier in the JDK to indicate how many concurrent exchanges you want.

Maybe something we need for the Camel core as well? Gonna think a bit about this. But feel free to raise a ticket in JIRA so we wont forget.

Disqus for Code Impossible