Commit
Merge pull request #1 from warpstreamlabs/ra/inc
Fix bug in idempotency implementation as well as when MaxOpenRequests=1 and strict ordering is desired
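For context, a minimal sketch of the producer configuration this fix targets, written against the upstream IBM/sarama public API (the fork's import path may differ; the broker address and topic are placeholders):

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	cfg := sarama.NewConfig()
	// Idempotent production requires acks from all in-sync replicas,
	// at most one in-flight request per connection (sarama's config
	// validation enforces Net.MaxOpenRequests == 1 when Idempotent is
	// set), and a protocol version with sequence numbers (>= 0.11).
	cfg.Producer.Idempotent = true
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 5
	cfg.Net.MaxOpenRequests = 1
	cfg.Version = sarama.V0_11_0_0

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Drain the error channel so the producer never blocks on it.
	go func() {
		for e := range producer.Errors() {
			log.Println("produce error:", e)
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "events",
		Value: sarama.StringEncoder("hello"),
	}
}
```

With either Producer.Idempotent or Net.MaxOpenRequests == 1 set, the patched path below falls back to the synchronous broker.Produce so that at most one produce request is ever outstanding.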
richardartoul authored Jul 19, 2024
2 parents d2246cc + 144000c commit 31175b3
Showing 2 changed files with 184 additions and 27 deletions.
195 changes: 168 additions & 27 deletions async_producer.go
@@ -197,6 +197,7 @@ type ProducerMessage struct {
sequenceNumber int32
producerEpoch int16
hasSequence bool
hasBeenBatched bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -695,6 +696,9 @@ func (pp *partitionProducer) dispatch() {
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
if msg.hasSequence {
panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
}
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}
@@ -737,6 +741,13 @@ func (pp *partitionProducer) flushRetryBuffers() {
}

for _, msg := range pp.retryState[pp.highWatermark].buf {
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
if msg.hasSequence {
panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
}
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}
pp.brokerProducer.input <- msg
}

@@ -800,6 +811,7 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {

// Count the in flight requests to know when we can close the pending channel safely
wg.Add(1)

// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
@@ -823,19 +835,66 @@ }
}
}

// Use AsyncProduce vs Produce to not block waiting for the response
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
// https://kafka.apache.org/protocol#protocol_network
err := broker.AsyncProduce(request, sendResponse)
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
// Callback is not called when using NoResponse
if p.conf.Producer.RequiredAcks == NoResponse {
// Provide the expected nil response
sendResponse(nil, nil)
if p.conf.Producer.Idempotent || p.conf.Net.MaxOpenRequests == 1 {
// Idempotency being enabled or MaxOpenRequests == 1 are special cases where
// the caller has communicated: I really care about ordering. As a result, we
// do our best to guarantee that by using the synchronous (blocking) version
// of produce so that there is only one outstanding Produce request at any given
// time. A few notes:
//
// 1. We shouldn't *have* to do this since that is what p.conf.Net.MaxOpenRequests
// is for. However, as noted in the comments of the PR that introduced pipelining
// in the first place, the implementation of pipelining is such that
// p.conf.Net.MaxOpenRequests is no longer strictly honored, so we have to do it
// this way if we want to maintain strict ordering guarantees. PR that introduced
// request pipelining: https://github.com/IBM/sarama/pull/2094
//
// 2. In theory when idempotency is enabled, it *should* be possible to maintain
// strict ordering with concurrent produce requests as long as all the produce
// requests are received by the broker in the correct order (say, by sending them
// in the right order over a single connection). However, the current implementation
// of this library is such that idempotency breaks in certain error/retry scenarios
// when request pipelining is enabled. This can manifest as either the assertion in
// `produce_set.go` firing and complaining that a batch was created that contains out
// of sequence messages (https://github.com/IBM/sarama/issues/2803), or the Kafka
// broker itself detecting that the ordering guarantees have been violated and
// rejecting the batch, effectively causing data loss since there is no way for the
// client to retry or handle this error without risk of introducing duplicates:
// https://github.com/IBM/sarama/issues/2619. I wish I could say with certainty
// exactly *why* this happens, but after two days of strenuous debugging I still can't
// say for sure other than that this file was written with a lot of assumptions and things
// that happened to work because there was no request pipelining and the best path
// forward is to just make sure there is no request pipelining when strict ordering
// is required.
resp, err := broker.Produce(request)
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
// Callback is not called when using NoResponse
if p.conf.Producer.RequiredAcks == NoResponse {
// Provide the expected nil response
sendResponse(nil, nil)
} else {
sendResponse(resp, nil)
}
} else {
// Use AsyncProduce vs Produce to not block waiting for the response
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
// https://kafka.apache.org/protocol#protocol_network
err := broker.AsyncProduce(request, sendResponse)
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
// Callback is not called when using NoResponse
if p.conf.Producer.RequiredAcks == NoResponse {
// Provide the expected nil response
sendResponse(nil, nil)
}
}
}
// Wait for all in flight requests to close the pending channel safely
@@ -974,6 +1033,11 @@ func (bp *brokerProducer) run() {
continue
}
}

if bp.parent.conf.Producer.Idempotent && !msg.hasSequence {
panic("msg made it to brokerProducer goroutine without being sequenced while idempotency is enabled")
}

if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
@@ -1101,7 +1165,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
if bp.parent.conf.Producer.Retry.Max <= 0 {
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.returnErrors(pSet.msgs, block.Err)
@@ -1134,32 +1198,55 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo

switch block.Err {
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = block.Err
if bp.parent.conf.Producer.Idempotent {
// I don't really understand why we need a dedicated goroutine for this, but it was like
// that before.
go bp.parent.retryBatch(topic, partition, pSet, block.Err)
} else {
bp.parent.retryMessages(pSet.msgs, block.Err)
// dropping the following messages has the side effect of incrementing their retry count.
//
// Note, I'm not 100% sure why this is here since we're not about to call bp.rollOver(),
// however, it's very important that this function is not called in the idempotency path
// because it would break apart the messages in an existing (sequenced) batch and the
// messages could end up split across multiple batches or in a different order which
// violates how the idempotency protocol works and will cause an assertion to fire in the
// client (dropping the messages) or for the Broker to reject the messages. Either way, it's
// data loss.
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
}
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
}
})
}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
// Due to how this library is structured / implemented, retryBatch can end up getting called in
// a very tight loop in some scenarios (like when a Broker is down and the library is trying to
// re-establish a connection). As a result, we need to make sure that retryBatch honors the
// configured Retry.Backoff just like the regular message retries do.
time.Sleep(p.conf.Producer.Retry.Backoff)

Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
produceSet := newProduceSet(p)
produceSet.msgs[topic] = make(map[int32]*partitionSet)
produceSet.msgs[topic][partition] = pSet
produceSet.bufferBytes += pSet.bufferBytes
produceSet.bufferCount += len(pSet.msgs)
// Once a producer ID and epoch have been assigned to messages, they can *never* be changed
// in order for the idempotency protocol between the client and Broker to be correct. As a
// result, we make sure that we copy over the old producer ID and epoch (older versions of
// this library did not do this which resulted in bugs where the client would issue batches
// with new producer ID / epochs, but with old sequence numbers which makes no sense).
produceSet.producerID = pSet.recordsToSend.RecordBatch.ProducerID
produceSet.producerEpoch = pSet.recordsToSend.RecordBatch.ProducerEpoch
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnErrors(pSet.msgs, kerr)
@@ -1168,18 +1255,39 @@ func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitio
msg.retries++
}

// it's expected that a metadata refresh has been requested prior to calling retryBatch
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
// Previous versions of this library did not retry the call to p.client.Leader(topic, partition)
// if it failed. That was problematic because it meant during periods of time where we couldn't
// figure out who the leader was (because it had just died, or there was a network problem, or
// whatever) idempotent Produce requests would fail immediately instead of retrying for a while
// as expected. This retry loop is very important since prematurely (and unnecessarily) failing
// an idempotent batch is ~equivalent to data loss.
succeeded := false
for i := 0; i < p.conf.Producer.Retry.Max; i++ {
// it's expected that a metadata refresh has been requested prior to calling retryBatch
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, attempt: %d of %d\n", topic, partition, err, i, p.conf.Producer.Retry.Max)
for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}

time.Sleep(p.conf.Producer.Retry.Backoff)
continue
}

bp := p.getBrokerProducer(leader)
bp.output <- produceSet
p.unrefBrokerProducer(leader, bp)
succeeded = true
break
}

if !succeeded {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, no more retries\n", topic, partition)
for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}
return
}
bp := p.getBrokerProducer(leader)
bp.output <- produceSet
p.unrefBrokerProducer(leader, bp)
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
Expand All @@ -1193,11 +1301,34 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
bp.closing = err

sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
// It is very important that we use retryBatch instead of retryMessages when idempotency
// is enabled. Otherwise batches that have already been sequenced (and potentially even
// sent to Brokers) will get split up and rebuilt in a different way, order, or with the
// messages split across different batches. This is not allowed in the idempotency
// protocol. The expectation on the Broker side is that if the client retries any batches,
// it will send them with exact same messages and sequence numbers as the original attempt,
// otherwise the protocol doesn't work.
if bp.parent.conf.Producer.Idempotent {
// IDK if this needs/should have an external goroutine, but the only other place where
// retryBatch() was called did it this way, so I did the same for posterity.
go bp.parent.retryBatch(topic, partition, pSet, ErrKafkaStorageError)
} else {
bp.parent.retryMessages(pSet.msgs, err)
}
})

// Everything in bp.buffer needs to be retried because we're about to call bp.rollOver()
// which will clear bp.buffer. Why we need to call bp.rollOver() here I'm not sure.
bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
// Same comment as above.
if bp.parent.conf.Producer.Idempotent {
// Same comment as above.
go bp.parent.retryBatch(topic, partition, pSet, ErrKafkaStorageError)
} else {
bp.parent.retryMessages(pSet.msgs, err)
}
})
bp.rollOver()
}
@@ -1319,6 +1450,16 @@ func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
if p.conf.Producer.Idempotent && msg.hasSequence && msg.hasBeenBatched {
// If the message has had a sequence number assigned *and* it's
// been added to a batch (produce_set.go) then it is illegal to
// use the retryMessage function on it because that means the
// calling function is breaking apart and retrying messages from
// a batch that was already potentially sent to a Kafka broker
// which is a violation of the idempotency protocol.
panic("retrying msg with sequence that has been batched before")
}

if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, err)
} else {
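All of the new panics and the retryBatch changes above defend one invariant. As a rough illustration (a toy model under assumed semantics, not code from this repository or from Kafka itself), the broker-side idempotency check accepts a batch only when its first sequence number immediately follows the last sequence it persisted, which is why a sequenced batch must be retried intact rather than re-split or re-ordered:

```go
package main

import "fmt"

// toyBroker models the broker-side check for a single
// (producerID, producerEpoch, partition) tuple: a batch is accepted
// only when its FirstSequence immediately follows the last persisted
// sequence number.
type toyBroker struct {
	lastSeq int32 // sequence number of the last record persisted
}

type batch struct {
	firstSequence int32
	records       int32
}

func (b *toyBroker) produce(bt batch) error {
	switch {
	case bt.firstSequence == b.lastSeq+1:
		b.lastSeq += bt.records // in order: append
		return nil
	case bt.firstSequence <= b.lastSeq:
		// An exact retry of an already-persisted batch is reported as
		// a duplicate instead of being appended twice; this is what
		// makes retries safe.
		return fmt.Errorf("duplicate: seq %d already persisted", bt.firstSequence)
	default:
		return fmt.Errorf("out of order: got seq %d, want %d", bt.firstSequence, b.lastSeq+1)
	}
}

func main() {
	b := &toyBroker{lastSeq: -1}
	fmt.Println(b.produce(batch{firstSequence: 0, records: 3})) // <nil>
	fmt.Println(b.produce(batch{firstSequence: 0, records: 3})) // duplicate
	// If a retry re-batches messages differently (say, splitting the
	// original 3 records across new batches), the broker sees a gap
	// and rejects it -- the failure mode the panics above guard against.
	fmt.Println(b.produce(batch{firstSequence: 5, records: 1})) // out of order, want 3
}
```

The real protocol also keys this state by producer ID and epoch and caches recent batch metadata; the sketch collapses all of that into a single lastSeq counter.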
16 changes: 16 additions & 0 deletions produce_set.go
@@ -86,10 +86,26 @@ func (ps *produceSet) add(msg *ProducerMessage) error {

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
Logger.Println(
"assertion failed: message out of sequence added to batch",
"producer_id",
ps.producerID,
set.recordsToSend.RecordBatch.ProducerID,
"producer_epoch",
ps.producerEpoch,
set.recordsToSend.RecordBatch.ProducerEpoch,
"sequence_number",
msg.sequenceNumber,
set.recordsToSend.RecordBatch.FirstSequence,
"buffer_count",
ps.bufferCount,
"msg_has_sequence",
msg.hasSequence)
return errors.New("assertion failed: message out of sequence added to a batch")
}
}

msg.hasBeenBatched = true
// Past this point we can't return an error, because we've already added the message to the set.
set.msgs = append(set.msgs, msg)

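And a hedged sketch of the client-side counterpart to that assertion (again a toy model with hypothetical names like toySequencer, not this library's transaction manager): each topic-partition owns one monotonically increasing counter, and every message must be stamped exactly once, which is the property the new hasSequence and hasBeenBatched flags let the code assert:

```go
package main

import (
	"fmt"
	"sync"
)

// toySequencer plays the role that getAndIncrementSequenceNumber plays
// in the diff above: one monotonically increasing counter per
// topic-partition, handed out exactly once per message.
type toySequencer struct {
	mu   sync.Mutex
	next map[string]int32
}

func newToySequencer() *toySequencer {
	return &toySequencer{next: make(map[string]int32)}
}

func (s *toySequencer) getAndIncrement(topic string, partition int32) int32 {
	s.mu.Lock()
	defer s.mu.Unlock()
	key := fmt.Sprintf("%s-%d", topic, partition)
	seq := s.next[key]
	s.next[key]++
	return seq
}

type message struct {
	sequenceNumber int32
	hasSequence    bool
}

// stamp assigns a sequence number exactly once, mirroring the
// assertions added to dispatch() and flushRetryBuffers().
func stamp(s *toySequencer, m *message, topic string, partition int32) {
	if m.hasSequence {
		panic("assertion failure: message already sequenced")
	}
	m.sequenceNumber = s.getAndIncrement(topic, partition)
	m.hasSequence = true
}

func main() {
	s := newToySequencer()
	first, second := &message{}, &message{}
	stamp(s, first, "events", 0)
	stamp(s, second, "events", 0)
	fmt.Println(first.sequenceNumber, second.sequenceNumber) // 0 1
	// stamp(s, first, "events", 0) // would panic: already sequenced
}
```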
