Skip to content

Commit 4567627

Browse files
committed
More work in progress.
Signed-off-by: Simone Bordet <[email protected]>
1 parent 64a9716 commit 4567627

File tree

6 files changed

+120
-45
lines changed

6 files changed

+120
-45
lines changed

jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Content.java

Lines changed: 30 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,6 @@
4949
import org.eclipse.jetty.util.BufferUtil;
5050
import org.eclipse.jetty.util.Callback;
5151
import org.eclipse.jetty.util.IO;
52-
import org.eclipse.jetty.util.IteratingCallback;
53-
import org.eclipse.jetty.util.IteratingNestedCallback;
5452
import org.eclipse.jetty.util.Promise;
5553
import org.slf4j.Logger;
5654
import org.slf4j.LoggerFactory;
@@ -109,28 +107,36 @@ public static void copy(Source source, Sink sink, Chunk.Processor chunkProcessor
109107
new ContentCopier(source, sink, chunkProcessor, callback).iterate();
110108
}
111109

112-
public static void copy(Source source, long length, boolean last, Sink sink, Callback callback)
110+
public static void copy(Source source, boolean last, Sink sink, Callback callback)
111+
{
112+
new ContentCopier(source, last, sink, null, callback).iterate();
113+
}
114+
115+
private static void copyRange(Source source, long length, Sink sink, Callback callback)
116+
{
117+
// TODO: it would be really difficult to make a source remember the bytes...
118+
// a subsequent call with the same source cannot have stored a chunk that
119+
// it returned previously, so do we really need a range?
120+
// Isn't the length always implicit to be the full length?
121+
// In HTTP/2 a large write is chunked and the write callback is not completed
122+
// until all the chunks are written (we store the BB in a DATA frame, and we
123+
// consume the BB chunk by chunk).
124+
// How can we do the same with a Source?
125+
// We can read a BB, wrap it in a DATA frame, even if larger than maxFrameSize
126+
// or flow control, as the Flusher will remember it.
127+
// But for transferTo(), we need a similar way for a Source to have position
128+
// and limit that a BB has, so perhaps we need a Source.Seekable.
129+
}
130+
131+
public static boolean transfer(Source source, long length, Sink sink, Callback callback)
113132
{
114133
if (source instanceof Transferable.From from)
115134
{
116135
if (from.transferTo(sink, length, callback))
117-
{
118-
// TODO: honor "last".
119-
return;
120-
}
136+
return true;
121137
}
122-
123-
IteratingCallback flusher = new IteratingNestedCallback(callback)
124-
{
125-
@Override
126-
protected Action process()
127-
{
128-
// TODO
129-
// TODO Honor "last".
130-
return null;
131-
}
132-
};
133-
flusher.iterate();
138+
copyRange(source, length, sink, callback);
139+
return false;
134140
}
135141

136142
/**
@@ -691,6 +697,8 @@ default boolean rewind()
691697
*/
692698
public interface Sink
693699
{
700+
ByteBuffer TRANSFER = ByteBuffer.allocate(0);
701+
694702
/**
695703
* <p>Wraps the given {@link OutputStream} as a {@link Sink}.
696704
* @param out The stream to wrap
@@ -907,12 +915,12 @@ static void write(Sink sink, boolean last, Content.Source source, Callback callb
907915
{
908916
// Optimization to enable zero-copy.
909917
aware.setContentSource(source);
910-
sink.write(last, null, callback);
918+
sink.write(last, TRANSFER, callback);
911919
}
912920
else
913921
{
914-
// Normal write.
915-
Content.copy(source, source.getLength(), last, sink, callback);
922+
// Normal source.read() + sink.write() full copy.
923+
Content.copy(source, last, sink, callback);
916924
}
917925
}
918926

jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SelectableChannelEndPoint.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -253,20 +253,22 @@ public Runnable onSelected()
253253
if (LOG.isDebugEnabled())
254254
LOG.debug("onSelected {}->{} r={} w={} for {}", oldInterestOps, newInterestOps, fillable, flushable, this);
255255

256-
// return task to complete the job
257-
Runnable task = fillable
258-
? (flushable
259-
? _runCompleteWriteFillable
260-
: _runFillable)
261-
: (flushable
262-
? _runCompleteWrite
263-
: null);
256+
Runnable task = taskForSelected(fillable, flushable);
264257

265258
if (LOG.isDebugEnabled())
266259
LOG.debug("task {}", task);
267260
return task;
268261
}
269262

263+
protected Runnable taskForSelected(boolean fillable, boolean flushable)
264+
{
265+
if (fillable)
266+
return flushable ? _runCompleteWriteFillable : _runFillable;
267+
if (flushable)
268+
return _runCompleteWrite;
269+
return null;
270+
}
271+
270272
private void updateKeyAction(Selector selector)
271273
{
272274
updateKey();

jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/SocketChannelEndPoint.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,17 @@
1616
import java.io.IOException;
1717
import java.net.SocketAddress;
1818
import java.nio.ByteBuffer;
19+
import java.nio.channels.ClosedChannelException;
1920
import java.nio.channels.FileChannel;
2021
import java.nio.channels.SelectionKey;
2122
import java.nio.channels.SocketChannel;
23+
import java.util.concurrent.TimeoutException;
24+
import java.util.concurrent.atomic.AtomicReference;
2225

2326
import org.eclipse.jetty.io.internal.Transferable;
2427
import org.eclipse.jetty.util.BufferUtil;
2528
import org.eclipse.jetty.util.Callback;
29+
import org.eclipse.jetty.util.thread.Invocable;
2630
import org.eclipse.jetty.util.thread.Scheduler;
2731
import org.slf4j.Logger;
2832
import org.slf4j.LoggerFactory;
@@ -34,6 +38,8 @@ public class SocketChannelEndPoint extends SelectableChannelEndPoint implements
3438
{
3539
private static final Logger LOG = LoggerFactory.getLogger(SocketChannelEndPoint.class);
3640

41+
private final AtomicReference<Callback> transferCallback = new AtomicReference<>();
42+
3743
public SocketChannelEndPoint(SocketChannel channel, ManagedSelector selector, SelectionKey key, Scheduler scheduler)
3844
{
3945
super(scheduler, channel, selector, key);
@@ -48,10 +54,49 @@ public SocketChannel getChannel()
4854
@Override
4955
public boolean transferFrom(FileChannel fileChannel, long offset, long length, Callback callback)
5056
{
51-
Transferable.transfer(fileChannel, offset, length, getChannel(), callback);
57+
Transferable.transfer(fileChannel, offset, length, this, callback);
5258
return true;
5359
}
5460

61+
public void onIncompleteTransfer(Callback callback)
62+
{
63+
if (transferCallback.compareAndSet(null, callback))
64+
onIncompleteFlush();
65+
else
66+
throw new IllegalStateException("Transfer callback already present");
67+
}
68+
69+
@Override
70+
protected Runnable taskForSelected(boolean fillable, boolean flushable)
71+
{
72+
Callback callback = transferCallback.getAndSet(null);
73+
if (callback == null)
74+
return super.taskForSelected(fillable, flushable);
75+
76+
// For the transfer case, only flushable must be true.
77+
assert !fillable && flushable;
78+
79+
return new Invocable.ReadyTask(callback.getInvocationType(), callback::succeeded);
80+
}
81+
82+
@Override
83+
public void onClose(Throwable cause)
84+
{
85+
Callback callback = transferCallback.getAndSet(null);
86+
if (callback != null)
87+
callback.failed(cause == null ? new ClosedChannelException() : cause);
88+
super.onClose(cause);
89+
}
90+
91+
@Override
92+
protected void onIdleExpired(TimeoutException timeout)
93+
{
94+
Callback callback = transferCallback.getAndSet(null);
95+
if (callback != null)
96+
callback.failed(timeout);
97+
super.onIdleExpired(timeout);
98+
}
99+
55100
@Override
56101
public SocketAddress getRemoteSocketAddress()
57102
{

jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/ContentCopier.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,15 +26,22 @@ public class ContentCopier extends IteratingNestedCallback
2626
private static final Logger LOG = LoggerFactory.getLogger(ContentCopier.class);
2727

2828
private final Content.Source source;
29+
private final boolean last;
2930
private final Content.Sink sink;
3031
private final Content.Chunk.Processor chunkProcessor;
3132
private Content.Chunk chunk;
3233
private boolean terminated;
3334

3435
public ContentCopier(Content.Source source, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback)
36+
{
37+
this(source, true, sink, chunkProcessor, callback);
38+
}
39+
40+
public ContentCopier(Content.Source source, boolean last, Content.Sink sink, Content.Chunk.Processor chunkProcessor, Callback callback)
3541
{
3642
super(callback);
3743
this.source = source;
44+
this.last = last;
3845
this.sink = sink;
3946
this.chunkProcessor = chunkProcessor;
4047
}
@@ -64,7 +71,7 @@ protected Action process() throws Throwable
6471
return Action.SCHEDULED;
6572
}
6673

67-
sink.write(chunk.isLast(), chunk.getByteBuffer(), this);
74+
sink.write(chunk.isLast() && last, chunk.getByteBuffer(), this);
6875
return Action.SCHEDULED;
6976
}
7077

jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/internal/Transferable.java

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
package org.eclipse.jetty.io.internal;
1515

1616
import java.nio.channels.FileChannel;
17-
import java.nio.channels.WritableByteChannel;
1817

1918
import org.eclipse.jetty.io.Content;
19+
import org.eclipse.jetty.io.SocketChannelEndPoint;
2020
import org.eclipse.jetty.util.Callback;
2121
import org.eclipse.jetty.util.IteratingNestedCallback;
2222

@@ -26,9 +26,9 @@ private Transferable()
2626
{
2727
}
2828

29-
public static void transfer(FileChannel fileChannel, long offset, long length, WritableByteChannel writeChannel, Callback callback)
29+
public static void transfer(FileChannel sourceChannel, long offset, long length, SocketChannelEndPoint endPoint, Callback callback)
3030
{
31-
Transferrer transferrer = new Transferrer(fileChannel, offset, length, writeChannel, callback);
31+
Transferrer transferrer = new Transferrer(sourceChannel, offset, length, endPoint, callback);
3232
transferrer.iterate();
3333
}
3434

@@ -45,28 +45,38 @@ public interface To
4545
private static class Transferrer extends IteratingNestedCallback
4646
{
4747
private final FileChannel fileChannel;
48-
private final long position;
48+
private final long offset;
4949
private final long length;
50-
private final WritableByteChannel writableChannel;
50+
private final SocketChannelEndPoint endPoint;
5151
private long transferred;
5252

53-
private Transferrer(FileChannel fileChannel, long position, long length, WritableByteChannel writableChannel, Callback callback)
53+
private Transferrer(FileChannel fileChannel, long offset, long length, SocketChannelEndPoint endPoint, Callback callback)
5454
{
5555
super(callback);
5656
this.fileChannel = fileChannel;
57-
this.position = position;
57+
this.offset = offset;
5858
this.length = length;
59-
this.writableChannel = writableChannel;
59+
this.endPoint = endPoint;
6060
}
6161

6262
@Override
6363
protected Action process() throws Throwable
6464
{
65-
// TODO: should I set writeInterest() in NIO if TCP congestion?
66-
transferred += fileChannel.transferTo(position + transferred, length - transferred, writableChannel);
67-
// TODO: call this.succeeded()
68-
if (transferred == length)
65+
long count = length - transferred;
66+
if (count == 0)
6967
return Action.SUCCEEDED;
68+
69+
long transfer = fileChannel.transferTo(offset + transferred, count, endPoint.getChannel());
70+
transferred += transfer;
71+
72+
if (transfer > 0)
73+
{
74+
endPoint.notIdle();
75+
succeeded();
76+
return Action.SCHEDULED;
77+
}
78+
79+
endPoint.onIncompleteTransfer(this);
7080
return Action.SCHEDULED;
7181
}
7282
}

jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/ResponseContentSourceTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,10 @@ public void testResponseContentSource(TransportType transportType) throws Except
5454
@Override
5555
public boolean handle(Request request, Response response, Callback callback)
5656
{
57-
Content.Sink.write(response, true, Content.Source.from(file), callback);
57+
Content.Source source = Content.Source.from(file);
58+
// TODO: possible alternative?
59+
// source.writeTo(response, true, callback);
60+
Content.Sink.write(response, true, source, callback);
5861
return true;
5962
}
6063
});

0 commit comments

Comments
 (0)