@@ -260,7 +260,7 @@ def transition_to_abortable_error(self, exc):
260
260
with self ._lock :
261
261
if self ._current_state == TransactionState .ABORTING_TRANSACTION :
262
262
log .debug ("Skipping transition to abortable error state since the transaction is already being "
263
- " aborted. Underlying exception: " , exc )
263
+ " aborted. Underlying exception: %s " , exc )
264
264
return
265
265
self ._transition_to (TransactionState .ABORTABLE_ERROR , error = exc )
266
266
@@ -687,7 +687,7 @@ def handle_response(self, response):
687
687
if error is Errors .NoError :
688
688
continue
689
689
elif error in (Errors .CoordinatorNotAvailableError , Errors .NotCoordinatorError ):
690
- self .transaction_manager ._lookup_coordinator ('transaction' , self .transactiona_id )
690
+ self .transaction_manager ._lookup_coordinator ('transaction' , self .transactional_id )
691
691
self .reenqueue ()
692
692
return
693
693
elif error is Errors .ConcurrentTransactionsError :
@@ -726,7 +726,7 @@ def handle_response(self, response):
726
726
self .transaction_manager ._pending_partitions_in_transaction -= partitions
727
727
728
728
if unauthorized_topics :
729
- self .abortable_error (Errors .TopicAuthorizationError (unauthorized_topics ))
729
+ self .abortable_error (Errors .TopicAuthorizationFailedError (unauthorized_topics ))
730
730
elif has_partition_errors :
731
731
self .abortable_error (Errors .KafkaError ("Could not add partitions to transaction due to errors: %s" % (results )))
732
732
else :
@@ -795,7 +795,7 @@ def handle_response(self, response):
795
795
elif error is Errors .TransactionalIdAuthorizationFailedError :
796
796
self .fatal_error (error ())
797
797
elif error is Errors .GroupAuthorizationFailedError :
798
- self .abortable_error (Errors . GroupAuthorizationError (self ._coord_key ))
798
+ self .abortable_error (error (self ._coord_key ))
799
799
else :
800
800
self .fatal_error (Errors .KafkaError (
801
801
"Could not find a coordinator with type %s with key %s due to"
@@ -888,7 +888,7 @@ def handle_response(self, response):
888
888
elif error is Errors .TransactionalIdAuthorizationFailedError :
889
889
self .fatal_error (error ())
890
890
elif error is Errors .GroupAuthorizationFailedError :
891
- self .abortable_error (Errors . GroupAuthorizationError (self .consumer_group_id ))
891
+ self .abortable_error (error (self .consumer_group_id ))
892
892
else :
893
893
self .fatal_error (Errors .KafkaError ("Unexpected error in AddOffsetsToTxnResponse: %s" % (error ())))
894
894
@@ -955,7 +955,7 @@ def handle_response(self, response):
955
955
elif error is Errors .UnknownTopicOrPartitionError :
956
956
retriable_failure = True
957
957
elif error is Errors .GroupAuthorizationFailedError :
958
- self .abortable_error (Errors . GroupAuthorizationError (self .consumer_group_id ))
958
+ self .abortable_error (error (self .consumer_group_id ))
959
959
return
960
960
elif error in (Errors .TransactionalIdAuthorizationFailedError ,
961
961
Errors .InvalidProducerEpochError ,
0 commit comments