Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dune-project
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
(dune (>= 2.0))
(ounit2 :with-test))
(depopts
atomic
base-unix)
(tags
(topics io channels streams)))
Expand Down
2 changes: 1 addition & 1 deletion iostream.opam
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ depends: [
"dune" {>= "2.0"}
"ounit2" {with-test}
]
depopts: ["base-unix"]
depopts: ["atomic" "base-unix"]
build: [
["dune" "subst"] {pinned}
[
Expand Down
176 changes: 112 additions & 64 deletions src/camlzip/iostream_camlzip.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ open struct
let size = Option.value ~default:default_buf_size buf_size in
Bytes.create size

type decompress_state =
type transduce_state =

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unusual name, I would have gone with "codec" instead of "transducer" I think. Or maybe just "compressor".

| In_progress
| Consuming_rest
| Done
Expand All @@ -21,93 +21,141 @@ type mode =
| Inflate
| Deflate of int

class transduce_in_ ~mode (ic : #In_buf.t) : In.t =
let zlib_str =
match mode with
| Inflate -> Zlib.inflate_init false
| Deflate lvl -> Zlib.deflate_init lvl false
in
let state = ref In_progress in
object
method close () =
(match mode with
| Inflate -> Zlib.inflate_end zlib_str
| Deflate _ -> Zlib.deflate_end zlib_str);
In.close ic

method input buf i len =
let n_written = ref 0 in

while !n_written = 0 && !state != Done do
match !state with
| Done -> assert false
| In_progress ->
let islice = In_buf.fill_buf ic in
if islice.len = 0 then
state := Consuming_rest
else (
let finished, used_in, used_out =
(match mode with
| Inflate -> Zlib.inflate
| Deflate _ -> Zlib.deflate)
zlib_str islice.bytes islice.off islice.len buf i len
Zlib.Z_NO_FLUSH
in
if finished then state := Done;
In_buf.consume ic used_in;
n_written := used_out
)
| Consuming_rest ->
(* finish sending the internal state *)
let islice = Slice.empty in
module In_trans = struct
type t =
| St : {
ic: 'ic;
mode: mode;
mutable state: transduce_state;
zlib_str: Zlib.stream;
close: 'ic -> unit;
fill_buf: deadline:float option -> 'ic -> Slice.t;
}
-> t

let of_in ~mode (ic : #In_buf.t) : t =
let zlib_str =
match mode with
| Inflate -> Zlib.inflate_init false
| Deflate lvl -> Zlib.deflate_init lvl false
in
St
{
state = In_progress;
mode;
zlib_str;
ic;
close = In_buf.close;
fill_buf =
(fun ~deadline ic ->
assert (deadline = None);
In_buf.fill_buf ic);
}

let close (St self) =
(match self.mode with
| Inflate -> Zlib.inflate_end self.zlib_str
| Deflate _ -> Zlib.deflate_end self.zlib_str);
self.close self.ic

let input ~deadline (St self) buf i len : int =
let n_written = ref 0 in

while !n_written = 0 && self.state != Done do
match self.state with
| Done -> assert false
| In_progress ->
let islice = self.fill_buf ~deadline self.ic in
if islice.len = 0 then
self.state <- Consuming_rest
else (
let finished, used_in, used_out =
(match mode with
(match self.mode with
| Inflate -> Zlib.inflate
| Deflate _ -> Zlib.deflate)
zlib_str islice.bytes islice.off islice.len buf i len
Zlib.Z_FINISH
self.zlib_str islice.bytes islice.off islice.len buf i len
Zlib.Z_NO_FLUSH
in
assert (used_in = 0);
if finished then state := Done;
if finished then self.state <- Done;
Slice.consume islice used_in;
n_written := used_out
done;
!n_written
)
| Consuming_rest ->
(* finish sending the internal state *)
let islice = Slice.empty in
let finished, used_in, used_out =
(match self.mode with
| Inflate -> Zlib.inflate
| Deflate _ -> Zlib.deflate)
self.zlib_str islice.bytes islice.off islice.len buf i len
Zlib.Z_FINISH
in
assert (used_in = 0);
if finished then self.state <- Done;
n_written := used_out
done;
!n_written
end

class trans_in_ ~mode (ic : #In_buf.t) : In.t =
let st = In_trans.of_in ~mode ic in
object
method close () = In_trans.close st
method input buf i len = In_trans.input ~deadline:None st buf i len
end

let[@inline] decompress_in (ic : #In_buf.t) : In.t =
new transduce_in_ ~mode:Inflate ic
let decompress_in (ic : #In_buf.t) : In.t = new trans_in_ ~mode:Inflate ic

let[@inline] compress_in ?(level = _default_comp_level) (ic : #In_buf.t) : In.t
=
new transduce_in_ ~mode:(Deflate level) ic
let compress_in ?(level = _default_comp_level) (ic : #In_buf.t) : In.t =
new trans_in_ ~mode:(Deflate level) ic

let decompress_in_buf ?buf_size ?buf (ic : #In_buf.t) : In_buf.t =
class trans_in_buf_ ?buf_size ?buf ~mode (ic : #In_buf.t) : In_buf.t =
let bytes = get_buf ?buf_size ?buf () in
let st = In_trans.of_in ~mode ic in
object
(* use [transduce_in_] but hide its [input] method *)
inherit transduce_in_ ~mode:Inflate ic as underlying

(* use regular bufferized [input] *)
inherit! In_buf.t_from_refill ~bytes ()
inherit In_buf.t_from_refill ~bytes ()

method private refill (slice : Slice.t) =
slice.len <- underlying#input slice.bytes 0 (Bytes.length slice.bytes)
slice.len <-
In_trans.input ~deadline:None st slice.bytes 0
(Bytes.length slice.bytes)

method close () = In_trans.close st
end

let decompress_in_buf ?buf_size ?buf (ic : #In_buf.t) : In_buf.t =
new trans_in_buf_ ?buf_size ?buf ~mode:Inflate ic

let compress_in_buf ?buf_size ?buf ?(level = _default_comp_level)
(ic : #In_buf.t) : In_buf.t =
new trans_in_buf_ ?buf_size ?buf ~mode:(Deflate level) ic

class trans_in_buf_timeout_ ?buf_size ?buf ~now_s ~mode
(ic : #In_buf.t_with_timeout) : In_buf.t_with_timeout =
let bytes = get_buf ?buf_size ?buf () in
let st = In_trans.of_in ~mode ic in
object
(* use [transduce_in_] but hide its [input] method *)
inherit transduce_in_ ~mode:(Deflate level) ic as underlying

(* use regular bufferized [input] *)
inherit! In_buf.t_from_refill ~bytes ()
inherit In_buf.t_with_timeout_from_refill ~bytes ()

method private refill (slice : Slice.t) =
slice.len <- underlying#input slice.bytes 0 (Bytes.length slice.bytes)
method private refill_with_timeout t (slice : Slice.t) =
let deadline = now_s () +. t in
slice.len <-
In_trans.input ~deadline:(Some deadline) st slice.bytes 0
(Bytes.length slice.bytes)

method close () = In_trans.close st
end

let decompress_in_buf_with_timeout ?buf_size ?buf ~now_s
(ic : #In_buf.t_with_timeout) : In_buf.t_with_timeout =
new trans_in_buf_timeout_ ?buf_size ?buf ~now_s ~mode:Inflate ic

let compress_in_buf_with_timeout ?buf_size ?buf ?(level = _default_comp_level)
~now_s (ic : #In_buf.t_with_timeout) : In_buf.t_with_timeout =
new trans_in_buf_timeout_ ?buf_size ?buf ~mode:(Deflate level) ~now_s ic

(* write output buffer to out *)
let write_out (oc : #Out.t) (slice : Slice.t) : unit =
if slice.len > 0 then (
Expand Down
20 changes: 20 additions & 0 deletions src/camlzip/iostream_camlzip.mli
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ val decompress_in : #In_buf.t -> In.t
val decompress_in_buf : ?buf_size:int -> ?buf:bytes -> #In_buf.t -> In_buf.t
(** Like {!decompress_in} but the output is buffered as well. *)

val decompress_in_buf_with_timeout :
?buf_size:int ->
?buf:bytes ->
now_s:(unit -> float) ->
#In_buf.t_with_timeout ->
In_buf.t_with_timeout
(** Like {!decompress_in} but the output is buffered as well.
@param now_s a monotonic clock, in seconds
@since NEXT_RELEASE *)

val compress_in : ?level:int -> #In_buf.t -> In.t
(** [compress_in ?level ic] is a new input stream
that is the compressed version of [ic].
Expand All @@ -16,6 +26,16 @@ val compress_in_buf :
?buf_size:int -> ?buf:bytes -> ?level:int -> #In_buf.t -> In_buf.t
(** Same as {!compress_in} but returning a buffered input. *)

val compress_in_buf_with_timeout :
?buf_size:int ->
?buf:bytes ->
?level:int ->
now_s:(unit -> float) ->
#In_buf.t_with_timeout ->
In_buf.t_with_timeout
(** @param now_s a monotonic clock, in seconds
@since NEXT_RELEASE *)

val compressed_out :
?buf_size:int -> ?buf:bytes -> ?level:int -> #Out.t -> Out_buf.t
(** [compressed_out oc] takes a output stream [oc], and
Expand Down
9 changes: 9 additions & 0 deletions src/core/in.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ class type t_seekable =
inherit Seekable.t
end

class type t_with_timeout =
object
inherit t
method input_with_timeout : float -> bytes -> int -> int -> int
end

let create ?(close = ignore) ~input () : t =
object
method close = close
Expand Down Expand Up @@ -195,3 +201,6 @@ let input_all ?(buf = Bytes.create 128) (self : #t) : string =
Bytes.unsafe_to_string !buf
else
Bytes.sub_string !buf 0 !i

let[@inline] input_with_timeout (self : #t_with_timeout) t buf i len =
self#input_with_timeout t buf i len
20 changes: 20 additions & 0 deletions src/core/in.mli
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,18 @@ class type t_seekable =
inherit Seekable.t
end

(** Input stream where [input] takes a timeout.
This is useful for network operations.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And for so many other operations!

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tbh I don't know a lot of IOs that are truly non blocking besides networking? At least on linux, which is what I know, normal IOs on the filesystem are always blocking, for example.

@since NEXT_RELEASE *)
class type t_with_timeout =
object
inherit t

method input_with_timeout : float -> bytes -> int -> int -> int
(** [input_with_timeout t buf i len] tries to read [len] bytes into [buf]
at offset [i]. It raises {!Timeout.Timeout} after [t] seconds without a read *)
end

val create :
?close:(unit -> unit) -> input:(bytes -> int -> int -> int) -> unit -> t

Expand Down Expand Up @@ -110,3 +122,11 @@ val copy_into : ?buf:bytes -> #t -> #Out.t -> unit

val map_char : (char -> char) -> #t -> t
(** Transform the stream byte by byte *)

val input_with_timeout : #t_with_timeout -> float -> bytes -> int -> int -> int

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The docstrings here mention "read" more often than "input", maybe these functions should also be named read_*. (Not sure if that would create name clashes elsewhere.)

(** [input_with_timeout t buf i len] tries to read [len] bytes into [buf]
at offset [i]. It raises {!Timeout.Timeout} after [t] seconds without a read.
@raise Invalid_argument if the arguments do not denote a valid slice.
@raise Timeout.Timeout if a read didn't succeed in [t] seconds.
@since NEXT_RELEASE
*)
Loading