Enterprise Integration Patterns applied in Mule - Dead Letter Channel
Overview
This post explains how to use Mule and the RabbitMQ AMQP connector with dead letter queues. The sample code can be easily adapted to other message brokers.
The Problem
We have a Mule application that consumes messages from a message broker and writes them to a database table. The database server becomes temporarily unavailable, and the inserts fail with ConnectExceptions.
How do we save a message that failed to be delivered so it can be reprocessed once the target system is back?
How do we retry delivering the same message to the target system before giving up?
How do we wait between retries, so the target system is not flooded with requests that will most likely fail?
Introducing the Dead-letter Channel Integration Pattern
This pattern gives consumer applications an opportunity to reprocess the same message several times. If the final retry also fails, the message is forwarded to an auxiliary queue for manual intervention.
Being selective on the errors we want to retry
Although a lot can go wrong while a Mule application processes a message, only issues caused by the environment should trigger re-delivery and reprocessing of the same message.
We would not want to reprocess a message that has an invalid date, but we certainly want to retry if the operating system reported too many open files.
So, to be selective about which exceptions we retry, the consumer application uses a catch exception strategy that intercepts only the environmental exceptions for re-delivery. Ideally, those exceptions are configurable (i.e. externalized in a property file).
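Outside Mule, the selection rule can be sketched in a few lines of Python. This is only an illustration under assumptions: the function name `is_retryable` and the whitespace-separated list format are hypothetical, and the check compares the exception's class name rather than its full string form (which would also contain the error message).

```python
def is_retryable(exc: BaseException, retryable_names: str) -> bool:
    """Return True when the exception's class is listed as retryable.

    retryable_names is a whitespace-separated list of class names, as it
    might be read from a property file (hypothetical format). Both the
    simple name and the module-qualified name are accepted.
    """
    allowed = set(retryable_names.split())
    cls = type(exc)
    qualified = f"{cls.__module__}.{cls.__qualname__}"
    return cls.__name__ in allowed or qualified in allowed


configured = "ConnectionRefusedError TimeoutError"
print(is_retryable(ConnectionRefusedError("db down"), configured))  # environment issue
print(is_retryable(ValueError("invalid date"), configured))         # data issue
```

Matching on the class name keeps the rule independent of whatever message text the driver happens to attach to the exception.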
Pausing in between each retry
Between retries, the consumer application should wait a configured amount of time before trying to consume the message again. This waiting period can be implemented on the broker itself or in the application.
I prefer the old-school application trick, Thread.sleep, but I have used delayed queues on both ActiveMQ and RabbitMQ in the past.
On RabbitMQ, you can achieve the same delay on the broker side by creating extra queues and exchanges and sending the message back with a TTL (expiration). Messages can then be routed based on expiration rules.
In general, though, I think broker tricks to delay delivery hurt maintainability and broker performance, so I avoid them.
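For comparison, the broker-side variant usually relies on a holding queue with no consumers: messages expire there after a TTL and are dead-lettered back to the work exchange. The sketch below builds such a queue definition as a Python dictionary, in the layout of a RabbitMQ definitions file. The helper name and the queue and exchange names are hypothetical; `x-message-ttl` and `x-dead-letter-exchange` are standard RabbitMQ queue arguments.

```python
import json


def wait_queue_definition(name: str, ttl_ms: int, return_exchange: str) -> dict:
    """Build a definition for a holding queue that delays redelivery.

    Messages published to this queue are never consumed. After ttl_ms
    they expire and RabbitMQ dead-letters them to return_exchange,
    which is expected to route them back to the work queue.
    """
    return {
        "name": name,
        "vhost": "/",
        "durable": True,
        "auto_delete": False,
        "arguments": {
            "x-message-ttl": ttl_ms,
            "x-dead-letter-exchange": return_exchange,
        },
    }


# Hypothetical names: a 5-second holding queue feeding back into a work exchange.
definition = wait_queue_definition("q.mymessages.wait", 5000, "e.mymessages")
print(json.dumps(definition, indent=2))
```

Each distinct delay needs its own holding queue in this scheme, which is part of the maintenance cost mentioned above.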
How to count the retries?
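You need a message property that counts how many times the message has been delivered to the Mule application.
RabbitMQ exposes a value you can use for this out of the box: amqp.delivery-tag is available as a flow variable and delivery-tag as an inbound property, and you can use either to control your flow. Keep in mind that the delivery tag is a sequence number scoped to the channel rather than a per-message redelivery count, so it tracks retries only while the channel is delivering nothing but the message being retried.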
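A toy model can make the behaviour of the delivery tag concrete: the broker numbers deliveries per channel, whichever message is being delivered. The `ToyChannel` class below is purely illustrative and is not part of any RabbitMQ client library; it assumes a single consumer and that a requeued message returns to the head of the queue.

```python
from collections import deque


class ToyChannel:
    """Toy model of one consumer channel (not the RabbitMQ client API).

    Every delivery on the channel gets the next tag, whichever message
    is being delivered.
    """

    def __init__(self, messages):
        self.queue = deque(messages)
        self.next_tag = 1

    def deliver(self):
        """Hand out the message at the head of the queue with a fresh tag."""
        message = self.queue.popleft()
        tag = self.next_tag
        self.next_tag += 1
        return tag, message

    def requeue(self, message):
        """Put a rejected message back at the head of the queue."""
        self.queue.appendleft(message)


channel = ToyChannel(["A", "B"])
tags_seen = []
for _ in range(3):                   # message A fails three times in a row
    tag, message = channel.deliver()
    tags_seen.append((tag, message))
    channel.requeue(message)
channel.queue.popleft()              # A is finally dead-lettered
tags_seen.append(channel.deliver())  # first delivery of message B
print(tags_seen)
```

In this toy run the first delivery of B already carries tag 4, so a check such as `tag < 3` would skip its retries entirely. The tag is therefore a reasonable counter only under low, mostly sequential traffic on the channel.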
The Mule Application logic
The flow that tries to insert into the database has a reference to an exception handler strategy:
<flow name="save-bulkFlow" processingStrategy="synchronous">
    ....
    <exception-strategy ref="allExceptions" doc:name="Reference Exception Strategy"/>
</flow>
In the allExceptions choice exception strategy, we first check whether the exception should be retried:
<choice-exception-strategy name="allExceptions">
    <catch-exception-strategy
        when="#[exception.cause != null &amp;&amp; '${retryable.exceptions}'.contains(exception.cause.toString())]" doc:name="Retryable Exceptions">
    ....
The consumer application then checks how many times it has already retried. If the count is smaller than the configured limit:
- it pauses the application thread for the configured time, scaled by the delivery tag so that each retry waits longer
- it rejects the message, sending it back to the same queue
- it logs the retry action
...
<choice doc:name="Retry exhausted?">
    <when expression="#[flowVars['amqp.delivery-tag'] &lt; (${retryable.limit})]">
        <scripting:component doc:name="Pause">
            <scripting:script engine="Groovy"><![CDATA[
                sleep(${retryable.pause.between.in.ms}*flowVars['amqp.delivery-tag']);
                return message.payload;]]>
            </scripting:script>
        </scripting:component>
        <amqps:reject-message requeue="true" doc:name="Reject / Requeue"/>
        <logger message="This is the retry # #[flowVars['amqp.delivery-tag']]. Going to retry again"
            level="DEBUG" category="myMuleApp" doc:name="Logs it will retry"/>
    </when>
...
On the other hand, if the retry limit is exhausted, the message is rejected and RabbitMQ sends it to the configured dead letter queue (because requeue=false).
<otherwise>
    <amqps:reject-message requeue="false" doc:name="Reject Message &amp; Send to Dead letter"/>
    <logger message="The last retry # #[flowVars['amqp.delivery-tag']] also failed. Giving up and sending to dead letter queue for manual support/handling." level="DEBUG"
        category="myMuleApp" doc:name="Logs it won't retry"/>
</otherwise>
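Taken together, the two branches amount to a small decision function. The Python sketch below mirrors the `delivery-tag < limit` test and the `pause * delivery-tag` sleep from the Groovy script; the function name and the returned labels are hypothetical, and the example values (limit 3, base pause 5000 ms) are only illustrative.

```python
def next_action(delivery_tag: int, retry_limit: int, pause_ms: int):
    """Decide what to do with a message that failed with a retryable error.

    Returns ("requeue", wait_ms) while the limit has not been reached,
    where the wait grows linearly with the delivery tag, and
    ("dead-letter", 0) once retries are exhausted.
    """
    if delivery_tag < retry_limit:
        return "requeue", pause_ms * delivery_tag
    return "dead-letter", 0


# Example values: limit of 3 and a 5000 ms base pause.
for tag in (1, 2, 3):
    print(tag, next_action(tag, 3, 5000))
```

Because the comparison is strict, a limit of 3 means two requeues followed by dead-lettering on the third delivery.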
Specific RabbitMQ Configurations
Every queue that has a dead letter queue attached to it must be created with the x-dead-letter-exchange argument. See in the example below how q.mymessages has a dead letter exchange attached to it (I like to end the names of my dead letter queues with .manual because they require "manual work" to be reprocessed).
{
"exchanges": [{
"name": "e.mymessages",
"vhost": "/",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
},
{
"name": "e.mymessages.manual",
"vhost": "/",
"type": "fanout",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}],
"queues": [{
"name": "q.mymessages",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {"x-dead-letter-exchange":"e.mymessages.manual"}
},
{
"name": "q.mymessages.manual",
"vhost": "/",
"durable": true,
"auto_delete": false,
"arguments": {}
}],
"bindings": [{
"source": "e.mymessages",
"vhost": "/",
"destination": "q.mymessages",
"destination_type": "queue",
"routing_key": "",
"arguments": {}
},
{
"source": "e.mymessages.manual",
"vhost": "/",
"destination": "q.mymessages.manual",
"destination_type": "queue",
"routing_key": "",
"arguments": {}
}]
}
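RabbitMQ does not require the dead letter exchange to exist when the queue is declared, so a misspelled exchange name goes unnoticed until rejected messages are silently dropped. A small check over the definitions can catch that early. The sketch below is a hypothetical helper, not a RabbitMQ tool; it assumes the exchanges/queues layout shown above and uses a trimmed-down copy of those definitions.

```python
def undeclared_dead_letter_exchanges(definitions: dict) -> list:
    """List (queue, exchange) pairs whose dead-letter exchange is not declared.

    `definitions` follows the exchanges/queues layout of a RabbitMQ
    definitions file.
    """
    declared = {exchange["name"] for exchange in definitions.get("exchanges", [])}
    problems = []
    for queue in definitions.get("queues", []):
        target = queue.get("arguments", {}).get("x-dead-letter-exchange")
        if target is not None and target not in declared:
            problems.append((queue["name"], target))
    return problems


definitions = {
    "exchanges": [{"name": "e.mymessages"}, {"name": "e.mymessages.manual"}],
    "queues": [
        {"name": "q.mymessages",
         "arguments": {"x-dead-letter-exchange": "e.mymessages.manual"}},
        {"name": "q.mymessages.manual", "arguments": {}},
    ],
}
print(undeclared_dead_letter_exchanges(definitions))  # prints []
```

Running a check like this before importing the definitions is cheaper than discovering the problem when the first message fails its last retry.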
Everything should be externalized in properties
Use configurable properties to set the retry limit and the pause between retries. Something like this should be added to your property file:
retryable.limit=3
retryable.pause.between.in.ms=5000
You may also want to keep the list of exceptions that force a retry (the "retryable" exception list) as flexible as possible:
retryable.exceptions=ConnectionRefusedException ConnectionTimeout SocketTimeoutException java.net.ConnectException
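Since these values end up inside expressions, a non-numeric limit or pause only surfaces when the first error is handled. A quick sanity check of the property values can be sketched as follows; the parser and the function name are hypothetical and handle only plain key=value lines, not the full Java properties syntax.

```python
def load_retry_settings(text: str) -> dict:
    """Parse simple key=value lines into typed retry settings."""
    raw = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        raw[key.strip()] = value.strip()
    return {
        "limit": int(raw["retryable.limit"]),                    # fails fast if not numeric
        "pause_ms": int(raw["retryable.pause.between.in.ms"]),   # fails fast if not numeric
        "exceptions": raw["retryable.exceptions"].split(),
    }


settings = load_retry_settings("""
retryable.limit=3
retryable.pause.between.in.ms=5000
retryable.exceptions=ConnectionRefusedException ConnectionTimeout SocketTimeoutException java.net.ConnectException
""")
print(settings["limit"], settings["pause_ms"], len(settings["exceptions"]))
```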
The complete code:
<choice-exception-strategy name="allExceptions">
    <!-- Environmental errors: retry with a pause, then dead-letter -->
    <catch-exception-strategy xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" when="#[exception.cause != null &amp;&amp; '${retryable.exceptions}'.contains(exception.cause.toString())]" doc:name="Retryable Exceptions">
        <logger message="Retryable #[exception]" level="ERROR" category="myMuleApp" doc:name="Logs error"/>
        <choice doc:name="Retry exhausted?">
            <when expression="#[flowVars['amqp.delivery-tag'] &lt; (${retryable.limit})]">
                <!-- Pause grows with the delivery tag before the message is requeued -->
                <scripting:component xmlns:scripting="http://www.mulesoft.org/schema/mule/scripting" doc:name="Pause">
                    <scripting:script engine="Groovy"><![CDATA[
                        sleep(${retryable.pause.between.in.ms}*flowVars['amqp.delivery-tag']);
                        return message.payload;]]> </scripting:script>
                </scripting:component>
                <amqps:reject-message xmlns:amqps="http://www.mulesoft.org/schema/mule/amqps" requeue="true" doc:name="Reject / Requeue"> </amqps:reject-message>
                <logger message="This is the retry # #[flowVars['amqp.delivery-tag']]. Going to retry again" level="DEBUG" category="myMuleApp" doc:name="Logs it will retry"/>
            </when>
            <otherwise>
                <!-- Rejecting without requeue lets RabbitMQ route the message to the dead letter exchange -->
                <amqps:reject-message xmlns:amqps="http://www.mulesoft.org/schema/mule/amqps" requeue="false" doc:name="Reject Message &amp; Send to Dead letter"> </amqps:reject-message>
                <logger message="The last retry # #[flowVars['amqp.delivery-tag']] also failed. Giving up and sending to dead letter queue for manual support/handling." level="DEBUG" category="myMuleApp" doc:name="Logs it won't retry"/>
            </otherwise>
        </choice>
    </catch-exception-strategy>
    <!-- Everything else is logged and not retried -->
    <catch-exception-strategy xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" doc:name="non retryable exceptions">
        <logger message="Error #[message.exception.getRootException()]" level="ERROR" category="myMuleApp" doc:name="Logs error"/>
    </catch-exception-strategy>
</choice-exception-strategy>