diff --git a/ChangeLog b/ChangeLog index 2fd808341..5d8c19319 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,8 @@ +2025-01-10 Shiro Kawai + + * lib/control/pmap.scm: Make sure all worker threads are joined, + even some of them result uncaught/timeout exceptions. + 2025-01-07 Shiro Kawai * src/libnum.scm (number->string): Allow the first optional argument diff --git a/lib/control/pmap.scm b/lib/control/pmap.scm index a5c5f0f53..02b64456f 100644 --- a/lib/control/pmap.scm +++ b/lib/control/pmap.scm @@ -103,6 +103,31 @@ (define (%start-threads threads) (for-each thread-try-start! threads)) +;; thread-join! may raise . We want to join +;; all the threads, though, so we should wrap thread-join! to keep +;; going while recording the thrown exception, and re-raise it +;; when all threads are joined. +;; NB: There may be multiple exceptions, but we only keep one. It's +;; the same situation that 'map' throws just one error even multiple +;; elements could cause errors. + +(define join-exc (make-parameter #f)) + +(define (%make-joiner r) + (^[thread :optional (timeout #f) (timeout-val #f)] + (guard (e [(terminated-thread-exception? e) r] + [else (join-exc e) '()]) + (thread-join! thread timeout timeout-val)))) + +(define-syntax %with-wrapped-join + (syntax-rules () + [(_ body ...) + (parameterize ([join-exc #f]) + (receive rs (begin body ...) + (if (join-exc) + (raise (join-exc)) + (apply values rs))))])) + ;; ;; sequential-mapper ;; @@ -137,7 +162,8 @@ (make-thread (cut map proc c)))) cols)]) (%start-threads ts) - (append-map thread-join! ts))) + (%with-wrapped-join + (append-map (%make-joiner '()) ts)))) ;; (define (%split-collection coll n) ;; (define qs (list-tabulate n (^_ (make-queue)))) @@ -180,10 +206,11 @@ (cut with-stopper proc c ts)))) cols)]) (%start-threads ts) - (do ([ts ts (cdr ts)] - [r #f (guard (e [( e) r]) - (or (thread-join! (car ts)) r))]) - [(null? ts) r]))) + (%with-wrapped-join + (let1 join! (%make-joiner #f) + (do ([ts ts (cdr ts)] + [r #f (or (join! (car ts)) r)]) + [(null? ts) r]))))) ;; ;; pool mapper @@ -261,12 +288,14 @@ [timeout (absolute-time (~ mapper'timeout))] [timeout-val (~ mapper'timeout-val)]) (%start-threads ts) - (if timeout - ($ map (^r (if (and (pair? r) (eq? (car r) unique)) - (begin (thread-terminate! (cdr r)) timeout-val) - r)) - $ map (^t (thread-join! t timeout (cons unique t))) ts) - (map thread-join! ts)))) + (%with-wrapped-join + (let1 join! (%make-joiner #f) + (if timeout + ($ map (^r (if (and (pair? r) (eq? (car r) unique)) + (begin (thread-terminate! (cdr r)) timeout-val) + r)) + $ map (^t (join! t timeout (cons unique t))) ts) + (map join! ts)))))) (define-method run-select ((mapper ) proc coll) (define signaled (atom #f)) @@ -285,10 +314,11 @@ #f))) (letrec ([ts (map (^e (make-thread (^[] (task e ts)))) coll)]) (%start-threads ts) - (do ([ts ts (cdr ts)] - [r #f (guard (e [( e) r]) - (or (thread-join! (car ts)) r))]) - [(null? ts) r]))) + (%with-wrapped-join + (let1 join! (%make-joiner #f) + (do ([ts ts (cdr ts)] + [r #f (or (join! (car ts)) r)]) + [(null? ts) r]))))) ;; ;; default mapper