1616package software .amazon .awssdk .core .internal .async ;
1717
1818import java .nio .ByteBuffer ;
19+ import java .util .ArrayList ;
1920import java .util .Arrays ;
21+ import java .util .Collections ;
22+ import java .util .List ;
2023import java .util .Optional ;
24+ import java .util .Set ;
25+ import java .util .concurrent .ConcurrentHashMap ;
2126import java .util .concurrent .atomic .AtomicBoolean ;
2227import java .util .concurrent .atomic .AtomicInteger ;
28+ import java .util .concurrent .atomic .AtomicLong ;
2329import org .reactivestreams .Subscriber ;
2430import org .reactivestreams .Subscription ;
2531import software .amazon .awssdk .annotations .SdkInternalApi ;
32+ import software .amazon .awssdk .annotations .SdkTestInternalApi ;
2633import software .amazon .awssdk .core .async .AsyncRequestBody ;
34+ import software .amazon .awssdk .core .exception .NonRetryableException ;
2735import software .amazon .awssdk .core .internal .util .Mimetype ;
36+ import software .amazon .awssdk .core .internal .util .NoopSubscription ;
2837import software .amazon .awssdk .utils .Logger ;
38+ import software .amazon .awssdk .utils .SdkAutoCloseable ;
39+ import software .amazon .awssdk .utils .Validate ;
2940
3041/**
3142 * An implementation of {@link AsyncRequestBody} for providing data from the supplied {@link ByteBuffer} array. This is created
3243 * using static methods on {@link AsyncRequestBody}
3344 *
45+ * <h3>Subscription Behavior:</h3>
46+ * <ul>
47+ * <li>Each subscriber receives a read-only view of the buffered data</li>
48+ * <li>Subscribers receive data independently based on their own demand signaling</li>
49+ * <li>If the body is closed, new subscribers will receive an error immediately</li>
50+ * </ul>
51+ *
52+ * <h3>Resource Management:</h3>
53+ * The body should be closed when no longer needed to free buffered data and notify active subscribers.
54+ * Closing the body will:
55+ * <ul>
56+ * <li>Clear all buffered data</li>
57+ * <li>Send error notifications to all active subscribers</li>
58+ * <li>Prevent new subscriptions</li>
59+ * </ul>
3460 * @see AsyncRequestBody#fromBytes(byte[])
3561 * @see AsyncRequestBody#fromBytesUnsafe(byte[])
3662 * @see AsyncRequestBody#fromByteBuffer(ByteBuffer)
4066 * @see AsyncRequestBody#fromString(String)
4167 */
4268@ SdkInternalApi
43- public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody {
69+ public final class ByteBuffersAsyncRequestBody implements AsyncRequestBody , SdkAutoCloseable {
4470 private static final Logger log = Logger .loggerFor (ByteBuffersAsyncRequestBody .class );
4571
4672 private final String mimetype ;
4773 private final Long length ;
48- private final ByteBuffer [] buffers ;
74+ private List <ByteBuffer > buffers ;
75+ private final Set <ReplayableByteBufferSubscription > subscriptions ;
76+ private final Object lock = new Object ();
77+ private boolean closed ;
4978
50- private ByteBuffersAsyncRequestBody (String mimetype , Long length , ByteBuffer ... buffers ) {
79+ private ByteBuffersAsyncRequestBody (String mimetype , Long length , List < ByteBuffer > buffers ) {
5180 this .mimetype = mimetype ;
52- this .length = length ;
5381 this .buffers = buffers ;
82+ this .length = length ;
83+ this .subscriptions = ConcurrentHashMap .newKeySet ();
5484 }
5585
5686 @ Override
@@ -64,61 +94,25 @@ public String contentType() {
6494 }
6595
6696 @ Override
67- public void subscribe (Subscriber <? super ByteBuffer > s ) {
68- // As per rule 1.9 we must throw NullPointerException if the subscriber parameter is null
69- if (s == null ) {
70- throw new NullPointerException ("Subscription MUST NOT be null." );
97+ public void subscribe (Subscriber <? super ByteBuffer > subscriber ) {
98+ Validate .paramNotNull (subscriber , "subscriber" );
99+ synchronized (lock ) {
100+ if (closed ) {
101+ subscriber .onSubscribe (new NoopSubscription (subscriber ));
102+ subscriber .onError (NonRetryableException .create (
103+ "AsyncRequestBody has been closed" ));
104+ return ;
105+ }
71106 }
72107
73- // As per 2.13, this method must return normally (i.e. not throw).
74108 try {
75- s .onSubscribe (
76- new Subscription () {
77- private final AtomicInteger index = new AtomicInteger (0 );
78- private final AtomicBoolean completed = new AtomicBoolean (false );
79-
80- @ Override
81- public void request (long n ) {
82- if (completed .get ()) {
83- return ;
84- }
85-
86- if (n > 0 ) {
87- int i = index .getAndIncrement ();
88-
89- if (buffers .length == 0 && completed .compareAndSet (false , true )) {
90- s .onComplete ();
91- }
92-
93- if (i >= buffers .length ) {
94- return ;
95- }
96-
97- long remaining = n ;
98-
99- do {
100- ByteBuffer buffer = buffers [i ];
101-
102- s .onNext (buffer .asReadOnlyBuffer ());
103- remaining --;
104- } while (remaining > 0 && (i = index .getAndIncrement ()) < buffers .length );
105-
106- if (i >= buffers .length - 1 && completed .compareAndSet (false , true )) {
107- s .onComplete ();
108- }
109- } else {
110- s .onError (new IllegalArgumentException ("§3.9: non-positive requests are not allowed!" ));
111- }
112- }
113-
114- @ Override
115- public void cancel () {
116- completed .set (true );
117- }
118- }
119- );
109+ ReplayableByteBufferSubscription replayableByteBufferSubscription =
110+ new ReplayableByteBufferSubscription (subscriber );
111+ subscriber .onSubscribe (replayableByteBufferSubscription );
112+ subscriptions .add (replayableByteBufferSubscription );
120113 } catch (Throwable ex ) {
121- log .error (() -> s + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe." , ex );
114+ log .error (() -> subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe." ,
115+ ex );
122116 }
123117 }
124118
@@ -127,34 +121,167 @@ public String body() {
127121 return BodyType .BYTES .getName ();
128122 }
129123
130- public static ByteBuffersAsyncRequestBody of (ByteBuffer ... buffers ) {
131- long length = Arrays .stream (buffers )
132- .mapToLong (ByteBuffer ::remaining )
133- .sum ();
124+ public static ByteBuffersAsyncRequestBody of (List < ByteBuffer > buffers ) {
125+ long length = buffers .stream ()
126+ .mapToLong (ByteBuffer ::remaining )
127+ .sum ();
134128 return new ByteBuffersAsyncRequestBody (Mimetype .MIMETYPE_OCTET_STREAM , length , buffers );
135129 }
136130
131+ public static ByteBuffersAsyncRequestBody of (ByteBuffer ... buffers ) {
132+ return of (Arrays .asList (buffers ));
133+ }
134+
137135 public static ByteBuffersAsyncRequestBody of (Long length , ByteBuffer ... buffers ) {
138- return new ByteBuffersAsyncRequestBody (Mimetype .MIMETYPE_OCTET_STREAM , length , buffers );
136+ return new ByteBuffersAsyncRequestBody (Mimetype .MIMETYPE_OCTET_STREAM , length , Arrays . asList ( buffers ) );
139137 }
140138
141139 public static ByteBuffersAsyncRequestBody of (String mimetype , ByteBuffer ... buffers ) {
142140 long length = Arrays .stream (buffers )
143141 .mapToLong (ByteBuffer ::remaining )
144142 .sum ();
145- return new ByteBuffersAsyncRequestBody (mimetype , length , buffers );
143+ return new ByteBuffersAsyncRequestBody (mimetype , length , Arrays . asList ( buffers ) );
146144 }
147145
148146 public static ByteBuffersAsyncRequestBody of (String mimetype , Long length , ByteBuffer ... buffers ) {
149- return new ByteBuffersAsyncRequestBody (mimetype , length , buffers );
147+ return new ByteBuffersAsyncRequestBody (mimetype , length , Arrays . asList ( buffers ) );
150148 }
151149
152150 public static ByteBuffersAsyncRequestBody from (byte [] bytes ) {
153151 return new ByteBuffersAsyncRequestBody (Mimetype .MIMETYPE_OCTET_STREAM , (long ) bytes .length ,
154- ByteBuffer .wrap (bytes ));
152+ Collections . singletonList ( ByteBuffer .wrap (bytes ) ));
155153 }
156154
157155 public static ByteBuffersAsyncRequestBody from (String mimetype , byte [] bytes ) {
158- return new ByteBuffersAsyncRequestBody (mimetype , (long ) bytes .length , ByteBuffer .wrap (bytes ));
156+ return new ByteBuffersAsyncRequestBody (mimetype , (long ) bytes .length ,
157+ Collections .singletonList (ByteBuffer .wrap (bytes )));
158+ }
159+
160+ @ Override
161+ public void close () {
162+ synchronized (lock ) {
163+ if (closed ) {
164+ return ;
165+ }
166+
167+ closed = true ;
168+ buffers = new ArrayList <>();
169+ subscriptions .forEach (s -> s .notifyError (new IllegalStateException ("The publisher has been closed" )));
170+ subscriptions .clear ();
171+ }
172+ }
173+
174+ @ SdkTestInternalApi
175+ public List <ByteBuffer > bufferedData () {
176+ return buffers ;
177+ }
178+
179+ private class ReplayableByteBufferSubscription implements Subscription {
180+ private final AtomicInteger index = new AtomicInteger (0 );
181+ private volatile boolean done ;
182+ private final AtomicBoolean processingRequest = new AtomicBoolean (false );
183+ private Subscriber <? super ByteBuffer > currentSubscriber ;
184+ private final AtomicLong outstandingDemand = new AtomicLong ();
185+
186+ private ReplayableByteBufferSubscription (Subscriber <? super ByteBuffer > subscriber ) {
187+ this .currentSubscriber = subscriber ;
188+ }
189+
190+ @ Override
191+ public void request (long n ) {
192+ if (n <= 0 ) {
193+ currentSubscriber .onError (new IllegalArgumentException ("§3.9: non-positive requests are not allowed!" ));
194+ currentSubscriber = null ;
195+ return ;
196+ }
197+
198+ if (done ) {
199+ return ;
200+ }
201+
202+ if (buffers .size () == 0 ) {
203+ currentSubscriber .onComplete ();
204+ done = true ;
205+ subscriptions .remove (this );
206+ return ;
207+ }
208+
209+ outstandingDemand .updateAndGet (current -> {
210+ if (Long .MAX_VALUE - current < n ) {
211+ return Long .MAX_VALUE ;
212+ }
213+
214+ return current + n ;
215+ });
216+ processRequest ();
217+ }
218+
219+ private void processRequest () {
220+ do {
221+ if (!processingRequest .compareAndSet (false , true )) {
222+ // Some other thread is processing the queue, so we don't need to.
223+ return ;
224+ }
225+
226+ try {
227+ doProcessRequest ();
228+ } catch (Throwable e ) {
229+ notifyError (new IllegalStateException ("Encountered fatal error in publisher" , e ));
230+ subscriptions .remove (this );
231+ break ;
232+ } finally {
233+ processingRequest .set (false );
234+ }
235+
236+ } while (shouldProcessRequest ());
237+ }
238+
239+ private boolean shouldProcessRequest () {
240+ return !done && outstandingDemand .get () > 0 && index .get () < buffers .size ();
241+ }
242+
243+ private void doProcessRequest () {
244+ while (true ) {
245+ if (!shouldProcessRequest ()) {
246+ return ;
247+ }
248+
249+ int currentIndex = this .index .getAndIncrement ();
250+
251+ if (currentIndex >= buffers .size ()) {
252+ // This should never happen because shouldProcessRequest() ensures that index.get() < buffers.size()
253+ // before incrementing. If this condition is true, it likely indicates a concurrency bug or that buffers
254+ // was modified unexpectedly. This defensive check is here to catch such rare, unexpected situations.
255+ notifyError (new IllegalStateException ("Index out of bounds" ));
256+ subscriptions .remove (this );
257+ return ;
258+ }
259+
260+ ByteBuffer buffer = buffers .get (currentIndex );
261+ currentSubscriber .onNext (buffer .asReadOnlyBuffer ());
262+ outstandingDemand .decrementAndGet ();
263+
264+ if (currentIndex == buffers .size () - 1 ) {
265+ done = true ;
266+ currentSubscriber .onComplete ();
267+ subscriptions .remove (this );
268+ break ;
269+ }
270+ }
271+ }
272+
273+ @ Override
274+ public void cancel () {
275+ done = true ;
276+ subscriptions .remove (this );
277+ }
278+
279+ public void notifyError (Exception exception ) {
280+ if (currentSubscriber != null ) {
281+ done = true ;
282+ currentSubscriber .onError (exception );
283+ currentSubscriber = null ;
284+ }
285+ }
159286 }
160287}
0 commit comments