I don't want to ever see the phrase "Exactly Once" without several asterisks behind it. It might be exactly once from an "overall" point of view, but the client effectively needs infinitely durable infinite memory to perform the "distributed transaction" of acting on the message and responding to the server.
Imagine:
- Server delivers message M
- Client process event E entailed by message M
- Client tries to ack (A) message on server, but "packet loss"
- To make matters worse, let's say the client also immediately dies after this
How do you handle this situation?
The client must transactionally/simultaneously commit both E and A/intent-to-A. Since the server never received an acknowledgment of M, it will either redeliver the message, in which case some record of E must be kept to deduplicate on, or it will wait for client to resend A, or some mixture of both. Note: if you say "just make E idempotent", then you don't need exactly-once delivery in the first place...
I suppose you could go back to some kind of lock-step processing of messages to avoid needing to record all (E,A) that are in flight, but that would obviously kill throughput of the message queue.
Exactly Once can only ever be At Least Once with some out-of-the-box idempotency that may not be as cheap as the natural idempotency of your system.
Having spent 7 years of my life working with Pat Helland in implementing Exactly Once In Order messaging with SQL Server Service Broker[0] I can assure you that practical EOIO messaging is possible, exists, and works as advertised. Delivering data EOIO is not rocket science, TCP has been doing it for decades. Extending the TCP paradigms (basically retries and acks) to messaging is not hard if you buy into transacted persisted storage (= a database) for keeping undelivered messages (transmission queue) and storing received messages before application consumption (destination queue). Just ack after you commit locally.
We've been doing this in 2005 at +10k msgs/sec (1k payload), durable, transacted, fully encrypted, with no two phase commit, supporting long disconnects (I know for documented cases conversations that resumed and continued after +40 days of partner network disconnect).
Running into resource limits (basically out of disk space) is something the database community knows how to monitor, detect and prevent for decades now.
I really don't get why so many articles, blogs and comments claim this is not working or impossible or even hard. My team shipped this +12 years ago, is used by major deployments, technology is proven and little changed in the original protocol.
TCP does deliver data more than once though. Sure within a TCP session you are guaranteed not to get the same byte twice out of your socket, bytes are delivered exactly once, in order. Now if your application that uses TCP pulls data out of the socket and then dies the data will need to be delivered again and the TCP protocol is unable to help us there, it's application level logic at that point.
So everyone is talking about the same thing here, data can get delivered twice, and the application must handle it in an idempotent way. For TCP this happens to be looking at some counter and throwing away anything that's already been seen. With a single client/server connection maintaining sequence numbers like TCP does solve the problem but it's harder to use this same technique in multi-client, multi-server, "stateless" requests that are common in the web world.
EDIT: To clarify, what I mean by TCP "does deliver data more than once", is that one side of the connection can send the same data twice. It's true that it's then discarded by the other end but this is what people are talking about when they talk about the theoretical impossibility of never sending anything twice. The rest is basically the idem-potency thing of ensuring that data received twice somehow doesn't cause anything abnormal.
Are you talking about distributed environment, where network partitions can occur? If yes, then there's Two Generals Problem and "FLP result", that just prove it impossible. So I guess you're talking about non-distributed environment.
In other words, to reliably agree on a system state (whether message id was delivered) you need the system to be Consistent. And per CAP theorem, it cannot be Available in presence of Partitions.
So other people you're referring to probably talk about distributed systems.
> Extending the TCP paradigms (basically retries and acks) to messaging is not hard
> Just ack after you commit locally.
wait ... what is this "retry" and "ack"? is this so that the sender knows not to send the message again? that sounds suspiciously familiar ...
I mean, come on, you're just describing a framework you wrote that handles "at least once delivery" plus "idempotent operations" for the programmer. That's fine. It's marketing to say it's "exactly once". You're just arguing over what to call it.
I would say this problem is quite different than Two Generals, because client doesn't need to know that acknowledgment has been received, i.e., a solution doesn't require the receipt of message to be common knowledge.
It it sufficient that server stops trying to send the message once it has received the ACK.
> Thus it quickly becomes evident that no matter how many rounds of confirmation are made, there is no way to guarantee the second requirement that each general be sure the other has agreed to the attack plan.
This isn't true. There is a point where both sides have confirmed receipt of the last new info (the original confirmation back to the initial sender), and further confirmation is just icing. What kind of general is it that says "I've received ten acknowledgements so far from the other side, but I'm still not sure!"?
Remember that it's the message itself being confirmed, not 'did the last message get through'. There is a point where both sides are aware of both the message and a confirmed confirmation of that message.
Everybody knows "exactly once" means deduplication. This is not exactly a new problem.
That said it's still a difficult problem and I actually wish people would stop trying to roll their own schemes. For example, this scheme relies on examining a Kafka outbound topic to resolve in-doubt outbound messages. But what happens if the outbound message + commit is still "in flight" when the system recovers so the system retransmits the in-doubt outbound message and so rather than deduping it now is generating dupes? Yes, the chances of this are minimal which means it will happen.
Everyone might know that, but it's certainly the case that a lot of systems have _claimed_ it where they actually meant "disastrous characteristics under load and/or (partial) failure".
Exactly-once messaging is not a hard problem so long as you change the problem a little. (Plug warning: this is the way Urbit does EOM, or EOM* if you prefer.)
TLDR, you don't need infinitely durable infinite memory. You just need (a) a single-level store in which every event is a transaction, (b) a message protocol with true end-to-end acks, and (c) a permanent session between every pair of nodes. We don't have single-level storage hardware (although XPoint comes close) but it's easy to simulate semantically.
Think about the problem intuitively. I want to send you a stream of messages, each of which you act on exactly once.
I do it like this: I put a sequence number in each message. I keep sending you message 1 until you send me an acknowledgment that you heard message 1. I can also send messages 2, 3, or whatever (lockstep isn't needed), but you process messages in order and ignore messages you've already heard. Never ack an ack, always ack a dup.
What does implementing this design require? It requires a persistent sequence number, on the sending endpoint, for every receiving endpoint in the world. (Edit: and of course another SN on the receiving end.) Of course, for every endpoint you haven't sent to yet, the number is 0 and doesn't need to be stored. This is not a terribly onerous amount of storage.
A sequence number is the piece of state generally known as a "session" in networking theory parlance. Of course a TCP connection is a session. We're simply saying that every two endpoints have a sort of implicit, persistent connection.
Moreover, every message must be a transaction. Acknowledging the message acknowledges that the application, as well as the sequence number, has been fully and persistently updated with the knowledge that the message contains. One way to think of this is that we're adding the latency of a transaction to persistent storage to our packet latency. SSDs are good here.
Furthermore, since in real life semantic messages need to be delivered in packet-sized fragments, a fragmentation model with end-to-end "piggybacked" acks is needed. There can't be a separate message-level ack (the "stack of acks" problem is the curse of the Internet stack) -- acknowledging all the fragment packets acknowledges the message.
You've embedded the idempotency into the protocol, which is nice, but doesn't get around the problems of not being able to do EOM. You also have the case where you sent a message and lost network connectivity before any ack could come back, resulting in you not knowing if your message arrived 0 or 1 times, which isn't surprising since that case is a constant on a network with anything less than 100% reliability.
That's not EOM, that's just the sensible workarounds to use in light of the fact EOM doesn't exist. Obviously a lot of protocols and applications use such things since the inability to have EOM in the real world has not rendered our entire network edifice impossible or useless in real life.
Here's a radical solution. Instead of becoming a scala pro akka stream 200k engineer with a cluster of kafka nodes that costs your company over $100,000 of engineering time, technical debt, opportunity cost, and server costs, just put it all in bigtable, with deduping by id....
Enough of resume-driven-engineering, why does every need to reinvent the wheel?
Yup. Databases, whether relational or not, have been designed to solve all these problems in a much more "bulletproof" way than your piddly [1] several-dozen-engineer team could ever manage, no matter how genius they are.
[1] No disrespect meant - just a description of size. Source: running a piddly 2-person engineering team.
In terms of connectivity, we deal with a similar problem here at CloudWalk to process payment transactions from POS terminals, where most of them rely on GPRS connections.
Our network issues are nearly 6 times higher (~3.5%) due to GPRS, and we solved the duplication problem with an approach involving both client and server side.
Clients would always ensure that all the information sent by the server was successfully received. If something goes wrong, instead of retrying (sending the payment again), the client sends just the transaction UUID to the server, and the server might either respond with: A. the corresponding response for the transaction or B. not found.
In the scenario A, the POS terminal managed to properly send all the information to the server but failed to receive the response.
In the scenario B, the POS terminal didn't even manage to properly send the information to the server, so the POS can safely retry.
Why not just send the data too along with the UUID? It'll save another roundtrip in case of scenario B right?
Or do you have data to prove that scenario B is a lot less likely to occur, making it sensible to save bandwidth by not re-transmitting the data?
So, a combination of a best effort "at least once" messaging with deduplication near the receiving edge. Fairly standard, honestly.
There is still a potential for problems in the message delivery to the endpoints (malformed messages, Kafka errors, messages not being consumed fast enough and lost), or duplication at that level (restart a listener on the Kafka stream with the wrong message ID) as well.
This is based on my own pains with Kinesis and Lambda (which, I know, isn't Kafka).
In my experience, better to just allow raw "at least once" messaging and perform idempotant actions based off the messages. It's not always possible (and harder when it is possible), but its tradeoffs mean you're less likely to lose messages.
My understanding (noob here) is that it allows producers to retry without fear of duplication. You still have to consider the system feeding the producer though. In Segment's example, clients might deliver their messages more than once to the API. Kafka's mechanism wouldn't detect duplicate messages sent to the producer, just that any given message a producer wants to append to Kafka won't be duplicated.
That's not everything: This website contacted 56 IPs in 6 countries across 47 domains to perform 100 HTTP transactions. In total, 3 MB of data was transfered, which is 5 MB uncompressed. It took 4.103 seconds to load this page. 37 cookies were set, and 8 messages to the console were logged.
"The single requirement of all data pipelines is that they cannot lose data."
Unless the business value of data is derived after applying some summary statistics, than even sampling the data works, and you can lose events in an event stream, while not changing the insight gained. Originally Kafka was designed to be a high throughput data bus for analytical pipeline where losing messages was ok. More recently they are experimenting with exactly once delivery.
Yeah, this was a major overstatement. There are lots of data pipelines where it's ok to lose some data. Consider a sensor that sends measurements hundreds of time a second to an app that operates on a 1-second timeframe. And UDP is used all the time on the internet, yet carries no delivery guarantee.
Having built something similar with RabbitMQ in a high-volume industry, there are a lot of benefits people in this thread seem to be glossing over and are instead debating semantics. Yes, this is not "exactly once" -- there really is no such thing in a distributed system. The best you can hope for is that your edge consumers are idempotent.
There is a lot of value derived from de-duping near ingress of a heavy stream such as this. You're saving downstream consumers time (money) and potential headaches. You may be in an industry where duplicates can be handled by a legacy system, but it takes 5-10 minutes of manual checks and corrections by support staff. That was my exact use case and I can't count the number of times we were thankful our de-duping handled "most" cases.
Imagine:
- Server delivers message M
- Client process event E entailed by message M
- Client tries to ack (A) message on server, but "packet loss"
- To make matters worse, let's say the client also immediately dies after this
How do you handle this situation? The client must transactionally/simultaneously commit both E and A/intent-to-A. Since the server never received an acknowledgment of M, it will either redeliver the message, in which case some record of E must be kept to deduplicate on, or it will wait for client to resend A, or some mixture of both. Note: if you say "just make E idempotent", then you don't need exactly-once delivery in the first place...
I suppose you could go back to some kind of lock-step processing of messages to avoid needing to record all (E,A) that are in flight, but that would obviously kill throughput of the message queue.
Exactly Once can only ever be At Least Once with some out-of-the-box idempotency that may not be as cheap as the natural idempotency of your system.
EDIT: Recommended reading: "Life Beyond Distributed Transactions", Pat Helland - http://queue.acm.org/detail.cfm?id=3025012
We've been doing this in 2005 at +10k msgs/sec (1k payload), durable, transacted, fully encrypted, with no two phase commit, supporting long disconnects (I know for documented cases conversations that resumed and continued after +40 days of partner network disconnect).
Running into resource limits (basically out of disk space) is something the database community knows how to monitor, detect and prevent for decades now.
I really don't get why so many articles, blogs and comments claim this is not working or impossible or even hard. My team shipped this +12 years ago, is used by major deployments, technology is proven and little changed in the original protocol.
[0] https://docs.microsoft.com/en-us/sql/database-engine/configu...
So everyone is talking about the same thing here, data can get delivered twice, and the application must handle it in an idempotent way. For TCP this happens to be looking at some counter and throwing away anything that's already been seen. With a single client/server connection maintaining sequence numbers like TCP does solve the problem but it's harder to use this same technique in multi-client, multi-server, "stateless" requests that are common in the web world.
EDIT: To clarify, what I mean by TCP "does deliver data more than once", is that one side of the connection can send the same data twice. It's true that it's then discarded by the other end but this is what people are talking about when they talk about the theoretical impossibility of never sending anything twice. The rest is basically the idem-potency thing of ensuring that data received twice somehow doesn't cause anything abnormal.
In other words, to reliably agree on a system state (whether message id was delivered) you need the system to be Consistent. And per CAP theorem, it cannot be Available in presence of Partitions.
So other people you're referring to probably talk about distributed systems.
> Just ack after you commit locally.
wait ... what is this "retry" and "ack"? is this so that the sender knows not to send the message again? that sounds suspiciously familiar ...
I mean, come on, you're just describing a framework you wrote that handles "at least once delivery" plus "idempotent operations" for the programmer. That's fine. It's marketing to say it's "exactly once". You're just arguing over what to call it.
https://en.wikipedia.org/wiki/Two_Generals%27_Problem
This isn't true. There is a point where both sides have confirmed receipt of the last new info (the original confirmation back to the initial sender), and further confirmation is just icing. What kind of general is it that says "I've received ten acknowledgements so far from the other side, but I'm still not sure!"?
Remember that it's the message itself being confirmed, not 'did the last message get through'. There is a point where both sides are aware of both the message and a confirmed confirmation of that message.
That said it's still a difficult problem and I actually wish people would stop trying to roll their own schemes. For example, this scheme relies on examining a Kafka outbound topic to resolve in-doubt outbound messages. But what happens if the outbound message + commit is still "in flight" when the system recovers so the system retransmits the in-doubt outbound message and so rather than deduping it now is generating dupes? Yes, the chances of this are minimal which means it will happen.
1. Sender sends message.
2. If no acknowledgement from recipient is returned to sender, resend message until an acknowledgement is received from recipient.
3. If recipient receives message, check store to see if it's been received before.
4. If it's not in the store, store it, acknowledge receipt to sender, and process it.
5. If it's already in the recipient's store, acknowledge receipt to sender, and discard message.
TLDR, you don't need infinitely durable infinite memory. You just need (a) a single-level store in which every event is a transaction, (b) a message protocol with true end-to-end acks, and (c) a permanent session between every pair of nodes. We don't have single-level storage hardware (although XPoint comes close) but it's easy to simulate semantically.
Think about the problem intuitively. I want to send you a stream of messages, each of which you act on exactly once.
I do it like this: I put a sequence number in each message. I keep sending you message 1 until you send me an acknowledgment that you heard message 1. I can also send messages 2, 3, or whatever (lockstep isn't needed), but you process messages in order and ignore messages you've already heard. Never ack an ack, always ack a dup.
What does implementing this design require? It requires a persistent sequence number, on the sending endpoint, for every receiving endpoint in the world. (Edit: and of course another SN on the receiving end.) Of course, for every endpoint you haven't sent to yet, the number is 0 and doesn't need to be stored. This is not a terribly onerous amount of storage.
A sequence number is the piece of state generally known as a "session" in networking theory parlance. Of course a TCP connection is a session. We're simply saying that every two endpoints have a sort of implicit, persistent connection.
Moreover, every message must be a transaction. Acknowledging the message acknowledges that the application, as well as the sequence number, has been fully and persistently updated with the knowledge that the message contains. One way to think of this is that we're adding the latency of a transaction to persistent storage to our packet latency. SSDs are good here.
Furthermore, since in real life semantic messages need to be delivered in packet-sized fragments, a fragmentation model with end-to-end "piggybacked" acks is needed. There can't be a separate message-level ack (the "stack of acks" problem is the curse of the Internet stack) -- acknowledging all the fragment packets acknowledges the message.
All this and more is explained here (plug alert):
http://media.urbit.org/whitepaper.pdf
That's not EOM, that's just the sensible workarounds to use in light of the fact EOM doesn't exist. Obviously a lot of protocols and applications use such things since the inability to have EOM in the real world has not rendered our entire network edifice impossible or useless in real life.
Deleted Comment
Enough of resume-driven-engineering, why does every need to reinvent the wheel?
[1] No disrespect meant - just a description of size. Source: running a piddly 2-person engineering team.
Our network issues are nearly 6 times higher (~3.5%) due to GPRS, and we solved the duplication problem with an approach involving both client and server side.
Clients would always ensure that all the information sent by the server was successfully received. If something goes wrong, instead of retrying (sending the payment again), the client sends just the transaction UUID to the server, and the server might either respond with: A. the corresponding response for the transaction or B. not found.
In the scenario A, the POS terminal managed to properly send all the information to the server but failed to receive the response.
In the scenario B, the POS terminal didn't even manage to properly send the information to the server, so the POS can safely retry.
Dead Comment
There is still a potential for problems in the message delivery to the endpoints (malformed messages, Kafka errors, messages not being consumed fast enough and lost), or duplication at that level (restart a listener on the Kafka stream with the wrong message ID) as well.
This is based on my own pains with Kinesis and Lambda (which, I know, isn't Kafka).
In my experience, better to just allow raw "at least once" messaging and perform idempotant actions based off the messages. It's not always possible (and harder when it is possible), but its tradeoffs mean you're less likely to lose messages.
- Talk from Kafka Summit: https://www.confluent.io/kafka-summit-nyc17/resource/#exactl...
- Proposal: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+E...
https://urlscan.io/result/b2e27a08-1298-491a-863f-8cadc45e73...
Unless the business value of data is derived after applying some summary statistics, than even sampling the data works, and you can lose events in an event stream, while not changing the insight gained. Originally Kafka was designed to be a high throughput data bus for analytical pipeline where losing messages was ok. More recently they are experimenting with exactly once delivery.
There is a lot of value derived from de-duping near ingress of a heavy stream such as this. You're saving downstream consumers time (money) and potential headaches. You may be in an industry where duplicates can be handled by a legacy system, but it takes 5-10 minutes of manual checks and corrections by support staff. That was my exact use case and I can't count the number of times we were thankful our de-duping handled "most" cases.