@@ -280,6 +280,7 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
280280 }
281281 }
282282
283+ var retentionPolicy * utils.RetentionPolicies
283284 if params .RetentionTime != nil || params .RetentionSize != nil {
284285 retentionTime := - 1
285286 retentionSize := - 1
@@ -301,18 +302,16 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
301302 retentionSize = int (params .RetentionSize .ScaledValue (resource .Mega ))
302303 }
303304 }
304- retentionPolicy := utils .NewRetentionPolicies (retentionTime , retentionSize )
305- err = p .adminClient .Topics ().SetRetention (* topicName , retentionPolicy )
306- if err != nil {
307- return err
308- }
305+ policy := utils .NewRetentionPolicies (retentionTime , retentionSize )
306+ retentionPolicy = & policy
309307 }
310308
309+ var backlogQuotaPolicy * utils.BacklogQuota
310+ var backlogQuotaType utils.BacklogQuotaType
311311 if (params .BacklogQuotaLimitTime != nil || params .BacklogQuotaLimitSize != nil ) &&
312312 params .BacklogQuotaRetentionPolicy != nil {
313313 backlogTime := int64 (- 1 )
314314 backlogSize := int64 (- 1 )
315- var backlogQuotaType utils.BacklogQuotaType
316315 if params .BacklogQuotaLimitTime != nil {
317316 t , err := params .BacklogQuotaLimitTime .Parse ()
318317 if err != nil {
@@ -325,13 +324,24 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
325324 backlogSize = params .BacklogQuotaLimitSize .Value ()
326325 backlogQuotaType = utils .DestinationStorage
327326 }
328- backlogQuotaPolicy := utils.BacklogQuota {
327+ backlogQuotaPolicy = & utils.BacklogQuota {
329328 LimitTime : backlogTime ,
330329 LimitSize : backlogSize ,
331330 Policy : utils .RetentionPolicy (* params .BacklogQuotaRetentionPolicy ),
332331 }
333- err = p .adminClient .Topics ().SetBacklogQuota (* topicName , backlogQuotaPolicy , backlogQuotaType )
334- if err != nil {
332+ }
333+
334+ switch {
335+ case retentionPolicy != nil && backlogQuotaPolicy != nil :
336+ if err := p .applyRetentionAndBacklogPolicies (topicName , retentionPolicy , backlogQuotaPolicy , backlogQuotaType ); err != nil {
337+ return err
338+ }
339+ case retentionPolicy != nil :
340+ if err := p .adminClient .Topics ().SetRetention (* topicName , * retentionPolicy ); err != nil {
341+ return err
342+ }
343+ case backlogQuotaPolicy != nil :
344+ if err := p .adminClient .Topics ().SetBacklogQuota (* topicName , * backlogQuotaPolicy , backlogQuotaType ); err != nil {
335345 return err
336346 }
337347 }
@@ -577,6 +587,38 @@ func (p *PulsarAdminClient) applyTopicPolicies(topicName *utils.TopicName, param
577587 return nil
578588}
579589
590+ func (p * PulsarAdminClient ) applyRetentionAndBacklogPolicies (topicName * utils.TopicName , retention * utils.RetentionPolicies ,
591+ backlog * utils.BacklogQuota , backlogType utils.BacklogQuotaType ) error {
592+ if err := p .adminClient .Topics ().SetRetention (* topicName , * retention ); err != nil {
593+ if ! isRetentionBacklogOrderingError (err ) {
594+ return err
595+ }
596+
597+ if err := p .adminClient .Topics ().SetBacklogQuota (* topicName , * backlog , backlogType ); err != nil {
598+ return err
599+ }
600+
601+ if err := p .adminClient .Topics ().SetRetention (* topicName , * retention ); err != nil {
602+ return err
603+ }
604+
605+ return nil
606+ }
607+
608+ if err := p .adminClient .Topics ().SetBacklogQuota (* topicName , * backlog , backlogType ); err != nil {
609+ return err
610+ }
611+
612+ return nil
613+ }
614+
615+ func isRetentionBacklogOrderingError (err error ) bool {
616+ if ErrorReason (err ) != ReasonInvalidParameter {
617+ return false
618+ }
619+ return strings .Contains (err .Error (), "Retention Quota must exceed configured backlog quota" )
620+ }
621+
580622// GetTopicClusters get the assigned clusters of the topic to the local default cluster
581623func (p * PulsarAdminClient ) GetTopicClusters (name string , persistent * bool ) ([]string , error ) {
582624 completeTopicName := MakeCompleteTopicName (name , persistent )
0 commit comments