@@ -83,20 +83,20 @@ async Task PublishMessagesInBatchAsync()
83
83
QueueDeclareOk queueDeclareResult = await channel . QueueDeclareAsync ( ) ;
84
84
string queueName = queueDeclareResult . QueueName ;
85
85
86
- int batchSize = MAX_OUTSTANDING_CONFIRMS / 2 ;
87
- int outstandingMessageCount = 0 ;
86
+ int batchSize = Math . Max ( 1 , MAX_OUTSTANDING_CONFIRMS / 2 ) ;
88
87
89
- var sw = new Stopwatch ( ) ;
90
- sw . Start ( ) ;
88
+ var sw = Stopwatch . StartNew ( ) ;
91
89
92
90
var publishTasks = new List < ValueTask > ( ) ;
93
91
for ( int i = 0 ; i < MESSAGE_COUNT ; i ++ )
94
92
{
95
93
byte [ ] body = Encoding . UTF8 . GetBytes ( i . ToString ( ) ) ;
96
- publishTasks . Add ( channel . BasicPublishAsync ( exchange : string . Empty , routingKey : queueName , body : body , mandatory : true , basicProperties : props ) ) ;
97
- outstandingMessageCount ++ ;
94
+ ValueTask publishTask = channel . BasicPublishAsync ( exchange : string . Empty , routingKey : queueName , body : body , mandatory : true , basicProperties : props ) ;
95
+ publishTasks . Add ( publishTask ) ;
98
96
99
- if ( outstandingMessageCount == batchSize )
97
+ // NOTE: [publishTasks] should be published after the final message has been added,
98
+ // even if the # of tasks it contains isn't equal to [batchSize].
99
+ if ( publishTasks . Count == batchSize || i + 1 == MESSAGE_COUNT )
100
100
{
101
101
foreach ( ValueTask pt in publishTasks )
102
102
{
@@ -110,25 +110,7 @@ async Task PublishMessagesInBatchAsync()
110
110
}
111
111
}
112
112
publishTasks . Clear ( ) ;
113
- outstandingMessageCount = 0 ;
114
- }
115
- }
116
-
117
- if ( publishTasks . Count > 0 )
118
- {
119
- foreach ( ValueTask pt in publishTasks )
120
- {
121
- try
122
- {
123
- await pt ;
124
- }
125
- catch ( Exception ex )
126
- {
127
- Console . Error . WriteLine ( $ "{ DateTime . Now } [ERROR] saw nack or return, ex: '{ ex } '") ;
128
- }
129
113
}
130
- publishTasks . Clear ( ) ;
131
- outstandingMessageCount = 0 ;
132
114
}
133
115
134
116
sw . Stop ( ) ;
0 commit comments