Skip to content

Commit 9a1343a

Browse files
committed
remove global withlock builder, pass it as argument instead
1 parent f10992e commit 9a1343a

File tree

2 files changed

+43
-22
lines changed

2 files changed

+43
-22
lines changed

src/ws/tiny_httpd_ws.ml

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ module With_lock = struct
1919
Mutex.unlock mutex;
2020
raise e);
2121
}
22-
23-
let builder : builder ref = ref default_builder
2422
end
2523

2624
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
@@ -78,15 +76,15 @@ module Writer = struct
7876
mutex: With_lock.t;
7977
}
8078

81-
let create ?(buf_size = 16 * 1024) ~oc () : t =
79+
let create ?(buf_size = 16 * 1024) ~with_lock ~oc () : t =
8280
{
8381
header = Header.create ();
8482
header_buf = Bytes.create 16;
8583
buf = Bytes.create buf_size;
8684
offset = 0;
8785
oc;
8886
closed = false;
89-
mutex = !With_lock.builder ();
87+
mutex = with_lock;
9088
}
9189

9290
let[@inline] close self = self.closed <- true
@@ -403,8 +401,8 @@ module Reader = struct
403401
)
404402
end
405403

406-
let upgrade ic oc : _ * _ =
407-
let writer = Writer.create ~oc () in
404+
let upgrade ?(with_lock = With_lock.default_builder ()) ic oc : _ * _ =
405+
let writer = Writer.create ~with_lock ~oc () in
408406
let reader = Reader.create ~ic ~writer () in
409407
let ws_ic : IO.Input.t =
410408
object
@@ -431,6 +429,7 @@ let upgrade ic oc : _ * _ =
431429
upgrade handler *)
432430
module Make_upgrade_handler (X : sig
433431
val accept_ws_protocol : string -> bool
432+
val with_lock : With_lock.builder
434433
val handler : handler
435434
end) : Server.UPGRADE_HANDLER with type handshake_state = unit Request.t =
436435
struct
@@ -475,17 +474,20 @@ struct
475474
try Ok (handshake_ req) with Bad_req s -> Error s
476475

477476
let handle_connection req ic oc =
478-
let ws_ic, ws_oc = upgrade ic oc in
477+
let with_lock = X.with_lock () in
478+
let ws_ic, ws_oc = upgrade ~with_lock ic oc in
479479
try X.handler req ws_ic ws_oc
480480
with Close_connection ->
481481
Log.debug (fun k -> k "websocket: requested to close the connection");
482482
()
483483
end
484484

485485
let add_route_handler ?accept ?(accept_ws_protocol = fun _ -> true) ?middlewares
486-
(server : Server.t) route (f : handler) : unit =
486+
?(with_lock = With_lock.default_builder) (server : Server.t) route
487+
(f : handler) : unit =
487488
let module M = Make_upgrade_handler (struct
488489
let handler = f
490+
let with_lock = with_lock
489491
let accept_ws_protocol = accept_ws_protocol
490492
end) in
491493
let up : Server.upgrade_handler = (module M) in

src/ws/tiny_httpd_ws.mli

Lines changed: 33 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,36 @@
33
This sub-library ([tiny_httpd.ws]) exports a small implementation for a
44
websocket server. It has no additional dependencies. *)
55

6+
(** Synchronization primitive used to allow both the reader to reply to "ping",
7+
and the handler to send messages, without stepping on each other's toes.
8+
9+
@since NEXT_RELEASE *)
10+
module With_lock : sig
11+
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
12+
(** A primitive to run the callback in a critical section where others cannot
13+
run at the same time.
14+
15+
The default is a mutex, but that works poorly with thread pools so it's
16+
possible to use a semaphore or a cooperative mutex instead. *)
17+
18+
type builder = unit -> t
19+
20+
val default_builder : builder
21+
(** Lock using [Mutex]. *)
22+
end
23+
624
type handler = unit Request.t -> IO.Input.t -> IO.Output.t -> unit
725
(** Websocket handler *)
826

9-
val upgrade : IO.Input.t -> IO.Output.t -> IO.Input.t * IO.Output.t
10-
(** Upgrade a byte stream to the websocket framing protocol. *)
27+
val upgrade :
28+
?with_lock:With_lock.t ->
29+
IO.Input.t ->
30+
IO.Output.t ->
31+
IO.Input.t * IO.Output.t
32+
(** Upgrade a byte stream to the websocket framing protocol.
33+
@param with_lock
34+
if provided, use this to prevent reader and writer to compete on sending
35+
frames. since NEXT_RELEASE. *)
1136

1237
exception Close_connection
1338
(** Exception that can be raised from IOs inside the handler, when the
@@ -17,14 +42,19 @@ val add_route_handler :
1742
?accept:(unit Request.t -> (unit, int * string) result) ->
1843
?accept_ws_protocol:(string -> bool) ->
1944
?middlewares:Server.Head_middleware.t list ->
45+
?with_lock:With_lock.builder ->
2046
Server.t ->
2147
(Server.upgrade_handler, Server.upgrade_handler) Route.t ->
2248
handler ->
2349
unit
2450
(** Add a route handler for a websocket endpoint.
2551
@param accept_ws_protocol
2652
decides whether this endpoint accepts the websocket protocol sent by the
27-
client. Default accepts everything. *)
53+
client. Default accepts everything.
54+
@param with_lock
55+
if provided, use this to synchronize writes between the frame reader
56+
(replies "pong" to "ping") and the handler emitting writes. since
57+
NEXT_RELEASE. *)
2858

2959
(**/**)
3060

@@ -33,15 +63,4 @@ module Private_ : sig
3363
mask_key:bytes -> mask_offset:int -> bytes -> int -> int -> unit
3464
end
3565

36-
(** @since NEXT_RELEASE *)
37-
module With_lock : sig
38-
type t = { with_lock: 'a. (unit -> 'a) -> 'a }
39-
type builder = unit -> t
40-
41-
val default_builder : builder
42-
(** Lock using [Mutex]. *)
43-
44-
val builder : builder ref
45-
end
46-
4766
(**/**)

0 commit comments

Comments
 (0)