Skip to content

Commit 615a5fa

Browse files
committed
Improve create_files and add read_files
1 parent 36cb947 commit 615a5fa

File tree

2 files changed

+180
-34
lines changed

2 files changed

+180
-34
lines changed

src/fileio.jl

+154-32
Original file line numberDiff line numberDiff line change
@@ -48,8 +48,9 @@ _rand_fname_tag() = String(rand(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJ
4848
"""
4949
function create_files(
5050
f_write, filenames::AbstractString...;
51-
create_dirs::Bool = true, overwrite::Bool = true, delete_on_error::Bool=true,
51+
overwrite::Bool = true,
5252
use_cache::Bool = false, cache_dir::AbstractString = tempdir(),
53+
create_dirs::Bool = true, delete_tmp_onerror::Bool=true,
5354
verbose::Bool = true
5455
)
5556
@@ -59,7 +60,7 @@ Creates `filenames` in an atomic fashion via a user-provided function
5960
Using temporary filenames, calls `f_write(temporary_filenames...)`. If
6061
`f_write` doesn't throw an exception, the files `temporary_filenames` are
6162
renamed to `filenames`. If `f_write` throws an exception, the temporary files
62-
are either deleted (if `delete_on_error` is `true`) or left in place (e.g. for
63+
are either deleted (if `delete_tmp_onerror` is `true`) or left in place (e.g. for
6364
debugging purposes).
6465
6566
If `create_dirs` is `true`, the `temporary_filenames` are created in
@@ -96,16 +97,18 @@ the default Linux RAM disk as an intermediate directory.
9697
"""
9798
function create_files(
9899
@nospecialize(f_write), @nospecialize(filenames::AbstractString...);
99-
create_dirs::Bool = true, overwrite::Bool = true, delete_on_error::Bool=true,
100+
overwrite::Bool = true,
100101
use_cache::Bool = false, cache_dir::AbstractString = tempdir(),
102+
create_dirs::Bool = true, delete_tmp_onerror::Bool=true,
101103
verbose::Bool = true
102104
)
103105
loglevel = verbose ? Info : Debug
104106

105107
target_fnames = String[filenames...] # Fix type
106108
staging_fnames = String[]
107-
writeto_fnames = String[]
108-
completed_fnames = String[]
109+
cache_fnames = String[]
110+
move_complete = similar(target_fnames, Bool)
111+
fill!(move_complete, false)
109112

110113
pre_existing = isfile.(target_fnames)
111114
if any(pre_existing)
@@ -124,23 +127,26 @@ function create_files(
124127
for dir in dirs
125128
if !isdir(dir) && create_dirs
126129
mkpath(dir)
127-
@logmsg loglevel "Created directory $dir."
130+
@logmsg loglevel "Created output directory $dir."
128131
end
129132
end
130133

131134
if use_cache && !isdir(cache_dir)
132135
mkpath(cache_dir)
133-
@logmsg loglevel "Created cache directory $cache_dir."
136+
@logmsg loglevel "Created write-cache directory $cache_dir."
134137
end
135138
end
136139

137140
try
138-
staging_fnames = tmp_filename.(target_fnames)
139-
@assert !any(isfile, staging_fnames)
141+
if use_cache
142+
append!(cache_fnames, tmp_filename.(target_fnames, Ref(cache_dir)))
143+
@assert !any(isfile, cache_fnames)
144+
end
140145

141-
writeto_fnames = use_cache ? tmp_filename.(target_fnames, Ref(cache_dir)) : staging_fnames
142-
@assert !any(isfile, writeto_fnames)
146+
append!(staging_fnames, tmp_filename.(target_fnames))
147+
@assert !any(isfile, staging_fnames)
143148

149+
writeto_fnames = use_cache ? cache_fnames : staging_fnames
144150
@debug "Creating intermediate files $writeto_fnames."
145151
f_write(writeto_fnames...)
146152

@@ -158,45 +164,161 @@ function create_files(
158164

159165
try
160166
if use_cache
161-
for (writeto_fn, staging_fn) in zip(writeto_fnames, staging_fnames)
162-
@assert writeto_fn != staging_fn
163-
@debug "Moving file \"$writeto_fn\" to \"$staging_fn\"."
164-
isfile(writeto_fn) || error("Expected file \"$writeto_fn\" to exist, but it doesn't.")
165-
mv(writeto_fn, staging_fn; force=true)
166-
isfile(staging_fn) || error("Tried to move file \"$writeto_fn\" to \"$staging_fn\", but \"$staging_fn\" doesn't exist.")
167+
@userfriendly_exceptions @sync for (cache_fn, staging_fn) in zip(cache_fnames, staging_fnames)
168+
Threads.@spawn begin
169+
@assert cache_fn != staging_fn
170+
@debug "Moving file \"$cache_fn\" to \"$staging_fn\"."
171+
isfile(cache_fn) || error("Expected file \"$cache_fn\" to exist, but it doesn't.")
172+
mv(cache_fn, staging_fn; force=true)
173+
end
167174
end
175+
empty!(cache_fnames)
168176
end
169-
for (staging_fn, target_fn) in zip(staging_fnames, target_fnames)
170-
@assert staging_fn != target_fn
171-
@debug "Renaming file \"$staging_fn\" to \"$target_fn\"."
172-
isfile(staging_fn) || error("Expected file \"$staging_fn\" to exist, but it doesn't.")
173-
mv(staging_fn, target_fn; force=true)
174-
isfile(target_fn) || error("Tried to rename file \"$staging_fn\" to \"$target_fn\", but \"$target_fn\" doesn't exist.")
175-
push!(completed_fnames, target_fn)
177+
178+
@userfriendly_exceptions @sync for i in eachindex(staging_fnames, target_fnames)
179+
Threads.@spawn begin
180+
staging_fn = staging_fnames[i]
181+
target_fn = target_fnames[i]
182+
@assert staging_fn != target_fn
183+
@debug "Renaming file \"$staging_fn\" to \"$target_fn\"."
184+
isfile(staging_fn) || error("Expected file \"$staging_fn\" to exist, but it doesn't.")
185+
mv(staging_fn, target_fn; force=true)
186+
isfile(target_fn) || error("Tried to rename file \"$staging_fn\" to \"$target_fn\", but \"$target_fn\" doesn't exist.")
187+
move_complete[i] = true
188+
end
176189
end
190+
empty!(staging_fnames)
191+
177192
@logmsg loglevel "Created files $target_fnames."
178193
catch
179-
if !isempty(completed_fnames)
180-
@error "Failed to rename some temporary files to final filenames, removing $completed_fnames"
181-
for fname in completed_fnames
194+
if any(move_complete) && !all(move_complete)
195+
to_remove = target_fnames[findall(move_complete)]
196+
@error "Failed to rename some of the temporary files to target files, removing $to_remove"
197+
for fname in to_remove
182198
rm(fname; force=true)
183199
end
184200
end
185201
rethrow()
186202
end
187203

188-
@assert all(fn -> !isfile(fn), staging_fnames)
204+
@assert isempty(cache_fnames)
205+
@assert isempty(staging_fnames)
189206
finally
190-
if delete_on_error
191-
for writeto_fn in writeto_fnames
192-
isfile(writeto_fn) && rm(writeto_fn; force=true);
207+
if delete_tmp_onerror
208+
for cache_fn in cache_fnames
209+
if isfile(cache_fn)
210+
@debug "Removing left-over write-cache file \"$cache_fn\"."
211+
rm(cache_fn; force=true)
212+
end
193213
end
194214
for staging_fn in staging_fnames
195-
isfile(staging_fn) && rm(staging_fn; force=true);
215+
if isfile(staging_fn)
216+
@debug "Removing left-over write-staging file \"$staging_fn\"."
217+
rm(staging_fn; force=true)
218+
end
196219
end
197220
end
198221
end
199222

200223
return nothing
201224
end
202225
export create_files
226+
227+
228+
"""
229+
function read_files(
230+
f_read, filenames::AbstractString...;
231+
use_cache::Bool = true, cache_dir::AbstractString = tempdir(),
232+
create_cachedir::Bool = true, delete_tmp_onerror::Bool=true,
233+
verbose::Bool = true
234+
)
235+
236+
Reads `filenames` in an atomic fashion (i.e. only if all `filenames` exist)
237+
via a user-provided function `f_read`. The returns value of `f_read` is
238+
passed through.
239+
240+
If `use_cache` is `true`, then the files are first copied to the
241+
temporary directory `cache_dir` under temporary names, and
242+
`f_read(temporary_filenames...)` is called. The temporary files are deleted
243+
afterwards.
244+
245+
If `create_cachedir` is `true`, then `cache_dir` will be created if it doesn't
246+
exist yet. If `delete_tmp_onerror` is true, then temporary files are
247+
deleted even if `f_write` throws an exception.
248+
249+
If `verbose` is `true`, uses log-level `Logging.Info` to log file reading,
250+
otherwise `Logging.Debug`.
251+
252+
```julia
253+
write("foo.txt", "Hello"); write("bar.txt", "World")
254+
255+
read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar
256+
read(foo, String) * " " * read(bar, String)
257+
end
258+
```
259+
260+
Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all
261+
intermediate steps.
262+
263+
On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use
264+
the default Linux RAM disk as an intermediate directory.
265+
"""
266+
function read_files(
267+
@nospecialize(f_read), @nospecialize(filenames::AbstractString...);
268+
use_cache::Bool = true, cache_dir::AbstractString = tempdir(),
269+
create_cachedir::Bool = true, delete_tmp_onerror::Bool=true,
270+
verbose::Bool = true
271+
)
272+
loglevel = verbose ? Info : Debug
273+
274+
source_fnames = String[filenames...] # Fix type
275+
cache_fnames = String[]
276+
277+
input_exists = isfile.(source_fnames)
278+
if !all(input_exists)
279+
missing_inputs = source_fnames[findall(!, input_exists)]
280+
throw(ErrorException("Missing input files $(missing_inputs)."))
281+
end
282+
283+
try
284+
if use_cache
285+
if !isdir(cache_dir) && create_cachedir
286+
mkpath(cache_dir)
287+
@logmsg loglevel "Created read-cache directory $cache_dir."
288+
end
289+
290+
append!(cache_fnames, tmp_filename.(source_fnames, Ref(cache_dir)))
291+
@assert !any(isfile, cache_fnames)
292+
293+
@userfriendly_exceptions @sync for (cache_fn, source_fn) in zip(cache_fnames, source_fnames)
294+
Threads.@spawn begin
295+
@assert cache_fn != source_fn
296+
@debug "Copying file \"$source_fn\" to \"$cache_fn\"."
297+
cp(source_fn, cache_fn)
298+
isfile(cache_fn) || error("Tried to copy file \"$source_fn\" to \"$cache_fn\", but \"$cache_fn\" doesn't exist.")
299+
end
300+
end
301+
end
302+
303+
readfrom_fnames = use_cache ? cache_fnames : source_fnames
304+
305+
@debug "Reading $(use_cache ? "cached " : "")files $readfrom_fnames."
306+
result = f_read(readfrom_fnames...)
307+
@logmsg loglevel "Read files $source_fnames."
308+
309+
@userfriendly_exceptions @sync for cache_fn in cache_fnames
310+
Threads.@spawn rm(cache_fn; force=true);
311+
end
312+
return result
313+
finally
314+
if delete_tmp_onerror
315+
for cache_fn in cache_fnames
316+
if isfile(cache_fn)
317+
@debug "Removing left-over read-cache file \"$cache_fn\"."
318+
rm(cache_fn; force=true)
319+
end
320+
end
321+
end
322+
end
323+
end
324+
export read_files

test/test_fileio.jl

+26-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ using ParallelProcessingTools: split_basename_ext, tmp_filename
88
old_julia_debug = get(ENV, "JULIA_DEBUG", "")
99
ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools"
1010

11-
1211
@testset "fileio" begin
1312
@testset "split_basename_ext" begin
1413
@test @inferred(split_basename_ext("foo_bar baz.tar.gz")) == ("foo_bar baz", ".tar.gz")
@@ -38,7 +37,7 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools"
3837
end
3938

4039
for use_cache in [false, true]
41-
@testset "create_files" begin
40+
@testset "create_files $(use_cache ? "with" : "without") cache" begin
4241
mktempdir() do dir
4342
data1 = "Hello"
4443
data2 = "World"
@@ -88,6 +87,31 @@ ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools"
8887
end
8988
end
9089
end
90+
91+
for use_cache in [false, true]
92+
@testset "read_files $(use_cache ? "with" : "without") cache" begin
93+
mktempdir() do dir
94+
data1 = "Hello"
95+
data2 = "World"
96+
97+
fn1 = joinpath(dir, "targetdir", "hello.txt")
98+
fn2 = joinpath(dir, "targetdir", "world.txt")
99+
100+
mkpath(dirname(fn1)); mkpath(dirname(fn2))
101+
write(fn1, data1); write(fn2, data2)
102+
103+
@test_throws ErrorException read_files(
104+
(fn1, fn2) -> (read(fn1, String), read(fn2, String)),
105+
fn1, "nosuchfile.txt", use_cache = use_cache, verbose = true
106+
)
107+
108+
@test @inferred(read_files(
109+
(fn1, fn2) -> (read(fn1, String), read(fn2, String)),
110+
fn1, fn2, use_cache = use_cache, verbose = true
111+
)) == (data1, data2)
112+
end
113+
end
114+
end
91115
end
92116

93117
ENV["JULIA_DEBUG"] = old_julia_debug; nothing

0 commit comments

Comments
 (0)