Skip to content

Commit

Permalink
Add optional capacity argument to Queue.create and Stack.create
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed May 26, 2023
1 parent 368ccee commit a233cee
Show file tree
Hide file tree
Showing 12 changed files with 361 additions and 94 deletions.
11 changes: 11 additions & 0 deletions src/kcas_data/elems.ml
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,14 @@ let rev_prepend_to_seq t tl =
| `Reversed t' -> t'
in
prepend_to_seq t tl ()

let rec of_list_rev tl length = function
| [] -> tl
| x :: xs ->
let length = length + 1 in
of_list_rev { value = x; tl; length } length xs

let of_list_rev = function
| [] -> empty
| x :: xs -> of_list_rev (singleton x) 1 xs
[@@inline]
1 change: 1 addition & 0 deletions src/kcas_data/elems.mli
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ val prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val to_seq : 'a t -> 'a Seq.t
val of_seq_rev : 'a Seq.t -> 'a t
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val of_list_rev : 'a list -> 'a t
113 changes: 113 additions & 0 deletions src/kcas_data/list_with_capacity.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
open Kcas

type 'a t = { capacity : int; length : int; list : 'a list; limit : int }

let empty_unlimited =
{ capacity = Int.max_int; length = 0; list = []; limit = Int.max_int }

let make_empty ~capacity =
if capacity = Int.max_int then empty_unlimited
else { capacity; length = 0; list = []; limit = capacity }
[@@inline]

let make ~capacity ~length ~list ~limit = { capacity; length; list; limit }
[@@inline]

let to_rev_elems t = Elems.of_list_rev t.list [@@inline]
let is_empty t = t.length = 0 [@@inline]
let length t = t.length [@@inline]
let capacity t = t.capacity [@@inline]
let limit t = t.limit [@@inline]
let list t = t.list [@@inline]

let tl_safe = function
| { list = []; _ } as t -> t
| { capacity; length; list = _ :: list; _ } as t ->
let limit = if capacity = Int.max_int then capacity else t.limit in
{ capacity; length = length - 1; list; limit }
[@@inline]

let tl_or_retry = function
| { list = []; _ } -> Retry.later ()
| { capacity; length; list = _ :: list; _ } as t ->
let limit = if capacity = Int.max_int then capacity else t.limit in
{ capacity; length = length - 1; list; limit }
[@@inline]

let hd_opt t = match t.list with [] -> None | x :: _ -> Some x [@@inline]

let hd_or_retry t = match t.list with [] -> Retry.later () | x :: _ -> x
[@@inline]

let hd_unsafe t = List.hd t.list [@@inline]

let cons_safe x ({ capacity; _ } as t) =
if capacity = Int.max_int then
let { length; list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit = capacity }
else
let { length; limit; _ } = t in
if length < limit then
let { list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit }
else t
[@@inline]

let cons_or_retry x ({ capacity; _ } as t) =
if capacity = Int.max_int then
let { length; list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit = capacity }
else
let { length; limit; _ } = t in
if length < limit then
let { list; _ } = t in
{ capacity; length = length + 1; list = x :: list; limit }
else Retry.later ()
[@@inline]

let move ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else
let { length; _ } = t in
if length = 0 then t
else
let { limit; _ } = t in
{ capacity; length = 0; list = []; limit = limit - length }
[@@inline]

let clear ({ capacity; _ } as t) =
if capacity = Int.max_int then empty_unlimited
else if t.length = 0 then t
else make_empty ~capacity
[@@inline]

let rec prepend_to_seq xs tl =
match xs with
| [] -> tl
| x :: xs -> fun () -> Seq.Cons (x, prepend_to_seq xs tl)

let to_seq { list; _ } = prepend_to_seq list Seq.empty

let rev_prepend_to_seq { length; list; _ } tl =
if length <= 1 then prepend_to_seq list tl
else
let t = ref (`Original list) in
fun () ->
let t =
match !t with
| `Original t' ->
(* This is domain safe as the result is always equivalent. *)
let t' = List.rev t' in
t := `Reversed t';
t'
| `Reversed t' -> t'
in
prepend_to_seq t tl ()

let of_list ?(capacity = Int.max_int) list =
let length = List.length list in
let limit = Int.min 0 (capacity - length) in
{ capacity; length; list; limit }

let of_seq_rev ?capacity xs =
of_list ?capacity (Seq.fold_left (fun xs x -> x :: xs) [] xs)
23 changes: 23 additions & 0 deletions src/kcas_data/list_with_capacity.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
type !'a t

val empty_unlimited : 'a t
val make_empty : capacity:int -> 'a t
val make : capacity:int -> length:int -> list:'a list -> limit:int -> 'a t
val is_empty : 'a t -> bool
val length : 'a t -> int
val capacity : 'a t -> int
val limit : 'a t -> int
val list : 'a t -> 'a list
val cons_safe : 'a -> 'a t -> 'a t
val cons_or_retry : 'a -> 'a t -> 'a t
val move : 'a t -> 'a t
val clear : 'a t -> 'a t
val to_rev_elems : 'a t -> 'a Elems.t
val to_seq : 'a t -> 'a Seq.t
val rev_prepend_to_seq : 'a t -> 'a Seq.t -> 'a Seq.t
val of_seq_rev : ?capacity:int -> 'a Seq.t -> 'a t
val tl_safe : 'a t -> 'a t
val tl_or_retry : 'a t -> 'a t
val hd_opt : 'a t -> 'a option
val hd_or_retry : 'a t -> 'a
val hd_unsafe : 'a t -> 'a
167 changes: 106 additions & 61 deletions src/kcas_data/queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,127 +2,172 @@ open Kcas

type 'a t = {
front : 'a Elems.t Loc.t;
middle : 'a Elems.t Loc.t;
back : 'a Elems.t Loc.t;
back : 'a List_with_capacity.t Loc.t;
middle : 'a List_with_capacity.t Loc.t;
}

let alloc ~front ~middle ~back =
let alloc ~front ~back ~middle =
(* We allocate locations in specific order to make most efficient use of the
splay-tree based transaction log. *)
let front = Loc.make front
and middle = Loc.make middle
and back = Loc.make back in
and back = Loc.make back
and middle = Loc.make middle in
{ back; middle; front }

let create () = alloc ~front:Elems.empty ~middle:Elems.empty ~back:Elems.empty
let create ?(capacity = Int.max_int) () =
if capacity < 0 then invalid_arg "Queue.create: capacity must be non-negative";
let back = List_with_capacity.make_empty ~capacity in
alloc ~front:Elems.empty ~back ~middle:List_with_capacity.empty_unlimited

let copy q =
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.middle, Xt.get ~xt q.back) in
let front, middle, back = Xt.commit { tx } in
alloc ~front ~middle ~back
let tx ~xt = (Xt.get ~xt q.front, Xt.get ~xt q.back, Xt.get ~xt q.middle) in
let front, back, middle = Xt.commit { tx } in
alloc ~front ~back ~middle

module Xt = struct
let is_empty ~xt { back; middle; front } =
let is_empty ~xt q =
(* We access locations in order of allocation to make most efficient use of
the splay-tree based transaction log. *)
Xt.get ~xt front == Elems.empty
&& Xt.get ~xt middle == Elems.empty
&& Xt.get ~xt back == Elems.empty

let length ~xt { back; middle; front } =
Elems.length (Xt.get ~xt front)
+ Elems.length (Xt.get ~xt middle)
+ Elems.length (Xt.get ~xt back)

let add ~xt x q = Xt.unsafe_modify ~xt q.back @@ Elems.cons x
Xt.get ~xt q.front == Elems.empty
&& List_with_capacity.is_empty (Xt.get ~xt q.back)
&& Xt.get ~xt q.middle == List_with_capacity.empty_unlimited

let length ~xt q =
Elems.length (Xt.get ~xt q.front)
+ List_with_capacity.length (Xt.get ~xt q.back)
+ List_with_capacity.length (Xt.get ~xt q.middle)

let try_add ~xt x q =
let lwc = Xt.unsafe_update ~xt q.back (List_with_capacity.cons_safe x) in
let capacity = List_with_capacity.capacity lwc in
capacity = Int.max_int
||
let back_length = List_with_capacity.length lwc in
back_length < List_with_capacity.limit lwc
||
let other_length =
List_with_capacity.length (Xt.get ~xt q.middle)
+ Elems.length (Xt.get ~xt q.front)
in
let limit = capacity - other_length in
back_length < limit
&&
(Xt.set ~xt q.back
(List_with_capacity.make ~capacity ~length:(back_length + 1)
~list:(x :: List_with_capacity.list lwc)
~limit);
true)

let add ~xt x q = Retry.unless (try_add ~xt x q)
let push = add

(** Cooperative helper to move elems from back to middle. *)
let back_to_middle ~middle ~back =
let back_to_middle ~back ~middle =
let tx ~xt =
let xs = Xt.exchange ~xt back Elems.empty in
if xs == Elems.empty || Xt.exchange ~xt middle xs != Elems.empty then
raise Exit
let xs = Xt.unsafe_update ~xt back List_with_capacity.move in
if
List_with_capacity.length xs = 0
|| Xt.exchange ~xt middle xs != List_with_capacity.empty_unlimited
then raise Exit
in
try Xt.commit { tx } with Exit -> ()

let take_opt_finish ~xt front elems =
let elems = Elems.rev elems in
let take_opt_finish ~xt front lwc =
let elems = List_with_capacity.to_rev_elems lwc in
Xt.set ~xt front (Elems.tl_safe elems);
Elems.hd_opt elems

let take_opt ~xt { back; middle; front } =
let take_opt ~xt { front; back; middle } =
let elems = Xt.unsafe_update ~xt front Elems.tl_safe in
if elems != Elems.empty then Elems.hd_opt elems
else (
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems
back_to_middle ~back ~middle;
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
if lwc != List_with_capacity.empty_unlimited then
take_opt_finish ~xt front lwc
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then take_opt_finish ~xt front elems else None)
let lwc = Xt.unsafe_update ~xt back List_with_capacity.move in
if List_with_capacity.length lwc <> 0 then take_opt_finish ~xt front lwc
else None)

let take_blocking ~xt q = Xt.to_blocking ~xt (take_opt q)

let peek_opt_finish ~xt front elems =
let elems = Elems.rev elems in
let peek_opt_finish ~xt front lwc =
let elems = List_with_capacity.to_rev_elems lwc in
Xt.set ~xt front elems;
Elems.hd_opt elems

let peek_opt ~xt { back; middle; front } =
let peek_opt ~xt { front; back; middle } =
let elems = Xt.get ~xt front in
if elems != Elems.empty then Elems.hd_opt elems
else (
if not (Xt.is_in_log ~xt middle || Xt.is_in_log ~xt back) then
back_to_middle ~middle ~back;
let elems = Xt.exchange ~xt middle Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems
back_to_middle ~back ~middle;
let lwc = Xt.exchange ~xt middle List_with_capacity.empty_unlimited in
if lwc != List_with_capacity.empty_unlimited then
peek_opt_finish ~xt front lwc
else
let elems = Xt.exchange ~xt back Elems.empty in
if elems != Elems.empty then peek_opt_finish ~xt front elems else None)
let lwc = Xt.unsafe_update ~xt back List_with_capacity.move in
if List_with_capacity.length lwc <> 0 then peek_opt_finish ~xt front lwc
else None)

let peek_blocking ~xt q = Xt.to_blocking ~xt (peek_opt q)

let clear ~xt { back; middle; front } =
Xt.set ~xt front Elems.empty;
Xt.set ~xt middle Elems.empty;
Xt.set ~xt back Elems.empty
let clear ~xt q =
Xt.set ~xt q.front Elems.empty;
Xt.unsafe_modify ~xt q.back List_with_capacity.clear;
Xt.set ~xt q.middle List_with_capacity.empty_unlimited

let swap ~xt q1 q2 =
let front = Xt.get ~xt q1.front
and middle = Xt.get ~xt q1.middle
and back = Xt.get ~xt q1.back in
and back = Xt.get ~xt q1.back
and middle = Xt.get ~xt q1.middle in
let front = Xt.exchange ~xt q2.front front
and middle = Xt.exchange ~xt q2.middle middle
and back = Xt.exchange ~xt q2.back back in
and back = Xt.exchange ~xt q2.back back
and middle = Xt.exchange ~xt q2.middle middle in
Xt.set ~xt q1.front front;
Xt.set ~xt q1.middle middle;
Xt.set ~xt q1.back back
Xt.set ~xt q1.back back;
Xt.set ~xt q1.middle middle

let seq_of ~front ~middle ~back =
(* Sequence construction is lazy, so this function is O(1). *)
Seq.empty
|> Elems.rev_prepend_to_seq back
|> Elems.rev_prepend_to_seq middle
|> List_with_capacity.rev_prepend_to_seq back
|> List_with_capacity.rev_prepend_to_seq middle
|> Elems.prepend_to_seq front

let to_seq ~xt { front; middle; back } =
let front = Xt.get ~xt front
and middle = Xt.get ~xt middle
and back = Xt.get ~xt back in
let to_seq ~xt q =
let front = Xt.get ~xt q.front
and back = Xt.get ~xt q.back
and middle = Xt.get ~xt q.middle in
seq_of ~front ~middle ~back

let take_all ~xt { front; middle; back } =
let front = Xt.exchange ~xt front Elems.empty
and middle = Xt.exchange ~xt middle Elems.empty
and back = Xt.exchange ~xt back Elems.empty in
let take_all ~xt q =
let front = Xt.exchange ~xt q.front Elems.empty
and back = Xt.unsafe_update ~xt q.back List_with_capacity.clear
and middle = Xt.exchange ~xt q.middle List_with_capacity.empty_unlimited in
seq_of ~front ~middle ~back
end

let is_empty q = Kcas.Xt.commit { tx = Xt.is_empty q }
let length q = Kcas.Xt.commit { tx = Xt.length q }
let add x q = Loc.modify q.back @@ Elems.cons x

let try_add x q =
let lwc = Loc.update q.back (List_with_capacity.cons_safe x) in
let capacity = List_with_capacity.capacity lwc in
capacity = Int.max_int
||
let back_length = List_with_capacity.length lwc in
back_length < List_with_capacity.limit lwc
|| Kcas.Xt.commit { tx = Xt.try_add x q }

let add x q =
let lwc = Loc.update q.back (List_with_capacity.cons_safe x) in
if List_with_capacity.capacity lwc <> Int.max_int then
if List_with_capacity.length lwc = List_with_capacity.limit lwc then
Kcas.Xt.commit { tx = Xt.add x q }

let push = add

let take_opt q =
Expand Down
Loading

0 comments on commit a233cee

Please sign in to comment.