|
17 | 17 | use std::sync::Arc; |
18 | 18 | use std::sync::atomic::{AtomicUsize, Ordering}; |
19 | 19 |
|
20 | | -use snafu::ensure; |
21 | | - |
22 | 20 | use crate::error::{Result, TooManyConcurrentRequestsSnafu}; |
23 | 21 |
|
24 | 22 | /// Limiter for total memory usage of concurrent request bodies. |
@@ -62,35 +60,30 @@ impl RequestMemoryLimiter { |
62 | 60 | return Ok(None); |
63 | 61 | }; |
64 | 62 |
|
65 | | - let mut current = inner.current_usage.load(Ordering::Relaxed); |
66 | | - loop { |
67 | | - let new_usage = current.saturating_add(request_size); |
68 | | - |
69 | | - ensure!( |
70 | | - new_usage <= inner.max_memory, |
71 | | - TooManyConcurrentRequestsSnafu { |
72 | | - limit: inner.max_memory, |
73 | | - request_size, |
74 | | - } |
75 | | - ); |
76 | | - |
77 | | - match inner.current_usage.compare_exchange_weak( |
78 | | - current, |
79 | | - new_usage, |
80 | | - Ordering::Release, |
81 | | - Ordering::Relaxed, |
82 | | - ) { |
83 | | - Ok(_) => { |
84 | | - return Ok(Some(RequestMemoryGuard { |
85 | | - size: request_size, |
86 | | - limiter: Arc::clone(inner), |
87 | | - usage_snapshot: new_usage, |
88 | | - })); |
89 | | - } |
90 | | - Err(actual) => { |
91 | | - current = actual; |
92 | | - } |
| 63 | + let mut new_usage = 0; |
| 64 | + let result = |
| 65 | + inner |
| 66 | + .current_usage |
| 67 | + .fetch_update(Ordering::Release, Ordering::Relaxed, |current| { |
| 68 | + new_usage = current.saturating_add(request_size); |
| 69 | + if new_usage <= inner.max_memory { |
| 70 | + Some(new_usage) |
| 71 | + } else { |
| 72 | + None |
| 73 | + } |
| 74 | + }); |
| 75 | + |
| 76 | + match result { |
| 77 | + Ok(_) => Ok(Some(RequestMemoryGuard { |
| 78 | + size: request_size, |
| 79 | + limiter: Arc::clone(inner), |
| 80 | + usage_snapshot: new_usage, |
| 81 | + })), |
| 82 | + Err(_current) => TooManyConcurrentRequestsSnafu { |
| 83 | + limit: inner.max_memory, |
| 84 | + request_size, |
93 | 85 | } |
| 86 | + .fail(), |
94 | 87 | } |
95 | 88 | } |
96 | 89 |
|
|
0 commit comments