diff --git a/src_lockfree/skiplist.ml b/src_lockfree/skiplist.ml index bde2ff48..227a8239 100644 --- a/src_lockfree/skiplist.ml +++ b/src_lockfree/skiplist.ml @@ -29,6 +29,7 @@ generation based on the dynamic number of bindings. *) module Atomic = Multicore_magic.Transparent_atomic +module Atomic_array = Multicore_magic.Atomic_array (* OCaml doesn't allow us to use one of the unused (always 0) bits in pointers for the marks and an indirection is needed. This representation avoids the @@ -56,7 +57,7 @@ and ('k, 'v) link = | Link : ('k, 'v, [< `Null | `Node | `Mark ]) node -> ('k, 'v) link [@@unboxed] -and ('k, 'v) links = ('k, 'v) link Atomic.t array +and ('k, 'v) links = ('k, 'v) link Atomic_array.t type 'k compare = 'k -> 'k -> int (* Encoding the [compare] function using an algebraic type would allow the @@ -106,41 +107,35 @@ let[@inline] is_marked = function boolean return value is only meaningful when [lowest] is given as [0]. *) let rec find_path t key preds succs lowest = let prev = t.root in - let level = Array.length prev - 1 in - let prev_at_level = Array.unsafe_get prev level in - find_path_rec t key prev prev_at_level preds succs level lowest - (Atomic.get prev_at_level) + let level = Atomic_array.length prev - 1 in + find_path_rec t key prev preds succs level lowest + (Atomic_array.unsafe_fenceless_get prev level) -and find_path_rec t key prev prev_at_level preds succs level lowest = function +and find_path_rec t key prev preds succs level lowest = function | Link Null -> if level < Array.length preds then begin - Array.unsafe_set preds level prev_at_level; - Array.unsafe_set succs level Null + Array.unsafe_set preds level prev; + Array.unsafe_set succs level (Link Null) end; lowest < level && let level = level - 1 in - let prev_at_level = Array.unsafe_get prev level in - find_path_rec t key prev prev_at_level preds succs level lowest - (Atomic.get prev_at_level) + find_path_rec t key prev preds succs level lowest + (Atomic_array.unsafe_fenceless_get prev level) | Link (Node r as curr) -> begin - let next_at_level = Array.unsafe_get r.next level in - match Atomic.get next_at_level with + match Atomic_array.unsafe_fenceless_get r.next level with | Link (Null | Node _) as next -> let c = t.compare key r.key in - if 0 < c then - find_path_rec t key r.next next_at_level preds succs level lowest - next + if 0 < c then find_path_rec t key r.next preds succs level lowest next else begin if level < Array.length preds then begin - Array.unsafe_set preds level (Array.unsafe_get prev level); - Array.unsafe_set succs level curr + Array.unsafe_set preds level prev; + Array.unsafe_set succs level (Link curr) end; if lowest < level then let level = level - 1 in - let prev_at_level = Array.unsafe_get prev level in - find_path_rec t key prev prev_at_level preds succs level lowest - (Atomic.get prev_at_level) + find_path_rec t key prev preds succs level lowest + (Atomic_array.unsafe_fenceless_get prev level) else begin if level = 0 && r.incr != Size.used_once then begin Size.update_once t.size r.incr; @@ -153,11 +148,11 @@ and find_path_rec t key prev prev_at_level preds succs level lowest = function (* The [curr_node] is being removed from the skiplist and we help with that. *) if level = 0 then Size.update_once t.size r.decr; - find_path_rec t key prev prev_at_level preds succs level lowest + find_path_rec t key prev preds succs level lowest (let after = Link r.node in - if Atomic.compare_and_set prev_at_level (Link curr) after then - after - else Atomic.get prev_at_level) + if Atomic_array.unsafe_compare_and_set prev level (Link curr) after + then after + else Atomic_array.unsafe_fenceless_get prev level) end | Link (Mark _) -> (* The node corresponding to [prev] is being removed from the skiplist. @@ -172,24 +167,22 @@ and find_path_rec t key prev prev_at_level preds succs level lowest = function is found. *) let rec find_node t key = let prev = t.root in - let level = Array.length prev - 1 in - let prev_at_level = Array.unsafe_get prev level in - find_node_rec t key prev prev_at_level level (Atomic.get prev_at_level) + let level = Atomic_array.length prev - 1 in + find_node_rec t key prev level (Atomic_array.unsafe_fenceless_get prev level) -and find_node_rec t key prev prev_at_level level : - _ -> (_, _, [< `Null | `Node ]) node = function +and find_node_rec t key prev level : _ -> (_, _, [< `Null | `Node ]) node = + function | Link Null -> if 0 < level then let level = level - 1 in - let prev_at_level = Array.unsafe_get prev level in - find_node_rec t key prev prev_at_level level (Atomic.get prev_at_level) + find_node_rec t key prev level + (Atomic_array.unsafe_fenceless_get prev level) else Null | Link (Node r as curr) -> begin - let next_at_level = Array.unsafe_get r.next level in - match Atomic.get next_at_level with + match Atomic_array.unsafe_fenceless_get r.next level with | Link (Null | Node _) as next -> let c = t.compare key r.key in - if 0 < c then find_node_rec t key r.next next_at_level level next + if 0 < c then find_node_rec t key r.next level next else if 0 = c then begin (* At this point we know the node was not removed, because removal is done in order of descending levels. *) @@ -201,17 +194,16 @@ and find_node_rec t key prev prev_at_level level : end else if 0 < level then let level = level - 1 in - let prev_at_level = Array.unsafe_get prev level in - find_node_rec t key prev prev_at_level level - (Atomic.get prev_at_level) + find_node_rec t key prev level + (Atomic_array.unsafe_fenceless_get prev level) else Null | Link (Mark r) -> if level = 0 then Size.update_once t.size r.decr; - find_node_rec t key prev prev_at_level level + find_node_rec t key prev level (let after = Link r.node in - if Atomic.compare_and_set prev_at_level (Link curr) after then - after - else Atomic.get prev_at_level) + if Atomic_array.unsafe_compare_and_set prev level (Link curr) after + then after + else Atomic_array.unsafe_fenceless_get prev level) end | Link (Mark _) -> find_node t key @@ -223,11 +215,11 @@ let create ?(max_height = 10) ~compare () = practice. *) if max_height < 1 || 30 < max_height then invalid_arg "Skiplist: max_height must be in the range [1, 30]"; - let root = Array.init max_height @@ fun _ -> Atomic.make (Link Null) in + let root = Atomic_array.make max_height (Link Null) in let size = Size.create () in { compare; root; size } -let max_height_of t = Array.length t.root +let max_height_of t = Atomic_array.length t.root (* *) @@ -244,13 +236,14 @@ let rec try_add t key value preds succs = (not (find_path t key preds succs 0)) && let (Node r as node : (_, _, [ `Node ]) node) = - let next = Array.map (fun succ -> Atomic.make (Link succ)) succs in + let next = Atomic_array.of_array succs in let incr = Size.new_once t.size Size.incr in Node { key; value; incr; next } in if - let succ = Link (Array.unsafe_get succs 0) in - Atomic.compare_and_set (Array.unsafe_get preds 0) succ (Link node) + let succ = Array.unsafe_get succs 0 in + Atomic_array.unsafe_compare_and_set (Array.unsafe_get preds 0) 0 succ + (Link node) then begin if r.incr != Size.used_once then begin Size.update_once t.size r.incr; @@ -258,8 +251,8 @@ let rec try_add t key value preds succs = end; (* The node is now considered as added to the skiplist. *) let rec update_levels level = - if Array.length r.next = level then begin - if is_marked (Atomic.get (Array.unsafe_get r.next (level - 1))) then begin + if Atomic_array.length r.next = level then begin + if is_marked (Atomic_array.unsafe_fenceless_get r.next (level - 1)) then begin (* The node we finished adding has been removed concurrently. To ensure that no references we added to the node remain, we call [find_node] which will remove nodes with marked references along @@ -269,23 +262,26 @@ let rec try_add t key value preds succs = true end else if - let succ = Link (Array.unsafe_get succs level) in - Atomic.compare_and_set (Array.unsafe_get preds level) succ (Link node) + let succ = Array.unsafe_get succs level in + Atomic_array.unsafe_compare_and_set + (Array.unsafe_get preds level) + level succ (Link node) then update_levels (level + 1) else let _found = find_path t key preds succs level in let rec update_nexts level' = if level' < level then update_levels level else - let next = Array.unsafe_get r.next level' in - match Atomic.get next with + match Atomic_array.unsafe_fenceless_get r.next level' with | Link (Null | Node _) as before -> - let succ = Link (Array.unsafe_get succs level') in + let succ = Array.unsafe_get succs level' in if before != succ then (* It is possible for a concurrent remove operation to have marked the link. *) - if Atomic.compare_and_set next before succ then - update_nexts (level' - 1) + if + Atomic_array.unsafe_compare_and_set r.next level' before + succ + then update_nexts (level' - 1) else update_levels level else update_nexts (level' - 1) | Link (Mark _) -> @@ -296,25 +292,25 @@ let rec try_add t key value preds succs = find_node t key |> ignore; true in - update_nexts (Array.length r.next - 1) + update_nexts (Atomic_array.length r.next - 1) in update_levels 1 end else try_add t key value preds succs let try_add t key value = - let height = get_random_height (Array.length t.root) in + let height = get_random_height (Atomic_array.length t.root) in let preds = (* Init with [Obj.magic ()] is safe as the array is fully overwritten by [find_path] called at the start of the recursive [try_add]. *) Array.make height (Obj.magic ()) in - let succs = Array.make height Null in + let succs = Array.make height (Link Null) in try_add t key value preds succs (* *) -let rec try_remove t key next level link = function +let rec try_remove t key next level = function | Link (Mark r) -> if level = 0 then begin Size.update_once t.size r.decr; @@ -322,14 +318,17 @@ let rec try_remove t key next level link = function end else let level = level - 1 in - let link = Array.unsafe_get next level in - try_remove t key next level link (Atomic.get link) + try_remove t key next level + (Atomic_array.unsafe_fenceless_get next level) | Link ((Null | Node _) as succ) -> let decr = if level = 0 then Size.new_once t.size Size.decr else Size.used_once in let marked_succ = Mark { node = succ; decr } in - if Atomic.compare_and_set link (Link succ) (Link marked_succ) then + if + Atomic_array.unsafe_compare_and_set next level (Link succ) + (Link marked_succ) + then if level = 0 then (* We have finished marking references on the node. To ensure that no references to the node remain, we call [find_node] which will @@ -338,17 +337,18 @@ let rec try_remove t key next level link = function true else let level = level - 1 in - let link = Array.unsafe_get next level in - try_remove t key next level link (Atomic.get link) - else try_remove t key next level link (Atomic.get link) + try_remove t key next level + (Atomic_array.unsafe_fenceless_get next level) + else + try_remove t key next level + (Atomic_array.unsafe_fenceless_get next level) let try_remove t key = match find_node t key with | Null -> false | Node { next; _ } -> - let level = Array.length next - 1 in - let link = Array.unsafe_get next level in - try_remove t key next level link (Atomic.get link) + let level = Atomic_array.length next - 1 in + try_remove t key next level (Atomic_array.unsafe_fenceless_get next level) (* *)