Skip to content

Commit ee66f2e

Browse files
authored
Merge pull request #25200 from JuliaLang/jn/flush-uv
implement flush for libuv streams
2 parents de87aca + 2a881d3 commit ee66f2e

File tree

6 files changed

+149
-64
lines changed

6 files changed

+149
-64
lines changed

base/stream.jl

Lines changed: 64 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,17 @@ function eof(s::LibuvStream)
5555
return !isopen(s) && nb_available(s) <= 0
5656
end
5757

58-
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
58+
# Limit our default maximum read and buffer size,
59+
# to avoid DoS-ing ourself into an OOM situation
60+
const DEFAULT_READ_BUFFER_SZ = 10485760 # 10 MB
61+
62+
# manually limit our write size, if the OS doesn't support full-size writes
63+
if Sys.iswindows()
64+
const MAX_OS_WRITE = UInt(0x1FF0_0000) # 511 MB (determined semi-empirically, limited to 31 MB on XP)
65+
else
66+
const MAX_OS_WRITE = UInt(typemax(Csize_t))
67+
end
68+
5969

6070
const StatusUninit = 0 # handle is allocated, but not initialized
6171
const StatusInit = 1 # handle is valid, but not connected/active
@@ -458,7 +468,10 @@ function notify_filled(buffer::IOBuffer, nread::Int)
458468
end
459469
end
460470

461-
alloc_buf_hook(stream::LibuvStream, size::UInt) = alloc_request(stream.buffer, UInt(size))
471+
function alloc_buf_hook(stream::LibuvStream, size::UInt)
472+
throttle = UInt(stream.throttle)
473+
return alloc_request(stream.buffer, (size > throttle) ? throttle : size)
474+
end
462475

463476
function uv_alloc_buf(handle::Ptr{Cvoid}, size::Csize_t, buf::Ptr{Cvoid})
464477
hd = uv_handle_data(handle)
@@ -477,6 +490,10 @@ function uv_alloc_buf(handle::Ptr{Cvoid}, size::Csize_t, buf::Ptr{Cvoid})
477490
if data == C_NULL
478491
newsize = 0
479492
end
493+
# avoid aliasing of `nread` with `errno` in uv_readcb
494+
# or exceeding the Win32 maximum uv_buf_t len
495+
maxsize = @static Sys.iswindows() ? typemax(Cint) : typemax(Cssize_t)
496+
newsize > maxsize && (newsize = maxsize)
480497
end
481498

482499
ccall(:jl_uv_buf_set_base, Cvoid, (Ptr{Cvoid}, Ptr{Cvoid}), buf, data)
@@ -815,28 +832,21 @@ function readuntil(this::LibuvStream, c::UInt8)
815832
end
816833

817834
uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof(p)))
835+
818836
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
819-
check_open(s)
820-
uvw = Libc.malloc(_sizeof_uv_write)
821-
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
822-
err = ccall(:jl_uv_write,
823-
Int32,
824-
(Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}),
825-
s, p, n, uvw,
826-
uv_jl_writecb_task::Ptr{Cvoid})
827-
if err < 0
828-
Libc.free(uvw)
829-
uv_error("write", err)
830-
end
837+
uvw = uv_write_async(s, p, n)
831838
ct = current_task()
832839
preserve_handle(ct)
833840
try
841+
# wait for the last chunk to complete (or error)
842+
# assume that any errors would be sticky,
843+
# (so we don't need to monitor the error status of the intermediate writes)
834844
uv_req_set_data(uvw, ct)
835845
wait()
836846
finally
837847
if uv_req_data(uvw) != C_NULL
838848
# uvw is still alive,
839-
# so make sure we don't get spurious notifications later
849+
# so make sure we won't get spurious notifications later
840850
uv_req_set_data(uvw, C_NULL)
841851
else
842852
# done with uvw
@@ -847,6 +857,33 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
847857
return Int(n)
848858
end
849859

860+
# helper function for uv_write that returns the uv_write_t struct for the write
861+
# rather than waiting on it
862+
function uv_write_async(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
863+
check_open(s)
864+
while true
865+
uvw = Libc.malloc(_sizeof_uv_write)
866+
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
867+
nwrite = min(n, MAX_OS_WRITE) # split up the write into chunks the OS can handle.
868+
# TODO: use writev, when that is added to uv-win
869+
err = ccall(:jl_uv_write,
870+
Int32,
871+
(Ptr{Cvoid}, Ptr{Cvoid}, UInt, Ptr{Cvoid}, Ptr{Cvoid}),
872+
s, p, nwrite, uvw,
873+
uv_jl_writecb_task::Ptr{Cvoid})
874+
if err < 0
875+
Libc.free(uvw)
876+
uv_error("write", err)
877+
end
878+
n -= nwrite
879+
p += nwrite
880+
if n == 0
881+
return uvw
882+
end
883+
end
884+
end
885+
886+
850887
# Optimized send
851888
# - smaller writes are buffered, final uv write on flush or when buffer full
852889
# - large isbits arrays are unbuffered and written directly
@@ -872,14 +909,15 @@ function unsafe_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
872909
end
873910

874911
function flush(s::LibuvStream)
875-
if s.sendbuf === nothing
876-
return
877-
end
878-
buf = s.sendbuf
879-
if nb_available(buf) > 0
880-
arr = take!(buf) # Array of UInt8s
881-
uv_write(s, arr)
912+
if s.sendbuf !== nothing
913+
buf = s.sendbuf
914+
if nb_available(buf) > 0
915+
arr = take!(buf) # Array of UInt8s
916+
uv_write(s, arr)
917+
return
918+
end
882919
end
920+
uv_write(s, Ptr{UInt8}(Base.eventloop()), UInt(0)) # zero write from a random pointer to flush current queue
883921
return
884922
end
885923

@@ -900,12 +938,13 @@ end
900938
function uv_writecb_task(req::Ptr{Cvoid}, status::Cint)
901939
d = uv_req_data(req)
902940
if d != C_NULL
903-
uv_req_set_data(req, C_NULL)
941+
uv_req_set_data(req, C_NULL) # let the Task know we got the writecb
942+
t = unsafe_pointer_to_objref(d)::Task
904943
if status < 0
905944
err = UVError("write", status)
906-
schedule(unsafe_pointer_to_objref(d)::Task, err, error=true)
945+
schedule(t, err, error=true)
907946
else
908-
schedule(unsafe_pointer_to_objref(d)::Task)
947+
schedule(t)
909948
end
910949
else
911950
# no owner for this req, safe to just free it

src/gf.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,6 +1301,7 @@ static void method_overwrite(jl_typemap_entry_t *newentry, jl_method_t *oldvalue
13011301
jl_printf(s, " in module %s", jl_symbol_name(newmod->name));
13021302
print_func_loc(s, method);
13031303
jl_printf(s, ".\n");
1304+
jl_uv_flush(s);
13041305
}
13051306
}
13061307

src/jl_uv.c

Lines changed: 67 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -98,19 +98,72 @@ static void jl_uv_closeHandle(uv_handle_t *handle)
9898
free(handle);
9999
}
100100

101-
static void jl_uv_shutdownCallback(uv_shutdown_t *req, int status)
102-
{
103-
/*
104-
* This happens if the remote machine closes the connecition while we're
105-
* in the shutdown request (in that case we call uv_close, thus cancelling
106-
* this request).
107-
*/
108-
if (status != UV__ECANCELED && !uv_is_closing((uv_handle_t*)req->handle)) {
109-
uv_close((uv_handle_t*)req->handle, &jl_uv_closeHandle);
101+
static void jl_uv_flush_close_callback(uv_write_t *req, int status)
102+
{
103+
uv_stream_t *stream = req->handle;
104+
req->handle = NULL;
105+
// ignore attempts to close the stream while attempting a graceful shutdown
106+
#ifdef _OS_WINDOWS_
107+
if (stream->stream.conn.shutdown_req)
108+
#else
109+
if (stream->shutdown_req)
110+
#endif
111+
{
112+
free(req);
113+
return;
114+
}
115+
if (status == 0 && uv_is_writable(stream) && stream->write_queue_size != 0) {
116+
// new data was written, wait for it to flush too
117+
uv_buf_t buf;
118+
buf.base = (char*)(req + 1);
119+
buf.len = 0;
120+
req->data = NULL;
121+
if (uv_write(req, stream, &buf, 1, (uv_write_cb)jl_uv_flush_close_callback) == 0)
122+
return;
123+
}
124+
if (!uv_is_closing((uv_handle_t*)stream)) { // avoid double-close on the stream
125+
if (stream->type == UV_TTY)
126+
uv_tty_set_mode((uv_tty_t*)stream, UV_TTY_MODE_NORMAL);
127+
uv_close((uv_handle_t*)stream, &jl_uv_closeHandle);
110128
}
111129
free(req);
112130
}
113131

132+
static void uv_flush_callback(uv_write_t *req, int status)
133+
{
134+
*(int*)(req->data) = 1;
135+
uv_stop(req->handle->loop);
136+
free(req);
137+
}
138+
139+
// Turn a normal write into a blocking write (primarly for use from C and gdb).
140+
// Warning: This calls uv_run, so it can have unbounded side-effects.
141+
// Be care where you call it from! - the libuv loop is also not reentrant.
142+
void jl_uv_flush(uv_stream_t *stream)
143+
{
144+
if (stream == (void*)STDIN_FILENO ||
145+
stream == (void*)STDOUT_FILENO ||
146+
stream == (void*)STDERR_FILENO)
147+
return;
148+
if (stream->type != UV_TTY &&
149+
stream->type != UV_TCP &&
150+
stream->type != UV_NAMED_PIPE)
151+
return;
152+
while (uv_is_writable(stream) && stream->write_queue_size != 0) {
153+
int fired = 0;
154+
uv_buf_t buf;
155+
buf.base = (char*)(&buf + 1);
156+
buf.len = 0;
157+
uv_write_t *write_req = (uv_write_t*)malloc(sizeof(uv_write_t));
158+
write_req->data = (void*)&fired;
159+
if (uv_write(write_req, stream, &buf, 1, uv_flush_callback) != 0)
160+
return;
161+
while (!fired) {
162+
uv_run(uv_default_loop(), UV_RUN_DEFAULT);
163+
}
164+
}
165+
}
166+
114167
// getters and setters
115168
JL_DLLEXPORT void *jl_uv_process_data(uv_process_t *p) { return p->data; }
116169
JL_DLLEXPORT void *jl_uv_buf_base(const uv_buf_t *buf) { return buf->base; }
@@ -196,39 +249,15 @@ JL_DLLEXPORT void jl_close_uv(uv_handle_t *handle)
196249
return;
197250
}
198251

199-
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP) {
200-
uv_stream_t *stream = (uv_stream_t*)handle;
201-
#ifdef _OS_WINDOWS_
202-
if (stream->stream.conn.shutdown_req) {
203-
#else
204-
if (stream->shutdown_req) {
205-
#endif
206-
// don't close the stream while attempting a graceful shutdown
207-
return;
208-
}
209-
if (uv_is_writable(stream) && stream->write_queue_size != 0) {
210-
// attempt graceful shutdown of writable streams to give them a chance to flush first
211-
// TODO: introduce a uv_drain cb API instead of abusing uv_shutdown in this way
212-
uv_shutdown_t *req = (uv_shutdown_t*)malloc(sizeof(uv_shutdown_t));
213-
req->data = 0;
214-
/*
215-
* We are explicitly ignoring the error here for the following reason:
216-
* There is only two scenarios in which this returns an error:
217-
* a) In case the stream is already shut down, in which case we're likely
218-
* in the process of closing this stream (since there's no other call to
219-
* uv_shutdown).
220-
* b) In case the stream is already closed, in which case uv_close would
221-
* cause an assertion failure.
222-
*/
223-
uv_shutdown(req, stream, &jl_uv_shutdownCallback);
224-
return;
225-
}
252+
if (handle->type == UV_NAMED_PIPE || handle->type == UV_TCP || handle->type == UV_TTY) {
253+
uv_write_t *req = (uv_write_t*)malloc(sizeof(uv_write_t));
254+
req->handle = (uv_stream_t*)handle;
255+
jl_uv_flush_close_callback(req, 0);
256+
return;
226257
}
227258

228259
if (!uv_is_closing(handle)) {
229260
// avoid double-closing the stream
230-
if (handle->type == UV_TTY)
231-
uv_tty_set_mode((uv_tty_t*)handle, UV_TTY_MODE_NORMAL);
232261
uv_close(handle, &jl_uv_closeHandle);
233262
}
234263
}

src/julia_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,7 @@ extern uv_loop_t *jl_io_loop;
462462
JL_DLLEXPORT void jl_uv_associate_julia_struct(uv_handle_t *handle,
463463
jl_value_t *data);
464464
JL_DLLEXPORT int jl_uv_fs_result(uv_fs_t *f);
465+
void jl_uv_flush(uv_stream_t *stream);
465466

466467
typedef struct _typeenv {
467468
jl_tvar_t *var;

src/toplevel.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -816,7 +816,7 @@ JL_DLLEXPORT jl_value_t *jl_load(jl_module_t *module, const char *fname)
816816
if (module->istopmod) {
817817
jl_printf(JL_STDOUT, "%s\r\n", fname);
818818
#ifdef _OS_WINDOWS_
819-
uv_run(uv_default_loop(), (uv_run_mode)1);
819+
jl_uv_flush(JL_STDOUT);
820820
#endif
821821
}
822822
uv_stat_t stbuf;

test/read.jl

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,3 +537,18 @@ end # mktempdir() do dir
537537
@test countlines(file,'\n') == 4
538538
rm(file)
539539
end
540+
541+
let p = Pipe()
542+
Base.link_pipe(p, julia_only_read=true, julia_only_write=true)
543+
t = @schedule read(p)
544+
@sync begin
545+
@async write(p, zeros(UInt16, 660_000))
546+
for i = 1:typemax(UInt16)
547+
@async write(p, UInt16(i))
548+
end
549+
@async close(p.in)
550+
end
551+
s = reinterpret(UInt16, wait(t))
552+
@test length(s) == 660_000 + typemax(UInt16)
553+
@test s[(end - typemax(UInt16)):end] == UInt16.(0:typemax(UInt16))
554+
end

0 commit comments

Comments
 (0)