Skip to content

Commit

Permalink
Merge pull request #13 from gares/fix-second-chance
Browse files Browse the repository at this point in the history
Fix second chance
  • Loading branch information
gares authored Nov 7, 2024
2 parents 68eb770 + 46108d8 commit b403e0c
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 42 deletions.
18 changes: 10 additions & 8 deletions lib/events.ml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type 'a res = ('a,exn) result

(* Events in progress *)
type _ in_progress =
| Line : Bytes.t * Buffer.t -> string in_progress
| Line : int * Bytes.t * Buffer.t -> string in_progress
| Bytes : int * Bytes.t -> Bytes.t in_progress

(* Reified function composition *)
Expand Down Expand Up @@ -126,7 +126,7 @@ let mkReadInProgress fd = function
| FCons _ as f -> Advanced (ReadInProgress(fd,f))
| FNil x -> Yes x

let one_line () = Line (Bytes.make 1 '0', Buffer.create 40)
let one_line ?(at_least=1) () = Line (at_least,Bytes.make at_least '0', Buffer.create (max 40 at_least))
let some_bytes n ?(buff=Bytes.create n) () = Bytes(n,buff)

module On = struct
Expand Down Expand Up @@ -161,10 +161,12 @@ let parse_content_length_or err k s =
with
(Scanf.Scan_failure _ | Failure _ | End_of_file | Invalid_argument _) as e ->
err (Error e)

let len_httpcle_header = String.length "CONTENT-LENGTH: \n"

let an_httpcle (k : Bytes.t res -> 'b) : 'b fcomp =
let (--?) x y = err k x y in
one_line ()
one_line ~at_least:len_httpcle_header ()
--? (parse_content_length_or (finish_with k) (fun length ->
one_line ()
--? (fun _discard ->
Expand Down Expand Up @@ -199,19 +201,19 @@ let advance_system ready_fds _ = function
else ready_fds, Yes (k code)
| ReadInProgress(_, FNil _) -> assert false
| ReadInProgress(fd,_) as x when not (List.mem fd ready_fds) -> ready_fds, No x
| ReadInProgress(fd, FCons(Line (buff,acc) as line,rest)) ->
| ReadInProgress(fd, FCons(Line (m,buff,acc),rest)) ->
let ready_fds = List.filter ((<>) fd) ready_fds in
ready_fds,
begin try
let n = Unix.read fd buff 0 1 in
let n = Unix.read fd buff 0 m in
if n = 0 then begin
Buffer.clear acc;
mkReadInProgress fd (rest (Error End_of_file))
end else
let c = Bytes.get buff 0 in
let c = Bytes.get buff (n-1) in
if c != '\n' then begin
Buffer.add_char acc c;
mkReadInProgress fd (FCons(line,rest))
Buffer.add_bytes acc (Bytes.sub buff 0 n);
mkReadInProgress fd (FCons(Line (max (m-n) 1,buff,acc),rest))
end else begin
let one_line = Buffer.contents acc in
Buffer.clear acc;
Expand Down
36 changes: 19 additions & 17 deletions lib/sel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -92,22 +92,21 @@ end
(* Like List.filter but also returns the minimum priority of ready events.
Moreover ~advance can make the event advance (whilst not being ready yet)*)
let pull_ready ~advance st l =
let rec pull_ready yes no min_priority st l =
let rec pull_ready yes min_priority_ready no st l =
match Sorted.look l with
| Sorted.Nil -> yes, no, min_priority
| Sorted.Nil -> yes, no, min_priority_ready
| Sorted.Cons(({ WithAttributes.it; cancelled; priority; _ } as e, _), rest) ->
match advance st cancelled it with
| st, Yes y ->
let min_priority = Sorted.min_user min_priority priority in
let min_priority_ready = Sorted.min_user min_priority_ready priority in
let e = drop_event_type y e in
pull_ready (Sorted.cons e e.priority yes) no min_priority st rest
pull_ready (Sorted.cons e e.priority yes) min_priority_ready no st rest
| st, Advanced x ->
let min_priority = Sorted.min_user min_priority priority in
pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority st rest
pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest
| st, No x ->
pull_ready yes (Sorted.cons { e with it = x } e.priority no) min_priority st rest
pull_ready yes min_priority_ready (Sorted.cons { e with it = x } e.priority no) st rest
in
pull_ready Sorted.nil Sorted.nil Sorted.max_priority st l
pull_ready Sorted.nil Sorted.max_priority Sorted.nil st l

type ('a,'b) ev_checker =
'a WithAttributes.t Sorted.t -> 'b WithAttributes.t Sorted.t * 'a WithAttributes.t Sorted.t * Sorted.priority
Expand All @@ -128,21 +127,21 @@ let filter_file_descriptor fds = function
The result is that it when reading 'n' bytes, it is no longer necessary to interleave up to 'n' ready tasks.
*)
let check_for_system_events min_prio_task_queue : ('a system_event,'a) ev_checker = fun waiting ->
let rec check_for_system_events new_ready waiting_skipped min_prio waiting =
let rec check_for_system_events new_ready waiting_skipped min_prio_ready waiting =
let fds = file_descriptors_of waiting in
let ready_fds, _, _ = Unix.select fds [] [] 0.0 in
let new_ready_1, waiting, min_prio_1 = pull_ready ~advance:advance_system ready_fds waiting in
let new_ready_1, waiting, min_prio_ready_1 = pull_ready ~advance:advance_system ready_fds waiting in
let new_ready = Sorted.append new_ready_1 new_ready in
let min_prio = Sorted.min_user min_prio_1 min_prio in
let min_prio_ready = Sorted.min_user min_prio_ready_1 min_prio_ready in
if ready_fds = [] then
new_ready, Sorted.append waiting waiting_skipped, min_prio
new_ready, Sorted.append waiting waiting_skipped, min_prio_ready
else
let waiting, waiting_skipped_1 = Sorted.partition (filter_file_descriptor ready_fds) waiting in
let waiting, waiting_skipped_2 = Sorted.partition_priority (Sorted.le_user min_prio) waiting in
let waiting_skipped = Sorted.concat [waiting_skipped_2; waiting_skipped_1; waiting_skipped] in
check_for_system_events new_ready waiting_skipped min_prio waiting
let waiting_skipped = Sorted.concat [waiting_skipped_1; waiting_skipped] in
check_for_system_events new_ready waiting_skipped min_prio_ready waiting
in
check_for_system_events Sorted.nil Sorted.nil min_prio_task_queue waiting
let waiting, waiting_skipped = Sorted.partition_priority (fun x -> Sorted.le_user x min_prio_task_queue) waiting in
check_for_system_events Sorted.nil waiting_skipped Sorted.max_priority waiting

let check_for_queue_events : ('a queue_event,'a) ev_checker =
fun waiting ->
Expand All @@ -158,7 +157,10 @@ let rec wait_for_system_or_queue_events ~deadline (fds,sys) queue =
let ready_sys, waiting_sys, min_prio_sys = pull_ready ~advance:advance_system ready_fds sys in
let ready_queue, waiting_queue, min_prio_queue = pull_ready ~advance:advance_queue () queue in
if ready_sys <> Sorted.nil || ready_queue <> Sorted.nil
then ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_queue min_prio_sys
then
let min_prio = Sorted.min_priority min_prio_queue min_prio_sys in
let new_ready_sys, waiting_sys, min_prio_new_ready_sys = check_for_system_events min_prio waiting_sys in
Sorted.append new_ready_sys ready_sys, ready_queue, waiting_sys, waiting_queue, Sorted.min_priority min_prio_new_ready_sys min_prio
else wait_for_system_or_queue_events ~deadline (fds,waiting_sys) queue

let wait_for_system_or_queue_events ~deadline sys queue =
Expand Down
33 changes: 32 additions & 1 deletion test/fairness_prio.ml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ let%test_unit "sel.loop" =
write "Content-Length: 3\n\n123\n";
let x = Sel.now ~priority:2 (Ok "bad") in
let todo = Todo.add Todo.empty [e;x] in
let rec loop todo =
let loop todo =
let ready, _todo = Sel.pop todo in
match ready with
| Ok "bad" -> [%test_eq: string] "" "bad1"
Expand All @@ -71,3 +71,34 @@ let%test_unit "sel.loop" =
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

let%test_unit "sel.loop2" =
let read, write = pipe () in
let e = On.httpcle ~priority:1 read (fun x -> Result.map ~f:Bytes.to_string x) in
write "Content-Length: 3\n\n12";
let x = Sel.now ~priority:2 (Ok "bad") in
let todo = Todo.add Todo.empty [e;x] in
let loop todo =
let ready, _todo = Sel.pop todo in
match ready with
| Ok "bad" -> ()
| Ok _ -> [%test_eq: string] "" "bad2"
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

(* pop_opt terminates *)
let%test_unit "sel.loop3" =
let read, write = pipe () in
let e = On.line ~priority:1 read (fun x -> x) in
write "aa\nbb\ncc\n";
let read2, write2 = pipe () in
let x = On.bytes ~priority:2 read2 2 (function Error e -> Error e | Ok s -> Error (Failure (Stdlib.Format.asprintf "lower priority event triggered: '%s'" (Bytes.to_string s)))) in
let todo = Todo.add Todo.empty [e;x] in
let rec loop todo =
let ready, todo = Sel.pop todo in
match ready with
| Ok "cc" -> ()
| Ok s -> write2 s; loop (Todo.add todo [e])
| Error End_of_file -> ()
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

30 changes: 14 additions & 16 deletions test/perf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,20 +55,18 @@ let read_leftover read n =

(*****************************************************************************)

(* pop_opt terminates *)
let%test_unit "sel.loop" =
let %test_unit "sel.event.http_cle" =
let read, write = pipe () in
let e = On.line ~priority:1 read (fun x -> x) in
write "aa\nbb\ncc\n";
let read2, write2 = pipe () in
let x = On.bytes ~priority:2 read2 2 (function Error e -> Error e | Ok s -> Error (Failure (Stdlib.Format.asprintf "lower priority event triggered: '%s'" (Bytes.to_string s)))) in
let todo = Todo.add Todo.empty [e;x] in
let rec loop todo =
let ready, todo = Sel.pop todo in
match ready with
| Ok "cc" -> ()
| Ok s -> write2 s; loop (Todo.add todo [e])
| Error End_of_file -> ()
| Error e -> [%test_eq: string] "" (Stdlib.Printexc.to_string e) in
loop todo

let e = On.httpcle read b2s in
let t0 = Unix.gettimeofday () in
let n = 99999 in
for _i = 1 to n do
let todo = Todo.add Todo.empty [e] in
write "content-Length: 4\n\n1\n3.";
let ready, todo = pop_opt todo in
[%test_eq: bool] (Todo.is_empty todo) true;
[%test_eq: string option] ready (Some "1\n3.");
done;
let t1 = Unix.gettimeofday () in
Stdlib.Printf.eprintf "time to pop %d httpcle events: %f\n" n (t1 -. t0)
;;
6 changes: 6 additions & 0 deletions test/test1.ml
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,12 @@ let %test_unit "sel.event.http_cle.recurring.err" =
write "a\n";
let ready, todo = pop_opt todo in
[%test_eq: bool] (Todo.is_empty todo) true;
(* depending on how eager the event is to read data *)
[%test_eq: bool] (osmatch "\\(.*Scan_failure.*\\|End_of_file\\)" ready) true;
let todo = Todo.add todo [e] in
write "content-Lengtx: 2\n";
let ready, todo = pop_opt todo in
[%test_eq: bool] (Todo.is_empty todo) true;
[%test_eq: bool] (osmatch ".*Scan_failure.*" ready) true;
let todo = Todo.add todo [e] in
write "content-Length: 4\n\n4\n6.";
Expand Down

0 comments on commit b403e0c

Please # to comment.