Skip to content

Latest commit

 

History

History
480 lines (353 loc) · 17.7 KB

09.md

File metadata and controls

480 lines (353 loc) · 17.7 KB

並行與併發

建構軟體設計有兩種方式: 一種是簡單明顯地沒有缺陷,另一種則是複雜到沒有明顯的缺陷。

— 東尼•霍爾

現代計算機系統走向多核,爲了運用多核心的能力,開始利用程式語言或作業系統提供的執行緒及處理序,將任務切割後同時處理。而不同任務間如果有共同的資源需要維護,增加了編寫程式人員的負擔。

一般的程式語言會使用鎖或是互斥器,避免不同程式同時存取相同資源,但是一旦使用不當便會發生死鎖或競爭條件等問題。而不同程式之間共享同一資源,更有可能造成兩邊資訊更新不一致。

Clojure 中的不變性 (Immutable) 與持久存在 (Persistent),讓資料結構一旦建立就無法再更改並保持一定的效能,降低開發者面對錯綜複雜更新的風險。而設計巧妙的狀態管理,更減輕了開發者漸強的偏頭痛。

先來看看 Clojure 如何支援並行 (Parallelism)。

並行

並行 (Parallelism) 指的是同時有不同的程式分別去做各自任務,任務完成之後,將結果彙整起來。如果運行的環境具備多核心的能力,則任務便可以在不同的核心上執行,完成的時間將會減少。

既然 Clojure 建基在 Java 之上,自然可以使用 Java 中的執行緒類別,又由於 Clojure 中的函式實作了 Callable 與 Runnable 介面,使用起來自是容易許多:

(.start
 (Thread.
  (fn []
    (Thread/sleep 3000)
    (println "Thread ends."))))
;; => nil
;; 等待三秒
;; => Thread ends.

Future

Clojure 提供了更簡便的方式讓你將函式置於執行緒中運行。使用 future 將欲完成的任務置於另一個執行緒中運行,呼叫 future 會返回 Future 物件並開始運行任務。

當任務完成後返回值則存放在 Future 物件之中,取得返回值則使用標的函式 deref 或小老鼠符號 @ 於 Future 物件,即可取得任務執行後的結果。如任務尚未完成欲取得返回值,則會等待其計算完畢:

(def f (future (Thread/sleep 10000) (println "done") 100))
@f ;; 若在十秒內,此行將會停住等待計算完畢
;; => 100

你可以使用 future-done? 檢查一個 Future 物件是否完成運行:

(def f (future (Thread/sleep 10000) (println "done") 100))
(future-done? f) ;; 十秒內運行
;; => false
;; 十秒後
(future-done? f)
;; => true

使用 future-cancel 則會將一個已經開始運行的 Future 物件終止,如果對一個已經被強制終止的 Future 物件取用標的 (deref),會拋出例外:

(def f (future (Thread/sleep 10000) (println "done") 100))
(future-cancel f)
;; => true
@f
;; => CancellationException

Promise

利用 promise 建立的 Promise 物件則是與呼叫者建立約定,結果計算完畢之後會將它發送給持有 Promise 物件者。使用 promise 建立 Promise 物件、使用 deliver 傳送結果給 Promise 物件:

(def answer (promise))
(future (Thread/sleep 10000) (deliver answer 42))
;; => #future[{:status :pending, :val nil} 0x2b9dc292]
@answer ;; 十秒內運行的話,此行會停住
;; => 42

與 Future 一樣,若 Promise 物件尚未接收到結果,取用標的 Promise 物件將會停住等待結果送到。你可以使用 realized? 於 Promise 物件上,來取得結果是否已送達:

(def p (promise))
(realized? p)
;; => false
(deliver p :done)
(realized? p)
;; => true

pmap

先前的章節已經看過的 map 函式,功能是將群集的各個元素套用到函式之中,產生新的群集。如果被套用的函式需要長時間的運算,等待所有元素都計算完畢就耗時過久。

pmap 函式 (Parallel map) 提供升級的 map 功能,將每個元素的運算分給不同的執行緒,所有元素計算完畢再彙整起來,如果有夠多的計算核心,完成的時間越縮短。

以下的範例中有一個模擬運行耗時十秒的運算,分別套用到有十個元素的群集,使用 pmap 比起 map 效能提升顯著:

(def data [2 4 6 8 10 12 14 16 18 20])
(defn long-computaion [n]
  (Thread/sleep 10000)
  (* n 2))

(time (dorun (map long-computaion data)))
;; => "Elapsed time: 100031.777566 msecs"
(time (dorun (pmap long-computaion data)))
;; => "Elapsed time: 10027.629015 msecs"

可以看到以上範例中,原來的 map 版本以約略於 100 秒的時間完成,而進化的 pmap 版本由於受益於並行化,以近似 10 秒的時間完成。

範例中使用 dorun 強制對 map 返回的惰性序列求值,並以 time 函式計算運行花費的時間。

另外還有 pvalues 以及 pcalls 分別並行地對多個運算式求值,以及呼叫多個函式:

(pvalues (+ 3 2) (/ 2 3) (* 3 2) (- 32 23))
;; => (5 2/3 6 9)
(pcalls #(println "A long time ago in a galaxy far,") #(println "far away") #(println "...."))
;; => A long time ago in a galaxy far,
;; => far away …
;; => (nil nil nil).

Reducer

核心函式庫中的 mapfilterreduce (它還有另外一個名字:fold) 的作用是將一個群集轉換成另一個群集,雖然返回的是惰性序列仍然需要有創建的成本。

不同於核心函式庫的 reducer 函式庫,轉換的則不是資料結構,而是函式。不需要在一連串函式的轉換過程中創建暫時性的序列,而只是轉換運行的函式,此舉將會大大地增加效能。

clojure.core.reducer 函式中的 mapfilter 函式並不回傳惰性序列,而是傳回屆時可以做化約 (reducible) 的函式,稱爲 reducer

其中使用了 Java 7 中用以執行並行任務的框架:Fork/Join,將一連串的計算函式並行處理,減少處理時間。以下是典型的 map 與使用 reducer 後的各別效能評比:

(require '[clojure.core.reducers :as r])

(defn old-reduce [nums]
  (reduce + (filter even? (map inc nums))))

(defn new-fold [nums]
  (r/fold + (r/filter even? (r/map inc nums))))

(time (old-reduce (vec (range 1000000))))
;; => "Elapsed time: 136.409418 msecs"
;; => 250000500000
(time (new-fold (vec (range 1000000))))
;; => "Elapsed time: 96.708929 msecs"
;; => 250000500000

狀態管理與併發

併發 (Concurrency) 是指同時有數個執行單元會交互執行,通常會牽涉一些共享的資源以及互相協作。

其實 Clojure 並沒有提供併發相關的函式或巨集,它提供了經過妥善設計的狀態管理方法,讓不同執行單元之間共享資源更容易管理且不易出錯。

在介紹狀態管理方法之前,先來了解 Clojure 對於事物的世界觀。

身份與狀態

在 Clojure 世界中,一件事物分成身份 (Identity) 與狀態 (State),在時間的長河裡,每件事物在不同的時間有不同的狀態,狀態以值表示,由於值在 Clojure 世界中具有不變性,因此無法對值進行改變。如果要取得事物的狀態,則必須透過身份來取得,但是取得的狀態只是某個時間中狀態的快照 (Snapshot)。

例如一位名爲 Catherine 的使用者,20 歲時剛畢業開始工作,30 歲時結婚,不同的時間有不同的狀態,但是都是同一個身份。如果在傳統的程式語言,身份與狀態是含混不清的:

catherine.age = 20;
catherine.graduated = true;
;; 時間經過十年...
catherine.age += 10;
catherine.married = true;

上面的範例中,一個使用者類型既是代表某一種身份,更混雜了狀態。在 Clojure 中,狀態儲存在四種參考類型中,透過函式取得其中的狀態,新的狀態也是經由函式加上舊的狀態產生而成。而新的狀態在新的時間中,並不會影響其它時間的狀態。

若是有人在 20 秒前取得某個身份的狀態,10 秒後這個身份改變了狀態,之前取得的狀態並不會改變,保證了時間軸上狀態的一致。

參考類型

Clojure 使用四種類型管理狀態,這些類型稱爲參考類型 (Reference type)。四種參考類型分別又隸屬於兩種分類:協作式 (Coordinated) 與同步式 (Synchronous),協作式指的是不同狀態更新時需要協調合作,同步式則是指在更新狀態前有可能會被阻攔或停滯,因爲其他部分正在更新,所以必須等待。

以下以圖形表示參考類型所屬的分類:

;;       | Coordinated | Uncoordinated
;; ------|-------------|--------------
;; Sync  |    Ref      |    Atom
;; Async |    N/A      |    Agent

Ref

Clojure 使用了軟體事務存儲 (Software Transactional Memory,之後簡稱 STM) 模型,來處理併發與狀態管理。STM 類似於資料庫,只是它存在於記憶體之中,僅能保證 ACID 中的三種:不可分割性 (Atomicity)、一致性 (Consistency) 與隔離性 (isolation),並不保證持久性 (Durability)。

在 Clojure 中,一旦進入改變狀態的事務交易 (Transaction) 環境裡,如果其中的變化有一個不成功,則會退出視爲失敗。而其它存取狀態的執行單元並不知道交易的狀態,只會看到交易前的情形。

而 Ref 參考類型就是 Clojure 根據 STM 的實作,透過 ref 函式創建內含有狀態的 Ref 類型:

(def account (ref 0))
;; => #'user/account

要取得參考類型的值,都是使用 deref 函式或小老鼠符號 @

(deref account)
;; => 0
(+ 5 @account)
;; => 5

你可以使用 ref-set 更新 Ref 狀態:

(ref-set account 500)
;; => IllegalStateException No transaction running

以上範例出現的例外告訴我們,更新 Ref 狀態必須在交易 (Transaction) 中進行。建立可以安心運作的交易環境使用 dosync

(dosync
 (ref-set account 500))
;; => 500
@account
;; => 500

現在 Ref 已更新狀態。除了 ref-set 之外,還提供以函式方式更新狀態的方法:

(dosync
 (alter account + 500))
;; => 1000
@account
;; => 1000

alter 的第一個參數是 Ref 類型,之後則是函式與準備帶給函式的參數,函式將會以下列的方式呼叫:

(apply fun value-in-ref args)

dosync 創造的交易環境確保在其中進行更新的多個 Ref,在交易完成之後將會同步更新。如果在交易之中,其中一個 Ref 的狀態已經被外部改變,整個交易便會重啓,利用新的狀態再進行改變。

(def debit (ref 100000))
(def account (ref 1000))
(dosync
 (alter debit - 1500)
 (alter account + 1500))
;; => 2500
@debit
;; => 98500
@account
;; => 2500

如果不想因爲交易內容改變而重啓,而且交易中的執行內容不會因爲順序改變而不同,則可以考慮 commute 函式:

(dosync
 (commute debit + 500)
 (commute account + 500))
;; => 3000
@debit
;; => 99000
@account
;; => 3000

將 Ref 添加驗證器 (Validator) 函式會確保在更新狀態的時候,符合驗證器函式的內容,若不符合則會返回之前的狀態:

(defn validate-account
  [state]
  (not (neg? state)))

(def bank-account (ref 1000 :validator validate-account))
(dosync
 (alter bank-account - 1500))
;; => IllegalStateException Invalid reference state
@bank-account
;; => 1000

Atom

原子類型 (Atom) 與 Ref 類型一樣都是屬於同步式:更新原子類型時必須等候先前的狀態更新完成,但是原子類型則不是協作式的,亦即無法同時更新兩個以上的原子類型,每個原子類型的更新都是與其他原子隔離的。

使用 atom 函式創造內含有狀態的原子類型,取得原子類型的值,也是使用 deref 函式或小老鼠符號 @

(def x (atom 10))
;; => #'user/x
@x
;; => 10

你可以使用 reset! 更新原子中的狀態:

(reset! x 20)
;; => 20
@x
;; => 20

也可以透過類似 alter 函式的 swap!,以更新函式來更新原子類型中的狀態:

(def catherine (atom {:name "Catherine" :age 18 :graduated? false}))
;; => #'user/catherine
(swap! catherine update-in [:age] + 2)
;; => {:name "Catherine", :age 20, :graduated? false}
(swap! catherine update-in [:graduate?] not)
;; => {:name "Catherine", :age 20, :graduated? true}
@catherine
;; => {:name "Catherine", :age 20, :graduated? true}

以上範例 swap!update-in 函式更新原子類型中的映射,它會以下面的呼叫方式更新映射:

(update-in @catherine [:age] + 2)

Clojure 提供了一個更新原子類型的函式,可以在更新之前先檢查狀態是否和預期的相等,若相等則更新至新值,否則不做改變:

(def x (atom 10))
(compare-and-set! x 20 30)
;; => false
@x
;; => 10
(compare-and-set! x 10 20)
;; => true
@x
;; => 20

原子類型也可以像 Ref 類型一樣添加驗證器,驗證欲改變的狀態是否符合驗證規則:

(def x (atom 100 :validator pos?))
(swap! x + 500)
;; => 600
(swap! x - 700)
;; => IllegalStateException Invalid reference state

除了驗證器之外,還可以對四種參考類型添加觀察者函式 (Watch function),一旦更新了參考類型的狀態,觀察者函式便會被呼叫。使用 add-watch 函式添加觀察者:

(add-watch x :echo
           (fn [key ref old new]
             (println "Key:" key)
             (println "Reference:" ref)
             (println "Old:" old)
             (println "New:" new)))
;; => #atom[600 0x1bce3aea]

add-watch 第一個參數是準備觀察的參考類型,第二個參數則是代表新的觀察者的關鍵字標識,你可以使用這個關鍵字來參照到新的觀察者函式。

最後一個參數是欲添加的觀察者函式,觀察者函式必須接受四個參數:觀察者的關鍵字標識、觀察的參考類型、舊狀態、新狀態。

觀察者函式加上去之後,狀態一旦改變,觀察者函式便會被呼叫:

(reset! x 300)
;; => Key: :echo
;; => Reference: #atom[300 0x1bce3aea]
;; => Old: 600
;; => New: 300
;; => 300

觀察完畢後,透過觀察者的關鍵字標識與函式 remove-watch,可以把觀察者函式從參考類型中移除:

(remove-watch x :echo)
;; => #atom[300 0x1bce3aea]
(reset! x 500)
;; => 500

從以上範例可以看到,觀察者函式移除之後,更新狀態就不會出現觀察者函式的訊息了。

Agent

有別於 Ref 與原子類型的協調式與同步式,Agent 類型狀態的更新不需與其他狀態更新協同合作,也不需等候其他更新完成。通常用在需要更新狀態,卻不需要關心更新後的結果。

使用 atom 函式創造內含有狀態的 Agent 類型,取得 Agent 類型的值,也是使用 deref 函式或小老鼠符號 @

(def a (agent 5))
;; =>#'user/a
@a
;; => 5

可以使用 send 函式更新 Agent 類型中的狀態,使用方法與 alter 以及 swap! 類似:

(send a + 100)
;; => #agent[{:status :ready, :val 105} 0x11fdbaa4]
@a
;; => 105

不同於 Ref 與原子類型更新函式的返回值,send 返回的是狀態更新後的 Agent 類型。

另外還有 send-off 提供與 send 函式同樣的使用方法:

(send-off a + 150)
;; => #agent[{:status :ready, :val 255} 0x11fdbaa4]
@a
255

Agent 類型會將更新狀態的動作放入佇列中,啓動執行緒逐個處理。send-offsend 提出的更新動作分別放入不同的佇列,分別由不同的執行緒集區 (Thread pool) 中的執行緒處理佇列,處理 send-off 動作佇列的執行緒集區會依據需要擴增,而處理 send 動作佇列的執行緒集區則是固定大小。

因此建議使用 send-off 處理會阻塞的操作,例如 IO。以免被過多的操作卡住無法從執行緒集區中分配新的執行緒來處理。

Var

Var 物件已經是我們非常熟悉的參考類型。使用 def 創建內含資料的 Var 物件:

(def xx 1)

建立了 Var 物件之後,Var 物件被賦予的值被稱爲根繫結 (root binding),它是全域可見的,所有的執行緒都可以取得:

(.start
 (Thread.
  (fn [] (println "xx:" xx))))
;; => xx: 1

若在創建 Var 物件時加上特殊的詮釋資料 (Metadata),便可以動態改變 Var 物件的內容:

(def ^:dynamic *xx* 111)
(binding [*xx* 222]
  *xx*)
;; => 222
*xx*
;; => 111

以上範例使用 binding 在其本體運算式內重新繫結了 Var 物件,因此在 binding 作用區之內,Var 物件的內容值改變了,一旦離開作用區域,Var 物件又回到根繫結的值。

使用 alter-var-root 可以將 Var 物件的根繫結換成新的值:

(.start
 (Thread.
  (fn []
    (alter-var-root (var *xx*) (fn [_] 333))
    (println "*xx* is now" *xx*))))
;; => *xx* is now 333
*xx*
;; => 333

alter-var-root 接受一個將被改變根繫結的 Var 物件,以及一個函式,該函式的返回值便是此 Var 物件根繫結的新值。

Clojure 不建議在並行或併發的環境下使用 Var 物件做資源的共享,因爲各執行緒之間無法做好協調。執行緒 A 中取得的值,可能在執行緒 B 中已經做了修改。

回顧

經由本篇文章,你學會了 Clojure 支援並行的方式,以及與併發之間的差別。還知道了如何以並行方式更快完成工作,也學會了如何使用四種參考型別來管理狀態,以及他們之間的特色與差別。

還不賴吧?今天就先到這裡,下一篇文章再見囉!