Provide semaphores in the threading library (#9930)
This commit adds a new thread-related module Semaphore, implementing counting semaphores and binary semaphores. The two kinds of semaphores are presented as two different abstract types in two sub-modules, Counting and Binary.master
parent
f809e9dec1
commit
426b10c6a8
5
Changes
5
Changes
|
@ -300,6 +300,11 @@ Working version
|
|||
(Ivan Gotovchits and Xavier Leroy, review by Sébastien Hinderer and
|
||||
David Allsopp)
|
||||
|
||||
- #9930: new module Semaphore in the thread library, implementing
|
||||
counting semaphores and binary semaphores
|
||||
(Xavier Leroy, review by Daniel Bünzli and Damien Doligez,
|
||||
additional suggestions by Stephen Dolan and Craig Ferguson)
|
||||
|
||||
- #9958: Raise exception in case of error in Unix.setsid.
|
||||
(Nicolás Ojeda Bär, review by Stephen Dolan)
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ COMPILER_LIBS_INTF = Asthelper.tex Astmapper.tex Asttypes.tex \
|
|||
$(COMPILER_LIBS_PLUGIN_HOOKS)
|
||||
|
||||
OTHERLIB_INTF = Unix.tex UnixLabels.tex Str.tex \
|
||||
Thread.tex Mutex.tex Condition.tex Event.tex \
|
||||
Thread.tex Mutex.tex Condition.tex Semaphore.tex Event.tex \
|
||||
Dynlink.tex Bigarray.tex
|
||||
|
||||
INTF = $(CORE_INTF) $(STDLIB_INTF) $(COMPILER_LIBS_INTF) $(OTHERLIB_INTF)
|
||||
|
|
|
@ -31,11 +31,13 @@ the "-I +threads" option (see chapter~\ref{c:camlc}).
|
|||
\item \ahref{libref/Thread.html}{Module \texttt{Thread}: lightweight threads}
|
||||
\item \ahref{libref/Mutex.html}{Module \texttt{Mutex}: locks for mutual exclusion}
|
||||
\item \ahref{libref/Condition.html}{Module \texttt{Condition}: condition variables to synchronize between threads}
|
||||
\item \ahref{libref/Semaphore.html}{Module \texttt{Semaphore}: semaphores, another thread synchronization mechanism}
|
||||
\item \ahref{libref/Event.html}{Module \texttt{Event}: first-class synchronous communication}
|
||||
\end{links}
|
||||
\else
|
||||
\input{Thread.tex}
|
||||
\input{Mutex.tex}
|
||||
\input{Condition.tex}
|
||||
\input{Semaphore.tex}
|
||||
\input{Event.tex}
|
||||
\fi
|
||||
|
|
|
@ -23,7 +23,7 @@ STR_MLIS = $(addprefix $(SRC)/otherlibs/str/, str.mli)
|
|||
UNIX_MLIS = $(addprefix $(SRC)/otherlibs/unix/, unix.mli unixLabels.mli)
|
||||
DYNLINK_MLIS = $(addprefix $(SRC)/otherlibs/dynlink/, dynlink.mli)
|
||||
THREAD_MLIS = $(addprefix $(SRC)/otherlibs/systhreads/, \
|
||||
thread.mli condition.mli mutex.mli event.mli threadUnix.mli)
|
||||
thread.mli condition.mli mutex.mli event.mli semaphore.mli threadUnix.mli)
|
||||
DRIVER_MLIS = $(SRC)/driver/pparse.mli
|
||||
|
||||
|
||||
|
|
|
@ -20,6 +20,15 @@ mutex.cmo : \
|
|||
mutex.cmx : \
|
||||
mutex.cmi
|
||||
mutex.cmi :
|
||||
semaphore.cmo : \
|
||||
mutex.cmi \
|
||||
condition.cmi \
|
||||
semaphore.cmi
|
||||
semaphore.cmx : \
|
||||
mutex.cmx \
|
||||
condition.cmx \
|
||||
semaphore.cmi
|
||||
semaphore.cmi :
|
||||
thread.cmo : \
|
||||
thread.cmi
|
||||
thread.cmx : \
|
||||
|
|
|
@ -49,12 +49,15 @@ LIBNAME=threads
|
|||
BYTECODE_C_OBJS=st_stubs.b.$(O)
|
||||
NATIVECODE_C_OBJS=st_stubs.n.$(O)
|
||||
|
||||
THREADS_SOURCES = thread.ml mutex.ml condition.ml event.ml threadUnix.ml
|
||||
THREADS_SOURCES = thread.ml mutex.ml condition.ml event.ml threadUnix.ml \
|
||||
semaphore.ml
|
||||
|
||||
THREADS_BCOBJS = $(THREADS_SOURCES:.ml=.cmo)
|
||||
THREADS_NCOBJS = $(THREADS_SOURCES:.ml=.cmx)
|
||||
|
||||
MLIFILES=thread.mli mutex.mli condition.mli event.mli threadUnix.mli
|
||||
MLIFILES=thread.mli mutex.mli condition.mli event.mli threadUnix.mli \
|
||||
semaphore.mli
|
||||
|
||||
CMIFILES=$(MLIFILES:.mli=.cmi)
|
||||
|
||||
all: lib$(LIBNAME).$(A) $(LIBNAME).cma $(CMIFILES)
|
||||
|
|
|
@ -0,0 +1,86 @@
|
|||
(**************************************************************************)
|
||||
(* *)
|
||||
(* OCaml *)
|
||||
(* *)
|
||||
(* Xavier Leroy, Collège de France and INRIA Paris *)
|
||||
(* *)
|
||||
(* Copyright 2020 Institut National de Recherche en Informatique et *)
|
||||
(* en Automatique. *)
|
||||
(* *)
|
||||
(* All rights reserved. This file is distributed under the terms of *)
|
||||
(* the GNU Lesser General Public License version 2.1, with the *)
|
||||
(* special exception on linking described in the file LICENSE. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** Semaphores *)
|
||||
|
||||
type sem = {
|
||||
mut: Mutex.t; (* protects [v] *)
|
||||
mutable v: int; (* the current value *)
|
||||
nonzero: Condition.t (* signaled when [v > 0] *)
|
||||
}
|
||||
|
||||
module Counting = struct
|
||||
|
||||
type t = sem
|
||||
|
||||
let make v =
|
||||
if v < 0 then invalid_arg "Semaphore.Counting.init: wrong initial value";
|
||||
{ mut = Mutex.create(); v; nonzero = Condition.create() }
|
||||
|
||||
let release s =
|
||||
Mutex.lock s.mut;
|
||||
if s.v < max_int then begin
|
||||
s.v <- s.v + 1;
|
||||
Condition.signal s.nonzero;
|
||||
Mutex.unlock s.mut
|
||||
end else begin
|
||||
Mutex.unlock s.mut;
|
||||
raise (Sys_error "Semaphore.Counting.release: overflow")
|
||||
end
|
||||
|
||||
let acquire s =
|
||||
Mutex.lock s.mut;
|
||||
while s.v = 0 do Condition.wait s.nonzero s.mut done;
|
||||
s.v <- s.v - 1;
|
||||
Mutex.unlock s.mut
|
||||
|
||||
let try_acquire s =
|
||||
Mutex.lock s.mut;
|
||||
let ret = if s.v = 0 then false else (s.v <- s.v - 1; true) in
|
||||
Mutex.unlock s.mut;
|
||||
ret
|
||||
|
||||
let get_value s = s.v
|
||||
|
||||
end
|
||||
|
||||
module Binary = struct
|
||||
|
||||
type t = sem
|
||||
|
||||
let make b =
|
||||
{ mut = Mutex.create();
|
||||
v = if b then 1 else 0;
|
||||
nonzero = Condition.create() }
|
||||
|
||||
let release s =
|
||||
Mutex.lock s.mut;
|
||||
s.v <- 1;
|
||||
Condition.signal s.nonzero;
|
||||
Mutex.unlock s.mut
|
||||
|
||||
let acquire s =
|
||||
Mutex.lock s.mut;
|
||||
while s.v = 0 do Condition.wait s.nonzero s.mut done;
|
||||
s.v <- 0;
|
||||
Mutex.unlock s.mut
|
||||
|
||||
let try_acquire s =
|
||||
Mutex.lock s.mut;
|
||||
let ret = if s.v = 0 then false else (s.v <- 0; true) in
|
||||
Mutex.unlock s.mut;
|
||||
ret
|
||||
|
||||
end
|
|
@ -0,0 +1,140 @@
|
|||
(**************************************************************************)
|
||||
(* *)
|
||||
(* OCaml *)
|
||||
(* *)
|
||||
(* Xavier Leroy, Collège de France and INRIA Paris *)
|
||||
(* *)
|
||||
(* Copyright 2020 Institut National de Recherche en Informatique et *)
|
||||
(* en Automatique. *)
|
||||
(* *)
|
||||
(* All rights reserved. This file is distributed under the terms of *)
|
||||
(* the GNU Lesser General Public License version 2.1, with the *)
|
||||
(* special exception on linking described in the file LICENSE. *)
|
||||
(* *)
|
||||
(**************************************************************************)
|
||||
|
||||
(** Semaphores
|
||||
|
||||
A semaphore is a thread synchronization device that can be used to
|
||||
control access to a shared resource.
|
||||
|
||||
Two flavors of semaphores are provided: counting semaphores and
|
||||
binary semaphores.
|
||||
|
||||
@since 4.12 *)
|
||||
|
||||
(** {2 Counting semaphores} *)
|
||||
|
||||
(**
|
||||
A counting semaphore is a counter that can be accessed concurrently
|
||||
by several threads. The typical use is to synchronize producers and
|
||||
consumers of a resource by counting how many units of the resource
|
||||
are available.
|
||||
|
||||
The two basic operations on semaphores are:
|
||||
- "release" (also called "V", "post", "up", and "signal"), which
|
||||
increments the value of the counter. This corresponds to producing
|
||||
one more unit of the shared resource and making it available to others.
|
||||
- "acquire" (also called "P", "wait", "down", and "pend"), which
|
||||
waits until the counter is greater than zero and decrements it.
|
||||
This corresponds to consuming one unit of the shared resource.
|
||||
|
||||
@since 4.12 *)
|
||||
|
||||
module Counting : sig
|
||||
|
||||
type t
|
||||
(** The type of counting semaphores. *)
|
||||
|
||||
val make : int -> t
|
||||
(** [make n] returns a new counting semaphore, with initial value [n].
|
||||
The initial value [n] must be nonnegative.
|
||||
|
||||
@raise Invalid_argument if [n < 0]
|
||||
*)
|
||||
|
||||
val release : t -> unit
|
||||
(** [release s] increments the value of semaphore [s].
|
||||
If other threads are waiting on [s], one of them is restarted.
|
||||
If the current value of [s] is equal to [max_int], the value of
|
||||
the semaphore is unchanged and a [Sys_error] exception is raised
|
||||
to signal overflow.
|
||||
|
||||
@raise Sys_error if the value of the semaphore would overflow [max_int]
|
||||
*)
|
||||
|
||||
val acquire : t -> unit
|
||||
(** [acquire s] blocks the calling thread until the value of semaphore [s]
|
||||
is not zero, then atomically decrements the value of [s] and returns.
|
||||
*)
|
||||
|
||||
val try_acquire : t -> bool
|
||||
(** [try_acquire s] immediately returns [false] if the value of semaphore [s]
|
||||
is zero. Otherwise, the value of [s] is atomically decremented
|
||||
and [try_acquire s] returns [true].
|
||||
*)
|
||||
|
||||
val get_value : t -> int
|
||||
(** [get_value s] returns the current value of semaphore [s].
|
||||
The current value can be modified at any time by concurrent
|
||||
{!release} and {!acquire} operations. Hence, the [get_value]
|
||||
operation is racy, and its result should only be used for debugging
|
||||
or informational messages.
|
||||
*)
|
||||
|
||||
end
|
||||
|
||||
(** {2 Binary semaphores} *)
|
||||
|
||||
(** Binary semaphores are a variant of counting semaphores
|
||||
where semaphores can only take two values, 0 and 1.
|
||||
|
||||
A binary semaphore can be used to control access to a single
|
||||
shared resource, with value 1 meaning "resource is available" and
|
||||
value 0 meaning "resource is unavailable".
|
||||
|
||||
The "release" operation of a binary semaphore sets its value to 1,
|
||||
and "acquire" waits until the value is 1 and sets it to 0.
|
||||
|
||||
A binary semaphore can be used instead of a mutex (see module
|
||||
{!Mutex}) when the mutex discipline (of unlocking the mutex from the
|
||||
thread that locked it) is too restrictive. The "acquire" operation
|
||||
corresponds to locking the mutex, and the "release" operation to
|
||||
unlocking it, but "release" can be performed in a thread different
|
||||
than the one that performed the "acquire". Likewise, it is safe
|
||||
to release a binary semaphore that is already available.
|
||||
|
||||
@since 4.12
|
||||
*)
|
||||
|
||||
module Binary : sig
|
||||
|
||||
type t
|
||||
(** The type of binary semaphores. *)
|
||||
|
||||
val make : bool -> t
|
||||
(** [make b] returns a new binary semaphore.
|
||||
If [b] is [true], the initial value of the semaphore is 1, meaning
|
||||
"available". If [b] is [false], the initial value of the
|
||||
semaphore is 0, meaning "unavailable".
|
||||
*)
|
||||
|
||||
val release : t -> unit
|
||||
(** [release s] sets the value of semaphore [s] to 1, putting it in the
|
||||
"available" state. If other threads are waiting on [s], one of them is
|
||||
restarted.
|
||||
*)
|
||||
|
||||
val acquire : t -> unit
|
||||
(** [acquire s] blocks the calling thread until the semaphore [s]
|
||||
has value 1 (is available), then atomically sets it to 0
|
||||
and returns.
|
||||
*)
|
||||
|
||||
val try_acquire : t -> bool
|
||||
(** [try_acquire s] immediately returns [false] if the semaphore [s]
|
||||
has value 0. If [s] has value 1, its value is atomically set to 0
|
||||
and [try_acquire s] returns [true].
|
||||
*)
|
||||
|
||||
end
|
Loading…
Reference in New Issue