From 097d4e0ff92a5724ad7f9324e1654fe7c3ac60fc Mon Sep 17 00:00:00 2001 From: Cesar Douady Date: Thu, 4 Apr 2024 00:06:40 +0200 Subject: [PATCH] support LMAKE_VIDEO env variable + improve dead-lock avoidance + bug fixes --- Makefile | 2 +- README.md | 40 +++++++--- TO_DO | 6 ++ doc/lmake.texi | 2 + src/autodep/gather.cc | 90 +++++++++++---------- src/autodep/gather.hh | 20 ++--- src/autodep/ld_common.x.cc | 10 +-- src/autodep/ld_server.cc | 4 +- src/autodep/ld_server.hh | 38 ++++----- src/autodep/syscall_tab.cc | 11 ++- src/client.cc | 25 +++--- src/hash.cc | 10 +-- src/hash.hh | 14 ++-- src/job_exec.cc | 65 ++++++--------- src/lmakeserver/backend.cc | 58 +++++++------- src/lmakeserver/backend.x.hh | 30 +++---- src/lmakeserver/backends/generic.hh | 119 ++++++++++++++-------------- src/lmakeserver/backends/slurm.cc | 3 +- src/lmakeserver/job.cc | 31 ++++---- src/lmakeserver/job.x.hh | 2 +- src/lmakeserver/makefiles.cc | 9 ++- src/lmakeserver/node.cc | 2 +- src/lmakeserver/node.x.hh | 22 +++-- src/lmakeserver/req.cc | 26 +++--- src/lmakeserver/req.x.hh | 12 +-- src/lmakeserver/rule.cc | 4 +- src/lmakeserver/rule.x.hh | 13 +-- src/lmakeserver/store.x.hh | 6 +- src/py.cc | 2 +- src/py.hh | 15 ++-- src/rpc_job.cc | 2 +- src/store/file.hh | 34 ++++---- src/store/store_utils.hh | 4 +- src/thread.hh | 19 ++--- src/time.hh | 4 +- src/trace.cc | 10 +-- src/trace.hh | 12 +-- src/utils.cc | 6 ++ src/utils.hh | 102 ++++++++++++++++++++++-- 39 files changed, 506 insertions(+), 378 deletions(-) diff --git a/Makefile b/Makefile index a1d42f87..ee0a4b35 100644 --- a/Makefile +++ b/Makefile @@ -101,8 +101,8 @@ $(error cannot find c compiler) else ifneq ($(findstring gcc,$(CC)),) ifeq ($(intcmp $(shell $(CC) -dumpversion),11,lt,eq,gt),lt) $(error gcc version must be at least 11) -USE_GCC := 1 endif +USE_GCC := 1 else ifneq ($(findstring clang,$(CC)),) ifeq ($(intcmp $(shell $(CC) -dumpversion),15,lt,eq,gt),lt) $(error clang version must be at least 15) diff --git a/README.md 
b/README.md index 5ed71592..a591f899 100644 --- a/README.md +++ b/README.md @@ -86,7 +86,7 @@ it has been tested with the dockers listed in the docker directory - [dD] controls -DNDEBUG : D passes it to enable asserts, d does not. - [tT] controls -DNO_TRACE : T does not pass it to enable traces, t passes it. - the -j flag of make is automatically set to the number of processors, you may want to override this, though - - it is up to you to provide a suitable LD_LIBRARY_PATH value. + - it is up to you to provide a suitable LD\_LIBRARY\_PATH value. it will be transferred as a default value for rules, to the extent it is necessary to provide lmake semantic (mostly this means we keep what is necessary to run python). - if you modify these variables, you should execute git clean as make will not detect such modifications automatically. @@ -112,16 +112,17 @@ it has been tested with the dockers listed in the docker directory - `g_` : global - `t_` : thread local - `np_` : non-portable +- `::` : standard library or a few exceptions defined in src/utils.hh which, in my mind, should have been part of the STL, e.g. ::vector\_view (analogous to ::string\_view) Names are suffixed with \_ if needed to suppress ambiguities ## abbreviations - general rules : - - words are abbreviated depending on their use and span : the more the span is short and the usage is heavy, the more they are abbreviated + - words are abbreviated depending on their use and span : the shorter the span and the heavier the usage , the more they are abbreviated - words may be abbreviated by their beginning, such as env for environ - words may be abbreviated using only consons such as src for source - these may be combined as in dst for destination - - words may further be abbreviated to a single letter when name spans no more than a few lines + - words may further be abbreviated to a single letter or by the first letter of each word (e.g. 
tf for target flag) when name spans no more than a few lines - words include standard name such as syscall names or libc functions - special cases : @@ -139,9 +140,9 @@ Names are suffixed with \_ if needed to suppress ambiguities
## layout -- lines are limited to 200 characters +- lines are limited to 200 characters (as is this document) - functions are limited to 100 lines : - - there are a few exceptions, though, where it was impossible to cut without making too artificial a sub-function + - there are few exceptions, though, where it was impossible to cut without making too artificial a sub-function - generally speaking code is put on a single line when several lines are similar and alignment helps readability - separators (such as commas, operators, parentheses, ...) pertaining to the same expression are at the same indentation level - and subexpressions are at the next indentation level if on one or several lines by themselves @@ -151,10 +152,10 @@ Names are suffixed with \_ if needed to suppress ambiguities - example : a = make_a( - my_first_coef [ 0] * my_first_data // note alignment makes expression structure appearing immediately - + my_second_coef[i ] * my_second_data // note + at identation level 3, subexrpession at indentation level 4 - + my_third_coef [i*2] * my_third_data // note following comment means this one is repeated - + my_foorth_coef[i*3] * my_foorth_data // . + my_first_coef [ 0] * my_first_data // note alignment makes expression structure appearing immediately + + my_second_coef[i ] * my_second_data // note + at identation level 3, subexrpession at indentation level 4 + + my_third_coef [i*2] * my_third_data // note following comment means this one is repeated + + my_foorth_coef[i*3] * my_foorth_data // . 
) ; ## invariants are either @@ -255,16 +256,26 @@ When there is a choice between "if (cond) branch1 else branch2" and "if (!cond) - vectors - prefix-tree - red-black tree (not used in lmake, could be suppressed) +- the prefix tree is mostly used to store file and job names + - only a 32 bits id is used in most of the code + - id's (the Node and Job objects are very light objects containing only the id) can find their name with the name() method + - names can find their associated id by just constructing the Name or Job object with the name as sole argument + - overall, this is extremely efficient and fast + - need about 20-40 bytes per file, independently of the name length which is often above 200 + - building the name string from the tree is marginally slower than a simple copy and the id mechanism makes this need the exception rather than the rule - this makes booting extremely fast, suppressing the need to keep a live daemon -- persistent states are associated to Rule, Job, Node but not Req +- persistent states are associated with Rule's, Job's, Node's but not Req's ## traces - when lmake is executed, a trace of activity is generated for debug purpose - this is true for all executables (lmake, lmakeserver, autodep, ...) 
- traces are located in : - `LMAKE/lmake/local_admin/trace/` - - for lmakeserver, the most important trace, an history of a few last execution is kept + - for lmakeserver, the most important trace, an history of the last few executions is kept - `LMAKE/lmake/remote_admin/job_trace/` for remote job execution +- the first character of each line is either ' or "" + - this is because the trace file is managed as a circular buffer for performance + - so each time we wrap around, this first character is toggled between ' and " - trace entries are timestamped and a letter indicates the thread : - '=' refers to the main thread - in server : @@ -283,6 +294,13 @@ When there is a choice between "if (cond) branch1 else branch2" and "if (!cond) - : compute crc - in lmake : - I : manage ^C +- trace records are indented to reflect the call graph + - indentation are done with tabs, preceded by a follow up character (chosen to be graphically light), this eases the reading + - when a function is entered, a * replaces the follow up character +- to add a trace record : + - in a function that already has a Trace variable, just call the variable with the info you want to trace + - else, declare a variable of type Trace. 
The first argument is a title that will be repeated in all records using the same trace object + - all Trace objects created while this one is alive will be indented, thus reproducing the call graph in the trace # modification diff --git a/TO_DO b/TO_DO index ec001604..9c383360 100644 --- a/TO_DO +++ b/TO_DO @@ -12,6 +12,7 @@ items : * BUGS (implemented but does not work) **************************************************************************************************** +* fix compilation with LMAKE_SAN=T * missing some deps when reading elf - it seems that libc.so is missing at least in some occasions * reimplement hierarchical repositories @@ -52,6 +53,9 @@ items : * COSMETIC (ugly as long as not implemented) **************************************************************************************************** +* generate an error when calling depend on a target + - either known at that time + - or when it becomes a target if later * generate target/dep key everywhere it is pertinent * report Source/Anti rule name when appropriate * be more consise in console output @@ -102,6 +106,8 @@ items : * implement a lshow -r to see what is running now and -B to see BOM (list of sources) - implement a generic walk through deps - use it for check_deps, which will prevent jobs post critical deps to be run +* support dynamic values for interpreter + - python and shell * improve job isolation by using namespaces - much like faketree : https://github.com/enfabrica/enkit/tree/master/faketree - generalize tmp mapping diff --git a/doc/lmake.texi b/doc/lmake.texi index 943e69c4..194d1937 100644 --- a/doc/lmake.texi +++ b/doc/lmake.texi @@ -492,6 +492,8 @@ If it starts with @code{r} or @code{R}, reverse video is assumed. Else output is not colorized. @end itemize +If the @code{LMAKE_VIDEO} environment variable is defined, it is processed as if provided to the @code{--video} option. 
+ @section @lmake Usage : @code{lmake [-a|--archive] [-e|--forget-old-errors] [[-j|--jobs] max_jobs] [-m|--manual-ok] [-o|--live-out] [-l|--local] [-v|--versbose-backend] targets...} diff --git a/src/autodep/gather.cc b/src/autodep/gather.cc index e7cb943f..c1d6a4f2 100644 --- a/src/autodep/gather.cc +++ b/src/autodep/gather.cc @@ -33,7 +33,7 @@ ::ostream& operator<<( ::ostream& os , Gather::AccessInfo const& ai ) { return os <<')' ; } -void Gather::AccessInfo::update( PD pd , bool phony_ok_ , AccessDigest ad , CD const& cd , NodeIdx parallel_id_ ) { +void Gather::AccessInfo::update( PD pd , AccessDigest ad , CD const& cd , NodeIdx parallel_id_ ) { digest.tflags |= ad.tflags ; digest.extra_tflags |= ad.extra_tflags ; digest.dflags |= ad.dflags ; @@ -54,7 +54,6 @@ void Gather::AccessInfo::update( PD pd , bool phony_ok_ , AccessDigest ad , CD c /**/ { PD& d=target ; if ( ad.extra_tflags[ExtraTflag::Allow] && pdupdate( pd , phony_ok , ad , cd , parallel_id ) ; - //^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv + info->update( pd , ad , cd , parallel_id ) ; + //^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ if ( is_new || *info!=old_info ) Trace("_new_access", fd , STR(is_new) , pd , ad , cd , parallel_id , comment , old_info , "->" , *info , it->first ) ; // only trace if something changes } @@ -119,41 +118,44 @@ void _child_wait_thread_func( int* wstatus , pid_t pid , Fd fd ) { bool/*done*/ Gather::kill(int sig) { Trace trace("kill",sig,pid,STR(as_session),child_stdout,child_stderr,mk_key_uset(slaves)) ; - ::unique_lock lock { _pid_mutex } ; - int kill_sig = sig>=0 ? sig : SIGKILL ; - bool killed_ = false ; - killed = true ; // prevent child from starting if killed before + Lock lock { _pid_mutex } ; + int kill_sig = sig>=0 ? 
sig : SIGKILL ; + bool killed_ = false ; + killed = true ; // prevent child from starting if killed before // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv if (pid>1) killed_ = kill_process(pid,kill_sig,as_session/*as_group*/) ; // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - if (sig<0) { // kill all processes (or process groups) connected to a stream we wait for - pid_t ctl_pid = getpid() ; - ::uset_s fd_strs ; - ::uset to_kill ; // maintain an ordered tab to favor repeatability - if (+child_stdout) fd_strs.insert( read_lnk(to_string("/proc/self/fd/",child_stdout.fd)) ) ; - if (+child_stderr) fd_strs.insert( read_lnk(to_string("/proc/self/fd/",child_stderr.fd)) ) ; - for( auto const& [fd,_] : slaves ) fd_strs.insert( read_lnk(to_string("/proc/self/fd/",fd .fd)) ) ; + if (sig<0) { // kill all processes (or process groups) connected to a stream we wait for + pid_t ctl_pid = as_session ? ::getpgrp() : ::getpid() ; + ::umap_s< GatherKind> fd_strs ; + ::umap to_kill ; + trace("ctl",ctl_pid) ; + if (+child_stdout) fd_strs[ read_lnk(to_string("/proc/self/fd/",child_stdout.fd)) ] = GatherKind::Stdout ; + if (+child_stderr) fd_strs[ read_lnk(to_string("/proc/self/fd/",child_stderr.fd)) ] = GatherKind::Stderr ; + for( auto const& [fd,_] : slaves ) fd_strs[ read_lnk(to_string("/proc/self/fd/",fd .fd)) ] = GatherKind::Slave ; + trace("fds",fd_strs) ; for( ::string const& proc_entry : lst_dir("/proc") ) { for( char c : proc_entry ) if (c>'9'||c<'0') goto NextProc ; try { pid_t child_pid = from_string(proc_entry) ; - if (child_pid==ctl_pid) goto NextProc ; if (as_session ) child_pid = ::getpgid(child_pid) ; - if (child_pid<=1 ) goto NextProc ; // no pgid available, ignore + if (child_pid==ctl_pid) goto NextProc ; + if (child_pid<=1 ) goto NextProc ; // no pgid available, ignore for( ::string const& fd_entry : lst_dir(to_string("/proc/",proc_entry,"/fd")) ) { ::string fd_str = read_lnk(to_string("/proc/",proc_entry,"/fd/",fd_entry)) ; - if ( !fd_str ) continue ; // fd 
has disappeared, ignore - if ( !fd_strs.contains(fd_str) ) continue ; // not any of the fd's we are looking for - to_kill.insert(child_pid) ; + if (!fd_str ) continue ; // fd has disappeared, ignore + auto it = fd_strs.find(fd_str) ; + if (it==fd_strs.end() ) continue ; + to_kill[child_pid] = it->second ; break ; } - } catch(::string const&) {} // if we cannot read /proc/pid, process is dead, ignore + } catch(::string const&) {} // if we cannot read /proc/pid, process is dead, ignore NextProc : ; } trace("to_kill",to_kill) ; - // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv - for( pid_t p : to_kill ) killed_ |= kill_process(p,kill_sig,as_session/*as_group*/) ; - // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv + for( auto [p,_] : to_kill ) killed_ |= kill_process(p,kill_sig,as_session/*as_group*/) ; + // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ } trace("done",STR(killed_)) ; return killed_ ; @@ -195,7 +197,7 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd trace("autodep_env",::string(autodep_env)) ; // ::map_ss add_env {{"LMAKE_AUTODEP_ENV",autodep_env}} ; // required even with method==None or ptrace to allow support (ldepend, lmake module, ...) to work - { ::unique_lock lock{_pid_mutex} ; + { Lock lock{_pid_mutex} ; if (killed ) return Status::Killed ; // dont start if we are already killed before starting if (method==AutodepMethod::Ptrace) { // PER_AUTODEP_METHOD : handle case // XXX : splitting responsibility is no more necessary. 
Can directly report child termination from within autodep_ptrace.process using same ifce as _child_wait_thread_func @@ -283,9 +285,9 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd default : ; } Fd reply_fd = server_cb(::move(jerr)) ; - trace("reply",reply_fd) ; if (+reply_fd) { epoll.add_read(reply_fd,Kind::ServerReply) ; + trace("read_reply",reply_fd) ; ServerReply& sr = server_replies[reply_fd] ; if (sync_) sr.fd = fd ; /**/ sr.codec_file = codec_file ; @@ -309,10 +311,10 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd end = PD(New) + timeout ; trace("set_timeout",timeout,end) ; } - if (cstdout==Child::Pipe) epoll.add_read(child_stdout=child.stdout,Kind::Stdout ) ; - if (cstderr==Child::Pipe) epoll.add_read(child_stderr=child.stderr,Kind::Stderr ) ; - /**/ epoll.add_read(child_fd ,Kind::ChildEnd) ; - /**/ epoll.add_read(master_fd ,Kind::Master ) ; + if (cstdout==Child::Pipe) { epoll.add_read(child_stdout=child.stdout,Kind::Stdout ) ; trace("read_stdout",child_stdout) ; } + if (cstderr==Child::Pipe) { epoll.add_read(child_stderr=child.stderr,Kind::Stderr ) ; trace("read_stderr",child_stderr) ; } + /**/ { epoll.add_read(child_fd ,Kind::ChildEnd) ; trace("read_child ",child_fd ) ; } + /**/ { epoll.add_read(master_fd ,Kind::Master ) ; trace("read_master",master_fd ) ; } while (epoll.cnt) { uint64_t wait_ns = Epoll::Forever ; if (+end) { @@ -345,29 +347,35 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd char buf[4096] ; int cnt = ::read( fd , buf , sizeof(buf) ) ; SWEAR( cnt>=0 , cnt ) ; ::string_view buf_view { buf , size_t(cnt) } ; - if (!cnt ) { trace("close") ; epoll.del(fd) ; } // /!\ dont close as fd is closed upon child destruction - else if (kind==Kind::Stderr) { stderr.append(buf_view) ; child_stderr={} ; } // tell kill not to wait for this one - else { stdout.append(buf_view) ; child_stdout={} ;live_out_cb(buf_view) ; } // . 
+ if (cnt) { + if (kind==Kind::Stderr) stderr.append(buf_view) ; + else { stdout.append(buf_view) ; live_out_cb(buf_view) ; } + } else { + epoll.del(fd) ; // /!\ dont close as fd is closed upon child destruction + trace("close",kind,fd) ; + if (kind==Kind::Stderr) child_stderr = {} ; // tell kill not to wait for this one + else child_stdout = {} ; + } } break ; case Kind::ChildEnd : { uint64_t one = 0/*garbage*/ ; int cnt = ::read( fd , &one , 8 ) ; SWEAR( cnt==8 && one==1 , cnt , one ) ; - { ::unique_lock lock{_pid_mutex} ; + { Lock lock{_pid_mutex} ; child.pid = -1 ; // too late to kill job } if (WIFEXITED (wstatus)) set_status( WEXITSTATUS(wstatus)!=0 ? Status::Err : Status::Ok ) ; else if (WIFSIGNALED(wstatus)) set_status( is_sig_sync(WTERMSIG(wstatus)) ? Status::Err : Status::LateLost ) ; // synchronous signals are actually errors else fail("unexpected wstatus : ",wstatus) ; - trace(status,::hex,wstatus,::dec) ; epoll.close(fd) ; - epoll.cnt-- ; // do not wait for new connections, but if one arrives before all flows are closed, process it + epoll.cnt-- ; // do not wait for new connections, but if one arrives before all flows are closed, process it + trace("close",kind,status,::hex,wstatus,::dec) ; } break ; case Kind::Master : { SWEAR(fd==master_fd) ; Fd slave = master_fd.accept() ; - trace(slave) ; epoll.add_read(slave,Kind::Slave) ; - slaves[slave] ; // allocate entry + trace("read_slave",slave) ; + slaves[slave] ; // allocate entry } break ; case Kind::ServerReply : { JobRpcReply jrr ; @@ -389,6 +397,7 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ server_replies.erase(it) ; epoll.close(fd) ; + trace("close",kind,fd) ; } break ; case Kind::Slave : { Jerr jerr ; @@ -407,6 +416,7 @@ Status Gather::exec_child( ::vector_s const& args , Fd cstdin , Fd cstdout , Fd break ; case Proc::None : epoll.close(fd) ; + trace("close",kind,fd) ; for( Jerr& j : slave_entry.second ) 
_new_accesses(fd,::move(j)) ; // process deferred entries although with uncertain outcome slaves.erase(it) ; break ; diff --git a/src/autodep/gather.hh b/src/autodep/gather.hh index b0a5b9c9..d5f15267 100644 --- a/src/autodep/gather.hh +++ b/src/autodep/gather.hh @@ -42,7 +42,7 @@ struct Gather { return res ; } // services - void update( PD , bool phony_ok_ , AccessDigest , CD const& , NodeIdx parallel_id_ ) ; + void update( PD , AccessDigest , CD const& , NodeIdx parallel_id_ ) ; // void chk() const ; // data @@ -55,7 +55,6 @@ struct Gather { CD crc_date ; // state when first read NodeIdx parallel_id = 0 ; AccessDigest digest ; - bool phony_ok = false ; // if false <=> prevent phony flag so as to hide washing to user } ; struct ServerReply { IMsgBuf buf ; // buf to assemble the reply @@ -72,11 +71,8 @@ public : private : void _solve( Fd , Jerr& jerr) ; // Fd for trace purpose only - void _new_access( Fd , PD , bool phony_ok , ::string&& file , AccessDigest , CD const& , bool parallel , ::string const& comment ) ; - // - void _new_access( PD pd , bool po , ::string&& f , AccessDigest ad , CD const& cd , bool p , ::string const& c ) { _new_access({},pd,po ,::move(f),ad,cd,p,c) ; } - void _new_access( Fd fd , PD pd , ::string&& f , AccessDigest ad , CD const& cd , bool p , ::string const& c ) { _new_access(fd,pd,true,::move(f),ad,cd,p,c) ; } - void _new_access( PD pd , ::string&& f , AccessDigest ad , CD const& cd , bool p , ::string const& c ) { _new_access({},pd,true,::move(f),ad,cd,p,c) ; } + void _new_access( Fd , PD , ::string&& file , AccessDigest , CD const& , bool parallel , ::string const& comment ) ; + void _new_access( PD pd , ::string&& f , AccessDigest ad , CD const& cd , bool p , ::string const& c ) { _new_access({},pd,::move(f),ad,cd,p,c) ; } // void _new_accesses( Fd fd , Jerr&& jerr ) { bool parallel = false ; @@ -90,10 +86,10 @@ private : Trace trace("_codec",jrr) ; _new_access( sr.fd , PD(New) , ::move(sr.codec_file) , {.accesses=Access::Reg} , 
jrr.crc , false/*parallel*/ , comment ) ; } -public : //! phony_ok crc_date parallel - void new_target( PD pd , ::string const& t , ::string const& c="s_target" ) { _new_access(pd, ::copy(t),{.write=Yes},{} ,false ,c) ; } - void new_unlnk ( PD pd , ::string const& t , ::string const& c="s_unlnk" ) { _new_access(pd,false ,::copy(t),{.write=Yes},{} ,false ,c) ; } // new_unlnk is used for internal wash - void new_guard ( ::string const& f ) { guards.insert(f) ; } +public : //! crc_date parallel + void new_target( PD pd , ::string const& t , ::string const& c="s_target" ) { _new_access(pd,::copy(t),{.write=Yes},{} ,false ,c) ; } + void new_unlnk ( PD pd , ::string const& t , ::string const& c="s_unlnk" ) { _new_access(pd,::copy(t),{.write=Yes},{} ,false ,c) ; } // new_unlnk is used for internal wash + void new_guard ( ::string const& f ) { guards.insert(f) ; } // void new_deps( PD , ::vmap_s&& deps , ::string const& stdin={} ) ; void new_exec( PD , ::string const& exe , ::string const& ="s_exec" ) ; @@ -136,5 +132,5 @@ public : //! ::string msg ; // contains error messages not from job ::umap>> slaves ; // Jerr's are waiting for confirmation private : - mutable ::mutex _pid_mutex ; + Mutex mutable _pid_mutex ; } ; diff --git a/src/autodep/ld_common.x.cc b/src/autodep/ld_common.x.cc index 77222291..396ed241 100644 --- a/src/autodep/ld_common.x.cc +++ b/src/autodep/ld_common.x.cc @@ -55,8 +55,8 @@ extern "C" { extern int statx ( int dirfd , const char* pth , int flgs , uint msk , struct statx* buf ) noexcept ; } -static ::mutex _g_mutex ; // ensure exclusivity between threads -static thread_local bool _t_loop = false ; // prevent recursion within a thread +static Mutex _g_mutex ; // ensure exclusivity between threads +static thread_local bool _t_loop = false ; // prevent recursion within a thread // User program may have global variables whose cxtor/dxtor do accesses. 
// In that case, they may come before our own Audit is constructed if declared global (in the case of LD_PRELOAD). @@ -140,13 +140,13 @@ struct _Execp : _Exec { _Execp( Record& r , const char* file , const char* const envp[] , ::string&& comment ) { if (!file) return ; // - if (::strchr(file,'/')) { // if file contains a /, no search is performed + if (::strchr(file,'/')) { // if file contains a /, no search is performed static_cast(*this) = Base(r,file,false/*no_follow*/,envp,::move(comment)) ; return ; } // ::string p = get_env("PATH") ; - if (!p) { // gather standard path if path not provided + if (!p) { // gather standard path if path not provided size_t n = ::confstr(_CS_PATH,nullptr,0) ; p.resize(n) ; ::confstr(_CS_PATH,p.data(),n) ; @@ -268,7 +268,7 @@ struct Mkstemp : WSolve { if ( _t_loop || !started() ) return orig args ; \ Save sav{_t_loop,true} ; \ if (cond) return orig args ; \ - ::unique_lock lock{_g_mutex} + Lock lock{_g_mutex} // do a first check to see if it is obvious that nothing needs to be done #define HEADER0(syscall, args) HEADER( syscall , false , args ) #define HEADER1(syscall,path, args) HEADER( syscall , Record::s_is_simple(path ) , args ) diff --git a/src/autodep/ld_server.cc b/src/autodep/ld_server.cc index 79b18757..133335dc 100644 --- a/src/autodep/ld_server.cc +++ b/src/autodep/ld_server.cc @@ -5,8 +5,8 @@ #include "ld_server.hh" -thread_local bool AutodepLock::t_active = false ; -/**/ ::mutex AutodepLock::_s_mutex ; +thread_local bool AutodepLock::t_active = false ; +/**/ Mutex AutodepLock::_s_mutex ; static bool started() { return AutodepLock::t_active ; } // no auto-start for server diff --git a/src/autodep/ld_server.hh b/src/autodep/ld_server.hh index f4193e5e..71efc147 100644 --- a/src/autodep/ld_server.hh +++ b/src/autodep/ld_server.hh @@ -8,27 +8,27 @@ #pragma once struct AutodepLock { - // static data - static thread_local bool t_active ; + // static data + static thread_local bool t_active ; private : - static ::mutex 
_s_mutex ; - // cxtors & casts + static Mutex _s_mutex ; + // cxtors & casts public : - AutodepLock( ) = default ; - AutodepLock(::vmap_s* deps=nullptr) : lock{_s_mutex} { - SWEAR( !Record::s_deps && !Record::s_deps_err ) ; + AutodepLock( ) = default ; + AutodepLock(::vmap_s* deps=nullptr) : lock{_s_mutex} { + SWEAR( !Record::s_deps && !Record::s_deps_err ) ; SWEAR( !*Record::s_access_cache ) ; - Record::s_deps = deps ; - Record::s_deps_err = &err ; - t_active = true ; - } - ~AutodepLock() { - Record::s_deps = nullptr ; - Record::s_deps_err = nullptr ; - t_active = false ; + Record::s_deps = deps ; + Record::s_deps_err = &err ; + t_active = true ; + } + ~AutodepLock() { + Record::s_deps = nullptr ; + Record::s_deps_err = nullptr ; + t_active = false ; Record::s_access_cache->clear() ; - } - // data - ::unique_lock<::mutex> lock ; - ::string err ; + } + // data + Lock> lock ; + ::string err ; } ; diff --git a/src/autodep/syscall_tab.cc b/src/autodep/syscall_tab.cc index 48c5a6ab..21fa2abc 100644 --- a/src/autodep/syscall_tab.cc +++ b/src/autodep/syscall_tab.cc @@ -312,8 +312,13 @@ template void _entry_stat( void* & /*ctx*/ , Record& r , pi } // XXX : find a way to put one entry per line instead of 3 lines(would be much more readable) -SyscallDescr::Tab const& SyscallDescr::s_tab(bool for_ptrace) { // this must *not* do any mem allocation (or syscall impl in ld.cc breaks), so it cannot be a umap - static Tab s_tab = {} ; +SyscallDescr::Tab const& SyscallDescr::s_tab(bool for_ptrace) { // this must *not* do any mem allocation (or syscall impl in ld.cc breaks), so it cannot be a umap + static Tab s_tab = {} ; + static ::atomic s_inited = false ; // set to true once s_tab is initialized + if (s_inited) return s_tab ; // fast test not even taking the lock + static Mutex s_mutex ; + Lock lock{s_mutex} ; + if (s_inited) return s_tab ; // repeat test now that we have the lock in case another thread while in the middle of initializing s_tab // /!\ prio must be non-zero as 
zero means entry is not allocated // entries marked NFS_GUARD are deemed data access as they touch their enclosing dir and hence must be guarded against strange NFS notion of coherence // entries marked filter (i.e. field is !=0) means that processing can be skipped if corresponding arg is a file name known to require no processing @@ -440,5 +445,7 @@ SyscallDescr::Tab const& SyscallDescr::s_tab(bool for_ptrace) { // this must *no static_assert(SYS_unlinkat , _exit_unlnk ,2 , 1 , true , "Unlinkat" } ; #endif #undef NFS_GUARD + fence() ; // ensure serialization + s_inited = true ; return s_tab ; } diff --git a/src/client.cc b/src/client.cc index 231590ce..ba5de9e5 100644 --- a/src/client.cc +++ b/src/client.cc @@ -178,16 +178,21 @@ Bool3/*ok*/ out_proc( ::ostream& os , ReqProc proc , bool refresh , ReqSyntax co if ( cmd_line.flags[ReqFlag::Job] && cmd_line.args.size()!=1 ) syntax.usage("can process several files, but a single job" ) ; if ( !cmd_line.flags[ReqFlag::Job] && cmd_line.flags[ReqFlag::Rule] ) syntax.usage("can only force a rule to identify a job, not a file") ; // - Bool3 rv = Maybe ; - if (!cmd_line.flags[ReqFlag::Video]) rv = is_reverse_video(Fd::Stdin,Fd::Stdout) ; - else - switch (cmd_line.flag_args[+ReqFlag::Video][0]) { - case 'n' : - case 'N' : rv = No ; break ; - case 'r' : - case 'R' : rv = Yes ; break ; - default : rv = Maybe ; - } + Bool3 rv = Maybe/*garbage*/ ; + ::string rv_str ; + if (!rv_str) { rv_str = cmd_line.flag_args[+ReqFlag::Video] ; trace("cmd_line",rv_str) ; } + if (!rv_str) { rv_str = get_env("LMAKE_VIDEO") ; trace("env" ,rv_str) ; } + switch (rv_str[0]) { + case 'n' : + case 'N' : rv = No ; break ; + case 'r' : + case 'R' : rv = Yes ; break ; + case 'f' : + case 'F' : rv = Maybe ; break ; // force no color + default : rv = is_reverse_video(Fd::Stdin,Fd::Stdout) ; + } + trace("reverse_video",rv) ; + // ReqRpcReq rrr { proc , cmd_line.files() , { rv , cmd_line } } ; connect_to_server(refresh) ; started_cb() ; diff --git 
a/src/hash.cc b/src/hash.cc index 1883f983..a859efe2 100644 --- a/src/hash.cc +++ b/src/hash.cc @@ -252,11 +252,11 @@ namespace Hash { // _Xxh // - char _Xxh::_s_lnk_secret[XXH3_SECRET_SIZE_MIN] = {} ; - char _Xxh::_s_exe_secret[XXH3_SECRET_SIZE_MIN] = {} ; - ::mutex _Xxh::_s_inited_mutex ; - bool _Xxh::_s_lnk_inited = false ; - bool _Xxh::_s_exe_inited = false ; + char _Xxh::_s_lnk_secret[XXH3_SECRET_SIZE_MIN] = {} ; + char _Xxh::_s_exe_secret[XXH3_SECRET_SIZE_MIN] = {} ; + Mutex _Xxh::_s_inited_mutex ; + bool _Xxh::_s_lnk_inited = false ; + bool _Xxh::_s_exe_inited = false ; _Xxh::_Xxh() { XXH3_INITSTATE (&_state) ; diff --git a/src/hash.hh b/src/hash.hh index 8db08591..0aec4410 100644 --- a/src/hash.hh +++ b/src/hash.hh @@ -158,14 +158,14 @@ namespace Hash { struct _Xxh { private : - static void _s_init_lnk() { ::unique_lock lock{_s_inited_mutex} ; if (_s_lnk_inited) return ; XXH3_generateSecret(_s_lnk_secret,sizeof(_s_lnk_secret),"lnk",3) ; _s_lnk_inited = true ; } - static void _s_init_exe() { ::unique_lock lock{_s_inited_mutex} ; if (_s_exe_inited) return ; XXH3_generateSecret(_s_exe_secret,sizeof(_s_exe_secret),"exe",3) ; _s_exe_inited = true ; } + static void _s_init_lnk() { Lock lock{_s_inited_mutex} ; if (_s_lnk_inited) return ; XXH3_generateSecret(_s_lnk_secret,sizeof(_s_lnk_secret),"lnk",3) ; _s_lnk_inited = true ; } + static void _s_init_exe() { Lock lock{_s_inited_mutex} ; if (_s_exe_inited) return ; XXH3_generateSecret(_s_exe_secret,sizeof(_s_exe_secret),"exe",3) ; _s_exe_inited = true ; } // static data - static char _s_lnk_secret[XXH3_SECRET_SIZE_MIN] ; - static char _s_exe_secret[XXH3_SECRET_SIZE_MIN] ; - static ::mutex _s_inited_mutex ; - static bool _s_lnk_inited ; - static bool _s_exe_inited ; + static char _s_lnk_secret[XXH3_SECRET_SIZE_MIN] ; + static char _s_exe_secret[XXH3_SECRET_SIZE_MIN] ; + static Mutex _s_inited_mutex ; + static bool _s_lnk_inited ; + static bool _s_exe_inited ; protected : // cxtors & cast _Xxh( ) ; diff --git 
a/src/job_exec.cc b/src/job_exec.cc index a9361065..ce0a079f 100644 --- a/src/job_exec.cc +++ b/src/job_exec.cc @@ -42,19 +42,18 @@ struct PatternDict { ::vmap patterns = {} ; } ; -ServerSockFd g_server_fd ; -Gather g_gather { New } ; -JobRpcReply g_start_info ; -::string g_service_start ; -::string g_service_mngt ; -::string g_service_end ; -SeqId g_seq_id = 0/*garbage*/ ; -JobIdx g_job = 0/*garbage*/ ; -::atomic g_killed = false ; // written by thread S and read by main thread -PatternDict g_match_dct ; -NfsGuard g_nfs_guard ; -::umap_s g_missing_static_targets ; -::vector_s g_washed ; +ServerSockFd g_server_fd ; +Gather g_gather { New } ; +JobRpcReply g_start_info ; +::string g_service_start ; +::string g_service_mngt ; +::string g_service_end ; +SeqId g_seq_id = 0/*garbage*/ ; +JobIdx g_job = 0/*garbage*/ ; +::atomic g_killed = false ; // written by thread S and read by main thread +PatternDict g_match_dct ; +NfsGuard g_nfs_guard ; +::vector_s g_washed ; void kill_thread_func(::stop_token stop) { t_thread_key = 'K' ; @@ -97,13 +96,6 @@ bool/*keep_fd*/ handle_server_req( JobServerRpcReq&& jsrr , SlaveSockFd const& ) return false ; } -::pair_s wash() { - Trace trace("wash",g_start_info.pre_actions) ; - ::pair_s actions = do_file_actions( ::move(g_start_info.pre_actions) , g_nfs_guard , g_start_info.hash_algo ) ; - trace("unlnks",actions) ; - return actions ; -} - ::map_ss prepare_env(JobRpcReq& end_report) { ::map_ss res ; ::string abs_cwd = g_start_info.autodep_env.root_dir ; @@ -200,8 +192,6 @@ Digest analyze( bool at_end , bool killed=false ) { if (!at_end) continue ; // we are handling chk_deps and we only care about deps // handle targets if (is_tgt) { - if (!info.phony_ok) ad.tflags &= ~Tflag::Phony ; // used to ensure washing is not reported to user as a target - // bool unlnk = !is_target(file) ; TargetDigest td { .tflags=ad.tflags , .extra_tflags=ad.extra_tflags } ; // @@ -214,11 +204,11 @@ Digest analyze( bool at_end , bool killed=false ) { } else 
switch (flags.is_target) { case Yes : break ; case Maybe : - if (unlnk) break ; // it is ok to write and unlink temporary files + if (unlnk) break ; // it is ok to write and unlink temporary files [[fallthrough]] ; case No : - if (ad.write==No ) break ; // it is ok to attempt writing as long as attempt does not succeed - if (ad.extra_tflags[ExtraTflag::Allow]) break ; // it is ok if explicitly allowed by user + if (ad.write==No ) break ; // it is ok to attempt writing as long as attempt does not succeed + if (ad.extra_tflags[ExtraTflag::Allow]) break ; // it is ok if explicitly allowed by user trace("bad access",ad,flags) ; if (ad.write==Maybe ) append_to_string( res.msg , "maybe " ) ; /**/ append_to_string( res.msg , "unexpected " ) ; @@ -237,11 +227,14 @@ Digest analyze( bool at_end , bool killed=false ) { else res.crcs.emplace_back(res.targets.size()) ; // record index in res.targets for deferred (parallel) crc computation break ; DF} - if ( td.tflags[Tflag::Target] && !static_phony(td.tflags) ) { // unless static or phony, a target loses its official status if not actually produced - if (ad.write==Yes) { if (unlnk ) td.tflags &= ~Tflag::Target ; } - else { if (!is_target(file)) td.tflags &= ~Tflag::Target ; } + if ( td.tflags[Tflag::Target] && !td.tflags[Tflag::Phony] ) { + if (td.tflags[Tflag::Static]) { + if (unlnk ) append_to_string( res.msg , "missing static target " , mk_file(file,No/*exists*/) , '\n' ) ; + } else { + if (ad.write==Yes) { if (unlnk ) td.tflags &= ~Tflag::Target ; } // unless static or phony, a target loses its official status if not actually produced + else { if (!is_target(file)) td.tflags &= ~Tflag::Target ; } + } } - if ( td.tflags[Tflag::Target] && td.tflags[Tflag::Static] && (td.tflags[Tflag::Phony]||td.crc!=Crc::None) ) g_missing_static_targets.erase(file) ; //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv res.targets.emplace_back(file,td) ; //^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -298,7 +291,7 @@ void live_out_cb(::string_view const& txt) { 
live_out_buf = live_out_buf.substr(pos) ; } -void crc_thread_func( size_t id , vmap_s* targets , ::vector const* crcs , ::string* msg , ::mutex* msg_mutex ) { +void crc_thread_func( size_t id , vmap_s* targets , ::vector const* crcs , ::string* msg , Mutex* msg_mutex ) { static ::atomic crc_idx = 0 ; t_thread_key = '0'+id ; Trace trace("crc",targets->size(),crcs->size()) ; @@ -309,7 +302,7 @@ void crc_thread_func( size_t id , vmap_s* targets , ::vector msg_mutex ; { ::vector crc_threads ; crc_threads.reserve(n_threads) ; for( size_t i=0 ; i Backend::StartEntry::req_info() const { Pdate eta ; - bool keep_tmp = false ; - ::unique_lock lock{Req::s_reqs_mutex} ; // taking Req::s_reqs_mutex is compulsery to derefence req + bool keep_tmp = false ; + Lock lock { Req::s_reqs_mutex } ; // taking Req::s_reqs_mutex is compulsery to derefence req for( ReqIdx r : reqs ) { Req req{r} ; keep_tmp |= req->options.flags[ReqFlag::KeepTmp] ; @@ -48,8 +48,8 @@ namespace Backends { ::string Backend::s_executable ; Backend* Backend::s_tab[N] ; - ::mutex Backend::_s_mutex ; - ::mutex Backend::_s_starting_job_mutex ; + Mutex Backend::_s_mutex ; + Mutex Backend::_s_starting_job_mutex ; ::atomic Backend::_s_starting_job ; ::map Backend::_s_start_tab ; SmallIds Backend::_s_small_ids ; @@ -66,13 +66,13 @@ namespace Backends { } static bool _localize( Tag t , ReqIdx ri ) { - ::unique_lock lock{Req::s_reqs_mutex} ; // taking Req::s_reqs_mutex is compulsery to derefence req + Lock lock{Req::s_reqs_mutex} ; // taking Req::s_reqs_mutex is compulsery to derefence req return Req(ri)->options.flags[ReqFlag::Local] || !Backend::s_ready(t) ; // if asked backend is not usable, force local execution } void Backend::s_submit( Tag tag , JobIdx ji , ReqIdx ri , SubmitAttrs&& submit_attrs , ::vmap_ss&& rsrcs ) { SWEAR(+tag) ; - ::unique_lock lock{_s_mutex} ; + Lock lock{_s_mutex} ; Trace trace(BeChnl,"s_submit",tag,ji,ri,submit_attrs,rsrcs) ; // if ( tag!=Tag::Local && _localize(tag,ri) ) { @@ -90,7 +90,7 @@ 
namespace Backends { void Backend::s_add_pressure( Tag tag , JobIdx j , ReqIdx ri , SubmitAttrs const& sa ) { SWEAR(+tag) ; if (_localize(tag,ri)) tag = Tag::Local ; - ::unique_lock lock{_s_mutex} ; + Lock lock{_s_mutex} ; Trace trace(BeChnl,"s_add_pressure",tag,j,ri,sa) ; auto it = _s_start_tab.find(j) ; if (it==_s_start_tab.end()) { @@ -104,7 +104,7 @@ namespace Backends { void Backend::s_set_pressure( Tag tag , JobIdx j , ReqIdx ri , SubmitAttrs const& sa ) { SWEAR(+tag) ; if (_localize(tag,ri)) tag = Tag::Local ; - ::unique_lock lock{_s_mutex} ; + Lock lock{_s_mutex} ; Trace trace(BeChnl,"s_set_pressure",tag,j,ri,sa) ; s_tab[+tag]->set_pressure(j,ri,sa) ; auto it = _s_start_tab.find(j) ; @@ -113,7 +113,7 @@ namespace Backends { } void Backend::s_launch() { - ::unique_lock lock{_s_mutex} ; + Lock lock{_s_mutex} ; Trace trace(BeChnl,"s_launch") ; for( Tag t : All ) if (s_ready(t)) { try { @@ -135,8 +135,8 @@ namespace Backends { } void Backend::_s_handle_deferred_wakeup(DeferredEntry&& de) { - { ::unique_lock lock { _s_mutex } ; // lock _s_start_tab for minimal time to avoid dead-locks - auto it = _s_start_tab.find(+de.job_exec) ; + { Lock lock { _s_mutex } ; // lock _s_start_tab for minimal time to avoid dead-locks + auto it = _s_start_tab.find(+de.job_exec) ; if (it==_s_start_tab.end() ) return ; // too late, job has ended if (it->second.conn.seq_id!=de.seq_id) return ; // too late, job has ended and restarted } @@ -159,8 +159,8 @@ namespace Backends { } void Backend::_s_handle_deferred_report(DeferredEntry&& dre) { - ::unique_lock lock { _s_mutex } ; // lock _s_start_tab for minimal time to avoid dead-locks - auto it = _s_start_tab.find(+dre.job_exec) ; + Lock lock { _s_mutex } ; // lock _s_start_tab for minimal time to avoid dead-locks + auto it = _s_start_tab.find(+dre.job_exec) ; if (it==_s_start_tab.end() ) return ; if (it->second.conn.seq_id!=dre.seq_id) return ; Trace trace(BeChnl,"_s_handle_deferred_report",dre) ; @@ -207,8 +207,8 @@ namespace Backends 
{ ::vmap_ss rsrcs ; Trace trace(BeChnl,"_s_handle_job_start",jrr) ; _s_starting_job = jrr.job ; - ::unique_lock lock { _s_starting_job_mutex } ; - { ::unique_lock lock { _s_mutex } ; // prevent sub-backend from manipulating _s_start_tab from main thread, lock for minimal time + Lock lock { _s_starting_job_mutex } ; + { Lock lock { _s_mutex } ; // prevent sub-backend from manipulating _s_start_tab from main thread, lock for minimal time // auto it = _s_start_tab.find(+job) ; if (it==_s_start_tab.end() ) { trace("not_in_tab" ) ; return false ; } StartEntry& entry = it->second ; if (entry.conn.seq_id!=jrr.seq_id) { trace("bad_seq_id",entry.conn.seq_id,jrr.seq_id) ; return false ; } @@ -312,7 +312,7 @@ namespace Backends { } // bool dep_ready = true ; - for( auto const& [dn,dd] : ::vector_view(&deps[n_submit_deps],deps.size()-n_submit_deps) ) + for( auto const& [dn,dd] : ::vector_view(deps.data()+n_submit_deps,deps.size()-n_submit_deps) ) // note : this is ok even if deps is empty for( Req r : entry.reqs ) // to be sure, we should check done(Dsk) rather than done(Status), but we do not seek security here, we seek perf (real check will be done at end of job) // and most of the time, done(Status) implies file is ok, and we have less false positive as we do not have the opportunity to fully assess sources @@ -412,7 +412,7 @@ namespace Backends { DF} Job job { jrr.job } ; Trace trace(BeChnl,"_s_handle_job_mngt",jrr) ; - { ::unique_lock lock { _s_mutex } ; // prevent sub-backend from manipulating _s_start_tab from main thread, lock for minimal time + { Lock lock { _s_mutex } ; // prevent sub-backend from manipulating _s_start_tab from main thread, lock for minimal time // keep_fd auto it = _s_start_tab.find(+job) ; if (it==_s_start_tab.end() ) { trace("not_in_tab" ) ; return false ; } StartEntry& entry = it->second ; if (entry.conn.seq_id!=jrr.seq_id) { trace("bad_seq_id",entry.conn.seq_id,jrr.seq_id) ; return false ; } @@ -430,15 +430,15 @@ namespace Backends { 
bool/*keep_fd*/ Backend::_s_handle_job_end( JobRpcReq&& jrr , SlaveSockFd const& ) { switch (jrr.proc) { - case JobProc::None : return false ; // if connection is lost, ignore it - case JobProc::End : break ; // no reply + case JobProc::None : return false ; // if connection is lost, ignore it + case JobProc::End : break ; // no reply DF} Job job { jrr.job } ; JobExec je ; ::vmap_ss rsrcs ; Trace trace(BeChnl,"_s_handle_job_end",jrr) ; - if (jrr.job==_s_starting_job) ::unique_lock lock{_s_starting_job_mutex} ; // ensure _s_handled_job_start is done for this job - { ::unique_lock lock { _s_mutex } ; // prevent sub-backend from manipulating _s_start_tab from main thread, lock for minimal time + if (jrr.job==_s_starting_job) Lock lock{_s_starting_job_mutex} ; // ensure _s_handled_job_start is done for this job + { Lock lock { _s_mutex } ; // prevent sub-backend from manipulating _s_start_tab from main thread, lock for minimal time // auto it = _s_start_tab.find(+job) ; if (it==_s_start_tab.end() ) { trace("not_in_tab" ) ; return false ; } StartEntry& entry = it->second ; if (entry.conn.seq_id!=jrr.seq_id) { trace("bad_seq_id",entry.conn.seq_id,jrr.seq_id) ; return false ; } @@ -458,13 +458,13 @@ namespace Backends { } trace("info") ; for( auto& [dn,dd] : jrr.digest.deps ) { - if (!dd.is_date) continue ; // fast path + if (!dd.is_date) continue ; // fast path Dep dep { Node(dn) , dd } ; dep.acquire_crc() ; dd.crc_date(dep) ; } ::string jaf = job->ancillary_file() ; - serialize( OFStream(jaf,::ios::app) , JobInfoEnd{jrr} ) ; // /!\ _s_starting_job ensures ancillary file is written by _s_handle_job_start before we append to it + serialize( OFStream(jaf,::ios::app) , JobInfoEnd{jrr} ) ; // /!\ _s_starting_job ensures ancillary file is written by _s_handle_job_start before we append to it job->end_exec() ; je.end_date = file_date(jaf) ; //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv @@ -478,7 +478,7 @@ namespace 
Backends { Trace trace(BeChnl,"s_kill_req",ri) ; Req req { ri } ; ::vmap> to_wakeup ; - { ::unique_lock lock { _s_mutex } ; // lock for minimal time + { Lock lock { _s_mutex } ; // lock for minimal time for( Tag t : All ) if (s_ready(t)) for( JobIdx j : s_tab[+t]->kill_waiting_jobs(ri) ) { //vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv @@ -536,8 +536,8 @@ namespace Backends { SubmitAttrs submit_attrs ; FullDate start_date ; for( JobIdx job=0 ;; job++ ) { - { ::unique_lock lock { _s_mutex } ; // lock _s_start_tab for minimal time - auto it = _s_start_tab.lower_bound(job) ; + { Lock lock { _s_mutex } ; // lock _s_start_tab for minimal time + auto it = _s_start_tab.lower_bound(job) ; if (it==_s_start_tab.end()) goto WrapAround ; // job = it->first ; // job is now the next valid entry @@ -606,7 +606,7 @@ namespace Backends { Trace trace(BeChnl,"s_config",STR(dynamic)) ; if (!dynamic) s_executable = *g_lmake_dir+"/_bin/job_exec" ; // - ::unique_lock lock{_s_mutex} ; + Lock lock{_s_mutex} ; for( Tag t : All ) if (+t) { Backend* be = s_tab [+t] ; Config::Backend const& cfg = config[+t] ; @@ -637,11 +637,11 @@ namespace Backends { ::vector_s Backend::acquire_cmd_line( Tag tag , JobIdx job , ::vector const& reqs , ::vmap_ss&& rsrcs , SubmitAttrs const& submit_attrs ) { Trace trace(BeChnl,"acquire_cmd_line",tag,job,reqs,rsrcs,submit_attrs) ; - SWEAR(!_s_mutex.try_lock()) ; // check we have the lock to access _s_start_tab + _s_mutex.swear_locked() ; // SubmitRsrcsAttrs::s_canon(rsrcs) ; // - auto [it,fresh] = _s_start_tab.emplace(job,StartEntry()) ; // create entry + auto [it,fresh] = _s_start_tab.emplace(job,StartEntry()) ; // create entry StartEntry& entry = it->second ; entry.open() ; entry.tag = tag ; diff --git a/src/lmakeserver/backend.x.hh b/src/lmakeserver/backend.x.hh index 8c85fd59..97602a23 100644 --- a/src/lmakeserver/backend.x.hh +++ b/src/lmakeserver/backend.x.hh @@ -134,17 +134,17 @@ namespace Backends { static Backend* 
s_tab[N] ; private : - static JobExecThread * _s_job_start_thread ; - static JobExecThread * _s_job_mngt_thread ; - static JobExecThread * _s_job_end_thread ; - static DeferredThread* _s_deferred_report_thread ; - static DeferredThread* _s_deferred_wakeup_thread ; - static ::mutex _s_mutex ; - static ::atomic _s_starting_job ; // this job is starting when _starting_job_mutex is locked - static ::mutex _s_starting_job_mutex ; - static ::map _s_start_tab ; // use map instead of umap because heartbeat iterates over while tab is moving - static SmallIds _s_small_ids ; - static SmallId _s_max_small_id ; + static JobExecThread * _s_job_start_thread ; + static JobExecThread * _s_job_mngt_thread ; + static JobExecThread * _s_job_end_thread ; + static DeferredThread* _s_deferred_report_thread ; + static DeferredThread* _s_deferred_wakeup_thread ; + static Mutex _s_mutex ; + static ::atomic _s_starting_job ; // this job is starting when _starting_job_mutex is locked + static Mutex _s_starting_job_mutex ; + static ::map _s_start_tab ; // use map instead of umap because heartbeat iterates over while tab is moving + static SmallIds _s_small_ids ; + static SmallId _s_max_small_id ; public : // services // PER_BACKEND : these virtual functions must be implemented by sub-backend, some of them have default implementations that do nothing when meaningful @@ -192,15 +192,15 @@ namespace Backends { inline ::vmap_s Backend::s_n_tokenss (Tag t) { return s_tab[+t]->n_tokenss() ; } // // nj is the maximum number of job backend may run on behalf of this req - #define LOCK ::unique_lock lock{_s_mutex} + #define LOCK Lock lock{_s_mutex} inline void Backend::s_open_req (ReqIdx r,JobIdx nj) { LOCK ; Trace trace(BeChnl,"s_open_req" ,r) ; for( Tag t : All ) if (s_ready(t)) s_tab[+t]->open_req (r,nj) ; } inline void Backend::s_close_req (ReqIdx r ) { LOCK ; Trace trace(BeChnl,"s_close_req" ,r) ; for( Tag t : All ) if (s_ready(t)) s_tab[+t]->close_req (r ) ; } inline void 
Backend::s_new_req_eta(ReqIdx r ) { LOCK ; Trace trace(BeChnl,"s_new_req_eta",r) ; for( Tag t : All ) if (s_ready(t)) s_tab[+t]->new_req_eta(r ) ; } #undef LOCK // - inline ::string/*msg*/ Backend::s_start ( Tag t , JobIdx j ) { SWEAR(!_s_mutex.try_lock()) ; Trace trace(BeChnl,"s_start" ,t,j) ; return s_tab[+t]->start (j ) ; } - inline ::pair_s Backend::s_end ( Tag t , JobIdx j , Status s ) { SWEAR(!_s_mutex.try_lock()) ; Trace trace(BeChnl,"s_end" ,t,j) ; return s_tab[+t]->end (j,s) ; } - inline ::pair_s Backend::s_heartbeat( Tag t , JobIdx j ) { SWEAR(!_s_mutex.try_lock()) ; Trace trace(BeChnl,"s_heartbeat",t,j) ; return s_tab[+t]->heartbeat(j ) ; } + inline ::string/*msg*/ Backend::s_start ( Tag t , JobIdx j ) { _s_mutex.swear_locked() ; Trace trace(BeChnl,"s_start" ,t,j) ; return s_tab[+t]->start (j ) ; } + inline ::pair_s Backend::s_end ( Tag t , JobIdx j , Status s ) { _s_mutex.swear_locked() ; Trace trace(BeChnl,"s_end" ,t,j) ; return s_tab[+t]->end (j,s) ; } + inline ::pair_s Backend::s_heartbeat( Tag t , JobIdx j ) { _s_mutex.swear_locked() ; Trace trace(BeChnl,"s_heartbeat",t,j) ; return s_tab[+t]->heartbeat(j ) ; } inline bool Backend::StartEntry::useful() const { for( Req r : reqs ) if (!r.zombie()) return true ; diff --git a/src/lmakeserver/backends/generic.hh b/src/lmakeserver/backends/generic.hh index 9e9a0cd6..cd84b1af 100644 --- a/src/lmakeserver/backends/generic.hh +++ b/src/lmakeserver/backends/generic.hh @@ -20,7 +20,7 @@ namespace Backends { // // share actual resources data as we typically have a lot of jobs with the same resources - template< class Data , ::unsigned_integral Cnt > struct Shared { + template< class Data , ::unsigned_integral RefCnt > struct Shared { friend ostream& operator<<( ostream& os , Shared const& s ) { /**/ os << "Shared" ; if (+s) return os << *s ; @@ -28,20 +28,19 @@ namespace Backends { } // static data private : - static ::umap _s_store ; // map rsrcs to count of pointers to it, always >0 (erased when reaching 0) + 
static ::umap _s_store ; // map rsrcs to refcount, always >0 (erased when reaching 0) // cxtors & casts public : Shared() = default ; // Shared(Shared const& r) : data{r.data} { if (data) _s_store.at(*data)++ ; } - Shared(Shared & r) : data{r.data} { if (data) _s_store.at(*data)++ ; } Shared(Shared && r) : data{r.data} { r.data = nullptr ; } // - template Shared(A&&... args) { - Data d{::forward(args)...} ; - auto it = _s_store.find(d) ; - if (it==_s_store.end()) it = _s_store.insert({d,1}).first ; - else it->second++ ; + template Shared( NewType , A&&... args) { + Data d { ::forward(args)... } ; + auto it = _s_store.find(d) ; + if (it==_s_store.end()) it = _s_store.insert({::move(d),1}).first ; // data is not known, create it + else it->second++ ; // data is known, share and increment refcount data = &it->first ; } // @@ -49,8 +48,8 @@ namespace Backends { if (!data) return ; auto it = _s_store.find(*data) ; SWEAR(it!=_s_store.end()) ; - if (it->second==1) _s_store.erase(it) ; - else it->second-- ; + if (it->second==1) _s_store.erase(it) ; // last pointer, destroy data + else it->second-- ; // data is shared, just decrement refcount } // Shared& operator=(Shared const& r) { this->~Shared() ; new(this) Shared( r ) ; return *this ; } @@ -65,13 +64,13 @@ namespace Backends { // data Data const* data = nullptr ; } ; - template< class Data , ::unsigned_integral Cnt > ::umap Shared::_s_store ; + template< class Data , ::unsigned_integral RefCnt > ::umap Shared::_s_store ; } namespace std { - template< class Data , ::unsigned_integral Cnt > struct hash> { - size_t operator()(Backends::Shared const& s) const { + template< class Data , ::unsigned_integral RefCnt > struct hash> { + size_t operator()(Backends::Shared const& s) const { return hash()(s.data) ; } } ; @@ -105,7 +104,7 @@ namespace Backends { WaitingEntry( RsrcsAsk const& rsa , SubmitAttrs const& sa , bool v ) : rsrcs_ask{rsa} , n_reqs{1} , submit_attrs{sa} , verbose{v} {} // data RsrcsAsk rsrcs_ask ; - ReqIdx 
n_reqs = 0 ; // number of reqs waiting for this job + ReqIdx n_reqs = 0 ; // number of reqs waiting for this job SubmitAttrs submit_attrs ; bool verbose = false ; } ; @@ -113,7 +112,7 @@ namespace Backends { struct SpawnedEntry { Rsrcs rsrcs ; SpawnId id = -1 ; - bool started = false ; // if true <=> start() has been called for this job, for assert only + bool started = false ; // if true <=> start() has been called for this job, for assert only bool verbose = false ; } ; @@ -141,14 +140,14 @@ namespace Backends { // data ::umap> waiting_queues ; ::umap waiting_jobs ; - ::uset queued_jobs ; // spawned jobs until start - JobIdx n_jobs = 0 ; // manage -j option (if >0 no more than n_jobs can be launched on behalf of this req) + ::uset queued_jobs ; // spawned jobs until start + JobIdx n_jobs = 0 ; // manage -j option (if >0 no more than n_jobs can be launched on behalf of this req) bool verbose = false ; } ; // specialization - virtual Bool3 call_launch_after_start() const { return No ; } // if Maybe, only launch jobs w/ same resources - virtual Bool3 call_launch_after_end () const { return No ; } // . + virtual Bool3 call_launch_after_start() const { return No ; } // if Maybe, only launch jobs w/ same resources + virtual Bool3 call_launch_after_end () const { return No ; } // . 
// virtual bool/*ok*/ fit_eventually( RsrcsDataAsk const& ) const { return true ; } // true if job with such resources can be spawned eventually virtual bool/*ok*/ fit_now ( RsrcsAsk const& ) const { return true ; } // true if job with such resources can be spawned now @@ -169,14 +168,14 @@ namespace Backends { } virtual void open_req( ReqIdx req , JobIdx n_jobs ) { Trace trace("open_req",req,n_jobs) ; - ::unique_lock lock { Req::s_reqs_mutex } ; // taking Req::s_reqs_mutex is compulsery to derefence req - bool inserted = reqs.insert({ req , {n_jobs,Req(req)->options.flags[ReqFlag::Verbose]} }).second ; + Lock lock { Req::s_reqs_mutex } ; // taking Req::s_reqs_mutex is compulsery to derefence req + bool inserted = reqs.insert({ req , {n_jobs,Req(req)->options.flags[ReqFlag::Verbose]} }).second ; SWEAR(inserted) ; } virtual void close_req(ReqIdx req) { auto it = reqs.find(req) ; Trace trace("close_req",req,STR(it==reqs.end())) ; - if (it==reqs.end()) return ; // req has been killed + if (it==reqs.end()) return ; // req has been killed ReqEntry const& re = it->second ; SWEAR(!re.waiting_jobs) ; SWEAR(!re.queued_jobs ) ; @@ -188,11 +187,11 @@ namespace Backends { } // do not launch immediately to have a better view of which job should be launched first virtual void submit( JobIdx job , ReqIdx req , SubmitAttrs const& submit_attrs , ::vmap_ss&& rsrcs ) { - RsrcsAsk rsa = import_(::move(rsrcs),req) ; // compile rsrcs + RsrcsAsk rsa { New , import_(::move(rsrcs),req) } ; // compile rsrcs if (!fit_eventually(*rsa)) throw to_string("not enough resources to launch job ",Job(job)->name()) ; ReqEntry& re = reqs.at(req) ; - SWEAR(!waiting_jobs .contains(job)) ; // job must be a new one - SWEAR(!re.waiting_jobs.contains(job)) ; // in particular for this req + SWEAR(!waiting_jobs .contains(job)) ; // job must be a new one + SWEAR(!re.waiting_jobs.contains(job)) ; // in particular for this req CoarseDelay pressure = submit_attrs.pressure ; Trace trace("submit",rsa,pressure) ; // 
@@ -204,36 +203,36 @@ namespace Backends { Trace trace("add_pressure",job,req,submit_attrs) ; ReqEntry& re = reqs.at(req) ; auto wit = waiting_jobs.find(job) ; - if (wit==waiting_jobs.end()) { // job is not waiting anymore, mostly ignore + if (wit==waiting_jobs.end()) { // job is not waiting anymore, mostly ignore auto sit = spawned_jobs.find(job) ; - if (sit==spawned_jobs.end()) { // job is already ended + if (sit==spawned_jobs.end()) { // job is already ended trace("ended") ; } else { - SpawnedEntry& se = sit->second ; // if not waiting, it must be spawned if add_pressure is called - if (re.verbose ) se.verbose = true ; // mark it verbose, though + SpawnedEntry& se = sit->second ; // if not waiting, it must be spawned if add_pressure is called + if (re.verbose ) se.verbose = true ; // mark it verbose, though trace("queued") ; } return ; } WaitingEntry& we = wit->second ; - SWEAR(!re.waiting_jobs.contains(job)) ; // job must be new for this req + SWEAR(!re.waiting_jobs.contains(job)) ; // job must be new for this req CoarseDelay pressure = submit_attrs.pressure ; trace("adjusted_pressure",pressure) ; // re.waiting_jobs[job] = pressure ; - re.waiting_queues[we.rsrcs_ask].insert({pressure,job}) ; // job must be known + re.waiting_queues[we.rsrcs_ask].insert({pressure,job}) ; // job must be known we.submit_attrs |= submit_attrs ; we.verbose |= re.verbose ; we.n_reqs++ ; } virtual void set_pressure( JobIdx job , ReqIdx req , SubmitAttrs const& submit_attrs ) { - ReqEntry& re = reqs.at(req) ; // req must be known to already know job + ReqEntry& re = reqs.at(req) ; // req must be known to already know job auto it = waiting_jobs.find(job) ; // - if (it==waiting_jobs.end()) return ; // job is not waiting anymore, ignore + if (it==waiting_jobs.end()) return ; // job is not waiting anymore, ignore WaitingEntry & we = it->second ; - CoarseDelay & old_pressure = re.waiting_jobs .at(job ) ; // job must be known - ::set& q = re.waiting_queues.at(we.rsrcs_ask) ; // including for 
this req + CoarseDelay & old_pressure = re.waiting_jobs .at(job ) ; // job must be known + ::set& q = re.waiting_queues.at(we.rsrcs_ask) ; // including for this req CoarseDelay pressure = submit_attrs.pressure ; Trace trace("set_pressure","pressure",pressure) ; we.submit_attrs |= submit_attrs ; @@ -244,28 +243,28 @@ namespace Backends { protected : virtual ::string start(JobIdx job) { auto it = spawned_jobs.find(job) ; - if (it==spawned_jobs.end()) return {} ; // job was killed in the mean time + if (it==spawned_jobs.end()) return {} ; // job was killed in the mean time SpawnedEntry& se = it->second ; // se.started = true ; for( auto& [r,re] : reqs ) re.queued_jobs.erase(job) ; ::string msg = start_job(job,se) ; - launch( call_launch_after_start() , se.rsrcs ) ; // not compulsery but improves reactivity + launch( call_launch_after_start() , se.rsrcs ) ; // not compulsery but improves reactivity return msg ; } virtual ::pair_s end( JobIdx j , Status s ) { auto it = spawned_jobs.find(j) ; - if (it==spawned_jobs.end()) return {{},false/*retry*/} ; // job was killed in the mean time + if (it==spawned_jobs.end()) return {{},false/*retry*/} ; // job was killed in the mean time SpawnedEntry& se = it->second ; SWEAR(se.started) ; ::pair_s digest = end_job(j,se,s) ; - Rsrcs rsrcs = se.rsrcs ; // copy resources before erasing job from spawned_jobs - spawned_jobs.erase(it) ; // erase before calling launch so job is freed w.r.t. n_jobs - launch( call_launch_after_end() , rsrcs ) ; // not compulsery but improves reactivity + Rsrcs rsrcs = se.rsrcs ; // copy resources before erasing job from spawned_jobs + spawned_jobs.erase(it) ; // erase before calling launch so job is freed w.r.t. 
n_jobs + launch( call_launch_after_end() , rsrcs ) ; // not compulsery but improves reactivity return digest ; } - virtual ::pair_s heartbeat(JobIdx j) { // called on jobs that did not start after at least newwork_delay time + virtual ::pair_s heartbeat(JobIdx j) { // called on jobs that did not start after at least newwork_delay time auto it = spawned_jobs.find(j) ; SWEAR(it!=spawned_jobs.end(),j) ; - SpawnedEntry& se = it->second ; SWEAR(!se.started ,j) ; // we should not be called on started jobs + SpawnedEntry& se = it->second ; SWEAR(!se.started ,j) ; // we should not be called on started jobs ::pair_s digest = heartbeat_queued_job(j,se) ; // if (digest.second!=HeartbeatState::Alive) { @@ -280,13 +279,13 @@ namespace Backends { ::vector res ; Trace trace("kill_req",T,req,reqs.size()) ; if ( !req || reqs.size()<=1 ) { - if (req) SWEAR( reqs.size()==1 && req==reqs.begin()->first ) ; // ensure the last req is the right one + if (req) SWEAR( reqs.size()==1 && req==reqs.begin()->first ) ; // ensure the last req is the right one // kill waiting jobs for( auto const& [j,_] : waiting_jobs ) res.push_back(j) ; waiting_jobs.clear() ; for( auto& [_,re] : reqs ) re.clear() ; } else { - auto rit = reqs.find(req) ; SWEAR(rit!=reqs.end()) ; // we should not kill a non-existent req + auto rit = reqs.find(req) ; SWEAR(rit!=reqs.end()) ; // we should not kill a non-existent req ReqEntry& re = rit->second ; // kill waiting jobs for( auto const& [j,_] : re.waiting_jobs ) { @@ -303,11 +302,11 @@ namespace Backends { Trace trace("kill_job",j) ; auto it = spawned_jobs.find(j) ; SWEAR(it!=spawned_jobs.end()) ; - SWEAR(!it->second.started) ; // if job is started, it is not our responsibility any more + SWEAR(!it->second.started) ; // if job is started, it is not our responsibility any more kill_queued_job(j,it->second) ; spawned_jobs.erase(it) ; } - virtual void launch( ) { launch(Yes,{}) ; } // using default arguments is not recognized to override virtual methods + virtual void 
launch( ) { launch(Yes,{}) ; } // using default arguments is not recognized to override virtual methods virtual void launch( Bool3 go , Rsrcs rsrcs ) { Trace trace("launch",T,go,rsrcs) ; RsrcsAsk rsrcs_ask ; @@ -315,29 +314,29 @@ namespace Backends { case No : return ; case Yes : break ; case Maybe : - if constexpr (::is_same_v) rsrcs_ask = rsrcs ; // only process jobs with same resources if possible - else FAIL("cannot convert resources") ; // if possible + if constexpr (::is_same_v) rsrcs_ask = rsrcs ; // only process jobs with same resources if possible + else FAIL("cannot convert resources") ; // if possible break ; DF} // ::vmap> err_jobs ; - for( auto [req,eta] : Req::s_etas() ) { // /!\ it is forbidden to dereference req without taking Req::s_reqs_mutex first + for( auto [req,eta] : Req::s_etas() ) { // /!\ it is forbidden to dereference req without taking Req::s_reqs_mutex first auto rit = reqs.find(+req) ; if (rit==reqs.end()) continue ; JobIdx n_jobs = rit->second.n_jobs ; ::umap>& queues = rit->second.waiting_queues ; for(;;) { - if ( n_jobs && spawned_jobs.size()>=n_jobs ) break ; // cannot have more than n_jobs running jobs because of this req, process next req + if ( n_jobs && spawned_jobs.size()>=n_jobs ) break ; // cannot have more than n_jobs running jobs because of this req, process next req auto candidate = queues.end() ; if (+rsrcs_ask) { - if (fit_now(rsrcs_ask)) candidate = queues.find(rsrcs_ask) ; // if we have resources, only consider jobs with same resources + if (fit_now(rsrcs_ask)) candidate = queues.find(rsrcs_ask) ; // if we have resources, only consider jobs with same resources } else { for( auto it=queues.begin() ; it!=queues.end() ; it++ ) { if ( candidate!=queues.end() && it->second.begin()->pressure<=candidate->second.begin()->pressure ) continue ; - if ( fit_now(it->first) ) candidate = it ; // continue to find a better candidate + if ( fit_now(it->first) ) candidate = it ; // continue to find a better candidate } } - if 
(candidate==queues.end()) break ; // nothing for this req, process next req + if (candidate==queues.end()) break ; // nothing for this req, process next req // ::set& pressure_set = candidate->second ; auto pressure_first = pressure_set.begin() ; SWEAR(pressure_first!=pressure_set.end(),candidate->first) ; // what is this candiate with no pressure ? @@ -345,7 +344,7 @@ namespace Backends { JobIdx j = pressure_first->job ; auto wit = waiting_jobs.find(j) ; RsrcsAsk const& rsrcs_ask = candidate->first ; - Rsrcs rsrcs = adapt(*rsrcs_ask) ; + Rsrcs rsrcs { New , adapt(*rsrcs_ask) } ; ::vmap_ss rsrcs_map = export_(*rsrcs) ; bool ok = true ; // @@ -372,13 +371,13 @@ namespace Backends { ::set& pes = wit2->second ; PressureEntry pe { wit1->second , j } ; // /!\ pressure is job pressure for r, not for req SWEAR(pes.contains(pe)) ; - if (pes.size()==1) re.waiting_queues.erase(wit2) ; // last entry for this rsrcs, erase the entire queue + if (pes.size()==1) re.waiting_queues.erase(wit2) ; // last entry for this rsrcs, erase the entire queue else pes .erase(pe ) ; } /**/ re.waiting_jobs.erase (wit1) ; if (ok) re.queued_jobs .insert(j ) ; } - if (pressure_set.size()==1) queues .erase(candidate ) ; // last entry for this rsrcs, erase the entire queue + if (pressure_set.size()==1) queues .erase(candidate ) ; // last entry for this rsrcs, erase the entire queue else pressure_set.erase(pressure_first) ; } } @@ -386,9 +385,9 @@ namespace Backends { } // data - ::umap reqs ; // all open Req's - ::umap waiting_jobs ; // jobs retained here - ::umap spawned_jobs ; // jobs spawned until end + ::umap reqs ; // all open Req's + ::umap waiting_jobs ; // jobs retained here + ::umap spawned_jobs ; // jobs spawned until end } ; diff --git a/src/lmakeserver/backends/slurm.cc b/src/lmakeserver/backends/slurm.cc index fe6b639e..7291532a 100644 --- a/src/lmakeserver/backends/slurm.cc +++ b/src/lmakeserver/backends/slurm.cc @@ -630,8 +630,9 @@ namespace Backends::Slurm { 
trace("spawn_error",err_str) ; throw "slurm spawn job error : " + err_str ; } + uint32_t res = msg->job_id ; SlurmApi::free_submit_response_response_msg(msg) ; - return msg->job_id ; + return res ; } Daemon slurm_sense_daemon() { diff --git a/src/lmakeserver/job.cc b/src/lmakeserver/job.cc index a6583752..6e2fb37d 100644 --- a/src/lmakeserver/job.cc +++ b/src/lmakeserver/job.cc @@ -82,7 +82,7 @@ namespace Engine { // mark target dirs to protect from deletion by other jobs // this must be perfectly predictible as this mark is undone in end_exec below if (mark_target_dirs) { - ::unique_lock lock{_s_target_dirs_mutex} ; + Lock lock{_s_target_dirs_mutex} ; for( Node d : to_mkdirs ) { trace("protect_dir" ,d) ; _s_target_dirs [d]++ ; } for( Node d : to_mkdir_uphills ) { trace("protect_hier_dir",d) ; _s_hier_target_dirs[d]++ ; } } @@ -103,7 +103,7 @@ namespace Engine { if (it->second==1) dirs.erase(it) ; else it->second-- ; } ; - ::unique_lock lock(_s_target_dirs_mutex) ; + Lock lock(_s_target_dirs_mutex) ; for( Node d : dirs ) { trace("unprotect_dir" ,d) ; dec(_s_target_dirs ,d) ; } for( Node d : dir_uphills ) { trace("unprotect_hier_dir",d) ; dec(_s_hier_target_dirs,d) ; } } @@ -125,7 +125,7 @@ namespace Engine { // ::ostream& operator<<( ::ostream& os , JobReqInfo const& ri ) { - return os<<"JRI(" << ri.req <<','<< (ri.full?"full":"makable") <<','<< ri.speculate <<','<< ri.step()<<':'< JobData::_s_target_dirs ; - ::umap JobData::_s_hier_target_dirs ; + Mutex JobData::_s_target_dirs_mutex ; + ::umap JobData::_s_target_dirs ; + ::umap JobData::_s_hier_target_dirs ; void JobData::_reset_targets(Rule::SimpleMatch const& match) { ::vector ts ; ts.reserve(rule->n_static_targets) ; @@ -837,15 +837,13 @@ namespace Engine { if (is_special()) { if (!_submit_special(ri )) goto Done ; } // if no new deps, we cannot be waiting and we are done else { if (!_submit_plain (ri,ri.reason(),dep_pressure)) goto Done ; } // . 
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - if (!ri.waiting()) { - SWEAR(!ri.running()) ; - make_action = MakeAction::End ; // restart analysis as if called by end() as in case of flash execution, submit has called end() - if (is_ok(status)!=Maybe) ri.reasons = { {} , {} } ; // . - else ri.reasons = { mk_reason(status) , {} } ; // . - trace("restart_analysis",ri) ; - goto RestartFullAnalysis ; // BACKWARD - } - goto Wait ; + if (ri.waiting()) goto Wait ; + SWEAR(!ri.running()) ; + make_action = MakeAction::End ; // restart analysis as if called by end() as in case of flash execution, submit has called end() + if (is_ok(status)!=Maybe) ri.reasons = { {} , {} } ; // . + else ri.reasons = { mk_reason(status) , {} } ; // . + trace("restart_analysis",ri) ; + goto RestartFullAnalysis ; // BACKWARD Done : SWEAR( !ri.running() && !ri.waiting() , idx() , ri ) ; ri.step(Step::Done) ; @@ -883,7 +881,10 @@ namespace Engine { if ( submit_loop ) status = Status::Err ; if ( ri.done() && wakeup_watchers ) ri.wakeup_watchers() ; // ^^^^^^^^^^^^^^^^^^^^ + goto Return ; Wait : + trace("wait",ri) ; + Return : if ( stop_speculate ) _propag_speculate(ri) ; if ( !rule->is_special() && ri.step()!=prev_step ) { bool remove_old = prev_step>=Step::MinCurStats && prev_step _s_target_dirs_mutex ; static ::umap _s_target_dirs ; // dirs created for job execution that must not be deleted static ::umap _s_hier_target_dirs ; // uphill hierarchy of _s_target_dirs // cxtors & casts diff --git a/src/lmakeserver/makefiles.cc b/src/lmakeserver/makefiles.cc index a1ad90ba..bf897d7c 100644 --- a/src/lmakeserver/makefiles.cc +++ b/src/lmakeserver/makefiles.cc @@ -295,6 +295,7 @@ namespace Engine::Makefiles { if ( old.rules_module!=new_.rules_module ) new_rules = !old.rules_module ? Reason::Set : !new_.rules_module ? 
Reason::Cleared : Reason::Modified ; } ; try { + Gil::Anti no_gil { gil } ; // release gil as new_config needs Backend which is of lower priority // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv Persistent::new_config( ::move(config) , dynamic , rescue , diff_config ) ; // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ @@ -310,7 +311,9 @@ namespace Engine::Makefiles { ::pair_s srcs_digest = _refresh_srcs( srcs , src_dirs_s , srcs_deps , new_srcs , py_info , startup_dir_s , nfs_guard ) ; bool invalidate_src = srcs_digest.second ; if (invalidate_src) { - try { //! vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv + try { + Gil::Anti no_gil { gil } ; + // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv invalidate_src &= Persistent::new_srcs( ::move(srcs) , ::move(src_dirs_s) , dynamic ) ; } catch (::string const& e) { //! ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // if srcs_digest is empty, sources were in config @@ -322,7 +325,9 @@ namespace Engine::Makefiles { ::pair_s rules_digest = _refresh_rules( rules , rules_deps , new_rules , py_info , startup_dir_s , nfs_guard ) ; bool invalidate_rule = rules_digest.second ; if (invalidate_rule) { - try { //! vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv + try { + Gil::Anti no_gil { gil } ; // release gil as new_rules acquires it when needed + // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv invalidate_rule &= Persistent::new_rules( ::move(rules) , dynamic ) ; } catch (::string const& e) { //! 
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ // if rules_digest is empty, rules were in config diff --git a/src/lmakeserver/node.cc b/src/lmakeserver/node.cc index 25998f03..2354978b 100644 --- a/src/lmakeserver/node.cc +++ b/src/lmakeserver/node.cc @@ -791,7 +791,7 @@ namespace Engine { if (it.i_chunk!=0) { cur_dep->sz = it.i_chunk ; _fill_hole(*cur_dep) ; - cur_dep = &cur_dep->next() ; + cur_dep = cur_dep->next() ; } // create new tail ::vector ds ; diff --git a/src/lmakeserver/node.x.hh b/src/lmakeserver/node.x.hh index ef8cd179..f8560c53 100644 --- a/src/lmakeserver/node.x.hh +++ b/src/lmakeserver/node.x.hh @@ -155,8 +155,8 @@ namespace Engine { // cxtors & casts using Base::Base ; // accesses - Dep const& next() const { return *(this+1+div_up(sz,NodesPerDep)) ; } - Dep & next() { return *(this+1+div_up(sz,NodesPerDep)) ; } + Dep const* next() const { return this+1+div_up(sz,NodesPerDep) ; } + Dep * next() { return this+1+div_up(sz,NodesPerDep) ; } ::string accesses_str() const ; ::string dflags_str () const ; // services @@ -193,6 +193,9 @@ namespace Engine { // services Dep const* operator->() const { return &**this ; } Dep const& operator* () const { + // Node's in chunk are semanticly located before header so : + // - if i_chunk< hdr->sz : refer to dep with no crc, flags nor parallel + // - if i_chunk==hdr->sz : refer to header if (i_chunk==hdr->sz) return *hdr ; static_cast(tmpl) = static_cast(hdr+1)[i_chunk] ; tmpl.accesses = hdr->chunk_accesses ; @@ -200,19 +203,14 @@ namespace Engine { } DepsIter& operator++(int) { return ++*this ; } DepsIter& operator++( ) { - if (i_chunk==hdr->sz) { // go to next chunk - /**/ i_chunk = 0 ; // Node's in chunk are semanticly located before header - /**/ hdr = &hdr->next() ; - if (hdr->sz) tmpl = { hdr->chunk_accesses , Crc::None , {} } ; // prepare tmpl when first accessing it (assumes sequential access) - } else { // go to next item in chunk - i_chunk++ ; - } + if (i_chunksz) i_chunk++ ; // go to next item in chunk + else { 
i_chunk = 0 ; hdr = hdr->next() ; } // go to next chunk } return *this ; } // data - Dep const* hdr = nullptr ; // pointer to current chunk header - uint8_t i_chunk = 0 ; // current index in chunk - mutable Dep tmpl = {{},Crc::None} ; // template to store uncompressed Dep's + Dep const* hdr = nullptr ; // pointer to current chunk header + uint8_t i_chunk = 0 ; // current index in chunk + mutable Dep tmpl = {{}/*accesses*/,Crc::None} ; // template to store uncompressed Dep's } ; struct Deps : DepsBase { diff --git a/src/lmakeserver/req.cc b/src/lmakeserver/req.cc index ce201efe..052d38ef 100644 --- a/src/lmakeserver/req.cc +++ b/src/lmakeserver/req.cc @@ -18,7 +18,7 @@ namespace Engine { SmallIds Req::s_small_ids ; ::vector Req::s_reqs_by_start ; - ::mutex Req::s_reqs_mutex ; + Mutex Req::s_reqs_mutex ; ::vector Req::s_store(1) ; ::vector Req::_s_reqs_by_eta ; ::array,1<<(sizeof(ReqIdx)*8-3)> Req::_s_zombie_tab = {} ; @@ -28,9 +28,7 @@ namespace Engine { } void Req::make(EngineClosureReq const& ecr) { - { ::unique_lock lock{s_reqs_mutex} ; // grow may reallocate - grow(s_store,+*this) ; - } + { Lock lock{s_reqs_mutex} ; grow(s_store,+*this) ; } ReqData& data = **this ; // for( int i=0 ;; i++ ) { // try increasing resolution in file name until no conflict @@ -102,7 +100,7 @@ namespace Engine { s_reqs_by_start[i]->idx_by_start = i ; } s_reqs_by_start.pop_back() ; - { ::unique_lock lock{s_reqs_mutex} ; + { Lock lock{s_reqs_mutex} ; for( Idx i=(*this)->idx_by_eta ; iidx_by_eta = i ; @@ -141,8 +139,8 @@ namespace Engine { Trace trace("_adjust_eta",now,(*this)->ete) ; // reorder _s_reqs_by_eta and adjust idx_by_eta to reflect new order bool changed = false ; - { ::unique_lock lock { s_reqs_mutex } ; - Idx idx_by_eta = (*this)->idx_by_eta ; + { Lock lock { s_reqs_mutex } ; + Idx idx_by_eta = (*this)->idx_by_eta ; (*this)->eta = now + (*this)->ete ; if (push_self) _s_reqs_by_eta.push_back(*this) ; while ( idx_by_eta>0 && _s_reqs_by_eta[idx_by_eta-1]->eta>(*this)->eta ) { // 
if eta is decreased @@ -356,7 +354,7 @@ namespace Engine { // ReqData // - ::mutex ReqData::_s_audit_mutex ; + Mutex ReqData::_s_audit_mutex ; void ReqData::clear() { Trace trace("clear",job) ; @@ -383,7 +381,7 @@ namespace Engine { case JobReport::Done : c = Color::Ok ; break ; default : ; } - audit_info( c , to_string(::setw(9),snake(jr)," jobs : ",stats.ended(jr)) ) ; // 9 is for "completed" + audit_info( c , to_string(::setw(9),snake(jr)," jobs : ",stats.ended(jr)) ) ; // 9 is for "completed" } /**/ audit_info( Color::Note , to_string( "useful time : " , stats.jobs_time[true /*useful*/].short_str() ) ) ; if (+stats.jobs_time[false/*useful*/]) audit_info( Color::Note , to_string( "rerun time : " , stats.jobs_time[false/*useful*/].short_str() ) ) ; @@ -405,7 +403,7 @@ namespace Engine { if (+long_names) { ::vmap long_names_ = mk_vmap(long_names) ; size_t pm = 0 ; - ::sort( long_names_ , []( ::pair const& a , ::pair b ) { return a.second const& a , ::pair b ) { return a.secondname().size() ) ; for( auto [n,_] : ::c_vector_view(long_names_,0,g_config.n_errs(long_names_.size())) ) audit_node( Color::Warning , "name too long" , n ) ; if ( g_config.errs_overflow(long_names_.size()) ) audit_info( Color::Warning , "..." 
) ; @@ -414,24 +412,24 @@ namespace Engine { } if (+frozen_jobs) { ::vmap frozen_jobs_ = mk_vmap(frozen_jobs) ; - ::sort( frozen_jobs_ , []( ::pair const& a , ::pair b ) { return a.second const& a , ::pair b ) { return a.secondrule->name.size() ) ; for( auto [j,_] : frozen_jobs_ ) audit_info( j->err()?Color::Err:Color::Warning , to_string("frozen ",::setw(w),j->rule->name) , j->name() ) ; } if (+frozen_nodes) { ::vmap frozen_nodes_ = mk_vmap(frozen_nodes) ; - ::sort( frozen_nodes_ , []( ::pair const& a , ::pair b ) { return a.second const& a , ::pair b ) { return a.second no_triggers_ = mk_vmap(no_triggers) ; - ::sort( no_triggers_ , []( ::pair const& a , ::pair b ) { return a.second const& a , ::pair b ) { return a.second clash_nodes_ = mk_vmap(clash_nodes) ; - ::sort( clash_nodes_ , []( ::pair const& a , ::pair b ) { return a.second const& a , ::pair b ) { return a.secondrule->special!=Special::Req) { diff --git a/src/lmakeserver/req.x.hh b/src/lmakeserver/req.x.hh index ceccf448..ecc1adb9 100644 --- a/src/lmakeserver/req.x.hh +++ b/src/lmakeserver/req.x.hh @@ -50,14 +50,14 @@ namespace Engine { return res ; } // - static Idx s_n_reqs () { return s_reqs_by_start.size() ; } - static ::vector s_reqs_by_eta() { ::unique_lock lock{s_reqs_mutex} ; return _s_reqs_by_eta ; } + static Idx s_n_reqs () { return s_reqs_by_start.size() ; } + static ::vector s_reqs_by_eta() { Lock lock{s_reqs_mutex} ; return _s_reqs_by_eta ; } static ::vmap s_etas () ; // static data static SmallIds s_small_ids ; static ::vector s_reqs_by_start ; // INVARIANT : ordered by item->start // - static ::mutex s_reqs_mutex ; // protects s_store, _s_reqs_by_eta as a whole + static Mutex s_reqs_mutex ; // protects s_store, _s_reqs_by_eta as a whole static ::vector s_store ; private : static ::vector _s_reqs_by_eta ; // INVARIANT : ordered by item->stats.eta @@ -223,7 +223,7 @@ namespace Engine { static constexpr size_t StepSz = 14 ; // size of the field representing step in output // static data 
private : - static ::mutex _s_audit_mutex ; // should not be static, but if per ReqData, this would prevent ReqData from being movable + static Mutex _s_audit_mutex ; // should not be static, but if per ReqData, this would prevent ReqData from being movable // cxtors & casts public : void clear() ; @@ -298,8 +298,8 @@ namespace Engine { inline ReqData const& Req::operator*() const { return s_store[+*this] ; } inline ReqData & Req::operator*() { return s_store[+*this] ; } inline ::vmap Req::s_etas() { - ::unique_lock lock{s_reqs_mutex} ; - ::vmap res ; + Lock lock { s_reqs_mutex } ; + ::vmap res ; for ( Req r : _s_reqs_by_eta ) res.emplace_back(r,r->eta) ; return res ; } diff --git a/src/lmakeserver/rule.cc b/src/lmakeserver/rule.cc index 994070e3..c2fd5ebc 100644 --- a/src/lmakeserver/rule.cc +++ b/src/lmakeserver/rule.cc @@ -239,7 +239,7 @@ namespace Engine { , lmake_dir_var_name{ py_src.size()>4 ? ::string(py_src[4].as_a()) : ""s } , dbg_info { py_src.size()>5 ? ::string(py_src[5].as_a()) : ""s } { - Py::Gil gil ; + Py::Gil::s_swear_locked() ; if (py_src.size()<=1) return ; ctx.reserve(py_src[1].as_a().size()) ; for( Py::Object const& py_item : py_src[1].as_a() ) { @@ -640,8 +640,8 @@ namespace Engine { } void RuleData::_acquire_py(Py::Dict const& dct) { static ::string root_dir_s = *g_root_dir+'/' ; - Gil gil ; ::string field ; + Gil::s_swear_locked() ; try { // // acquire essential (necessary for Anti & GenericSrc) diff --git a/src/lmakeserver/rule.x.hh b/src/lmakeserver/rule.x.hh index b3f4a83e..eae18b52 100644 --- a/src/lmakeserver/rule.x.hh +++ b/src/lmakeserver/rule.x.hh @@ -466,10 +466,10 @@ namespace Engine { } // data private : - mutable ::mutex _glbs_mutex ; // ensure glbs is not used for several jobs simultaneously + mutable Mutex _glbs_mutex ; // ensure glbs is not used for several jobs simultaneously public : - Py::Ptr mutable glbs ; // if is_dynamic <=> dict to use as globals when executing code, modified then restored during evaluation - Py::Ptr 
code ; // if is_dynamic <=> python code object to execute with stems as locals and glbs as globals leading to a dict that can be used to build data + Py::Ptr mutable glbs ; // if is_dynamic <=> dict to use as globals when executing code, modified then restored during evaluation + Py::Ptr code ; // if is_dynamic <=> python code object to execute with stems as locals and glbs as globals leading to a dict that can be used to build data } ; struct DynamicDepsAttrs : Dynamic { @@ -806,7 +806,7 @@ namespace Engine { template void Dynamic::compile() { if (!is_dynamic) return ; - Py::Gil gil ; + Py::Gil::s_swear_locked() ; try { code = code_str ; code->boost() ; } catch (::string const& e) { throw to_string("cannot compile code :\n" ,indent(e,1)) ; } try { glbs = Py::py_run(append_dbg_info(glbs_str)) ; glbs->boost() ; } catch (::string const& e) { throw to_string("cannot compile context :\n",indent(e,1)) ; } } @@ -920,7 +920,10 @@ namespace Engine { ::serdes(s,exec_time ) ; ::serdes(s,stats_weight ) ; } - if (is_base_of_v<::istream,S>) _compile() ; + if (is_base_of_v<::istream,S>) { + Py::Gil gil ; + _compile() ; + } } // END_OF_VERSIONING diff --git a/src/lmakeserver/store.x.hh b/src/lmakeserver/store.x.hh index f4872a94..989b8e65 100644 --- a/src/lmakeserver/store.x.hh +++ b/src/lmakeserver/store.x.hh @@ -455,13 +455,13 @@ namespace Engine::Persistent { return ; } // restart with lock - static ::mutex s_m ; // nodes can be created from several threads, ensure coherence between names and nodes - ::unique_lock lock = locked? (SWEAR(!s_m.try_lock()),::unique_lock()) : ::unique_lock(s_m) ; + static Mutex s_m ; // nodes can be created from several threads, ensure coherence between names and nodes + Lock lock = locked ? 
(s_m.swear_locked(),Lock>()) : Lock(s_m) ; *this = _name_file.c_at(name_).node() ; if (+*this) { SWEAR( name_==(*this)->_full_name , *this , name_ , (*this)->_full_name , _name_file.c_at((*this)->_full_name).node() ) ; } else { - _name_file.at(name_) = *this = _node_file.emplace(name_,no_dir,true/*locked*/) ; // if dir must be created, we already hold the lock + _name_file.at(name_) = *this = _node_file.emplace(name_,no_dir,true/*locked*/) ; // if dir must be created, we already hold the lock } } inline NodeBase::NodeBase( ::string const& n , bool no_dir , bool locked ) { diff --git a/src/py.cc b/src/py.cc index 7b0f108f..0528ff87 100644 --- a/src/py.cc +++ b/src/py.cc @@ -14,7 +14,7 @@ namespace Py { - ::recursive_mutex Gil::_s_mutex ; + Mutex Gil::_s_mutex ; // // functions diff --git a/src/py.hh b/src/py.hh index 8a5d56bf..6657b19a 100644 --- a/src/py.hh +++ b/src/py.hh @@ -17,17 +17,20 @@ ENUM( Exception ) namespace Py { - - struct Gil { + struct Gil : Lock> { + friend class AntiGil ; + using Base = Lock> ; + // statics + static void s_swear_locked() { _s_mutex.swear_locked() ; } + // static data private : - static ::recursive_mutex _s_mutex ; + static Mutex _s_mutex ; // cxtors & casts public : - Gil () : lock{_s_mutex} { trace("acquired") ; } + Gil () : Base{_s_mutex} { trace("acquired") ; } ~Gil() { trace("released") ; } // data - Trace trace { "Gil" } ; - ::unique_lock lock ; + Trace trace { "Gil" } ; } ; struct Object ; diff --git a/src/rpc_job.cc b/src/rpc_job.cc index dfe14785..fb919383 100644 --- a/src/rpc_job.cc +++ b/src/rpc_job.cc @@ -21,7 +21,7 @@ ::ostream& operator<<( ::ostream& os , FileAction const& fa ) { return os <<')' ; } -::pair_s do_file_actions( ::vector_s* unlnks , ::vmap_s&& pre_actions , NfsGuard& nfs_guard , Algo ha ) { +::pair_s do_file_actions( ::vector_s* unlnks/*out*/ , ::vmap_s&& pre_actions , NfsGuard& nfs_guard , Algo ha ) { ::uset_s keep_dirs ; ::string msg ; bool ok = true ; diff --git a/src/store/file.hh b/src/store/file.hh 
index ec2b2518..9a26aaff 100644 --- a/src/store/file.hh +++ b/src/store/file.hh @@ -5,7 +5,7 @@ #pragma once -#include // mmap, mremap, munmap +#include // mmap, mremap, munmap #include "disk.hh" #include "trace.hh" @@ -18,10 +18,10 @@ namespace Store { - extern size_t g_page ; // cannot initialize directly as this may occur after first call to cxtor + extern size_t g_page ; // cannot initialize directly as this may occur after first call to cxtor - template using UniqueLock = ::conditional_t,NoLock<::shared_mutex>> ; - template using SharedLock = ::conditional_t,NoLock<::shared_mutex>> ; + template using UniqueLock = ::conditional_t>,NoLock>> ; + template using SharedLock = ::conditional_t>,NoLock>> ; template struct File { using ULock = UniqueLock ; @@ -45,18 +45,12 @@ namespace Store { bool operator+() const { return size ; } bool operator!() const { return !+*this ; } // services - void lock () { _mutex.lock () ; } // behave as a mutex, so file can be used in ::unique_lock et al. - bool try_lock () { return _mutex.try_lock () ; } // but if AutoLock, then there is no reason to access it from outside - void unlock () { _mutex.unlock () ; } // . - void lock_shared () const { _mutex.lock_shared () ; } // . - bool try_lock_shared() const { return _mutex.try_lock_shared() ; } // . - void unlock_shared () const { _mutex.unlock_shared () ; } // . 
void expand(size_t sz) { - if (sz<=size) return ; // fast path + if (sz<=size) return ; // fast path ULock lock{_mutex} ; - if ( AutoLock && sz<=size ) return ; // redo size check, now that we have the lock + if ( AutoLock && sz<=size ) return ; // redo size check, now that we have the lock size_t old_size = size ; - _resize_file(::max( sz , size + (size>>2) )) ; // ensure remaps are in log(n) + _resize_file(::max( sz , size + (size>>2) )) ; // ensure remaps are in log(n) _map(old_size) ; } void clear(size_t sz=0) { @@ -90,12 +84,12 @@ namespace Store { // data public : ::string name ; - char* base = nullptr ; // address of mapped file - ::atomic size = 0 ; // underlying file size (fake if no file) - size_t capacity = 0 ; // max size that can ever be allocated + char* base = nullptr ; // address of mapped file + ::atomic size = 0 ; // underlying file size (fake if no file) + size_t capacity = 0 ; // max size that can ever be allocated bool writable = false ; protected : - mutable ::shared_mutex _mutex ; + SharedMutex mutable _mutex ; private : AutoCloseFd _fd ; } ; @@ -125,7 +119,7 @@ namespace Store { if (writable) { open_flags |= O_RDWR | O_CREAT ; Disk::dir_guard(name) ; } else open_flags |= O_RDONLY ; // vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv - _fd = ::open( name.c_str() , open_flags , 0644 ) ; // mode is only used if created, which implies writable + _fd = ::open( name.c_str() , open_flags , 0644 ) ; // mode is only used if created, which implies writable // ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ SWEAR_PROD(+_fd) ; Disk::FileInfo fi{_fd} ; @@ -156,11 +150,11 @@ namespace Store { sz = round_up(sz,g_page) ; if (+_fd) { // vvvvvvvvvvvvvvvvvvvvv - int rc = ::ftruncate( _fd , sz ) ; // may increase file size + int rc = ::ftruncate( _fd , sz ) ; // may increase file size // ^^^^^^^^^^^^^^^^^^^^^ if (rc!=0) FAIL_PROD(rc,strerror(errno)) ; } - fence() ; // update state when it is legal to do so + fence() ; // update state when it is legal to do so size = sz ; } 
diff --git a/src/store/store_utils.hh b/src/store/store_utils.hh index d848e4ba..daf1b6ca 100644 --- a/src/store/store_utils.hh +++ b/src/store/store_utils.hh @@ -22,8 +22,8 @@ namespace Store { template concept IsIdx = requires(I i) { { +I{size_t(0)} }->::convertible_to ; } ; template using IntIdx = ::make_unsigned_t< ::conditional_t< ::is_integral_v , I , decltype(+I{}) > > ; - template struct NoLock { - NoLock(Mutex&) {} + template struct NoLock { + NoLock(M&) {} } ; } diff --git a/src/thread.hh b/src/thread.hh index aa3bdd31..b47dbd3f 100644 --- a/src/thread.hh +++ b/src/thread.hh @@ -15,6 +15,7 @@ #include "serialize.hh" template struct ThreadQueue : private ::deque { + using ThreadMutex = Mutex ; private : using Base = ::deque ; public : @@ -22,27 +23,27 @@ public : // cxtors & casts ThreadQueue() = default ; bool operator+() const { - ::unique_lock lock{_mutex} ; + Lock lock{_mutex} ; return !Base::empty() ; } bool operator!() const { return !+*this ; } // services - /**/ void push (T const& x) { ::unique_lock lock{_mutex} ; Base::push_back (x ) ; _cond.notify_one() ; } - /**/ void push_urgent (T const& x) { ::unique_lock lock{_mutex} ; Base::push_front (x ) ; _cond.notify_one() ; } - template void emplace (A&&... a) { ::unique_lock lock{_mutex} ; Base::emplace_back (::forward(a)...) ; _cond.notify_one() ; } - template void emplace_urgent(A&&... a) { ::unique_lock lock{_mutex} ; Base::emplace_front(::forward(a)...) ; _cond.notify_one() ; } + /**/ void push (T const& x) { Lock lock{_mutex} ; Base::push_back (x ) ; _cond.notify_one() ; } + /**/ void push_urgent (T const& x) { Lock lock{_mutex} ; Base::push_front (x ) ; _cond.notify_one() ; } + template void emplace (A&&... a) { Lock lock{_mutex} ; Base::emplace_back (::forward(a)...) ; _cond.notify_one() ; } + template void emplace_urgent(A&&... a) { Lock lock{_mutex} ; Base::emplace_front(::forward(a)...) 
; _cond.notify_one() ; } T pop() { - ::unique_lock lock{_mutex} ; + Lock lock { _mutex } ; _cond.wait( lock , [&](){ return !Base::empty() ; } ) ; return _pop() ; } ::pair try_pop() { - ::unique_lock lock{_mutex} ; + Lock lock { _mutex } ; if (Base::empty()) return {false/*popped*/,T() } ; else return {true /*popped*/,_pop()} ; } ::pair pop(::stop_token tkn) { - ::unique_lock lock{_mutex} ; + Lock lock { _mutex } ; if (!_cond.wait( lock , tkn , [&](){ return !Base::empty() ; } )) return {false/*popped*/,T()} ; return {true/*popped*/,_pop()} ; } @@ -53,7 +54,7 @@ private : return res ; } // data - ::mutex mutable _mutex ; + ThreadMutex mutable _mutex ; ::condition_variable_any _cond ; } ; diff --git a/src/time.hh b/src/time.hh index eba87317..eb077f7e 100644 --- a/src/time.hh +++ b/src/time.hh @@ -295,8 +295,8 @@ namespace Time { } inline bool/*slept*/ Delay::_s_sleep( ::stop_token tkn , Delay sleep , Pdate until ) { if (sleep<=Delay()) return !tkn.stop_requested() ; - ::mutex m ; - ::unique_lock lck { m } ; + Mutex m ; + Lock> lck { m } ; ::condition_variable_any cv ; bool res = cv.wait_for( lck , tkn , ::chrono::nanoseconds(sleep.nsec()) , [until](){ return Pdate(New)>=until ; } ) ; return res ; diff --git a/src/trace.cc b/src/trace.cc index 0bdf0fd2..c41adfb0 100644 --- a/src/trace.cc +++ b/src/trace.cc @@ -19,10 +19,10 @@ Channels Trace::s_channels = DfltChannels ; // by default, trace de #ifndef NO_TRACE - size_t Trace::_s_pos = 0 ; - bool Trace::_s_ping = false ; - Fd Trace::_s_fd ; - ::mutex Trace::_s_mutex ; + size_t Trace::_s_pos = 0 ; + bool Trace::_s_ping = false ; + Fd Trace::_s_fd ; + Mutex Trace::_s_mutex ; thread_local int Trace::_t_lvl = 0 ; thread_local bool Trace::_t_hide = false ; @@ -39,7 +39,7 @@ Channels Trace::s_channels = DfltChannels ; // by default, trace de void Trace::s_new_trace_file(::string const& trace_file) { if (trace_file==*g_trace_file) return ; // - ::unique_lock lock{_s_mutex} ; + Lock lock{_s_mutex} ; // _s_fd.close() ; 
*g_trace_file = trace_file ; diff --git a/src/trace.hh b/src/trace.hh index fbcb32df..9956b82a 100644 --- a/src/trace.hh +++ b/src/trace.hh @@ -60,10 +60,10 @@ static constexpr Channels DfltChannels = ~Channels() ; static ::atomic s_sz ; // max overall size of trace, beyond, trace wraps static Channels s_channels ; private : - static size_t _s_pos ; // current line number - static bool _s_ping ; // ping-pong to distinguish where trace stops in the middle of a trace - static Fd _s_fd ; - static ::mutex _s_mutex ; + static size_t _s_pos ; // current line number + static bool _s_ping ; // ping-pong to distinguish where trace stops in the middle of a trace + static Fd _s_fd ; + static Mutex _s_mutex ; // static thread_local int _t_lvl ; static thread_local bool _t_hide ; // if true <=> do not generate trace @@ -117,8 +117,8 @@ static constexpr Channels DfltChannels = ~Channels() ; ::string buf_view = _t_buf->str () ; #endif // - { ::unique_lock lock { _s_mutex } ; - size_t new_pos = _s_pos+buf_view.size() ; + { Lock lock { _s_mutex } ; + size_t new_pos = _s_pos+buf_view.size() ; if (new_pos>s_sz) { if (_s_pos struct Mutex : ::conditional_t<::is_void_v,::conditional_t,M> { + using Base = ::conditional_t<::is_void_v,::conditional_t,M> ; + static constexpr MutexLvl Lvl = Lvl_ ; + void lock (MutexLvl& lvl) { SWEAR(t_mutex_lvl< Lvl,t_mutex_lvl) ; lvl = t_mutex_lvl ; t_mutex_lvl = Lvl ; Base::lock () ; } + void unlock (MutexLvl lvl) { SWEAR(t_mutex_lvl==Lvl,t_mutex_lvl) ; t_mutex_lvl = lvl ; Base::unlock () ; } + void lock_shared (MutexLvl& lvl) requires(S) { SWEAR(t_mutex_lvl< Lvl,t_mutex_lvl) ; lvl = t_mutex_lvl ; t_mutex_lvl = Lvl ; Base::lock_shared () ; } + void unlock_shared(MutexLvl lvl) requires(S) { SWEAR(t_mutex_lvl==Lvl,t_mutex_lvl) ; t_mutex_lvl = lvl ; Base::unlock_shared() ; } + // services + #ifndef NDEBUG + void swear_locked () { SWEAR(t_mutex_lvl>=Lvl) ; SWEAR(!Base::try_lock ()) ; } + void swear_locked_shared() requires(S) { SWEAR(t_mutex_lvl>=Lvl) ; 
SWEAR(!Base::try_lock_shared()) ; } + #else + void swear_locked () {} + void swear_locked_shared() requires(S) {} + #endif +} ; +template using SharedMutex = Mutex ; + +template struct Lock { + struct Anti { + Anti (Lock& l) : _lock{l} { _lock.unlock() ; } + ~Anti( ) { _lock.lock () ; } + Lock& _lock ; + } ; + // cxtors & casts + Lock ( ) = default ; + Lock (Lock&& l) { *this = ::move(l) ; } + Lock (M& m ) : _mutex{&m} { lock () ; } + ~Lock( ) { if (_locked) unlock() ; } + Lock& operator=(Lock&& l) { + if (_locked) unlock() ; + _mutex = l._mutex ; + _lvl = l._lvl ; + _locked = l._locked ; + l._locked = false ; + return *this ; + } + // services + Anti anti () { return Anti(*this) ; } + void swear_locked() requires(!S) { _mutex->swear_locked () ; } + void swear_locked() requires( S) { _mutex->swear_locked_shared() ; } + void lock () requires(!S) { SWEAR(!_locked) ; _locked = true ; _mutex->lock (_lvl) ; } + void lock () requires( S) { SWEAR(!_locked) ; _locked = true ; _mutex->lock_shared (_lvl) ; } + void unlock () requires(!S) { SWEAR( _locked) ; _locked = false ; _mutex->unlock (_lvl) ; } + void unlock () requires( S) { SWEAR( _locked) ; _locked = false ; _mutex->unlock_shared(_lvl) ; } + // data + M* _mutex = nullptr ; // must be !=nullptr when _locked + MutexLvl _lvl = MutexLvl::None/*garbage*/ ; // valid when _locked + bool _locked = false ; +} ; +template using SharedLock = Lock ; + // // miscellaneous // @@ -922,11 +1010,13 @@ template<::unsigned_integral T,bool ThreadSafe=false> struct SmallIds { struct NoLock { NoLock(NoMutex) {} } ; - using Mutex = ::conditional_t ; - using Lock = ::conditional_t,NoLock > ; +private : + using _Mutex = ::conditional_t< ThreadSafe , Mutex , NoMutex > ; + using _Lock = ::conditional_t< ThreadSafe , Lock<_Mutex> , NoLock > ; +public : T acquire() { - T res ; - Lock lock { _mutex } ; + T res ; + _Lock lock { _mutex } ; if (!free_ids) { res = n_allocated ; if (n_allocated==::numeric_limits::max()) throw "cannot allocate id"s ; 
@@ -940,7 +1030,7 @@ template<::unsigned_integral T,bool ThreadSafe=false> struct SmallIds { } void release(T id) { if (!id) return ; // id 0 has not been acquired - Lock lock { _mutex } ; + _Lock lock { _mutex } ; SWEAR(!free_ids.contains(id)) ; // else, double release free_ids.insert(id) ; } @@ -951,7 +1041,7 @@ template<::unsigned_integral T,bool ThreadSafe=false> struct SmallIds { set free_ids ; T n_allocated = 1 ; // dont use id 0 so that it is free to mean "no id" private : - Mutex _mutex ; + _Mutex _mutex ; } ; inline void fence() { ::atomic_signal_fence(::memory_order_acq_rel) ; } // ensure execution order in case of crash to guaranty disk integrity