Skip to content

Commit 4cc82c3

Browse files
authored
iterate over retry_delays (#172)
* Iterate over retry_delays
* Check if job is started before retry loop
1 parent 70f4434 commit 4cc82c3

File tree

1 file changed

+45
-38
lines changed

1 file changed

+45
-38
lines changed

src/lsf.jl

Lines changed: 45 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -15,54 +15,61 @@ struct LSFException <: Exception
1515
msg
1616
end
1717

18+
# parse_host_port: read a single line from `stream` and try to match it against
# `port_host_regex`.
#
# Returns a 4-tuple `(matched, line, host, port)`:
#   - on a match:  `(true, line, host, port)`, where `host` is the regex's second capture
#     and `port` is the first capture parsed as an `Int`;
#   - otherwise:   `(false, line, nothing, nothing)`.
# The line that was read is always returned so the caller can inspect it.
#
# NOTE(review): the lines made up of a number followed by `+` look like diff gutter
# markers from the rendered commit view, not part of the function itself.
function parse_host_port(stream, port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)")
19+
bytestr = readline(stream)
20+
conn_info_match = match(port_host_regex, bytestr)
21+
if !isnothing(conn_info_match)
22+
# with the default regex the announcement is `julia_worker:<port>#<host>`,
# so capture 2 is the host and capture 1 is the port
host = conn_info_match.captures[2]
23+
port = parse(Int, conn_info_match.captures[1])
24+
@debug("lsf worker listening", connect_info=bytestr, host, port)
25+
26+
return true, bytestr, host, port
27+
end
28+
# no match: hand the raw line back so the caller can decide what to do with it
return false, bytestr, nothing, nothing
29+
end
30+
1831
# NOTE(review): this block appears to be a rendered diff rather than plain source. A gutter
# line ending in `-` seems to mark the following line as removed, one ending in `+` as
# added, and a doubled number (e.g. `2032`) as unchanged. The comments added here describe
# the added/unchanged side only.
#
# lsf_bpeek: run the bpeek command for job `jobid`, array index `iarray`, with its stdout
# and stderr both directed into a `Base.BufferStream`, then read lines from that stream
# via `parse_host_port` until one matches the worker's host/port announcement.
#
# One attempt is made before the retry loop; after that, one further line is parsed per
# element of `manager.retry_delays`.
#
# Returns `(stream, host, port)`.
# Throws `LSFException` carrying the last line read when that line is neither
# "Not yet started" nor a bpeek stdout/stderr decoration, or when `retry_delays` is
# exhausted without a match. The stream is closed before throwing.
function lsf_bpeek(manager::LSFManager, jobid, iarray)
19-
port_host_regex = r"julia_worker:([0-9]+)#([0-9.]+)"
2032
stream = Base.BufferStream()
2133
mark(stream) # so that we can reset to beginning after ensuring process started
2234

2335
streamer_cmd = pipeline(`$(manager.ssh_cmd) $(manager.bpeek_cmd) $(manager.bpeek_flags) $(jobid)\[$iarray\]`; stdout=stream, stderr=stream)
24-
backoff = manager.retry_delays
25-
delay, backoff_state = iterate(backoff)
36+
retry_delays = manager.retry_delays
2637
streamer_proc = run(streamer_cmd; wait=false)
27-
worker_started = false
28-
host = nothing
29-
port = nothing
30-
31-
while !worker_started
32-
bytestr = readline(stream)
33-
conn_info_match = match(port_host_regex, bytestr)
34-
if !isnothing(conn_info_match)
35-
host = conn_info_match.captures[2]
36-
port = parse(Int, conn_info_match.captures[1])
37-
@debug("lsf worker listening", connect_info=bytestr, host, port)
38-
# process started, reset to marked position and hand over to Distributed module
39-
reset(stream)
40-
worker_started = true
38+
39+
# Try once before retry loop in case user supplied an empty retry_delays iterator
40+
worker_started, bytestr, host, port = parse_host_port(stream)
41+
# NOTE(review): this early return hands back `stream` without the `reset(stream)` that
# the post-loop path performs before its own return, so the line just consumed by
# parse_host_port is not rewound here — confirm whether the caller relies on it.
worker_started && return stream, host, port
42+
43+
for retry_delay in retry_delays
44+
if occursin("Not yet started", bytestr)
45+
# reset to marked position, bpeek process would have stopped
46+
wait(streamer_proc)
47+
mark(stream)
48+
49+
# Try bpeeking again after the retry delay
50+
sleep(retry_delay)
51+
streamer_proc = run(streamer_cmd; wait=false)
52+
elseif occursin("<< output from stdout >>", bytestr) || occursin("<< output from stderr >>", bytestr)
53+
# ignore this bpeek output decoration and continue to read the next line
54+
# (no sleep happens in this branch, but the iteration still uses up one element of
# retry_delays)
mark(stream)
4155
else
42-
if occursin("Not yet started", bytestr)
43-
# reset to marked position, bpeek process would have stopped
44-
wait(streamer_proc)
45-
mark(stream)
46-
47-
# retry with backoff if within retry limit
48-
if backoff_state[1] == 0
49-
close(stream)
50-
throw(LSFException(bytestr))
51-
end
52-
sleep(delay)
53-
delay, backoff_state = iterate(backoff, backoff_state)
54-
streamer_proc = run(streamer_cmd; wait=false)
55-
elseif occursin("<< output from stdout >>", bytestr) || occursin("<< output from stderr >>", bytestr)
56-
# ignore this bpeek output decoration and continue to read the next line
57-
mark(stream)
58-
else
59-
# unknown response from worker process
60-
close(stream)
61-
throw(LSFException(bytestr))
62-
end
56+
# unknown response from worker process
57+
close(stream)
58+
throw(LSFException(bytestr))
6359
end
60+
61+
# read the next line; stop iterating as soon as it carries the host/port announcement
worker_started, bytestr, host, port = parse_host_port(stream)
62+
worker_started && break
6463
end
6564

65+
# retry_delays ran out (or was empty) without seeing the announcement
if !worker_started
66+
close(stream)
67+
throw(LSFException(bytestr))
68+
end
69+
70+
# process started, reset to marked position and hand over to Distributed module
71+
reset(stream)
72+
6673
return stream, host, port
6774
end
6875

0 commit comments

Comments
 (0)