Skip to content

Commit

Permalink
Merge pull request #48 from jkool702/forkrun_testing
Browse files Browse the repository at this point in the history
nQueue logic now fully working
  • Loading branch information
jkool702 authored Aug 9, 2024
2 parents 1b900bd + af0a8be commit 44bfc0a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 59 deletions.
4 changes: 2 additions & 2 deletions UNIT_TESTS/forkrun.unit-tests.sorting.new.bash
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

#testDir='/usr'
useRamdiskFlag=true
testDir='/mnt/ramdisk/usr'
#useRamdiskFlag=true

################################################################################

Expand Down
122 changes: 66 additions & 56 deletions forkrun.bash
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ forkrun() {

############################ BEGIN FUNCTION ############################

trap - EXIT INT TERM HUP USR1
trap - EXIT INT TERM HUP USR1 USR2

shopt -s extglob

# make all variables local
local tmpDir fPath outStr delimiterVal delimiterReadStr delimiterRemoveStr exitTrapStr exitTrapStr_kill nLines0 nOrder nProcs nBytes tTimeout coprocSrcCode outCur tmpDirRoot returnVal tmpVar t0 readBytesProg nullDelimiterProg ddQuietStr trailingNullFlag inotifyFlag lseekFlag fallocateFlag nLinesAutoFlag nQueueFlag substituteStringFlag substituteStringIDFlag nOrderFlag readBytesFlag readBytesExactFlag nullDelimiterFlag subshellRunFlag stdinRunFlag pipeReadFlag rmTmpDirFlag exportOrderFlag noFuncFlag unescapeFlag optParseFlag continueFlag doneIndicatorFlag FORCE_allowCarriageReturnsFlag ddAvailableFlag fd_continue fd_inotify fd_inotify0 fd_nAuto fd_nAuto0 fd_nOrder fd_nOrder0 fd_read fd_read0 fd_write fd_stdout fd_stdin fd_stderr pWrite_PID pNotify_PID pOrder_PID pAuto_PID fd_read_pos fd_read_pos_old fd_write_pos DEBUG_FORKRUN
local -i PID0 nLines nLinesCur nLinesNew nLinesMax nRead nWait nOrder0 nBytesRead nQueue nQueueMin nCPU v9 kkMax kkCur kk kkProcs verboseLevel
local tmpDir fPath outStr delimiterVal delimiterReadStr delimiterRemoveStr exitTrapStr exitTrapStr_kill nLines0 nOrder nProcs nBytes tTimeout coprocSrcCode outCur tmpDirRoot returnVal tmpVar t0 readBytesProg nullDelimiterProg ddQuietStr trailingNullFlag inotifyFlag lseekFlag fallocateFlag nLinesAutoFlag nQueueFlag substituteStringFlag substituteStringIDFlag nOrderFlag readBytesFlag readBytesExactFlag nullDelimiterFlag subshellRunFlag stdinRunFlag pipeReadFlag rmTmpDirFlag exportOrderFlag noFuncFlag unescapeFlag optParseFlag continueFlag doneIndicatorFlag FORCE_allowCarriageReturnsFlag ddAvailableFlag fd_continue fd_inotify fd_inotify0 fd_nAuto fd_nAuto0 fd_nOrder fd_nOrder0 fd_read fd_read0 fd_write fd_stdout fd_stdin fd_stderr pWrite pOrder pAuto pQueue pWrite_PID pNotify_PID pOrder_PID pAuto_PID pQueue_PID fd_read_pos fd_read_pos_old fd_write_pos DEBUG_FORKRUN
local -i PID0 nLines nLinesCur nLinesNew nLinesMax nRead nWait nOrder0 nBytesRead nQueue nQueueLast nQueueMin nCPU v9 kkMax kkCur kk kkProcs verboseLevel
local -a A p_PID runCmd outHave

# # # # # PARSE OPTIONS # # # # #
Expand Down Expand Up @@ -376,10 +376,9 @@ forkrun() {
}
}

: "${nQueueFlag:=false}"
{ ${nQueueFlag} && (( ${nQueueMin:-0} > 0 )) && { [[ ${nProcsMax:-0} == '0' ]] || (( ${nProcs} < ${nProcsMax} )); }; } || : "${nQueueFlag:=false}"
: "${nQueueFlag:=false}" "${nQueueMin:=1}"

declare -i nProcs="${nProcs}"
local -i nProcs="${nProcs}"
nCPU="$({ type -a nproc &>/dev/null && nproc; } || { type -a grep &>/dev/null && grep -cE '^processor.*: ' /proc/cpuinfo; } || { mapfile -t tmpA </proc/cpuinfo && tmpA=("${tmpA[@]//processor*/$'\034'}") && tmpA=("${tmpA[@]//!($'\034')/}") && tmpA=("${tmpA[@]//$'\034'/1}") && tmpA="${tmpA[*]}" && tmpA="${tmpA// /}" && echo ${#tmpA}; } || printf '8')";
{ [[ ${nProcs} ]] && (( ${nProcs:-0} > 0 )); } || { ${nQueueFlag} && nProcs=$(( ${nCPU} / 2 )) || nProcs=${nCPU}; }

Expand All @@ -388,6 +387,8 @@ forkrun() {
[[ ${nQueueMin//0/} ]] || nQueueMin=1
}

{ ${nQueueFlag} && (( ${nQueueMin:-0} > 0 )) && { [[ ${nProcsMax:-0} == '0' ]] || (( ${nProcs} < ${nProcsMax} )); }; } || : "${nQueueFlag:=false}"

# if reading 1 line at a time (and not automatically adjusting it) skip saving the data in a tmpfile and read directly from stdin pipe
${nLinesAutoFlag} || { [[ ${nLines} == 1 ]] && : "${pipeReadFlag:=true}"; }

Expand Down Expand Up @@ -693,7 +694,7 @@ kill -USR1 $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) 2>/dev/null; '$
trap 'trap - TERM INT HUP USR1; kill -TERM '"${PID0}"' ${BASHPID}' TERM
trap 'trap - TERM INT HUP USR1; kill -HUP '"${PID0}"' ${BASHPID}' HUP
trap 'trap - TERM INT HUP USR1' USR1
inotifywait -q -m --format '' "${fPath}" >&${fd_inotify0} &
inotifywait -q -m -e modify,close --format '' "${fPath}" >&${fd_inotify0} &
printf '%s\n' "${!}" >"${tmpDir}"/.run/pNotify
)

Expand Down Expand Up @@ -746,7 +747,7 @@ trap 'trap - TERM INT HUP USR1' USR1
while true; do"""
${nLinesAutoFlag} && echo "\${nLinesAutoFlag} && read -r <\"${tmpDir}\"/.nLines && [[ \${REPLY} == +([0-9]) ]] && nLinesCur=\${REPLY}"
${nQueueFlag} && echo "printf '\n' >&${fd_nQueue}"
${nQueueFlag} && echo "printf '%s' '+' >&${fd_nQueue}"
echo """
read -u ${fd_continue}
[[ -f \"${tmpDir}\"/.quit ]] && {
Expand Down Expand Up @@ -847,7 +848,7 @@ else
echo "[[ \"\${REPLY}\" == ${delimiterVal} ]] || {"
fi
elif ${nullDelimiterFlag}; then
echo """
echo """
read -r fd_read_pos </proc/self/fdinfo/${fd_read}"""
case "${nullDelimiterProg}" in
'dd') echo """
Expand Down Expand Up @@ -883,31 +884,31 @@ else
echo """
until read -r -u ${fd_read} ${delimiterReadStr}; do
A[-1]+=\"\${REPLY}\";
done
A[-1]+=\"\${REPLY}\"${delimiterVal}"""
(( ${verboseLevel} > 2 )) && echo "echo \"Partial read fixed to: \${A[-1]}\" >&${fd_stderr}"
done"""
printf '%s' "A[-1]+=\"\${REPLY}\""
${lseekFlag} && printf '\n' || printf '%s\n' "${delimiterVal}"
(( ${verboseLevel} > 2 )) && echo "echo \"Partial read fixed to: \${A[-1]}\" >&${fd_stderr}"
echo "}"
}
fi
${pipeReadFlag} || { ${nullDelimiterFlag} && [[ -z ${nullDelimiterProg} ]]; } || ${readBytesFlag} || echo "}"
${nOrderFlag} && echo "read -u ${fd_nOrder} nOrder"
echo """
printf '\\n' >&${fd_continue}"""
${nQueueFlag} && echo "echo '-' >&${fd_nQueue}"
${nQueueFlag} && echo "printf '%s' '-' >&${fd_nQueue}"
echo """
[[ \${#A[@]} == 0 ]] && {
\${doneIndicatorFlag} || {
[[ -f \"${tmpDir}\"/.done ]] && {
read -r fd_read_pos </proc/self/fdinfo/${fd_read}
read -r fd_write_pos </proc/self/fdinfo/${fd_write}
[[ \"\${fd_read_pos##*$'\t'}\" == \"\${fd_write_pos##*$'\t'}\" ]] && doneIndicatorFlag=true
}
}"""
echo """
\${doneIndicatorFlag} || {
[[ -f \"${tmpDir}\"/.done ]] && {
read -r fd_read_pos </proc/self/fdinfo/${fd_read}
read -r fd_write_pos </proc/self/fdinfo/${fd_write}
[[ \"\${fd_read_pos##*$'\t'}\" == \"\${fd_write_pos##*$'\t'}\" ]] && doneIndicatorFlag=true
}
}
if \${doneIndicatorFlag} || [[ -f \"${tmpDir}\"/.quit ]]; then"""
${nLinesAutoFlag} && echo "printf '\\n' >&\${fd_nAuto0}"
${nOrderFlag} && echo ": >\"${tmpDir}\"/.out/.quit{<#>}"
${nQueueFlag} && echo "echo 0 >&${fd_nQueue}"
${nQueueFlag} && echo "\printf '%s' '0' >&${fd_nQueue}"
${inotifyFlag} && echo 'kill -9 '"${pNotify_PID}"' 2>/dev/null'
echo """
: >\"${tmpDir}\"/.quit
Expand Down Expand Up @@ -996,31 +997,41 @@ p_PID+=(\${p{<#>}_PID})""" )"
# start spawning after nProcs workers already forked
kkProcs=${nProcs}

# first read will always be adding 1 to the queue
nQueue=1
read -r -u ${fd_nQueue}

until [[ -f "${tmpDir}"/.quit ]] || { [[ -f "${tmpDir}"/.done ]] && (( ${nQueue} >= ${nQueueMin} )); }; do
# read from fd_queue pipe.
# $'\n' --> increase queue depth by 1.
# '-' --> decrease queue depth by 1.
# '0' --> quit
read -r -u ${fd_nQueue} -t 0.1 || continue
[[ ${REPLY} == '0' ]] && break
nQueue+=${REPLY}1

# dont trigger spawning more workers until the main thread is done spawning the initial $nProcs workers

[[ -f "${tmpDir}"/.spawned ]] && (( ${nQueue} < ${nQueueMin} )) && {
source /proc/self/fd/0 <<<"${coprocSrcCode//'{<#>}'/"${kkProcs}"}"
(( ${verboseLevel} > 2 )) && printf '\nSPAWNING A NEW WORKER COPROC (read queue depth = %s)\n' "${nQueue}" >&${fd_stderr}
((kkProcs++))
echo "${kkProcs}" >"${tmpDir}"/.nWorkers
(( ${kkProcs} >= ${nProcsMax} )) && break
}
p_PID=()

nQueue=0

until [[ -f "${tmpDir}"/.quit ]] || (( ${kkProcs} >= ${nProcsMax} )); do
nQueueLast=${nQueue}

# read from fd_queue pipe.
# '+' --> increase queue depth by 1.
# '-' --> decrease queue depth by 1.
# '0' --> quit
read -r -u ${fd_nQueue} -N 1

case "${REPLY}" in
'+') ((nQueue++)) ;;
'-') ((nQueue--)) ;;
0) break ;;
*) continue ;;
esac

#(( ${verboseLevel} > 3 )) && { printf '\nnQueue = %s (nProcs = %s)\n' "${nQueue}" "${kkProcs}"; cat /proc/self/schedstat; } >&${fd_stderr}

# dont trigger spawning more workers until the main thread is done spawning the initial $nProcs workers

[[ -f "${tmpDir}"/.spawned ]] && (( ( ${nQueue} + ${nQueueLast} ) < ( 2 * ${nQueueMin:=1} ) )) && {
source /proc/self/fd/0 <<<"${coprocSrcCode//'{<#>}'/"${kkProcs}"}"
(( ${verboseLevel} > 2 )) && printf '\nSPAWNING A NEW WORKER COPROC. There are now %s coprocs. (read queue depth = %s)\n' "${nQueue}" "${kkProcs}" >&${fd_stderr}
((kkProcs++))
echo "${kkProcs}" >"${tmpDir}"/.nWorkers
}

done

[[ ${#p_PID[@]} == 0 ]] || wait -f "${p_PID[@]}"

} 2>&${fd_stderr}
} 2>/dev/null

Expand All @@ -1034,7 +1045,7 @@ p_PID+=(\${p{<#>}_PID})""" )"
# if ordering output print the remaining ones in trap
${nOrderFlag} && exitTrapStr+='cat </dev/null "'"${tmpDir}"'"/.out/x* >&'"${fd_stdout}"'; '$'\n'

# make sure asll rocesses are dead
# make sure all processes are dead
exitTrapStr+='kill $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) 2>/dev/null;
kill -9 '"${exitTrapStr_kill}"' 2>/dev/null;
kill -9 $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) 2>/dev/null; '$'\n'
Expand All @@ -1050,21 +1061,20 @@ p_PID+=(\${p{<#>}_PID})""" )"
trap 'trap - TERM INT HUP USR1;
returnVal=1;
kill -USR1 $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null);
kill -INT $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PIDO}" INT
kill -INT $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PID0}" INT

trap 'trap - TERM INT HUP USR1;
returnVal=1;
kill -USR1 $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null);
kill -TERM $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PIDO}" TERM
kill -TERM $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PID0}" TERM

trap 'trap - TERM INT HUP USR1;
returnVal=1;
kill -USR1 $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null);
kill -HUP $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PIDO}" HUP
kill -HUP $(cat </dev/null "'"${tmpDir}"'"/.run/p* 2>/dev/null) '"${PID0}" HUP

#trap 'source /proc/self/fd/0 <<<"${coprocSrcCode//'"'"'{<#>}'"'"'/"${kkProcs}"}"' USR2

#trap 'kill "${p_PID[@]}" 2>/dev/null'$'\n''returnVal=1'$'\n''return 1' INT TERM HUP
#${nQueueFlag} && trap '${coprocSrcCode//'"'"'{<#>}'"'"'/"${kkProcs}"}' USR2

(( ${verboseLevel} > 1 )) && printf '\n\nALL HELPER COPROCS FORKED\n\n' >&${fd_stderr}
(( ${verboseLevel} > 3 )) && { printf '\nSET TRAPS:\n\n'; trap -p; } >&${fd_stderr}

Expand Down Expand Up @@ -1149,13 +1159,13 @@ p_PID+=(\${p{<#>}_PID})""" )"
(( ${verboseLevel} > 1 )) && printf '\n\nWAITING FOR WORKER COPROCS TO FINISH\n\n' >&${fd_stderr}
#p_PID=($(_forkrun_rmdups "${p_PID[@]}" $(cat </dev/null "${tmpDir}"/.run/p[0-9]* 2>/dev/null)))
p_PID+=($(cat </dev/null "${tmpDir}"/.run/p[0-9]* 2>/dev/null))
wait "${p_PID[@]}" 2>/dev/null;
wait "${p_PID[@]}" "${pQueue_PID}" &>/dev/null;

# print final nLines count
(( ${verboseLevel} > 1 )) && {
${nLinesAutoFlag} && printf 'nLines (final) = %s ( max = %s )\n' "$(<"${tmpDir}"/.nLines)" "${nLinesMax}"
${nLinesAutoFlag} && printf 'nLines (final) = %s ( max = %s )\n' "$(<"${tmpDir}"/.nLines)" "${nLinesMax}"
${nQueueFlag} && printf 'final worker process count: %s ( min read queue: %s )\n' "$(<"${tmpDir}"/.nWorkers)" "${nQueueMin}"
} >&${fd_stderr}
} >&${fd_stderr}

# open anonymous pipes + other misc file descriptors for the above code block
) {fd_continue}<><(:) {fd_inotify}<><(:) {fd_nAuto}<><(:) {fd_nOrder}<><(:) {fd_nOrder0}<><(:) {fd_nQueue}<><(:) {fd_read0}<"${fPath}" {fd_read}<"${fPath}" {fd_write}>"${fPath}" {fd_stdin}<&0 {fd_stdout}>&1 {fd_stderr}>&2
Expand Down Expand Up @@ -1615,7 +1625,7 @@ EOF

}

forkrun_lseek_setup() {
_forkrun_lseek_setup() {
## sets up a "lseek" bash builtin for x86_64 machines
local lseekPreFlag=false

Expand Down Expand Up @@ -1670,4 +1680,4 @@ forkrun_lseek_setup() {
esac
}

forkrun_lseek_setup
_forkrun_lseek_setup
2 changes: 1 addition & 1 deletion hyperfine_benchmark/forkrun.speedtest.hyperfine.bash
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ C0[3]='cat "'"${hfdir0}"'"/file_lists/f${kk} | '
C1[3]=' | wc -l'

C0[4]=''
C1[4]=' <"'"${hfdir0}"'"/file_lists/f${kk} | >/dev/null'
C1[4]=' <"'"${hfdir0}"'"/file_lists/f${kk} >/dev/null'

C0[5]='cat "'"${hfdir0}"'"/file_lists/f${kk} | '
C1[5]=' >/dev/null'
Expand Down

0 comments on commit 44bfc0a

Please sign in to comment.