Handling Duplicate Messages (Idempotent Consumers)

“At Least Once” message delivery guarantees that a message will be delivered to a consumer once or possibly many times. This means you need to develop your consumers to handle duplicate messages effectively. The term for this is having idempotent consumers.

Not doing so could result in some bad outcomes for your system.

For example, processing a message that creates an order twice could create two orders. That would not likely be a good outcome.

Why do messages get delivered more than once?

How do you handle duplicates?

Here’s how to make idempotent consumers and be resilient to duplicate messages.


Delivery Guarantees

Before we jump too far ahead, I want to quickly cover message delivery guarantees from message brokers. Different brokers provide different types of guarantees, but they are broken down into these 3.

At Most Once

Consumers will receive a message once or possibly not at all.

At Least Once

Consumers will receive a message once or possibly multiple times. I’ll cover why it may be delivered multiple times and how to handle it in this post.

Exactly Once

This one is tricky and complex. Some brokers/event logs support it by having a producer send a message exactly once, and the message will be delivered to consumers exactly once (excluding failures and retries).

Idempotent Consumers

At least once delivery is the most common among message brokers. But that’s not the only reason you’ll need to handle duplicate messages. Here are some reasons why a message can get delivered more than once.

At Least Once Delivery

When a message broker delivers a message to a consumer, it does not consider the message processed until the consumer acknowledges the delivery.

This can either happen implicitly or explicitly depending on the message library you’re using.
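
To make the acknowledgment step concrete, here’s a minimal sketch of an explicit acknowledgment using the RabbitMQ.Client library. The queue name and the ProcessMessage helper are illustrative, not from the original post.

```csharp
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (sender, ea) =>
{
    ProcessMessage(ea.Body.ToArray()); // your handling logic

    // Explicit acknowledgment. If this never runs (crash, exception),
    // the broker keeps the message and will redeliver it.
    channel.BasicAck(ea.DeliveryTag, multiple: false);
};

// autoAck: false tells the broker to wait for our explicit BasicAck.
channel.BasicConsume(queue: "orders", autoAck: false, consumer: consumer);

void ProcessMessage(byte[] body) { /* apply your state changes */ }
```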


Unacknowledged & Timeouts

  1. If your consumer fails (for whatever reason) and never acknowledges the delivery to the broker, the broker will send the message to the consumer again.
  2. If you’re using a library that requires you to acknowledge in code and, for whatever reason, that never occurs, the message will again be delivered to the consumer.
  3. There is also generally a timeout within which an acknowledgment must occur. If you send the acknowledgment after this time has elapsed, the broker will deem the message unacknowledged and resend it to the consumer.

In any of the 3 cases above, if you’ve made a state change to your database, and the message gets delivered again, you’re going to make the same state change again.

This could have some very negative impacts. As mentioned at the start, if you were creating an Order in the consumer and you receive the message more than once, you would end up creating multiple orders. Not ideal.

Producer Duplicating

Another reason you could receive duplicates is that the producer itself is sending the same message more than once.

This can occur simply because of a bug in your code but also because of the outbox pattern. You can refer to my post on the outbox pattern for the problem it solves, but it does introduce duplicate message issues.

The producer will pull messages/events from the database and then publish those to a message broker. After it does that, it then has to update the database to mark the messages as published. But because these are two different operations, the update could fail. If that happens, the producer will send the messages again to the message broker.

This will result in the consumer receiving the same message.

Handling Duplicates

In order to handle duplicate messages, we need to record what messages we’ve previously processed.

You want to record the message ID and the consumer when a consumer processes a message.

If you’re in a concurrent environment, then you also want to save this alongside your application’s state changes, within the same database and transaction.

The code below is using Entity Framework Core. I’ve added two new methods:

IdempotentConsumer(), which adds a new record that contains the messageId and Consumer name.

HasBeenProcessed(), which checks to see if a record exists.

The IdempotentConsumer model has a Primary/Unique key on MessageId, Consumer. This is important in a concurrent environment.
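
The original code isn’t shown here, so this is a minimal sketch of what it might look like. The AppDbContext name and entity shape are assumptions.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.EntityFrameworkCore;

public class IdempotentConsumer
{
    public Guid MessageId { get; set; }
    public string Consumer { get; set; } = default!;
}

public class AppDbContext : DbContext
{
    public DbSet<IdempotentConsumer> IdempotentConsumers => Set<IdempotentConsumer>();

    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        // Composite primary key enforces uniqueness of (MessageId, Consumer),
        // which is what protects us in a concurrent environment.
        modelBuilder.Entity<IdempotentConsumer>()
            .HasKey(x => new { x.MessageId, x.Consumer });
    }

    // Records that the given consumer has processed the given message.
    public void IdempotentConsumer(Guid messageId, string consumer) =>
        IdempotentConsumers.Add(new IdempotentConsumer { MessageId = messageId, Consumer = consumer });

    // Checks whether the given consumer has already processed the given message.
    public Task<bool> HasBeenProcessed(Guid messageId, string consumer) =>
        IdempotentConsumers.AnyAsync(x => x.MessageId == messageId && x.Consumer == consumer);
}
```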

Now in our consumer, we’re going to check if the message has been processed at the very beginning using the HasBeenProcessed() method. If it has, just exit early.

Then in the same transaction as our state change, we’re also going to use the IdempotentConsumer() method to add a new record.

If the same message is processed at the exact same time (concurrently), the unique key constraint on MessageId, Consumer will cause an exception when we save or commit the transaction.
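
Putting that together, a consumer might look like this sketch. The OrderPlaced message and Order entity are illustrative; assume OrderPlaced carries a MessageId.

```csharp
public class OrderPlacedConsumer
{
    private readonly AppDbContext _db;
    public OrderPlacedConsumer(AppDbContext db) => _db = db;

    public async Task Handle(OrderPlaced message)
    {
        // Exit early if this message has already been processed.
        if (await _db.HasBeenProcessed(message.MessageId, nameof(OrderPlacedConsumer)))
            return;

        using var transaction = await _db.Database.BeginTransactionAsync();

        // Our actual state change.
        _db.Orders.Add(new Order { Id = message.OrderId });

        // Record the message in the same transaction as the state change.
        _db.IdempotentConsumer(message.MessageId, nameof(OrderPlacedConsumer));

        // If a duplicate is being processed concurrently, the unique key on
        // (MessageId, Consumer) causes this to throw rather than double-process.
        await _db.SaveChangesAsync();
        await transaction.CommitAsync();
    }
}
```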

We’ve now implemented an idempotent consumer. It can fully handle duplicate messages.

Naturally Idempotent

Not every consumer needs to keep track of the messages it’s processed. If your consumer does not have any side-effects that will cause issues if they are executed again, then you might consider just letting it run.

For example, a consumer might set the ShippingLabel to Cancelled when an order is cancelled.

If this is executed multiple times by duplicate messages, there are no side-effects that we are concerned with. The state remains the same.
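
A sketch of what such a naturally idempotent consumer might look like (again, the entity and message names are illustrative):

```csharp
public class OrderCancelledConsumer
{
    private readonly AppDbContext _db;
    public OrderCancelledConsumer(AppDbContext db) => _db = db;

    public async Task Handle(OrderCancelled message)
    {
        var label = await _db.ShippingLabels.FindAsync(message.OrderId);

        // Assigning the same value twice leaves the state unchanged, so a
        // duplicate delivery is harmless and no tracking table is needed.
        label.Status = ShippingLabelStatus.Cancelled;

        await _db.SaveChangesAsync();
    }
}
```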

Having naturally idempotent consumers means you do not need to keep track of processed messages; however, it requires diligence. When code changes, you may introduce other side-effects that make the consumer no longer naturally idempotent, at which point you do need to record what you’ve processed before.


Outbox Pattern: Reliably Save State & Publish Events

What’s the Outbox Pattern? A reliable way of saving state to your database and publishing a message/event to a message broker.

Why do you need it? When you get into messaging, you will often find code that needs to save state to your database, and then publish a message/event to a message broker. Unfortunately, because they are two different resources, you need a distributed transaction to make these atomic.

Another option is the Outbox Pattern, which does not require a distributed transaction, and most messaging libraries support it.


Unreliable

To illustrate the issue, the first step is saving/changing the state in your primary database. It doesn’t matter if this is an RDBMS or a NoSQL store.

Subsequently, since you have changed state, you need to now publish a message/event to a queue or message broker to let other systems know of the state change.

Since each step is independent, if there is a failure in publishing the event to the queue, then you’ve made a state change without letting other systems know of that change.


Why is this a problem? If you’re using messaging in an event-driven architecture, you likely rely on events being published. An event not being published could have all sorts of implications.

For example, if you’re using events to invalidate a cache, you’ll now have stale data. Or worse, if you’re using events as part of a Saga (long-running process), the next portion/step of the saga may never occur.

You need atomicity. All or nothing.

Outbox Pattern

The Outbox pattern solves this by using the transaction from your primary database to store your state changes along with the messages/events you’re publishing.


This means that messages you’re publishing are initially stored in the same database alongside your other application data. Each messaging library may implement this slightly differently in terms of the structure of the data and messages, as well as how its API looks, but this is the overall idea.

The order of saving state or publishing an event doesn’t really matter anymore as they are saved to the database in the same transaction.

Once we commit our transaction, a secondary process/thread (Publisher) will pull the unpublished events from the primary database.


Then the Publisher will publish the events it pulled to the queue or message broker.

Finally, the Publisher updates/deletes the records in the database so it knows the events have been successfully published to the queue.
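
Putting the three steps together, the Publisher is essentially a loop like the following sketch. This is not any particular library’s implementation; the OutboxMessages table and IMessageBroker abstraction are assumed names.

```csharp
// Runs as a background process/thread, separate from request handling.
public async Task PublishPendingMessages(AppDbContext db, IMessageBroker broker)
{
    // 1. Pull events that were saved but not yet published.
    var pending = await db.OutboxMessages
        .Where(m => m.PublishedAt == null)
        .OrderBy(m => m.CreatedAt)
        .ToListAsync();

    foreach (var message in pending)
    {
        // 2. Publish to the queue or message broker.
        await broker.PublishAsync(message.Topic, message.Payload);

        // 3. Mark as published. If this fails after a successful publish,
        // the event will be published again: at-least-once delivery.
        message.PublishedAt = DateTime.UtcNow;
        await db.SaveChangesAsync();
    }
}
```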

Code Example

In this example, I’m using CAP, but as mentioned, different messaging libraries will have different ways they implement this in their API.

CAP’s is really straightforward. It provides an extension method on the Entity Framework Core DatabaseFacade to begin a database transaction. You simply pass along the ICapPublisher when starting the transaction. This tells CAP to save the published event to the database, rather than directly to the message broker, in my case RabbitMQ.
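
Here’s a sketch along the lines of CAP’s documented usage; the controller, entities, and event name are illustrative.

```csharp
public class OrdersController : ControllerBase
{
    private readonly AppDbContext _db;
    private readonly ICapPublisher _capBus;

    public OrdersController(AppDbContext db, ICapPublisher capBus)
    {
        _db = db;
        _capBus = capBus;
    }

    [HttpPost]
    public async Task<IActionResult> PlaceOrder(Guid orderId)
    {
        // Passing ICapPublisher tells CAP to store the published event in the
        // database as part of this transaction, not send it straight to RabbitMQ.
        using (var trans = _db.Database.BeginTransaction(_capBus, autoCommit: false))
        {
            _db.Orders.Add(new Order { Id = orderId });
            await _db.SaveChangesAsync();

            await _capBus.PublishAsync("order.placed", new { OrderId = orderId });

            trans.Commit();
        }

        return Ok();
    }
}
```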

It’s really that straightforward. I’ve left out the configuration of CAP, which is also dead simple: it’s just a matter of using the correct data storage provider and specifying the connection string in the ASP.NET Core Startup.ConfigureServices()
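
For completeness, a sketch of that configuration, assuming SQL Server, a connection string named “Default”, and the typical Startup class with an injected Configuration:

```csharp
public void ConfigureServices(IServiceCollection services)
{
    services.AddDbContext<AppDbContext>(options =>
        options.UseSqlServer(Configuration.GetConnectionString("Default")));

    services.AddCap(x =>
    {
        // Use the same database, via EF Core, to store outgoing messages.
        x.UseEntityFramework<AppDbContext>();

        // The broker the Publisher sends to; RabbitMQ in my case.
        x.UseRabbitMQ("localhost");
    });
}
```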

What this looks like in our primary database is a table that CAP created automatically, called cap.Published. This is where it stores the published messages. Initially, the StatusName is null; after a successful publish to the queue, CAP updates it to Succeeded.

At Least Once Messaging

You may have noticed in the diagrams above that the Publisher, which pulls from the database, publishes to the queue, then marks the event as published in the database, ultimately has the same problem we started out with.

Really, we’ve just moved the problem, but with a different outcome. If we publish the message to the queue but, for some reason, CAP cannot update the StatusName of the record, then we will ultimately publish the same event again.

In a lot of situations, this is a better problem to have. We are now in an “At Least Once” scenario, meaning events will get published once or possibly more. In the original scenario, we were in “At Most Once”, which implies once or possibly never.

Learn how to create Idempotent Consumers to handle duplicate messages safely.


The Complexity of Caching

Caching ain’t easy! There are many factors that add to the complexity of caching. My general recommendation is to avoid caching if you can.

However, caching can bring performance and scaling, which you might need. If you’re starting to use a cache in your system, here are some things to think about. Adding a cache isn’t trivial and requires some thought about caching strategies, how to invalidate, and fallbacks to your database. Caching can improve performance and scalability, but it can also bring your entire system down if it’s failing.


Strategy

The first thing to think about is the caching strategy. The two most common methods that I’ve noticed in code-bases are the write-through and cache aside methods.

The write-through method is when your application writes to its primary database, and then immediately updates the cached value. Meaning if you add a new record to your database, you immediately add the equivalent value to the cache. If you were to update a record, you would immediately update the cached value.
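
A minimal write-through sketch using ASP.NET Core’s IDistributedCache; the Product entity and key format are illustrative.

```csharp
public async Task UpdateProduct(Product product)
{
    // 1. Write to the primary database first.
    _db.Products.Update(product);
    await _db.SaveChangesAsync();

    // 2. Immediately write the same value to the cache.
    await _cache.SetStringAsync(
        $"product:{product.Id}",
        JsonSerializer.Serialize(product));
}
```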


The second method most often used is the Cache Aside (Lazy Loading) method. This can be used in conjunction with Write-Through or by itself.

When the application needs something from the cache, it first tries to retrieve it. If it does not exist (a cache miss), it hits the primary database and then writes the value to the cache. Essentially, you’re lazy loading the cache when data is requested for values that are not yet cached.
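
The same illustrative setup, as a cache-aside read:

```csharp
public async Task<Product?> GetProduct(int id)
{
    var key = $"product:{id}";

    // 1. Try the cache first.
    var cached = await _cache.GetStringAsync(key);
    if (cached is not null)
        return JsonSerializer.Deserialize<Product>(cached);

    // 2. Cache miss: hit the primary database.
    var product = await _db.Products.FindAsync(id);

    // 3. Write the value to the cache for the next request.
    if (product is not null)
        await _cache.SetStringAsync(key, JsonSerializer.Serialize(product));

    return product;
}
```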

Invalidating the Cache

There are only two hard things in Computer Science: cache invalidation and naming things. -Phil Karlton

https://www.karlton.org/2017/12/naming-things-hard/

If you’re not using the write-through method, then that means your cache is stale when data gets updated in your primary database.

There are a couple of methods I’ve used to invalidate the cache (remove the value from the cache) and let the cache aside (lazy loading) method do its job.

Cache Expiry (TTL)

Most caches have the ability to expire a cached value after a period of time (time to live). When the cache expires an item, the next call for it goes through the three steps of the lazy loading method to re-populate the cache.
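
With IDistributedCache, expiry is set per entry, for example a five-minute TTL (the duration here is arbitrary):

```csharp
await _cache.SetStringAsync(
    $"product:{product.Id}",
    JsonSerializer.Serialize(product),
    new DistributedCacheEntryOptions
    {
        // The cache evicts the entry after 5 minutes; the next request
        // falls through to the database and re-populates the cache.
        AbsoluteExpirationRelativeToNow = TimeSpan.FromMinutes(5)
    });
```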

Async Messaging

The second method is using asynchronous messaging to notify another process that data has changed and to invalidate the cache.

This requires you to already be using messaging (events) and to have a well-defined API for where data is mutated in your system. If any external system modifies data within your database, you will not be able to emit an event every time data is changed.

If you’re using something like Entity Framework, you could override SaveChangesAsync to look at the ChangeTracker, determine which entities have changed, and publish events.
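
A sketch of that override; the _publisher abstraction and EntityChanged event are assumptions. Note that this two-step save-then-publish has the same atomicity problem the outbox pattern solves, so in practice you’d want to publish via the outbox.

```csharp
public override async Task<int> SaveChangesAsync(CancellationToken cancellationToken = default)
{
    // Capture which entities changed before EF resets the tracked state.
    var changed = ChangeTracker.Entries()
        .Where(e => e.State is EntityState.Added or EntityState.Modified or EntityState.Deleted)
        .Select(e => e.Entity.GetType().Name)
        .Distinct()
        .ToList();

    var result = await base.SaveChangesAsync(cancellationToken);

    // Publish an invalidation event per changed entity type.
    foreach (var entityType in changed)
        await _publisher.PublishAsync(new EntityChanged(entityType));

    return result;
}
```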

Failures

One benefit of the Cache Aside (Lazy Loading) method is that if, for whatever reason, you cannot reach the cache, you can fall back to using your database. This works exactly like a cache miss. You need to handle the appropriate exceptions and timeouts from the cache client to determine that the cache is unavailable, then go directly to the database and return the value.
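
A sketch of that fallback; which exception types signal an unavailable cache depends on your cache client.

```csharp
public async Task<Product?> GetProductWithFallback(int id)
{
    try
    {
        var cached = await _cache.GetStringAsync($"product:{id}");
        if (cached is not null)
            return JsonSerializer.Deserialize<Product>(cached);
    }
    catch (Exception) // e.g. connection failures or timeouts from the cache client
    {
        // Cache is unreachable: treat it as a cache miss and fall through.
    }

    // Cache miss or cache unavailable: go directly to the database.
    return await _db.Products.FindAsync(id);
}
```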


The one thing to be very aware of: if your cache is unavailable, all requests are now going to be fulfilled by the database. This could have a significant performance impact on your primary database, since however many requests are normally handled by your cache are now adding all that extra load to your database.

Complexity of Caching

The complexity of caching isn’t trivial.

Avoid caching if you can.

First, look at the queries to your primary database before going down the path of adding a cache. There are many more complexities that you introduce when adding a cache. Avoid it if you can.
