Skip to content

Commit 77b0802

Browse files
committed
Add more structured Run operations
1 parent 035e19c commit 77b0802

14 files changed

+435
-76
lines changed

bench/bench_run.ocaml4.ml

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
let run_suite ~budgetf:_ = []

bench/bench_run.ocaml5.ml

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
open Multicore_bench
2+
open Picos_std_structured
3+
module Multififos = Picos_mux_multififo
4+
5+
let run_one_multififo ~budgetf ~n_domains ~n () =
6+
let context = ref (Obj.magic ()) in
7+
8+
let m = if n < 1_000_000 then 1_000_000 / n else 1 in
9+
10+
let before _ = context := Multififos.context () in
11+
let init _ = !context in
12+
let wrap i context action =
13+
if i = 0 then Multififos.run ~context action else action ()
14+
in
15+
let work i context =
16+
if i <> 0 then Multififos.runner_on_this_thread context
17+
else
18+
for _ = 1 to m do
19+
Run.for_n n ignore
20+
done
21+
in
22+
23+
let config =
24+
Printf.sprintf "%d mfifo%s, run_n %d" n_domains
25+
(if n_domains = 1 then "" else "s")
26+
n
27+
in
28+
Times.record ~budgetf ~n_domains ~before ~init ~wrap ~work ()
29+
|> Times.to_thruput_metrics ~n:(n * m) ~singular:"ignore" ~config
30+
31+
let run_suite ~budgetf =
32+
Util.cross [ 1; 2; 4; 8 ]
33+
[ 100; 1_000; 10_000; 100_000; 1_000_000; 10_000_000 ]
34+
|> List.concat_map @@ fun (n_domains, n) ->
35+
if Picos_domain.recommended_domain_count () < n_domains then []
36+
else run_one_multififo ~budgetf ~n_domains ~n ()

bench/dune

+6
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
(run %{test} -brief "Picos binaries")
2424
(run %{test} -brief "Bounded_q with Picos_std_sync")
2525
(run %{test} -brief "Memory usage")
26+
(run %{test} -brief "Picos_std_structured.Run")
2627
;;
2728
))
2829
(foreign_stubs
@@ -49,6 +50,11 @@
4950
from
5051
(picos_mux.fifo -> scheduler.ocaml5.ml)
5152
(picos_mux.thread -> scheduler.ocaml4.ml))
53+
(select
54+
bench_run.ml
55+
from
56+
(picos_mux.multififo -> bench_run.ocaml5.ml)
57+
(-> bench_run.ocaml4.ml))
5258
(select
5359
bench_fib.ml
5460
from

bench/main.ml

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ let benchmarks =
2222
("Picos binaries", Bench_binaries.run_suite);
2323
("Bounded_q with Picos_std_sync", Bench_bounded_q.run_suite);
2424
("Memory usage", Bench_memory.run_suite);
25+
("Picos_std_structured.Run", Bench_run.run_suite);
2526
]
2627

2728
let () = Multicore_bench.Cmd.run ~benchmarks ()

lib/picos_std.structured/bundle.ml

+48-28
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,17 @@ type t = [ `Bundle ] tdt
1616

1717
external config_as_atomic : t -> int Atomic.t = "%identity"
1818

19-
let config_terminated_bit = 0x01
20-
and config_callstack_mask = 0x3E
21-
and config_callstack_shift = 1
22-
and config_one = 0x40 (* memory runs out before overflow *)
19+
let config_on_return_terminate_bit = 0x01
20+
and config_on_terminate_raise_bit = 0x02
21+
and config_callstack_mask = 0x6C
22+
and config_callstack_shift = 2
23+
and config_one = 0x80 (* memory runs out before overflow *)
2324

2425
let flock_key : [ `Bundle | `Nothing ] tdt Fiber.FLS.t = Fiber.FLS.create ()
2526

26-
let terminate_as callstack (Bundle { bundle = Packed bundle; _ } : t) =
27-
Computation.cancel bundle Control.Terminate callstack
28-
29-
let terminate ?callstack t =
30-
terminate_as (Control.get_callstack_opt callstack) t
27+
let terminate ?callstack (Bundle { bundle = Packed bundle; _ } : t) =
28+
Computation.cancel bundle Control.Terminate
29+
(Control.get_callstack_opt callstack)
3130

3231
let terminate_after ?callstack (Bundle { bundle = Packed bundle; _ } : t)
3332
~seconds =
@@ -39,25 +38,33 @@ let error ?callstack (Bundle r as t : t) exn bt =
3938
terminate ?callstack t;
4039
Control.Errors.push r.errors exn bt
4140
end
41+
else if
42+
Atomic.get (config_as_atomic t) land config_on_terminate_raise_bit <> 0
43+
then terminate ?callstack t
4244

4345
let decr (Bundle r as t : t) =
4446
let n = Atomic.fetch_and_add (config_as_atomic t) (-config_one) in
4547
if n < config_one * 2 then begin
46-
terminate_as Control.empty_bt t;
4748
Trigger.signal r.finished
4849
end
4950

5051
type _ pass = FLS : unit pass | Arg : t pass
5152

5253
let[@inline never] no_flock () = invalid_arg "no flock"
5354

55+
let[@inline] on_terminate = function
56+
| None | Some `Ignore -> `Ignore
57+
| Some `Raise -> `Raise
58+
5459
let get_flock fiber =
5560
match Fiber.FLS.get fiber flock_key ~default:Nothing with
5661
| Bundle _ as t -> t
5762
| Nothing -> no_flock ()
5863

5964
let await (Bundle r as t : t) fiber packed canceler outer =
6065
Fiber.set_computation fiber packed;
66+
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
67+
Fiber.FLS.set fiber flock_key outer;
6168
let forbid = Fiber.exchange fiber ~forbid:true in
6269
let n = Atomic.fetch_and_add (config_as_atomic t) (-config_one) in
6370
if config_one * 2 <= n then begin
@@ -66,14 +73,22 @@ let await (Bundle r as t : t) fiber packed canceler outer =
6673
write from being delayed after the [Trigger.await] below. *)
6774
if config_one <= Atomic.fetch_and_add (config_as_atomic t) 0 then
6875
Trigger.await r.finished |> ignore
69-
end
70-
else terminate_as Control.empty_bt t;
76+
end;
7177
Fiber.set fiber ~forbid;
72-
if Fiber.FLS.get fiber flock_key ~default:Nothing != outer then
73-
Fiber.FLS.set fiber flock_key outer;
7478
let (Packed parent) = packed in
7579
Computation.detach parent canceler;
7680
Control.Errors.check r.errors;
81+
begin
82+
let (Packed bundle) = r.bundle in
83+
match Computation.peek_exn bundle with
84+
| _ -> ()
85+
| exception Computation.Running ->
86+
Computation.cancel bundle Control.Terminate Control.empty_bt
87+
| exception Control.Terminate
88+
when Atomic.get (config_as_atomic t) land config_on_terminate_raise_bit
89+
= 0 ->
90+
()
91+
end;
7792
Fiber.check fiber
7893

7994
let[@inline never] raised exn t fiber packed canceler outer =
@@ -84,7 +99,7 @@ let[@inline never] raised exn t fiber packed canceler outer =
8499

85100
let[@inline never] returned value (t : t) fiber packed canceler outer =
86101
let config = Atomic.get (config_as_atomic t) in
87-
if config land config_terminated_bit <> 0 then begin
102+
if config land config_on_return_terminate_bit <> 0 then begin
88103
let callstack =
89104
let n = (config land config_callstack_mask) lsr config_callstack_shift in
90105
if n = 0 then None else Some n
@@ -99,25 +114,30 @@ let join_after_realloc x fn t fiber packed canceler outer =
99114
| value -> returned value t fiber packed canceler outer
100115
| exception exn -> raised exn t fiber packed canceler outer
101116

102-
let join_after_pass (type a) ?callstack ?on_return (fn : a -> _) (pass : a pass)
103-
=
117+
let join_after_pass (type a) ?callstack ?on_return ?on_terminate (fn : a -> _)
118+
(pass : a pass) =
104119
(* The sequence of operations below ensures that nothing is leaked. *)
105120
let (Bundle r as t : t) =
106-
let terminated =
121+
let config =
107122
match on_return with
108-
| None | Some `Wait -> 0
109-
| Some `Terminate -> config_terminated_bit
123+
| None | Some `Wait -> config_one
124+
| Some `Terminate -> config_one lor config_on_return_terminate_bit
110125
in
111-
let callstack =
126+
let config =
127+
match on_terminate with
128+
| None | Some `Ignore -> config
129+
| Some `Raise -> config lor config_on_terminate_raise_bit
130+
in
131+
let config =
112132
match callstack with
113-
| None -> 0
133+
| None -> config
114134
| Some n ->
115-
if n <= 0 then 0
135+
if n <= 0 then config
116136
else
117-
Int.min n (config_callstack_mask lsr config_callstack_shift)
118-
lsl config_callstack_shift
137+
config
138+
lor Int.min n (config_callstack_mask lsr config_callstack_shift)
139+
lsl config_callstack_shift
119140
in
120-
let config = config_one lor callstack lor terminated in
121141
let bundle = Computation.Packed (Computation.create ~mode:`LIFO ()) in
122142
let errors = Control.Errors.create () in
123143
let finished = Trigger.signaled in
@@ -219,8 +239,8 @@ let fork_pass (type a) (Bundle r as t : t) thunk (pass : a pass) =
219239
let is_running (Bundle { bundle = Packed bundle; _ } : t) =
220240
Computation.is_running bundle
221241

222-
let join_after ?callstack ?on_return fn =
223-
join_after_pass ?callstack ?on_return fn Arg
242+
let join_after ?callstack ?on_return ?on_terminate fn =
243+
join_after_pass ?callstack ?on_return ?on_terminate fn Arg
224244

225245
let fork t thunk = fork_pass t thunk Arg
226246
let fork_as_promise t thunk = fork_as_promise_pass t thunk Arg

lib/picos_std.structured/control.ml

+9-7
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ module Errors = struct
4141
| [ (exn, bt) ] -> Printexc.raise_with_backtrace exn bt
4242
| exn_bts -> check exn_bts []
4343

44-
let rec push t exn bt backoff =
45-
let before = Atomic.get t in
46-
let after = (exn, bt) :: before in
47-
if not (Atomic.compare_and_set t before after) then
48-
push t exn bt (Backoff.once backoff)
49-
50-
let push t exn bt = push t exn bt Backoff.default
44+
let push t exn bt =
45+
let backoff = ref Backoff.default in
46+
while
47+
let before = Atomic.get t in
48+
let after = (exn, bt) :: before in
49+
not (Atomic.compare_and_set t before after)
50+
do
51+
backoff := Backoff.once !backoff
52+
done
5153
end
5254

5355
let raise_if_canceled () = Fiber.check (Fiber.current ())

lib/picos_std.structured/dune

+12
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,15 @@
1+
(rule
2+
(enabled_if
3+
(<= 5.0.0 %{ocaml_version}))
4+
(action
5+
(copy for.ocaml5.ml for.ml)))
6+
7+
(rule
8+
(enabled_if
9+
(< %{ocaml_version} 5.0.0))
10+
(action
11+
(copy for.ocaml4.ml for.ml)))
12+
113
(library
214
(name picos_std_structured)
315
(public_name picos_std.structured)

lib/picos_std.structured/flock.ml

+2-2
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ let error ?callstack exn_bt = Bundle.error (get ()) ?callstack exn_bt
1010
let fork_as_promise thunk = Bundle.fork_as_promise_pass (get ()) thunk FLS
1111
let fork action = Bundle.fork_pass (get ()) action FLS
1212

13-
let join_after ?callstack ?on_return fn =
14-
Bundle.join_after_pass ?callstack ?on_return fn Bundle.FLS
13+
let join_after ?callstack ?on_return ?on_terminate fn =
14+
Bundle.join_after_pass ?callstack ?on_return ?on_terminate fn Bundle.FLS
+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
type _ tdt =
2+
| Empty : [> `Empty ] tdt
3+
| Range : {
4+
mutable lo : int;
5+
hi : int;
6+
parent : [ `Empty | `Range ] tdt;
7+
}
8+
-> [> `Range ] tdt
9+
10+
let[@poll error] cas_lo (Range r : [ `Range ] tdt) before after =
11+
r.lo == before
12+
&& begin
13+
r.lo <- after;
14+
true
15+
end
16+
17+
let rec for_out t (Range r as range : [ `Range ] tdt) action =
18+
let lo_before = r.lo in
19+
let n = r.hi - lo_before in
20+
if 0 < n then begin
21+
if Bundle.is_running t then begin
22+
let lo_after = lo_before + 1 in
23+
if cas_lo range lo_before lo_after then begin
24+
try action lo_before
25+
with exn -> Bundle.error t exn (Printexc.get_raw_backtrace ())
26+
end;
27+
for_out t range action
28+
end
29+
end
30+
else
31+
match r.parent with
32+
| Empty -> ()
33+
| Range _ as range -> for_out t range action
34+
35+
let rec for_in t (Range r as range : [ `Range ] tdt) action =
36+
let lo_before = r.lo in
37+
let n = r.hi - lo_before in
38+
if n <= 1 then for_out t range action
39+
else
40+
let lo_after = lo_before + (n asr 1) in
41+
if cas_lo range lo_before lo_after then begin
42+
Bundle.fork t (fun () -> for_in t range action);
43+
let child = Range { lo = lo_before; hi = lo_after; parent = range } in
44+
for_in t child action
45+
end
46+
else for_in t range action
47+
48+
let for_n ?on_terminate n action =
49+
if 0 < n then
50+
if n = 1 then
51+
try action 0
52+
with
53+
| Control.Terminate when Bundle.on_terminate on_terminate == `Ignore ->
54+
()
55+
else
56+
let range = Range { lo = 0; hi = n; parent = Empty } in
57+
Bundle.join_after ?on_terminate @@ fun t -> for_in t range action

0 commit comments

Comments
 (0)