diff --git a/Project.toml b/Project.toml index e5e5749..b3d998c 100644 --- a/Project.toml +++ b/Project.toml @@ -6,6 +6,7 @@ version = "0.4.3" ClusterManagers = "34f1f09b-3a8b-5176-ab39-66d58a4d544e" Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" Parameters = "d96e819e-fc66-5662-9728-84c9c7592b0a" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" @@ -14,6 +15,7 @@ ThreadPinning = "811555cd-349b-4f26-b7bc-1f208b848042" ClusterManagers = "0.4.6" Distributed = "1" LinearAlgebra = "1" +Logging = "1" Parameters = "0.12, 0.13" Pkg = "1" ThreadPinning = "0.7.22" diff --git a/src/ParallelProcessingTools.jl b/src/ParallelProcessingTools.jl index 9face71..1d502d0 100644 --- a/src/ParallelProcessingTools.jl +++ b/src/ParallelProcessingTools.jl @@ -13,8 +13,12 @@ import Pkg import ClusterManagers import ThreadPinning +using Logging: @logmsg, LogLevel, Info, Debug + using Parameters: @with_kw +include("exceptions.jl") +include("fileio.jl") include("threadsafe.jl") include("threadlocal.jl") include("onthreads.jl") diff --git a/src/exceptions.jl b/src/exceptions.jl new file mode 100644 index 0000000..b58ca59 --- /dev/null +++ b/src/exceptions.jl @@ -0,0 +1,50 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +""" + ParallelProcessingTools.original_exception(err) + +Replaces `TaskFailedException`s and `RemoteException`s with the underlying +exception that originated within the task or on the remote process. +""" +function original_exception end + +original_exception(err) = err +original_exception(err::CompositeException) = CompositeException(original_exception.(err.exceptions)) +original_exception(err::TaskFailedException) = err.task.result +original_exception(err::RemoteException) = err.captured.ex + + +""" + ParallelProcessingTools.onlyfirst_exception(err) + +Replaces `CompositeException`s with their first exception. + +Also employs `original_exception` if `simplify` is `true`. +""" +function onlyfirst_exception end + +onlyfirst_exception(err) = err +onlyfirst_exception(err::CompositeException) = first(err.exceptions) + + +""" + @userfriendly_exceptions expr + +Transforms exceptions originating from `expr` into more user-friendly ones. + +If multiple exceptions originate from parallel code in `expr`, only one +is rethrown, and `TaskFailedException`s and `RemoteException`s are replaced +by the original exceptions that caused them. + +See [`original_exception`] and [`onlyfirst_exception`](@ref). +""" +macro userfriendly_exceptions(expr) + quote + try + $(esc(expr)) + catch err + rethrow(original_exception(onlyfirst_exception(err))) + end + end +end +export @userfriendly_exceptions diff --git a/src/fileio.jl b/src/fileio.jl new file mode 100644 index 0000000..078ce7d --- /dev/null +++ b/src/fileio.jl @@ -0,0 +1,324 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +""" + ParallelProcessingTools.split_basename_ext(file_basename_with_ext::AbstractString) + +Splits a filename (given without its directory path) into a basename without +file extension and the file extension. Returns a tuple `(basename_noext, ext)`. + +Example: + +``` +ParallelProcessingTools.split_basename_ext("myfile.tar.gz") == ("myfile", ".tar.gz") +``` +""" +function split_basename_ext(bn_ext::AbstractString) + ext_startpos = findfirst('.', bn_ext) + bn, ext = isnothing(ext_startpos) ? (bn_ext, "") : (bn_ext[1:ext_startpos-1], bn_ext[ext_startpos:end]) + return bn, ext +end + + +""" + ParallelProcessingTools.tmp_filename(fname::AbstractString) + ParallelProcessingTools.tmp_filename(fname::AbstractString, dir::AbstractString) + +Returns a temporary filename, based on `fname`. + +By default, the temporary filename is in the same directory as `fname`, +otherwise in `dir`. + +Does *not* create the temporary file, only returns the filename (including +directory path). +""" +function tmp_filename end + +function tmp_filename(fname::AbstractString, dir::AbstractString) + bn_ext = basename(fname) + bn, ext = split_basename_ext(bn_ext) + tag = _rand_fname_tag() + joinpath(dir, "$(bn)_$(tag)$(ext)") +end + +tmp_filename(fname::AbstractString) = tmp_filename(fname, dirname(fname)) + +_rand_fname_tag() = String(rand(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ", 8)) + + +""" + function create_files( + f_write, filenames::AbstractString...; + overwrite::Bool = true, + use_cache::Bool = false, cache_dir::AbstractString = tempdir(), + create_dirs::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = true + ) + +Creates `filenames` in an atomic fashion via a user-provided function +`f_write`. Returns `nothing`. + +Using temporary filenames, calls `f_write(temporary_filenames...)`. If +`f_write` doesn't throw an exception, the files `temporary_filenames` are +renamed to `filenames`. If `f_write` throws an exception, the temporary files +are either deleted (if `delete_tmp_onerror` is `true`) or left in place (e.g. for +debugging purposes). + +If `create_dirs` is `true`, the `temporary_filenames` are created in +`cache_dir` and then atomically moved to `filenames`, otherwise, they are +created next to `filenames` (in the same directories). + +If `create_dirs` is `true`, directories are created if necessary. + +If all of `filenames` already exist and `overwrite` is `false`, takes no +action (or, on case the files are created by other code running in parallel, +while `f_write` is running, does not replace them). + +If `verbose` is `true`, uses log-level `Logging.Info` to log file creation, +otherwise `Logging.Debug`. + +Throws an error if only some of the files exist and `overwrite` is `false`. + +Returns `nothing`. + +Example: + +```julia +create_files("foo.txt", "bar.txt", use_cache = true) do foo, bar + write(foo, "Hello") + write(bar, "World") +end +``` + +Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all +intermediate steps. + +On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use +the default Linux RAM disk as an intermediate directory. +""" +function create_files( + @nospecialize(f_write), @nospecialize(filenames::AbstractString...); + overwrite::Bool = true, + use_cache::Bool = false, cache_dir::AbstractString = tempdir(), + create_dirs::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = true +) + loglevel = verbose ? Info : Debug + + target_fnames = String[filenames...] # Fix type + staging_fnames = String[] + cache_fnames = String[] + move_complete = similar(target_fnames, Bool) + fill!(move_complete, false) + + pre_existing = isfile.(target_fnames) + if any(pre_existing) + if all(pre_existing) + if !overwrite + @logmsg loglevel "Files $target_fnames already exist, nothing to do." + return nothing + end + else + !overwrite && throw(ErrorException("Only some of $target_fnames exist but not allowed to overwrite")) + end + end + + dirs = dirname.(target_fnames) + if create_dirs + for dir in dirs + if !isdir(dir) && create_dirs + mkpath(dir) + @logmsg loglevel "Created output directory $dir." + end + end + + if use_cache && !isdir(cache_dir) + mkpath(cache_dir) + @logmsg loglevel "Created write-cache directory $cache_dir." + end + end + + try + if use_cache + append!(cache_fnames, tmp_filename.(target_fnames, Ref(cache_dir))) + @assert !any(isfile, cache_fnames) + end + + append!(staging_fnames, tmp_filename.(target_fnames)) + @assert !any(isfile, staging_fnames) + + writeto_fnames = use_cache ? cache_fnames : staging_fnames + @debug "Creating intermediate files $writeto_fnames." + f_write(writeto_fnames...) + + post_f_write_existing = isfile.(target_fnames) + if any(post_f_write_existing) + if all(post_f_write_existing) + if !overwrite + @logmsg loglevel "Files $target_fnames already exist, won't replace." + return nothing + end + else + !overwrite && throw(ErrorException("Only some of $target_fnames exist but not allowed to replace files")) + end + end + + try + if use_cache + @userfriendly_exceptions @sync for (cache_fn, staging_fn) in zip(cache_fnames, staging_fnames) + Threads.@spawn begin + @assert cache_fn != staging_fn + @debug "Moving file \"$cache_fn\" to \"$staging_fn\"." + isfile(cache_fn) || error("Expected file \"$cache_fn\" to exist, but it doesn't.") + mv(cache_fn, staging_fn; force=true) + end + end + empty!(cache_fnames) + end + + @userfriendly_exceptions @sync for i in eachindex(staging_fnames, target_fnames) + Threads.@spawn begin + staging_fn = staging_fnames[i] + target_fn = target_fnames[i] + @assert staging_fn != target_fn + @debug "Renaming file \"$staging_fn\" to \"$target_fn\"." + isfile(staging_fn) || error("Expected file \"$staging_fn\" to exist, but it doesn't.") + mv(staging_fn, target_fn; force=true) + isfile(target_fn) || error("Tried to rename file \"$staging_fn\" to \"$target_fn\", but \"$target_fn\" doesn't exist.") + move_complete[i] = true + end + end + empty!(staging_fnames) + + @logmsg loglevel "Created files $target_fnames." + catch + if any(move_complete) && !all(move_complete) + to_remove = target_fnames[findall(move_complete)] + @error "Failed to rename some of the temporary files to target files, removing $to_remove" + for fname in to_remove + rm(fname; force=true) + end + end + rethrow() + end + + @assert isempty(cache_fnames) + @assert isempty(staging_fnames) + finally + if delete_tmp_onerror + for cache_fn in cache_fnames + if isfile(cache_fn) + @debug "Removing left-over write-cache file \"$cache_fn\"." + rm(cache_fn; force=true) + end + end + for staging_fn in staging_fnames + if isfile(staging_fn) + @debug "Removing left-over write-staging file \"$staging_fn\"." + rm(staging_fn; force=true) + end + end + end + end + + return nothing +end +export create_files + + +""" + function read_files( + f_read, filenames::AbstractString...; + use_cache::Bool = true, cache_dir::AbstractString = tempdir(), + create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = true + ) + +Reads `filenames` in an atomic fashion (i.e. only if all `filenames` exist) +via a user-provided function `f_read`. The returns value of `f_read` is +passed through. + +If `use_cache` is `true`, then the files are first copied to the +temporary directory `cache_dir` under temporary names, and +`f_read(temporary_filenames...)` is called. The temporary files are deleted +afterwards. + +If `create_cachedir` is `true`, then `cache_dir` will be created if it doesn't +exist yet. If `delete_tmp_onerror` is true, then temporary files are +deleted even if `f_write` throws an exception. + +If `verbose` is `true`, uses log-level `Logging.Info` to log file reading, +otherwise `Logging.Debug`. + +```julia +write("foo.txt", "Hello"); write("bar.txt", "World") + +read_files("foo.txt", "bar.txt", use_cache = true) do foo, bar + read(foo, String) * " " * read(bar, String) +end +``` + +Set `ENV["JULIA_DEBUG"] = "ParallelProcessingTools"` to see a log of all +intermediate steps. + +On Linux you can set `use_cache = true` and `cache_dir = "/dev/shm"` to use +the default Linux RAM disk as an intermediate directory. +""" +function read_files( + @nospecialize(f_read), @nospecialize(filenames::AbstractString...); + use_cache::Bool = true, cache_dir::AbstractString = tempdir(), + create_cachedir::Bool = true, delete_tmp_onerror::Bool=true, + verbose::Bool = true +) + loglevel = verbose ? Info : Debug + + source_fnames = String[filenames...] # Fix type + cache_fnames = String[] + + input_exists = isfile.(source_fnames) + if !all(input_exists) + missing_inputs = source_fnames[findall(!, input_exists)] + throw(ErrorException("Missing input files $(missing_inputs).")) + end + + try + if use_cache + if !isdir(cache_dir) && create_cachedir + mkpath(cache_dir) + @logmsg loglevel "Created read-cache directory $cache_dir." + end + + append!(cache_fnames, tmp_filename.(source_fnames, Ref(cache_dir))) + @assert !any(isfile, cache_fnames) + + @userfriendly_exceptions @sync for (cache_fn, source_fn) in zip(cache_fnames, source_fnames) + Threads.@spawn begin + @assert cache_fn != source_fn + @debug "Copying file \"$source_fn\" to \"$cache_fn\"." + cp(source_fn, cache_fn) + isfile(cache_fn) || error("Tried to copy file \"$source_fn\" to \"$cache_fn\", but \"$cache_fn\" doesn't exist.") + end + end + end + + readfrom_fnames = use_cache ? cache_fnames : source_fnames + + @debug "Reading $(use_cache ? "cached " : "")files $readfrom_fnames." + result = f_read(readfrom_fnames...) + @logmsg loglevel "Read files $source_fnames." + + @userfriendly_exceptions @sync for cache_fn in cache_fnames + Threads.@spawn rm(cache_fn; force=true); + end + return result + finally + if delete_tmp_onerror + for cache_fn in cache_fnames + if isfile(cache_fn) + @debug "Removing left-over read-cache file \"$cache_fn\"." + rm(cache_fn; force=true) + end + end + end + end +end +export read_files diff --git a/test/runtests.jl b/test/runtests.jl index 11cf428..6e8dfb1 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -6,6 +6,7 @@ Test.@testset "Package ParallelProcessingTools" begin @info "Testing with $(Base.Threads.nthreads()) Julia threads." include("test_aqua.jl") + include("test_fileio.jl") include("test_threadsafe.jl") include("test_threadlocal.jl") include("test_workpartition.jl") diff --git a/test/test_fileio.jl b/test/test_fileio.jl new file mode 100644 index 0000000..6c01bad --- /dev/null +++ b/test/test_fileio.jl @@ -0,0 +1,117 @@ +# This file is a part of ParallelProcessingTools.jl, licensed under the MIT License (MIT). + +using Test +using ParallelProcessingTools + +using ParallelProcessingTools: split_basename_ext, tmp_filename + +old_julia_debug = get(ENV, "JULIA_DEBUG", "") +ENV["JULIA_DEBUG"] = old_julia_debug * ",ParallelProcessingTools" + +@testset "fileio" begin + @testset "split_basename_ext" begin + @test @inferred(split_basename_ext("foo_bar baz.tar.gz")) == ("foo_bar baz", ".tar.gz") + end + + @testset "tmp_filename" begin + dir = joinpath("foo", "bar") + tmpdir = joinpath(tempdir(), "somedir") + bn = "test.tar.gz" + fn = joinpath(dir, bn) + + @test @inferred(tmp_filename(fn)) isa AbstractString + let tmpfn = @inferred tmp_filename(fn) + @test dirname(tmpfn) == dir + tmp_bn, tmp_ex = split_basename_ext(basename(tmpfn)) + @test startswith(tmp_bn, "test_") + @test tmp_ex == ".tar.gz" + end + + @test @inferred(tmp_filename(fn, tmpdir)) isa AbstractString + let tmpfn = @inferred tmp_filename(fn, tmpdir) + @test dirname(tmpfn) == tmpdir + tmp_bn, tmp_ex = split_basename_ext(basename(tmpfn)) + @test startswith(tmp_bn, "test_") + @test tmp_ex == ".tar.gz" + end + end + + for use_cache in [false, true] + @testset "create_files $(use_cache ? "with" : "without") cache" begin + mktempdir() do dir + data1 = "Hello" + data2 = "World" + + fn1 = joinpath(dir, "targetdir", "hello.txt") + fn2 = joinpath(dir, "targetdir", "world.txt") + + # Target directory does not exist yet: + try + # Will not create missing target directory: + create_files(fn1, fn2, use_cache = use_cache, create_dirs = false, verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end + @test false # Should have thrown an exception + catch err + @test err isa SystemError || err isa Base.IOError + end + + # Test atomicity, fail in between writing files: + @test_throws ErrorException create_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + write(fn1, data1) + error("Some error") + write(fn2, data2) + end + @test !isfile(fn1) && !isfile(fn2) + + # Will create: + create_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end + @test read(fn1, String) == data1 && read(fn2, String) == data2 + + # Modify the target files: + write(fn1, "dummy content"); write(fn2, "dummy content"); + + # Wont't overwrite: + create_files(fn1, fn2, use_cache = use_cache, overwrite = false, verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end + @test read(fn1, String) != data1 && read(fn2, String) != data2 + + # Will overwrite: + create_files(fn1, fn2, use_cache = use_cache, verbose = true) do fn1, fn2 + write(fn1, data1); write(fn2, data2) + end + @test read(fn1, String) == data1 && read(fn2, String) == data2 + end + end + end + + for use_cache in [false, true] + @testset "read_files $(use_cache ? "with" : "without") cache" begin + mktempdir() do dir + data1 = "Hello" + data2 = "World" + + fn1 = joinpath(dir, "targetdir", "hello.txt") + fn2 = joinpath(dir, "targetdir", "world.txt") + + mkpath(dirname(fn1)); mkpath(dirname(fn2)) + write(fn1, data1); write(fn2, data2) + + @test_throws ErrorException read_files( + (fn1, fn2) -> (read(fn1, String), read(fn2, String)), + fn1, "nosuchfile.txt", use_cache = use_cache, verbose = true + ) + + @test @inferred(read_files( + (fn1, fn2) -> (read(fn1, String), read(fn2, String)), + fn1, fn2, use_cache = use_cache, verbose = true + )) == (data1, data2) + end + end + end +end + +ENV["JULIA_DEBUG"] = old_julia_debug; nothing diff --git a/test/test_threadsafe.jl b/test/test_threadsafe.jl index 66eeef3..67b0287 100644 --- a/test/test_threadsafe.jl +++ b/test/test_threadsafe.jl @@ -24,7 +24,7 @@ using Base.Threads @testset "LockableIO" begin lv = @inferred LockableIO(IOBuffer()) - @test typeof(lv) <: LockableIO{Base.Base.GenericIOBuffer{Array{UInt8,1}}} + @test typeof(lv) <: LockableIO{typeof(IOBuffer())} f = s -> write(s, 10) broadcast(f, lv)